class SeapigServer
Attributes
connected[R]
socket[R]
Public Class Methods
new(uri, options={})
click to toggle source
# File lib/seapig/client.rb, line 30 def initialize(uri, options={}) @connected = false @uri = uri @options = options @slave_objects = {} @master_objects = {} @notifier_objects = {} connect end
Public Instance Methods
connect()
click to toggle source
# File lib/seapig/client.rb, line 42 def connect if @socket @socket.onclose {} @socket.close end @timeout_timer ||= EM.add_periodic_timer(10) { next if not @socket next if Time.new.to_f - @last_communication_at < 20 puts "Seapig ping timeout, reconnecting" connect } @connected = false @last_communication_at = Time.new.to_f @socket = WebSocket::EventMachine::Client.connect(uri: @uri) @socket.onopen { puts 'Connected to seapig server' @connected = true @socket.send JSON.dump(action: 'client-options-set', options: @options) @slave_objects.each_pair { |object_id, object| @socket.send JSON.dump(action: 'object-consumer-register', id: object_id, "known-version" => object.version) } @master_objects.each_pair { |object_id, object| @socket.send JSON.dump(action: 'object-producer-register', pattern: object_id, "known-version" => object.version) } @last_communication_at = Time.new.to_f } @socket.onmessage { |message| message = JSON.load message #p message['action'], message['id'], message['patch'] case message['action'] when 'object-update' @slave_objects.values.each { |object| object.patch(message) if object.matches?(message['id']) } when 'object-destroy' @slave_objects.values.each { |object| object.destroy(message) if object.matches?(message['id']) } when 'object-produce' handler = @master_objects.keys.find { |key| key.include?('*') and (message['id'] =~ Regexp.new(Regexp.escape(key).gsub('\*','.*?'))) or (message['id'] == key) } raise "Stupid produce" if not @master_objects[handler] @master_objects[handler].produce(message['id'],message['version']) else p :wtf, message end @last_communication_at = Time.new.to_f } @socket.onclose { |code, reason| puts 'Seapig connection died unexpectedly (code:'+code.inspect+', reason:'+reason.inspect+'), reconnecting in 1s' EM.add_timer(1) { connect } } @socket.onerror { |error| puts 'Seapig error: '+error.inspect @socket.close EM.add_timer(1) { connect } } @socket.onping { @last_communication_at = Time.new.to_f } end
detach_fd()
click to toggle source
# File lib/seapig/client.rb, line 132 def detach_fd disconnect(true) end
disconnect(detach_fd = false)
click to toggle source
# File lib/seapig/client.rb, line 114 def disconnect(detach_fd = false) @connected = false if @timeout_timer @timeout_timer.cancel @timeout_timer = nil end if @socket @socket.onclose {} if detach_fd IO.new(@socket.detach).close else @socket.close end @socket = nil end end
master(object_id)
click to toggle source
# File lib/seapig/client.rb, line 144 def master(object_id) object = if object_id.include?('*') then SeapigWildcardObject.new(self, object_id) else SeapigObject.new(self, object_id) end object.version = nil @socket.send JSON.dump(action: 'object-producer-register', pattern: object_id) if @connected @master_objects[object_id] = object end
notifier(object_id)
click to toggle source
# File lib/seapig/client.rb, line 152 def notifier(object_id) object = SeapigObject.new(self, object_id) object.version = nil @notifier_objects[object_id] = object end
slave(object_id)
click to toggle source
# File lib/seapig/client.rb, line 137 def slave(object_id) object = if object_id.include?('*') then SeapigWildcardObject.new(self, object_id) else SeapigObject.new(self, object_id) end @socket.send JSON.dump(action: 'object-consumer-register', id: object_id, latest_known_version: object.version) if @connected @slave_objects[object_id] = object end