I really don't understand why you'd await an undici request stream. Like, it seems if you want to stream from, say, the network to S3, and you want to await it, then you need to do Promise.allSettled([upload.done(), requestStream])

If you await the requestStream first, the upload doesn't seem to receive any data for some reason.

#nodejs #undici

This weirdness caused me to create like a million zero byte files in S3. So that was "fun".

Turns out you need to await it with Promise.all() because otherwise errors just get eaten... because sure.
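A minimal, self-contained sketch of why awaiting both sides together matters. Here `request` stands in for undici writing the response body into a PassThrough and then failing mid-transfer, and `upload` stands in for `upload.done()` draining it; the names are illustrative, not the real undici/S3 APIs.

```typescript
import { PassThrough } from "node:stream";

async function transfer(): Promise<string> {
  const body = new PassThrough();

  // Stand-in for the network request: write part of the body, then fail.
  const request = (async () => {
    body.write("partial data");
    throw new Error("socket reset"); // simulated network failure
  })();

  // Stand-in for upload.done(): drain the PassThrough.
  const upload = (async () => {
    let bytes = 0;
    for await (const chunk of body) bytes += (chunk as Buffer).length;
    return bytes;
  })();

  try {
    // Promise.all rejects as soon as either side fails, so the error
    // surfaces here instead of becoming an unhandled rejection.
    await Promise.all([upload, request]);
    return "ok";
  } catch (err) {
    body.destroy(); // tear down the other side too
    return (err as Error).message;
  }
}
```

If you instead `await request` alone and never await the consumer, the consumer's rejection has nowhere to go.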

I'm beginning to think I've written this sub-optimally… all I wanna do is download a file and stream it to S3.

@thisismissem Does .pipe(…) help you at all?
@samir nope, because you can't create an Upload to S3 with a .pipe; it takes a readable stream as an argument.
@thisismissem @samir But you can pipe to that Readable stream, can't you?
@coderbyheart True, you could pass it a PassThrough and then pipe to its write side.
@samir @coderbyheart yeah, but something in the order of awaits means the upload gets zero bytes but everything else is fine

@thisismissem @coderbyheart I guess you might need to await on the "done" event of the input stream? I am really just guessing, though; no clue if this would help.

Events + promises = nightmare. I hope you can figure it out.
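For the "events + promises" pain specifically, Node ships a promise wrapper: `finished()` from `node:stream/promises` resolves when a stream ends cleanly and rejects if it errors. A small self-contained sketch (the stream contents here are just placeholders):

```typescript
import { PassThrough, Readable } from "node:stream";
import { finished } from "node:stream/promises";

async function countBytes(): Promise<number> {
  const pass = new PassThrough();
  let bytes = 0;
  pass.on("data", (chunk: Buffer) => {
    bytes += chunk.length;
  });

  Readable.from(["some ", "data"]).pipe(pass);
  await finished(pass); // resolves on end/finish, rejects on "error"
  return bytes;
}
```

So the event-emitter side of streams can still be bridged into async/await without hand-rolling `new Promise((resolve, reject) => …)`.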

@samir @coderbyheart we were using the callback version of undici's stream method, but that caused uncaught exceptions which bullmq couldn't catch when executing the job, which caused an outage. I thought I'd fixed it by awaiting the request instead, but that produced 0 byte uploads, and now I've a fix that awaits both the request & the upload. It was properly frustrating to try to understand what the hell was happening here.
@thisismissem @coderbyheart Sounds highly unpleasant. Reminds me a lot of trying to subvert various Java frameworks so I could smash together the relevant parts of each.
@thisismissem @samir The way Node.js deals with Exceptions is definitely my main source of pain.

@coderbyheart @samir yeah, but also I think this is me not understanding this API.

Tempted to try to find a different way to write it… but idk how, because streams are event emitters and that's kinda incompatible with async/await

I could avoid the throws by doing a HEAD request first, I guess?

@thisismissem @coderbyheart @samir that would still leave a race condition (albeit a tiny one).

Maybe a not-as-efficient-streaming-but-better-API fetch() impl is a better intermediate choice.

@janl @coderbyheart @samir I'm also wondering if fetch() but with response.stream() might work?

I don't know. What I really wanna say is "start me an upload, and give me a writable stream", then I can pipe the response to the writable stream when I'm ready

Maybe I want to tee the stream instead of piping it?
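For what it's worth, plain `.pipe()` can already tee: one Readable piped into two PassThroughs delivers every chunk to every destination, unlike two readers attached directly to the same stream, where each chunk goes to only one of them. A sketch with placeholder names:

```typescript
import { PassThrough, Readable } from "node:stream";

async function teeDemo(): Promise<string[]> {
  const source = Readable.from(["hello ", "world"]);
  const toHasher = new PassThrough(); // branch for the hasher
  const toUpload = new PassThrough(); // branch for the S3 upload

  // Each .pipe() destination gets a full copy of the data.
  source.pipe(toHasher);
  source.pipe(toUpload);

  const read = async (s: PassThrough): Promise<string> => {
    let out = "";
    for await (const chunk of s) out += chunk.toString();
    return out;
  };

  // Both branches must be consumed concurrently, or backpressure from
  // the unread branch stalls the other one.
  return Promise.all([read(toHasher), read(toUpload)]);
}
```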

@thisismissem @janl @coderbyheart I'd love to try and help further, but this is a pretty difficult way of doing it. Do you feel like throwing up some code online in a REPL environment or something?
remote-file-upload-to-s3-undici.ts


@thisismissem @janl @coderbyheart I'm a bit confused by this. It seems you have two things reading from the passthrough: the upload and the hasher. You're writing to it once, in the HTTP request.

I think you might need to explicitly clone the data across readers. Something like this: https://github.com/levansuper/readable-stream-clone/blob/master/src/readable-stream-clone.ts

If you remove the hasher (just as a test), does it start working?

(Also, I agree, Undici being both a streamer and a Promise is very odd. I don't like it.)

@samir @thisismissem @coderbyheart I don't feel a clone is needed, as in my head I built a pipeline: http req > hasher > counter > upload stream. But the code might not reflect that :)

@janl @thisismissem @coderbyheart The hasher doesn't go through to the upload, though; the pipe is being split, which means the data is being shared by two consumers. And I'm guessing the hasher always wins (perhaps because it's registered first?), which is why the upload ends up being 0 bytes.

I just saw there's a whole other thread of conversation in the comments. Shout if you'd like to move this discussion there.

@samir @thisismissem @coderbyheart hm, there is an original version of this code with a slightly different shape that had that same behaviour IIRC and it did the right thing (but didn't allow errors to be thrown in the stream factory)

@samir @thisismissem @coderbyheart but your assessment about the current shape might be right.

(for real now, godspeed)

@janl @thisismissem @coderbyheart I can't imagine how that would work, but that doesn't mean it's not possible, just that I can't visualise it without seeing it. :-)
@samir @janl @coderbyheart yeah, I think that could be why we saw zero byte uploads; but I think we'd want to really use stream.pipeline here or something? I don't know, I've gotta brush up on my streams knowledge.
@thisismissem @janl @coderbyheart Looking at the docs, pipeline seems similar to .pipe(…).pipe(…).pipe(…)…, but I have never used it so I could be wrong.
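That "http req > hasher > counter > upload stream" shape can be expressed with `stream.pipeline` so every byte passes through each stage instead of being split between competing consumers. A hedged sketch: the Readable stands in for the HTTP response body and the final PassThrough for the stream handed to the S3 Upload; none of this is the real undici/S3 wiring.

```typescript
import { createHash } from "node:crypto";
import { PassThrough, Readable, Transform } from "node:stream";
import { pipeline } from "node:stream/promises";

async function hashCountUpload(chunks: string[]) {
  const hash = createHash("sha256");
  let bytes = 0;

  // A pass-through Transform that hashes and counts every chunk.
  const tap = new Transform({
    transform(chunk, _enc, cb) {
      hash.update(chunk);
      bytes += chunk.length;
      cb(null, chunk); // forward the chunk unchanged
    },
  });

  const uploadInput = new PassThrough();
  // Drain the final stage concurrently, standing in for upload.done().
  const drained = (async () => {
    let body = "";
    for await (const c of uploadInput) body += c.toString();
    return body;
  })();

  // pipeline wires the stages together and propagates errors/cleanup.
  await pipeline(Readable.from(chunks), tap, uploadInput);
  return { bytes, digest: hash.digest("hex"), body: await drained };
}
```

Unlike chained `.pipe()`, `pipeline` destroys all stages and rejects its promise if any stage errors.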
@thisismissem it would really help to see some code. Do you have any samples? I haven't done this specific thing in a while. But maybe I can help.
@polotek I'll try to write something up.

@polotek the current "maybe working" code: https://gist.github.com/ThisIsMissEm/b3977a76ca239b8c0328972e10ee4ea2

It seems to do the thing, but then again, so did the previous code I had using the callback style for httpClient.stream, and that threw an uncaughtException on the process

@thisismissem thanks. I'll take a look. I'm not familiar with using undici. The way you're using the passthrough leaves me with questions. What's the error you get?
@thisismissem I guess I could use the gist comments. I'll meet you there.
@polotek @thisismissem oh what's wrong with pass through streams to calculate metadata like checksums and bytes transferred?
@janl @thisismissem no, that's fine. As long as whatever you pipe it to doesn't try to close the connection too early. That's one thing to look at. What I'm referring to above is more about the `opaque` option on undici. I don't know what that does.
@polotek @thisismissem ah, it avoids building a closure by passing a callback through undici. I think it is a memory/performance thing.
@janl @thisismissem yeah that's what I read. I just don't quite understand where it shows up.
@janl @thisismissem basically I'm familiar with the s3 upload. And that usage looks okay to me so far. But I don't know undici, so I'm trying to familiarize myself.

@polotek @thisismissem oh that might be the error; IIRC it is an argument to the callback in line 57 and you have to do something with it.

(Note: I wrote the initial draft of that code)

@janl @polotek yeah, you can receive the opaque there, but in this case it is equal to the passthrough.

What I really want to do is say "start this fetch request, check the response headers & status code, and if it's a 200 OK then pipe the response to the passthrough, otherwise abort the request and stop consuming the response body"
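With the built-in fetch() that flow is expressible: status and headers are available before the body is consumed, so you can bail out (and abort) before piping anything. A sketch, where `download` is a hypothetical helper and the URL is whatever you're fetching:

```typescript
import { PassThrough, Readable } from "node:stream";

async function download(url: string): Promise<string> {
  const controller = new AbortController();
  const res = await fetch(url, { signal: controller.signal });

  // Headers and status code are known here, before touching the body.
  if (res.status !== 200 || res.body === null) {
    controller.abort(); // stop consuming the response body
    throw new Error(`unexpected status ${res.status}`);
  }

  // Convert the web-stream body to a Node stream and pipe it into a
  // PassThrough (the stream you'd hand to the S3 Upload).
  const passthrough = new PassThrough();
  Readable.fromWeb(res.body as any).pipe(passthrough);

  let out = "";
  for await (const chunk of passthrough) out += chunk.toString();
  return out;
}
```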

@polotek @janl yeah, the documentation on Undici isn't fantastic here: https://undici.nodejs.org/#/docs/api/Dispatcher?id=dispatcherstreamoptions-factory-callback

It just says "it is recommended to use the option.opaque property to avoid creating a closure for the factory method.", though also here I'm using both?

I'm unclear whether there's a way to get the statusCode / headers back from an Undici stream before the response has finished?

cc @mcollina
