Kicking Node into high gear for data processing or how to hire 100 cats to catch a mouse

I recently came across a problem at work that required some not-so-light processing across hundreds of directories. Initial runs didn't take too long, but I figured whatever I could do to speed it up would pay off in the long run. I anticipated I'd need to run it dozens, if not hundreds, of times before I got it right.

If you had asked me a year ago how exactly the Node event loop works, I'd have said I had no idea. Today I'm (not so) happy to say I still don't know how it works, at least not in detail. In the meantime, I learned a good chunk of Rust and some of its async/await internals. That pushed me in the right direction of understanding how async/await can be faster in Node, even though it's single-threaded.

Async/await

This will be the shortest introduction to the async/await syntax I can imagine.

We all remember callbacks and the diabolical messes they could make. Okay, maybe not all of us but probably a significant percentage. Nesting callbacks could easily produce code that is hard to follow, and sometimes hard to understand even after rereading. People call this phenomenon the callback hell.

Then came a promise (a PROMISE 🤕) of clean and readable code even when dealing with async processes. It was straightforward to combine promises and chain them to process data sequentially. Despite that, people still tended to nest promises until the logic was understandable only to the chosen few. That would usually last for a month. After that, the understanding would decay, and the only logical next step was a rewrite.

After a period of time, someone had an idea.

What if we could write async code the same way we write sync code?

That's how async/await was born. It was a great idea.
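To make the difference concrete, here is a minimal sketch of the same two-step task written once as a promise chain and once with async/await. The helpers getUser and getPosts are hypothetical stand-ins for any promise-returning call; they are not part of any real API.

```javascript
// Hypothetical async helpers; they stand in for any promise-returning call.
const getUser = id => Promise.resolve({ id, name: 'Ada' });
const getPosts = user => Promise.resolve([`${user.name}'s first post`]);

// Promise chain: every step is a callback handed to then().
const titlesWithThen = id =>
  getUser(id)
    .then(user => getPosts(user))
    .then(posts => posts.map(title => title.toUpperCase()));

// async/await: the same steps read top to bottom like synchronous code.
const titlesWithAwait = async id => {
  const user = await getUser(id);
  const posts = await getPosts(user);
  return posts.map(title => title.toUpperCase());
};

titlesWithThen(1).then(console.log);
titlesWithAwait(1).then(console.log);
```

Both functions return a promise for the same value, so callers can mix the two styles freely.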

Async on a single thread

Before I dug into Tokio and async-std, I had no idea how all of this worked. How do you speed up something that runs on a single thread?

It seems there is code that mostly does "nothing". To be more specific, it waits for something most of the time. I'm sure everybody has fired an HTTP request numerous times when playing with Node. We read files, transformed their contents, then logged the results or wrote them to more files. It turns out that things like these spend a considerable amount of time waiting for the OS to finish the task. Your network connection might be slow or congested, your drives might be slow or dying, the network destination might be slow or very far away.

Let's see what happens when we fire a lot of sequential network requests.

  • | - start of the request
  • - - waiting for something
  • o - getting data
  • > - end of the request
|--o-oo-o>
          |--o-oo----------oo>
                              |-o-oooooo-ooooo-ooo>

There's a lot of "nothing" going on during these requests. Running them at the same time means we can do some useful work while we're waiting for some of them.

|-o-----o---oooo>
|o-o-----oo-----oo----ooooo-oooooo>
|---oooo---o------oooo>

If you take a closer look you'll see how new data is processed only for a single request at a time. That's why requests can run faster even on a single thread. Of course, adding more threads could result in much better times, or could slow down the whole thing. More on that later on.

You might realize we're constantly jumping from one request to the other, so there must be some overhead in doing that. To be honest, I don't know how much impact it has. Either way, in real-world tasks there would be a lot more - (waiting) than o (data). That keeps the overhead small compared to the time saved by not waiting.
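The two diagrams above can be reproduced with timers. This is only a sketch: fakeRequest is a hypothetical stand-in for a network call, and the delays are arbitrary numbers chosen for illustration.

```javascript
// Hypothetical stand-in for a network request: resolves after `ms` milliseconds.
const fakeRequest = ms =>
  new Promise(resolve => setTimeout(() => resolve(ms), ms));

const delays = [100, 150, 200];

// One request at a time: total time is roughly the sum of all delays.
const runSequentially = async () => {
  const results = [];
  for (const ms of delays) {
    results.push(await fakeRequest(ms));
  }
  return results;
};

// All requests at once: total time is roughly the longest single delay.
const runConcurrently = () => Promise.all(delays.map(ms => fakeRequest(ms)));

// Runs a function, logs how long it took, and returns both pieces of information.
const timed = async (label, run) => {
  const start = Date.now();
  const results = await run();
  const elapsed = Date.now() - start;
  console.log(`${label}: ${elapsed} ms`);
  return { results, elapsed };
};

const main = async () => {
  const sequential = await timed('sequential', runSequentially);
  const concurrent = await timed('concurrent', runConcurrently);
  return { sequential, concurrent };
};

const demo = main();
```

Everything here still runs on a single thread. The concurrent version finishes sooner only because the waiting periods overlap.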

A proof of concept

Like everything else, you need to have a reason to use async/await. If you're reading a few files in your script you might as well use sync and not worry about this at all. The use cases where async might shine are those with hundreds of file reads and network calls. Node is quite limited when it comes to this. It makes it super easy to run things sequentially, one by one, or all at the same time. There are no built-in tools for workloads in between.

Let's see what kind of API would be suitable for these cases. We'll use promisified functions to keep the code clean.

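// Assumes the promise-based fs API and a global fetch (built into recent Node versions).
const fs = require('fs').promises;

// The function must be async, otherwise await is a syntax error.
const work = async filePath => {
  const contents = await fs.readFile(filePath, 'utf-8');
  const json = JSON.parse(contents);

  const { url } = json.details.documentation;
  const res = await fetch(url);

  return res.body;
};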

Now we know what needs to be done. It's probably safe to assume the filePaths will be an array no matter how we obtained them.

const filePaths = [
  'foo/bar/baz/1',
  'foo/bar/baz/2',
  'foo/bar/baz/3',
  ...
];

The simplest form of API I can think of:

const documentationPages = await parEach(work, filePaths);

It might also accept an option object as a third parameter but let's leave that for later.

Implementation

The basic shell for the parEach function could go something like this:

const parEach = async (work, allArgs, options = {}) => {
  // TODO
};

The options object can contain many configurables, but for now let's focus only on the number of concurrent (not parallel) workers being run.

const { batchSize = 5 } = options;
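As a quick illustration of how that destructuring default behaves, here is a small sketch. The readBatchSize helper is hypothetical and exists only to isolate the line above.

```javascript
// Hypothetical helper that isolates the option handling shown above.
const readBatchSize = (options = {}) => {
  const { batchSize = 5 } = options;
  return batchSize;
};

console.log(readBatchSize());                  // 5 (no options passed)
console.log(readBatchSize({}));                // 5 (batchSize missing)
console.log(readBatchSize({ batchSize: 20 })); // 20 (caller's value wins)
```

The default only kicks in when batchSize is undefined, so callers who are happy with five workers can skip the third parameter entirely.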

We need a few variables to see how much work has been done and know when to stop. All the code from now on is inside the parEach function.

let callWhenDone;
let workersSpawned = 0;
let nextArgsIndex = 0;

Now let's write the part that will do the actual work.

const spawn = () => {
  const args = allArgs[nextArgsIndex];
  
  if (Array.isArray(args)) {
    work(...args).then(after);
  } else {
    work(args).then(after);
  }
  
  workersSpawned += 1;
  nextArgsIndex += 1;
};

It should be straightforward to see what is happening in the spawn function. We get the right arguments and check whether there is a single argument or several. We start doing the work and update the variables accordingly. You've probably noticed the after function. Let's see what that one looks like.
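The argument check is worth a closer look, because it decides how each entry of allArgs reaches the work function. The sketch below pulls that branch out into a hypothetical callWork helper, with a hypothetical echo worker that simply reports what it received.

```javascript
// Hypothetical helper mirroring the Array.isArray branch inside spawn.
const callWork = (work, args) =>
  Array.isArray(args) ? work(...args) : work(args);

// Hypothetical worker that resolves with the list of arguments it was given.
const echo = async (...received) => received;

// A plain value is passed through as the only argument.
callWork(echo, 'foo/bar/baz/1').then(console.log);
// prints [ 'foo/bar/baz/1' ]

// An array is spread into separate arguments.
callWork(echo, ['foo/bar/baz/1', 'utf-8']).then(console.log);
// prints [ 'foo/bar/baz/1', 'utf-8' ]
```

One consequence of this design is that a worker expecting a single array as its argument needs that array wrapped in another array, for example [[1, 2, 3]].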

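const after = () => {
  // One worker has just finished.
  workersSpawned -= 1;

  const allArgsUsed = nextArgsIndex >= allArgs.length;
  const allWorkersWork = workersSpawned >= batchSize;
  const noWorkersWork = workersSpawned <= 0;

  // There is a free slot and work left to do, so start the next job.
  if (!allWorkersWork && !allArgsUsed) {
    spawn();
  }

  // Nothing is running and nothing is left, so resolve the outer promise.
  if (noWorkersWork && allArgsUsed) {
    callWhenDone();
  }
};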

The after function also updates the workersSpawned variable, as some work has just been done. We then check whether it is a good time to do more work, or to wrap up and finish everything. There's just one more thing left to write: kickstarting all of this.

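// Start at most batchSize workers, but never more than there are argument sets.
for (let i = 0; i < Math.min(batchSize, allArgs.length); i++) {
  spawn();
}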

return new Promise(resolve => (callWhenDone = resolve));

That's it! We fire up a few workers, as defined by the batchSize variable, and save the resolve function, which will "unblock" whoever is awaiting parEach once it is called.
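Putting the pieces together, the whole function could look roughly like the sketch below. It follows the snippets above, with two additions that are assumptions rather than part of the walkthrough: it never starts more workers than there are argument sets, and it resolves right away for an empty list. The fakeWork function and its counters are hypothetical and only serve to show that the concurrency limit holds.

```javascript
const parEach = async (work, allArgs, options = {}) => {
  const { batchSize = 5 } = options;

  let callWhenDone;
  let workersSpawned = 0;
  let nextArgsIndex = 0;

  const spawn = () => {
    const args = allArgs[nextArgsIndex];

    if (Array.isArray(args)) {
      work(...args).then(after);
    } else {
      work(args).then(after);
    }

    workersSpawned += 1;
    nextArgsIndex += 1;
  };

  const after = () => {
    workersSpawned -= 1;

    const allArgsUsed = nextArgsIndex >= allArgs.length;
    const allWorkersWork = workersSpawned >= batchSize;
    const noWorkersWork = workersSpawned <= 0;

    if (!allWorkersWork && !allArgsUsed) spawn();
    if (noWorkersWork && allArgsUsed) callWhenDone();
  };

  return new Promise(resolve => {
    callWhenDone = resolve;

    // Assumption: cap the initial workers and handle an empty argument list.
    const initialWorkers = Math.min(batchSize, allArgs.length);
    if (initialWorkers === 0) resolve();

    for (let i = 0; i < initialWorkers; i++) spawn();
  });
};

// Demo with a hypothetical worker that tracks how many calls overlap.
let running = 0;
let peak = 0;
const finished = [];

const fakeWork = async id => {
  running += 1;
  peak = Math.max(peak, running);
  await new Promise(resolve => setTimeout(resolve, 20));
  running -= 1;
  finished.push(id);
};

const demo = parEach(fakeWork, [1, 2, 3, 4, 5, 6, 7], { batchSize: 3 }).then(() => {
  console.log(`finished ${finished.length} jobs, peak concurrency ${peak}`);
  return { count: finished.length, peak };
});
```

As written, the promise resolves without a value, so the demo collects results by letting the worker push into an array. A rejected worker promise would also leave the outer promise pending, so real code would probably want a catch handler next to then(after).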

Surprise

I've already published this as a package so you can play with it. Check out pareach on NPM. I've spent a lot of time searching for a name that's not taken. This one is a mixture of "parallel" and forEach. It almost sounds like it's doing something per each thing (whatever it is). It can spawn multiple threads to boost your processing even more. Check out these speedups:

Sequential took 51.21 seconds
Concurrent took 5.45 seconds
Parallelized took 1.18 seconds

This was produced by waiting on random sub-second timeouts. Firing hundreds of requests yields similar ratios.

I'd like to know whether something like this is useful to any of you.

Please clone the repo and try running the test. You should see how much faster the parEach version is than sequential processing.