Rocket Job Batch Programmers Guide

Batch Jobs

Regular jobs run on a single worker. In order to scale up and use all available workers it is necessary to break up the input data into “slices” so that different parts of the job can be processed in parallel.

Jobs that include RocketJob::Batch break their work up into slices so that many workers can work on the individual slices at the same time. Slices take a large and unwieldy job and break it up into bite-size pieces that can be processed a slice at a time by the workers.

Since batch jobs consist of lots of smaller slices the job can be paused, resumed, or even aborted as a whole. If there are any failed slices when the job finishes, they can all be retried by retrying the job itself.
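
The job as a whole can be controlled while its slices are being worked on. A minimal sketch of these controls, assuming a saved batch job instance:

# Pause the job: workers stop picking up new slices, in-flight slices finish.
job.pause!

# Resume processing of the remaining slices.
job.resume!

# Abort the job entirely; remaining slices are not processed.
job.abort!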

For example, if an uploaded file contains 1,000,000 lines, then with the default slice_size of 100 the job will contain 10,000 slices.

Slices are made up of records, 100 per slice by default. Each record usually refers to a line or row in a file, but can be any valid BSON object for which work is to be performed.

A running batch job will be interrupted if a new job with a higher priority is queued for processing. This allows low priority jobs to use all available resources until a higher priority job arrives, and then to resume processing once the higher priority job is complete.
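
Priority is set per job class and can be overridden per job instance. A minimal sketch; in Rocket Job, priority ranges from 1 (highest) to 100 (lowest) and defaults to 50:

class OvernightBatchJob < RocketJob::Job
  include RocketJob::Batch

  # Low priority so that interactive jobs interrupt this one.
  self.priority = 90

  def perform(record)
    # ...
  end
end

A basic batch job looks as follows: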

class ReverseJob < RocketJob::Job
  include RocketJob::Batch

  # Keep the job around after it has finished
  self.destroy_on_complete = false

  # Number of lines/records for each slice
  input_category slice_size: 100

  # Collect any output from the job
  output_category

  def perform(line)
    # Work on a single record at a time across many workers.
    # The return value is collected in the output category registered above.
    line.reverse
  end
end

Queue the job for processing:

# Words would come from a database query, file, etc.
words = %w(these are some words that are to be processed at the same time on many workers)

job = ReverseJob.new

# Load words as individual records for processing into the job
job.upload do |records|
  words.each do |word|
    records << word
  end
end

# Queue the job for processing
job.save!

Batch Output

Display the output from the above batch:

# Display the results that were returned
job.output.each do |slice|
  slice.each do |record|
    # Display each record returned from job
    puts record
  end
end

Output Ordering

The order of the slices and records is exactly the same as the order in which the records were uploaded into the job. This makes it easy to correlate an input record with its corresponding output record.

There are, however, a few cases where the exact input and output record ordering can change.

Job Completion

The output from a job can be queried at any time, but will be incomplete until the job has completed processing.

To programmatically wait for a job to complete processing:

loop do
  sleep 1
  job.reload
  break unless job.running? || job.queued?
  puts "Job is still #{job.state}"
end

Large File Processing

Batch jobs can process very large files. Entire files are uploaded into a Job for processing and automatically broken up into slices for workers to process.

Queue the job for processing:

job = ReverseJob.new
# Upload a file into the job for processing
job.upload('myfile.txt')
job.save!

Once the job has completed, download the output records into a file:

# Download the output and compress it into a GZip file
job.download('reversed.txt.gz')

Rocket Job has built-in support for reading and writing compressed and encrypted files, such as GZip, Zip, and Symmetric Encryption (.enc) files, based on the file name's extensions.

Uploading data

Rocket Job uploads the data for every batch job into its own unique Mongo collection. During processing, slices are removed from this collection as soon as they have been processed.

Failed slices remain in the collection and are marked as failed so that they can be investigated or retried.
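
The job's slice collections and overall progress can be inspected at any time. A minimal sketch, assuming a saved batch job instance:

# Number of input slices still queued or failed.
puts job.input.count

# Number of output slices collected so far.
puts job.output.count

# Overall percentage of slices processed.
puts job.percent_complete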

Uploading the data into the job makes it self-contained: workers do not need access to the original data source, and every slice is persisted until it has been processed.

Data can be uploaded into a batch job from many sources: files, Active Record queries, Mongoid queries, or a block of code.

Uploading Files

Upload every line in a file as records into the job for processing.

Returns the number of lines uploaded into the job as an Integer.

Example: Load plain text records from a file, capturing the number of lines uploaded:

line_count = job.upload('hello.csv')

Example: Load plain text records from a file, stripping all non-printable characters, as well as any characters that cannot be converted to UTF-8:

job.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')

Example: Zip

job.upload('myfile.csv.zip')

Example: Encrypted Zip

job.upload('myfile.csv.zip.enc')

Example: Explicitly set the streams

job.upload('myfile.ze', streams: [:zip, :enc])

Example: Supply custom options

job.upload('myfile.csv.enc', streams: [enc: { compress: false }])

Example: Extract streams from filename but write to a temp file

streams = IOStreams.streams_for_file_name('myfile.gz.enc')
t = Tempfile.new('my_project')
job.upload(t.to_path, streams: streams)

See: IOStreams for more information on supported file types and conversions that can be applied during calls to upload and download.

Active Record Queries

Upload results from an Active Record Query into a batch job.

If a block is supplied, it is passed each model returned from the database and should return the work item to be uploaded into the job.

Returns [Integer] the number of records uploaded

Example: Upload ids for all users

arel = User.all
job.upload_arel(arel)

Example: Upload selected user ids

arel = User.where(country_code: 'US')
job.upload_arel(arel)

Example: Upload user_name and zip_code

arel = User.where(country_code: 'US')
job.upload_arel(arel, :user_name, :zip_code)
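
As described above, a block can be supplied to transform each model into the record that is uploaded. A hedged sketch, assuming the User model has an email attribute:

arel = User.where(country_code: 'US')
# Upload the lowercased email address of each user as the work item
job.upload_arel(arel) { |user| user.email.downcase }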

Mongoid Queries

Upload the result of a MongoDB query into the input collection for processing. Useful when an entire MongoDB collection, or part thereof, needs to be processed by a job.

Returns [Integer] the number of records uploaded

If a block is supplied, it is passed each document returned from the database and should return a record for processing.

If no block is supplied, the record consists of the fields returned from MongoDB.

Example: Upload document ids

criteria = User.where(state: 'FL')
job.upload_mongo_query(criteria)

Example: Specify one or more columns other than just the document id to upload:

criteria = User.where(state: 'FL')
job.upload_mongo_query(criteria, :zip_code)
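
Similarly, a block can be supplied to transform each document into the record for processing. A sketch, assuming each document contains a zip_code field:

criteria = User.where(state: 'FL')
# Upload just the zip code from each matching document
job.upload_mongo_query(criteria) { |doc| doc['zip_code'] }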

Upload Block

When a block is supplied, it is given a writer into which individual records can be written one at a time:

job.upload do |writer|
  10.times { |i| writer << i }
end

Batch Job Throttling

Throttle the number of workers that can work on a batch job instance at any time.

Limiting can be used when too many concurrent workers are overwhelming a downstream resource, such as a third-party API or a shared database.

Worker limiting also allows multiple batch jobs to be processed concurrently instead of sequentially, since no single job can consume every available worker.

The throttle_running_workers throttle can be changed at any time, even while the job is running, to increase or decrease the number of workers working on that job.

class ReverseJob < RocketJob::Job
  include RocketJob::Batch

  # No more than 10 workers should work on this job at a time
  self.throttle_running_workers = 10

  def perform(line)
    line.reverse
  end
end
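
Since throttle_running_workers is a regular persisted attribute, it can be adjusted on a live job. A minimal sketch, assuming a currently running job instance:

job = ReverseJob.last
# From now on, allow up to 20 workers to work on this job.
job.throttle_running_workers = 20
job.save!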

Multiple Output Files

A single batch job can also create multiple output files by categorizing the result of the perform method.

This can be used, for example, to write one output file containing the job's results, and another containing the lines that were too short.

class MultiFileJob < RocketJob::Job
  include RocketJob::Batch

  self.destroy_on_complete = false

  output_category
  # Register additional `:invalid` output category for this job
  output_category(name: :invalid)

  def perform(line)
    if line.length < 10
      # The line is too short, send it to the invalid output collection
      RocketJob::Batch::Result.new(:invalid, line)
    else
      # Reverse the line (default output goes to the :main output category)
      line.reverse
    end
  end
end

When complete, download the results of the batch into two files:

# Download the regular results
job.download('reversed.txt.gz')

# Download the invalid results to a separate file
job.download('invalid.txt.gz', category: :invalid)

Error Handling

Since a Batch job breaks a single job into slices, individual records within slices can fail while others are still being processed.

# Display the exceptions for failed slices:
job = RocketJob::Job.find('55bbce6b498e76424fa103e8')
job.input.each_failed_record do |record, slice|
  p slice.exception
end

Once all slices have been processed and there are only failed slices left, then the job as a whole is failed.
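
Retrying the failed job re-queues only the slices that failed, since successfully processed slices have already been removed:

job.retry!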

Reading Tabular Files

Received data is very often in a format similar to that of a spreadsheet, with rows and columns, such as CSV or Excel files. Usually the first row is the header that describes what each column contains. The remaining rows are the actual data for processing.

To direct Rocket Job Batch to read the input as csv, add the format option to the input_category. Now each CSV line will be parsed just before the perform method is called, and a Hash will be passed in as the first argument to perform, instead of the csv line.

This Hash consists of the header field names as keys and the values that were received for the specific row in the file.

class TabularJob < RocketJob::Job
  include RocketJob::Batch

  input_category format: :csv
  
  def perform(record)
  #  record is a Hash, for example: 
  #  {
  #     "first_field" => 100,
  #     "second"      => 200,
  #     "third"       => 300
  #   }
  end
end

Upload a file into the job for processing:

job = TabularJob.new
job.upload('my_really_big_csv_file.csv')
job.save!

Note: This job can be changed so that it handles any supported tabular input format, for example: csv, psv, json, or xlsx.

Auto Detect file type

Set the format to :auto to use the file name during the upload step to auto-detect the file type:

class TabularJob < RocketJob::Job
  include RocketJob::Batch
  
  input_category format: :auto
  
  def perform(record)
  #  record is a Hash, for example: 
  #  {
  #     "first_field" => 100,
  #     "second"      => 200,
  #     "third"       => 300
  #   }
  end
end

Upload a csv file into the job for processing:

job = TabularJob.new
job.upload("my_really_big_csv_file.csv")
job.save!

Upload a xlsx spreadsheet with the same column headers into the same job for processing, without changing the job in any way:

job = TabularJob.new
job.upload("really_big.xlsx")
job.save!

And so on, for example reading a json file:

job = TabularJob.new
job.upload("really_big.json")
job.save!

Writing Tabular Files

Jobs can also output tabular data such as CSV files. Instead of making the job deal with CSV transformations directly, it can set the format on the output_category to :csv:

class ExportUsersJob < RocketJob::Job
  include RocketJob::Batch
  
  # Columns to include in the output file
  output_category format: :csv, columns: ["login", "last_login"]
  
  def perform(id)
    u = User.find(id)
    # Return a Hash that tabular will render to CSV
    {
      "login"      => u.login,
      "last_login" => u.updated_at
    }
  end
end

Upload the users to export into the job for processing:

job = ExportUsersJob.new
# Upload the ids of the locked users to export.
arel = User.where(locked: true)
job.upload_arel(arel)
job.save!

Once the job has completed, export the output:

job.download("output.csv")

Sample contents of output.csv:

login,last_login
jbloggs,2019-02-11 05:43:20
kadams,2019-01-12 01:20:20

Filtering Output

Rocket Job only exports the list of columns specified, so the same job can output different columns between runs. For example, one customer receives more columns than another, and a single job handles both cases.

In the example below many attributes are being exported, yet only a subset is exported by default:

class ExportUsersJob < RocketJob::Job
  include RocketJob::Batch
  
  # Columns to include in the output file
  output_category format: :csv, columns: ["login", "last_login"]
  
  def perform(login)
    u = User.find_by(login: login)
    # Return a Hash of all available attributes from which it will extract
    # the "login", "last_login" columns.
    u.attributes 
  end
end

Run the job, uploading the logins to process:

job = ExportUsersJob.new
# Upload only the login column so that each record is a login string.
job.upload_arel(User.where(locked: true), :login)
job.save!

Once the job has completed, export the output:

job.download("output.csv")

Sample contents of output.csv:

login,last_login
jbloggs,2019-02-11 05:43:20
kadams,2019-01-12 01:20:20

For another customer the list of columns can be increased by overriding the output columns. For example, make the job output a CSV file with the “login”, “last_login”, “name”, and “state” columns:

job = ExportUsersJob.new
job.output_category.columns = ["login", "last_login", "name", "state"]
job.save!

Once the job has completed, export the output:

job.download("output.csv")

Sample contents of output.csv:

login,last_login,name,state
jbloggs,2019-02-11 05:43:20,Joe Bloggs,FL
kadams,2019-01-12 01:20:20,Kevin Adams,TX

Single Output File

Example: Process a very large csv file and return a single output csv file:

class SingleFileJob < RocketJob::Job
  include RocketJob::Batch

  # Prevent this job from being destroyed on completion.
  self.destroy_on_complete = false

  # Specify that the main input category should parse the uploaded CSV file
  # and pass each line one at a time into the `perform` method. 
  input_category format: :csv
  
  # Register an output category to output a CSV file.
  output_category format: :csv

  # When the job completes, automatically download the output file.
  after_batch :download_file
  
  # Since the input category has format: :csv, the `perform` method will receive a hash:
  # {
  #   "first_name" => "Jack",
  #   "last_name" => "Jones",
  #   "age" => "21",
  #   "zip_code" => "12345"
  # }
  def perform(row)
    # Since the output_category format is `:csv`, Rocket Job will convert this hash into a line in the csv file.
    {
      name: "#{row['first_name'].downcase} #{row['last_name'].downcase}",
      age:  row["age"]
    }
  end

  # Download the output from this job into a CSV file
  def download_file
    download("names.csv")
  end
end

Multiple Output Files

When multiple output files need to be created, add a second output category to hold the additional file's contents.

For example, suppose the uploaded file is the following CSV, created by running this code in a Rails console:

# Create a sample CSV file to test with:
str = <<STRING
First Name, Last name, age, zip code
Jack,Jones,21,12345
Mary,Jane,32,55512
STRING

Now display the file contents as hashes:
io = StringIO.new(str)
IOStreams.stream(io).each(:hash) {|h| p h}

The output from the above code:

{"first_name"=>"Jack", "last_name"=>"Jones", "age"=>"21", "zip_code"=>"12345"}
{"first_name"=>"Mary", "last_name"=>"Jane", "age"=>"32", "zip_code"=>"55512"}

Now let's build a job to process the file above and create two output files:

class MultiFileJob < RocketJob::Job
  include RocketJob::Batch

  # Prevent this job from being destroyed on completion.
  self.destroy_on_complete = false

  # Instruct the main input category to parse each line of the csv file
  # and pass them one at a time into the `perform` method.
  input_category format: :csv
  
  # Register a main output category to output a CSV file.
  output_category name: :main, format: :csv

  # Register a `zip_codes` output category to output a separate CSV file.
  output_category name: :zip_codes, format: :csv
  
  # When the job completes automatically download the output files.
  after_batch :download_files
  
  # Since the input category has format: :csv, the `perform` method will receive a hash:
  # {
  #   "first_name" => "Jack",
  #   "last_name" => "Jones",
  #   "age" => "21",
  #   "zip_code" => "12345"
  # }
  def perform(row)
    # Collect multiple outputs into this collection
    outputs = RocketJob::Batch::Results.new
    
    # Let's output the names into the main file:
    main_result = {
      name: "#{row['first_name'].downcase} #{row['last_name'].downcase}",
      age:  row["age"]
    }
    # Add the result to the main output category:
    outputs << main_result

    # And the zip codes into the zip_codes file:
    zip_codes_result = {
      zip: row["zip_code"]
    }

    # Add the zip codes result to the :zip_codes output category:
    outputs << RocketJob::Batch::Result.new(:zip_codes, zip_codes_result)
    
    # Return the collected outputs
    outputs
  end
  
  def download_files
    # Download the main output file
    download("names.csv")
    
    # Download the zip_codes output file
    download("zip_codes.csv", category: :zip_codes)
  end
end

Compression

Compression reduces network utilization and disk storage requirements. It is highly recommended when processing large files or large amounts of data.

Setting the serializer on the input and output categories to :compress ensures that all data uploaded into the job is compressed.

As of Rocket Job v6, the default serializer is :compress, so compression is already on by default; set the serializer to :none to disable it.

class ReverseJob < RocketJob::Job
  include RocketJob::Batch

  # Compress input and output data
  input_category serializer: :compress
  output_category serializer: :compress

  def perform(line)
    line.reverse
  end
end

Encryption

Setting the serializer on the input and output categories to :encrypt ensures that all data uploaded into the job is encrypted. Encryption helps sensitive data meet compliance requirements, both at rest and in flight.

When encryption is enabled, the data is automatically compressed before encryption to reduce the amount of data that is encrypted and unencrypted.

class ReverseJob < RocketJob::Job
  include RocketJob::Batch

  # Encrypt input and output data
  input_category serializer: :encrypt
  output_category serializer: :encrypt

  def perform(line)
    line.reverse
  end
end

PGP Encryption

When exchanging files with other systems, using an open standard like PGP is ideal.

Below is an example of how to create a PGP-encrypted output file:

class MultiFileJob < RocketJob::Job
  include RocketJob::Batch

  # Prevent this job from being destroyed on completion.
  self.destroy_on_complete = false

  # Specify that the main input category should parse the uploaded CSV file
  # and pass each line one at a time into the `perform` method. 
  input_category format: :csv
  
  # Register an output category to output a CSV file.
  output_category format: :csv

  # Define a field to hold the `pgp_public_key` of the recipient.
  field :pgp_public_key, type: String
  
  validates_presence_of :pgp_public_key

  # When the job completes, automatically download the output file.
  after_batch :download_file

  # Since the input category has format: :csv, the `perform` method will receive a hash:
  # {
  #   "first_name" => "Jack",
  #   "last_name" => "Jones",
  #   "age" => "21",
  #   "zip_code" => "12345"
  # }
  def perform(row)
    # Since the output_category format is `:csv`, Rocket Job will convert this hash into a line in the csv file.
    {
      name: "#{row['first_name'].downcase} #{row['last_name'].downcase}",
      age:  row["age"]
    }
  end

  # Download the output from this job into a CSV file encrypted with PGP
  def download_file
    path = IOStreams.path("names.csv")
    
    # Add the pgp public key to encrypt the file with:
    path.option(:pgp, import_and_trust_key: pgp_public_key)
    
    download(path)
  end
end