class Newque::Newque_http
Constants
- BASE_OPTIONS
Attributes
conn[R]
options[R]
timeout[R]
Public Class Methods
new(host, port, options, timeout)
click to toggle source
# File lib/newque/http/newque_http.rb, line 18
# Builds an HTTP client for the server at host:port.
#
# host    - interpolated into the base URL and kept in @host.
# port    - interpolated into the base URL and kept in @port.
# options - caller overrides, combined with BASE_OPTIONS via Util.compute_options.
# timeout - stored divided by 1000.0 (presumably milliseconds in, seconds
#           stored -- confirm against callers).
def initialize(host, port, options, timeout)
  @host = host
  @port = port
  @options = Util.compute_options(BASE_OPTIONS, options)
  @timeout = timeout / 1000.0

  scheme = @options[:https] ? "https" : "http"
  @conn = Faraday.new({ url: "#{scheme}://#{host}:#{port}" })

  # Format-specific helper; remains nil for any other :http_format value.
  @instance =
    case @options[:http_format]
    when :json then Http_json.new(self)
    when :plaintext then Http_plaintext.new(self)
    end
end
Public Instance Methods
count(channel)
click to toggle source
# File lib/newque/http/newque_http.rb, line 63
# Requests GET /v1/<channel>/count on a new thread.
#
# channel - interpolated into the request path.
#
# Returns a Future built from that thread and @timeout. The thread's value is
# a Count_response created from the 'count' field of the parsed body.
def count(channel)
  worker = Thread.new do
    response = @conn.get do |request|
      set_req_options(request)
      request.url("/v1/#{channel}/count")
    end
    payload = parse_json_response(response.body)
    Count_response.new(payload['count'])
  end
  Future.new(worker, @timeout)
end
delete(channel)
click to toggle source
# File lib/newque/http/newque_http.rb, line 75
# Requests DELETE /v1/<channel> on a new thread.
#
# channel - interpolated into the request path.
#
# Returns a Future built from that thread and @timeout. The thread's value is
# a fresh Delete_response.
def delete(channel)
  worker = Thread.new do
    response = @conn.delete do |request|
      set_req_options(request)
      request.url("/v1/#{channel}")
    end
    # Called only for its error check; the parsed body itself is not used.
    parse_json_response(response.body)
    Delete_response.new
  end
  Future.new(worker, @timeout)
end
health(channel, global=false)
click to toggle source
# File lib/newque/http/newque_http.rb, line 87
# Requests a health endpoint on a new thread.
#
# channel - interpolated into the path when global is falsy.
# global  - when truthy the path is /v1/health and channel is not used;
#           otherwise the path is /v1/<channel>/health.
#
# Returns a Future built from that thread and @timeout. The thread's value is
# a fresh Health_response.
def health(channel, global=false)
  worker = Thread.new do
    response = @conn.get do |request|
      set_req_options(request)
      # Built here, inside the request block, so any failure while building
      # the path surfaces in the worker thread rather than in the caller.
      scope = global ? '' : '/' + channel
      request.url("/v1#{scope}/health")
    end
    # Called only for its error check; the parsed body itself is not used.
    parse_json_response(response.body)
    Health_response.new
  end
  Future.new(worker, @timeout)
end
read_stream(channel, mode, limit=nil)
click to toggle source
# File lib/newque/http/newque_http.rb, line 33
# Streams messages from a channel as a lazy Enumerator.
#
# channel - interpolated into the request path /v1/<channel>.
# mode    - sent as the 'newque-mode' request header.
# limit   - sent as the 'newque-read-max' request header unless nil.
#
# Returns an Enumerator. The HTTP request is only issued once the enumerator
# is consumed. The response body is split on @options[:separator]; a message
# that straddles two network chunks is reassembled before being yielded.
# For a non-200 response the body goes through parse_json_response, which
# raises when the body reports errors.
def read_stream channel, mode, limit=nil
  Enumerator.new do |generator|
    # Timeouts and TLS are passed to Net::HTTP.start itself: the connection is
    # already open by the time the block runs, so an open_timeout assigned
    # inside the block would never apply. use_ssl follows the same :https
    # option the constructor uses for the Faraday connection.
    http_opts = {
      use_ssl: @options[:https] ? true : false,
      open_timeout: @timeout,
      read_timeout: @timeout
    }

    Net::HTTP.start(@host, @port, http_opts) do |http|
      req = Net::HTTP::Get.new "/v1/#{channel}"
      req['newque-mode'] = mode
      req['newque-read-max'] = limit unless limit.nil?
      req['Transfer-Encoding'] = 'chunked'

      http.request req do |res|
        if res.code != '200'
          generator << parse_json_response(res.body) # Will throw
        else
          leftover = nil
          res.read_body do |chunk|
            # limit -1 keeps a trailing empty string, so the last element is
            # always the (possibly empty) start of the next message.
            *fulls, partial = "#{leftover || ''}#{chunk}".split(@options[:separator], -1)
            fulls.each { |msg| generator << msg }
            leftover = partial
          end
          # NOTE(review): when the body ends with the separator, leftover is ""
          # and an empty final message is yielded -- confirm this is intended.
          generator << leftover unless leftover.nil?
        end
      end
    end
  end
end
Private Instance Methods
parse_json_response(body)
click to toggle source
# File lib/newque/http/newque_http.rb, line 106
# Parses a JSON response body and raises if it reports errors.
#
# body - JSON text to parse.
#
# Returns the parsed object.
# Raises the error built by Util.newque_error when the 'errors' entry is
# non-empty. Exceptions from JSON.parse on invalid JSON propagate unchanged.
def parse_json_response body
  parsed = JSON.parse body
  errors = parsed['errors']
  # A body with no 'errors' key counts as error-free; without the nil check
  # this would fail with NoMethodError (size on nil) instead.
  raise Util.newque_error(errors) if !errors.nil? && errors.size > 0
  parsed
end
set_req_options(req)
click to toggle source
# File lib/newque/http/newque_http.rb, line 101
# Copies @timeout onto a request's options.
#
# req - request object exposing #options, which in turn accepts
#       open_timeout= and timeout=.
def set_req_options(req)
  request_options = req.options
  request_options.open_timeout = @timeout
  request_options.timeout = @timeout
end