class Neo4j::Core::CypherSession::Adaptors::Bolt

Constants

BYTE_STRINGS

These don't need to be recalculated every time, so they are cached in memory.

GOGOBOLT
STREAM_INSPECTOR
SUPPORTED_VERSIONS
VERSION

Public Class Methods

new(url, options = {}) click to toggle source
   # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Stores the connection settings and opens the socket right away.
#
# @param url [Object] handed to the +url=+ writer
# @param options [Hash] adaptor options. +:read_timeout+ and
#   +:write_timeout+ default to -1, +:connect_timeout+ defaults to 10 and
#   +:ssl+ defaults to an empty Hash; the full Hash is also kept in @options.
def initialize(url, options = {})
  self.url = url
  @options = options

  read_timeout = options.fetch(:read_timeout, -1)
  write_timeout = options.fetch(:write_timeout, -1)
  connect_timeout = options.fetch(:connect_timeout, 10)
  ssl_options = options.fetch(:ssl, {})

  # Only these four settings are forwarded to the TCP client by open_socket.
  @net_tcp_client_options = {
    read_timeout: read_timeout,
    write_timeout: write_timeout,
    connect_timeout: connect_timeout,
    ssl: ssl_options
  }

  open_socket
end
transaction_class() click to toggle source
   # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Returns the transaction class used together with this adaptor.
#
# The file is required here, at call time, rather than at load time.
def self.transaction_class
  require 'neo4j/core/cypher_session/transactions/bolt'

  Neo4j::Core::CypherSession::Transactions::Bolt
end

Public Instance Methods

connect() click to toggle source
   # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Runs the handshake, sends INIT and checks the server's first reply.
#
# @return [nil] when the first reply message has type :success
# @raise [RuntimeError] when no reply message is available, or when the
#   first reply is not a success (the reply's code and message are included)
def connect
  handshake
  init

  # flush_messages can hand back nil or an empty list; Array() covers both
  # so a silent server produces a clear error instead of a NoMethodError.
  message = Array(flush_messages).first
  fail 'Init did not complete successfully (no response received from server)' if message.nil?
  return if message.type == :success

  data = message.args[0]
  fail "Init did not complete successfully\n\n#{data['code']}\n#{data['message']}"
end
connected?() click to toggle source
   # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Tells whether a TCP client exists and has not been closed.
#
# @return [Boolean]
def connected?
  return false unless @tcp_client

  !@tcp_client.closed?
end
query_set(transaction, queries, options = {}) click to toggle source
   # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Sends a set of queries and builds the response for them.
#
# @param transaction [Object] passed on to setup_queries!
# @param queries [Object] the queries to prepare, send and read results for
# @param options [Hash] +:skip_instrumentation+ is forwarded to
#   setup_queries!; +:wrap_level+ overrides the adaptor-level @options value
# @return [Object] whatever build_response returns, as yielded back through
#   the class-level instrument_request wrapper
def query_set(transaction, queries, options = {})
  skip_instrumentation = options[:skip_instrumentation]
  setup_queries!(queries, transaction, skip_instrumentation: skip_instrumentation)

  self.class.instrument_request(self) do
    send_query_jobs(queries)

    # A per-call wrap level wins over the one given at construction time.
    wrap_level = options[:wrap_level] || @options[:wrap_level]
    build_response(queries, wrap_level)
  end
end
ssl?() click to toggle source
   # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Tells whether the TCP client's underlying socket is an OpenSSL socket.
#
# @return [Boolean]
def ssl?
  socket = @tcp_client.socket
  socket.is_a?(OpenSSL::SSL::SSLSocket)
end

Private Instance Methods

build_response(queries, wrap_level) click to toggle source
   # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Builds the results for the given queries, handling a thrown failure.
#
# The response builder signals a failure by throwing :cypher_bolt_failure;
# anything caught that is not an Array is handed to handle_failure!.
#
# @param queries [Object] passed to Responses::Bolt
# @param wrap_level [Object] passed to Responses::Bolt as +wrap_level:+
# @return [Object] the value produced inside the catch block
def build_response(queries, wrap_level)
  outcome = catch(:cypher_bolt_failure) do
    Responses::Bolt.new(queries, method(:flush_messages), wrap_level: wrap_level).results
  end

  handle_failure!(outcome) unless outcome.is_a?(Array)
  outcome
end
flush_messages() click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Reads the pending response and wraps each structure in a Message.
#
# Each message is logged on the server side (:S) when debug logging is on.
#
# @return [Array<Message>, nil] nil when flush_response returns a falsy value
def flush_messages
  structures = flush_response
  return unless structures

  structures.map do |structure|
    message = Message.new(structure.signature, *structure.list)
    log_message :S, message.type, message.args.join(' ') if logger.debug?
    message
  end
end
flush_response() click to toggle source

Replace with Enumerator?

    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Reads one chunked response from the socket and unpacks its values.
#
# Chunks are read until an empty header or a zero chunk size is seen; the
# chunk bodies are concatenated and fed to PackStream::Unpacker.
#
# @return [Array] the unpacked values, in order (empty when nothing was read)
def flush_response
  chunk = String.new

  # Chunk headers are unsigned 16-bit big-endian integers ('n'). Decoding
  # them as signed would turn any chunk larger than 32767 bytes into a
  # negative size, ending the loop early and truncating the response.
  while !(header = recvmsg(2)).empty? && (chunk_size = header.unpack('n')[0]) > 0
    log_message :S, :chunk_size, chunk_size

    chunk << recvmsg(chunk_size)
  end

  unpacker = PackStream::Unpacker.new(StringIO.new(chunk))

  # Stops at the first nil/false value returned by the unpacker.
  values = []
  while (value = unpacker.unpack_value!)
    values << value
  end
  values
end
handle_failure!(error_data) click to toggle source
   # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Acknowledges a failure with the server and raises it as a CypherError.
#
# Pending messages are flushed, an ACK_FAILURE job is sent, and the first
# reply to it must have type :success.
#
# @param error_data [Hash] read for its 'code' and 'message' entries
# @raise [RuntimeError] when the ACK_FAILURE is not answered with a success
# @raise [CypherError] otherwise, built from the error code and message
def handle_failure!(error_data)
  flush_messages

  send_job { |job| job.add_message(:ack_failure) }

  acknowledgement = flush_messages[0]
  fail 'Expected SUCCESS for ACK_FAILURE' if acknowledgement.type != :success

  fail CypherError.new_from(error_data['code'], error_data['message'])
end
handshake() click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Sends the handshake preamble with the supported versions and reads back
# the version the server agreed to.
#
# @raise [RuntimeError] when the agreed version is zero; the TCP client is
#   closed before raising
def handshake
  log_message :C, :handshake, nil

  # Versions are packed as signed 32-bit big-endian integers.
  proposal = GOGOBOLT + SUPPORTED_VERSIONS.pack('l>*')
  sendmsg(proposal)

  agreed_version = recvmsg(4).unpack('l>*').first
  return logger.debug { "Agreed to version: #{agreed_version}" } unless agreed_version.zero?

  @tcp_client.close

  fail "Couldn't agree on a version (Sent versions #{SUPPORTED_VERSIONS.inspect})"
end
init() click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Sends the INIT message carrying the user agent and basic-auth details.
#
# @return [Object] whatever send_job returns
def init
  send_job do |job|
    auth_token = {principal: user, credentials: password, scheme: 'basic'}
    job.add_message(:init, USER_AGENT_STRING, **auth_token)
  end
end
log_message(side, *args) click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Writes a debug log line prefixed with the side it belongs to.
#
# The line is built inside the logger's block, so nothing is formatted
# unless the logger evaluates it.
#
# @param side [Object] prefix for the line (callers in this file pass :C or :S)
# @param args [Array] a single value, rendered through STREAM_INSPECTOR, or
#   a type followed by a message
def log_message(side, *args)
  logger.debug do
    next "#{side}: #{STREAM_INSPECTOR.call(args[0])}" if args.size == 1

    type, message = args
    "#{side}: #{ANSI::CYAN}#{type.to_s.upcase}#{ANSI::CLEAR} #{message}"
  end
end
new_job() click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Creates a fresh Job bound to this adaptor.
#
# @return [Job]
def new_job
  Job.new(self)
end
open_socket() click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Opens the TCP client for "host:port" with buffering switched off.
#
# @raise [Neo4j::Core::CypherSession::ConnectionFailedError] when the
#   connection is refused; the original error's message is carried over
def open_socket
  client_options = @net_tcp_client_options.merge(buffered: false, server: "#{host}:#{port}")

  # Splat the Hash so it reaches the constructor as keyword arguments.
  # Passing it positionally raises ArgumentError on Ruby 3 when the
  # constructor is declared with keyword parameters; the splat works with
  # a keyword-based or a single-Hash signature alike.
  @tcp_client = Net::TCPClient.new(**client_options)
rescue Errno::ECONNREFUSED => e
  raise Neo4j::Core::CypherSession::ConnectionFailedError, e.message
end
recvmsg(size) click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Reads +size+ bytes from the TCP client, logging what the client yields
# as server-side (:S) traffic.
#
# @param size [Integer] number of bytes requested
# @return [Object] whatever the TCP client's read returns
def recvmsg(size)
  @tcp_client.read(size) { |result| log_message :S, result }
end
secure_connection?() click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Tells whether an :ssl entry was supplied in the adaptor options.
#
# A truthy answer is remembered in @is_secure_socket; a false answer is
# recomputed on every call.
#
# @return [Boolean]
def secure_connection?
  return @is_secure_socket if @is_secure_socket

  @is_secure_socket = @options.key?(:ssl)
end
send_job() { |job| ... } click to toggle source

Allows you to send messages to the server. Yields a new job so the caller can add messages to it, sends it, and returns that job.

    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Builds a job, lets the caller fill it in, then sends it to the server.
#
# @yield [job] the freshly created job, for the caller to add messages to
# @return [Object] the job that was sent
def send_job
  job = new_job
  yield job

  log_message :C, :job, job
  sendmsg(job.chunked_packed_stream)

  job
end
send_query_jobs(queries) click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Sends every query in a single job, as a RUN message followed by PULL_ALL.
#
# @param queries [Enumerable] objects responding to +cypher+ and +parameters+
# @return [Object] whatever send_job returns
def send_query_jobs(queries)
  send_job do |job|
    queries.each do |query|
      cypher = query.cypher
      # Missing parameters are sent as an empty Hash.
      parameters = query.parameters || {}

      job.add_message(:run, cypher, parameters)
      job.add_message(:pull_all)
    end
  end
end
sendmsg(message) click to toggle source
    # File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Logs the outgoing data as client-side (:C) traffic, then writes it to the
# TCP client.
#
# @param message [Object] the data to write
# @return [Object] whatever the TCP client's write returns
def sendmsg(message)
  log_message :C, message

  @tcp_client.write(message)
end