class Flor::Caller
The caller calls Ruby or other scripts.
Public Class Methods
new(unit)
click to toggle source
NB: tasker configuration entries start with “cal_”
# File lib/flor/unit/caller.rb, line 11 def initialize(unit) @unit = unit end
Public Instance Methods
call(service, conf, message)
click to toggle source
# File lib/flor/unit/caller.rb, line 19 def call(service, conf, message) return ruby_call(service, conf, message) \ if conf['class'] || conf['module'] return cmd_call(service, conf, message) \ if conf['cmd'] fail ArgumentError.new("don't know how to call item at #{conf['_path']}") rescue => err [ Flor.to_error_message(message, err) ] end
shutdown()
click to toggle source
# File lib/flor/unit/caller.rb, line 16 def shutdown end
split_cmd(cmd)
click to toggle source
# File lib/flor/unit/caller_jruby.rb, line 7 def split_cmd(cmd) #cmd.split(/ +/) # too naive #Raabro.pp(Flor::Caller::CmdParser.parse(cmd, debug: 3), colours: true) Flor::Caller::CmdParser.parse(cmd) end
Protected Instance Methods
cmd_call(service, conf, message)
click to toggle source
# File lib/flor/unit/caller.rb, line 144 def cmd_call(service, conf, message) h = conf.dup # shallow h['m'] = message h['f'] = message['payload'] h['v'] = message['vars'] h['tag'] = (message['tags'] || []).first m = encode(conf, message) out, _ = spawn(conf, m) r = decode(conf, out) to_messages(r) end
conf()
click to toggle source
# File lib/flor/unit/caller.rb, line 35 def conf @unit ? @unit.conf : {} end
decode(context, data)
click to toggle source
# File lib/flor/unit/caller.rb, line 170 def decode(context, data) coder = Flor.h_fetch(context, 'decoder', 'coder', 'encoder') || Flor.h_fetch(conf, 'cal_decoder', 'cal_coder', 'cal_encoder') || '::JSON' Flor.const_get(coder) .load(data) end
do_load(conf, path)
click to toggle source
# File lib/flor/unit/caller.rb, line 61 def do_load(conf, path) fail ArgumentError.new('".." not allowed in paths') \ if path =~ /\.\./ path += '.rb' unless path.match(/\.rb\z/) begin load(path) return rescue LoadError end root = File.dirname(conf['_path'] || '.') load(fjoin(root, path)) end
do_require(conf, path)
click to toggle source
# File lib/flor/unit/caller.rb, line 45 def do_require(conf, path) fail ArgumentError.new('".." not allowed in paths') \ if path =~ /\.\./ begin require(path) return rescue LoadError => le end root = File.dirname(conf['_path'] || '.') require(fjoin(root, path)) end
encode(context, message)
click to toggle source
# File lib/flor/unit/caller.rb, line 159 def encode(context, message) coder = Flor.h_fetch(context, 'encoder', 'coder') || Flor.h_fetch(conf, 'cal_encoder', 'cal_coder') || '::JSON' Flor.const_get(coder) .dump(message) end
fjoin(root, path)
click to toggle source
# File lib/flor/unit/caller.rb, line 40 def fjoin(root, path) root == '.' ? path : File.join(root, path) end
ruby_call(service, conf, message)
click to toggle source
# File lib/flor/unit/caller.rb, line 79 def ruby_call(service, conf, message) Flor.h_fetch_a(conf, 'require').each { |pa| do_require(conf, pa) } Flor.h_fetch_a(conf, 'load').each { |pa| do_load(conf, pa) } # # initialize k = case com = conf['class'] || conf['module'] when String then Flor.const_lookup(com) when Class then com else fail ArgumentError.new("don't know how to call #{com.inspect}") end o = if k.class == Module k else case i = k.instance_method(:initialize).arity when 1, 2, 3 then k.new( *[ service, conf, message ][0, i]) when -1 then k.new({ service: service, configuration: conf, message: message }) else k.new end end # # call pt = message['point'] ms = [ "call_#{pt}", "on_#{pt}", :on_message, :on, pt ] case pt when 'detask' then ms = ms + [ :on_cancel, :cancel ] when 'return' then ms = [ :on_return, :return, :post_task ] # /!\ end m = ms.find { |mm| o.respond_to?(mm) } return [ message ] if pt == 'return' && ! m # don't call if :post_task not present fail( "#{k.class.to_s.downcase} #{k} doesn't respond to " + ms[0..-2].collect { |e| "##{e}" }.join(', ') + ", or ##{ms[-1]}" ) unless m r = case o.method(m).arity when 1 then o.send(m, message) when 2 then o.send(m, conf, message) when 3 then o.send(m, service, conf, message) when -1 then o.send(m, { service: service, configuration: conf, message: message }) else o.send(m) end # # reply to_messages(r) end
spawn(conf, data)
click to toggle source
# File lib/flor/unit/caller.rb, line 198 def spawn(conf, data) t0 = Time.now cmd = conf['cmd'] to = Fugit.parse(conf['timeout'] || '14s') to = to.is_a?(Fugit::Duration) ? to.to_sec : 14 to = 0 if to < 0 # no timeout i, o = IO.pipe # _ / stdout f, e = IO.pipe # _ / stderr r, w = IO.pipe # stdin / _ pid = Kernel.spawn(cmd, in: r, out: o, err: e) w.write(data) w.close o.close e.close _, status = timeout(to) { Process.wait2(pid) } fail SpawnNonZeroExitError.new(conf, { to: to, t0: t0 }, status, i, f) \ if status.exitstatus != 0 [ i.read, status ] rescue => err Process.detach(pid) \ if pid (Process.kill(9, pid) rescue nil) \ unless Flor.no?(conf['on_error_kill']) raise err if err.is_a?(SpawnError) raise WrappedSpawnError.new(conf, { to: to, t0: t0, pid: pid }, err) ensure [ i, o, f, e, r, w ].each { |x| x.close rescue nil } end
timeout(t, &block)
click to toggle source
# File lib/flor/unit/caller.rb, line 181 def timeout(t, &block) #Timeout.timeout(t, &block) # avoid using Ruby Timeout :-( It was 2008, is that still relevant? t0 = Time.now th = Thread.new { Thread.current[:return] = block.call } while (Time.now - t0 < t) do break if th.key?(:return) sleep 0.014 end fail TimeoutError.new('execution expired') unless th.key?(:return) th[:return] end
to_messages(o)
click to toggle source
# File lib/flor/unit/caller.rb, line 323 def to_messages(o) if Flor.is_array_of_messages?(o) o elsif Flor.is_message?(o) [ o ] else [] end end