class Cassandra::Cluster::ControlConnection

@private

Constants

SELECT_LOCAL
SELECT_PEERS
SELECT_PEER_QUERY

Public Class Methods

new(logger, io_reactor, cluster_registry, cluster_schema, cluster_metadata, load_balancing_policy, reconnection_policy, address_resolution_policy, connector, connection_options, schema_fetcher) click to toggle source
   # File lib/cassandra/cluster/control_connection.rb
25 def initialize(logger, io_reactor, cluster_registry, cluster_schema,
26                cluster_metadata, load_balancing_policy,
27                reconnection_policy, address_resolution_policy, connector,
28                connection_options, schema_fetcher)
29   @logger                = logger
30   @io_reactor            = io_reactor
31   @registry              = cluster_registry
32   @schema                = cluster_schema
33   @metadata              = cluster_metadata
34   @load_balancing_policy = load_balancing_policy
35   @reconnection_policy   = reconnection_policy
36   @address_resolver      = address_resolution_policy
37   @connector             = connector
38   @connection_options    = connection_options
39   @connection            = nil
40   @schema_fetcher        = schema_fetcher
41   @refreshing_statuses   = ::Hash.new(false)
42   @refresh_schema_future = nil
43   @status                = :closed
44   @refreshing_hosts      = false
45   @refreshing_host       = ::Hash.new(false)
46   @closed_promise        = Ione::Promise.new
47   @schema_changes        = ::Array.new
48   @schema_refresh_timer  = nil
49   @schema_refresh_window = nil
50 
51   mon_initialize
52 end

Public Instance Methods

close_async() click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
129 def close_async
130   synchronize do
131     return @closed_promise.future if @status == :closing || @status == :closed
132     @status = :closing
133   end
134   f = @io_reactor.stop
135 
136   f.on_value(&method(:connection_closed))
137   f.on_failure(&method(:connection_closed))
138 
139   @closed_promise.future
140 end
connect_async() click to toggle source
   # File lib/cassandra/cluster/control_connection.rb
59 def connect_async
60   synchronize do
61     return Ione::Future.resolved if @status == :connecting || @status == :connected
62     @status = :connecting
63   end
64 
65   f = @io_reactor.start.flat_map do
66     plan = @load_balancing_policy.plan(nil, VOID_STATEMENT, VOID_OPTIONS)
67     connect_to_first_available(plan)
68   end
69   f
70 end
connection_closed(cause) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
142 def connection_closed(cause)
143   @closed_promise.fulfill
144 end
host_down(host) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
101 def host_down(host)
102   schedule = nil
103   timer    = nil
104 
105   synchronize do
106     if @refreshing_statuses[host] ||
107        @load_balancing_policy.distance(host) == :ignore
108       return Ione::Future.resolved
109     end
110 
111     schedule = @reconnection_policy.schedule
112     timeout  = schedule.next
113 
114     @logger.debug("Starting to continuously refresh status of #{host.ip} in " \
115       "#{timeout} seconds")
116 
117     @refreshing_statuses[host] = timer = @io_reactor.schedule_timer(timeout)
118   end
119 
120   timer.on_value do
121     refresh_host_status(host).fallback do |_e|
122       refresh_host_status_with_retry(timer, host, schedule)
123     end
124   end
125 
126   nil
127 end
host_found(host) click to toggle source
   # File lib/cassandra/cluster/control_connection.rb
72 def host_found(host)
73 end
host_lost(host) click to toggle source
   # File lib/cassandra/cluster/control_connection.rb
75 def host_lost(host)
76   synchronize do
77     timer = @refreshing_statuses.delete(host)
78     @io_reactor.cancel_timer(timer) if timer
79   end
80 
81   nil
82 end
host_up(host) click to toggle source
   # File lib/cassandra/cluster/control_connection.rb
84 def host_up(host)
85   synchronize do
86     timer = @refreshing_statuses.delete(host)
87     @io_reactor.cancel_timer(timer) if timer
88 
89     unless @connection ||
90            (@status == :closing || @status == :closed) ||
91            @load_balancing_policy.distance(host) == :ignore
92       return connect_to_first_available(
93         @load_balancing_policy.plan(nil, VOID_STATEMENT, VOID_OPTIONS)
94       )
95     end
96   end
97 
98   Ione::Future.resolved
99 end
inspect() click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
146 def inspect
147   "#<#{self.class.name}:0x#{object_id.to_s(16)}>"
148 end
on_close(&block) click to toggle source
   # File lib/cassandra/cluster/control_connection.rb
54 def on_close(&block)
55   @closed_promise.future.on_value(&block)
56   @closed_promise.future.on_failure(&block)
57 end

Private Instance Methods

connect_to_first_available(plan, errors = nil) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
605       def connect_to_first_available(plan, errors = nil)
606         unless plan.has_next?
607           if errors.nil? && synchronize { @refreshing_statuses.empty? }
608             @logger.fatal(<<-MSG)
609 Control connection failed and is unlikely to recover.
610 
611         This usually means that all hosts are ignored by current load
612         balancing policy, most likely because they changed datacenters.
613         Reconnections attempts will continue getting scheduled to
614         repeat this message in the logs.
615             MSG
616           end
617 
618           return Ione::Future.failed(Errors::NoHostsAvailable.new(errors))
619         end
620 
621         host = plan.next
622         @logger.debug("Connecting to #{host.ip}")
623 
624         f = connect_to_host(host)
625         f = f.flat_map do |connection|
626           synchronize do
627             @status = :connected
628 
629             @connection = connection
630 
631             connection.on_closed do |cause|
632               reconnect = false
633 
634               synchronize do
635                 if connection == @connection
636                   if @status == :closing
637                     @status = :closed
638                   else
639                     @status = :reconnecting
640                     reconnect = true
641                   end
642 
643                   if cause
644                     @logger.info('Control connection closed ' \
645                       "(#{cause.class.name}: #{cause.message})")
646                   else
647                     @logger.info('Control connection closed')
648                   end
649 
650                   @connection = nil
651                 end
652               end
653 
654               reconnect_async(@reconnection_policy.schedule) if reconnect
655             end
656           end
657 
658           refresh_maybe_retry(:metadata)
659         end
660         f = f.flat_map { register_async }
661         f = f.flat_map { refresh_peers_async_maybe_retry }
662         f = f.flat_map { refresh_maybe_retry(:schema) } if @connection_options.synchronize_schema?
663         f = f.fallback do |error|
664           @logger.debug("Connection to #{host.ip} failed " \
665             "(#{error.class.name}: #{error.message})")
666 
667           case error
668           when Errors::HostError, Errors::TimeoutError
669             errors ||= {}
670             errors[host] = error
671             connect_to_first_available(plan, errors)
672           else
673             Ione::Future.failed(error)
674           end
675         end
676 
677         f.on_complete do |connection_future|
678           @logger.info('Control connection established') if connection_future.resolved?
679         end
680 
681         f
682       end
connect_to_host(host) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
684 def connect_to_host(host)
685   @connector.connect(host)
686 end
handle_schema_change(change) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
886 def handle_schema_change(change)
887   timer = nil
888   expiration_timer = nil
889 
890   synchronize do
891     # If change is nil, it means we want to set up timers (if there are pending
892     # changes). Otherwise, we definitely have a change and want to set up timers.
893     # Also, we only want to set up timers if we're not in the middle of a full
894     # refresh.
895     @schema_changes << change if change
896 
897     unless @schema_changes.empty? || @refresh_schema_future
898       @io_reactor.cancel_timer(@schema_refresh_timer) if @schema_refresh_timer
899       timer = @schema_refresh_timer =
900                 @io_reactor.schedule_timer(@connection_options.schema_refresh_delay)
901 
902       unless @schema_refresh_window
903         @schema_refresh_window =
904           @io_reactor.schedule_timer(@connection_options.schema_refresh_timeout)
905         expiration_timer = @schema_refresh_window
906       end
907     end
908   end
909 
910   expiration_timer.on_value do
911     schema_changes = nil
912 
913     synchronize do
914       @io_reactor.cancel_timer(@schema_refresh_timer)
915 
916       @schema_refresh_window = nil
917       @schema_refresh_timer = nil
918 
919       schema_changes = @schema_changes
920       @schema_changes = ::Array.new
921     end
922 
923     process_schema_changes(schema_changes)
924   end if expiration_timer
925 
926   timer.on_value do
927     schema_changes = nil
928 
929     synchronize do
930       @io_reactor.cancel_timer(@schema_refresh_window)
931 
932       @schema_refresh_window = nil
933       @schema_refresh_timer = nil
934 
935       schema_changes = @schema_changes
936       @schema_changes = ::Array.new
937     end
938 
939     process_schema_changes(schema_changes)
940   end if timer
941 
942   nil
943 end
peer_ip(data, host_address) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
688 def peer_ip(data, host_address)
689   peer = data['peer']
690 
691   return nil unless peer && data['host_id'] && data['data_center'] && data['rack'] && data['tokens']
692 
693   rpc_address = data['rpc_address']
694 
695   if rpc_address.nil?
696     @logger.info("The system.peers row for '#{data['peer']}' has no rpc_address. This is likely " \
697                      'a gossip or snitch issue. This host will be ignored.')
698     return nil
699   end
700 
701   if peer == host_address || rpc_address == host_address
702     # Some DSE versions were inserting a line for the local node in peers (with mostly null values).
703     # This has been fixed, but if we detect that's the case, ignore it as it's not really a big deal.
704 
705     @logger.debug("System.peers on node #{host_address} has a line for itself. This is not normal but is a " \
706                       'known problem of some DSE versions. Ignoring the entry.')
707     return nil
708   end
709 
710   ip = rpc_address
711   ip = peer if ip == '0.0.0.0'
712 
713   @address_resolver.resolve(ip)
714 end
process_schema_changes(schema_changes) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
716 def process_schema_changes(schema_changes)
717   refresh_keyspaces = ::Hash.new
718   refresh_tables_and_views = ::Hash.new
719   refresh_types      = ::Hash.new
720 
721   # This hash is of the form <keyspace, [Change (for function changes)]>
722   refresh_functions  = ::Hash.new
723 
724   # This hash is of the form <keyspace, [Change (for aggregate changes)]>
725   refresh_aggregates = ::Hash.new
726 
727   schema_changes.each do |change|
728     keyspace = change.keyspace
729 
730     next if refresh_keyspaces.key?(keyspace)
731 
732     case change.target
733     when Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
734       refresh_tables_and_views.delete(keyspace)
735       refresh_types.delete(keyspace)
736       refresh_functions.delete(keyspace)
737       refresh_aggregates.delete(keyspace)
738       refresh_keyspaces[keyspace] = true
739     when Protocol::Constants::SCHEMA_CHANGE_TARGET_TABLE
740       # We can't distinguish between table and view change events, so refresh both.
741       tables_and_views = refresh_tables_and_views[keyspace] ||= ::Hash.new
742       tables_and_views[change.name] = true
743     when Protocol::Constants::SCHEMA_CHANGE_TARGET_UDT
744       types = refresh_types[keyspace] ||= ::Hash.new
745       types[change.name] = true
746     when Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION
747       functions = refresh_functions[keyspace] ||= []
748       functions << change
749     when Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
750       aggregates = refresh_aggregates[keyspace] ||= []
751       aggregates << change
752     end
753   end
754 
755   futures = ::Array.new
756 
757   refresh_keyspaces.each_key do |keyspace|
758     futures << refresh_maybe_retry(:keyspace, keyspace)
759   end
760 
761   refresh_tables_and_views.each do |(keyspace, tables_and_views)|
762     tables_and_views.each_key do |table_or_view|
763       futures << refresh_maybe_retry(:table, keyspace, table_or_view)
764       futures << refresh_maybe_retry(:materialized_view, keyspace, table_or_view)
765     end
766   end
767 
768   refresh_types.each do |(keyspace, types)|
769     types.each_key do |type|
770       futures << refresh_maybe_retry(:type, keyspace, type)
771     end
772   end
773 
774   refresh_functions.each do |(keyspace, function_changes)|
775     function_changes.each do |change|
776       futures <<
777         refresh_maybe_retry(:function, keyspace, change.name, change.arguments)
778     end
779   end
780 
781   refresh_aggregates.each do |(keyspace, aggregate_changes)|
782     aggregate_changes.each do |change|
783       futures <<
784         refresh_maybe_retry(:aggregate, keyspace, change.name, change.arguments)
785     end
786   end
787 
788   Ione::Future.all(*futures)
789 end
reconnect_async(schedule) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
172 def reconnect_async(schedule)
173   timeout = schedule.next
174 
175   @logger.debug("Reestablishing control connection in #{timeout} seconds")
176 
177   f = @io_reactor.schedule_timer(timeout)
178   f = f.flat_map do
179     if synchronize { @status == :reconnecting }
180       plan = @load_balancing_policy.plan(nil, VOID_STATEMENT, VOID_OPTIONS)
181       connect_to_first_available(plan)
182     else
183       Ione::Future.resolved
184     end
185   end
186   f.fallback do |e|
187     @logger.error("Control connection failed (#{e.class.name}: #{e.message})")
188 
189     return Ione::Future.resolved unless synchronize { @status == :reconnecting }
190 
191     # We're reconnecting...
192     reconnect_async(schedule)
193   end
194 end
refresh_aggregate_async(keyspace_name, aggregate_name, aggregate_args) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
369 def refresh_aggregate_async(keyspace_name, aggregate_name, aggregate_args)
370   connection = @connection
371 
372   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
373 
374   @logger.info('Refreshing user-defined aggregate ' \
375     "\"#{keyspace_name}.#{aggregate_name}\"")
376 
377   # aggregate_args is an array of string, and we need an array of parsed types.
378   parsed_aggregate_args = @schema_fetcher.parse_argument_types(connection,
379                                                                keyspace_name,
380                                                                aggregate_args)
381   @schema_fetcher.fetch_aggregate(connection,
382                                   keyspace_name,
383                                   aggregate_name,
384                                   parsed_aggregate_args).map do |aggregate|
385     if aggregate
386       @schema.replace_aggregate(aggregate)
387     else
388       @schema.delete_aggregate(keyspace_name, aggregate_name, parsed_aggregate_args)
389     end
390 
391     @logger.info('Completed refreshing user-defined aggregate ' \
392       "\"#{keyspace_name}.#{aggregate_name}(#{aggregate_args.join(',')})\"")
393   end
394 end
refresh_function_async(keyspace_name, function_name, function_args) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
343 def refresh_function_async(keyspace_name, function_name, function_args)
344   connection = @connection
345 
346   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
347 
348   @logger.info('Refreshing user-defined function ' \
349     "\"#{keyspace_name}.#{function_name}\"")
350 
351   # function_args is an array of string, and we need an array of parsed types.
352   parsed_function_args =
353     @schema_fetcher.parse_argument_types(connection, keyspace_name, function_args)
354   @schema_fetcher.fetch_function(connection,
355                                  keyspace_name,
356                                  function_name,
357                                  parsed_function_args).map do |function|
358     if function
359       @schema.replace_function(function)
360     else
361       @schema.delete_function(keyspace_name, function_name, parsed_function_args)
362     end
363 
364     @logger.info('Completed refreshing user-defined function ' \
365       "\"#{keyspace_name}.#{function_name}(#{function_args.join(',')})\"")
366   end
367 end
refresh_host_async(address) click to toggle source

@param address [IPAddr] address node address, as reported from a C* event. Thus it is not resolved

relative to the client, but rather is the address that other nodes would use to communicate with
this node.
    # File lib/cassandra/cluster/control_connection.rb
571 def refresh_host_async(address)
572   connection = @connection
573   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
574 
575   ip = address.to_s
576 
577   @logger.info("Refreshing host metadata: #{ip}")
578 
579   request = if ip == connection.host
580               SELECT_LOCAL
581             else
582               Protocol::QueryRequest.new(SELECT_PEER_QUERY % ip,
583                                          EMPTY_LIST,
584                                          EMPTY_LIST,
585                                          :quorum)
586             end
587 
588   send_select_request(connection, request).map do |rows|
589     raise Errors::InternalError, "Unable to find host metadata: #{ip}" if rows.empty?
590 
591     @logger.info("Completed refreshing host metadata: #{ip}")
592     address = if ip == connection.host
593                 @address_resolver.resolve(address)
594               else
595                 peer_ip(rows.first, connection.host)
596               end
597     @registry.host_found(address, rows.first)
598 
599     self
600   end
601 rescue => e
602   @logger.error("Refreshing host metadata failed (#{e.class.name}: #{e.message})")
603 end
refresh_host_async_maybe_retry(address) click to toggle source

@param address [IPAddr] address node address, as reported from a C* event. Thus it is not resolved

relative to the client, but rather is the address that other nodes would use to communicate with
this node.
    # File lib/cassandra/cluster/control_connection.rb
521 def refresh_host_async_maybe_retry(address)
522   synchronize do
523     return Ione::Future.resolved if @refreshing_hosts || @refreshing_host[address]
524     @refreshing_host[address] = true
525   end
526 
527   refresh_host_async(address).fallback do |e|
528     case e
529     when Errors::HostError, Errors::TimeoutError
530       refresh_host_async_retry(address, e, @reconnection_policy.schedule)
531     else
532       connection = @connection
533       connection && connection.close(e)
534 
535       Ione::Future.failed(e)
536     end
537   end.map do
538     synchronize do
539       @refreshing_host.delete(address)
540     end
541   end
542 end
refresh_host_async_retry(address, error, schedule) click to toggle source

@param address [IPAddr] address node address, as reported from a C* event. Thus it is not resolved

relative to the client, but rather is the address that other nodes would use to communicate with
this node.
    # File lib/cassandra/cluster/control_connection.rb
547 def refresh_host_async_retry(address, error, schedule)
548   timeout = schedule.next
549   @logger.info("Failed to refresh host #{address} (#{error.class.name}: " \
550     "#{error.message}), retrying in #{timeout}")
551 
552   timer = @io_reactor.schedule_timer(timeout)
553   timer.flat_map do
554     refresh_host_async(address).fallback do |e|
555       case e
556       when Errors::HostError, Errors::TimeoutError
557         refresh_host_async_retry(address, e, schedule)
558       else
559         connection = @connection
560         connection && connection.close(e)
561 
562         Ione::Future.failed(e)
563       end
564     end
565   end
566 end
refresh_host_status(host) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
491 def refresh_host_status(host)
492   @connector.refresh_status(host)
493 end
refresh_host_status_with_retry(original_timer, host, schedule) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
495 def refresh_host_status_with_retry(original_timer, host, schedule)
496   timer = nil
497 
498   synchronize do
499     timer = @refreshing_statuses[host]
500 
501     # host must have been lost/up or timer was rescheduled
502     return Ione::Future.resolved if timer.nil? || timer != original_timer
503 
504     timeout = schedule.next
505 
506     @logger.debug("Checking host #{host.ip} in #{timeout} seconds")
507 
508     @refreshing_statuses[host] = timer = @io_reactor.schedule_timer(timeout)
509   end
510 
511   timer.on_value do
512     refresh_host_status(host).fallback do |_e|
513       refresh_host_status_with_retry(timer, host, schedule)
514     end
515   end
516 end
refresh_keyspace_async(keyspace_name) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
271 def refresh_keyspace_async(keyspace_name)
272   connection = @connection
273 
274   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
275 
276   @logger.info("Refreshing keyspace \"#{keyspace_name}\"")
277 
278   @schema_fetcher.fetch_keyspace(connection, keyspace_name).map do |keyspace|
279     if keyspace
280       @schema.replace_keyspace(keyspace)
281     else
282       @schema.delete_keyspace(keyspace_name)
283     end
284 
285     @logger.info("Completed refreshing keyspace \"#{keyspace_name}\"")
286   end
287 end
refresh_materialized_view_async(keyspace_name, view_name) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
307 def refresh_materialized_view_async(keyspace_name, view_name)
308   connection = @connection
309 
310   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
311 
312   @logger.info("Refreshing materialized view \"#{keyspace_name}.#{view_name}\"")
313 
314   @schema_fetcher.fetch_materialized_view(connection, keyspace_name, view_name).map do |view|
315     if view
316       @schema.replace_materialized_view(view)
317     else
318       @schema.delete_materialized_view(keyspace_name, view_name)
319     end
320 
321     @logger.info("Completed refreshing materialized view \"#{keyspace_name}.#{view_name}\"")
322   end
323 end
refresh_maybe_retry(what, *args) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
791 def refresh_maybe_retry(what, *args)
792   send(:"refresh_#{what}_async", *args).fallback do |e|
793     case e
794     when Errors::HostError, Errors::TimeoutError
795       refresh_retry(what, e, @reconnection_policy.schedule, *args)
796     else
797       connection = @connection
798       connection && connection.close(e)
799 
800       Ione::Future.failed(e)
801     end
802   end
803 end
refresh_metadata_async() click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
471 def refresh_metadata_async
472   connection = @connection
473 
474   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
475 
476   @logger.info("Refreshing connected host's metadata")
477 
478   send_select_request(connection, SELECT_LOCAL).map do |local|
479     raise Errors::InternalError, "Unable to fetch connected host's metadata" if local.empty?
480 
481     data = local.first
482     @registry.host_found(@address_resolver.resolve(IPAddr.new(connection.host)), data)
483     @metadata.update(data)
484 
485     @logger.info("Completed refreshing connected host's metadata")
486 
487     nil
488   end
489 end
refresh_peers_async() click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
440 def refresh_peers_async
441   connection = @connection
442 
443   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
444 
445   @logger.info('Refreshing peers metadata')
446 
447   send_select_request(connection, SELECT_PEERS).map do |peers|
448     @logger.debug("#{peers.size} peer(s) found")
449 
450     ips = ::Set.new
451 
452     peers.shuffle!
453     peers.each do |data|
454       ip = peer_ip(data, connection.host)
455       next unless ip
456       ips << ip
457       @registry.host_found(ip, data)
458     end
459 
460     @registry.each_host do |host|
461       next if host.ip == connection.host
462       @registry.host_lost(host.ip) unless ips.include?(host.ip)
463     end
464 
465     @logger.info('Completed refreshing peers metadata')
466 
467     nil
468   end
469 end
refresh_peers_async_maybe_retry() click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
396 def refresh_peers_async_maybe_retry
397   synchronize do
398     return Ione::Future.resolved if @refreshing_hosts
399     @refreshing_hosts = true
400   end
401 
402   refresh_peers_async.fallback do |e|
403     case e
404     when Errors::HostError, Errors::TimeoutError
405       refresh_peers_async_retry(e, @reconnection_policy.schedule)
406     else
407       connection = @connection
408       connection && connection.close(e)
409 
410       Ione::Future.failed(e)
411     end
412   end.map do
413     synchronize do
414       @refreshing_hosts = false
415     end
416   end
417 end
refresh_peers_async_retry(error, schedule) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
419 def refresh_peers_async_retry(error, schedule)
420   timeout = schedule.next
421   @logger.info("Failed to refresh hosts (#{error.class.name}: " \
422     "#{error.message}), retrying in #{timeout}")
423 
424   timer = @io_reactor.schedule_timer(timeout)
425   timer.flat_map do
426     refresh_peers_async.fallback do |e|
427       case e
428       when Errors::HostError, Errors::TimeoutError
429         refresh_peers_async_retry(e, schedule)
430       else
431         connection = @connection
432         connection && connection.close(e)
433 
434         Ione::Future.failed(e)
435       end
436     end
437   end
438 end
refresh_retry(what, error, schedule, *args) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
805 def refresh_retry(what, error, schedule, *args)
806   timeout = schedule.next
807   @logger.info("Failed to refresh #{what} #{args.inspect} " \
808     "(#{error.class.name}: #{error.message}), retrying in #{timeout}")
809 
810   timer = @io_reactor.schedule_timer(timeout)
811   timer.flat_map do
812     send(:"refresh_#{what}_async", *args).fallback do |e|
813       case e
814       when Errors::HostError, Errors::TimeoutError
815         refresh_retry(what, e, schedule, *args)
816       else
817         connection = @connection
818         connection && connection.close(e)
819 
820         Ione::Future.failed(e)
821       end
822     end
823   end
824 end
refresh_schema_async() click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
257 def refresh_schema_async
258   connection = @connection
259 
260   @logger.info('Refreshing schema')
261 
262   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
263 
264   @schema_fetcher.fetch(connection).map do |keyspaces|
265     @schema.replace(keyspaces)
266     @metadata.rebuild_token_map
267     @logger.info('Schema refreshed')
268   end
269 end
refresh_schema_async_wrapper() click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
826 def refresh_schema_async_wrapper
827   # This is kinda tricky. We want to start refreshing the schema asynchronously.
828   # However, if we're already in the process of doing so, return the future
829   # representing that result rather than starting another schema refresh.
830   #
831   # A few other nuances while a refresh is in progress:
832   # * if a new attempt is made to refresh, keep track of that and schedule another
833   #   refresh after the current one completes.
834   # * we don't want schema_change events to be processed since the full refresh
835   #   may overwrite the results of handling the schema_change events with older
836   #   data. That said, we don't want to lose track of schema_change events; just
837   #   delay processing them until after the full refresh is done.
838   #
839   # Finally, when a full refresh begins, clear out any pending changes in
840   # @schema_changes because the full refresh subsumes them. This has two benefits:
841   # 1. avoid round trips to Cassandra to get details related to those schema
842   #    changes.
843   # 2. avoid race conditions where those updates may return older data than our
844   #    full refresh and might win as last writer with that potentially older data.
845   synchronize do
846     if @refresh_schema_future
847       @pending_schema_refresh = true
848       return @refresh_schema_future
849     end
850 
851     # Fresh refresh; prep this connection!
852 
853     # Since we're starting a new refresh, there can be no pending refresh request.
854     @pending_schema_refresh = false
855 
856     # Clear outstanding schema changes and timers.
857     @schema_changes = []
858     @io_reactor.cancel_timer(@schema_refresh_timer) if @schema_refresh_timer
859     @schema_refresh_timer = nil
860     @io_reactor.cancel_timer(@schema_refresh_window) if @schema_refresh_window
861     @schema_refresh_window = nil
862 
863     # Start refreshing..
864     @refresh_schema_future = refresh_maybe_retry(:schema)
865     @refresh_schema_future.on_complete do
866       pending = false
867       synchronize do
868         # We're done refreshing. If we have a pending refresh, launch it now.
869         @refresh_schema_future = nil
870         pending = @pending_schema_refresh
871         @pending_schema_refresh = false
872         unless pending
873           # Restore timers if there are pending schema changes.
874           handle_schema_change(nil)
875         end
876       end
877 
878       refresh_schema_async_wrapper if pending
879     end
880 
881     # Return the (now cached) future
882     @refresh_schema_future
883   end
884 end
refresh_table_async(keyspace_name, table_name) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
289 def refresh_table_async(keyspace_name, table_name)
290   connection = @connection
291 
292   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
293 
294   @logger.info("Refreshing table \"#{keyspace_name}.#{table_name}\"")
295 
296   @schema_fetcher.fetch_table(connection, keyspace_name, table_name).map do |table|
297     if table
298       @schema.replace_table(table)
299     else
300       @schema.delete_table(keyspace_name, table_name)
301     end
302 
303     @logger.info("Completed refreshing table \"#{keyspace_name}.#{table_name}\"")
304   end
305 end
refresh_type_async(keyspace_name, type_name) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
325 def refresh_type_async(keyspace_name, type_name)
326   connection = @connection
327 
328   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
329 
330   @logger.info("Refreshing user-defined type \"#{keyspace_name}.#{type_name}\"")
331 
332   @schema_fetcher.fetch_type(connection, keyspace_name, type_name).map do |type|
333     if type
334       @schema.replace_type(type)
335     else
336       @schema.delete_type(keyspace_name, type_name)
337     end
338 
339     @logger.info("Completed refreshing user-defined type \"#{keyspace_name}.#{type_name}\"")
340   end
341 end
register_async() click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
196 def register_async
197   connection = @connection
198 
199   return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?
200 
201   request = Protocol::RegisterRequest.new(
202     Protocol::TopologyChangeEventResponse::TYPE,
203     Protocol::StatusChangeEventResponse::TYPE
204   )
205 
206   request.events << Protocol::SchemaChangeEventResponse::TYPE if @connection_options.synchronize_schema?
207 
208   f = connection.send_request(request)
209   f = f.map do |r|
210     case r
211     when Protocol::ReadyResponse
212       nil
213     when Protocol::ErrorResponse
214       raise r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :quorum, 0)
215     else
216       raise Errors::InternalError, "Unexpected response #{r.inspect}"
217     end
218   end
219   f.map do
220     connection.on_event do |event|
221       @logger.debug("Event received #{event}")
222 
223       if event.type == 'SCHEMA_CHANGE'
224         handle_schema_change(event)
225       else
226         case event.change
227         when 'UP'
228           address = @address_resolver.resolve(event.address)
229           if @registry.has_host?(address)
230             @registry.host_up(address)
231           else
232             refresh_host_async_maybe_retry(event.address)
233             refresh_schema_async_wrapper
234           end
235         when 'DOWN'
236           # RUBY-164: Don't mark host down if there are active connections. We have
237           # logic in connector.rb to call host_down when all connections to a node are lost,
238           # so that covers the requirement.
239         when 'NEW_NODE'
240           address = @address_resolver.resolve(event.address)
241 
242           unless @registry.has_host?(address)
243             refresh_host_async_maybe_retry(event.address)
244             refresh_schema_async_wrapper
245           end
246         when 'REMOVED_NODE'
247           @registry.host_lost(@address_resolver.resolve(event.address))
248           refresh_schema_async_wrapper
249         end
250       end
251     end
252 
253     self
254   end
255 end
send_select_request(connection, request) click to toggle source
    # File lib/cassandra/cluster/control_connection.rb
945 def send_select_request(connection, request)
946   backtrace = caller
947   connection.send_request(request).map do |r|
948     case r
949     when Protocol::RowsResultResponse
950       r.rows
951     when Protocol::ErrorResponse
952       e = r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :quorum, 0)
953       e.set_backtrace(backtrace)
954       raise e
955     else
956       raise Errors::InternalError, "Unexpected response #{r.inspect}", caller
957     end
958   end
959 end