@roostjs/queue

Background job processing on Cloudflare Queues. Typed job classes with retry configuration, dispatch methods, chaining, batching, and lifecycle hooks.

Installation

bun add @roostjs/queue

Configuration

Declare a Cloudflare Queue binding in wrangler.jsonc:

{
  "queues": {
    "producers": [{ "queue": "my-queue", "binding": "QUEUE" }],
    "consumers": [{ "queue": "my-queue" }]
  }
}

Job API

Job<TPayload> is an abstract base class. Extend it and implement handle(). The generic parameter types the payload property.

Instance Properties

payload: TPayload

The deserialized job payload. Typed by the generic parameter.

attempt: number

The current attempt number. Starts at 1 and increments on each retry.

Instance Methods

abstract async handle(): Promise<void>

The job's main logic. Throwing any error triggers the retry strategy.

async onSuccess(): Promise<void>

Optional. Called after a successful handle() invocation.

async onFailure(error: Error): Promise<void>

Optional. Called after handle() throws and all retries are exhausted.

Static Methods

static async dispatch(payload: TPayload): Promise<void>

Enqueue the job immediately.

static async dispatchAfter(seconds: number, payload: TPayload): Promise<void>

Enqueue the job with a delay of seconds before first execution.

static async chain(jobs: JobDescriptor[]): Promise<void>

Enqueue a sequence of jobs that run one after another. The next job starts only after the previous one succeeds.

static async batch(jobs: JobDescriptor[]): Promise<string>

Enqueue multiple independent jobs. Jobs run in parallel (no ordering guarantee). Returns the batch ID string.

static fake(): void

Enable fake mode. All dispatch() calls are recorded but not enqueued.

static restore(): void

Disable fake mode.

static assertDispatched(jobClassOrName?: typeof Job | string): void

Assert that the job was dispatched (in fake mode). Optionally filter by job class or name.

static assertNotDispatched(): void

Assert that the job was not dispatched.

JobConsumer API

Processes messages from a Cloudflare Queue consumer handler.

constructor(registry: JobRegistry)

Construct with a JobRegistry containing all registered job classes.

async processMessage(message: Message): Promise<void>

Deserialize the message, instantiate the matching job class, and call handle(). Calls onSuccess() on success, onFailure() on final failure.

JobRegistry API

register(JobClass: typeof Job): void

Register a job class so the consumer can deserialize and route messages to it.

Dispatcher API

Sends serialized job messages to the configured Cloudflare Queue binding. Used internally by Job.dispatch() and Job.dispatchAfter(). Resolve from the container when you need to dispatch jobs outside of a Job class context.

Decorators

Class decorators applied to Job subclasses.

@Queue(name: string)

Specify the queue binding name this job should be dispatched to. Overrides the default queue configured in the service provider.

@Delay(seconds: number)

Default delay in seconds before the job is first executed. Can be overridden per-dispatch by calling Job.dispatchAfter(seconds, payload) directly.

@MaxRetries(count: number)

Maximum number of retry attempts before calling onFailure(). Defaults to 3.

@Backoff(strategy: 'exponential' | 'fixed')

Retry backoff strategy. 'exponential' doubles the delay on each attempt. 'fixed' uses the same delay every time. Defaults to 'exponential'.

@RetryAfter(seconds: number)

Base delay in seconds before the first retry. For exponential backoff, subsequent delays are multiplied by 2 on each attempt.

@JobTimeout(seconds: number)

Maximum execution time in seconds for a single handle() invocation before it is considered failed.

getJobConfig(JobClass: typeof Job): JobConfig

Read the merged job configuration from a job class's decorators. Useful for inspecting or testing job configuration.

Types

interface JobDescriptor {
  jobClass: typeof Job;
  payload: unknown;
}