class Concord::Computation

Thrift service definition. Wraps a user-defined computation.

Attributes

handler[RW]
proxy_host[RW]
proxy_port[RW]

Public Class Methods

new(handler: nil, proxy_host: nil, proxy_port: nil) click to toggle source

Initialize a new ‘Computation` and register it with the proxy @param handler [Object] The user-defined computation @param proxy_host [String] The address the proxy is listening on @param proxy_port [FixNum] The port the proxy is listening on

# File lib/concord.rb, line 87
def initialize(handler: nil, proxy_host: nil, proxy_port: nil)
  self.handler = handler
  self.proxy_host = proxy_host
  self.proxy_port = proxy_port
end
serve(computation) click to toggle source

Initialize a new ‘Computation` and start serving it. This is the only method directly called by users. @param computation [Object] The user-defined computation

# File lib/concord.rb, line 102
def self.serve(computation)
  listen_address = ENV[::Concord::Thrift::KConcordEnvKeyClientListenAddr]
  proxy_address = ENV[::Concord::Thrift::KConcordEnvKeyClientProxyAddr]
  listen_host, listen_port = listen_address.split(':')
  proxy_host, proxy_port = proxy_address.split(':')

  handler = self.new(handler: computation,
                     proxy_host: proxy_host,
                     proxy_port: Integer(proxy_port))

  processor = Thrift::ComputationService::Processor.new(handler)
  transport = ::Thrift::ServerSocket.new(listen_host, Integer(listen_port))
  transport_factory = ::Thrift::FramedTransportFactory.new
  protocol_factory = ::Thrift::BinaryProtocolAcceleratedFactory.new
  # The reason the client computations MUST use a simple blocking server
  # is that we have process_timer and process_record both which exec as
  # a callback in the work thread pool which means that you might get
  # 2 callbacks whichs makes the code multi threaded - we guarantee single
  # thread for each callback
  server = ::Thrift::SimpleServer.new(processor,
                                      transport,
                                      transport_factory,
                                      protocol_factory)
  # Register with localhost proxy. Note that this method is `oneway'
  # which means after final TCP 'ack' it finishes.
  handler.register_with_scheduler
  server.serve
end

Public Instance Methods

boltMetadata() click to toggle source

@return [Concord::Thrift::ComputationMetadata] The user-defined computation metadata.

# File lib/concord.rb, line 178
def boltMetadata
  metadata = nil
  log_failure do
    metadata = handler.metadata
  end
  enrich_metadata(metadata)
end
boltProcessRecords(records) click to toggle source

Process records from upstream. Wraps the user method in transactions, which are returned to the proxy upon completion. @param records [Concord::Thrift::Record] The record to process

# File lib/concord.rb, line 134
def boltProcessRecords(records)
  txs = []
  records.each do |record|
    ctx = ComputationContext.new(self)
    log_failure do
      handler.process_record(ctx, record)
    end
    txs.push(ctx.transaction)
  end
  txs
end
boltProcessTimer(key, time) click to toggle source

Process a timer callback from the proxy. Wraps the user method in a transaction, which is returned to the proxy upon completion. @param key [String] Callback identifier @param time [FixNum] Time this callback was scheduled to trigger.

# File lib/concord.rb, line 150
def boltProcessTimer(key, time)
  ctx = ComputationContext.new(self)
  log_failure do
    handler.process_timer(ctx, key, time)
  end
  ctx.transaction
end
destroy() click to toggle source

The class destructor, use this method to perform any cleanup before the proxy kills the process this instance resides in.

# File lib/concord.rb, line 171
def destroy
  log_failure do
    handler.destroy
  end
end
get_state(key) click to toggle source

Retrieve a binary blob stored in the proxy state @param key [String] Key to fetch from data store @return [String] Binary blob of data

# File lib/concord.rb, line 189
def get_state(key)
  proxy.getState(key)
end
init() click to toggle source

The initialization function, called when the framework is ready to start sending the computation records. Wraps the user method in a transaction, which is returned to the proxy upon completion.

# File lib/concord.rb, line 161
def init
  ctx = ComputationContext.new(self)
  log_failure do
    handler.init(ctx)
  end
  ctx.transaction
end
register_with_scheduler() click to toggle source
# File lib/concord.rb, line 93
def register_with_scheduler
  log_failure do
    proxy.registerWithScheduler(boltMetadata)
  end
end
set_state(key, value) click to toggle source

Store a binary blob, identified by a key, in the proxy state @param key [String] Key to set in data store @param value [String] Binary blob

# File lib/concord.rb, line 196
def set_state(key, value)
  proxy.setState(key, value)
end

Private Instance Methods

enrich_metadata(metadata) click to toggle source
# File lib/concord.rb, line 229
def enrich_metadata(metadata)
  def enrich_stream(stream)
    sm = ::Concord::Thrift::StreamMetadata.new
    if stream.is_a?(Array)
      sm.name = stream.first
      sm.grouping = stream.last
    else
      sm.name = stream
    end
    sm
  end

  cm = ::Concord::Thrift::ComputationMetadata.new
  cm.name = metadata.name
  cm.istreams = metadata.istreams.map { |x| enrich_stream(x) }
  cm.ostreams = metadata.ostreams
  cm.proxyEndpoint = ::Concord::Thrift::Endpoint.new
  cm.proxyEndpoint.ip = proxy_host
  cm.proxyEndpoint.port = proxy_port
  cm
end
enrich_stream(stream) click to toggle source
# File lib/concord.rb, line 230
def enrich_stream(stream)
  sm = ::Concord::Thrift::StreamMetadata.new
  if stream.is_a?(Array)
    sm.name = stream.first
    sm.grouping = stream.last
  else
    sm.name = stream
  end
  sm
end
log(msg, handle: $stderr) click to toggle source
# File lib/concord.rb, line 202
def log(msg, handle: $stderr)
  handle.puts msg
  handle.flush
end
log_failure(&block) click to toggle source
# File lib/concord.rb, line 207
def log_failure(&block)
  begin
    block.call
  rescue => e
    log e.message
    log e.backtrace.join("\n")
    raise e
  end
end
proxy() click to toggle source
# File lib/concord.rb, line 217
def proxy
  if @proxy.nil? || !@proxy_socket.open?
    @proxy_socket = ::Thrift::Socket.new(proxy_host, proxy_port)
    transport = ::Thrift::FramedTransport.new(@proxy_socket)
    protocol = ::Thrift::BinaryProtocol.new(transport)
    @proxy = ::Concord::Thrift::BoltProxyService::Client.new(protocol)
    transport.open
  end

  @proxy
end