module Flor::Pro::ReceiveAndMerge
Module extracted from “concurrence”; it deals with receivers and mergers.
Should it deal with remainder?
Constants
- MMERGERS
- MORDERS
- STACK_REX
order:
- first, last: first or last to reply
- top/north/head, bottom/south/tail: position in concurrence, in collection
merge:
- deep
- mix/plain
- override
- ignore
- stack
-
- TRANSLATORS
Protected Instance Methods
apply_merger()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 97
#
# Resolves the merger through #determine_merger and dispatches on its type:
# a Hash is handed to #apply_merger_method, anything else is handed to
# #apply_merger_function.
def apply_merger
  merger = determine_merger

  return apply_merger_method(merger) if merger.is_a?(Hash)

  apply_merger_function(merger)
end
apply_merger_function(func_tree)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 118
#
# Flags the node as 'merging', then applies +func_tree+ with the arguments
# built by #merger_args and the third element of the node tree.
#
# Returns whatever #apply returns.
def apply_merger_function(func_tree)
  @node['merging'] = true

  args = merger_args
  third = tree[2] # NOTE(review): presumably the line number, confirm against #tree

  apply(func_tree, args, third)
end
apply_merger_method(h)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 106
#
# Runs a "method" merger described by +h+ (order and merger looked up under
# symbol keys first, then string keys).
#
# A copy of the node payloads is first ordered by the matching `mom__<order>`
# method, then merged by the matching `mmm__<merger>` method; the merged
# payload is wrapped in a message and passed to #receive_from_merger.
def apply_merger_method(h)
  order_name = h[:order] || h['order']
  merger_name = h[:merger] || h['merger']

  ordered = send("mom__#{order_name}", h, Flor.dup(@node['payloads']))
  merged = send("mmm__#{merger_name}", h, ordered)

  merged_message = { 'payload' => merged }

  receive_from_merger(merged_message)
end
apply_receiver()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 18
#
# Makes sure the node has a 'receiver' (computed once via
# #determine_receiver and cached in the node), then dispatches:
# a String receiver goes to #apply_receiver_method, anything else goes to
# #apply_receiver_function.
def apply_receiver
  receiver = (@node['receiver'] ||= determine_receiver)

  return apply_receiver_method if receiver.is_a?(String)

  apply_receiver_function
end
apply_receiver_function()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 37
#
# Appends the current +from+ to the node's 'on_receive_queue' (created on
# first use), then lets #dequeue_receiver_function process the queue.
def apply_receiver_function
  queue = (@node['on_receive_queue'] ||= [])
  queue << from

  dequeue_receiver_function
end
apply_receiver_method()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 29
#
# Calls the `rm__<receiver>` method named by the node's 'receiver' and feeds
# its result, wrapped as { 'payload' => { 'ret' => result } }, to
# #receive_from_receiver.
def apply_receiver_method
  method_name = 'rm__' + @node['receiver']
  verdict = send(method_name)

  receiver_message = { 'payload' => { 'ret' => verdict } }

  receive_from_receiver(receiver_message)
end
cancel_children(rem)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 260
#
# Returns the result of #wrap_cancel_children, unless +rem+ is falsy or
# 'forget', in which case an empty array is returned.
def cancel_children(rem)
  return [] if !rem || rem == 'forget'

  wrap_cancel_children
end
default_merger()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 192
#
# Returns a fresh, mutable Hash holding the default merge settings:
# order :first, merger :deep.
def default_merger
  {
    order: :first,
    merger: :deep
  }
end
dequeue_receiver_function()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 44
#
# Applies the receiver function for the next queued sender, one at a time.
#
# Returns [] when a receiver application is already pending
# ('on_receive_nids' is set) or when the queue holds nothing to process.
# Otherwise shifts the next sender off 'on_receive_queue', applies the node's
# 'receiver' for it, records [ nid of the first resulting message, sender ]
# under 'on_receive_nids' and returns the resulting messages.
def dequeue_receiver_function
  return [] if @node['on_receive_nids']

  sender = (@node['on_receive_queue'] || []).shift
  return [] unless sender

  messages = apply(@node['receiver'], receiver_args(sender), tree[2])
  @node['on_receive_nids'] = [ messages.first['nid'], sender ]

  messages
end
determine_merger()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 197
#
# Works out the merger to use from the :on_merge / :merger / :merge attribute.
#
# * attribute absent (nil): returns #default_merger
# * attribute not a String: returned untouched (the caller decides what to
#   do with it)
# * attribute a String: it is split on dashes, whitespace and underscores
#   (a single word shorter than 3 characters is split into characters
#   instead), each word is run through TRANSLATORS and reduced to its first
#   character; the resulting abbreviation is matched against MORDERS and
#   MMERGERS to override the default :order and :merger.
#
# When the merger is :stack, :key is set to the first capture of STACK_REX
# on the original string, or to nil when STACK_REX does not match (for
# example with an abbreviated spec), instead of raising NoMethodError on nil.
#
# Returns a Hash with :order, :merger (and possibly :key), or the raw
# non-String attribute value.
def determine_merger
  m = att(:on_merge, :merger, :merge)
  h = default_merger

  return h if m.nil?
  return m unless m.is_a?(String)

  mm = m.split(/[-\s_]+/)
  mm = mm[0].chars if mm.size == 1 && mm[0].size < 3

  # one character per word, after translation
  d = mm
    .collect { |s| TRANSLATORS.inject(s) { |r, (k, v)| r.match(k) ? v : r } }
    .collect { |s| s[0, 1] }
    .join

  # first matching pattern wins, for both order and merger
  order_hit = MORDERS.find { |rex, _| d.match(rex) }
  h[:order] = order_hit[1] if order_hit

  merger_hit = MMERGERS.find { |rex, _| d.match(rex) }
  h[:merger] = merger_hit[1] if merger_hit

  if h[:merger] == :stack
    stack_match = m.match(STACK_REX)
    h[:key] = stack_match ? stack_match[1] : nil
  end

  h
end
determine_receiver()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 162
#
# Picks the receiver for this node.
#
# A strictly positive Integer :expect attribute selects
# 'expect_integer_receive'. Otherwise the :on_receive / :receiver attribute
# is used, falling back to 'default_receive'.
def determine_receiver
  expected = att(:expect)

  if expected.is_a?(Integer) && expected > 0
    'expect_integer_receive'
  else
    att(:on_receive, :receiver) || 'default_receive'
  end
end
determine_remainder()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 255
#
# Returns the value of the :remaining / :rem attribute, or 'cancel' when the
# attribute is not set.
def determine_remainder
  remainder = att(:remaining, :rem)

  remainder ? remainder : 'cancel'
end
merger_args()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 125
#
# Builds the argument list handed to a merger function, as an array of
# [ name, value ] pairs:
#
# * 'rets': for each key of the payloads, the 'ret' entry of its payload
# * 'replies': a copy (Flor.dup) of the node payloads
# * 'branch_count': the node's 'branch_count' entry
def merger_args
  replies = Flor.dup(@node['payloads'])

  rets = replies.each_with_object({}) do |(key, payload), acc|
    acc[key] = payload['ret']
  end

  [
    [ 'rets', rets ],
    [ 'replies', replies ],
    [ 'branch_count', @node['branch_count'] ]
  ]
end
mmm__deep(_, ordered_payloads)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 238
#
# "deep" merger: folds the ordered payloads together, left to right, with
# Flor.deep_merge!. The first argument (the merger description) is unused.
#
# Returns nil for an empty list (no initial value is given to the fold).
def mmm__deep(_, ordered_payloads)
  ordered_payloads.reduce do |merged, payload|
    Flor.deep_merge!(merged, payload)
  end
end
mmm__ignore(_, _)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 247
#
# "ignore" merger: disregards both arguments (merger description and ordered
# payloads) and returns a copy of the node payload.
def mmm__ignore(_, _)
  own_payload = node_payload

  own_payload.copy
end
mmm__mix(_, ordered_payloads)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 241
#
# "mix" merger: shallow-merges the ordered payloads, left to right, into the
# first one (which is mutated and returned). Later payloads win on key
# conflicts. The first argument (the merger description) is unused.
#
# Returns nil for an empty list.
def mmm__mix(_, ordered_payloads)
  ordered_payloads.reduce do |merged, payload|
    merged.merge!(payload)
  end
end
mmm__override(_, ordered_payloads)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 244
#
# "override" merger: no merging at all, the last of the ordered payloads
# is returned as is. The first argument (the merger description) is unused.
def mmm__override(_, ordered_payloads)
  winner = ordered_payloads.last

  winner
end
mmm__stack(h, ordered_payloads)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 250
#
# "stack" merger: returns a copy of the node payload in which the reversed
# list of ordered payloads is stored under the key given by h[:key]
# ('ret' when h[:key] is not set).
def mmm__stack(h, ordered_payloads)
  key = h[:key] || 'ret'

  stacked = node_payload.copy
  stacked.merge!(key => ordered_payloads.reverse)
end
mom__first(h, payloads)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 225
#
# "first" order: the exact reverse of the list produced by #mom__last.
def mom__first(h, payloads)
  last_ordered = mom__last(h, payloads)

  last_ordered.reverse
end
mom__last(_, payloads)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 228
#
# "last" order: the payloads in the order their keys were inserted into the
# +payloads+ Hash. The first argument (the merger description) is unused.
def mom__last(_, payloads)
  in_insertion_order = payloads.values

  in_insertion_order
end
mom__north(h, payloads)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 231
#
# "north" order: the exact reverse of the list produced by #mom__south.
def mom__north(h, payloads)
  south_ordered = mom__south(h, payloads)

  south_ordered.reverse
end
mom__south(_, payloads)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 234
#
# "south" order: the payloads sorted by their key, that is, by position.
# The first argument (the merger description) is unused.
#
# Keys are compared numerically, segment by segment, rather than as plain
# strings, so that "0_2" comes before "0_10" (a plain string sort would put
# the 11th branch right after the 2nd one).
#
# NOTE(review): this assumes keys look like "0_1_3" with an optional "-N"
# suffix (digits separated by underscores, then an optional dash and
# counter); confirm against the nid format. The raw key is kept as the last
# tie-breaker so any other key shape still sorts deterministically.
def mom__south(_, payloads)
  payloads
    .sort_by { |key, _|
      raw = key.to_s
      path, counter = raw.split('-', 2)
      [ path.to_s.split('_').map(&:to_i), counter.to_i, raw ] }
    .collect(&:last)
end
post_merge()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 277
#
# Returns the payload to reply with once merging is done: the node's
# 'merged_payload' entry, as is.
def post_merge
  merged = @node['merged_payload']

  merged
end
receive_from_branch()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 11
#
# Stores the payload of the incoming message in the node's 'payloads' Hash
# (created on first use), keyed by +from+, then hands over to
# #apply_receiver.
def receive_from_branch
  payloads = (@node['payloads'] ||= {})
  payloads[from] = @message['payload']

  apply_receiver
end
receive_from_merger(msg=message)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 134
#
# Handles the outcome of the merge step.
#
# +msg+ is the merger's message (defaults to the current message); nil means
# "no new merge result". When the payload's 'ret' is a Hash made of exactly
# the two keys 'done' and 'payload' (in any order), its 'payload' entry is
# taken as the merged payload, else the message payload itself is.
#
# The merged payload is stored in the node under 'merged_payload' only if a
# message was given and nothing was stored there yet.
#
# Returns the messages from #cancel_children followed by those from
# #reply_to_parent, both driven by #determine_remainder.
def receive_from_merger(msg=message)
  pl = msg ? msg['payload'] : {}
  ret = pl['ret']

  # order-insensitive: { 'payload' => ..., 'done' => ... } counts as well
  structured =
    ret.is_a?(Hash) &&
    ret.size == 2 && ret.key?('done') && ret.key?('payload')

  pl = ret['payload'] if structured

  # TODO somehow, what if done is false, should we un-over the concurrence?

  @node['merged_payload'] = pl \
    if msg && ! @node.has_key?('merged_payload')

  rem = determine_remainder

  cancel_children(rem) + reply_to_parent(rem)
end
receive_from_receiver(msg=message)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 68
#
# Handles the verdict of the receiver (method or function).
#
# The verdict is the 'ret' of the message payload. When it is a Hash made of
# exactly the two keys 'done' and 'payload' (in any order), 'done' tells if
# the node is over and 'payload' replaces the stored payload of the sender
# recorded in 'on_receive_nids'. Any other verdict is itself used as the
# "over" flag.
#
# Then:
# * the node just became over: the merger is applied (#apply_merger)
# * the node is not over: nothing to do, more replies are awaited
# * the node was over already: #receive_from_merger(nil) is called
#
# In all cases the messages produced by #dequeue_receiver_function are
# appended to the result.
def receive_from_receiver(msg=message)
  ret = msg['payload']['ret']
  over = @node['over']

  # order-insensitive: { 'payload' => ..., 'done' => ... } counts as well
  structured =
    ret.is_a?(Hash) &&
    ret.size == 2 && ret.key?('done') && ret.key?('payload')

  if structured
    over = over || ret['done']
    from = @node['on_receive_nids'][1]
    @node['payloads'][from] = ret['payload']
  else
    over = over || ret
  end

  # the pending receiver application is done, let the next one be dequeued
  @node['on_receive_nids'] = nil

  just_over = over && ! @node['over']

  @node['over'] ||= just_over

  messages =
    if just_over
      apply_merger
    elsif ! over
      [] # wait for more branches
    else
      receive_from_merger(nil)
    end

  messages + dequeue_receiver_function
end
receiver_args(from)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 57
#
# Builds the argument list handed to a receiver function, as an array of
# [ name, value ] pairs:
#
# * 'reply': the (copied) payload stored for +from+
# * 'from': +from+ itself
# * 'replies': a copy (Flor.dup) of all the node payloads
# * 'branch_count': the node's 'branch_count' entry
# * 'over': the node's 'over' entry, forced to true/false
def receiver_args(from)
  replies = Flor.dup(@node['payloads'])
  over = @node['over'] ? true : false

  [
    [ 'reply', replies[from] ],
    [ 'from', from ],
    [ 'replies', replies ],
    [ 'branch_count', @node['branch_count'] ],
    [ 'over', over ]
  ]
end
reply_to_parent(rem)
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 265
#
# Replies to the parent node, once.
#
# Returns [] when the node has replied already, or when fewer payloads than
# #branch_count have been received while +rem+ is falsy or 'wait'.
# Otherwise marks the node as 'replied' and returns the reply built by
# #wrap_reply around the #post_merge payload.
def reply_to_parent(rem)
  return [] if @node['replied']

  incomplete = @node['payloads'].size < branch_count
  return [] if incomplete && ( ! rem || rem == 'wait')

  @node['replied'] = true

  wrap_reply('payload' => post_merge)
end
rm__default_receive()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 152
#
# Default receiver: true as soon as the number of stored payloads reaches
# #branch_count.
def rm__default_receive
  received = @node['payloads'].size

  received >= branch_count
end
rm__expect_integer_receive()
click to toggle source
# File lib/flor/punit/m_receive_and_merge.rb, line 157
#
# "expect" receiver: true as soon as the number of stored payloads reaches
# the value of the :expect attribute.
def rm__expect_integer_receive
  received = @node['payloads'].size

  received >= att(:expect)
end