module Stud::Buffer
@author Alex Dean
Implements a generic framework for accepting events which are later flushed in batches. Flushing occurs whenever :max_items
or :max_interval
(seconds) has been reached.
Including class must implement flush
, which will be called with all accumulated items either when the output buffer fills (:max_items
) or when a fixed amount of time (:max_interval
) passes.
batch_receive and flush¶ ↑
General receive/flush can be implemented in one of two ways.
batch_receive(event) / flush(events)¶ ↑
flush
will receive an array of events which were passed to buffer_receive
.
batch_receive('one') batch_receive('two')
will cause a flush invocation like
flush(['one', 'two'])
batch_receive(event, group) / flush(events, group)¶ ↑
flush() will receive an array of events, plus a grouping key.
batch_receive('one', :server => 'a') batch_receive('two', :server => 'b') batch_receive('three', :server => 'a') batch_receive('four', :server => 'b')
will result in the following flush calls
flush(['one', 'three'], {:server => 'a'}) flush(['two', 'four'], {:server => 'b'})
Grouping keys can be anything which are valid Hash keys. (They don’t have to be hashes themselves.) Strings or Fixnums work fine. Use anything which you’d like to receive in your flush
method to help enable different handling for various groups of events.
on_flush_error¶ ↑
Including class may implement on_flush_error
, which will be called with an Exception instance whenever buffer_flush
encounters an error.
-
buffer_flush
will automatically re-try failed flushes, soon_flush_error
should not try to implement retry behavior. -
Exceptions occurring within
on_flush_error
are not handled bybuffer_flush
.
on_full_buffer_receive¶ ↑
Including class may implement on_full_buffer_receive
, which will be called whenever buffer_receive
is called while the buffer is full.
on_full_buffer_receive
will receive a Hash like {:pending => 30, :outgoing => 20}
which describes the internal state of the module at the moment.
final flush¶ ↑
Including class should call buffer_flush(:final => true)
during a teardown/shutdown routine (after the last call to buffer_receive
) to ensure that all accumulated messages are flushed.
Public Instance Methods
Source
# File lib/stud/buffer.rb, line 180 def buffer_flush(options={}) force = options[:force] || options[:final] final = options[:final] # final flush will wait for lock, so we are sure to flush out all buffered events if options[:final] @buffer_state[:flush_mutex].lock elsif ! @buffer_state[:flush_mutex].try_lock # failed to get lock, another flush already in progress return 0 end items_flushed = 0 begin time_since_last_flush = (Time.now - @buffer_state[:last_flush]) return 0 if @buffer_state[:pending_count] == 0 return 0 if (!force) && (@buffer_state[:pending_count] < @buffer_config[:max_items]) && (time_since_last_flush < @buffer_config[:max_interval]) @buffer_state[:pending_mutex].synchronize do @buffer_state[:outgoing_items] = @buffer_state[:pending_items] @buffer_state[:outgoing_count] = @buffer_state[:pending_count] buffer_clear_pending end @buffer_config[:logger].debug("Flushing output", :outgoing_count => @buffer_state[:outgoing_count], :time_since_last_flush => time_since_last_flush, :outgoing_events => @buffer_state[:outgoing_items], :batch_timeout => @buffer_config[:max_interval], :force => force, :final => final ) if @buffer_config[:logger] @buffer_state[:outgoing_items].each do |group, events| begin if group.nil? flush(events,final) else flush(events, group, final) end @buffer_state[:outgoing_items].delete(group) events_size = events.size @buffer_state[:outgoing_count] -= events_size items_flushed += events_size rescue => e @buffer_config[:logger].warn("Failed to flush outgoing items", :outgoing_count => @buffer_state[:outgoing_count], :exception => e.class.name, :backtrace => e.backtrace ) if @buffer_config[:logger] if @buffer_config[:has_on_flush_error] on_flush_error e end sleep 1 retry end @buffer_state[:last_flush] = Time.now end ensure @buffer_state[:flush_mutex].unlock end return items_flushed end
Try
to flush events.
Returns immediately if flushing is not necessary/possible at the moment:
-
:max_items have not been accumulated
-
:max_interval seconds have not elapased since the last flush
-
another flush is in progress
buffer_flush(:force => true)
will cause a flush to occur even if :max_items
or :max_interval
have not been reached. A forced flush will still return immediately (without flushing) if another flush is currently in progress.
buffer_flush(:final => true)
is identical to buffer_flush(:force => true)
, except that if another flush is already in progress, buffer_flush(:final => true)
will block/wait for the other flush to finish before proceeding.
@param [Hash] options Optional. May be {:force => true}
or {:final => true}
. @return [Fixnum] The number of items successfully passed to flush
.
Source
# File lib/stud/buffer.rb, line 126 def buffer_full? @buffer_state[:pending_count] + @buffer_state[:outgoing_count] >= @buffer_config[:max_items] end
Determine if :max_items
has been reached.
buffer_receive
calls will block while buffer_full? == true
.
@return [bool] Is the buffer full?
Source
# File lib/stud/buffer.rb, line 80 def buffer_initialize(options={}) if ! self.class.method_defined?(:flush) raise ArgumentError, "Any class including Stud::Buffer must define a flush() method." end @buffer_config = { :max_items => options[:max_items] || 50, :max_interval => options[:max_interval] || 5, :logger => options[:logger] || nil, :has_on_flush_error => self.class.method_defined?(:on_flush_error), :has_on_full_buffer_receive => self.class.method_defined?(:on_full_buffer_receive) } @buffer_state = { # items accepted from including class :pending_items => {}, :pending_count => 0, # guard access to pending_items & pending_count :pending_mutex => Mutex.new, # items which are currently being flushed :outgoing_items => {}, :outgoing_count => 0, # ensure only 1 flush is operating at once :flush_mutex => Mutex.new, # data for timed flushes :last_flush => Time.now, :timer => Thread.new do loop do sleep(@buffer_config[:max_interval]) buffer_flush(:force => true) end end } # events we've accumulated buffer_clear_pending end
Initialize the buffer.
Call directly from your constructor if you wish to set some non-default options. Otherwise buffer_initialize
will be called automatically during the first buffer_receive
call.
Options:
-
:max_items, Max number of items to buffer before flushing. Default 50.
-
:max_interval, Max number of seconds to wait between flushes. Default 5.
-
:logger, A logger to write log messages to. No default. Optional.
@param [Hash] options
Source
# File lib/stud/buffer.rb, line 142 def buffer_receive(event, group=nil) buffer_initialize if ! @buffer_state # block if we've accumulated too many events while buffer_full? do on_full_buffer_receive( :pending => @buffer_state[:pending_count], :outgoing => @buffer_state[:outgoing_count] ) if @buffer_config[:has_on_full_buffer_receive] sleep 0.1 end @buffer_state[:pending_mutex].synchronize do @buffer_state[:pending_items][group] << event @buffer_state[:pending_count] += 1 end buffer_flush end
Save an event for later delivery
Events are grouped by the (optional) group parameter you provide. Groups of events, plus the group name, are later passed to flush
.
This call will block if :max_items
has been reached.
@see Stud::Buffer
The overview has more information on grouping and flushing.
@param event An item to buffer for flushing later. @param group Optional grouping key. All events with the same key will be
passed to +flush+ together, along with the grouping key itself.
Private Instance Methods
Source
# File lib/stud/buffer.rb, line 255 def buffer_clear_pending @buffer_state[:pending_items] = Hash.new { |h, k| h[k] = [] } @buffer_state[:pending_count] = 0 end