class Pigato::Worker

Constants

HEARTBEAT_LIVENESS

Public Class Methods

new(broker, service, conf = {}) click to toggle source
# File lib/pigato/worker.rb, line 7
# Builds a worker bound to +service+ on +broker+.
#
# broker  - broker endpoint; stored and also handed to the shared heartbeat
#           client created below.
# service - service name announced with W_READY when the worker starts.
# conf    - optional Hash merged over these defaults:
#           :autostart - when truthy, #start is called from the constructor
#                        (default false)
#           :timeout   - value assigned to the socket's rcvtimeo in #recv
#                        (default 2500)
#           :reconnect - multiplied by 0.001 and slept in #recv before the
#                        worker restarts itself (default 2500)
#
# The first worker constructed in the process also spawns a single background
# thread (guarded by the @@global_thread flag) that owns a Pigato::Client and,
# roughly every 2.5 seconds, sends one heartbeat per entry in @@sockets_ids.
def initialize(broker, service, conf = {})
  @broker = broker
  @service = service

  @conf = {
    :autostart => false,
    :timeout => 2500,
    :reconnect => 2500
  }.merge(conf)

  # A heartbeat timestamp in the past makes the first #recv send one at once.
  @heartbeat_at = Time.now - 60
  @liveness = 0
  @reply_to = nil
  @reply_rid = nil
  @reply_service = nil

  init

  start if @conf[:autostart]

  # synchronize releases the class-wide mutex even when the guarded code
  # raises; a bare lock/unlock pair would leave it held in that case.
  @@mtx.synchronize do
    if @@global_thread.nil?
      @@global_thread = true
      Thread.new do
        client = Pigato::Client.new(broker, { :autostart => true })
        loop do
          @@mtx.synchronize do
            begin
              if Time.now > @@global_heartbeat_at
                @@sockets_ids.each do |_iid, sid|
                  request = [Pigato::C_CLIENT, Pigato::W_HEARTBEAT, "worker", sid]
                  msg = ZMQ::Message.new
                  request.reverse_each { |part| msg.push(ZMQ::Frame(part)) }
                  client.send msg
                end
                @@global_heartbeat_at = Time.now + 2.5
              end
            rescue => e
              # Best effort: report the failure and try again on the next tick.
              puts e
            end
          end
          sleep 2.5
        end
      end
    end
  end
end

Public Instance Methods

recv() click to toggle source
# File lib/pigato/worker.rb, line 64
# Waits up to @conf[:timeout] for one message on this worker's socket.
#
# Returns the Oj-decoded payload of a W_REQUEST message, or nil when no socket
# is available, nothing (or an empty message) arrived, the header is not
# Pigato::W_WORKER, or the message was a heartbeat, disconnect or unknown
# command.
#
# Side effects: clears and (for requests) repopulates the @reply_* routing
# fields used by #reply, maintains the @liveness counter, restarts the worker
# on W_DISCONNECT or when liveness runs out, and sends a W_HEARTBEAT once
# @heartbeat_at has passed.
def recv
  val = nil

  # Forget the routing details of any previously received request.
  @reply_rid = nil
  @reply_to = nil
  @reply_service = nil

  iid = get_iid

  start if @@sockets[iid].nil? && @conf[:autostart]

  socket = get_socket
  return nil if socket.nil?

  socket.rcvtimeo = @conf[:timeout]

  msg = socket.recv_message

  # Integer 0 is truthy in Ruby, so the frame count has to be compared
  # explicitly; otherwise an empty message would be popped below and fail on nil.
  if msg && msg.size > 0
    @liveness = HEARTBEAT_LIVENESS

    header = msg.pop.data
    if header != Pigato::W_WORKER
      puts "E: Header is not Pigato::WORKER"
      return nil
    end

    command = msg.pop.data

    case command
    when Pigato::W_REQUEST
      @reply_to = msg.pop.data
      @reply_service = msg.pop.data
      msg.pop # empty
      @reply_rid = msg.pop.data
      val = Oj.load(msg.pop.data)
    when Pigato::W_HEARTBEAT
      # Nothing to do: receiving it already refreshed @liveness above.
    when Pigato::W_DISCONNECT
      start
    else
      # Unknown commands are ignored.
    end
  else
    @liveness -= 1
    # <= rather than ==: if the counter ever drops below zero (for example
    # after a restart attempt that raised) the worker must still reconnect.
    if @liveness <= 0
      sleep 0.001 * @conf[:reconnect]
      start
    end
  end

  if Time.now > @heartbeat_at
    send(Pigato::W_HEARTBEAT, ['', Oj.dump({ 'concurrency' => 1 })])
    # Next heartbeat is due after 1.5x the receive timeout (milliseconds -> seconds).
    @heartbeat_at = Time.now + 0.001 * (@conf[:timeout] * 1.5)
  end

  val
end
reply(reply) click to toggle source
# File lib/pigato/worker.rb, line 59
# Sends +reply+ (serialised with Oj.dump) as a W_REPLY, addressed with the
# @reply_to / @reply_rid values captured by the last request seen in #recv.
# The literal '0' frame is passed through as-is; presumably a status or
# sequence marker defined by the protocol -- confirm against the broker.
def reply(reply)
  frames = [@reply_to, '', @reply_rid, '0', Oj.dump(reply)]
  send(Pigato::W_REPLY, frames)
end
send(command, data = nil) click to toggle source
# File lib/pigato/worker.rb, line 134
# Writes one message to this worker's socket: the Pigato::W_WORKER header,
# then +command+, then the frames in +data+.
#
# command - command frame placed right after the header.
# data    - nil (no extra frames), an Array of frames, or a single value that
#           is treated as a one-element Array.
def send(command, data = nil)
  payload =
    case data
    when nil then []
    when Array then data
    else [data]
    end

  socket = get_socket

  frames = [Pigato::W_WORKER, command].concat(payload)
  message = ZMQ::Message.new
  # Frames are pushed in reverse so they end up in their original order.
  frames.reverse_each { |part| message.push(ZMQ::Frame(part)) }
  socket.send_message message
end
start() click to toggle source
Calls superclass method Pigato::Base#start
# File lib/pigato/worker.rb, line 121
# (Re)starts the worker: tears down any existing socket via #stop, creates a
# fresh one, announces @service to the broker with W_READY, runs the
# superclass start, and resets the liveness counter to HEARTBEAT_LIVENESS.
def start 
  stop
  sock_create
  # The socket must exist before W_READY can be sent.
  send Pigato::W_READY, @service
  super
  @liveness = HEARTBEAT_LIVENESS
end
stop() click to toggle source
Calls superclass method Pigato::Base#stop
# File lib/pigato/worker.rb, line 129
# Stops the worker: closes its socket first, then runs the superclass stop.
def stop
  sock_close
  super
end