class Rakie::Channel
Constants
- DEFAULT_BUFFER_SIZE
Attributes
delegate[RW]
Public Class Methods
new(io, delegate=nil)
click to toggle source
# File lib/rakie/channel.rb, line 7
# Wraps +io+ in a channel with empty read/write buffers and an empty
# pending-write queue, then registers the channel with Event for read events.
#
# @param io [Object] the IO-like object this channel reads from and writes to
# @param delegate [Object, nil] optional callback receiver; other methods of
#   this class call +on_recv+, +on_send+ and +on_close+ on it
def initialize(io, delegate=nil)
  @delegate = delegate
  @io = io

  # One entry per #write call: how much of that call is still unsent.
  @write_task = []
  @write_buffer = String.new
  @read_buffer = String.new

  Event.push(io, self, Event::READ_EVENT)
end
Public Instance Methods
close()
click to toggle source
# File lib/rakie/channel.rb, line 159
# Requests removal of this channel's IO from Event. Does nothing when the IO
# is already closed. This method does not close the IO itself.
#
# @return [nil] always
def close
  Event.delete(@io) unless @io.closed?
  nil
end
closed?()
click to toggle source
# File lib/rakie/channel.rb, line 172
# Reports whether the underlying IO has been closed.
#
# @return [Boolean] whatever the wrapped IO's +closed?+ returns
def closed?
  @io.closed?
end
eof?()
click to toggle source
# File lib/rakie/channel.rb, line 168
# Reports whether the in-memory read buffer is empty. Only the buffer is
# inspected; the underlying IO is not consulted.
#
# @return [Boolean] true when there is no buffered data left to #read
def eof?
  @read_buffer.empty?
end
handle_write(len)
click to toggle source
# File lib/rakie/channel.rb, line 53
# Accounts for +len+ units that have just been written out, consuming them
# from the queue of pending write tasks in FIFO order. Each task that is fully
# covered is removed and, when a delegate is set, reported via +on_send+.
# A partially covered task has its remaining size reduced in place.
#
# @param len [Integer] amount written since the last call
# @return [nil]
def handle_write(len)
  remaining = len

  while remaining > 0
    pending = @write_task.first

    # Not enough to finish the head task: shrink it and stop.
    if remaining < pending
      @write_task[0] = pending - remaining
      return
    end

    remaining -= pending
    @write_task.shift

    next if @delegate.nil?

    @delegate.on_send(self)
    Log.debug("Channel handle on_send")
  end
end
on_detach(io)
click to toggle source
# File lib/rakie/channel.rb, line 109
# Closes +io+ and notifies the delegate. Returns early when +io+ is already
# closed or when closing it raises; in both cases the delegate is not told.
# NOTE(review): presumably invoked by Event after the IO is deregistered;
# confirm against the Event implementation.
#
# @param io [Object] the IO-like object being detached
def on_detach(io)
  return if io.closed?

  begin
    io.close
  rescue
    Log.debug("Channel is already closed")
    return
  end

  @delegate.on_close(self) if @delegate
  Log.debug("Channel close ok")
end
on_read(io)
click to toggle source
# File lib/rakie/channel.rb, line 16
# Read-event callback: drains everything currently readable from +io+ into the
# read buffer, then offers the buffer to the delegate via +on_recv+.
#
# The delegate's +on_recv+ return value is taken as the amount of buffered
# data it consumed; that much is dropped from the front of the buffer. A
# non-Integer return (for example +nil+) counts as 0, and the value is clamped
# to the range 0..buffer length.
#
# @param io [Object] IO-like object responding to +read_nonblock+
# @return [Object] Event::HANDLE_CONTINUED when the read merely ran dry, or
#   Event::HANDLE_FAILED when reading raised (EOF, connection reset, ...)
def on_read(io)
  begin
    loop do
      @read_buffer << io.read_nonblock(DEFAULT_BUFFER_SIZE)
    end
  rescue IO::WaitReadable
    # Nothing more to read right now. IO::WaitReadable is the documented
    # umbrella for the EAGAIN / EWOULDBLOCK variants raised by read_nonblock.
    Log.debug("Channel read pending")
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit /
    # NoMemoryError are not swallowed and misreported as a channel failure.
    # Process the last message on exception
    unless @delegate.nil?
      @delegate.on_recv(self, @read_buffer)
      @read_buffer = String.new # Reset buffer
    end

    Log.debug("Channel error #{io}: #{e}")
    return Event::HANDLE_FAILED
  end

  unless @delegate.nil?
    Log.debug("Channel handle on_recv")
    consumed = @delegate.on_recv(self, @read_buffer)

    # Tolerate a misbehaving delegate: no count means nothing was consumed,
    # and the count may never fall outside the buffer.
    consumed = 0 unless consumed.is_a?(Integer)
    consumed = [[consumed, 0].max, @read_buffer.length].min

    @read_buffer = @read_buffer[consumed..-1]
  end

  Event::HANDLE_CONTINUED
end
on_write(io)
click to toggle source
# File lib/rakie/channel.rb, line 74
# Write-event callback: pushes as much of the write buffer to +io+ as it will
# accept without blocking, then reports the number of bytes sent to
# #handle_write so completed write tasks can be acknowledged.
#
# @param io [Object] IO-like object responding to +write_nonblock+
# @return [Object] Event::HANDLE_FINISHED when the buffer was fully flushed,
#   Event::HANDLE_CONTINUED when +io+ would block with data still pending,
#   Event::HANDLE_FAILED when writing raised any other StandardError
def on_write(io)
  len = 0
  offset = 0

  begin
    until @write_buffer.empty?
      len = io.write_nonblock(@write_buffer)
      offset += len

      # write_nonblock returns a BYTE count. Trimming by character index
      # would drop the wrong amount whenever the buffer holds multibyte
      # characters, so the sent prefix is removed by bytes.
      @write_buffer = @write_buffer.byteslice(len..-1)
    end

    Log.debug("Channel write #{len} bytes finished")
  rescue IO::WaitWritable
    # IO::WaitWritable is the documented umbrella for the EAGAIN /
    # EWOULDBLOCK variants raised by write_nonblock.
    handle_write(offset)
    Log.debug("Channel write pending")
    return Event::HANDLE_CONTINUED
  rescue
    Log.debug("Channel close #{io}")
    return Event::HANDLE_FAILED
  end

  handle_write(offset)
  Event::HANDLE_FINISHED
end
read(size)
click to toggle source
# File lib/rakie/channel.rb, line 129
# Removes and returns up to +size+ characters from the front of the read
# buffer. Only already-buffered data is returned; the IO is not touched.
#
# @param size [Integer] maximum amount to take; larger requests are capped at
#   the buffered length, and 0 returns an empty string without consuming
# @return [String] the extracted data, or "" when nothing is buffered
# @raise [ArgumentError] if +size+ is negative
def read(size)
  raise ArgumentError, "negative length #{size} given" if size < 0
  return "" if eof?

  # The cap must be measured against the buffer itself. The previous code
  # compared against the not-yet-assigned local +data+, which raised
  # NameError on every non-empty read.
  size = @read_buffer.length if size > @read_buffer.length

  # Start/length slicing, so that size == 0 yields "" instead of the
  # whole buffer (which the range 0..-1 would select).
  data = @read_buffer[0, size]
  @read_buffer = @read_buffer[size..-1]
  data
end
write(data)
click to toggle source
# File lib/rakie/channel.rb, line 144
# Queues +data+ for asynchronous sending and asks Event to watch the IO for
# writability in addition to readability.
#
# The payload is appended as raw bytes and its size is recorded in bytes, so
# the pending-task accounting uses the same unit as the byte counts that
# +write_nonblock+ reports, and so mixing binary and UTF-8 payloads cannot
# raise Encoding::CompatibilityError or switch the buffer's encoding.
#
# @param data [String] payload to send
# @return [Integer] 0 when queued, -1 when the IO is already closed
def write(data)
  return -1 if @io.closed?

  bytes = data.b
  size = bytes.bytesize

  @write_buffer << bytes
  @write_task << size
  Log.debug("write buffer append size: #{size}")

  Event.modify(@io, self, (Event::READ_EVENT | Event::WRITE_EVENT))
  0
end