Architecture and Internals

This page explains how Rocket Job works under the hood: the concurrency model, how jobs are stored and claimed, and why the design is reliable and scalable. For the day-to-day programming interface, see the Programmer’s Guide and the Batch Guide. A class diagram of the core relationships is further down this page.

Concurrency model

A Rocket Job server process runs a pool of worker threads, one job per thread. Using threads rather than forked processes has several benefits:

Each worker is independent of the others, so it runs as fast as Ruby allows without coordinating with its peers. Because many workers run #perform at the same time, job code must be thread-safe. The rules for writing thread-safe jobs are in Thread Safety in the Programmer’s Guide.
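To illustrate the kind of hazard that arises when several threads touch the same object, the sketch below guards shared state with a Mutex. SharedCounter is a hypothetical class used only for illustration and is not part of Rocket Job; the rules that apply to real jobs are the ones in the Programmer’s Guide.

```ruby
# Hypothetical stand-in for any state shared between worker threads.
class SharedCounter
  def initialize
    @count = 0
    @lock  = Mutex.new
  end

  # Only one thread at a time may run the read-modify-write.
  def increment
    @lock.synchronize { @count += 1 }
  end

  def count
    @lock.synchronize { @count }
  end
end

counter = SharedCounter.new

# Five threads stand in for five workers running at the same time.
threads = 5.times.map do
  Thread.new { 1_000.times { counter.increment } }
end
threads.each(&:join)

puts counter.count # => 5000
```

State that is local to a single job instance does not need this kind of guard; it is objects shared across threads (class-level caches, global clients, and so on) that do.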

In-place processing in MongoDB

Rocket Job processes a job “in place” in MongoDB. A job is created once and stored entirely as a single document, so everything about it lives in one place:

Because the job lives in only one document, its status is immediately visible in Mission Control without updating any separate store.

The single-document approach is made efficient by MongoDB’s find_and_modify, which atomically finds the highest-priority ready job and marks it as running in one operation. This lets any of hundreds of workers claim work without the locking contention that a relational database would suffer, and without a separate message broker. MongoDB is both the queue and the system of record, and it spills from memory to disk, so a single job can hold far more input and output than fits in RAM.
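The claim step can be pictured with a small in-memory model. This is only a sketch under stated assumptions, not Rocket Job's implementation: ClaimableJob and InMemoryJobStore are hypothetical names, a Mutex stands in for the atomicity that MongoDB provides, and a lower priority number is assumed to mean higher priority.

```ruby
# Hypothetical job record; in Rocket Job this would be a MongoDB document.
ClaimableJob = Struct.new(:id, :priority, :state, :worker_name)

class InMemoryJobStore
  def initialize(jobs)
    @jobs = jobs
    @lock = Mutex.new
  end

  # Find the best queued job and mark it running as one indivisible step,
  # so two workers can never claim the same job.
  def claim(worker_name)
    @lock.synchronize do
      job = @jobs.select { |j| j.state == :queued }.min_by(&:priority)
      return nil unless job

      job.state       = :running
      job.worker_name = worker_name
      job
    end
  end
end

jobs  = (1..20).map { |i| ClaimableJob.new(i, (i % 5) + 1, :queued, nil) }
store = InMemoryJobStore.new(jobs)

claimed = Queue.new
threads = 8.times.map do |n|
  Thread.new do
    while (job = store.claim("worker-#{n}"))
      claimed << job.id
    end
  end
end
threads.each(&:join)

claimed_ids = []
claimed_ids << claimed.pop until claimed.empty?
puts claimed_ids.sort == (1..20).to_a # every job claimed exactly once
```

The point of the sketch is the shape of the operation: selection and state change happen together, so no worker ever observes a job that is "found but not yet marked".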

For how this extends to parallel batch processing, where the input is split into independently claimed slices, see How batch jobs work and How it works.

Reliability

If a worker process crashes while processing a job, the job is never lost. It stays in MongoDB, and when the dead server is cleaned up, its in-flight jobs are automatically re-queued and picked up by another worker. For batch jobs the same is true at the slice level: only the slices that were in flight are re-queued, and already-completed slices are not reprocessed.
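The re-queue step can be sketched in a few lines of plain Ruby. The names here (InFlightJob, requeue_in_flight, the server names) are hypothetical and the logic is a simplification, not Rocket Job's code: it only shows that work owned by a dead server goes back to the queue while everything else is left untouched.

```ruby
# Hypothetical record of a job (or slice) and the server that is running it.
InFlightJob = Struct.new(:id, :state, :server_name)

# Return every running item owned by the dead server to the queue.
# Completed items and items owned by other servers are not modified.
def requeue_in_flight(items, dead_server)
  items
    .select { |item| item.state == :running && item.server_name == dead_server }
    .each do |item|
      item.state       = :queued
      item.server_name = nil
    end
end

items = [
  InFlightJob.new(1, :completed, "host-a:100"),
  InFlightJob.new(2, :running,   "host-a:100"),
  InFlightJob.new(3, :running,   "host-b:200"),
  InFlightJob.new(4, :queued,    nil)
]

requeued = requeue_in_flight(items, "host-a:100")
puts requeued.map(&:id).inspect # => [2]
puts items.map(&:state).inspect # => [:completed, :queued, :running, :queued]
```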

Scalability

Rocket Job scales linearly: adding more servers adds CPU, memory, and local disk, and doubling the number of worker servers should roughly double throughput. In practice, as servers are added, the bottleneck tends to move to the databases, networks, or external services that jobs call into during processing.

Those downstream bottlenecks can be scaled independently. For example, read-heavy jobs can offload their reads to database replicas. ActiveRecord Slave redirects ActiveRecord reads across one or more replica servers, spreading the load generated by a large pool of workers. Within a single job, throttles and batch worker limits cap how hard a shared resource is hit, so adding workers does not overwhelm it.
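The effect of a throttle can be shown with a simplified model. This sketch assumes a throttle is a cap on how many jobs of one kind may be running at once; ThrottledJob and claim_under_limit are hypothetical names, and the check is single-threaded here, whereas a real implementation would need to make it safe across many workers.

```ruby
# Hypothetical job record for the throttle sketch.
ThrottledJob = Struct.new(:id, :state)

# Claim the next queued job only while fewer than max_running are running.
def claim_under_limit(jobs, max_running)
  running = jobs.count { |job| job.state == :running }
  return nil if running >= max_running

  job = jobs.find { |j| j.state == :queued }
  job.state = :running if job
  job
end

jobs = (1..5).map { |id| ThrottledJob.new(id, :queued) }

first  = claim_under_limit(jobs, 2) # claims job 1
second = claim_under_limit(jobs, 2) # claims job 2
third  = claim_under_limit(jobs, 2) # nil, the limit of 2 is reached

first.state = :completed            # a slot frees up
fourth = claim_under_limit(jobs, 2) # claims job 3

puts [first.id, second.id, third.inspect, fourth.id].inspect
```

However many workers are available, at most two of these jobs run at a time, which is what protects the shared resource behind them.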

Performance

The benchmarks below were run with the bundled rocketjob_perf (simple jobs) and rocketjob_batch_perf (batch jobs) scripts. They are historical numbers, captured on modest hardware, and are included to show the order of magnitude Rocket Job reaches and how it scales with more processes. Real throughput depends on your hardware, MongoDB deployment, and what each job does.

Both tests used the same setup:

Start the worker processes by running the following in 3 separate windows:

bundle exec rocketjob --log_level warn --workers 5

Simple jobs

Run a quick test with bundle exec rocketjob_perf -c 1000, or the full test with bundle exec rocketjob_perf. With 15 workers distributed across 3 processes:

{
  :count           => 100000,
  :duration        => 70.493,
  :jobs_per_second => 1418
}

1,418 jobs per second, processed reliably, in priority order, and with full visibility of every job.

Batch jobs

Run a quick test with bundle exec rocketjob_batch_perf -c 1000, or the full test with bundle exec rocketjob_batch_perf. With 15 workers distributed across 3 processes:

{
  :count              => 10000000,
  :duration           => 93.411,
  :records_per_second => 107053.773,
  :workers            => 15,
  :worker_processes   => 3
}

Over 100,000 records per second on a single laptop. Increasing slice_size raises the processing rate further. Enabling or disabling compression or encryption does not appear to have a significant impact on processing times.
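The rates in both result hashes follow directly from count divided by duration. The short check below reproduces them from the figures shown above; throughput is a hypothetical helper, not part of the perf scripts, and the simple-job figure appears to be truncated to a whole number.

```ruby
# Items processed per second, given a total count and a duration in seconds.
def throughput(count, duration_seconds)
  count / duration_seconds.to_f
end

simple_rate = throughput(100_000, 70.493)
batch_rate  = throughput(10_000_000, 93.411)

puts simple_rate.floor   # => 1418
puts batch_rate.round(3) # => 107053.773

# Spread across the 15 workers used in both tests:
puts (simple_rate / 15).round(1) # jobs per second per worker
puts (batch_rate / 15).round     # records per second per worker
```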

Public vs internal API

The public interface is small and deliberately so. Application code subclasses RocketJob::Job, optionally mixes in capability modules such as RocketJob::Batch, defines typed fields, and implements #perform. Queuing, downloading results, and the documented job state transitions round out the surface that is guaranteed to remain stable.

Everything else is internal: the plugin modules under RocketJob::Plugins, the slicing machinery under RocketJob::Sliced, and the runtime classes (Supervisor, Server, WorkerPool, Worker, Subscriber). These are documented here for contributors extending Rocket Job, not for callers using it, and may change between minor releases as long as the public job interface is preserved.

The job is a composition of plugins

RocketJob::Job is deliberately almost empty: it composes its behavior by including a stack of modules, each an ActiveSupport::Concern that contributes fields, callbacks, validations, and methods (Plugins::Document, Plugins::Job::Model, Persistence, Callbacks, Logger, StateMachine, Worker, Throttle, and so on). Plugins::Document is the concern that pulls in Mongoid::Document and pins the job to the rocketjob MongoDB client. The state transitions (queued to running to completed / failed / aborted / paused) are driven by the aasm gem via the state machine plugins.

Optional capabilities are themselves modules that a job opts into: Batch, Cron, Singleton, Retry, ProcessingWindow, Transaction, and ThrottleDependentJobs.

This composition is what makes Rocket Job extensible: your own behavior can be packaged as a plugin and included into any job, exactly like the built-in ones. See Extending Jobs with Plugins in the Programmer’s Guide for how to write one.
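The composition pattern itself can be shown without any dependencies. The sketch below uses plain Ruby modules with an included hook in place of ActiveSupport::Concern, and every name in it (CallbacksSketch, AuditSketch, SketchJob, run) is hypothetical. It is an illustration of the idea, not the layout of the real plugins.

```ruby
# Hypothetical plugin that contributes a class-level callback DSL
# and an instance method that runs the callbacks before perform.
module CallbacksSketch
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def before_perform_callbacks
      @before_perform_callbacks ||= []
    end

    def before_perform(&block)
      before_perform_callbacks << block
    end
  end

  def run
    self.class.before_perform_callbacks.each { |callback| instance_exec(&callback) }
    perform
  end
end

# Hypothetical plugin that builds on the one above: it adds an instance
# method and registers a callback on whichever class includes it.
module AuditSketch
  def self.included(base)
    base.before_perform { audit_log << "starting #{self.class.name}" }
  end

  def audit_log
    @audit_log ||= []
  end
end

# The "job" stays almost empty; its behavior comes from the modules.
class SketchJob
  include CallbacksSketch
  include AuditSketch

  def perform
    audit_log << "performed"
    :done
  end
end

job = SketchJob.new
puts job.run.inspect       # => :done
puts job.audit_log.inspect # => ["starting SketchJob", "performed"]
```

Include order matters in this sketch: AuditSketch relies on the before_perform method that CallbacksSketch adds, so it has to be included second.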

Batch jobs and slices

RocketJob::Batch is a concern that layers slicing on top of a job. Input and output are organized into categories (Category::Input / Category::Output), each with its own serializer (plain, compressed, encrypted, bzip2) and collection. The data itself is stored separately from the job in the rocketjob_slices MongoDB client: Sliced::Input and Sliced::Output (both subclasses of Sliced::Slices) manage dynamically created collections of Sliced::Slice documents, each holding a batch of records for one worker to process.
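As a rough model of slicing, the sketch below splits input records into fixed-size slices, lets several threads each take whole slices, and reassembles the output in slice order. SketchSlice, build_slices, and process_slices are hypothetical names, and in-memory queues stand in for the MongoDB collections; it is not the code in RocketJob::Sliced.

```ruby
# Hypothetical slice: an id plus the batch of records it holds.
SketchSlice = Struct.new(:id, :records)

# Split the input into slices of at most slice_size records each.
def build_slices(records, slice_size)
  records.each_slice(slice_size).each_with_index.map do |batch, index|
    SketchSlice.new(index, batch)
  end
end

# Each worker thread repeatedly takes one whole slice, processes every
# record in it, and stores the result as an output slice with the same id.
def process_slices(slices, worker_count, &block)
  pending = Queue.new
  slices.each { |slice| pending << slice }
  results = Queue.new

  workers = worker_count.times.map do
    Thread.new do
      loop do
        slice = begin
          pending.pop(true) # non-blocking; raises ThreadError when empty
        rescue ThreadError
          break
        end
        results << SketchSlice.new(slice.id, slice.records.map { |r| block.call(r) })
      end
    end
  end
  workers.each(&:join)

  output = []
  output << results.pop until results.empty?
  output.sort_by(&:id)
end

input_slices  = build_slices((1..10).to_a, 4)
output_slices = process_slices(input_slices, 3) { |record| record * 2 }

puts input_slices.map(&:records).inspect # => [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
puts output_slices.flat_map(&:records).inspect
```

Because each output slice carries the id of its input slice, the results can be put back in the original order even though the slices finish in whatever order the threads happen to run.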

Runtime: Supervisor, Server, Workers

A running process is driven by the Supervisor, started via bin/rocketjob (RocketJob::CLI). It registers a Server document, manages a WorkerPool of Worker threads (ThreadWorker), handles OS signals, and runs the listeners. Cross-process coordination (shutdown, pause, log-level changes) does not use a separate broker: it rides on MongoDB through RocketJob::Event and the Subscriber / Subscribers::* classes.
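The idea of coordinating through stored events rather than a broker can be modeled as follows. This is a sketch under assumptions: an in-memory array stands in for the MongoDB collection behind RocketJob::Event, and SketchEvent, SketchEventLog, and LogLevelSubscriber are hypothetical names that do not mirror the real Subscriber classes.

```ruby
# Hypothetical event record: who it is for, what to do, and with what arguments.
SketchEvent = Struct.new(:name, :action, :parameters)

# Hypothetical subscriber that reacts to a log-level change.
class LogLevelSubscriber
  attr_reader :log_level

  def initialize
    @log_level = :info
  end

  # Invoked when an event with action :set is delivered.
  def set(level:)
    @log_level = level
  end
end

class SketchEventLog
  def initialize
    @events      = [] # stands in for the shared event collection
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
    @position    = 0  # how far this process has read
  end

  def subscribe(name, subscriber)
    @subscribers[name] << subscriber
  end

  # Publishing only appends a record; nothing is delivered yet.
  def publish(name, action, **parameters)
    @events << SketchEvent.new(name, action, parameters)
  end

  # Deliver every event stored since the last poll, then advance.
  def poll
    @events.drop(@position).each do |event|
      @subscribers[event.name].each do |subscriber|
        subscriber.public_send(event.action, **event.parameters)
      end
    end
    @position = @events.size
  end
end

log        = SketchEventLog.new
subscriber = LogLevelSubscriber.new
log.subscribe("logger", subscriber)

log.publish("logger", :set, level: :debug)
puts subscriber.log_level.inspect # => :info (not delivered until polled)
log.poll
puts subscriber.log_level.inspect # => :debug
```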

Class diagram

The diagram below shows the core classes and their relationships. It leaves out the smaller pieces (the remaining Plugins::Job::* modules, the serializer-specific slice classes such as EncryptedSlice and BZip2OutputSlice, the built-in jobs under RocketJob::Jobs, and Subscribers::SecretConfig) to keep it legible. They follow the same patterns as the classes shown.

classDiagram
    direction LR

    %% ===== Persistence foundation =====
    class MongoidDocument["Mongoid::Document"]
    class Document["Plugins::Document"] {
        <<concern>>
        store_in client rocketjob
        +find_and_update()
    }
    MongoidDocument <|.. Document : includes

    %% ===== Public job interface =====
    class Job["RocketJob::Job"] {
        +perform()
        +priority +description
        +state (aasm)
        +start! complete! fail! abort!
    }
    Document <|.. Job : includes
    Job ..|> JobModel : includes
    Job ..|> StateMachinePlugin : includes
    Job ..|> JobWorker : includes
    Job ..|> Throttle : includes

    class JobModel["Plugins::Job::Model"]
    class StateMachinePlugin["Plugins::StateMachine (aasm)"]
    class JobWorker["Plugins::Job::Worker"]
    class Throttle["Plugins::Job::Throttle"]

    %% ===== Optional capability mixins =====
    class Batch["RocketJob::Batch"] {
        <<concern>>
        +upload() +download()
        input_category / output_category
    }
    class Cron["Plugins::Cron"]
    class Singleton["Plugins::Singleton"]
    class Retry["Plugins::Retry"]
    Job <.. Batch : opt-in mixin
    Job <.. Cron : opt-in mixin
    Job <.. Singleton : opt-in mixin
    Job <.. Retry : opt-in mixin

    %% ===== Categories =====
    class CategoryBase["Category::Base"] {
        <<concern>>
        +name +serializer +columns +format
    }
    class CategoryInput["Category::Input"]
    class CategoryOutput["Category::Output"]
    CategoryBase <|.. CategoryInput : includes
    CategoryBase <|.. CategoryOutput : includes
    Batch o--> "*" CategoryInput : input_categories
    Batch o--> "*" CategoryOutput : output_categories

    %% ===== Sliced storage (rocketjob_slices) =====
    class Slices["Sliced::Slices"] {
        +slice_size +collection_name
        +upload() +download()
    }
    class SlicedInput["Sliced::Input"]
    class SlicedOutput["Sliced::Output"]
    class Slice["Sliced::Slice"] {
        Array of records + state
    }
    Slices <|-- SlicedInput
    Slices <|-- SlicedOutput
    Document <|.. Slice : includes
    Slices o--> "*" Slice : holds
    Batch ..> SlicedInput : input data
    Batch ..> SlicedOutput : output data

    %% ===== Runtime =====
    class Supervisor {
        +self.run
        +supervise_pool
    }
    class Server {
        +state (aasm)
        +heartbeat
    }
    class WorkerPool {
        +rebalance(max_workers)
    }
    class Worker {
        +run +shutdown!
    }
    class ThreadWorker
    Document <|.. Server : includes
    Worker <|-- ThreadWorker
    Supervisor o--> "1" Server : registers
    Supervisor o--> "1" WorkerPool : manages
    WorkerPool o--> "*" Worker : runs
    Worker ..> Job : reserves & performs

    %% ===== Coordination (pub/sub over MongoDB) =====
    class Event["RocketJob::Event"]
    class Subscriber["RocketJob::Subscriber"]
    class SubServer["Subscribers::Server"]
    class SubWorker["Subscribers::Worker"]
    class SubLogger["Subscribers::Logger"]
    Document <|.. Event : includes
    Subscriber <|-- SubServer
    Subscriber <|-- SubWorker
    Subscriber <|-- SubLogger
    Supervisor ..> Event : listens
    Subscriber ..> Event : publishes/consumes

Next steps