class Newque::Newque_http

Constants

BASE_OPTIONS

Attributes

conn[R]
options[R]
timeout[R]

Public Class Methods

new(host, port, options, timeout)
# File lib/newque/http/newque_http.rb, line 18
# Sets up the HTTP client state for one server.
#
# host    - server host; stored in @host and used in the Faraday base URL
# port    - server port; stored in @port and used in the Faraday base URL
# options - caller options, combined with BASE_OPTIONS by Util.compute_options
# timeout - divided by 1000.0 before being stored
#           (presumably milliseconds converted to seconds -- confirm with callers)
def initialize host, port, options, timeout
  @host, @port = host, port
  @options = Util.compute_options BASE_OPTIONS, options
  @timeout = timeout / 1000.0

  # The :https option selects the URL scheme of the shared connection.
  scheme = @options[:https] ? "https" : "http"
  @conn = Faraday.new({ url: "#{scheme}://#{host}:#{port}" })

  # Any :http_format other than :json / :plaintext leaves @instance as nil.
  format = @options[:http_format]
  @instance =
    if format == :json
      Http_json.new self
    elsif format == :plaintext
      Http_plaintext.new self
    end
end

Public Instance Methods

count(channel)
# File lib/newque/http/newque_http.rb, line 63
# Issues GET /v1/<channel>/count on a background thread.
#
# channel - channel name, interpolated into the request path
#
# Returns a Future wrapping the worker thread and @timeout. The thread's
# value is a Count_response built from the 'count' field of the parsed body.
# parse_json_response raises inside the thread when the body reports errors.
def count channel
  worker = Thread.new do
    response = @conn.get do |req|
      set_req_options req
      req.url "/v1/#{channel}/count"
    end
    Count_response.new parse_json_response(response.body)['count']
  end
  Future.new worker, @timeout
end
delete(channel)
# File lib/newque/http/newque_http.rb, line 75
# Issues DELETE /v1/<channel> on a background thread.
#
# channel - channel name, interpolated into the request path
#
# Returns a Future wrapping the worker thread and @timeout. The thread's
# value is a fresh Delete_response.
def delete channel
  worker = Thread.new do
    response = @conn.delete do |req|
      set_req_options req
      req.url "/v1/#{channel}"
    end
    # Parsed only so that reported errors raise; the payload itself is unused.
    parse_json_response response.body
    Delete_response.new
  end
  Future.new worker, @timeout
end
health(channel, global=false)
# File lib/newque/http/newque_http.rb, line 87
# Issues a health-check GET on a background thread.
#
# channel - channel name; ignored when +global+ is truthy
# global  - when truthy, requests /v1/health instead of /v1/<channel>/health
#
# Returns a Future wrapping the worker thread and @timeout. The thread's
# value is a fresh Health_response.
def health channel, global=false
  thread = Thread.new do
    res = @conn.get do |req|
      set_req_options req
      # Interpolate rather than concatenate so non-String channels (e.g.
      # Symbols) are accepted, the same way count and delete build paths.
      req.url(global ? "/v1/health" : "/v1/#{channel}/health")
    end
    # Parsed only so that reported errors raise; the payload itself is unused.
    parse_json_response res.body
    Health_response.new
  end
  Future.new thread, @timeout
end
read_stream(channel, mode, limit=nil)
# File lib/newque/http/newque_http.rb, line 33
# Lazily streams messages from GET /v1/<channel> using Net::HTTP.
#
# channel - channel name, interpolated into the request path
# mode    - value sent in the 'newque-mode' request header
# limit   - optional value sent in the 'newque-read-max' header (omitted when nil)
#
# Returns an Enumerator. The request is only made once the enumerator is
# consumed. On a 200 response each separator-delimited message is yielded as
# it arrives; on any other status the body goes through parse_json_response,
# which raises when the body reports errors.
def read_stream channel, mode, limit=nil
  Enumerator.new do |generator|
    # Connection settings go in as start options: Net::HTTP.start opens the
    # socket before yielding, so an open_timeout assigned inside the block
    # would be too late. use_ssl follows the same :https option as @conn.
    start_options = {
      use_ssl: @options[:https] ? true : false,
      open_timeout: @timeout,
      read_timeout: @timeout
    }
    Net::HTTP.start(@host, @port, start_options) do |http|
      req = Net::HTTP::Get.new "/v1/#{channel}"
      req['newque-mode'] = mode
      req['newque-read-max'] = limit unless limit.nil?
      # NOTE(review): presumably this header is how the server is asked for a
      # streamed response -- confirm against the server documentation.
      req['Transfer-Encoding'] = 'chunked'

      http.request req do |res|
        if res.code != '200'
          generator << parse_json_response(res.body) # Will throw
        else
          # A message can straddle two chunks: keep the trailing fragment and
          # prepend it to the next chunk before splitting again.
          leftover = nil
          res.read_body do |chunk|
            # The -1 limit keeps trailing empty strings, so a chunk ending in
            # the separator leaves an empty fragment rather than a full message.
            lines = "#{leftover || ''}#{chunk}".split(@options[:separator], -1)
            *fulls, partial = lines
            fulls.each do |msg|
              generator << msg
            end
            leftover = partial
          end
          generator << leftover unless leftover.nil?
        end
      end
    end
  end
end

Private Instance Methods

parse_json_response(body)
# File lib/newque/http/newque_http.rb, line 106
# Parses a JSON response body and raises if it reports errors.
#
# body - JSON text of the response
#
# Returns the parsed document.
# Raises whatever Util.newque_error builds when 'errors' is non-empty, and
# JSON::ParserError (from JSON.parse) when +body+ is not valid JSON.
def parse_json_response body
  parsed = JSON.parse body
  errors = parsed['errors']
  # A missing or null 'errors' field counts as "no errors" instead of
  # failing with NoMethodError on nil.
  raise Util.newque_error(errors) unless errors.nil? || errors.empty?
  parsed
end
set_req_options(req)
# File lib/newque/http/newque_http.rb, line 101
# Copies @timeout onto a request's open and overall timeout options.
#
# req - request object exposing #options with open_timeout= and timeout=
def set_req_options req
  request_options = req.options
  request_options.open_timeout = @timeout
  request_options.timeout = @timeout
end