class Delayed::Backend::Es::Job
Constants
- DOCUMENT_TYPE
- INDEX_NAME
Attributes
attempts[RW]
failed_at[RW]
handler[RW]
id[RW]
last_error[RW]
locked_at[RW]
locked_by[RW]
priority[RW]
queue[RW]
run_at[RW]
version[RW]
Public Class Methods
all()
click to toggle source
Calling `all` is never a good idea: memory leaks always begin like this! This is a stub that fetches only 10 jobs.
# File lib/delayed/backend/es.rb, line 124
# Stubbed "all": runs a match_all search capped at 10 hits and wraps every hit
# in a new job whose attributes are the hit's _source plus "id" => the hit's _id.
def self.all
  request_body = { :size => 10, :query => { match_all: {} } }
  hits = get_client.search(:index => INDEX_NAME, :type => DOCUMENT_TYPE, :body => request_body)["hits"]["hits"]
  hits.map do |hit|
    attributes = hit["_source"].merge("id" => hit["_id"])
    new(attributes)
  end
end
clear_locks!(worker_name)
click to toggle source
Uses the Elasticsearch scroll API to page through the jobs locked by the given worker and clears their locks in bulk.
# File lib/delayed/backend/es.rb, line 156
# Clears locked_at / locked_by on the jobs currently locked by +worker_name+.
#
# Pages through the matching documents with search + scroll (4 minute scroll
# window) and, for every page, sends one bulk request containing a scripted
# update per job. Stops when a page comes back empty, after more than 10 bulk
# requests have been sent, or on the first error (which is printed, not raised).
#
# worker_name - value matched against the locked_by field.
def self.clear_locks!(worker_name)
  unlock_script = {
    :lang => "painless",
    :params => {},
    :source => ' ctx._source.locked_at = null; ctx._source.locked_by = null; '
  }
  scroll_id = nil
  execution_count = 0

  loop do
    begin
      response =
        if scroll_id.nil? || scroll_id.empty?
          # First round trip: open the scroll; only the ids are needed.
          get_client.search index: INDEX_NAME, scroll: '4m', body: {_source: false, query: {term: {locked_by: worker_name}}}
        else
          get_client.scroll scroll_id: scroll_id, scroll: '4m'
        end

      scroll_id = response['_scroll_id']
      job_ids = response['hits']['hits'].map { |hit| hit['_id'] }
      break if job_ids.empty?

      # The action key must be `update`: `_update` is not a bulk action, so the
      # request was previously rejected and no lock was ever cleared.
      bulk_array = job_ids.map do |jid|
        {
          update: {
            _index: INDEX_NAME,
            _type: DOCUMENT_TYPE,
            _id: jid,
            data: { script: unlock_script, scripted_upsert: false, upsert: {} }
          }
        }
      end
      get_client.bulk body: bulk_array

      execution_count += 1
      break if execution_count > 10
    rescue => e
      # Best effort: report and give up rather than crash the caller.
      puts "error clearing locks--->"
      puts e.to_s
      break
    end
  end
end
count()
click to toggle source
# File lib/delayed/backend/es.rb, line 131
# Sends a count request for the jobs index and returns whatever the client
# returns, untouched.
# NOTE(review): the Elasticsearch count API presumably answers with a hash
# (e.g. containing "count"), not an Integer - confirm what callers expect.
def self.count
  get_client.count(index: INDEX_NAME)
end
create(attrs = {})
click to toggle source
# File lib/delayed/backend/es.rb, line 139
# Builds a job from +attrs+, saves it, and returns the job itself (not the
# result of save).
def self.create(attrs = {})
  job = new(attrs)
  job.save
  job
end
create!(*args)
click to toggle source
# File lib/delayed/backend/es.rb, line 145
# Bang variant of create: forwards every argument to create and returns its result.
def self.create!(*args)
  create(*args)
end
create_index()
click to toggle source
# File lib/delayed/backend/es.rb, line 89
# Creates the jobs index, registering the field mappings from +mappings+ under
# DOCUMENT_TYPE, and prints the client's response.
def self.create_index
  index_body = { mappings: { DOCUMENT_TYPE => { :properties => mappings } } }
  creation_result = get_client.indices.create(:index => INDEX_NAME, :body => index_body)
  puts "Created delayed job index with response."
  puts creation_result.to_s
end
create_indexes()
click to toggle source
# File lib/delayed/backend/es.rb, line 109
# Rebuilds the jobs index from scratch: drops it (if present) and creates it again.
def self.create_indexes
  delete_index
  create_index
end
db_time_now()
click to toggle source
# File lib/delayed/backend/es.rb, line 421
# The "current time" used by this backend; simply delegates to Time.current.
def self.db_time_now
  Time.current
end
delete_all()
click to toggle source
# File lib/delayed/backend/es.rb, line 135
# Removes every job by recreating the index (see create_indexes) rather than
# deleting documents one by one.
def self.delete_all
  create_indexes
end
delete_index()
click to toggle source
# File lib/delayed/backend/es.rb, line 97
# Deletes the jobs index when it exists; otherwise only reports that it is missing.
#
# Both the existence check and the delete go through get_client, so the
# fallback client is used consistently. Previously the check called
# Elasticsearch::Persistence.client directly and raised NoMethodError when no
# persistence client was configured.
#
# Returns nil.
def self.delete_index
  client = get_client

  unless client.indices.exists?(index: INDEX_NAME)
    puts "delayed job index does not exist."
    return
  end

  puts "delayed job index exists."
  response = client.indices.delete(:index => INDEX_NAME)
  puts "delete response:"
  puts response.to_s
end
find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
click to toggle source
Find a few candidate jobs to run (in case some immediately get locked by others).
# File lib/delayed/backend/es.rb, line 222
# Finds up to +limit+ candidate jobs that +worker_name+ may run.
#
# A job qualifies when all of the following hold:
#   1. run_at is not later than the current time;
#   2. it is locked by +worker_name+, OR has no locked_at, OR its locked_at is
#      at least +max_run_time+ in the past;
#   3. it has no failed_at;
#   4. (optional) its priority lies within Worker.min_priority / Worker.max_priority;
#   5. (optional) its queue is one of Worker.queues.
#
# Results are sorted by locked_by desc, priority asc, run_at asc.
#
# worker_name  - name of the worker asking for jobs.
# limit        - maximum number of jobs to return.
# max_run_time - lock age (subtracted from the current time) after which a
#                lock is considered stale.
#
# Returns an Array of jobs with id and version taken from each hit.
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  time_format = "%Y-%m-%d %H:%M:%S"
  # Same clock as lock_exclusively! (db_time_now) so the timestamps written
  # and the timestamps queried agree.
  right_now = db_time_now

  lock_conditions = [
    # Must match the calling worker; this used to compare against Worker.name
    # (the class name) and ignored the worker_name argument.
    { term: { locked_by: worker_name } },
    { bool: { must_not: [{ exists: { field: "locked_at" } }] } },
    { range: { locked_at: { lte: (right_now - max_run_time).strftime(time_format) } } }
  ]

  query = {
    bool: {
      must: [
        { range: { run_at: { lte: right_now.strftime(time_format) } } },
        { bool: { should: lock_conditions } }
      ],
      must_not: [
        { exists: { field: "failed_at" } }
      ]
    }
  }

  # Optional priority window.
  if Worker.min_priority
    query[:bool][:must] << { range: { priority: { gte: Worker.min_priority.to_i } } }
  end
  if Worker.max_priority
    query[:bool][:must] << { range: { priority: { lte: Worker.max_priority.to_i } } }
  end

  # Optional queue restriction.
  if Worker.queues.any?
    query[:bool][:must] << { terms: { queue: Worker.queues } }
  end

  sort = [
    { "locked_by" => "desc" },
    { "priority" => "asc" },
    { "run_at" => "asc" }
  ]

  # version: true asks for _version on each hit; lock_exclusively! compares it.
  search_response = get_client.search :index => INDEX_NAME, :type => DOCUMENT_TYPE,
                                      :body => { version: true, size: limit, sort: sort, query: query }

  puts "search_response is"
  puts search_response["hits"]["hits"]

  search_response["hits"]["hits"].map do |hit|
    job = new(hit["_source"])
    job.id = hit["_id"]
    job.version = hit["_version"]
    job
  end
end
get_client()
click to toggle source
# File lib/delayed/backend/es.rb, line 37
# Returns the Elasticsearch client this backend talks to.
#
# Uses Elasticsearch::Persistence.client when it is set; otherwise falls back
# to a default Elasticsearch::Client. The fallback is memoized, so repeated
# calls reuse one client. Previously `client ||= ...` assigned a local
# variable, which built a brand new client on every call.
def self.get_client
  if Elasticsearch::Persistence.client
    puts "got persistence client, using it."
    puts "its settings are/"
    puts Elasticsearch::Persistence.client
    Elasticsearch::Persistence.client
  else
    puts "----- returning the default client --------- "
    @default_client ||= Elasticsearch::Client.new
  end
end
mappings()
click to toggle source
# File lib/delayed/backend/es.rb, line 50
# Field mappings for the jobs index, keyed by attribute name. A fresh hash is
# built on every call; key order is part of the result.
def self.mappings
  # Builders return a new hash per field so no two entries share an object.
  date_field = lambda { { type: 'date', format: 'yyyy-MM-dd HH:mm:ss' } }
  integer_field = lambda { { type: 'integer' } }
  keyword_field = lambda { { type: 'keyword' } }

  {
    payload_object: { type: 'object' },
    locked_at: date_field.call,
    failed_at: date_field.call,
    priority: integer_field.call,
    attempts: integer_field.call,
    queue: keyword_field.call,
    handler: { type: 'text', index: false },
    locked_by: keyword_field.call,
    last_error: keyword_field.call,
    run_at: date_field.call
  }
end
new(hash = {})
click to toggle source
# File lib/delayed/backend/es.rb, line 114
# Builds a job with defaults (attempts 0, priority 0, a random 10-character hex
# id) and then applies every key/value of +hash+ through the matching
# "<key>=" writer, so values in +hash+ override the defaults.
def initialize(hash = {})
  self.attempts = 0
  self.priority = 0
  self.id = SecureRandom.hex(5)

  hash.each do |attribute, value|
    send(:"#{attribute}=", value)
  end
end
Public Instance Methods
destroy()
click to toggle source
# File lib/delayed/backend/es.rb, line 430
# Deletes this job's document (looked up by its stringified id) from the jobs
# index and returns the client's response.
def destroy
  es_client = self.class.get_client
  es_client.delete(:index => INDEX_NAME, :type => DOCUMENT_TYPE, :id => self.id.to_s)
end
json_representation()
click to toggle source
# File lib/delayed/backend/es.rb, line 436
# Produces the document body that is stored for this job.
#
# When the job responds to as_json, the result is as_json with the
# payload_object entry removed (string and symbol key). Otherwise the payload
# object is printed and a JSON string is generated from every mapped attribute
# except payload_object.
def json_representation
  return as_json.except("payload_object").except(:payload_object) if self.respond_to? "as_json"

  puts "payload object is ----------->"
  puts self.payload_object

  # payload_object is skipped: it is not written into the document.
  stored_keys = self.class.mappings.keys.reject { |key| key.to_s == "payload_object" }
  attributes = stored_keys.each_with_object({}) do |key, collected|
    collected[key] = self.send(key)
  end
  JSON.generate(attributes)
end
lock_exclusively!(_max_run_time, worker)
click to toggle source
Lock this job for this worker. Returns true if we have the lock, false otherwise.
# File lib/delayed/backend/es.rb, line 379
# Tries to lock this job for +worker+ with a scripted update.
#
# The script writes locked_at / locked_by only when the document's version
# still equals this job's version; otherwise it turns the update into a no-op.
#
# _max_run_time - unused.
# worker        - value stored in locked_by.
#
# Returns true when the update response reports "updated", false otherwise.
def lock_exclusively!(_max_run_time, worker)
  lock_params = {
    :locked_at => self.class.db_time_now.strftime("%Y-%m-%d %H:%M:%S"),
    :locked_by => worker,
    :version => self.version
  }
  script = {
    :lang => "painless",
    :params => lock_params,
    :source => ' if(ctx._version == params.version){ ctx._source.locked_at = params.locked_at; ctx._source.locked_by = params.locked_by; } else{ ctx.op = "none"; } '
  }

  puts "Script is"
  puts JSON.pretty_generate(script)

  update_body = { :script => script, :scripted_upsert => false, :upsert => {} }
  response = self.class.get_client.update(index: INDEX_NAME, type: DOCUMENT_TYPE, id: self.id.to_s, body: update_body)

  puts "lock response:"
  puts response.to_s

  response["result"] == "updated"
end
reload()
click to toggle source
# File lib/delayed/backend/es.rb, line 472
# Fetches this job's document again and returns a NEW job built from it; the
# receiver itself is not modified.
#
# The returned job carries the document's _id and _version. The version was
# previously left nil, unlike jobs built by find_available, so a reloaded job
# could never satisfy the version check in lock_exclusively!.
# NOTE(review): assumes the get response exposes "_version" - confirm against
# the Elasticsearch version in use.
def reload
  document = self.class.get_client.get(:id => self.id, :index => INDEX_NAME, :type => DOCUMENT_TYPE)

  self.class.new(document["_source"]).tap do |fresh|
    fresh.id = document["_id"]
    fresh.version = document["_version"]
  end
end
save()
click to toggle source
# File lib/delayed/backend/es.rb, line 455
# Indexes this job under its stringified id, using json_representation as the
# document body. run_at defaults to the current time ("%Y-%m-%d %H:%M:%S")
# when it has not been set.
#
# The former `self.class.all << self unless self.class.all.include?(self)`
# line is gone: here `all` performs a search on every call, so it cost two
# extra round trips per save and appended to an array that was then discarded.
#
# Always returns true.
def save
  self.run_at ||= Time.current.strftime("%Y-%m-%d %H:%M:%S")

  self.class.get_client.index(
    :index => INDEX_NAME,
    :type => DOCUMENT_TYPE,
    :body => json_representation,
    :id => self.id.to_s
  )

  true
end
save!()
click to toggle source
# File lib/delayed/backend/es.rb, line 468
# Bang variant of save: delegates to save and returns its result.
def save!
  save
end
update_attributes(attrs = {})
click to toggle source
# File lib/delayed/backend/es.rb, line 425
# Assigns each key/value of +attrs+ through the matching "<key>=" writer and
# then saves the job; returns the result of save.
def update_attributes(attrs = {})
  attrs.each do |name, value|
    send(:"#{name}=", value)
  end
  save
end