class Plugin::Pulsar
Public Instance Methods
close_listen()
click to toggle source
# File lib/plugin/pulsar.rb, line 45 def close_listen @consumer&.close end
close_publish()
click to toggle source
# File lib/plugin/pulsar.rb, line 49 def close_publish @producer&.close end
listen(topic, worker, opts = {})
click to toggle source
# File lib/plugin/pulsar.rb, line 5 def listen(topic, worker, opts = {}) create_consumer(topic, opts).listen do |cmd, msg| ::Mqjob.logger.debug("#{self.class.name}::#{__method__}"){"receive msg: #{msg.payload}"} worker.do_work(cmd, msg) end end
publish(topic, msg, opts = {})
click to toggle source
opts
in publish message in X seconds at publish message at specific time init_subscription Boolean
# File lib/plugin/pulsar.rb, line 16 def publish(topic, msg, opts = {}) create_consumer(topic, opts) if opts[:init_subscription] base_cmd = ::Pulsar::Proto::BaseCommand.new( type: ::Pulsar::Proto::BaseCommand::Type::SEND, send: ::Pulsar::Proto::CommandSend.new( num_messages: 1 ) ) get_timestamp = lambda {|v| (v.to_f * 1000).floor} deliver_at = case when opts[:in] Time.now.localtime + opts[:in].to_f when opts[:at] opts[:at] else Time.now.localtime end metadata = ::Pulsar::Proto::MessageMetadata.new( deliver_at_time: get_timestamp.call(deliver_at) ) p_msg = ::PulsarSdk::Producer::Message.new(msg, metadata) create_producer(topic, opts).execute_async(base_cmd, p_msg) end
Private Instance Methods
create_consumer(topic, opts)
click to toggle source
# File lib/plugin/pulsar.rb, line 54 def create_consumer(topic, opts) @consumer ||= begin topic_type = :topic if opts[:topic_type].to_sym == :regex topic_type = :topics_pattern elsif topic.is_a?(Array) topic_type = :topics end consumer_opts = ::PulsarSdk::Options::Consumer.new( topic_type => topic, subscription_type: (opts[:subscription_mode] || @subscription_mode).to_s.capitalize.to_sym, subscription_name: opts[:subscription_name], prefetch: opts[:prefetch] || 1, listen_wait: 0.1 ) ::Mqjob.logger.debug(__method__){consumer_opts.inspect} @client.subscribe(consumer_opts) end end
create_producer(topic, opts)
click to toggle source
# File lib/plugin/pulsar.rb, line 77 def create_producer(topic, opts) @producer ||= begin producer_opts = ::PulsarSdk::Options::Producer.new( topic: topic ) @client.create_producer(producer_opts) end end