class HareDo::Peer
This is an abstract base class used for both Client and Service
.
Attributes
Public Class Methods
# File src/lib/haredo/peer.rb, line 169 def initialize(name=nil) # Message identifier @mid = 0 # Client attributes @timeout = 1.0 @sleep_interval = 0.001 @receive_queue = {} @trace = false @listen_queues = [] # The number of messages to prefecth from Rabbit @prefetch = 10 @plugins = Plugins::Manager.new(self) @name = name end
Public Instance Methods
Sends a message and waits for response
@param to The to address @param :data Message data @param :headers Message headers @param :headers Message from address @param :properties Message properties
@return Returns the response message if successful, nil otherwise. Will block until @timeout seconds elapse. Sleeps in busywait. Sleep interval is given by @sleep_interval.
# File src/lib/haredo/peer.rb, line 341 def call(to, args) data = args[:data] || '' from = args[:from] headers = args[:headers] || {} properties = args[:headers] || {} # Get message id of sent message id = send(to, args) # Put blank entry in queue to indicate we are expecting a response with that # message id. @receive_queue[id] = nil return receive(id) end
Connect to RabbitMQ
@param user The RabbitMQ
username (default guest) @param password The RabbitMQ
password (default guest) @param host The RabbitMQ
host (default localhost) @param port The RabbitMQ
port (default 5672) @param vhost The RabbitMQ
vhost (default /) @param ssl Use SSL (default false) @return Returns true if connection was successful, false othewise.
# File src/lib/haredo/peer.rb, line 199 def connect(args) queue_args = { #'x-expires' => 30000 #'x-message-ttl' => 1000 } user = args[:user] || 'guest' password = args[:password] || 'guest' host = args[:host] || 'localhost' port = args[:port] || '5672' vhost = args[:vhost] || '' queue_props = args[:queue] || queue_args exchange = args[:exchange] || '' exclusive = args[:exclusive] || true ssl = args[:ssl] || {} tls_cert = ssl['tls_cert'] || nil tls_key = ssl['tls_key'] || nil tls_ca_certs = nil if ssl['tls_cert'] tls_cert = ssl['tls_cert'] end if ssl['tls_key'] tls_key = ssl['tls_key'] end if ssl['tls_ca'] tls_ca_certs = [ssl['tls_ca']] end use_ssl = false if ssl.size > 0 and ssl['enable'] == true use_ssl = true port = ssl['port'] || '5671' end if vhost == '/' vhost = '' end if use_ssl == true @cnx = Bunny.new( "amqps://#{user}:#{password}@#{host}:#{port}#{vhost}", :log_file => '/dev/null', :tls_cert => tls_cert, :tls_key => tls_key, :tls_ca_certificates => tls_ca_certs) else @cnx = Bunny.new("amqp://#{user}:#{password}@#{host}:#{port}#{vhost}") end @cnx.start() @channel = @cnx.create_channel() @channel.prefetch(@prefetch) @queue = @channel.queue( '', :auto_delete => true, :exclusive => exclusive, :arguments => queue_props ) @exchange = @channel.default_exchange() return true end
Defined the queue this service will listen on. Assumes a single-instance service therefore declares queue as exclusive.
# File src/lib/haredo/peer.rb, line 434 def createQueue(args={}) queue_args = { #'x-expires' => 30000 #'x-message-ttl' => 1000 } queue_name = args[:queue] queue_props = args[:properties] || queue_args auto_delete = args[:auto_delete] || true exclusive = true if args.has_key?(:exclusive) exclusive = args[:exclusive] end return @channel.queue( queue_name, :auto_delete => true, :exclusive => exclusive ) end
Disconnect from RabbitMQ
# File src/lib/haredo/peer.rb, line 268 def disconnect() if @queue != nil @queue.delete() @queue = nil end @listen_queues.each do |listen_queue| listen_queue.delete() listen_queue = nil end @listen_queues = {} if @cnx != nil @cnx.close() @cnx = nil end @plugins.shutdown() end
Causes the service to listen for incoming messages.
@param :blocking If this is set to true, will go into indefinite blocking loop processing incoming messages.
@returns Returns nil if non-blocking. Never returns if blocking.
# File src/lib/haredo/peer.rb, line 462 def listen(args={}) queue = @queue if args.has_key?(:queue) listen_queue = createQueue(args) @listen_queues << listen_queue if args.has_key?(:exchange) listen_queue.bind(args[:exchange]) end queue = listen_queue end block = args[:blocking] || false if $syslog.nil? Syslog.open( "haredo #{@name}", Syslog::LOG_PID, Syslog::LOG_DAEMON | Syslog::LOG_LOCAL7 ) $syslog = true end Syslog.notice('listen()') queue.subscribe(:block => block, :manual_ack => true) do |info, props, data| @channel.acknowledge(info.delivery_tag, false) if serve(RabbitMQ::Message.new(info, props, data)) == false exit 0 end end end
# File src/lib/haredo/peer.rb, line 358 def receive(mid=nil) if mid != nil if @receive_queue.has_key?(mid) msg = @receive_queue[mid] if msg != nil @receive_queue.delete(mid) return msg end end end now = Time::now.to_f while true delivery_info, properties, payload = @queue.pop() if delivery_info != nil msg = RabbitMQ::Message.new(delivery_info, properties, payload) if @trace == true dump_message(msg, 'receive') end if mid != nil if properties[:correlation_id].to_i == mid.to_i # Reply flag must be set #if msg.headers['reply'] == 1 return msg #end end end # Only add to receive queue if we are expecting it if @receive_queue.has_key?(mid) @receive_queue[msg.headers['id']] = msg end end if (Time::now.to_f - now) > @timeout # Delete entry from receive queue @receive_queue.delete mid return nil end sleep @sleep_interval end end
You should use this method to reply back to a peer. It sets the reply header which tells the remote that this message is a response (as opposed to a message originating from another source which just happens to have the same message_id).
# File src/lib/haredo/peer.rb, line 413 def reply(msg, args) data = args[:data] || '' headers = args[:headers] || {} id = msg.properties.message_id.to_i to = msg.properties.reply_to # Set the reply flag to indicate that this is a response to a message # sent. The message_id should already be set in the headers. headers[:reply] = 1 headers[:id] = id.to_i properties = {} properties[:correlation_id] = msg.properties.message_id.to_i send(to, :headers => headers, :properties => properties, :data => data) end
Sends a message. Sends a message. Adds the @mid as message_id in message properties. @param to The to address @param :data Message data @param :headers Message headers @param :headers Message from address @param :properties Message properties
@return Returns the message ID of sent message
# File src/lib/haredo/peer.rb, line 300 def send(to, args) data = args[:data] || '' from = args[:from] headers = args[:headers] || {} properties = args[:properties] || {} properties[:routing_key] = to properties[:headers] = headers properties[:message_id] = @mid rc = @mid @mid += 1 if not from.nil? properties[:reply_to] = from else properties[:reply_to] = @queue.name if @queue end @exchange.publish(data, properties) if @trace == true dump_message(RabbitMQ::Message.new(headers, properties, data), 'send()') end return rc end
# File src/lib/haredo/peer.rb, line 496 def serve(msg) @plugins.process(msg) end