class Beaneater::Connection

Represents a connection to a beanstalkd instance.

Constants

DEFAULT_PORT

Default port value for beanstalk connection

DEFAULT_RETRY_INTERVAL

Default retry interval

MAX_RETRIES

Default number of retries to send a command to a connection

Attributes

address[R]

@!attribute address

@return [String] returns Beanstalkd server address
@example
  @conn.address # => "localhost:11300"

@!attribute host

@return [String] returns Beanstalkd server host
@example
  @conn.host # => "localhost"

@!attribute port

@return [Integer] returns Beanstalkd server port
@example
  @conn.port # => "11300"

@!attribute connection

@return [Net::TCPSocket] returns connection object
connection[R]

@!attribute address

@return [String] returns Beanstalkd server address
@example
  @conn.address # => "localhost:11300"

@!attribute host

@return [String] returns Beanstalkd server host
@example
  @conn.host # => "localhost"

@!attribute port

@return [Integer] returns Beanstalkd server port
@example
  @conn.port # => "11300"

@!attribute connection

@return [Net::TCPSocket] returns connection object
host[R]

@!attribute address

@return [String] returns Beanstalkd server address
@example
  @conn.address # => "localhost:11300"

@!attribute host

@return [String] returns Beanstalkd server host
@example
  @conn.host # => "localhost"

@!attribute port

@return [Integer] returns Beanstalkd server port
@example
  @conn.port # => "11300"

@!attribute connection

@return [Net::TCPSocket] returns connection object
port[R]

@!attribute address

@return [String] returns Beanstalkd server address
@example
  @conn.address # => "localhost:11300"

@!attribute host

@return [String] returns Beanstalkd server host
@example
  @conn.host # => "localhost"

@!attribute port

@return [Integer] returns Beanstalkd server port
@example
  @conn.port # => "11300"

@!attribute connection

@return [Net::TCPSocket] returns connection object
tube_used[RW]

@!attribute tubes_watched

@returns [Array<String>] returns currently watched tube names

@!attribute tube_used

@returns [String] returns currently used tube name
tubes_watched[RW]

@!attribute tubes_watched

@returns [Array<String>] returns currently watched tube names

@!attribute tube_used

@returns [String] returns currently used tube name

Public Class Methods

new(address) click to toggle source

Initializes new connection.

@param [String] address beanstalkd instance address. @example

Beaneater::Connection.new('127.0.0.1')
Beaneater::Connection.new('127.0.0.1:11300')

ENV['BEANSTALKD_URL'] = '127.0.0.1:11300'
@b = Beaneater.new
@b.connection.host # => '127.0.0.1'
@b.connection.port # => '11300'
# File lib/beaneater/connection.rb, line 51
def initialize(address)
  @address = address || _host_from_env || Beaneater.configuration.beanstalkd_url
  @mutex = Mutex.new
  @tube_used = 'default'
  @tubes_watched = ['default']

  establish_connection
rescue
  _raise_not_connected!
end

Public Instance Methods

add_to_watched(tube_name) click to toggle source
# File lib/beaneater/connection.rb, line 107
def add_to_watched(tube_name)
  @tubes_watched << tube_name
  @tubes_watched.uniq
end
close() click to toggle source

Close connection with beanstalkd server.

@example

@conn.close
# File lib/beaneater/connection.rb, line 90
def close
  if @connection
    @connection.close
    @connection = nil
  end
end
inspect()
Alias for: to_s
remove_from_watched(tube_name) click to toggle source
# File lib/beaneater/connection.rb, line 112
def remove_from_watched(tube_name)
  @tubes_watched.delete(tube_name)
end
to_s() click to toggle source

Returns string representation of job.

@example

@conn.inspect
# File lib/beaneater/connection.rb, line 102
def to_s
  "#<Beaneater::Connection host=#{host.inspect} port=#{port.inspect}>"
end
Also aliased as: inspect
transmit(command, **options) click to toggle source

Send commands to beanstalkd server via connection.

@param [Hash{String => String, Number}>] options Retained for compatibility @param [String] command Beanstalkd command @return [Array<Hash{String => String, Number}>] Beanstalkd command response @example

@conn = Beaneater::Connection.new
@conn.transmit('bury 123')
@conn.transmit('stats')
# File lib/beaneater/connection.rb, line 72
def transmit(command, **options)
  _with_retry(**options.slice(:retry_interval, :init)) do
    @mutex.synchronize do
      _raise_not_connected! unless connection

      command = command.force_encoding('ASCII-8BIT') if command.respond_to?(:force_encoding)
      connection.write(command.to_s + "\r\n")
      res = connection.readline
      parse_response(command, res)
    end
  end
end

Protected Instance Methods

config() click to toggle source

Returns configuration options for beaneater

@return [Beaneater::Configuration] configuration object

# File lib/beaneater/connection.rb, line 167
def config
  Beaneater.configuration
end
establish_connection() click to toggle source

Establish a connection based on beanstalk address.

@return [Net::TCPSocket] connection for specified address. @raise [Beaneater::NotConnected] Could not connect to specified beanstalkd instance. @example

establish_connection('localhost:3005')
# File lib/beaneater/connection.rb, line 125
def establish_connection
  @address = address.first if address.is_a?(Array)
  match = address.split(':')
  @host, @port = match[0], Integer(match[1] || DEFAULT_PORT)

  @connection = TCPSocket.new @host, @port
end
parse_response(cmd, res) click to toggle source

Parses the response and returns the useful beanstalk response. Will read the body if one is indicated by the status.

@param [String] cmd Beanstalk command transmitted @param [String] res Telnet command response @return [Array<Hash{String => String, Number}>] Beanstalk response with `status`, `id`, `body` @raise [Beaneater::UnexpectedResponse] Response from beanstalk command was an error status @example

parse_response("delete 56", "DELETED 56\nFOO")
 # => { :body => "FOO", :status => "DELETED", :id => 56 }
# File lib/beaneater/connection.rb, line 144
def parse_response(cmd, res)
  status = res.chomp
  body_values = status.split(/\s/)
  status = body_values[0]
  raise UnexpectedResponse.from_status(status, cmd) if UnexpectedResponse::ERROR_STATES.include?(status)
  body = nil
  if ['OK','FOUND', 'RESERVED'].include?(status)
    bytes_size = body_values[-1].to_i
    raw_body = connection.read(bytes_size)
    body = status == 'OK' ? YAML.load(raw_body) : config.job_parser.call(raw_body)
    crlf = connection.read(2) # \r\n
    raise ExpectedCrlfError.new('EXPECTED_CRLF', cmd) if crlf != "\r\n"
  end
  id = body_values[1]
  response = { :status => status }
  response[:id] = id if id
  response[:body] = body if body
  response
end

Private Instance Methods

_host_from_env() click to toggle source

The host provided by BEANSTALKD_URL environment variable, if available.

@return [String] A beanstalkd host address @example

ENV['BEANSTALKD_URL'] = "localhost:1212"
 # => 'localhost:1212'
# File lib/beaneater/connection.rb, line 233
def _host_from_env
  ENV['BEANSTALKD_URL'].respond_to?(:length) && ENV['BEANSTALKD_URL'].length > 0 && ENV['BEANSTALKD_URL'].strip
end
_initialize_tubes() click to toggle source
# File lib/beaneater/connection.rb, line 173
def _initialize_tubes
  if @tubes_watched != ['default']
    tubes_watched.each do |t|
      transmit("watch #{t}", init: false)
    end

    transmit("ignore default", init: false)
  end

  transmit("use #{tube_used}", init: false) if @tube_used != 'default'
end
_raise_not_connected!() click to toggle source

Raises an error to be triggered when the connection has failed @raise [Beaneater::NotConnected] Beanstalkd is no longer connected

# File lib/beaneater/connection.rb, line 239
def _raise_not_connected!
  raise Beaneater::NotConnected, "Connection to beanstalk '#{@host}:#{@port}' is closed!"
end
_reconnect(original_exception, retry_interval, tries=MAX_RETRIES) click to toggle source

Tries to re-establish connection to the beanstalkd

@param [Exception] original_exception The exception caused the retry @param [Integer] retry_interval The time to wait before the next reconnect @param [Integer] tries The maximum number of attempts to reconnect

# File lib/beaneater/connection.rb, line 214
def _reconnect(original_exception, retry_interval, tries=MAX_RETRIES)
  close
  establish_connection
rescue Errno::ECONNREFUSED
  tries -= 1
  if tries.zero?
    _raise_not_connected!
  end
  sleep(retry_interval || DEFAULT_RETRY_INTERVAL)
  retry
end
_with_retry(retry_interval: DEFAULT_RETRY_INTERVAL, init: true, tries: MAX_RETRIES) { || ... } click to toggle source

Wrapper method for capturing certain failures and retry the payload block

@param [Proc] block The command to execute. @param [Integer] retry_interval The time to wait before the next retry @param [Integer] tries The maximum number of tries in draining mode @return [Object] Result of the block passed

# File lib/beaneater/connection.rb, line 192
def _with_retry(retry_interval: DEFAULT_RETRY_INTERVAL, init: true, tries: MAX_RETRIES, &block)
  yield
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE,
  Errno::ECONNREFUSED => ex
  _reconnect(ex, retry_interval)
  _initialize_tubes if init
  retry
rescue Beaneater::DrainingError
  tries -= 1
  if tries.zero?
    close
    raise
  end
  sleep(retry_interval)
  retry
end