module Rbuv::EM

Constants

VERSION

Public Class Methods

add_periodic_timer(interval, blk=nil, &block) click to toggle source
# File lib/rbuv/em.rb, line 68
def self.add_periodic_timer(interval, blk=nil, &block)
  blk ||= block
  EventMachine::PeriodicTimer.new(interval, blk)
end
add_timer(interval, blk=nil, &block) click to toggle source
# File lib/rbuv/em.rb, line 60
def self.add_timer(interval, blk=nil, &block)
  if blk ||= block
    s = add_oneshot_timer((interval.to_f * 1000).to_i, &blk)
    @timers[s] = blk
    s
  end
end
bind_connect(bind_addr, bind_port, server, port=nil, handler=nil, *args) { |c| ... } click to toggle source
# File lib/rbuv/em.rb, line 107
def self.bind_connect bind_addr, bind_port, server, port=nil, handler=nil, *args
  begin
    port = Integer(port)
  rescue ArgumentError, TypeError
    # there was no port, so server must be a unix domain socket
    # the port argument is actually the handler, and the handler is one of the args
    args.unshift handler if handler
    handler = port
    port = nil
  end if port

  klass = klass_from_handler(Connection, handler, *args)

  s = if port
        if bind_addr
          bind_connect_server bind_addr, bind_port.to_i, server, port
        else
          connect_server server, port
        end
      else
        connect_unix_server server
      end

  c = klass.new s, *args
  @conns[s] = c
  block_given? and yield c
  c
end
cancel_timer(timer_or_sig) click to toggle source
# File lib/rbuv/em.rb, line 73
def self.cancel_timer(timer_or_sig)
  if timer_or_sig.respond_to? :cancel
    timer_or_sig.cancel
  else
    @timers[timer_or_sig] = false if @timers.has_key?(timer_or_sig)
  end
end
close_connection(tcp, _after_writing) click to toggle source
# File lib/rbuv/em.rb, line 141
def self.close_connection(tcp, _after_writing)
  tcp.close
end
connect(server, port=nil, handler=nil, *args, &blk) click to toggle source
# File lib/rbuv/em.rb, line 103
def self.connect(server, port=nil, handler=nil, *args, &blk)
  bind_connect nil, nil, server, port, handler, *args, &blk
end
reactor_running?() click to toggle source
# File lib/rbuv/em.rb, line 56
def self.reactor_running?
  @reactor_running || false
end
run(blk=nil, tail=nil, &block) click to toggle source
# File lib/rbuv/em.rb, line 13
def self.run(blk=nil, tail=nil, &block)
  tail && @tails.unshift(tail)

  b = blk || block
  if reactor_running?
    b && b.call
  else
    @conns = {}
    @acceptors = {}
    @timers = {}
    @tails ||= []
    begin
      @reactor_running = true
      initialize_event_machine
      b && add_timer(0, b)
      run_machine
    ensure
      until @tails.empty?
        @tails.pop.call
      end

      release_machine

      @reactor_running = false
    end
  end
end
run_block() { || ... } click to toggle source
# File lib/rbuv/em.rb, line 49
def self.run_block
  run do
    yield
    Rbuv.stop
  end
end
send_data(tcp, data, _size) click to toggle source
# File lib/rbuv/em.rb, line 136
def self.send_data(tcp, data, _size)
  p [tcp, data, _size]
  tcp.write(data)
end
start_server(server, port=nil, handler=nil, *args, &block) click to toggle source
# File lib/rbuv/em.rb, line 81
def self.start_server(server, port=nil, handler=nil, *args, &block)
  begin
    port = Integer(port)
  rescue ArgumentError, TypeError
    # there was no port, so server must be a unix domain socket
    # the port argument is actually the handler, and the handler is one of the args
    args.unshift handler if handler
    handler = port
    port = nil
  end if port

  klass = klass_from_handler(Connection, handler, *args)

  s = if port
        start_tcp_server server, port
      else
        start_unix_server server
      end
  @acceptors[s] = [klass, args, block]
  s
end
stop() click to toggle source
# File lib/rbuv/em.rb, line 41
def self.stop
  Rbuv.stop
end
stop_event_loop() click to toggle source
# File lib/rbuv/em.rb, line 45
def self.stop_event_loop
  self.stop
end

Private Class Methods

add_oneshot_timer(interval) { || ... } click to toggle source
# File lib/rbuv/em.rb, line 156
def self.add_oneshot_timer(interval)
  Rbuv::Timer.start(interval, 0) { yield }
end
bind_connect_server(bind_addr, bind_port, server, port) click to toggle source
# File lib/rbuv/em.rb, line 215
def self.bind_connect_server(bind_addr, bind_port, server, port)
  c = Rbuv::Tcp.new
  c.bind bind_addr, bind_port if bind_addr
  c.connect(server, port) do
    self.on_connect(c)
  end
  c
end
connect_server(server, port) click to toggle source
# File lib/rbuv/em.rb, line 211
def self.connect_server(server, port)
  self.bind_connect_server nil, nil, server, port
end
initialize_event_machine() click to toggle source
# File lib/rbuv/em.rb, line 146
def self.initialize_event_machine
end
klass_from_handler(klass = Connection, handler = nil, *args) click to toggle source
# File lib/rbuv/em.rb, line 160
def self.klass_from_handler(klass = Connection, handler = nil, *args)
  klass = if handler && handler.is_a?(Class)
            raise ArgumentError, "must provide module or subclass of #{klass.name}" unless klass >= handler
            handler
          elsif handler
            begin
              handler::EM_CONNECTION_CLASS
            rescue NameError
              handler::const_set(:EM_CONNECTION_CLASS, Class.new(klass) { include handler })
            end
          else
            klass
          end

  arity = klass.instance_method(:initialize).arity
  expected = arity >= 0 ? arity : -(arity + 1)
  if (arity >= 0 and args.size != expected) or (arity < 0 and args.size < expected)
    raise ArgumentError, "wrong number of arguments for #{klass}#initialize (#{args.size} for #{expected})"
  end

  klass
end
on_accept(s) click to toggle source
# File lib/rbuv/em.rb, line 195
def self.on_accept(s)
  klass,args,blk = @acceptors[s]
  c_tcp = Rbuv::Tcp.new
  s.accept(c_tcp)
  c = klass.new(c_tcp, *args)
  c_tcp.read_start do |data, error|
    if error.is_a?(EOFError)
      c.unbind
    else
      c.receive_data(data)
    end
  end
  @conns[c_tcp] = c
  blk && blk.call(c)
end
on_connect(c_tcp) click to toggle source
# File lib/rbuv/em.rb, line 224
def self.on_connect(c_tcp)
  c = @conns[c_tcp] or raise ConnectionNotBound, "received ConnectionCompleted for unknown signature: #{c_tcp}"
  c.connection_completed
  c_tcp.read_start do |data, error|
    if error.is_a?(EOFError)
      c.unbind
    else
      c.receive_data(data)
    end
  end
end
release_machine() click to toggle source
# File lib/rbuv/em.rb, line 153
def self.release_machine
end
run_machine() click to toggle source
# File lib/rbuv/em.rb, line 149
def self.run_machine
  Rbuv::Loop.run
end
start_tcp_server(server, port) click to toggle source
# File lib/rbuv/em.rb, line 183
def self.start_tcp_server(server, port)
  s = Rbuv::Tcp.new
  s.bind server, port
  s.listen 128 do
    self.on_accept(s)
  end
  s
end
start_unix_server(server) click to toggle source
# File lib/rbuv/em.rb, line 192
def self.start_unix_server(server)
end