Using cancellation with async iterators in JavaScript

Chrome 63, released in December, included async iterators and the associated for-await-of syntax. As the feature is now official in V8 6.3, it’s also likely to land in the upcoming Node.js 10 release this April. In light of its imminent availability in Node.js, I figured that it was time to start exploring the real-world potential of async iterators on the backend.

Combining the async and await keywords with generator functions provides an elegant way to express streaming operations in modern JavaScript applications. You can find some great introductions elsewhere on the web, so I’m going to avoid reiterating (pun intended) all of the specifics here. This blog post demonstrates a particular use case and delves into some details about how promise cancellation works with async iterators.

Serving an async generator as an SSE event stream

To get started, I figured that it might be interesting to make a function that consumes the values yielded by an async generator function, serving them as an SSE event stream. For those who are unfamiliar, Server-Sent Events (SSE) is a standard for building real-time web applications: it lets the server broadcast a stream of messages that are easy to consume in the browser with the EventSource client API. The following is a simple, contrived example using an async generator:

const http = require("http");

const timer = time =>
  new Promise(resolve => setTimeout(resolve, time));

async function* counter() {
  let counter = 0;
  while (true) {
    await timer(5000);
    yield counter++;
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  for await (let item of iter)
    res.write(`event: counter\ndata: ${item}\n\n`);
}

http.createServer((req, res) =>
  iterStream(res, counter())).listen(8000);

The generator function yields a new value from the counter every five seconds. The iterStream function takes each new value and broadcasts it to the connected user. With an EventSource on the client side, I can receive the incrementing value:

let events = new EventSource("/");
events.addEventListener("counter", ({data}) => console.log(data));

As savvy readers may have already noticed, there’s a pretty big wrinkle in the example above: when the user disconnects, the generator function continues running, incrementing the counter and yielding new values. Obviously, we don’t want to incur the overhead of leaving behind a dangling generator function every time the user refreshes the page.

To solve the problem, I first tried adding a line to my iterStream function that invokes iter.return when the user’s connection closes. Calling the return method on an iterator forces it to complete and stop yielding new values.

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => iter.return());
  for await (let item of iter)
    res.write(`event: counter\ndata: ${item}\n\n`);
}

Superficially, this approach seems to work. In practice, it introduces yet another wrinkle. Although iter.return forces the generator function to finish when the user closes their connection, the timer in the current loop iteration still runs to completion. Using return to stop the iterator doesn’t interrupt the pending promise.

To see this in action, here’s a version of the counter example instrumented with simple console logging statements:

async function* counter() {
  let counter = 0;
  try {
    while (true) {
      console.log("Awaiting the timer")
      await timer(5000);
      console.log("Yielding a new item")
      yield counter++;
    }
  }
  finally {
    console.log("Iterator finished")
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => {
    console.log("Connection closed");
    iter.return();
  });

  for await (let item of iter) {
    console.log("Sending item:", item);
    res.write(`event: item\ndata: ${item}\n\n`);
  }
}

When I run the example above, connect an EventSource client, and disconnect after receiving item 2, the resulting output looks like this:

Awaiting the timer
Yielding a new item
Sending item: 0
Awaiting the timer
Yielding a new item
Sending item: 1
Awaiting the timer
Yielding a new item
Sending item: 2
Awaiting the timer
Connection closed
Yielding a new item
Iterator finished
Sending item: 3

After the connection closes, the finally block doesn’t execute until the end of the pending timer.
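
This is just how async generators behave: calling return produces a promise that doesn’t settle until the generator’s pending await does, and only then does the finally block run. Here’s a minimal standalone sketch of that behavior, separate from the SSE example:

async function* ticker() {
  try {
    while (true) {
      await new Promise(resolve => setTimeout(resolve, 5000));
      yield Date.now();
    }
  }
  finally {
    console.log("Iterator finished");
  }
}

const iter = ticker();
iter.next();              // the generator is now awaiting the five second timer
iter.return().then(() =>  // settles only after that pending timer completes
  console.log("return() settled"));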

Simple promise cancellation

The underlying issue is promise cancellation: terminating a pending promise before it resolves or rejects. Here’s a simple approach to promise cancellation:

function timer(time, cancelled) {
  let handle;

  return new Promise((resolve, reject) => {
    handle = setTimeout(resolve, time);
    cancelled.then(() => clearTimeout(handle));
  });
}

let cancel, cancelled = new Promise(resolve => cancel = resolve);
timer(5000, cancelled);
cancel();

In the new version of the timer function, there is a second parameter called cancelled, which expects to receive a promise. When the cancelled promise resolves, it clears the timer.

When I invoke the timer function, I create a promise to use for cancellation, assigning the value of its resolve parameter to a cancel variable. To cancel the timer, I just invoke the function stored in the cancel variable, which forces the cancelled promise to resolve.

There’s still something important missing from this example, however. A promise can have multiple consumers: if one of those consumers cancels a promise, it could disrupt other consumers that might still expect the promise to resolve. It’s important for the promise to reject when cancelled so that all of the other consumers are notified:

function timer(time, cancelled) {
  let handle;

  return new Promise((resolve, reject) => {
    handle = setTimeout(resolve, time);
    cancelled.then(() => {
      reject(new Error("Cancelled"));
      clearTimeout(handle);
    });
  });
}

let cancel, cancelled = new Promise(resolve => cancel = resolve);
timer(5000, cancelled);
cancel();
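
To see why the rejection matters, here’s a quick sketch with two consumers sharing the same timer promise; when it gets cancelled, both of them find out through the rejection:

let cancel, cancelled = new Promise(resolve => cancel = resolve);
let pending = timer(5000, cancelled);

// Both consumers observe the "Cancelled" rejection instead of waiting forever
pending.catch(err => console.log("First consumer:", err.message));
pending.catch(err => console.log("Second consumer:", err.message));

cancel();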

The pattern demonstrated above is widely used for promise cancellation. The browser vendors settled on a similar approach for making the Fetch API cancellable on the web: they introduced AbortController, which produces an abort signal that you can pass into fetch invocations and use to cancel pending requests.
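
For comparison, here’s roughly what that looks like with fetch in the browser (the URL is just a placeholder):

let controller = new AbortController();

fetch("/some/resource", {signal: controller.signal})
  .then(res => res.json())
  .catch(err => {
    if (err.name === "AbortError")
      console.log("Request was cancelled");
  });

// Abort the pending request
controller.abort();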

There was also previously a TC39 proposal that sought to use a similar approach to add native cancellation to promises. It introduced a new form of the catch statement that made it easy to separate cancellation handling from conventional error handling. Amid objections from various parties, the proposal was later withdrawn. More recently, there’s a new proposal for a generic cancellation primitive that’s starting to gain traction.

Putting the pieces together

Let’s revisit the iterStream example, but with added support for promise cancellation:

function timer(time, cancelled) {
  let handle;

  return new Promise((resolve, reject) => {
    handle = setTimeout(resolve, time);
    cancelled.then(() => {
      reject(new Error("Cancelled"));
      clearTimeout(handle);
    });
  });
}

async function* counter(cancelled) {
  let counter = 0;
  while (true) {
    try {
      await timer(5000, cancelled);
      yield counter++;
    }
    catch (err) {
      if (err.message === "Cancelled")
        break;
    }
  }
}

async function iterStream(res, iter, cancel) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", cancel);

  for await (let item of iter)
    res.write(`event: item\ndata: ${item}\n\n`);
}

let cancel, cancelled = new Promise(resolve => cancel = resolve);

http.createServer((req, res) =>
  iterStream(res, counter(cancelled), cancel)).listen(8000);

In this example, notice how the cancelled promise gets threaded through both counter and timer. The promise rejection thrown inside of timer bubbles up to counter, where a conditional checks whether the thrown error represents cancellation or some other kind of failure. If it’s a cancellation, the infinite loop terminates.

In this simple case, the behavior that I’ve just described is pretty reasonable. But there are also going to be cases where you may want to exert more granular control by creating separate cancellation promises and being more thoughtful about how you thread them through your call graph. For example, if you are implementing a batch file uploading operation, you might want to support cancellation for the whole operation as well as cancellation for an individual file upload.
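
As a rough sketch of that idea, each file could get its own cancellation promise that is combined with an operation-wide one. The uploadFile function below is purely illustrative and just reuses the cancellable timer from above to stand in for a real upload:

async function uploadFile(file, cancelled) {
  await timer(1000, cancelled); // pretend the upload takes one second
  console.log("Uploaded", file);
}

async function uploadBatch(files, batchCancelled) {
  for (let file of files) {
    // A separate cancellation promise for this individual file
    let cancelFile, fileCancelled = new Promise(resolve => cancelFile = resolve);

    // Cancelling either the whole batch or just this file interrupts the upload
    await uploadFile(file, Promise.race([batchCancelled, fileCancelled]));
  }
}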

A real-world example

To see how the features described in this blog post might be used in the real world, let’s look at a less contrived example. Instead of using a timer to introduce an artificial delay in an infinite loop, I’m going to use a RethinkDB query with a changefeed, which broadcasts live updates from the database:

const http = require("http");
const r = require("rethinkdbdash")();

function nextRow(cursor, cancelled) {
  return new Promise((resolve, reject) => {
    cursor.next((err, row) => err ? reject(err) : resolve(row));
    cancelled.then(() => reject(new Error("Cancelled")));
  });
}

async function* query(cancelled) {
  let cursor = await r.db("rethinkdb").table("stats").changes();
  cancelled.then(() => cursor.close());

  while (true)
    yield nextRow(cursor, cancelled);
}

async function iterStream(res, iter, cancel) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => cancel());

  for await (let item of iter)
    res.write(`event: item\ndata: ${JSON.stringify(item)}\n\n`);
}

async function handler(req, res) {
  let cancel, cancelled = new Promise(resolve => cancel = resolve);
  
  try {
    await iterStream(res, query(cancelled), cancel);
  }
  catch (err) {
    if (err.message === "Cancelled")
      console.log("Connection was closed");
    else console.log("Unexpected error:", err.stack);
  }
}

http.createServer(handler).listen(8000);

Thanks to the iterStream function, I can encapsulate all of my streaming database queries in asynchronous generator functions that are neatly decoupled from my application’s HTTP server logic.

The query generator function waits for and yields new values from the database in a loop. When the query is cancelled, the function closes the database cursor and terminates the pending promise that fetches the next row.

I made a slight modification to the iterStream function so that it serializes the yielded data (a database change record, in this case) into JSON before broadcasting it to the end user.

Go’s context in JavaScript

While I was writing this blog post, I consulted my friend Kris Kowal, maintainer of the Q promise library and author of A General Theory of Reactivity. He walked me through the nuances of promise cancellation and greatly enhanced my understanding of the subject.

Kris also showed me the Go standard library’s Context type, which implements the generic cancellation primitive that is used ubiquitously in Go. In addition to cancellation, the context also has a timeout feature that applications can use to make an asynchronous operation cancel automatically if it doesn’t complete within a specified interval.
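
You can approximate the deadline behavior with the plain promise-based pattern from earlier in this post. The following is just a sketch of the idea, not the context package’s API:

function withTimeout(ms) {
  let cancel;
  let cancelled = new Promise(resolve => {
    cancel = resolve;
    setTimeout(resolve, ms); // cancel automatically once the deadline passes
  });
  return {cancel, cancelled};
}

// The timer from earlier gets cancelled if it takes longer than one second
let {cancel, cancelled} = withTimeout(1000);
timer(5000, cancelled).catch(err => console.log(err.message)); // "Cancelled"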

After our conversation about promise cancellation, Kris was inspired to build a JavaScript version of Go’s context. The following is my iterStream counter example implemented with his context package:

const http = require("http");
const contexter = require("@kriskowal/context");

async function* counter(context) {
  let counter = 0;
  while (true) {
    try {
      await context.delay(5000);
      yield counter++;
    }
    catch (err) {
      if (err.message === "Cancelled")
        break;
    }
  }
}

async function iterStream(res, iter, cancel) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => cancel(new Error("Cancelled")));

  for await (let item of iter)
    res.write(`event: item\ndata: ${item}\n\n`);
}

let {context, cancel} = contexter.withCancel();

http.createServer((req, res) =>
  iterStream(res, counter(context), cancel)).listen(8000);

You can find his library on npm and use it in your own applications. It’s a good way to handle cancellation for now, at least until TC39 manages to build consensus on a standards-based solution to this problem.