class Concord::Computation
Thrift service definition. Wraps a user-defined computation.
Attributes
Public Class Methods
Initialize a new `Computation` bound to a proxy address. The constructor only stores its arguments; registration with the proxy happens in `register_with_scheduler`.
@param handler [Object] The user-defined computation
@param proxy_host [String] The address the proxy is listening on
@param proxy_port [Integer] The port the proxy is listening on
# File lib/concord.rb, line 87
def initialize(handler: nil, proxy_host: nil, proxy_port: nil)
  self.handler = handler
  self.proxy_host = proxy_host
  self.proxy_port = proxy_port
end
Initialize a new `Computation` and start serving it. This is the only method directly called by users.
@param computation [Object] The user-defined computation
# File lib/concord.rb, line 102
def self.serve(computation)
  listen_address = ENV[::Concord::Thrift::KConcordEnvKeyClientListenAddr]
  proxy_address = ENV[::Concord::Thrift::KConcordEnvKeyClientProxyAddr]
  listen_host, listen_port = listen_address.split(':')
  proxy_host, proxy_port = proxy_address.split(':')
  handler = self.new(handler: computation,
                     proxy_host: proxy_host,
                     proxy_port: Integer(proxy_port))
  processor = Thrift::ComputationService::Processor.new(handler)
  transport = ::Thrift::ServerSocket.new(listen_host, Integer(listen_port))
  transport_factory = ::Thrift::FramedTransportFactory.new
  protocol_factory = ::Thrift::BinaryProtocolAcceleratedFactory.new
  # The client computations MUST use a simple blocking server:
  # process_timer and process_record both execute as callbacks in the
  # work thread pool, so two callbacks could run at once and make the
  # code multi-threaded. We guarantee a single thread for each callback.
  server = ::Thrift::SimpleServer.new(processor,
                                      transport,
                                      transport_factory,
                                      protocol_factory)
  # Register with localhost proxy. Note that this method is `oneway`,
  # which means it finishes after the final TCP 'ack'.
  handler.register_with_scheduler
  server.serve
end
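`serve` reads both endpoints from the environment as `host:port` strings and converts the port with `Integer`. The snippet below is a minimal sketch of that parsing step in isolation. `parse_endpoint` is a hypothetical helper that is not part of Concord, and the explicit `nil` check is an addition that the method above does not perform.

```ruby
# Hypothetical helper, not part of Concord: split "host:port" into its parts.
def parse_endpoint(address)
  raise ArgumentError, 'address is missing' if address.nil?

  host, port = address.split(':')
  # Integer() raises on nil or non-numeric input instead of silently returning 0.
  [host, Integer(port)]
end

host, port = parse_endpoint('127.0.0.1:9000')
puts "#{host} #{port}"
```

Splitting on `':'` assumes a hostname or IPv4 address; an IPv6 literal would need different handling.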
Public Instance Methods
@return [Concord::Thrift::ComputationMetadata] The user-defined computation metadata.
# File lib/concord.rb, line 178
def boltMetadata
  metadata = nil
  log_failure do
    metadata = handler.metadata
  end
  enrich_metadata(metadata)
end
Process records from upstream. Wraps each call to the user method in its own transaction; the transactions are returned to the proxy upon completion.
@param records [Array<Concord::Thrift::Record>] The records to process
# File lib/concord.rb, line 134
def boltProcessRecords(records)
  txs = []
  records.each do |record|
    ctx = ComputationContext.new(self)
    log_failure do
      handler.process_record(ctx, record)
    end
    txs.push(ctx.transaction)
  end
  txs
end
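The pattern in `boltProcessRecords` is one fresh context per record, with one transaction collected per record in input order. The following is a rough sketch of that pattern using stand-ins: `SketchContext`, its `produce` method, and `UpcaseHandler` are all hypothetical, and an Array is assumed in place of the real transaction object.

```ruby
# Stand-in for ComputationContext: collects what the handler emits.
class SketchContext
  attr_reader :transaction

  def initialize
    @transaction = [] # assumption: an Array stands in for the real transaction
  end

  # Hypothetical method name; the real context API is not shown in this page.
  def produce(value)
    @transaction << value
  end
end

# Hypothetical handler that upcases every record it sees.
class UpcaseHandler
  def process_record(ctx, record)
    ctx.produce(record.upcase)
  end
end

# One fresh context per record, one transaction per record, order preserved.
def process_records(handler, records)
  records.map do |record|
    ctx = SketchContext.new
    handler.process_record(ctx, record)
    ctx.transaction
  end
end

p process_records(UpcaseHandler.new, %w[a b])
```

Because each record gets its own context, output produced while handling one record never leaks into another record's transaction.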
Process a timer callback from the proxy. Wraps the user method in a transaction, which is returned to the proxy upon completion.
@param key [String] Callback identifier
@param time [Integer] Time this callback was scheduled to trigger
# File lib/concord.rb, line 150
def boltProcessTimer(key, time)
  ctx = ComputationContext.new(self)
  log_failure do
    handler.process_timer(ctx, key, time)
  end
  ctx.transaction
end
The class destructor. Use this method to perform any cleanup before the proxy kills the process in which this instance resides.
# File lib/concord.rb, line 171
def destroy
  log_failure do
    handler.destroy
  end
end
Retrieve a binary blob stored in the proxy state.
@param key [String] Key to fetch from data store
@return [String] Binary blob of data
# File lib/concord.rb, line 189
def get_state(key)
  proxy.getState(key)
end
The initialization function, called when the framework is ready to start sending records to the computation. Wraps the user method in a transaction, which is returned to the proxy upon completion.
# File lib/concord.rb, line 161
def init
  ctx = ComputationContext.new(self)
  log_failure do
    handler.init(ctx)
  end
  ctx.transaction
end
# File lib/concord.rb, line 93
def register_with_scheduler
  log_failure do
    proxy.registerWithScheduler(boltMetadata)
  end
end
Store a binary blob, identified by a key, in the proxy state.
@param key [String] Key to set in data store
@param value [String] Binary blob
# File lib/concord.rb, line 196
def set_state(key, value)
  proxy.setState(key, value)
end
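Taken together, the public instance methods call five methods on the user-defined handler: `init`, `process_record`, `process_timer`, `destroy`, and `metadata`. Below is a minimal sketch of a handler with that shape. `WordCounter` and `SketchMetadata` are hypothetical names, plain strings stand in for records, and the metadata object is assumed only to need `name`, `istreams`, and `ostreams`, since those are the fields `enrich_metadata` reads.

```ruby
# Hypothetical metadata holder; the wrapper only reads name, istreams and ostreams.
SketchMetadata = Struct.new(:name, :istreams, :ostreams)

# Hypothetical user-defined computation that counts the records it receives.
class WordCounter
  attr_reader :counts

  def initialize
    @counts = Hash.new(0)
  end

  # Called once when the framework is ready to send records.
  def init(ctx); end

  # Called for each upstream record. A plain String stands in for the record here.
  def process_record(ctx, record)
    @counts[record] += 1
  end

  # Called when a timer registered under `key` fires.
  def process_timer(ctx, key, time); end

  # Called before the proxy kills the process.
  def destroy; end

  def metadata
    # An input stream is either a bare name or a [name, grouping] pair.
    SketchMetadata.new('word-counter', ['words'], [])
  end
end
```

An instance of such a class is what would be passed to `Concord::Computation.serve`.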
Private Instance Methods
# File lib/concord.rb, line 229
def enrich_metadata(metadata)
  def enrich_stream(stream)
    sm = ::Concord::Thrift::StreamMetadata.new
    if stream.is_a?(Array)
      sm.name = stream.first
      sm.grouping = stream.last
    else
      sm.name = stream
    end
    sm
  end
  cm = ::Concord::Thrift::ComputationMetadata.new
  cm.name = metadata.name
  cm.istreams = metadata.istreams.map { |x| enrich_stream(x) }
  cm.ostreams = metadata.ostreams
  cm.proxyEndpoint = ::Concord::Thrift::Endpoint.new
  cm.proxyEndpoint.ip = proxy_host
  cm.proxyEndpoint.port = proxy_port
  cm
end
# File lib/concord.rb, line 230
def enrich_stream(stream)
  sm = ::Concord::Thrift::StreamMetadata.new
  if stream.is_a?(Array)
    sm.name = stream.first
    sm.grouping = stream.last
  else
    sm.name = stream
  end
  sm
end
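`enrich_stream` accepts an input stream written either as a bare name or as a `[name, grouping]` pair and normalizes both into one structure. Here is a small sketch of that normalization with a `Struct` standing in for `Concord::Thrift::StreamMetadata`. `SketchStream`, `normalize_stream`, and the `:group_by` value are illustrative assumptions; in this sketch a missing grouping is simply `nil`, which may differ from the default of the real Thrift struct.

```ruby
# Stand-in for Concord::Thrift::StreamMetadata (assumed to expose name and grouping).
SketchStream = Struct.new(:name, :grouping)

# Accept either "name" or ["name", grouping] and return a SketchStream.
def normalize_stream(stream)
  if stream.is_a?(Array)
    SketchStream.new(stream.first, stream.last)
  else
    SketchStream.new(stream, nil)
  end
end

# :group_by is a placeholder value, not a documented Concord grouping.
streams = ['words', ['clicks', :group_by]].map { |s| normalize_stream(s) }
streams.each { |s| puts "#{s.name} #{s.grouping.inspect}" }
```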
# File lib/concord.rb, line 202
def log(msg, handle: $stderr)
  handle.puts msg
  handle.flush
end
# File lib/concord.rb, line 207
def log_failure(&block)
  begin
    block.call
  rescue => e
    log e.message
    log e.backtrace.join("\n")
    raise e
  end
end
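`log_failure` follows a log-and-re-raise pattern: the error is written out, then propagated so the caller still sees it. The sketch below is a standalone variant of that pattern, not the library's code. The `handle:` keyword is an assumption added here so the output can be captured in a `StringIO`; the method above always logs through `log`.

```ruby
require 'stringio'

# Log the error message and backtrace to `handle`, then re-raise.
def log_failure(handle: $stderr)
  yield
rescue StandardError => e
  handle.puts e.message
  handle.puts e.backtrace.join("\n")
  handle.flush
  raise # bare raise re-raises the exception currently being handled
end

sink = StringIO.new
begin
  log_failure(handle: sink) { raise ArgumentError, 'bad record' }
rescue ArgumentError => e
  puts "caller still sees: #{e.message}"
end
puts sink.string.lines.first
```

Re-raising keeps a failing user callback visible to whatever invoked it, instead of hiding the error behind a log line.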
# File lib/concord.rb, line 217
def proxy
  if @proxy.nil? || !@proxy_socket.open?
    @proxy_socket = ::Thrift::Socket.new(proxy_host, proxy_port)
    transport = ::Thrift::FramedTransport.new(@proxy_socket)
    protocol = ::Thrift::BinaryProtocol.new(transport)
    @proxy = ::Concord::Thrift::BoltProxyService::Client.new(protocol)
    transport.open
  end
  @proxy
end
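`proxy` caches its client and rebuilds the connection only when no client exists yet or the underlying socket is no longer open. The following sketch shows that lazy-reconnect idea with stand-in classes so it runs without Thrift. `SketchSocket` and `SketchClient` are hypothetical, and the creation counter exists only to make the reuse visible.

```ruby
# Stand-in for ::Thrift::Socket; counts how many sockets have been created.
class SketchSocket
  @created = 0
  class << self
    attr_accessor :created
  end

  def initialize
    self.class.created += 1
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

# Hypothetical owner of the connection.
class SketchClient
  attr_reader :socket

  # Return the cached socket, rebuilding it only if it is missing or closed.
  def connection
    @socket = SketchSocket.new if @socket.nil? || !@socket.open?
    @socket
  end
end

client = SketchClient.new
client.connection
client.connection   # reuses the open socket
client.socket.close
client.connection   # socket was closed, so a new one is built
puts SketchSocket.created
```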