class App42::Message::QueueService
Manages asynchronous queues. Allows you to create and delete queues, purge messages, view pending messages, and get all messages.
@see Queue
Public Class Methods
This is a constructor that takes the following parameters:
@param apiKey @param secretKey @param baseURL
# File lib/message/QueueService.rb, line 29
#
# Stores the credentials and endpoint used by every request and sets the
# fixed resource names and API version.
#
# @param api_key [String] stored in @api_key and sent as the 'apiKey' request parameter
# @param secret_key [String] stored in @secret_key and passed to Util#sign
# @param base_url [String] stored in @base_url and handed to the REST connection
def initialize(api_key, secret_key, base_url)
  puts "Message Service->initialize"
  @api_key, @secret_key, @base_url = api_key, secret_key, base_url
  # Path segments and version used when the other methods build resource URLs.
  @resource = "queue"
  @messageResource = "message"
  @version = "1.0"
end
Public Instance Methods
Creates a Pull type Queue
@param queueName
- The name of the queue which has to be created
@param queueDescription
- The description of the queue
@return Queue
object containing queue name which has been created
@raise App42Exception
# File lib/message/QueueService.rb, line 51
#
# Creates a pull-type queue by POSTing its name and description to
# "<version>/<resource>/pull".
#
# @param queueName [String] name of the queue to create
# @param queueDescription [String] description of the queue
# @return the value produced by QueueResponseBuilder#buildResponse for the raw response
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised (the
#   validation runs before the rescue scope).
def create_pull_queue(queueName, queueDescription)
  puts "Create Pull Queue Called "
  puts "Base url #{@base_url}"
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  util.throwExceptionIfNullOrBlank(queueDescription, "Queue Description")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    body = { 'app42' => { "queue" => { "name" => queueName, "description" => queueDescription } } }.to_json
    puts "Body #{body}"
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
      'type' => 'pull'
    }
    # The body is signed together with the query parameters but is sent
    # separately, so it must not leak into query_params.
    sign_params = query_params.merge('body' => body)
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@resource}/pull"
    response = connection.post(signature, resource_url, query_params, body)
    QueueResponseBuilder.new.buildResponse(response)
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
end
Deletes the Pull type Queue
@param queueName
- The name of the queue which has to be deleted
@return App42Response
if deleted successfully
@raise App42Exception
# File lib/message/QueueService.rb, line 101
#
# Deletes a pull-type queue by issuing a DELETE to
# "<version>/<resource>/pull/<queueName>".
#
# @param queueName [String] name of the queue to delete
# @return [App42Response] with strResponse set to the raw response and
#   isResponseSuccess set to true
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised.
def delete_pull_queue(queueName)
  puts "Delete Pull Queue Called "
  puts "Base url #{@base_url}"
  responseObj = App42Response.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
      'type' => 'pull'
    }
    # queueName is part of the signed payload only; it travels in the URL path.
    sign_params = query_params.merge('queueName' => queueName)
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@resource}/pull/#{queueName}"
    response = connection.delete(signature, resource_url, query_params)
    responseObj.strResponse = response
    responseObj.isResponseSuccess = true
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
  responseObj
end
Messages are retrieved and dequeued from the Queue.
@param queueName
- The name of the queue from which messages have to be retrieved
@param receiveTimeOut
- Receive time out
@return Queue
object containing messages in the Queue
@raise App42Exception
# File lib/message/QueueService.rb, line 234
#
# Fetches messages from a queue with a GET to
# "<version>/<resource>/messages/<queueName>/<receiveTimeOut>".
#
# @param queueName [String] name of the queue to read from
# @param receiveTimeOut [#to_i] receive time out; converted with to_i before use
# @return the value produced by QueueResponseBuilder#buildResponse for the raw response
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised.
def get_messages(queueName, receiveTimeOut)
  puts "Get Messages Called "
  puts "Base url #{@base_url}"
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc
    }
    # Normalised once so the signed value and the URL segment always agree.
    timeout = receiveTimeOut.to_i.to_s
    # Path values are signed but not sent as query parameters.
    sign_params = query_params.merge('queueName' => queueName, 'receiveTimeOut' => timeout)
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@resource}/messages/#{queueName}/#{timeout}"
    response = connection.get(signature, resource_url, query_params)
    QueueResponseBuilder.new.buildResponse(response)
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
end
Returns the messages which are pending to be dequeued. Note: Calling this method does not dequeue the messages in the Queue.
The messages stay in the Queue till they are dequeued.
@param queueName
- The name of the queue from which pending messages have to be fetched
@return Queue
object containing pending messages in the Queue
@raise App42Exception
# File lib/message/QueueService.rb, line 189
#
# Fetches the pending messages of a queue with a GET to
# "<version>/<resource>/pending/<queueName>".
#
# @param queueName [String] name of the queue whose pending messages are fetched
# @return the value produced by QueueResponseBuilder#buildResponse for the raw response
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised.
def pending_messages(queueName)
  puts "Pending Messages Called "
  puts "Base url #{@base_url}"
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc
    }
    # queueName is part of the signed payload only; it travels in the URL path.
    sign_params = query_params.merge('queueName' => queueName)
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@resource}/pending/#{queueName}"
    response = connection.get(signature, resource_url, query_params)
    QueueResponseBuilder.new.buildResponse(response)
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
end
Purges the messages on the Queue. Note: once the Queue is purged, the messages are removed from the Queue
and won't be available for dequeueing.
@param queueName
- The name of the queue which has to be purged
@return App42Response
if purged successfully
@raise App42Exception
# File lib/message/QueueService.rb, line 145
#
# Purges a pull-type queue by issuing a DELETE to
# "<version>/<resource>/pull/purge/<queueName>".
#
# @param queueName [String] name of the queue to purge
# @return [App42Response] with strResponse set to the raw response and
#   isResponseSuccess set to true
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised.
def purge_pull_queue(queueName)
  puts "Purge Pull Queue Called "
  puts "Base url #{@base_url}"
  responseObj = App42Response.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
      'type' => 'pull'
    }
    # queueName is part of the signed payload only; it travels in the URL path.
    sign_params = query_params.merge('queueName' => queueName)
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@resource}/pull/purge/#{queueName}"
    response = connection.delete(signature, resource_url, query_params)
    responseObj.strResponse = response
    responseObj.isResponseSuccess = true
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
  responseObj
end
Pulls all the messages from the queue
@param queueName
- The name of the queue from which messages have to be pulled
@param receiveTimeOut
- Receive time out
@return Queue
object containing messages which have been pulled
@raise App42Exception
# File lib/message/QueueService.rb, line 336
#
# Pulls messages from a queue with a GET to
# "<version>/<messageResource>/<queueName>/<receiveTimeOut>".
#
# @param queueName [String] name of the queue to pull from
# @param receiveTimeOut [#to_i] receive time out; converted with to_i before use
# @return the value produced by QueueResponseBuilder#buildResponse for the raw response
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised.
def receive_message(queueName, receiveTimeOut)
  puts "Receive Message Called "
  puts "Base url #{@base_url}"
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc
    }
    # Normalised once so the signed value and the URL segment always agree.
    timeout = receiveTimeOut.to_i.to_s
    # Path values are signed but not sent as query parameters.
    sign_params = query_params.merge('queueName' => queueName, 'receiveTimeOut' => timeout)
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{timeout}"
    response = connection.get(signature, resource_url, query_params)
    QueueResponseBuilder.new.buildResponse(response)
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
end
Pull message based on the correlation id
@param queueName
- The name of the queue from which the message has to be pulled
@param receiveTimeOut
- Receive time out
@param correlationId
- Correlation Id for which message has to be pulled
@return Queue
object containing the message which has been pulled based on the correlation id
@raise App42Exception
# File lib/message/QueueService.rb, line 385
#
# Pulls the message matching a correlation id with a GET to
# "<version>/<messageResource>/<queueName>/<receiveTimeOut>/<correlationId>".
#
# @param queueName [String] name of the queue to pull from
# @param receiveTimeOut [#to_i] receive time out; converted with to_i before use
# @param correlationId [#to_s] correlation id of the message to pull
# @return the value produced by QueueResponseBuilder#buildResponse for the raw response
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised.
def receive_message_by_correlation_id(queueName, receiveTimeOut, correlationId)
  puts "Receive Message Called "
  puts "Base url #{@base_url}"
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  util.throwExceptionIfNullOrBlank(receiveTimeOut, "ReceiveTimeOut")
  util.throwExceptionIfNullOrBlank(correlationId, "Correlation Id")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc
    }
    # Normalised once so the signed value and the URL segment always agree.
    timeout = receiveTimeOut.to_i.to_s
    # Path values are signed but not sent as query parameters. to_s lets
    # non-String ids (e.g. Integers) through instead of failing on String#+.
    sign_params = query_params.merge(
      'queueName' => queueName,
      'receiveTimeOut' => timeout,
      'correlationId' => correlationId.to_s
    )
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{timeout}/#{correlationId}"
    response = connection.get(signature, resource_url, query_params)
    QueueResponseBuilder.new.buildResponse(response)
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
end
Remove message from the queue based on the message id. Note: Once the message is removed it cannot be pulled
@param queueName
- The name of the queue from which the message has to be removed
@param messageId
- The message id of the message which has to be removed.
@return App42Response
if removed successfully
@raise App42Exception
# File lib/message/QueueService.rb, line 434
#
# Removes a message from a queue by issuing a DELETE to
# "<version>/<messageResource>/<queueName>/<messageId>".
#
# @param queueName [String] name of the queue holding the message
# @param messageId [String] id of the message to remove
# @return [App42Response] with strResponse set to the raw response and
#   isResponseSuccess set to true
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised.
def remove_message(queueName, messageId)
  puts "Remove Message Called "
  puts "Base url #{@base_url}"
  responseObj = App42Response.new
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  util.throwExceptionIfNullOrBlank(messageId, "messageId")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc
    }
    # Path values are signed but not sent as query parameters.
    sign_params = query_params.merge('queueName' => queueName, 'messageId' => messageId)
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@messageResource}/#{queueName}/#{messageId}"
    response = connection.delete(signature, resource_url, query_params)
    responseObj.strResponse = response
    responseObj.isResponseSuccess = true
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
  responseObj
end
Send message on the queue with an expiry. The message will expire if it is not pulled/dequeued before the expiry
@param queueName
- The name of the queue to which the message has to be sent
@param msg
- Message that has to be sent
@param exp
- Message expiry time
@return Queue
object containing message which has been sent with its message id and correlation id
@raise App42Exception
# File lib/message/QueueService.rb, line 283
#
# Sends a message with an expiry to a queue by POSTing a JSON payload to
# "<version>/<messageResource>/<queueName>".
#
# @param queueName [String] name of the queue to send to
# @param msg [String] message to send
# @param exp expiry value placed in the payload's "expiration" field
# @return the value produced by QueueResponseBuilder#buildResponse for the raw response
# @raise [App42Exception] re-raised unchanged when raised during the request;
#   any other StandardError raised during the request is wrapped in one.
#   Errors from Util#throwExceptionIfNullOrBlank propagate as raised.
def send_message(queueName, msg, exp)
  # Previously logged "Get Messages Called " (copy-paste from get_messages).
  puts "Send Message Called "
  puts "Base url #{@base_url}"
  util = Util.new
  util.throwExceptionIfNullOrBlank(queueName, "Queue Name")
  util.throwExceptionIfNullOrBlank(msg, "Message")
  util.throwExceptionIfNullOrBlank(exp, "Expiration")
  begin
    connection = App42::Connection::RESTConnection.new(@base_url)
    body = { 'app42' => { "payLoad" => { "message" => msg, "expiration" => exp } } }.to_json
    puts "Body #{body}"
    # Unlike the other calls, queueName is sent as a query parameter here too.
    query_params = {
      'apiKey' => @api_key,
      'version' => @version,
      'timeStamp' => util.get_timestamp_utc,
      'queueName' => queueName
    }
    # The body is signed together with the query parameters but is sent
    # separately, so it must not leak into query_params.
    sign_params = query_params.merge('body' => body)
    puts query_params
    signature = util.sign(@secret_key, sign_params)
    resource_url = "#{@version}/#{@messageResource}/#{queueName}"
    response = connection.post(signature, resource_url, query_params, body)
    QueueResponseBuilder.new.buildResponse(response)
  rescue App42Exception
    raise
  rescue StandardError => e
    # StandardError (not Exception) so interrupts and exits are not swallowed.
    raise App42Exception.new(e)
  end
end