class ActiveJob::GoogleCloudPubsub::Worker

Constants

MAX_DEADLINE

Public Class Methods

new(queue: 'default', pubsub: Google::Cloud::Pubsub.new(timeout: 60), logger: Logger.new($stdout)) click to toggle source
# File lib/activejob_google_cloud_pubsub/worker.rb, line 15
def initialize(queue: 'default', pubsub: Google::Cloud::Pubsub.new(timeout: 60), logger: Logger.new($stdout))
  @queue_name  = queue
  @pubsub      = pubsub
  @logger      = logger
end

Public Instance Methods

ensure_subscription() click to toggle source
# File lib/activejob_google_cloud_pubsub/worker.rb, line 36
def ensure_subscription
  @pubsub.subscription_for @queue_name

  nil
end
run() click to toggle source
# File lib/activejob_google_cloud_pubsub/worker.rb, line 21
def run
  subscriber = @pubsub.subscription_for(@queue_name).listen(streams: 1, threads: { callback: 1 }) do |message|
    @logger&.info "Message(#{message.message_id}) was received."
    process message
  end

  subscriber.on_error do |error|
    @logger&.error(error)
  end

  subscriber.start

  sleep
end

Private Instance Methods

process(message) click to toggle source
# File lib/activejob_google_cloud_pubsub/worker.rb, line 44
def process(message)
  timer_opts = {
    execution_interval: MAX_DEADLINE - 10.seconds,
    timeout_interval: 5.seconds,
    run_now: true
  }

  delay_timer = Concurrent::TimerTask.execute(timer_opts) do
    message.modify_ack_deadline! MAX_DEADLINE.to_i
  end

  begin
    succeeded = false
    failed    = false

    ActiveJob::Base.execute JSON.parse(message.data)

    succeeded = true
  rescue StandardError
    failed = true
    raise
  ensure
    delay_timer.shutdown

    if succeeded || failed
      message.acknowledge!
      @logger&.info "Message(#{message.message_id}) was acknowledged."
    else
      # terminated from outside
      message.modify_ack_deadline! 0
    end
  end
end