Queues
- Introduction
- Configuration
- Connections vs. Queues
- Creating Jobs
- Dispatching Jobs
- Run the Queue Worker
Introduction
Supercharge ships with first-class queueing support. Queues in Supercharge follow a driver-based approach providing a unified interface across all drivers. A queue driver is the queueing backend, like Amazon Simple Queueing Service.
Queues are a common way to push long-running tasks into a background thread. Processing a queue job in Supercharge requires a dedicated worker process that must be started separately from the application.
Configuration
A Supercharge application comes with a queue configuration file located at config/queue.js
. This configuration file includes the default queueing driver. You’ll also find the connection details for all supported queue drivers available in Supercharge: the database, Amazon SQS, Faktory, and a synchronous queue driver processing the jobs immediately (on the main application thread).
Connections vs. Queues
Before moving on, let’s clarify the terminology of “queues” vs. ”connections”.
A queue connection in Supercharge identifies the queueing backend service where the queue jobs will be stored. In contrast, a queue in Supercharge is a name (string value) for a “bucket” of queued jobs.
An individual queue connection may have multiple “queues” which represent individual piles of tasks in a group. Using different queues on a connection may help you to separate fast and slow jobs within a connection.
For example, you may use a “download” queue to download a file. The file download may take up to one minute. In contrast, sending an email after user sign up may take 200 milliseconds. Imagine a “welcome-mail” queue for these tasks.
You may now start two queue workers, one for the “download” queue and one for the “welcome-mail” queue. Both workers interact independently from each other. You can scale out the “download” queue workers when needed, like during rush hours.
Available Connections
By default, Supercharge uses the sync
queueing driver which is useful for local development. Here's a list of available drivers:
Queue Driver | Description |
---|---|
sync |
Executes queued jobs immediately on the main thread |
database |
Enqueues jobs into the database. Requires a dedicated worker to process jobs. |
sqs |
Sends jobs to AWS SQS. Requires a dedicated worker to process jobs. |
faktory |
Stores jobs in the Faktory queue. Requires a dedicated worker to process jobs. |
All queue drivers except sync
enqueue jobs for asynchrounous processing. For debugging during development, you may use a related UI for the individual service. Faktory comes with a web UI (running on port 7420
). Amazon SQS has a UI to inspect jobs right in the AWS Console.
When using the database
driver, you may use a UI like:
- TablePlus
- Robo 3T (previously Robomongo)
- MongoDB Compass
Creating Jobs
The Craft CLI shipping with every Supercharge application provides a make:job <name>
command to generate a new queue job class. Queue job files are located into the app/jobs
directory. If this directory doesn’t exist yet, the Craft CLI creates it on the fly.
Generate a Job Class
Scaffold a new queue job class using the Craft CLI. Run the make:job
command and provide the job name as a parameter:
node craft make:job SendWelcomeMail
The created job class extends the @supercharge/framework/queue/dispatchable
class providing the abitlity to dispatch a new welcome mail queue job.
Job Class Structure
Queue jobs in Supercharge are JavaScript classes implementing a handle()
method. The queue dispatcher calls the handle()
method when processing a job.
Let’s implement an exemplary SendWelcomeMail
queue job sending a welcome mail after a user registers for your platform:
const Dispatchable = require('@supercharge/framework/queue/dispatchable')
class SendWelcomeMail extends Dispatchable {
constructor(user) {
super()
this.user = user
}
/**
* Process the job.
*/
async handle() {
// send mail to `user` (maybe using `@supercharge/framework/mailer`)
}
}
When dispatching a new SendWelcomeMail
job onto the queue, you’ll provide the affected user. This user is then available to a job instance as an argument.
Dispatching Jobs
Having the queue job class implemented and ready to use in your project, you can go ahead and dispatch the first jobs.
Dispatch jobs in Supercharge via a queue job class by calling the static QueueJob.dispatch(data)
method. The arguments passed to the dispatch method will be available as constructor parameters in your job class:
const SendWelcomeMail = require('../jobs/send-welcome-mail')
module.exports = {
method: 'POST',
path: '/signup',
options: {
handler: async (request, h) => {
// create new user
await SendWelcomeMail.dispatch(user)
}
}
}
Customizing the Queue and Connection
You may change the configured default queue used by the default driver when dispatching queue jobs. The Dispatchable
class provides provides the onQueue
and onConnection
methods to customize the related values.
Dispatching on a Queue
Imagine your queues as a type of categorization where you put the same tasks into the same category. This way you can scale out workers on individual queues when necessary. Use the onQueue(name)
method to dispatch jobs onto the queue identified by name
:
const SendWelcomeMail = require('../jobs/send-welcome-mail')
module.exports = {
method: 'POST',
path: '/signup',
options: {
handler: async (request, h) => {
// create new user
await SendWelcomeMail
.onQueue('send-welcome-mail')
.dispatch(user)
}
}
}
Dispatching on a Connection
You can also utilize different queue connections. In situations where you want to push jobs to a specific connection, use the onConnection(name)
method:
const SendWelcomeMail = require('../jobs/send-welcome-mail')
module.exports = {
method: 'POST',
path: '/signup',
options: {
handler: async (request, h) => {
// create new user
await SendWelcomeMail
.onConnection('sqs')
.dispatch(user)
}
}
}
You can also combine the onQueue
and onConnection
methods using the fluent interface (provided by the Dispatchable
class):
const SendWelcomeMail = require('../jobs/send-welcome-mail')
await SendWelcomeMail
.onConnection('send-welcome-mail')
.onConnection('sqs')
.dispatch(user)
You must call.dispatch()
as the last item in the call chain to enqueue a new job. All methods before .dispatch()
customize the configuration for the job.
Job Error Handling
The queue worker catches all errors thrown while processing a job and releases the job back to the queue for another try. The queue worker stops the cycle of releasing the job back when it attempts reach the allowed maximum number of attempts.
The maximun number of attempts is defined by the --attempts <number>
flag when starting the queue worker with the queue:work
Craft command.
When a job failed and reaches the maximunum allowed number of attempts, the queue worker tries to call the async failed(error)
method of your queue job instance. This method is a good place for you to implement clean-up handling, like resetting state that may has been made by the queue job:
const Dispatchable = require('@supercharge/framework/queue/dispatchable')
class SendWelcomeMail extends Dispatchable {
constructor(user) {
super()
this.user = user
}
/**
* Process the job.
*/
async handle() {
// send mail to `user` (maybe using `@supercharge/framework/mailer`)
}
/**
* Handle job failure.
*
* @param {error} Error
*/
async failed(error) {
// deal with failed job, e.g. reset state or send a notification
}
}
Run the Queue Worker
The Supercharge queue comes with a dedicated queue worker to process jobs. This queue worker must be started separately from the main application/HTTP server. This separation moves the queue worker to its own long-living Node.js process coming with the benefit of not affecting the HTTP server when processing jobs. Also, it allows you to scale out queue workers to your needs.
Start a queue worker using the queue:work
Craft command. The started queue worker process runs until you manually stop it (or close your terminal):
node craft queue:work
During deployments, please make sure to use a process manager like PM2 to keep your worker process running.
Remember that once the queue worker is started, it won’t pick up any code changes until restart. During development, you may start queue workers using a tool like Nodemon which monitors for any changes in your code and automatically restarts the proces.
Worker Sleep Duration
The queue worker processes available jobs continuously. If there are no jobs available, the queue worker “sleeps” for 1 second and polls the queue connection again for new jobs.
At this point, you can’t customize the worker’s sleep duration.
We appreciate a pull request on adding a --sleep <duration>
flag to the queue:work
command.
Run a Worker for a Specific Connection
When starting a queue worker, it uses the configured default connection. Customize the queue worker connection by passing the connection name as the first argument to the queue:work <name>
command:
node craft queue:work sqs
The connection name must correspond to one of the configured connections in your config/queue.js
file.
Run a Worker for a Specific Connection and Queue
Queues in Supercharge are a way to categorize tasks in specific buckets. You can start queue workers for a queue by using the --queue <queue-names>
flag. For example, if you want a dedicated queue worker for a queue named “send-welcome-mail”, you may start a worker like this:
node craft queue:work sqs --queue send-welcome-mail
A worker can handle multiple queues. Use the --queue
flag and pass all queue names separated by a comma:
node craft queue:work --queue send-welcome-mail, default, another-queue
Specify Max Job Attempts
By default, the queue worker tries to process a job indefinitely. The queue worker pushes a failed job back to the queue and tries to re-run it immediately. At this point, there’s no delay when a job should be attempted the next time.
In production situations, you may want to limit the number of attempts for failed jobs. Use the --attempts <amount>
flag for the queue:work
command to limit the maximun number of attempts:
node craft queue:work --attempts 3
Job Expiration
At this point, there’s no timeout when processing a job. A queue worker will process a given job as long as the job “needs”. This may cause your queue worker to become stale and not fetching the next job for processing.
I appreciate a pull request adding a worker timeout feature to ensure a maximum number of seconds a job is allowed to be processed. If you’re interested, send a PR in the superchargejs/framework
repository on GitHub.