class Hutch::ConsumerMsg

Consumer Message wrap Hutch::Message and Consumer

Attributes

consumer[R]
message[R]

Public Class Methods

new(consumer, message) click to toggle source
# File lib/hutch/patch/worker.rb, line 165
def initialize(consumer, message)
  @consumer = consumer
  @message  = message
end

Public Instance Methods

enqueue_in_or_not() click to toggle source

if delays > 10s then let the message to rabbitmq to delay and enqueue again instead of rabbitmq reqneue

# File lib/hutch/patch/worker.rb, line 179
def enqueue_in_or_not
  # interval 小于 5s, 的则不会传, 在自己的 buffer 中等待
  return false if interval < Hutch::Config.get(:worker_buffer_flush_interval)
  # 等待时间过长的消息, 交给远端的 rabbitmq 去进行等待, 不占用 buffer 空间
  # 如果数据量特别大, 但 ratelimit 特别严格, 那么也会变为固定周期的积压, 需要增加对执行次数的记录以及延长
  # 市场 30s 执行一次的任务, 积累了 200 个, 那么这个积压会越来越多, 直到保持到一个 RabbitMQ 与 hutch 之间的最长等待周期, 会一直空转
  #  - 要么增加对执行次数的考虑, 拉长延长. 但最终会有一个最长的延长 10800 (3h), 这个问题最终仍然会存在
  #  - 设置延长多长之后, 就舍弃这个任务, 因为由于 ratelimit 的存在, 但又持续的积压, 不可能处理完这个任务
  # 这个方案没有很好的解决方法, 这是一个典型的 "生产速度大于消费速度" 的问题, 如果长时间的 生产 > 消费, 这个问题是无解的
  Hutch.broker.ack(message.delivery_info.delivery_tag)
  # TODO: 如果存在 x-death 的 count 需要额外考虑, 解决与 error retry 的 x-death 复用的问题
  # 临时给一个随机的 1,2 倍率的延迟, 大概率为 1 倍,小概率为 2 倍
  consumer.enqueue_in(interval * [rand(3), 1].max, message.body, message.properties.to_hash)
end
handle_cmsg_args() click to toggle source
# File lib/hutch/patch/worker.rb, line 170
def handle_cmsg_args
  [consumer, message.delivery_info, message.properties, message.payload, message]
end
interval() click to toggle source
# File lib/hutch/patch/worker.rb, line 174
def interval
  @interval ||= consumer.interval(message)
end
logger() click to toggle source
# File lib/hutch/patch/worker.rb, line 161
def logger
  Hutch::Logging.logger
end