class CassandraSchema::Migrator
Constants
- DEFAULT_OPTIONS
Attributes
connection[R]
current_version[R]
options[R]
Public Class Methods
new(connection:, migrations:, logger: Logger.new(STDOUT), options: {})
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 15 def initialize(connection:, migrations:, logger: Logger.new(STDOUT), options: {}) @connection = connection @logger = logger @migrations = migrations @options = DEFAULT_OPTIONS.merge(options) generate_migrator_schema! end
Public Instance Methods
migrate(target = nil)
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 24 def migrate(target = nil) lock_retry = @options.fetch(:lock_retry).dup begin raise if @options.fetch(:lock) && !lock_schema rescue if wait = lock_retry.shift @logger.info "Schema is locked; retrying in #{wait} seconds" sleep wait retry end @logger.info "Can't run migrations. Schema is locked." return end @current_version = get_current_version target ||= @migrations.keys.max || 0 @logger.info "Running migrations..." if target == current_version || @migrations.empty? @logger.info "Nothing to migrate." return end begin if target > current_version # excludes current version's up (current_version + 1).upto(target) do |next_version| migrate_to(next_version, :up) renew_lock if @options.fetch(:lock) end else # includes current version's :down # excludes target version's :down current_version.downto(target + 1) do |version| migrate_to(version, :down) renew_lock if @options.fetch(:lock) end end @logger.info "Current version: #{current_version}" @logger.info "Done!" rescue => ex @logger.error ex.message if ex.message && !ex.message.empty? @logger.info "Failed migrating all files. Current schema version: #{@current_version}" ensure unlock_schema if @options.fetch(:lock) end end
Private Instance Methods
delay(delay_time)
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 229 def delay(delay_time) sleep(delay_time) end
execute_command(command, options)
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 209 def execute_command(command, options) query_delay = @options.fetch(:query_delay) begin @connection.execute command, options # There is a Cassandra bug, where schema changes executed in quick succession # can result in internal corruption: # # https://stackoverflow.com/questions/29030661/creating-new-table-with-cqlsh-on-existing-keyspace-column-family-id-mismatch#answer # https://issues.apache.org/jira/browse/CASSANDRA-5025 delay(query_delay / 1000.0) if query_delay > 0 true rescue => ex @logger.error ex.message false end end
generate_migrator_schema!()
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 79 def generate_migrator_schema! @connection.execute( <<~CQL, CREATE TABLE IF NOT EXISTS schema_information ( name VARCHAR, value VARCHAR, PRIMARY KEY (name) ); CQL consistency: :quorum ) @connection.execute( <<~CQL, INSERT INTO schema_information(name, value) VALUES('version', '0') IF NOT EXISTS CQL consistency: :quorum ) end
get_current_version()
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 101 def get_current_version result = @connection.execute( <<~CQL, SELECT value FROM schema_information WHERE name = 'version' CQL consistency: :quorum ) unless result.rows.any? @logger.info "Can't load current schema version." fail end result.rows.first["value"].to_i end
lock_schema()
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 129 def lock_schema result = @connection.execute( <<~CQL, INSERT INTO schema_information(name, value) VALUES('lock', '1') IF NOT EXISTS USING TTL #{@options.fetch(:lock_timeout)} CQL consistency: :quorum ) result.rows.first.fetch("[applied]") end
migrate_to(target, direction)
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 164 def migrate_to(target, direction) new_version = direction == :up ? target : target - 1 @logger.info "Migrating to version #{new_version}" unless @migrations[target] @logger.info "Missing migration with version #{target}" fail end # Get commands list commands = @migrations.fetch(target).commands.fetch(direction) index = 0 commands.each do |command| unless execute_command(command, timeout: @options.fetch(:query_timeout)) message = "Failed migrating to version #{target}." if index > 0 message += " Recovering..." #recover recover_commands = @migrations .fetch(target) .commands .fetch(direction == :up ? :down : :up) .last(index) results = recover_commands.map { |cmd| execute_command(cmd, timeout: @options.fetch(:query_timeout)) } message += results.all? ? "Ok." : "Failed." end @logger.info message fail end index += 1 end update_version(new_version) end
renew_lock()
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 143 def renew_lock @connection.execute( <<~CQL, UPDATE schema_information USING TTL #{@options.fetch(:lock_timeout)} SET value = '1' WHERE name = 'lock' CQL consistency: :quorum ) end
unlock_schema()
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 155 def unlock_schema @connection.execute( <<~CQL, DELETE FROM schema_information WHERE name = 'lock' IF EXISTS; CQL consistency: :quorum ) end
update_version(target)
click to toggle source
# File lib/cassandra-schema/migrator.rb, line 117 def update_version(target) @connection.execute( <<~CQL, UPDATE schema_information SET value = ? WHERE name = 'version' CQL arguments: [target.to_s], consistency: :quorum ) @current_version = target end