class DistributedJob::Job

A `DistributedJob::Job` instance keeps track of a distributed job, i.e. a job which is split into multiple units running in parallel and in multiple workers, using redis.

@example Creating a distributed job

distributed_job = DistributedJobClient.build(token: SecureRandom.hex)

# Add job parts and queue background jobs
distributed_job.push_each(Date.parse('2021-01-01')..Date.today) do |date, part|
  SomeBackgroundJob.perform_async(date, distributed_job.token, part)
end

distributed_job.token # can be used to query the status of the distributed job

@example Processing a distributed job part

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJobClient.build(token: token)

    return if distributed_job.stopped?

    # ...

    if distributed_job.done(part)
      # e.g. perform cleanup or enqueue some other job
    end
  rescue
    distributed_job.stop

    raise
  end
end

Attributes

client[R]
token[R]
ttl[R]

Public Class Methods

new(client:, token:, ttl: 86_400) click to toggle source

Initializes a new distributed job.

@param client [DistributedJob::Client] The client instance

@param token [String] Some token to be used to identify the job. You can e.g. use SecureRandom.hex to generate one.

@param ttl [Integer] The number of seconds this job will stay available in redis. This value is used to automatically expire and clean up the job in redis. Default is 86400, i.e. one day. The ttl is refreshed every time the job is modified in redis.

@example

DistributedJobClient = DistributedJob::Client.new(redis: Redis.new)

distributed_job = DistributedJob::Job.new(client: DistributedJobClient, token: SecureRandom.hex)

# However, the preferred way to build a distributed job is:

distributed_job = DistributedJobClient.build(token: SecureRandom.hex)
# File lib/distributed_job/job.rb, line 59
# Stores the collaborators of the distributed job on the instance.
#
# @param client [DistributedJob::Client] The client instance
# @param token [String] Token used to identify the job
# @param ttl [Integer] Seconds the job stays available in redis
#   (defaults to 86_400, i.e. one day)
def initialize(client:, token:, ttl: 86_400)
  @client = client
  @token = token
  @ttl = ttl
end

Public Instance Methods

count() click to toggle source

Returns the number of pushed parts which are not finished.

@example

distributed_job.count # => e.g. 8
# File lib/distributed_job/job.rb, line 161
# Counts the members of the redis set which holds the unfinished parts.
#
# @return [Integer] The cardinality reported by redis for the parts set
def count
  parts_key = "#{redis_key}:parts"

  redis.scard(parts_key)
end
done(part) click to toggle source

Removes the specified part from the distributed job, i.e. from the set of unfinished parts. Use this method when the respective job part has been successfully processed, i.e. finished.

@param part [String] The job part

@return [Boolean] Returns true when there are no more unfinished parts left and the job has been closed, or false otherwise

@example

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJobClient.build(token: token)

    # ...

    distributed_job.done(part)
  end
end
# File lib/distributed_job/job.rb, line 132
    # Marks a part as finished by removing it from the set of open parts.
    #
    # Removal, ttl refresh and the lookup of the remaining count are done by
    # a single Lua script. The script returns nothing when the part was not a
    # member of the set, in which case the comparison with 0 below is false.
    #
    # @param part [#to_s] The job part
    # @return [Boolean] true when no open parts remain and the job is closed,
    #   false otherwise
    def done(part)
      @done_script ||= <<~SCRIPT
        local key, part, ttl = ARGV[1], ARGV[2], tonumber(ARGV[3])

        if redis.call('srem', key .. ':parts', part) == 0 then return end

        redis.call('expire', key .. ':parts', ttl)
        redis.call('expire', key .. ':state', ttl)

        return redis.call('scard', key .. ':parts')
      SCRIPT

      script_args = [redis_key, part.to_s, ttl]
      remaining = redis.eval(@done_script, argv: script_args)

      # `remaining` may be nil (see above), so `zero?` is not an option here.
      remaining == 0 && closed?
    end
finished?() click to toggle source

Returns true if there are no more unfinished parts.

@example

distributed_job.finished? #=> true/false
# File lib/distributed_job/job.rb, line 170
# Tells whether the job has been closed and has no unfinished parts left.
#
# @return [Boolean]
def finished?
  # The part count is only looked up once the job is known to be closed.
  return false unless closed?

  count.zero?
end
open_parts() click to toggle source

Returns all parts of the distributed job which are not yet finished.

@return [Enumerator] An enumerator which allows iterating over all unfinished parts

# File lib/distributed_job/job.rb, line 109
# Iterates the redis set of unfinished parts via `sscan_each`.
#
# @return [Enumerator] Whatever `sscan_each` returns when called without a
#   block
def open_parts
  parts_key = "#{redis_key}:parts"

  redis.sscan_each(parts_key)
end
push_each(enum) { |previous_object, previous_index| ... } click to toggle source

Pass an enum to be used to iterate all the units of work of the distributed job. The distributed job needs to know all of them to keep track of the overall number and status of the parts. Passing an enum is much better compared to pushing the parts manually, because the distributed job needs to be closed before the last part of the distributed job is enqueued into some job queue. Otherwise it could potentially happen that the last part is already processed in the job queue before it is pushed to redis, such that the last job doesn’t know that the distributed job is finished.

@param enum [#each_with_index] The enum which can be iterated to get all job parts

@example

distributed_job.push_each(Date.parse('2021-01-01')..Date.today) do |date, part|
  # e.g. SomeBackgroundJob.perform_async(date, distributed_job.token, part)
end

@example ActiveRecord

distributed_job.push_each(User.select(:id).find_in_batches) do |batch, part|
  # e.g. SomeBackgroundJob.perform_async(batch.first.id, batch.last.id, distributed_job.token, part)
end
# File lib/distributed_job/job.rb, line 87
# Pushes every element of the enum as a job part and yields each element
# together with its part id (the stringified index).
#
# Yielding runs one element behind pushing: an element is only yielded after
# the next one has been pushed, and the last element is only yielded after
# the job has been closed.
#
# @param enum [#each_with_index] The enum which can be iterated to get all
#   job parts
# @yieldparam object [Object] An element of the enum
# @yieldparam part [String] The part id of that element
def push_each(enum)
  pending = nil

  enum.each_with_index do |object, index|
    push(index)

    yield(*pending) if pending

    pending = [object, index.to_s]
  end

  close

  yield(*pending) if pending
end
stop() click to toggle source

Stops a distributed job. This is useful if an error occurred in some part, i.e. background job, of the distributed job and you want to stop all other parts which are not yet finished. Please note that only those jobs can be stopped which actively ask the distributed job whether or not it was stopped.

@return [Boolean] Always returns true

@example

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJobClient.build(token: token)

    return if distributed_job.stopped?

    # ...

    distributed_job.done(part)
  rescue
    distributed_job.stop

    raise
  end
end
# File lib/distributed_job/job.rb, line 199
# Flags the job as stopped in its redis state hash and refreshes the ttl of
# both the state and the parts key, all within one `multi` block.
#
# @return [Boolean] Always true
def stop
  state_key = "#{redis_key}:state"
  parts_key = "#{redis_key}:parts"

  redis.multi do |pipeline|
    pipeline.hset(state_key, 'stopped', 1)

    pipeline.expire(state_key, ttl)
    pipeline.expire(parts_key, ttl)
  end

  true
end
stopped?() click to toggle source

Returns true when the distributed job was stopped or false otherwise.

@return [Boolean] Returns true or false

@example

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJobClient.build(token: token)

    return if distributed_job.stopped?

    # ...

    distributed_job.done(part)
  rescue
    distributed_job.stop

    raise
  end
end
# File lib/distributed_job/job.rb, line 231
# Reads the 'stopped' field of the job's state hash.
#
# @return [Boolean] true only when the stored value is the string '1'
def stopped?
  flag = redis.hget("#{redis_key}:state", 'stopped')

  flag == '1'
end
total() click to toggle source

Returns the total number of pushed parts, no matter if finished or not.

@example

distributed_job.total # => e.g. 13
# File lib/distributed_job/job.rb, line 152
# Reads the 'total' field of the job's state hash.
#
# @return [Integer] The stored value converted with `to_i`, so a nil reply
#   yields 0
def total
  raw_total = redis.hget("#{redis_key}:state", 'total')

  raw_total.to_i
end

Private Instance Methods

close() click to toggle source
# File lib/distributed_job/job.rb, line 245
# Flags the job as closed in its redis state hash and refreshes the ttl of
# both the state and the parts key, all within one `multi` block.
#
# @return [Boolean] Always true
def close
  state_key = "#{redis_key}:state"
  parts_key = "#{redis_key}:parts"

  redis.multi do |pipeline|
    pipeline.hset(state_key, 'closed', 1)

    pipeline.expire(state_key, ttl)
    pipeline.expire(parts_key, ttl)
  end

  true
end
closed?() click to toggle source
# File lib/distributed_job/job.rb, line 256
# Reads the 'closed' field of the job's state hash.
#
# @return [Boolean] true only when the stored value is the string '1'
def closed?
  flag = redis.hget("#{redis_key}:state", 'closed')

  flag == '1'
end
namespace() click to toggle source
# File lib/distributed_job/job.rb, line 241
# Delegates to the client's `namespace`.
def namespace
  client.namespace
end
push(part) click to toggle source
# File lib/distributed_job/job.rb, line 260
    # Adds a part to the set of open parts using a single Lua script.
    #
    # The script increments the 'total' counter only when the part was newly
    # added to the set, and refreshes the ttl of the parts and state keys in
    # either case.
    #
    # @param part [#to_s] The job part
    # @return [Object] Whatever `redis.eval` returns for the script
    def push(part)
      @push_script ||= <<~SCRIPT
        local key, part, ttl = ARGV[1], ARGV[2], tonumber(ARGV[3])

        if redis.call('sadd', key .. ':parts', part) == 1 then
          redis.call('hincrby', key .. ':state', 'total', 1)
        end

        redis.call('expire', key .. ':parts', ttl)
        redis.call('expire', key .. ':state', ttl)
      SCRIPT

      script_args = [redis_key, part.to_s, ttl]

      redis.eval(@push_script, argv: script_args)
    end
redis() click to toggle source
# File lib/distributed_job/job.rb, line 237
# Delegates to the client's `redis`, the object this job issues its redis
# commands on.
def redis
  client.redis
end
redis_key() click to toggle source
# File lib/distributed_job/job.rb, line 275
# Builds (once) the common key prefix of this job in redis.
#
# The key consists of the namespace, the literal 'distributed_jobs' and the
# token, joined by colons. A nil namespace is left out.
#
# @return [String] The memoized key prefix
def redis_key
  return @redis_key if @redis_key

  segments = [namespace, 'distributed_jobs', token].compact

  @redis_key = segments.join(':')
end