class Opener::Daemons::Worker

Downlods a KAF document, passes it to a component and submits the output to a callback URL or a default queue. Each Worker instance runs in an isolated thread

@!attribute [r] config

@return [Opener::Daemons::Configuration]

@!attribute [r] uploader

@return [Opener::Daemons::Uploader]

@!attribute [r] downloader

@return [Opener::Daemons::Downloader]

@!attribute [r] callback_handler

@return [Opener::CallbackHandler]

Constants

INLINE_IO

Attributes

callback_handler[R]
config[R]
downloader[R]
uploader[R]

Public Class Methods

new(config) click to toggle source

@param [Opener::Daemons::Configuration] config

# File lib/opener/daemons/worker.rb, line 31
def initialize(config)
  @config           = config
  @downloader       = Downloader.new
  @uploader         = Uploader.new
  @callback_handler = CallbackHandler.new
  @input            = nil
  @output           = nil
end

Public Instance Methods

handle_unsupported_language() click to toggle source

Sends the unsupported input to the last callback URL.

# File lib/opener/daemons/worker.rb, line 114
def handle_unsupported_language
  last_url = config.callbacks.last

  callback_handler.post last_url, input_params.merge(
    identifier: config.identifier,
    metadata:   config.metadata,
  )

  Core::Syslog.info(
    "Submitted input with an unsupported language to #{last_url}",
    config.metadata
  )
end
process() click to toggle source

Processes a document.

@raise [Oni::WrappedError]

# File lib/opener/daemons/worker.rb, line 45
def process
  add_transaction_attributes

  begin
    process_input
    run_component
    process_output
    submit_callbacks

  # Unsupported languages are handled in a different manner as they can
  # occur quite often. In these cases we _do_ want the data to be sent
  # to the final callback URL (skipping whatever comes before it) so it
  # can act upon it.
  rescue Core::UnsupportedLanguageError
    handle_unsupported_language
  end
end
process_input() click to toggle source
# File lib/opener/daemons/worker.rb, line 65
def process_input
  if config.input
    @input = Zlib.gunzip Base64.decode64 config.input
    @input.force_encoding 'UTF-8'
  else
    @input = downloader.download config.input_url
  end
end
process_output() click to toggle source

@param [String] output @return [Aws::S3::Object]

# File lib/opener/daemons/worker.rb, line 85
def process_output
  if INLINE_IO
    @next_input = Base64.encode64 Zlib.gzip @output
  else
    @object = uploader.upload config.identifier, @output, config.metadata
  end
end
run_component() click to toggle source

@return [String]

# File lib/opener/daemons/worker.rb, line 77
def run_component
  @output = config.component_instance.run @input, config.metadata['custom_config']
end
submit_callbacks() click to toggle source

Sends the object’s URL to the next callback URL.

@param [Aws::S3::Object] object

# File lib/opener/daemons/worker.rb, line 98
def submit_callbacks
  urls     = config.callbacks.dup
  next_url = urls.shift

  callback_handler.post next_url, next_input_params.merge(
    identifier: config.identifier,
    callbacks:  urls,
    metadata:   config.metadata,
  )

  Core::Syslog.info("Submitted response to #{next_url}", config.metadata)
end

Private Instance Methods

add_transaction_attributes() click to toggle source
# File lib/opener/daemons/worker.rb, line 152
def add_transaction_attributes
  Transaction.current.add_parameters(
    input_url:  config.input_url,
    identifier: config.identifier,
    callbacks:  config.callbacks,
    metadata:   config.metadata,
  )
end
input_params() click to toggle source

Preserve input for last callback

# File lib/opener/daemons/worker.rb, line 133
def input_params
  if config.input_url
    {input_url: config.input_url}
  else
    {input:     config.input}
  end
end
next_input_params() click to toggle source

Use generated output as new input

# File lib/opener/daemons/worker.rb, line 144
def next_input_params
  if INLINE_IO
    {input:     @next_input}
  else
    {input_url: @object.public_url.to_s}
  end
end