class BackgrounDRb::Connection

Attributes

cluster_conn[RW]
connection_status[RW]
server_ip[RW]
server_port[RW]

Public Class Methods

new(ip,port,cluster_conn) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 5
# Stores the server address and the owning cluster connection.
# No socket is opened here; the connection status starts out as true.
#
# ip           - address later passed to TCPSocket.open
# port         - port later passed to TCPSocket.open
# cluster_conn - stored as-is in @cluster_conn
def initialize(ip, port, cluster_conn)
  @server_ip    = ip
  @server_port  = port
  @cluster_conn = cluster_conn
  # Guards socket writes and reads performed by the other methods.
  @mutex = Mutex.new
  @connection_status = true
end

Public Instance Methods

all_worker_info() click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 109
# Sends an :all_worker_info request, reads one reply under the mutex,
# closes the connection and returns the reply (nil when nothing was read).
def all_worker_info
  dump_object({ :type => :all_worker_info })
  reply = @mutex.synchronize { read_from_bdrb }
  close_connection
  reply
end
ask_result(p_data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 145
# Fetches a stored result.
#
# When the configured result storage is 'memcache' the lookup is delegated
# to return_result_from_memcache. Otherwise p_data is tagged as :get_result
# (the caller's object is modified), sent to the server, and the :data entry
# of the reply is returned. Returns nil when no reply was read.
def ask_result(p_data)
  if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
    return return_result_from_memcache(p_data)
  end

  p_data[:type] = :get_result
  dump_object(p_data)
  reply = @mutex.synchronize { read_from_bdrb }
  close_connection
  reply[:data] if reply
end
ask_work(p_data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 84
# Tags p_data as an :async_invoke request (the caller's object is modified),
# sends it, reads one reply under the mutex, closes the connection and
# returns the reply (nil when nothing was read).
def ask_work(p_data)
  p_data[:type] = :async_invoke
  dump_object(p_data)
  reply = @mutex.synchronize { read_from_bdrb }
  close_connection
  reply
end
close_connection() click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 79
# Closes the current socket, if any, and forgets it.
#
# Calling this when no connection is held is a no-op instead of a
# NoMethodError on nil. The handle is cleared even when #close raises, so a
# failed close never leaves a stale object in @connection; the error itself
# still propagates to the caller.
#
# Returns nil.
def close_connection
  @connection.close if @connection
ensure
  @connection = nil
end
delete_worker(p_data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 119
# Tags p_data as a :delete_worker request (the caller's object is modified),
# sends it and closes the connection without reading a reply.
def delete_worker(p_data)
  dump_object(p_data.tap { |request| request[:type] = :delete_worker })
  close_connection
end
dump_object(data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 68
# Opens a connection and writes +data+ to it as one frame.
#
# The frame is the Marshal dump of +data+ prefixed with its length as a
# decimal string left-padded with zeros to 9 characters. The write happens
# while holding @mutex.
#
# Raises BackgrounDRb::BdrbConnError when the connection attempt left
# @connection_status falsy.
def dump_object(data)
  establish_connection
  unless @connection_status
    raise BackgrounDRb::BdrbConnError.new("Error while connecting to the backgroundrb server #{server_info}")
  end

  payload = Marshal.dump(data)
  header = payload.length.to_s.rjust(9, '0')
  @mutex.synchronize { write_data(header + payload) }
end
establish_connection() click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 14
# Opens a TCP connection to server_ip:server_port with TCP_NODELAY enabled,
# allowing 3 seconds for the attempt.
#
# Sets @connection_status to true on success and to false when the attempt
# times out or fails with any StandardError (refused connection, DNS
# failure, ...). Interrupt, SystemExit and other non-StandardError
# exceptions are deliberately not swallowed.
#
# Returns the new value of @connection_status.
def establish_connection
  # Timeout.timeout is the supported entry point; the bare top-level
  # `timeout` is deprecated and may not exist on newer Rubies.
  Timeout.timeout(3) do
    @connection = TCPSocket.open(server_ip, server_port)
    @connection.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  end
  @connection_status = true
rescue Timeout::Error, StandardError
  @connection_status = false
end
flush_in_loop(data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 56
# Writes all of +data+ to @connection, flushing after every write and
# retrying with the unwritten remainder until nothing is left.
#
# IO#write reports the number of BYTES written, so the remainder is tracked
# with bytesize/byteslice. Slicing by character index would skip or repeat
# bytes after a partial write of a string containing multibyte characters.
#
# Raises RuntimeError ("Error writing to socket") when a write reports that
# no bytes were written. Returns nil.
def flush_in_loop(data)
  pending = data
  loop do
    break if pending.bytesize <= 0
    written_length = @connection.write(pending)
    raise "Error writing to socket" if written_length <= 0
    @connection.flush
    # byteslice returns nil when the offset is past the end; treat as done.
    pending = pending.byteslice(written_length..-1).to_s
  end
end
gen_key(options) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 136
# Builds the key used to look up a job result.
#
# With 'memcache' result storage the key is the non-nil values of
# :worker, :worker_key and :job_key joined with '_'. Otherwise it is just
# options[:job_key].
def gen_key(options)
  return options[:job_key] unless BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'

  parts = [:worker, :worker_key, :job_key].map { |name| options[name] }
  parts.compact.join('_')
end
new_worker(p_data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 93
# Tags p_data as a :start_worker request (the caller's object is modified),
# sends it and closes the connection without reading a reply.
def new_worker(p_data)
  dump_object(p_data.tap { |request| request[:type] = :start_worker })
  # Returning a worker proxy was disabled in the original:
  # RailsWorkerProxy.worker(p_data[:worker],p_data[:worker_key],self)
  close_connection
end
read_from_bdrb(timeout = 3) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 158
# Waits up to +timeout+ seconds (nil waits indefinitely) for @connection to
# become readable, then reads one frame and unmarshals it.
#
# Returns the unmarshalled object, or nil when the wait timed out or any
# StandardError occurred while waiting, reading or unmarshalling.
#
# NOTE(review): Marshal.load is applied to bytes read from the socket; this
# is only safe if the peer is trusted - confirm.
def read_from_bdrb(timeout = 3)
  readable = IO.select([@connection], nil, nil, timeout)
  readable ? Marshal.load(read_object) : nil
rescue StandardError
  nil
end
read_object() click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 125
# Reads one length-prefixed frame from @connection: a 9-character decimal
# length followed by that many bytes of payload.
#
# Returns the payload as read from the connection.
# Raises BackgrounDRb::BdrbConnError when any StandardError occurs while
# reading.
def read_object
  frame_length = @connection.read(9).to_i
  @connection.read(frame_length)
rescue StandardError
  raise BackgrounDRb::BdrbConnError.new("Not able to connect #{server_info}")
end
send_request(p_data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 170
# Tags p_data as a :sync_invoke request (the caller's object is modified),
# sends it and reads one reply under the mutex. The read is given a nil
# timeout, i.e. no time limit on the wait. Closes the connection and
# returns the reply (nil when the read failed).
def send_request(p_data)
  p_data[:type] = :sync_invoke
  dump_object(p_data)
  reply = @mutex.synchronize { read_from_bdrb(nil) }
  close_connection
  reply
end
server_info() click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 52
# Returns the server address formatted as "ip:port".
def server_info
  format('%s:%s', server_ip, server_port)
end
worker_info(p_data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 100
# Tags p_data as a :worker_info request (the caller's object is modified),
# sends it, reads one reply under the mutex, closes the connection and
# returns the reply (nil when nothing was read).
def worker_info(p_data)
  p_data[:type] = :worker_info
  dump_object(p_data)
  reply = @mutex.synchronize { read_from_bdrb }
  close_connection
  reply
end
write_data(data) click to toggle source
# File lib/backgroundrb/bdrb_connection.rb, line 28
# Writes +data+ to the connection, reconnecting once if the write fails.
#
# - Errno::EAGAIN is swallowed: the method returns nil without retrying.
# - Any other StandardError (Errno::EPIPE included) triggers one reconnect
#   attempt; on success the whole of +data+ is written again, and an error
#   from that second write propagates to the caller.
#
# Raises BackgrounDRb::BdrbConnError when the reconnect attempt fails.
def write_data(data)
  flush_in_loop(data)
rescue Errno::EAGAIN
  return
rescue StandardError
  establish_connection
  unless @connection_status
    @connection_status = false
    raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}")
  end
  flush_in_loop(data)
end