class Sidekiq::Promise::Worker

Attributes

jid[RW]

Public Class Methods

new(worker_klass, *args) click to toggle source
Calls superclass method
# File lib/sidekiq/promise/worker.rb, line 7
# Remembers the worker class and its arguments, then passes the superclass a
# proc that calls #subscribe.
#
# worker_klass - object that #queue_job later sends +perform_async+ to.
# args         - arguments splatted into that +perform_async+ call.
#
# NOTE(review): when the superclass invokes the proc is not visible from this
# method — confirm against the parent class.
def initialize(worker_klass, *args)
  @args         = args
  @worker_klass = worker_klass

  deferred_subscribe = proc { subscribe }
  super(deferred_subscribe)
end

Private Instance Methods

applicable?(message) click to toggle source
# File lib/sidekiq/promise/worker.rb, line 36
# Tells whether +message+ refers to this worker's job.
#
# message - object indexable with the string key 'jid'.
#
# Returns the result of comparing message['jid'] against #jid with ==.
def applicable?(message)
  message_jid = message['jid']
  message_jid == jid
end
noop(*args) click to toggle source
# File lib/sidekiq/promise/worker.rb, line 54
# Accepts any number of positional arguments, does nothing and returns nil.
def noop(*_args)
  nil
end
process_complete_message(message) click to toggle source
# File lib/sidekiq/promise/worker.rb, line 44
# Handles a "complete" message: unsubscribes first, then resolves with the
# value stored under the message's 'result' key.
#
# message - object indexable with the string key 'result'.
#
# Returns whatever #resolve returns.
def process_complete_message(message)
  unsubscribe
  outcome = message['result']
  resolve(outcome)
end
process_dequeued_message(*args)
Alias for: noop
process_enqueued_message(*args)
Alias for: noop
process_error_message(message) click to toggle source
# File lib/sidekiq/promise/worker.rb, line 49
# Handles an "error" message: unsubscribes first, then rejects with the value
# stored under the message's 'exception' key.
#
# message - object indexable with the string key 'exception'.
#
# Returns whatever #reject returns.
def process_error_message(message)
  unsubscribe
  failure = message['exception']
  reject(failure)
end
process_message(message) click to toggle source
# File lib/sidekiq/promise/worker.rb, line 40
# Dispatches +message+ to the handler named after its 'status' value, i.e.
# process_<status>_message. The handler is invoked through +send+, so it may
# be a private method.
#
# message - object indexable with the string key 'status'.
#
# Returns whatever the selected handler returns. A status with no matching
# handler method propagates the error raised by +send+.
def process_message(message)
  handler = "process_#{message['status']}_message"
  send(handler, message)
end
queue_job() click to toggle source
# File lib/sidekiq/promise/worker.rb, line 29
# Calls @worker_klass.perform_async with @args splatted, from inside a
# Celluloid future, then stores the future's value in @jid.
#
# Returns the value assigned to @jid.
def queue_job
  enqueue = ::Celluloid::Future.new { @worker_klass.perform_async(*@args) }
  @jid = enqueue.value
end
subscribe() click to toggle source
# File lib/sidekiq/promise/worker.rb, line 16
# Registers a message callback with Sidekiq::Promise::Subscription and keeps
# the returned value in @subscription_id. The callback passes a message on to
# #process_message only when #applicable? accepts it. Afterwards #queue_job is
# chained onto Subscription.ready via +then+.
#
# Returns whatever the +then+ call returns.
def subscribe
  subscription = Sidekiq::Promise::Subscription
  @subscription_id = subscription.subscribe do |message|
    next unless applicable?(message)

    process_message(message)
  end
  subscription.ready.then { queue_job }
end
unsubscribe() click to toggle source
# File lib/sidekiq/promise/worker.rb, line 25
# Cancels the subscription identified by @subscription_id through
# Sidekiq::Promise::Subscription.unsubscribe. Does nothing when no
# subscription id is stored. The stored id is left in place afterwards.
#
# Returns nil when there is no id, otherwise the result of the unsubscribe
# call.
def unsubscribe
  return unless @subscription_id

  Sidekiq::Promise::Subscription.unsubscribe(@subscription_id)
end