class Cassandra::Protocol::V4::Decoder
Constants
- READY
Public Class Methods
new(handler, compressor = nil, custom_type_handlers = {})
click to toggle source
# File lib/cassandra/protocol/v4.rb
#
# Sets up a decoder that has not yet seen any bytes.
#
# handler              - object that receives decoded responses; this class
#                        calls +complete_request+ and
#                        +notify_event_listeners+ on it.
# compressor           - optional object whose +decompress+ is used for
#                        frames flagged as compressed (nil by default).
# custom_type_handlers - stored and later passed through to the rows-result
#                        decoding calls (empty hash by default).
def initialize(handler, compressor = nil, custom_type_handlers = {})
  @handler    = handler
  @compressor = compressor

  # The state name selects which decode_<state> method `<<` dispatches to.
  @state = :initial

  # No frame header has been parsed yet.
  @header = @version = @code = @length = nil

  @buffer = CqlByteBuffer.new
  @custom_type_handlers = custom_type_handlers
end
Public Instance Methods
<<(data)
click to toggle source
# File lib/cassandra/protocol/v4.rb
#
# Appends +data+ to the internal buffer and hands the buffer to the decode
# method that matches the current state (decode_initial, decode_header or
# decode_body). Returns whatever that method returns.
def <<(data)
  @buffer << data

  state_decoder = :"decode_#{@state}"
  __send__(state_decoder, @buffer)
end
Private Instance Methods
actual_decode(buffer, fields, frame_length, code)
click to toggle source
# File lib/cassandra/protocol/v4.rb
#
# Decodes exactly one frame body from +buffer+ and delivers the resulting
# response to the handler.
#
# buffer       - byte buffer positioned at the start of the frame body.
# fields       - first 32-bit word of the frame header (version, flags and
#                stream id packed together).
# frame_length - body length announced by the frame header.
# code         - opcode byte read from the frame header.
#
# Raises Errors::DecodingError when the frame is flagged as compressed but
# no compressor was configured.
def actual_decode(buffer, fields, frame_length, code)
  flags            = fields >> 16
  protocol_version = (fields >> 24) & 0x7f
  compression      = (flags & 0x01) != 0
  tracing          = (flags & 0x02) != 0
  payload          = (flags & 0x04) != 0
  warning          = (flags & 0x08) != 0

  # The low 16 bits carry the stream id as a signed two's-complement value.
  raw_stream_id = fields & 0xffff
  stream_id     = raw_stream_id >= 0x8000 ? raw_stream_id - 0x10000 : raw_stream_id
  opcode        = code & 0xff

  # A compressed body is read in full and decompressed into a fresh buffer;
  # from here on the decompressed bytes are treated as the frame body, so
  # frame_length becomes the decompressed size.
  if compression
    unless @compressor
      raise Errors::DecodingError,
            'Compressed frame received, but no compressor configured'
    end

    compressed_body = buffer.read(frame_length)
    buffer = CqlByteBuffer.new(@compressor.decompress(compressed_body))
    frame_length = buffer.size
  end

  # The buffer may also hold part or all of the next frame, so consumption
  # is tracked relative to the length at this point instead of assuming the
  # buffer ends where the frame does.
  length_at_start = buffer.length

  trace_id       = tracing ? buffer.read_uuid : nil
  warnings       = warning ? buffer.read_string_list : nil
  custom_payload = payload ? buffer.read_bytes_map.freeze : nil

  body_length = frame_length - (length_at_start - buffer.length)
  response = decode_response(opcode, protocol_version, buffer, body_length,
                             trace_id, custom_payload, warnings)

  # Drop whatever part of this frame the response decoder did not consume.
  leftover = frame_length - (length_at_start - buffer.length)
  buffer.discard(leftover) if leftover > 0

  # Stream id -1 is routed to the event listeners; any other id completes
  # the request registered under that id.
  return @handler.notify_event_listeners(response) if stream_id == -1

  @handler.complete_request(stream_id, response)
end
decode_body(buffer)
click to toggle source
# File lib/cassandra/protocol/v4.rb
#
# Handles the state in which a frame header has already been parsed (kept in
# @header, @code and @length) and its body is awaited. Decodes as many
# complete frames as the buffer holds. If fewer than 9 bytes remain after a
# frame, the saved header is cleared and the state switches to :header.
# Otherwise the most recently read header is saved for the next call.
# Always returns nil.
def decode_body(buffer)
  header, opcode, body_size = @header, @code, @length
  available = buffer.length

  while available >= body_size
    actual_decode(buffer, header, body_size, opcode)
    available = buffer.length

    # Not enough bytes for another 9-byte header: wait in the header state.
    if available < 9
      @header = @code = @length = nil
      @state = :header

      return
    end

    header    = buffer.read_int
    opcode    = buffer.read_byte
    body_size = buffer.read_int
    available -= 9
  end

  # The body of the last header read is still incomplete; remember it.
  @header, @code, @length = header, opcode, body_size

  nil
end
decode_header(buffer)
click to toggle source
# File lib/cassandra/protocol/v4.rb
#
# Handles the state in which the next bytes in the buffer start a frame
# header. While at least 9 bytes are available, reads a header and decodes
# the frame if its whole body is already buffered. Otherwise the header is
# saved in @header, @code and @length and the state switches to :body.
# Always returns nil.
def decode_header(buffer)
  available = buffer.length

  until available < 9
    header    = buffer.read_int
    opcode    = buffer.read_byte
    body_size = buffer.read_int

    # Body not fully buffered yet: park the header and wait for more data.
    if (available - 9) < body_size
      @header, @code, @length = header, opcode, body_size
      @state = :body

      return
    end

    actual_decode(buffer, header, body_size, opcode)
    available = buffer.length
  end

  nil
end
decode_initial(buffer)
click to toggle source
# File lib/cassandra/protocol/v4.rb
#
# Handles the very first bytes received. Does nothing until 9 bytes are
# buffered. If the header reports a protocol version below 3, the request
# identified by the header's stream id is completed with an ErrorResponse
# (code 0x000A) and nil is returned. Otherwise the header is stored, the
# state switches to :body, and decoding continues in decode_body.
def decode_initial(buffer)
  return unless buffer.length >= 9

  header  = buffer.read_int
  version = (header >> 24) & 0x7f

  if version >= 3
    @header = header
    @code   = buffer.read_byte
    @length = buffer.read_int
    @state  = :body

    return decode_body(buffer)
  end

  # For versions below 3 the stream id is read as a single signed byte
  # taken from bits 8..15 of the first header word.
  raw_stream_id = (header >> 8) & 0xff
  stream_id     = raw_stream_id >= 0x80 ? raw_stream_id - 0x100 : raw_stream_id

  unsupported = ErrorResponse.new(nil, nil, 0x000A,
                                  'Invalid or unsupported protocol version')
  @handler.complete_request(stream_id, unsupported)

  nil
end
decode_response(opcode, protocol_version, buffer, size, trace_id, custom_payload, warnings)
click to toggle source
# File lib/cassandra/protocol/v4.rb
#
# Builds the response object for one frame body, selected by +opcode+.
#
# opcode           - frame opcode (low byte of the header's opcode field).
# protocol_version - version taken from the frame header; selects the
#                    read/write failure layout (below 5 versus 5 and up).
# buffer           - byte buffer positioned at the start of the
#                    opcode-specific body.
# size             - number of body bytes left for this frame when this
#                    method is called; only used to slice out raw rows.
# trace_id         - tracing id read from the frame, or nil.
# custom_payload   - custom payload read from the frame, or nil.
# warnings         - warnings read from the frame, or nil.
#
# Raises Errors::DecodingError for an unknown opcode, result type, event
# type or schema-change target.
def decode_response(opcode,
                    protocol_version,
                    buffer,
                    size,
                    trace_id,
                    custom_payload,
                    warnings)
  case opcode
  when 0x00 # ERROR
    code    = buffer.read_int
    message = buffer.read_string

    # Fields are pulled from the buffer into locals in wire order and then
    # handed to the matching response class.
    case code
    when 0x1000
      consistency = buffer.read_consistency
      required    = buffer.read_int
      alive       = buffer.read_int

      UnavailableErrorResponse.new(custom_payload, warnings, code, message,
                                   consistency, required, alive)
    when 0x1100
      consistency = buffer.read_consistency
      received    = buffer.read_int
      block_for   = buffer.read_int
      write_type  = buffer.read_string

      WriteTimeoutErrorResponse.new(custom_payload, warnings, code, message,
                                    consistency, received, block_for,
                                    write_type)
    when 0x1200
      consistency  = buffer.read_consistency
      received     = buffer.read_int
      block_for    = buffer.read_int
      data_present = buffer.read_byte != 0

      ReadTimeoutErrorResponse.new(custom_payload, warnings, code, message,
                                   consistency, received, block_for,
                                   data_present)
    when 0x1300
      consistency = buffer.read_consistency
      received    = buffer.read_int
      block_for   = buffer.read_int

      # Before version 5 an int is read here; from version 5 on a reason
      # map is read instead. The unused slot is passed as nil.
      if protocol_version < 5
        num_failures     = buffer.read_int
        failures_by_node = nil
      else
        num_failures     = nil
        failures_by_node = buffer.read_reason_map
      end
      data_present = buffer.read_byte != 0

      ReadFailureErrorResponse.new(custom_payload, warnings, code, message,
                                   consistency, received, block_for,
                                   num_failures, data_present,
                                   failures_by_node)
    when 0x1400
      keyspace  = buffer.read_string
      function  = buffer.read_string
      arg_types = buffer.read_string_list

      FunctionFailureErrorResponse.new(custom_payload, warnings, code,
                                       message, keyspace, function,
                                       arg_types)
    when 0x1500
      consistency = buffer.read_consistency
      received    = buffer.read_int
      block_for   = buffer.read_int

      # Same version split as for read failures above.
      if protocol_version < 5
        num_failures     = buffer.read_int
        failures_by_node = nil
      else
        num_failures     = nil
        failures_by_node = buffer.read_reason_map
      end
      write_type = buffer.read_string

      WriteFailureErrorResponse.new(custom_payload, warnings, code, message,
                                    consistency, received, block_for,
                                    num_failures, write_type,
                                    failures_by_node)
    when 0x2400
      keyspace = buffer.read_string
      table    = buffer.read_string

      AlreadyExistsErrorResponse.new(custom_payload, warnings, code, message,
                                     keyspace, table)
    when 0x2500
      statement_id = buffer.read_short_bytes

      UnpreparedErrorResponse.new(custom_payload, warnings, code, message,
                                  statement_id)
    else
      ErrorResponse.new(custom_payload, warnings, code, message)
    end
  when 0x02 # READY
    READY
  when 0x03 # AUTHENTICATE
    authenticator = buffer.read_string
    AuthenticateResponse.new(authenticator)
  when 0x06 # SUPPORTED
    options = buffer.read_string_multimap
    SupportedResponse.new(options)
  when 0x08 # RESULT
    result_type = buffer.read_int

    case result_type
    when 0x0001 # Void
      VoidResultResponse.new(custom_payload, warnings, trace_id)
    when 0x0002 # Rows
      length_before_metadata = buffer.length
      column_specs, paging_state = Coder.read_metadata_v4(buffer)

      if column_specs.nil?
        # No column specs came back, so the rows cannot be decoded here.
        # The rest of the body is kept raw: `size` minus the metadata bytes
        # minus the 4-byte result type read above.
        metadata_bytes = length_before_metadata - buffer.length
        raw_rows = CqlByteBuffer.new(buffer.read(size - metadata_bytes - 4))

        RawRowsResultResponse.new(custom_payload, warnings, protocol_version,
                                  raw_rows, paging_state, trace_id,
                                  @custom_type_handlers)
      else
        rows = Coder.read_values_v4(buffer, column_specs,
                                    @custom_type_handlers)

        RowsResultResponse.new(custom_payload, warnings, rows, column_specs,
                               paging_state, trace_id)
      end
    when 0x0003 # SetKeyspace
      keyspace = buffer.read_string

      SetKeyspaceResultResponse.new(custom_payload, warnings, keyspace,
                                    trace_id)
    when 0x0004 # Prepared
      id = buffer.read_short_bytes
      pk_idx, params_metadata = Coder.read_prepared_metadata_v4(buffer)
      result_metadata = Coder.read_metadata_v4(buffer).first

      PreparedResultResponse.new(custom_payload, warnings, id,
                                 params_metadata, result_metadata, pk_idx,
                                 trace_id)
    when 0x0005 # SchemaChange
      change    = buffer.read_string
      target    = buffer.read_string
      name      = nil
      arguments = EMPTY_LIST

      case target
      when Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
        keyspace = buffer.read_string
      when Protocol::Constants::SCHEMA_CHANGE_TARGET_TABLE,
           Protocol::Constants::SCHEMA_CHANGE_TARGET_UDT,
           Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION,
           Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
        keyspace = buffer.read_string
        name     = buffer.read_string

        # Functions and aggregates additionally carry a string list.
        if target == Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION ||
           target == Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
          arguments = buffer.read_string_list
        end
      else
        raise Errors::DecodingError,
              "Unsupported event target: #{target.inspect}"
      end

      SchemaChangeResultResponse.new(custom_payload, warnings, change,
                                     keyspace, name, target, arguments,
                                     trace_id)
    else
      raise Errors::DecodingError,
            "Unsupported result type: #{result_type.inspect}"
    end
  when 0x0C # EVENT
    event_type = buffer.read_string

    case event_type
    when 'SCHEMA_CHANGE'
      change    = buffer.read_string
      target    = buffer.read_string
      arguments = EMPTY_LIST

      case target
      when Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
        keyspace = buffer.read_string
        name     = nil
      when Protocol::Constants::SCHEMA_CHANGE_TARGET_TABLE,
           Protocol::Constants::SCHEMA_CHANGE_TARGET_UDT
        keyspace = buffer.read_string
        name     = buffer.read_string
      when Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION,
           Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
        keyspace  = buffer.read_string
        name      = buffer.read_string
        arguments = buffer.read_string_list
      else
        raise Errors::DecodingError,
              "Unsupported event target: #{target.inspect}"
      end

      SchemaChangeEventResponse.new(change, keyspace, name, target, arguments)
    when 'STATUS_CHANGE'
      change  = buffer.read_string
      address = buffer.read_inet

      StatusChangeEventResponse.new(change, *address)
    when 'TOPOLOGY_CHANGE'
      change  = buffer.read_string
      address = buffer.read_inet

      TopologyChangeEventResponse.new(change, *address)
    else
      raise Errors::DecodingError,
            "Unsupported event type: #{event_type.inspect}"
    end
  when 0x0E # AUTH_CHALLENGE
    token = buffer.read_bytes
    AuthChallengeResponse.new(token)
  when 0x10 # AUTH_SUCCESS
    token = buffer.read_bytes
    AuthSuccessResponse.new(token)
  else
    raise Errors::DecodingError,
          "Unsupported response opcode: #{opcode.inspect}"
  end
end