class Delayed::Backend::Es::Job

Constants

DOCUMENT_TYPE
INDEX_NAME

Attributes

attempts[RW]
failed_at[RW]
handler[RW]
id[RW]
last_error[RW]
locked_at[RW]
locked_by[RW]
priority[RW]
queue[RW]
run_at[RW]
version[RW]

Public Class Methods

all() click to toggle source

CALLING 'ALL' IS NEVER A GOOD IDEA MEMORY LEAKS ALWAYS BEGIN LIKE THIS!!! stub to call 10 jobs.

# File lib/delayed/backend/es.rb, line 124
def self.all
  search_response = get_client.search :index =>INDEX_NAME, :type => DOCUMENT_TYPE, :body => {:size => 10, :query => {match_all: {}}}
  search_response["hits"]["hits"].map{|c|
      new(c["_source"].merge("id" => c["_id"]))
  }
end
clear_locks!(worker_name) click to toggle source

USES ES SCROLL API

# File lib/delayed/backend/es.rb, line 156
def self.clear_locks!(worker_name)
      scroll_id = nil
      execution_count = 0
      while true
              begin
                     response = nil
                                # Display the initial results
                                #puts "--- BATCH 0 -------------------------------------------------"
                                #puts r['hits']['hits'].map { |d| d['_source']['title'] }.inspect
                                if scroll_id.blank?
                                        response = get_client.search index: INDEX_NAME, scroll: '4m', body: {_source: false, query: {term: {locked_by: worker_name}}}
                                else
                                        response = get_client.scroll scroll_id: scroll_id, scroll: '4m'
                                end
                                
                                scroll_id = response['_scroll_id']

                                job_ids = response['hits']['hits'].map{|c| c['_id']}
                          
                                  break if job_ids.blank?

                                  bulk_array = []
                                  
                                  script = 
                                {
                                        :lang => "painless",
                                        :params => {
                                                
                                        },
                                        :source => '''
                                                ctx._source.locked_at = null;
                                                ctx._source.locked_by = null;
                                        '''
                                }

                                  job_ids.each do |jid|

                                          bulk_array << {
                                                  _update: {
                                                          _index: INDEX_NAME,
                                                          _type: DOCUMENT_TYPE,
                                                          _id: jid,
                                                          data: {
                                                                 script: script, 
                                                                 scripted_upsert: false,
                                                                 upsert: {}
                                                          }
                                                  }
                                          }

                                  end

                                  bulk_response = get_client.bulk body: bulk_array

                                  execution_count +=1    

                                  break if execution_count > 10
                   rescue => e
                           puts "error clearing locks--->"
                           puts e.to_s
                           break
                   end
      end
end
count() click to toggle source
# File lib/delayed/backend/es.rb, line 131
def self.count
  get_client.count index: INDEX_NAME
end
create(attrs = {}) click to toggle source
# File lib/delayed/backend/es.rb, line 139
def self.create(attrs = {})
  new(attrs).tap do |o|
    o.save
  end
end
create!(*args) click to toggle source
# File lib/delayed/backend/es.rb, line 145
def self.create!(*args)
  create(*args)
end
create_index() click to toggle source
# File lib/delayed/backend/es.rb, line 89
def self.create_index
        response = get_client.indices.create :index => INDEX_NAME, :body => {
                mappings: {DOCUMENT_TYPE => { :properties =>  mappings}}
        }
        puts "Created delayed job index with response."
        puts response.to_s
end
create_indexes() click to toggle source
# File lib/delayed/backend/es.rb, line 109
def self.create_indexes
        delete_index
        create_index
end
db_time_now() click to toggle source
# File lib/delayed/backend/es.rb, line 421
def self.db_time_now
  Time.current
end
delete_all() click to toggle source
# File lib/delayed/backend/es.rb, line 135
def self.delete_all
  create_indexes
end
delete_index() click to toggle source
# File lib/delayed/backend/es.rb, line 97
def self.delete_index
        if Elasticsearch::Persistence.client.indices.exists? index: INDEX_NAME
                
                puts "delayed job index exists."
                response = get_client.indices.delete :index => INDEX_NAME
                puts "delete response:"
                puts response.to_s
        else
                puts "delayed job index does not exist."
        end
end
find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) click to toggle source

Find a few candidate jobs to run (in case some immediately get locked by others).

# File lib/delayed/backend/es.rb, line 222
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) 
      #puts "max run time is:"
      #puts Worker.max_run_time
      right_now = Time.now
      #####################################################
                ##
                ##
                ## THE BASE QUERY
                ## translated into human terms
                ## any job where
                ## 1. run_at is less than the current time
                ## AND
                ## 2. locked_by : current_worker OR locked_At : nil OR locked_at < (time_now - max_run_time)
                ## AND
                ## 3. failed_at : nil
                ## AND
                ## OPTIONAL ->
                ## priority queries
                ## OPTIONAL ->
                ## queue name.
                ##
                ##
                #####################################################

                query = {
                        bool: {
                                must: [
                                        {
                                                range: {
                                                        run_at: {
                                                                lte: right_now.strftime("%Y-%m-%d %H:%M:%S")
                                                        }
                                                }
                                        },
                                        {
                                                bool: {
                                                        should: [
                                                                {
                                                                        term: {
                                                                                locked_by: Worker.name
                                                                        }
                                                                },
                                                                {
                                                                        bool: {
                                                                                must_not: [
                                                                                        {
                                                                                                exists: {
                                                                                                        field: "locked_at"
                                                                                                }
                                                                                        }
                                                                                ]
                                                                        }
                                                                },
                                                                {
                                                                        range: {
                                                                                locked_at: {
                                                                                        lte: (right_now - max_run_time).strftime("%Y-%m-%d %H:%M:%S")
                                                                                }
                                                                        }
                                                                }
                                                        ]
                                                }
                                        }
                                ],
                                must_not: [
                                        {
                                                exists: {
                                                        field: "failed_at"
                                                }
                                        }
                                ]
                        }
                }

                ################################################
                ##
                ##
                ## ADD PRIORITY CLAUSES
                ##
                ##
                ################################################
                if Worker.min_priority
                        query[:bool][:must] << {
                                range: {
                                        priority: {
                                                gte: Worker.min_priority.to_i
                                        }
                                }
                        }
                end

                if Worker.max_priority
                        query[:bool][:must] << {
                                range: {
                                        priority: {
                                                lte: Worker.max_priority.to_i
                                        }
                                }
                        }
                end


                ##############################################
                ##
                ##
                ## QUEUES IF SPECIFIED.
                ##
                ##
                ##############################################
                if Worker.queues.any?
                        query[:bool][:must] << {
                                terms: {
                                        queue: Worker.queues
                                }
                        }
                end


                #############################################
                ##
                ##
                ## SORT
                ##
                ##
                ############################################
                sort = [
                        {"locked_by" => "desc"},
                        {"priority" => "asc"},
                        {"run_at" => "asc"}
                ]

                ##puts "find available jobs query is:"
                ##puts JSON.pretty_generate(query)

                search_response = get_client.search :index => INDEX_NAME, :type => DOCUMENT_TYPE,
                        :body => {
                                version: true,
                                size: limit,
                                sort: sort,
                                query: query
                        }
                

                puts "search_response is"
                puts search_response["hits"]["hits"]
                ## it would return the first hit.
                search_response["hits"]["hits"].map{|c|
                        k = new(c["_source"])
                        k.id = c["_id"]
                        k.version = c["_version"]
                        k
                }

end
get_client() click to toggle source
# File lib/delayed/backend/es.rb, line 37
def self.get_client
      if Elasticsearch::Persistence.client
              puts "got persistence client, using it."
              puts "its settings are/"
              puts Elasticsearch::Persistence.client
              Elasticsearch::Persistence.client
      else
              puts "----- returning the default client --------- "
              client ||= Elasticsearch::Client.new
              client
      end
end
mappings() click to toggle source
# File lib/delayed/backend/es.rb, line 50
def self.mappings
                {
                        payload_object: {
                                type: 'object'
                        },
                        locked_at: {
                                type: 'date',
                                format: 'yyyy-MM-dd HH:mm:ss'
                        },
                        failed_at: {
                                type: 'date',
                                format: 'yyyy-MM-dd HH:mm:ss'
                        },
                        priority: {
                                type: 'integer'
                        },
                        attempts: {
                                type: 'integer'
                        },
                        queue: {
                                type: 'keyword'
                        },
                        handler: {
                                type: 'text',
                                index: false
                        },
                        locked_by: {
                                type: 'keyword'
                        },
                        last_error: {
                                type: 'keyword'
                        },
                        run_at: {
                                type: 'date',
                                format: 'yyyy-MM-dd HH:mm:ss'
                        }
                }
        end
new(hash = {}) click to toggle source
# File lib/delayed/backend/es.rb, line 114
def initialize(hash = {})
  self.attempts = 0
  self.priority = 0
  self.id = SecureRandom.hex(5)
  hash.each { |k, v| send(:"#{k}=", v) }
end

Public Instance Methods

destroy() click to toggle source
# File lib/delayed/backend/es.rb, line 430
def destroy
  # gotta do this.
  #puts "Calling destroy"
  self.class.get_client.delete :index => INDEX_NAME, :type => DOCUMENT_TYPE, :id => self.id.to_s
end
json_representation() click to toggle source
# File lib/delayed/backend/es.rb, line 436
def json_representation
      if self.respond_to? "as_json"
              as_json.except("payload_object").except(:payload_object)
      else
              puts "payload object is ----------->"
              puts self.payload_object
              attributes = {}
              self.class.mappings.keys.each do |attr|
                      if attr.to_s == "payload_object"
                              ## this object has to be serialized.
                              ##
                      else
                              attributes[attr] = self.send(attr)
                      end
              end
              JSON.generate(attributes)
      end
end
lock_exclusively!(_max_run_time, worker) click to toggle source

Lock this job for this worker. Returns true if we have the lock, false otherwise.

# File lib/delayed/backend/es.rb, line 379
def lock_exclusively!(_max_run_time, worker)
  #puts "called lock exclusively ------------------------>"
  
  script = 
                {
                        :lang => "painless",
                        :params => {
                                :locked_at => self.class.db_time_now.strftime("%Y-%m-%d %H:%M:%S"),
                                :locked_by => worker,
                                :version => self.version
                        },
                        :source => '''
                                if(ctx._version == params.version){
                                        ctx._source.locked_at = params.locked_at;
                                        ctx._source.locked_by = params.locked_by;
                                }
                                else{
                                        ctx.op = "none";
                                }
                        '''
                }

                puts "Script is"
                puts JSON.pretty_generate(script)


                #begin
                response = self.class.get_client.update(index: INDEX_NAME, type: DOCUMENT_TYPE, id: self.id.to_s, body: {
                        :script => script,
                        :scripted_upsert => false,
                        :upsert => {}     
                })

                ## if this returns no-op chec,
                puts "lock response:"
                puts response.to_s
                

                return response["result"] == "updated"
  
end
reload() click to toggle source
# File lib/delayed/backend/es.rb, line 472
def reload
  #puts "called reload job---------------->"
  object = self.class.get_client.get :id => self.id, :index => INDEX_NAME, :type => DOCUMENT_TYPE
  k = self.class.new(object["_source"])
  k.id = object["_id"]
  k
end
save() click to toggle source
# File lib/delayed/backend/es.rb, line 455
def save
  #puts "Came to save --------------->"
  self.run_at ||= Time.current.strftime("%Y-%m-%d %H:%M:%S")
  ## so here you do the actual saving.
  #Elasticsearch::Client.gateway.
  #puts "object as json -------------->"
  #puts json_representation
  save_response = self.class.get_client.index :index => INDEX_NAME, :type => DOCUMENT_TYPE, :body => json_representation, :id => self.id.to_s
  #puts "save response is: #{save_response}"
  self.class.all << self unless self.class.all.include?(self)
  true
end
save!() click to toggle source
# File lib/delayed/backend/es.rb, line 468
def save!
  save
end
update_attributes(attrs = {}) click to toggle source
# File lib/delayed/backend/es.rb, line 425
def update_attributes(attrs = {})
  attrs.each { |k, v| send(:"#{k}=", v) }
  save
end