

The native Node.js stream implementation is based on event emitters. It’s hard to manage the control flow in your application when using events. If you want to actively wait for a stream to finish, you must wrap it into a promise.

That’s where @supercharge/streams comes handy. It wraps Node.js streams into promises to make them async/await-ready.

The package provides methods like

  • .map(callback)
  • .filter(callback)
  • .through(transformStream)

to interact with the input data.


The @supercharge/streams package lives independently from the Supercharge framework. Using it in your application requires you to install it as a project dependency:

npm i @supercharge/streams

You can use the streams package with every project even if it’s not build on Supercharge. Enjoy!

Using Streams

Imagine streams in Node.js as a list of items available over time. In contrast to a list that is available in memory, you may consume a huge file with a stream providing each item to one or many function in sequence. The benefit of streams is the nearly-constant memory usage. You never load every piece of data into memory and instead process one chunk after another.

Create a streaming pipeline like this:

const Fs = require('fs')
const Stream = require('@supercharge/streams')

await Stream(
    Fs.createReadStream('./list-of-emails.csv')  // notice: you probably need a CSV parser :)
  .map(async email => {
    return User.findOne({ email })

Error Handling

The native Node.js streams use event emitters and this comes with separate channels for data and errors. The @supercharge/streams package transforms the event-based streams into promise-based streams. Promises have a single channel for data and errors.

You must actively catch errors if you don’t want them to bubble up in your appplication:

try {
  await Stream(input)
    .map(() => throw new Error('sh*t happens'))
} catch (error) {
  // handle error

Errors will be thrown as soon as they appear. The stream will stop and clean up without processing the remaining data.



Creates a new stream wrapping the given input into a read stream if the input is not a stream already

const Stream = require('@supercharge/streams')

const stream = Stream([1, 2, 3]).asStream()


Returns the created stream or stream pipeline as a stream instance:

const stream = Stream([1, 2, 3]).asStream()


Tells the underlying stream pipeline to operate in object mode:

const stream = Stream([1, 2, 3])


Pipes the stream through the given transformStream instance

const { Transform } = require('stream')

const stream = Stream([1, 2, 3])
  .through(new Transform({
    objectMode: true,

    transform (chunk, __, next) {
      return next(null, chunk)

// the transform stream passed to .through() will forward the items without modifications -> [1, 2, 3]


Returns a promise and starts the streaming pipeline. Pipes the streaming results into the given output stream.

await Stream([1, 2, 3])

.map(async callback)

Runs the given async callback on each item that is piped through the stream. A transform stream will be created under the hood to run the callback function.

const emails = [

await Stream(emails)
  .map(async email => {
    const user = await User.findOne({ email })

    return user
      ? user
      : User.create({ email, password: 'random-password' })

.filter(async callback)

Filters all items that are piped through the stream satisfying the given async callback function. A transform stream will be created under the hood to run the callback function.

const emails = [

await Stream(emails)
  .filter(async email => {
    return User.findOne({ email })