class BatchKit::Database

Implements functionality for persisting details of jobs run in a relational database, via the Sequel database library.

Public Class Methods

new(options = {}) click to toggle source

Instantiate a database back-end for persisting job and task runs.

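@param options [Hash] An options hash, passed on to the {BatchKit::Database::Schema#initialize Schema} instance. It is also consulted during housekeeping for the :log_retention_days (default 60), :job_run_retention_days (default 365) and :request_retention_days (default 90) settings.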
# File lib/batch-kit/database.rb, line 16
# Builds the database back-end, retaining the caller's options hash and
# creating the Schema object that manages the Sequel connection.
#
# @param options [Hash] Options for the Schema; the same hash is kept so
#   that housekeeping can read its retention settings later.
def initialize(options = {})
    @options = options
    # The schema receives the very same options hash object
    @schema = Schema.new(@options)
end

Public Instance Methods

connect(*args) click to toggle source

Connect to a back-end database for persistence, verify that the database schema is up-to-date, and then perform housekeeping on it.

@param args [Array<String>] Connection details to be passed to the {BatchKit::Database::Schema#connect} method.
# File lib/batch-kit/database.rb, line 32
# Connects to the persistence database and prepares it for use.
#
# @param args [Array<String>] Connection details forwarded unchanged to
#   the schema's connect method.
# @return whatever #perform_housekeeping returns.
def connect(*args)
    # Open the connection via the schema object first: the model classes
    # can only be loaded once a connection exists.
    @schema.connect(*args)
    require_relative 'database/models'

    # Confirm the database schema is current before doing any work on it
    MD5.check_schema(@schema)

    # Finally tidy up stale records; this call is the last expression and
    # so supplies the return value of #connect.
    perform_housekeeping
end
log() click to toggle source

Log database messages under the batch.database namespace.

# File lib/batch-kit/database.rb, line 23
# Returns the logger for database messages (namespace 'batch.database').
# The logger is requested from BatchKit::LogManager on first use and the
# same instance is returned on every later call.
def log
    return @log if @log
    @log = BatchKit::LogManager.logger('batch.database')
end
perform_housekeeping() click to toggle source

Performs database housekeeping: times out zombie jobs, purges expired locks, and purges detail records that are older than the configured retention thresholds.

# File lib/batch-kit/database.rb, line 47
# Performs periodic maintenance of the batch database. In order, it:
#
# 1. times out zombie jobs - job runs still in the EXECUTING state that
#    have not logged in the last 6 hours and did not start within the last
#    6 hours - together with their EXECUTING task runs;
# 2. deletes locks whose expiry time is more than 6 hours in the past;
# 3. deletes log records for not-yet-purged job runs that started before
#    :log_retention_days (default 60) ago, and flags those runs as purged;
# 4. deletes job runs (with their log, argument and task run records) that
#    started before :job_run_retention_days (default 365) ago;
# 5. deletes requests (and their requestor records) launched before
#    :request_retention_days (default 90) ago;
# 6. deletes jobs (with their task and failure records) that no longer
#    have any job runs.
#
# The retention options are read from the options hash given to
# #initialize. Nothing is done if a job run has already started today.
# Each step runs in its own transaction; ids are deleted in slices of 1000,
# which keeps each generated IN (...) list bounded.
def perform_housekeeping
    # Only do housekeeping once per day
    return unless JobRun.where{job_start_time > Date.today}.empty?

    log.info "Performing batch database housekeeping"

    # Time out jobs in Executing state that have not logged for 6+ hours
    @schema.connection.transaction do
        cutoff = Time.now - 6 * 60 * 60
        exec_jobs = JobRun.where(job_status: 'EXECUTING').map(:job_run)
        # Jobs that have logged since the cutoff are still alive
        curr_jobs = JobRunLog.select_group(:job_run).
            where(job_run: exec_jobs).having{max(log_time) > cutoff}.map(:job_run)
        # Jobs that started after the cutoff cannot have been silent for 6+
        # hours, even if they have not written any log records yet
        new_jobs = JobRun.where(job_run: exec_jobs).
            where{job_start_time > cutoff}.map(:job_run)
        abort_ids = exec_jobs - curr_jobs - new_jobs
        unless abort_ids.empty?
            log.detail "Cleaning up #{abort_ids.size} zombie jobs"
            # Load the rows before updating them, rather than issuing
            # updates while a result set is still being iterated
            TaskRun.where(job_run: abort_ids, task_status: 'EXECUTING').all.each(&:timeout)
            JobRun.where(job_run: abort_ids).all.each(&:timeout)
        end
    end

    # Purge locks that expired 6+ hours ago
    @schema.connection.transaction do
        purge_date = Time.now - 6 * 60 * 60
        Lock.where{lock_expires_at < purge_date}.delete
    end

    # Purge log records for old job runs
    @schema.connection.transaction do
        purge_date = Date.today - @options.fetch(:log_retention_days, 60)
        purge_job_runs = JobRun.where(job_purged_flag: false).
            where{job_start_time < purge_date}.map(:job_run)
        unless purge_job_runs.empty?
            log.detail "Purging log records for #{purge_job_runs.count} job runs"
            purge_job_runs.each_slice(1000) do |purge_ids|
                JobRunLog.where(job_run: purge_ids).delete
                JobRun.where(job_run: purge_ids).update(job_purged_flag: true)
            end
        end
    end

    # Purge old task and job runs
    @schema.connection.transaction do
        purge_date = Date.today - @options.fetch(:job_run_retention_days, 365)
        purge_job_runs = JobRun.where{job_start_time < purge_date}.map(:job_run)
        unless purge_job_runs.empty?
            log.detail "Purging job and task run records for #{purge_job_runs.count} job runs"
            purge_job_runs.each_slice(1000) do |purge_ids|
                # Log records are normally gone already, but remain when
                # :log_retention_days exceeds :job_run_retention_days;
                # remove them (children first) so no rows are orphaned
                JobRunLog.where(job_run: purge_ids).delete
                JobRunArg.where(job_run: purge_ids).delete
                TaskRun.where(job_run: purge_ids).delete
                JobRun.where(job_run: purge_ids).delete
            end
        end
    end

    # Purge old request runs
    @schema.connection.transaction do
        purge_date = Date.today - @options.fetch(:request_retention_days, 90)
        purge_requests = Request.where{request_launched_at < purge_date}.map(:request_id)
        unless purge_requests.empty?
            log.detail "Purging request records for #{purge_requests.count} requests"
            purge_requests.each_slice(1000) do |purge_ids|
                # Delete the dependent requestor rows before their requests,
                # matching the child-first order used by the other purges
                Requestor.where(request_id: purge_ids).delete
                Request.where(request_id: purge_ids).delete
            end
        end
    end

    # Purge jobs with no runs
    @schema.connection.transaction do
        purge_jobs = Job.left_join(:batch_job_run, :job_id => :job_id).
            where(Sequel.qualify(:batch_job_run, :job_id) => nil).
            select(Sequel.qualify(:batch_job, :job_id)).map(:job_id)
        unless purge_jobs.empty?
            log.detail "Purging #{purge_jobs.count} old jobs"
            purge_jobs.each_slice(1000) do |purge_ids|
                JobRunFailure.where(job_id: purge_ids).delete
                Task.where(job_id: purge_ids).delete
                Job.where(job_id: purge_ids).delete
            end
        end
    end
end