module Hutch::Enqueue

If a consumer needs `enqueue`, just include this module.

Public Instance Methods

attempts(times) click to toggle source
# File lib/hutch/enqueue.rb, line 76
# Set the maximum number of retry attempts for this consumer.
#
# times - requested number of attempts; values below zero are stored as 0.
#
# Returns the value that was stored in @max_retries.
def attempts(times)
  non_negative = [times, 0].max
  @max_retries = non_negative
end
delay_seconds_level(delay_seconds) click to toggle source

Compute the delay level, i.e. round the requested delay up to one of the supported levels: 5s 10s 20s 30s 60s 120s 180s 240s 300s 360s 420s 480s 540s 600s 1200s 1800s 2400s 3000s 3600s 7200s 10800s

# File lib/hutch/enqueue.rb, line 88
# Round a requested delay up to the nearest supported delay level.
#
# delay_seconds - requested delay in seconds (Integer or Float).
#
# Returns the smallest level (in seconds) that is >= delay_seconds.
# Delays longer than the largest level are capped at that level (10800s),
# and negative delays are treated as "already due" and get the smallest
# level (5s) rather than falling through to the largest one.
def delay_seconds_level(delay_seconds)
  levels = [5, 10, 20, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600,
            1200, 1800, 2400, 3000, 3600, 7200, 10800]

  # A negative delay must never be turned into the longest possible delay.
  return levels.first if delay_seconds.respond_to?(:negative?) && delay_seconds.negative?

  # First level whose 0..level range covers the value; anything that matches
  # no level (too large, or not comparable) falls back to the largest level.
  levels.find { |level| (0..level).cover?(delay_seconds) } || levels.last
end
enqueue(msg = {}) click to toggle source

Publish the message to this consumer with one routing_key

# File lib/hutch/enqueue.rb, line 16
# Publish a message to this consumer immediately.
#
# msg - message payload handed to Hutch.publish (defaults to an empty Hash).
#
# The routing key comes from enqueue_routing_key. Returns whatever
# Hutch.publish returns.
def enqueue(msg = {})
  routing_key = enqueue_routing_key
  Hutch.publish(routing_key, msg)
end
enqueue_at(time, message = {}, props = {}) click to toggle source

Delay the message until an exact point in time.

# File lib/hutch/enqueue.rb, line 50
# Publish a message so that it is delivered at (approximately) a given time.
#
# time    - a time-like object (anything responding to `utc`) or a numeric
#           epoch timestamp in seconds.
# message - message payload, passed through to enqueue_in.
# props   - publish properties, passed through to enqueue_in.
#
# Returns whatever enqueue_in returns.
def enqueue_at(time, message = {}, props = {})
  # Compatible with the ActiveJob API, which may pass either a time object or
  # a numeric timestamp. `to_f` is already zone-independent epoch seconds, so
  # `utc` is not called: on a plain Time it converts the caller's object in
  # place and raises on a frozen one.
  timestamp = time.respond_to?(:utc) ? time.to_f : time
  # If the time is earlier than now, just delay 1 second.
  interval = [timestamp - Time.now.to_f, 1].max
  enqueue_in(interval, message, props)
end
enqueue_in(interval, message = {}, props = {}) click to toggle source

Publish the message after a delay. interval: the delay in seconds. message: the message to publish.

# File lib/hutch/enqueue.rb, line 29
# Publish a message that should reach this consumer after a delay.
#
# interval - requested delay in seconds; it is mapped to a fixed delay level
#            by delay_seconds_level.
# message  - message payload.
# props    - publish properties; :expiration and :headers are overridden or
#            extended here, every other key is passed along unchanged.
#
# Returns whatever Hutch::Schedule.publish returns.
def enqueue_in(interval, message = {}, props = {})
  # TODO: delays longer than 3h are still accepted, but the message will not
  # be delayed for that long; a warning should be emitted in that case.
  level = delay_seconds_level(interval)

  # Use a fixed delay: the message expiration is the level in milliseconds,
  # and the CC header plus the per-level delay topic are used to re-deliver
  # the message into this consumer's queue.
  expiration_ms = (level * 1000).to_i
  headers       = (props[:headers] || {}).merge(CC: [enqueue_routing_key])
  properties    = props.merge(expiration: expiration_ms, headers: headers)

  routing_key = Hutch::Schedule.delay_routing_key("#{level}s")
  Hutch::Schedule.publish(routing_key, message, properties)
end
enqueue_routing_key() click to toggle source

routing_key: the purpose is to send the message to the hutch exchange and have it routed to the correct queue, so any of the routing keys that the consumer is consuming can be used.

# File lib/hutch/enqueue.rb, line 71
# Pick the routing key used when publishing to this consumer.
#
# Returns the last element of routing_keys.
# Raises RuntimeError when routing_keys is empty.
def enqueue_routing_key
  return routing_keys.to_a.last unless routing_keys.size < 1

  raise "Routing Keys is not set!"
end
enqueue_uniq(uniq_key, msg = {}) click to toggle source

enqueue unique message

# File lib/hutch/enqueue.rb, line 21
# Publish a message only if uniq_key_check accepts the given key.
#
# uniq_key - key passed to uniq_key_check.
# msg      - message payload, passed through to enqueue.
#
# Returns false when uniq_key_check returns a falsy value; otherwise returns
# whatever enqueue returns.
def enqueue_uniq(uniq_key, msg = {})
  uniq_key_check(uniq_key) ? enqueue(msg) : false
end
enqueue_uniq_at(uniq_key, time, message = {}, props = {}) click to toggle source
# File lib/hutch/enqueue.rb, line 58
# Schedule a message for a given time only if uniq_key_check accepts the key.
#
# uniq_key - key passed to uniq_key_check.
# time     - target time, passed through to enqueue_at.
# message  - message payload, passed through to enqueue_at.
# props    - publish properties, passed through to enqueue_at.
#
# Returns false when uniq_key_check returns a falsy value; otherwise returns
# whatever enqueue_at returns.
def enqueue_uniq_at(uniq_key, time, message = {}, props = {})
  if uniq_key_check(uniq_key)
    enqueue_at(time, message, props)
  else
    false
  end
end
enqueue_uniq_in(uniq_key, interval, message = {}, props = {}) click to toggle source
# File lib/hutch/enqueue.rb, line 44
# Schedule a message after a delay only if uniq_key_check accepts the key.
#
# uniq_key - key passed to uniq_key_check.
# interval - delay, passed through to enqueue_in.
# message  - message payload, passed through to enqueue_in.
# props    - publish properties, passed through to enqueue_in.
#
# Returns false when uniq_key_check returns a falsy value; otherwise returns
# whatever enqueue_in returns.
def enqueue_uniq_in(uniq_key, interval, message = {}, props = {})
  accepted = uniq_key_check(uniq_key)
  accepted ? enqueue_in(interval, message, props) : false
end
max_attempts() click to toggle source
# File lib/hutch/enqueue.rb, line 80
# Read back the value stored in @max_retries.
#
# Returns @max_retries, or 0 when it is unset (nil) or false.
def max_attempts
  return 0 unless @max_retries

  @max_retries
end
uniq_key_check(uniq_key) click to toggle source

Check whether uniq_key has already been set; the key's expiry time is set to 24h.

# File lib/hutch/enqueue.rb, line 65
# Try to claim uniq_key in the store exposed by Hutch::Schedule.ns.
#
# The key is written with the value "1", `nx: true` and an `ex:` of 86400
# (24 hours in seconds).
# NOTE(review): presumably a Redis-style SET, where nx means "only if the key
# does not exist yet" and ex is the expiry in seconds - confirm against the
# client behind Hutch::Schedule.ns.
#
# Returns whatever the store's `set` call returns; callers treat a falsy
# result as "do not enqueue".
def uniq_key_check(uniq_key)
  store = Hutch::Schedule.ns
  store.set(uniq_key, "1", ex: 86_400, nx: true)
end