Stream + async await

classic Classic list List threaded Threaded
17 messages Options
Reply | Threaded
Open this post in threaded view
|

Stream + async await

Naveen Chawla
It'd be great to have async stream constructs such as: http://reactivex.io/rxjs/ , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

RE: Stream + async await

Domenic Denicola

https://github.com/tc39/proposal-async-iteration

 

From: es-discuss [mailto:[hidden email]] On Behalf Of Naveen Chawla
Sent: Tuesday, July 11, 2017 09:24
To: [hidden email]
Subject: Stream + async await

 

It'd be great to have async stream constructs such as: http://reactivex.io/rxjs/ , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).


_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Naveen Chawla
Interesting!!!

Excuse my ignorance, but with this construct, how would you trivially invoke a "publish" ahead of any given "consumption"?

As far as I can see,

```
for await (const item of requestItems){

}
```
on its own is purely a "front-pressure" construct. That is, each request is made upon the completion and satisfactory consumption of the last one.

Can you suggest a way to trivially invoke some "back pressure" using this construct? By this I mean - invoke extra requests before they come to be consumed (like you can do with reactive streaming libraries). An example use would be if you wanted to do some "eager loading" of data while the user is likely to be viewing but not currently interacting with existing content, for example.

You seem very familiar with this construct, so I wouldn't be surprised if you've already thought about this! (If you're too busy I'm sure there are others here familiar with it too!)

On Tue, 11 Jul 2017 at 21:09 Domenic Denicola <[hidden email]> wrote:

https://github.com/tc39/proposal-async-iteration

 

From: es-discuss [mailto:[hidden email]] On Behalf Of Naveen Chawla
Sent: Tuesday, July 11, 2017 09:24
To: [hidden email]
Subject: Stream + async await

 

It'd be great to have async stream constructs such as: http://reactivex.io/rxjs/ , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).


_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

RE: Stream + async await

Ron Buckton

One option might be something like https://github.com/rbuckton/prex/blob/master/docs/scheduling.md#class-asyncqueue. It allows you to put items on the queue as soon as they are available.

 

Ron

 

From: [hidden email]
Sent: Saturday, July 29, 2017 3:54 AM
To: [hidden email]; [hidden email]
Subject: Re: Stream + async await

 

Interesting!!!

Excuse my ignorance, but with this construct, how would you trivially invoke a "publish" ahead of any given "consumption"?

As far as I can see,

```
for await (const item of requestItems){

}
```
on its own is purely a "front-pressure" construct. That is, each request is made upon the completion and satisfactory consumption of the last one.

Can you suggest a way to trivially invoke some "back pressure" using this construct? By this I mean - invoke extra requests before they come to be consumed (like you can do with reactive streaming libraries). An example use would be if you wanted to do some "eager loading" of data while the user is likely to be viewing but not currently interacting with existing content, for example.

You seem very familiar with this construct, so I wouldn't be surprised if you've already thought about this! (If you're too busy I'm sure there are others here familiar with it too!)

On Tue, 11 Jul 2017 at 21:09 Domenic Denicola <[hidden email]> wrote:

https://github.com/tc39/proposal-async-iteration

 

From: es-discuss [mailto:[hidden email]] On Behalf Of Naveen Chawla
Sent: Tuesday, July 11, 2017 09:24
To: [hidden email]
Subject: Stream + async await

 

It'd be great to have async stream constructs such as: http://reactivex.io/rxjs/ , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).


_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Isiah Meadows-2
In reply to this post by Naveen Chawla

There's also this strawman of mine which deals with most things async, but it has several of its own issues that I haven't quite addressed (complexity still being one after a week straight):

https://github.com/isiahmeadows/non-linear-proposal

I will caution that async iteration and observation can't really be merged meaningfully as you would hope. They both represent multiple values over time, but one is eager, the other lazy.


On Sat, Jul 29, 2017, 06:54 Naveen Chawla <[hidden email]> wrote:
Interesting!!!

Excuse my ignorance, but with this construct, how would you trivially invoke a "publish" ahead of any given "consumption"?

As far as I can see,

```
for await (const item of requestItems){

}
```
on its own is purely a "front-pressure" construct. That is, each request is made upon the completion and satisfactory consumption of the last one.

Can you suggest a way to trivially invoke some "back pressure" using this construct? By this I mean - invoke extra requests before they come to be consumed (like you can do with reactive streaming libraries). An example use would be if you wanted to do some "eager loading" of data while the user is likely to be viewing but not currently interacting with existing content, for example.

You seem very familiar with this construct, so I wouldn't be surprised if you've already thought about this! (If you're too busy I'm sure there are others here familiar with it too!)

On Tue, 11 Jul 2017 at 21:09 Domenic Denicola <[hidden email]> wrote:

https://github.com/tc39/proposal-async-iteration

 

From: es-discuss [mailto:[hidden email]] On Behalf Of Naveen Chawla
Sent: Tuesday, July 11, 2017 09:24
To: [hidden email]
Subject: Stream + async await

 

It'd be great to have async stream constructs such as: http://reactivex.io/rxjs/ , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Naveen Chawla
Guys!

I thought of a way of doing this. Roughly it's having 2 loops, one for consumer and one for requester. Would this work?:

```
    startRequestingAsync();
    startConsumingAsync();

    async startRequestingAsync(){
        for async (const item of requestItems){
            //Control the requesting. Do nothing to actually consume the data
        }
    }

    async startConsumingAsync(){
        for async(const item of requestItems){
            //Consume the item. Here you can also apply "front pressure" to request the next item if the "request" loop is behind
        }
    }
```

I haven't actually done this so it's just thinking out loud.

Thoughts?

On Sun, 30 Jul 2017 at 03:59 Isiah Meadows <[hidden email]> wrote:

There's also this strawman of mine which deals with most things async, but it has several of its own issues that I haven't quite addressed (complexity still being one after a week straight):

https://github.com/isiahmeadows/non-linear-proposal

I will caution that async iteration and observation can't really be merged meaningfully as you would hope. They both represent multiple values over time, but one is eager, the other lazy.


On Sat, Jul 29, 2017, 06:54 Naveen Chawla <[hidden email]> wrote:
Interesting!!!

Excuse my ignorance, but with this construct, how would you trivially invoke a "publish" ahead of any given "consumption"?

As far as I can see,

```
for await (const item of requestItems){

}
```
on its own is purely a "front-pressure" construct. That is, each request is made upon the completion and satisfactory consumption of the last one.

Can you suggest a way to trivially invoke some "back pressure" using this construct? By this I mean - invoke extra requests before they come to be consumed (like you can do with reactive streaming libraries). An example use would be if you wanted to do some "eager loading" of data while the user is likely to be viewing but not currently interacting with existing content, for example.

You seem very familiar with this construct, so I wouldn't be surprised if you've already thought about this! (If you're too busy I'm sure there are others here familiar with it too!)

On Tue, 11 Jul 2017 at 21:09 Domenic Denicola <[hidden email]> wrote:

https://github.com/tc39/proposal-async-iteration

 

From: es-discuss [mailto:[hidden email]] On Behalf Of Naveen Chawla
Sent: Tuesday, July 11, 2017 09:24
To: [hidden email]
Subject: Stream + async await

 

It'd be great to have async stream constructs such as: http://reactivex.io/rxjs/ , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

kai zhu
so that everyday programmers can relate, can someone give a code-snippet of the proposal equivalent to the common-use nodejs example below?

```javascript
function consumeReadableStream(stream, consumeChunk, callback) {
/*
 * stream - readable stream
 * consumeChunk - has signature - function (chunk) {...}
 * callback - has signature - function (error) {...}
 */
    var callbackOnce, done, timerTimeout;

    callbackOnce = function (error) {
    /*
     * this function will ensure callback is called only once
     */
        if (done) {
            return;
        }
        done = true;
        // clear timeout
        clearTimeout(timerTimeout);
        callback(error);
    };

    // init timeout handler
    timerTimeout = setTimeout(function () {
        callbackOnce(new Error('30000 ms timeout'));
        stream.destroy();
    }, 30000);

    stream.on('data', consumeChunk);
    stream.on('end', callbackOnce);
    stream.on('error', callbackOnce);
};
```

On Jul 30, 2017, at 2:16 PM, Naveen Chawla <[hidden email]> wrote:

Guys!

I thought of a way of doing this. Roughly it's having 2 loops, one for consumer and one for requester. Would this work?:

```
    startRequestingAsync();
    startConsumingAsync();

    async startRequestingAsync(){
        for async (const item of requestItems){
            //Control the requesting. Do nothing to actually consume the data
        }
    }

    async startConsumingAsync(){
        for async(const item of requestItems){
            //Consume the item. Here you can also apply "front pressure" to request the next item if the "request" loop is behind
        }
    }
```

I haven't actually done this so it's just thinking out loud.

Thoughts?

On Sun, 30 Jul 2017 at 03:59 Isiah Meadows <[hidden email]> wrote:

There's also this strawman of mine which deals with most things async, but it has several of its own issues that I haven't quite addressed (complexity still being one after a week straight):

https://github.com/isiahmeadows/non-linear-proposal

I will caution that async iteration and observation can't really be merged meaningfully as you would hope. They both represent multiple values over time, but one is eager, the other lazy.


On Sat, Jul 29, 2017, 06:54 Naveen Chawla <[hidden email]> wrote:
Interesting!!!

Excuse my ignorance, but with this construct, how would you trivially invoke a "publish" ahead of any given "consumption"?

As far as I can see,

```
for await (const item of requestItems){

}
```
on its own is purely a "front-pressure" construct. That is, each request is made upon the completion and satisfactory consumption of the last one.

Can you suggest a way to trivially invoke some "back pressure" using this construct? By this I mean - invoke extra requests before they come to be consumed (like you can do with reactive streaming libraries). An example use would be if you wanted to do some "eager loading" of data while the user is likely to be viewing but not currently interacting with existing content, for example.

You seem very familiar with this construct, so I wouldn't be surprised if you've already thought about this! (If you're too busy I'm sure there are others here familiar with it too!)

On Tue, 11 Jul 2017 at 21:09 Domenic Denicola <[hidden email]> wrote:

https://github.com/tc39/proposal-async-iteration

 

From: es-discuss [mailto:[hidden email]] On Behalf Of Naveen Chawla
Sent: Tuesday, July 11, 2017 09:24
To: [hidden email]
Subject: Stream + async await

 

It'd be great to have async stream constructs such as: http://reactivex.io/rxjs/ , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss


_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Re: Stream + async await

James Browning
It'll look something like this:

```javascript

async function consumeReadableStream(stream) {
    const start = Date.now()
    for await (const chunk of stream) {

        /* Do whatever you want with the chunk here e,g, await other
async tasks with chunks
            send them off to wherever, etc
        */

        if (Date.now() - start > 30000) {
            throw new Error('30000 ms timeout')
        }
    }
    /* Instead of callbackOnce the returned promise from this function
itself can be used */
}

```
_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

kai zhu
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Naveen Chawla
Yes, you need to intervene and reject the latest promise upon timeout (by having a reference to its "reject" callback).

This makes me wonder (and I'd like to be corrected if wrong) if async iterators are more of a hindrance than a help?

We can currently do a loop over an array of promises, without async iterators:

```javascript
async requestLoopAsync(){
    for(const requestItemPromise of requestItemPromises){
        //We can intervene here BEFORE we await the promise, unlike
        //with async iterators e.g. requestItemPromise.myRejectCallbackReference()

        const response = await requestItemPromise; //etc.
    }
}
```

Am I right or wrong?

(For the timeout example, we could do `currentRequestItemPromise = requestItemPromise` then on timeout do `currentRequestItemPromise.myRejectCallbackReference()` (where `myRejectCallbackReference` was assigned when we created the promise e.g. `this.myRejectCallbackReference = reject` from the `reject` parameter in `(resolve, reject)=>`)

On Mon, 31 Jul 2017 at 10:40 kai zhu <[hidden email]> wrote:
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Naveen Chawla
OK let me correct myself. That's preposterous. You have to `await` a promise to kick off it's process. But my point about being able to assign the promise to a wider-scoped  `currentRequestItemPromise` before awaiting it which seems impossible with async iterators, which rejects upon timeout, stands, doesn't it?

On Mon, 31 Jul 2017 at 16:08 Naveen Chawla <[hidden email]> wrote:
Yes, you need to intervene and reject the latest promise upon timeout (by having a reference to its "reject" callback).

This makes me wonder (and I'd like to be corrected if wrong) if async iterators are more of a hindrance than a help?

We can currently do a loop over an array of promises, without async iterators:

```javascript
async requestLoopAsync(){
    for(const requestItemPromise of requestItemPromises){
        //We can intervene here BEFORE we await the promise, unlike
        //with async iterators e.g. requestItemPromise.myRejectCallbackReference()

        const response = await requestItemPromise; //etc.
    }
}
```

Am I right or wrong?

(For the timeout example, we could do `currentRequestItemPromise = requestItemPromise` then on timeout do `currentRequestItemPromise.myRejectCallbackReference()` (where `myRejectCallbackReference` was assigned when we created the promise e.g. `this.myRejectCallbackReference = reject` from the `reject` parameter in `(resolve, reject)=>`)

On Mon, 31 Jul 2017 at 10:40 kai zhu <[hidden email]> wrote:
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

T.J. Crowder-2
In reply to this post by kai zhu
Related: https://esdiscuss.org/topic/how-about-awaiting-arrays (particularly the discussion of `await.race`), since effectively you're doing a race between a timeout and each chunk. Also relevant is the former work on cancelling promises, now withdrawn. (Can anyone point me at *why* it was withdrawn?)

-- T.J. Crowder

On Mon, Jul 31, 2017 at 6:10 AM, kai zhu <[hidden email]> wrote:
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss


_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Jan-Ivar Bruaroey
Because a promise is not a control surface of the asynchronous action fulfilling it; confuses owner with consumer. https://stackoverflow.com/a/41417429/918910

.: Jan-Ivar :.

On 7/31/17 7:35 AM, T.J. Crowder wrote:
Related: https://esdiscuss.org/topic/how-about-awaiting-arrays (particularly the discussion of `await.race`), since effectively you're doing a race between a timeout and each chunk. Also relevant is the former work on cancelling promises, now withdrawn. (Can anyone point me at *why* it was withdrawn?)

-- T.J. Crowder

On Mon, Jul 31, 2017 at 6:10 AM, kai zhu <[hidden email]> wrote:
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss



_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss


--
.: Jan-Ivar :.

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Domenic Denicola
That is not why.



From: Jan-Ivar Bruaroey <[hidden email]>
Sent: Aug 1, 2017 3:47 PM
To: [hidden email]
Subject: Re: Stream + async await

Because a promise is not a control surface of the asynchronous action fulfilling it; confuses owner with consumer. https://stackoverflow.com/a/41417429/918910

.: Jan-Ivar :.

On 7/31/17 7:35 AM, T.J. Crowder wrote:
Related: https://esdiscuss.org/topic/how-about-awaiting-arrays (particularly the discussion of `await.race`), since effectively you're doing a race between a timeout and each chunk. Also relevant is the former work on cancelling promises, now withdrawn. (Can anyone point me at *why* it was withdrawn?)

-- T.J. Crowder

On Mon, Jul 31, 2017 at 6:10 AM, kai zhu <[hidden email]> wrote:
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss



_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss


--
.: Jan-Ivar :.

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Naveen Chawla
No, you can easily terminate a promise by saving a reference to its reject/resolve callbacks, and calling either of them.

On Wed, 2 Aug 2017 at 01:59 Domenic Denicola <[hidden email]> wrote:
That is not why.



From: Jan-Ivar Bruaroey <[hidden email]>
Sent: Aug 1, 2017 3:47 PM
To: [hidden email]

Subject: Re: Stream + async await

Because a promise is not a control surface of the asynchronous action fulfilling it; confuses owner with consumer. https://stackoverflow.com/a/41417429/918910

.: Jan-Ivar :.

On 7/31/17 7:35 AM, T.J. Crowder wrote:
Related: https://esdiscuss.org/topic/how-about-awaiting-arrays (particularly the discussion of `await.race`), since effectively you're doing a race between a timeout and each chunk. Also relevant is the former work on cancelling promises, now withdrawn. (Can anyone point me at *why* it was withdrawn?)

-- T.J. Crowder

On Mon, Jul 31, 2017 at 6:10 AM, kai zhu <[hidden email]> wrote:
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss



_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss


--
.: Jan-Ivar :.
_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Jan-Ivar Bruaroey
In reply to this post by Domenic Denicola
Right, the proposals went beyond that. In this context though I thought T.J. was looking for a control surface in promises. I shouldn't assume.

In any case, the original challenge here seems solved by something like:

```javascript

var wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function consumeReadableStream(stream) {

  async function read() {
    try {
      for await (const chunk of stream) {
        // do work
      }
    } catch (e) {
      if (e.name != "AbortError") throw e;
    }
  }
  await Promise.race([read(), wait(30000).then(() => stream.destroy())]);
}

```

...assuming .destroy() is overloaded to throw AbortError somehow, a detail IMHO. But aren't we digressing?

For me, for-await still seems like a useful paradigm, and I'm not sure readability is helped by using linear code to represent non-linear code flow.

.: Jan-Ivar :.

On 8/1/17 4:29 PM, Domenic Denicola wrote:
That is not why.



From: Jan-Ivar Bruaroey [hidden email]
Sent: Aug 1, 2017 3:47 PM
To: [hidden email]
Subject: Re: Stream + async await

Because a promise is not a control surface of the asynchronous action fulfilling it; confuses owner with consumer. https://stackoverflow.com/a/41417429/918910

.: Jan-Ivar :.

On 7/31/17 7:35 AM, T.J. Crowder wrote:
Related: https://esdiscuss.org/topic/how-about-awaiting-arrays (particularly the discussion of `await.race`), since effectively you're doing a race between a timeout and each chunk. Also relevant is the former work on cancelling promises, now withdrawn. (Can anyone point me at *why* it was withdrawn?)

-- T.J. Crowder

On Mon, Jul 31, 2017 at 6:10 AM, kai zhu <[hidden email]> wrote:
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss



_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss


--
.: Jan-Ivar :.

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Stream + async await

Naveen Chawla
Jan-Ivar Bruaroey,

You are perhaps conflating asynchronicity with non-linearity.

Many asynchronous data flows are most definitely linear.

Hence the great benefit in code cleanliness afforded by `await` `async`.

I've concluded that a lot of stream-like functionality can be achieved already with `await` `async` and promises alone, even without `for` `await`, and in some cases `for` `await` gets in the way (as exemplified).

I just needed to think through it. Where a target value needs to change on each iteration, it can simply be a wider scoped variable manipulated by the stream. Understanding that a promise resolves to a single value just means a new promise for each iteration.

So as things stand, I am very comfortable with just the current features, and will be unlikely to use `for` `await` for what I was thinking of.

On Thu, 3 Aug 2017 at 07:15 Jan-Ivar Bruaroey <[hidden email]> wrote:
Right, the proposals went beyond that. In this context though I thought T.J. was looking for a control surface in promises. I shouldn't assume.

In any case, the original challenge here seems solved by something like:

```javascript

var wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function consumeReadableStream(stream) {

  async function read() {
    try {

      for await (const chunk of stream) {
        // do work
      }
    } catch (e) {
      if (e.name != "AbortError") throw e;
    }
  }
  await Promise.race([read(), wait(30000).then(() => stream.destroy())]);
}

```

...assuming .destroy() is overloaded to throw AbortError somehow, a detail IMHO. But aren't we digressing?

For me, for-await still seems like a useful paradigm, and I'm not sure readability is helped by using linear code to represent non-linear code flow.


.: Jan-Ivar :.


On 8/1/17 4:29 PM, Domenic Denicola wrote:
That is not why.



From: Jan-Ivar Bruaroey [hidden email]
Sent: Aug 1, 2017 3:47 PM
To: [hidden email]
Subject: Re: Stream + async await

Because a promise is not a control surface of the asynchronous action fulfilling it; confuses owner with consumer. https://stackoverflow.com/a/41417429/918910

.: Jan-Ivar :.

On 7/31/17 7:35 AM, T.J. Crowder wrote:
Related: https://esdiscuss.org/topic/how-about-awaiting-arrays (particularly the discussion of `await.race`), since effectively you're doing a race between a timeout and each chunk. Also relevant is the former work on cancelling promises, now withdrawn. (Can anyone point me at *why* it was withdrawn?)

-- T.J. Crowder

On Mon, Jul 31, 2017 at 6:10 AM, kai zhu <[hidden email]> wrote:
the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

> On Jul 31, 2017, at 7:26 AM, James Browning <[hidden email]> wrote:
>
> It'll look something like this:
>
> ```javascript
>
> async function consumeReadableStream(stream) {
>     const start = Date.now()
>     for await (const chunk of stream) {
>
>        /* Do whatever you want with the chunk here e,g, await other
> async tasks with chunks
>            send them off to wherever, etc
>        */
>
>         if (Date.now() - start > 30000) {
>             throw new Error('30000 ms timeout')
>        }
>    }
>    /* Instead of callbackOnce the returned promise from this function
> itself can be used */
> }
>
> ```
> _______________________________________________
> es-discuss mailing list
> [hidden email]
> https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss



_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss


--
.: Jan-Ivar :.
_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss

_______________________________________________
es-discuss mailing list
[hidden email]
https://mail.mozilla.org/listinfo/es-discuss