class Sad::Server
Public Class Methods
fetch(queue)
click to toggle source
# File lib/sad/server.rb, line 12
# Polls +queue+ once through the configured redis client and keeps the
# polling loop alive.
#
# Issues a BLPOP on +queue+ (30 is passed as its timeout argument) and
# attaches two handlers to the returned request:
# * on success, any data received is logged, decoded with Payload.decode
#   and handed to payload_call; the next poll is then scheduled;
# * on failure, the request is logged and the next poll is scheduled.
#
# NOTE(review): assumes the redis client returns an EventMachine-style
# deferrable (it must respond to +callback+ and +errback+) - confirm.
def fetch(queue)
  ::Sad::Procline.set("Waiting for #{queue}")
  pending = ::Sad::Config.redis.blpop(queue, 30)

  pending.callback do |_, data|
    ::Sad::Procline.set("Fetched #{queue} - #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}")

    # data is falsy when nothing was popped; only real entries are processed.
    if data
      rule = '-' * 15
      ::Sad.logger.info(rule + data.inspect + rule)
      payload_call(Payload.decode(data))
    end

    fetch_with_interval(queue)
  end

  pending.errback do
    ::Sad.logger.error("error with redis request.\n#{pending.inspect}")
    fetch_with_interval(queue)
  end
end
fetch_with_interval(queue)
click to toggle source
# File lib/sad/server.rb, line 33
# Schedules the next fetch of +queue+ on an EventMachine timer set to
# ::Sad::Config.interval. The shutdown flag is checked when the timer
# fires, not when it is armed, so no new fetch starts once shutdown?
# has become truthy.
def fetch_with_interval(queue)
  EM.add_timer(::Sad::Config.interval) do
    fetch(queue) unless shutdown?
  end
end
payload_call(payload)
click to toggle source
# File lib/sad/server.rb, line 39
# Runs a decoded payload immediately, or defers it when it carries a delay.
#
# If the task has a delayed-execution requirement (its 'delay' argument is
# present and is neither '' nor 0), a timer is armed for that many seconds.
# When the timer fires, the 'delay' key is removed from the payload's
# arguments and the payload is enqueued again, so it is performed on a
# later fetch.
#
# Otherwise the payload is performed right away. Errors raised by the job
# are logged (message, newline, backtrace) instead of being propagated.
# Only StandardError and ScriptError are rescued: SystemExit and
# SignalException must propagate so that exit/signal-driven shutdown still
# works while a job is running.
#
# payload - object responding to +sad_args+ (Hash-like), +perform+ and
#           +enqueue+.
def payload_call(payload)
  delay = payload.sad_args['delay']

  if delay && delay != '' && delay != 0
    EM.add_timer(delay.to_i) do
      payload.sad_args.delete('delay')
      payload.enqueue
    end
  else
    begin
      payload.perform
    rescue StandardError, ScriptError => e
      # Array() guards against an exception that carries no backtrace.
      ::Sad.logger.error("#{e}\n#{Array(e.backtrace).join("\n")}")
    end
  end
end
register_signal()
click to toggle source
# File lib/sad/server.rb, line 57
# Installs the process signal handlers:
# * TERM and INT call shutdown! (stop the reactor, then exit(0));
# * QUIT calls shutdown (set the shutdown flag, then stop the reactor).
def register_signal
  %w[TERM INT].each do |signal_name|
    trap(signal_name) { shutdown! }
  end

  trap('QUIT') { shutdown }
end
run(queue)
click to toggle source
# File lib/sad/server.rb, line 4
# Entry point of the server: logs the start banner, clears the shutdown
# flag, installs the signal handlers, invokes the optional
# Sad.on_before_start hook, and starts fetching from the queue name
# produced by Sad::Config.queue(queue).
def run(queue)
  hashes = '#' * 5
  ::Sad.logger.info("#{hashes} Sad server start. #{hashes}")

  @_shutdown = false
  register_signal

  # The hook is optional; it is only called when one has been set.
  if Sad.on_before_start
    Sad.on_before_start.call
  end

  fetch(Sad::Config.queue(queue))
end
shutdown()
click to toggle source
# File lib/sad/server.rb, line 72
# Marks the server as shut down (the flag reported by shutdown?) and then
# stops the EventMachine reactor. Unlike shutdown!, it does not call exit.
def shutdown
  @_shutdown = true
  EM.stop
end
shutdown!()
click to toggle source
# File lib/sad/server.rb, line 63
# Stops the EventMachine reactor and then terminates the process with
# exit status 0. It does not set the shutdown flag.
def shutdown!
  EM.stop
  exit(0)
end
shutdown?()
click to toggle source
# File lib/sad/server.rb, line 68
# Returns the raw shutdown flag: false after run has started, true once
# shutdown has been called, and nil if neither has happened yet.
def shutdown?
  @_shutdown
end
status()
click to toggle source
# File lib/sad/server.rb, line 77
# Reports the sizes of EventMachine's internal @threadqueue and
# @resultqueue.
#
# Returns {} when EventMachine has not defined both instance variables.
# Otherwise returns a Hash with the keys 'threadqueue', 'resultqueue' and
# 'num_waiting'; a value is nil when the corresponding queue is nil.
#
# The guard checks the instance variables themselves. Testing
# `defined? :threadqueue` only inspects a Symbol literal, which is always
# defined, so it could never select the empty-Hash branch.
#
# NOTE(review): these are private EventMachine internals (presumably the
# queues behind EM.defer) - confirm against the EventMachine version in use.
def status
  queues_defined = [:@threadqueue, :@resultqueue].all? do |ivar|
    EventMachine.instance_variable_defined?(ivar)
  end
  return {} unless queues_defined

  thread_queue = EventMachine.instance_variable_get(:@threadqueue)
  result_queue = EventMachine.instance_variable_get(:@resultqueue)

  {
    'threadqueue' => thread_queue && thread_queue.size,
    'resultqueue' => result_queue && result_queue.size,
    'num_waiting' => thread_queue && thread_queue.num_waiting
  }
end