class DistributedResqueWorker::ResqueWorker
Constants
- CHUNK_SIZE
Public Class Methods
clean_up(queue_name, root)
# File lib/distributed_resque_worker.rb, line 164
def clean_up(queue_name, root)
  FileUtils.remove_dir("#{root}/tmp/#{queue_name}")
  delete_queue(queue_name)
  Resque.logger.info('Cleanup Done!')
end
delete_intermediate_s3_files(work_name, bucket)
# File lib/distributed_resque_worker.rb, line 142
def delete_intermediate_s3_files(work_name, bucket)
  aws_bucket = AwsHelper.bucket(bucket)
  folder = "resque_worker/#{work_name}/"
  s3_object = aws_bucket.objects.with_prefix(folder)
  s3_file_names = s3_object.collect(&:key)
  s3_file_names.each do |item|
    AwsHelper.s3_delete(item, bucket)
  end
end
delete_queue(queue_name)
# File lib/distributed_resque_worker.rb, line 171
def delete_queue(queue_name)
  Resque.queues.each do |queue|
    Resque.remove_queue(queue) if queue == queue_name
  end
end
download_intermediate_files(work_name, bucket, root)
# File lib/distributed_resque_worker.rb, line 127
def download_intermediate_files(work_name, bucket, root)
  aws_bucket = AwsHelper.bucket(bucket)
  folder = "resque_worker/#{work_name}/"
  s3_object = aws_bucket.objects.with_prefix(folder)
  s3_file_names = s3_object.collect(&:key)
  s3_file_names.each do |filename|
    local_file_name = filename.split('/')
    next unless local_file_name[2]
    download_file_path = "#{root}/tmp/#{work_name}/#{local_file_name[2]}"
    Resque.logger.info("download_file_path #{download_file_path}")
    AwsHelper.s3_download_file(filename, download_file_path, bucket)
  end
end
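The next unless local_file_name[2] guard skips the bare folder key that an S3 prefix listing can include: a real intermediate file key splits into three segments, while the prefix itself splits into only two. A small illustration with made-up key names:

  'resque_worker/report_123/report_123_0.csv'.split('/')
  # => ["resque_worker", "report_123", "report_123_0.csv"]  segment [2] is the local basename
  'resque_worker/report_123/'.split('/')
  # => ["resque_worker", "report_123"]                      no segment [2], so the key is skipped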
enqueue_post_processor(args)
# File lib/distributed_resque_worker.rb, line 76
def enqueue_post_processor(args)
  Resque.logger.info('start enqueue_post_processor')
  input = args.symbolize_keys!
  details = {
    type: 'post_processor',
    work_name: input[:work_name],
    bucket: input[:bucket],
    method: input[:method],
    root: input[:root],
    opts: input[:opts]
  }
  Resque.enqueue_to(input[:work_name], ResqueWorker, details)
  Resque.logger.info('finished enqueue_post_processor')
end
merge_intermediate_files(work_name, final_file)
# File lib/distributed_resque_worker.rb, line 70
def merge_intermediate_files(work_name, final_file)
  files = "tmp/#{work_name}/#{work_name}_*.csv"
  cmd = "awk '(NR == 1) || (FNR > 1)' #{files} > #{final_file}"
  # Pass the command string directly; wrapping it in backticks would
  # execute it once and then hand its output to system as a second command.
  system(cmd)
end
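The awk condition (NR == 1) || (FNR > 1) is the standard multi-file CSV merge idiom: NR == 1 matches only the very first line across all input files (the header of the first file), and FNR > 1 matches every line of each file except its first, so exactly one header row survives. A sketch with hypothetical file contents:

  # tmp/report_123/report_123_0.csv    tmp/report_123/report_123_1.csv
  #   id,status                          id,status
  #   1,done                             3,done
  #   2,done                             4,done
  #
  # report_123_final.csv after the merge:
  #   id,status
  #   1,done
  #   2,done
  #   3,done
  #   4,done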
new(queue_name, bucket, root)
# File lib/distributed_resque_worker.rb, line 25
def initialize(queue_name, bucket, root)
  @queue = "#{queue_name}_#{Time.now.to_i}_#{Random.rand(1000000)}".to_sym
  @bucket = bucket
  @root = root
  FileUtils.mkdir_p("#{root}/tmp/#{@queue}")
end
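A minimal construction sketch, assuming a Rails app and an S3 bucket name. Because process_chunk later calls work_name.split('_').first.constantize, the part of the queue name before the first underscore must name a real class:

  worker = DistributedResqueWorker::ResqueWorker.new('Report', 'my-s3-bucket', Rails.root)
  # @queue becomes something like :Report_1700000000_123456 and
  # "#{Rails.root}/tmp/Report_1700000000_123456" is created for intermediate files.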
perform(args)
# File lib/distributed_resque_worker.rb, line 47
def self.perform(args)
  redis_key = resque_worker_redis_key(args['work_name'])
  no_jobs = resque_redis.redis.get(redis_key)
  Resque.logger.info("No of jobs remaining => #{no_jobs}")
  if args['type'] == 'processor'
    process_chunk(args)
    if resque_redis.redis.get(redis_key).to_i.zero?
      enqueue_post_processor(args)
    end
  elsif args['type'] == 'post_processor'
    post_processing(args)
  end
end
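Resque serializes job arguments to JSON, so perform sees the details hash built by chunk_work_and_enqueue (or enqueue_post_processor) with string keys. A hypothetical 'processor' payload:

  {
    'work_name' => 'Report_1700000000_123456',
    'chunk'     => [1, 2, 3],
    'index'     => 0,
    'type'      => 'processor',
    'root'      => '/app',
    'bucket'    => 'my-s3-bucket',
    'method'    => 'export',
    'opts'      => {}
  }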
post_processing(args)
# File lib/distributed_resque_worker.rb, line 109
def post_processing(args)
  input = args.symbolize_keys!
  work_name = input[:work_name]
  root = input[:root]
  final_tmp_file = "#{root}/tmp/#{work_name}/#{work_name}_final.csv"
  Resque.logger.info("start post_processing #{input}")
  begin
    download_intermediate_files(work_name, input[:bucket], root)
    delete_intermediate_s3_files(work_name, input[:bucket])
    merge_intermediate_files(work_name, final_tmp_file)
    upload_final_file_to_s3_and_send(input, final_tmp_file)
  rescue StandardError
    Resque.logger.error($ERROR_INFO)
    nil
  end
  Resque.logger.info('finished post_processing')
end
process_chunk(args)
# File lib/distributed_resque_worker.rb, line 86
def process_chunk(args)
  input = args.symbolize_keys!
  method_chunk = "#{input[:method]}_chunk".to_sym
  worker_class = input[:work_name].split('_').first
  worker = worker_class.constantize
  path = "#{input[:work_name]}/#{input[:work_name]}_#{input[:index]}.csv"
  filename = "#{input[:root]}/tmp/#{path}"
  worker.send(method_chunk, input[:chunk], filename, input[:opts])
  store_to_s3_delete_local_copy(path, filename, input[:bucket])
  resque_redis.redis.decr(resque_worker_redis_key(input[:work_name]))
end
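process_chunk derives both the worker class and the callback name from the payload: 'Report_1700000000_123456'.split('_').first constantizes to Report, and method 'export' becomes :export_chunk. A minimal sketch of the chunk-side contract (Report and export are illustrative names, not part of the gem):

  require 'csv'

  class Report
    # Invoked by ResqueWorker.process_chunk as "#{method}_chunk".
    # Must write a CSV with a header row to +filename+; the merge step
    # later keeps only the first header it encounters.
    def self.export_chunk(chunk, filename, opts)
      CSV.open(filename, 'w') do |csv|
        csv << %w[id status]
        chunk.each { |id| csv << [id, 'processed'] }
      end
    end
  end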
resque_redis()
# File lib/distributed_resque_worker.rb, line 62
def resque_redis
  Resque.redis
end
resque_worker_redis_key(work_name)
# File lib/distributed_resque_worker.rb, line 66
def resque_worker_redis_key(work_name)
  "DistributedResqueWorker:#{work_name}"
end
store_to_s3_delete_local_copy(path, filename, bucket)
# File lib/distributed_resque_worker.rb, line 98
def store_to_s3_delete_local_copy(path, filename, bucket)
  s3_name = "resque_worker/#{path}"
  begin
    AwsHelper.s3_store_file(s3_name, filename, bucket)
    File.delete(filename)
  rescue StandardError
    Resque.logger.error($ERROR_INFO)
    nil
  end
end
upload_final_file_to_s3_and_send(input, final_tmp_file)
# File lib/distributed_resque_worker.rb, line 152
def upload_final_file_to_s3_and_send(input, final_tmp_file)
  work_name = input[:work_name]
  s3_name = "resque_worker/#{work_name}/#{work_name}_final.csv"
  final_file_link = AwsHelper.s3_store_file(s3_name, final_tmp_file, input[:bucket])
  method_post = "#{input[:method]}_post".to_sym
  worker_class = input[:work_name].split('_').first
  worker = worker_class.constantize
  worker.send(method_post, final_file_link, input[:opts])
  clean_up(work_name, input[:root])
end
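This is the post-processing half of the same worker contract: the class must also respond to "#{method}_post", which receives whatever AwsHelper.s3_store_file returns for the final CSV (a link, per the variable name) plus the original opts. A hedged sketch; ReportMailer is hypothetical:

  class Report
    # Invoked once, after every chunk has finished and the merged CSV
    # has been uploaded to S3.
    def self.export_post(final_file_link, opts)
      ReportMailer.export_ready(final_file_link, opts).deliver_now
    end
  end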
Public Instance Methods
chunk_work_and_enqueue(work_list, method, opts)
# File lib/distributed_resque_worker.rb, line 32
def chunk_work_and_enqueue(work_list, method, opts)
  total_jobs = (work_list.length.to_f / CHUNK_SIZE).ceil
  total_jobs = 1 if total_jobs.zero?
  Resque.logger.info("total_jobs #{total_jobs}")
  ResqueWorker.resque_redis.redis.set(
    ResqueWorker.resque_worker_redis_key(@queue), total_jobs
  )
  work_list.each_slice(CHUNK_SIZE).each_with_index do |chunk, index|
    details = {
      work_name: @queue,
      chunk: chunk,
      index: index,
      type: 'processor',
      root: @root.to_s,
      bucket: @bucket,
      method: method,
      opts: opts
    }
    Resque.enqueue_to(@queue, ResqueWorker, details)
  end
end
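Putting the pieces together, an end-to-end usage sketch under the same assumptions as above (a Report class defining export_chunk and export_post):

  worker = DistributedResqueWorker::ResqueWorker.new('Report', 'my-s3-bucket', Rails.root)
  worker.chunk_work_and_enqueue(User.pluck(:id), 'export', notify: 'ops@example.com')
  # Each CHUNK_SIZE slice becomes one 'processor' job. The job that
  # decrements the Redis counter to zero enqueues the 'post_processor'
  # job, which merges the per-chunk CSVs, uploads the final file and
  # calls Report.export_post with its link.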