class Rubybear::Stream
Rubybear::Stream
allows a live view of the BEARS blockchain.
All values returned by `get_dynamic_global_properties` can be streamed.
For example, if you want to know which witness is currently signing blocks, use the following:
stream = Rubybear::Stream.new stream.current_witness do |witness| puts witness end
More importantly, full blocks, transactions, and operations can be streamed.
Constants
- BLOCK_PRODUCTION
Note, even though block production is advertised at 3 seconds, often blocks are available in 1.5 seconds. However, we still keep our expectations at 3 seconds. @private
- INITIAL_TIMEOUT
@private
- MAX_BLOCKS_PER_NODE
@private
- MAX_TIMEOUT
@private
- RANGE_BEHIND_WARNING
Public Class Methods
Rubybear::Api::new
# File lib/rubybear/stream.rb, line 34
# Builds a new stream. No stream-specific setup happens here; the options
# are handed unchanged to the parent class constructor.
#
# @param options [::Hash] options forwarded to the parent initializer
def initialize(options = {})
  super(options)
end
Public Instance Methods
Returns the latest blocks from the blockchain.
stream = Rubybear::Stream.new stream.blocks do |bk, num| puts "[#{num}] #{bk.to_json}" end
For convenience and memory management, the api used to poll the current block data is also available inside the block, e.g.:
stream = Rubybear::Stream.new stream.blocks do |bk, num, api| puts "[#{num}] #{bk.to_json}" api.get_ops_in_block(num, true) do |vops, error| puts vops end end
This idiom is useful for very long running scripts.
@param start starting block @param mode we have the choice between
* :head the last block * :irreversible the block that is confirmed by 2/3 of all block producers and is thus irreversible!
@param max_blocks_per_node the number of blocks to read before trying a new node @param block the block to execute for each result, optional. Yields: |bk, num, api| @return [::Hash]
# File lib/rubybear/stream.rb, line 249
# Streams blocks from the chain, polling `get_dynamic_global_properties` for
# the current height and then fetching every block between the stream cursor
# and that height with `get_block`.
#
# Retry handling is built on `catch :sequence`: any `standby` call that
# carries `throw: :sequence` abandons the current pass and restarts the outer
# loop, which re-reads the chain height and resumes at `start`.
#
# @param start [Integer, nil] first block to deliver; when nil the stream
#   begins at the height reported by the first successful poll. Passing a
#   value also marks the stream as a replay, which silences the
#   "Stream behind" warning.
# @param mode [Symbol, String] :head follows `head_block_number`,
#   :irreversible follows `last_irreversible_block_num`
# @param max_blocks_per_node [Integer] number of blocks fetched before
#   `reset_api` is called
# @param block optional; receives |block, block_number, api|
# @return [Array, nil] without a block, `[block, block_number]` for the
#   first block fetched; with a block, nil once `stop?` ends the loop
# @raise [StreamError] when mode is neither "head" nor "irreversible"
def blocks(start = nil, mode = :irreversible, max_blocks_per_node = MAX_BLOCKS_PER_NODE, &block)
  reset_api

  replay = !!start
  counter = 0
  latest_block_number = -1
  @api_options[:max_requests] = [max_blocks_per_node * 2, @api_options[:max_requests].to_i].max

  loop do
    break if stop?

    catch :sequence do
      begin
        head_block = api.get_dynamic_global_properties do |properties, error|
          if !!error
            standby "Node responded with: #{error.message || 'unknown error'}, retrying ...", {
              error: error,
              and: {throw: :sequence}
            }
          end

          break if stop?

          if properties.head_block_number.nil?
            # This can happen if a reverse proxy is acting up.
            standby "Bad block sequence after height: #{latest_block_number}", {
              and: {throw: :sequence}
            }
          end

          case mode.to_sym
          when :head then properties.head_block_number
          when :irreversible then properties.last_irreversible_block_num
          else; raise StreamError, '"mode" has to be "head" or "irreversible"'
          end
        end

        if head_block == latest_block_number
          # This can happen when there's a delay in block production.
          if current_timeout > BLOCK_PRODUCTION * 6
            standby "Stream has stalled severely ...", {
              and: {backoff: api, throw: :sequence}
            }
          elsif current_timeout > BLOCK_PRODUCTION * 3
            warning "Stream has stalled ..."
          end

          timeout and throw :sequence
        elsif head_block < latest_block_number
          # This can happen if a reverse proxy is acting up.
          standby "Invalid block sequence at height: #{head_block}", {
            and: {backoff: api, throw: :sequence}
          }
        end

        reset_timeout
        start ||= head_block
        range = (start..head_block)

        range.each do |n|
          break if stop?

          if (counter += 1) > max_blocks_per_node
            reset_api
            counter = 0
          end

          if !replay && range.size > RANGE_BEHIND_WARNING
            # When the range is above RANGE_BEHIND_WARNING blocks, it's time
            # to warn, unless we're replaying.  The number of blocks still to
            # be delivered in this pass (n included) follows directly from the
            # range bound, so no intermediate array is needed.
            remaining = range.end - n + 1

            if remaining % RANGE_BEHIND_WARNING == 0
              warning "Stream behind by #{remaining} blocks (about #{(remaining * 3) / 60.0} minutes)."
            end
          end

          # NOTE(review): block_options is selected here but get_block below is
          # called with the bare block number for both namespaces -- confirm
          # which argument shape block_api expects.
          scoped_api, block_options = if use_condenser_namespace?
            [api, n]
          else
            [block_api, {block_num: n}]
          end

          scoped_api.get_block(n) do |current_block, error|
            if !!error
              if error.message == 'Unable to acquire database lock'
                start = n
                timeout
                standby "Node was unable to acquire database lock, retrying ...", {
                  and: {throw: :sequence}
                }
              else
                standby "Node responded with: #{error.message || 'unknown error'}, retrying ...", {
                  error: error,
                  and: {throw: :sequence}
                }
              end
            elsif current_block.nil?
              standby "Node responded with: empty block, retrying ...", {
                and: {throw: :sequence}
              }
            end

            latest_block_number = n
            return current_block, n if block.nil?
            yield current_block, n, api
          end

          # Advance the cursor one delivered block at a time.  If a later block
          # in this range triggers a retry (throw :sequence), the next pass
          # resumes at the first undelivered block instead of jumping past the
          # whole range.  After a complete pass this equals head_block + 1.
          start = n + 1

          # Spread the blocks of one pass over a single production interval.
          # NOTE(review): if BLOCK_PRODUCTION is an Integer this is integer
          # division and yields 0 for ranges longer than its value -- confirm.
          sleep BLOCK_PRODUCTION / range.size
        end
      rescue StreamError
        raise
      end
    end
  end
end
@private
# File lib/rubybear/stream.rb, line 397
# Names that can be called on the stream as dynamic methods. The list is
# built once, frozen, and memoized in @method_names.
#
# @return [::Array<Symbol>] frozen list of streamable names
def method_names
  @method_names ||= %i[
    head_block_number
    head_block_id
    time
    current_witness
    total_pow
    num_pow_witnesses
    virtual_supply
    current_supply
    confidential_supply
    current_bsd_supply
    confidential_bsd_supply
    total_coining_fund_bears
    total_coining_shares
    total_reward_fund_bears
    total_reward_shares2
    total_activity_fund_bears
    total_activity_fund_shares
    bsd_interest_rate
    average_block_size
    maximum_block_size
    current_aslot
    recent_slots_filled
    participation_count
    last_irreversible_block_num
    max_virtual_bandwidth
    current_reserve_ratio
    block_numbers
    blocks
  ].freeze
end
@private
# File lib/rubybear/stream.rb, line 431
# Describes how a streamable name maps onto API calls. The result is read by
# method_missing: the key names a property or API method, and a non-nil value
# names the global property whose value is passed to that API method.
#
# @param method [Symbol] the streamable name
# @return [::Hash, nil] a fresh one-entry hash for :block_numbers and
#   :blocks, nil for every other name
def method_params(method)
  # Built per call so callers always receive their own hash instance.
  {
    block_numbers: {head_block_number: nil},
    blocks: {get_block: :head_block_number}
  }[method]
end
Returns the latest operations from the blockchain.
stream = Rubybear::Stream.new stream.operations do |op| puts op.to_json end
If one or more symbols are passed, then only operations of those types are returned. Expected symbols are:
account_create account_create_with_delegation account_update account_witness_proxy account_witness_vote cancel_transfer_from_savings change_recovery_account claim_reward_balance comment comment_options convert custom custom_json decline_voting_rights delegate_coining_shares delete_comment escrow_approve escrow_dispute escrow_release escrow_transfer feed_publish limit_order_cancel limit_order_create limit_order_create2 pow pow2 recover_account request_account_recovery set_withdraw_coining_route transfer transfer_from_savings transfer_to_savings transfer_to_coining vote withdraw_coining witness_update
For example, to stream only votes:
stream = Rubybear::Stream.new stream.operations(:vote) do |vote| puts vote.to_json end
You can also stream virtual operations:
stream = Rubybear::Stream.new stream.operations(:author_reward) do |vop| puts "#{vop.author} got paid for #{vop.permlink}: #{[vop.bsd_payout, vop.bears_payout, vop.coining_payout]}" end
… or multiple virtual operation types;
stream = Rubybear::Stream.new stream.operations([:producer_reward, :author_reward]) do |vop| puts vop end
… or all types, including virtual operation types;
stream = Rubybear::Stream.new stream.operations(nil, nil, :head, include_virtual: true) do |vop| puts vop end
Expected virtual operation types:
producer_reward author_reward curation_reward fill_convert_request fill_order fill_coining_withdraw interest shutdown_witness
@param type [symbol || ::Array<symbol>] the type(s) of operation, optional. @param start starting block @param mode we have the choice between
* :head the last block * :irreversible the block that is confirmed by 2/3 of all block producers and is thus irreversible!
@param block the block to execute for each result, optional. Yields: |op, trx_id, block_num, api| @param options [::Hash] additional options @option options [Boolean] :include_virtual Also stream virtual operations. Setting this true will impact performance. Default: false. @return [::Hash]
# File lib/rubybear/stream.rb, line 133
# Streams operations by walking every transaction delivered by
# `transactions` and filtering its operations by type.
#
# Result shape depends on the filter: with exactly one requested type the
# bare operation payload is delivered; with no filter, or with several types,
# each operation is delivered as a one-entry hash of `{type => payload}`.
#
# Virtual operations are fetched with `api.get_ops_in_block(block_number,
# true)` when `:include_virtual` is set or `virtual_op_type?(type)` is true.
# The fetch happens inside the per-transaction callback, at most once per
# run of consecutive transactions sharing a block number.
#
# @param type [Symbol, ::Array<Symbol>, nil] operation type(s) to keep
# @param start starting block, passed to `transactions`
# @param mode :head or :irreversible, passed to `transactions`
# @param options [::Hash] additional options
# @option options [Boolean] :include_virtual also fetch virtual operations
# @param block optional; receives |op, trx_id, block_number, api|
# @return [::Array] without a block, the operations of the first transaction
#   that has any match
def operations(type = nil, start = nil, mode = :irreversible, options = {include_virtual: false}, &block)
  type = [type].flatten.compact.map(&:to_sym)
  include_virtual = !!options[:include_virtual]
  include_virtual = true if virtual_op_type?(type)

  # An empty filter accepts everything; otherwise the name must be listed.
  wanted = ->(name) { type.empty? || type.include?(name) }
  # A single requested type yields bare payloads, anything else is tagged.
  shaped = ->(name, payload) { type.size == 1 ? payload : {name => payload} }

  latest_block_number = -1

  transactions(start, mode) do |transaction, trx_id, block_number|
    same_block_as_previous = latest_block_number == block_number
    latest_block_number = block_number

    ops = transaction.operations.map do |name, payload|
      name = name.to_sym
      shaped.call(name, payload) if wanted.call(name)
    end.compact

    if include_virtual && !same_block_as_previous
      # A node error abandons the virtual-op fetch for this block only.
      catch(:pop_vops) do
        api.get_ops_in_block(block_number, true) do |vops, error|
          if !!error
            standby "Node responded with: #{error.message || 'unknown error'}, retrying ...", {
              error: error,
              and: {throw: :pop_vops}
            }
          end

          vops.each do |vtx|
            next unless defined? vtx.op

            name = vtx.op.first.to_sym
            payload = vtx.op.last
            ops << shaped.call(name, payload) if wanted.call(name)
          end
        end
      end
    end

    next if ops.none?
    return ops if block.nil?

    ops.each { |op| yield op, trx_id, block_number, api }
  end
end
Stops the persistent http connections.
# File lib/rubybear/stream.rb, line 374
# Shuts down both API clients held by the stream (@api and @block_api),
# clears the references, and triggers a garbage collection pass.
#
# Each client is shut down independently: a failure while closing one is
# reported through `warning` and does not prevent the other from being
# closed, so neither connection is left open because its sibling raised.
#
# @return whatever GC.start returns
def shutdown
  [@api, @block_api].each do |client|
    next if client.nil?

    begin
      client.shutdown
    rescue => e
      warning("Unable to shut down: #{e}")
    end
  end

  @api = nil
  @block_api = nil
  GC.start
end
Returns the latest transactions from the blockchain.
stream = Rubybear::Stream.new stream.transactions do |tx, trx_id| puts "[#{trx_id}] #{tx.to_json}" end
@param start starting block @param mode we have the choice between
* :head the last block * :irreversible the block that is confirmed by 2/3 of all block producers and is thus irreversible!
@param block the block to execute for each result, optional. Yields: |tx, trx_id, block_number, api| @return [::Hash]
# File lib/rubybear/stream.rb, line 206
# Streams transactions by unpacking every block delivered by `blocks`.
# Blocks whose `transactions` is nil are skipped.
#
# The transaction id is looked up by position in the block's
# 'transaction_ids' entry and is nil when that entry is missing.
#
# @param start starting block, passed to `blocks`
# @param mode :head or :irreversible, passed to `blocks`
# @param block optional; receives |transaction, trx_id, block_number, api|
# @return without a block, the transaction list of the first block that has
#   one
def transactions(start = nil, mode = :irreversible, &block)
  blocks(start, mode) do |b, block_number|
    txs = b.transactions
    next if txs.nil?
    return txs if block.nil?

    txs.each_with_index do |transaction, index|
      ids = b['transaction_ids']
      trx_id = ids ? ids[index] : nil

      yield transaction, trx_id, block_number, api
    end
  end
end
Private Instance Methods
# File lib/rubybear/stream.rb, line 497
# The retry delay currently in effect.
#
# @return the value of @timeout, or INITIAL_TIMEOUT when @timeout is unset
def current_timeout
  return INITIAL_TIMEOUT unless @timeout

  @timeout
end
Rubybear::Api#method_missing
# File lib/rubybear/stream.rb, line 439 def method_missing(m, *args, &block) super unless respond_to_missing?(m) @latest_values ||= [] @latest_values.shift(5) if @latest_values.size > 20 loop do break if stop? value = if (n = method_params(m)).nil? key_value = api.get_dynamic_global_properties.result[m] else key = n.keys.first if !!n[key] r = api.get_dynamic_global_properties.result key_value = param = r[n[key]] result = nil loop do break if stop? response = api.send(key, param) raise StreamError, JSON[response.error] if !!response.error result = response.result break if !!result warning "#{key}: #{param} result missing, retrying with timeout: #{current_timeout} seconds" reset_api timeout end reset_timeout result else key_value = api.get_dynamic_global_properties.result[key] end end unless @latest_values.include? key_value @latest_values << key_value if !!block yield value else return value end end sleep current_timeout end end
# File lib/rubybear/stream.rb, line 484
# Shuts the current API clients down and then touches `api` and `block_api`
# again. `block_api` is only consulted when `api` returned something truthy.
# NOTE(review): presumably both accessors rebuild their client on demand --
# confirm against their definitions.
#
# @return [Boolean] true when both accessors returned a truthy value
def reset_api
  shutdown
  return false unless api

  block_api ? true : false
end
# File lib/rubybear/stream.rb, line 501
# Clears the stored retry delay, so `current_timeout` falls back to
# INITIAL_TIMEOUT again.
#
# @return [nil]
def reset_timeout
  @timeout = nil
end
# File lib/rubybear/stream.rb, line 515
# Reports a problem through `warning` and then runs the optional follow-up
# actions, in this order: warn with the message, warn with the error, ask an
# API client to back off, throw a symbol to unwind to a matching `catch`.
#
# @param message [String] text passed to `warning`
# @param options [::Hash]
# @option options :error an error object that is also passed to `warning`
# @option options [::Hash] :and follow-up actions; `:backoff` is an object
#   that receives `backoff` (via `send`), `:throw` is the tag to throw
# @return [nil] when nothing is thrown
def standby(message, options = {})
  failure = options[:error]
  follow_up = options[:and] || {}
  backoff_target = follow_up[:backoff]
  tag = follow_up[:throw]

  warning message
  warning failure if failure

  backoff_target.send(:backoff) if backoff_target
  throw tag if tag
end
# File lib/rubybear/stream.rb, line 511
# Whether streaming should stop: true as soon as either API client
# reference (@api or @block_api) is nil, which is the state `shutdown`
# leaves behind.
#
# @return [Boolean]
def stop?
  [@api, @block_api].any?(&:nil?)
end
# File lib/rubybear/stream.rb, line 489
# Exponential back-off step: doubles the stored delay (starting from
# INITIAL_TIMEOUT when none is stored), then sleeps for it. When the doubled
# delay exceeds MAX_TIMEOUT the stored delay is reset first, so this call
# sleeps for INITIAL_TIMEOUT.
#
# @return the stored delay after the step; nil when it was just reset
def timeout
  doubled = (@timeout || INITIAL_TIMEOUT) * 2
  @timeout = doubled
  reset_timeout if doubled > MAX_TIMEOUT

  # Read @timeout again: reset_timeout may have cleared it.
  sleep(@timeout || INITIAL_TIMEOUT)
  @timeout
end
# File lib/rubybear/stream.rb, line 505
# Used by `operations` to decide whether virtual operations must be fetched
# for the requested type filter.
#
# As written, the result is true for any non-empty list of types and false
# for nil or an empty list.
# NOTE(review): `keys && requested` is a boolean AND, so the TYPES keys never
# influence the result; an array intersection (`&`) may have been intended.
# Switching would stop virtual operations being fetched for any requested
# type that is missing from TYPES -- confirm the contents of TYPES first.
#
# @param type [Symbol, String, ::Array, nil] requested operation type(s)
# @return [Boolean]
def virtual_op_type?(type)
  requested = [type].flatten.compact.map(&:to_sym)

  (Rubybear::OperationTypes::TYPES.keys && requested).any?
end