class RocketJob::Sliced::Slices
Attributes
all[R]
collection_name[RW]
slice_class[RW]
slice_size[RW]
Public Class Methods
new(collection_name:, slice_class: Sliced::Slice, slice_size: 100)
click to toggle source
Parameters
collection_name: [String] Name of the collection in which the slices are stored.
slice_size: [Integer] Number of records to store in each slice. Default: 100
slice_class: [Class] Slice class to use to hold records. Default: RocketJob::Sliced::Slice
# File lib/rocket_job/sliced/slices.rb, line 20
#
# Wraps the slices stored in the Mongo collection named +collection_name+.
#
# Parameters
#   collection_name: [String]
#     Name of the collection holding the slices.
#   slice_class: [Class]
#     Class instantiated by #new when building slices. Default: Sliced::Slice
#   slice_size: [Integer]
#     Number of records to store in each slice. Default: 100
def initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100)
  @collection_name = collection_name
  @slice_size      = slice_size
  @slice_class     = slice_class

  # Using `Sliced::Slice` avoids having to add `_type` as an index when all slices are the same type anyway.
  @all = Sliced::Slice.with_collection(collection_name)
end
Public Instance Methods
append(slice, input_slice)
click to toggle source
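Appends the records in `slice` to the existing slice whose id matches `input_slice`, or inserts `slice` as a new slice when no such slice exists yet.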
# File lib/rocket_job/sliced/slices.rb, line 99
#
# Appends records to the slice whose id matches +input_slice+.
#
# When no slice with that id exists yet, falls back to #insert.
# +slice+ may be a Slice (its records are appended) or an Array of records.
#
# Returns the updated existing slice, or whatever #insert returns.
def append(slice, input_slice)
  existing_slice = all.where(id: input_slice.id).first

  if existing_slice
    additional_records = slice.is_a?(Slice) ? slice.records : slice
    existing_slice.records += additional_records
    existing_slice.save!
    existing_slice
  else
    insert(slice, input_slice)
  end
end
completed()
click to toggle source
These methods are written out explicitly, rather than delegated to `all` via Forwardable, because Forwardable generates invalid warnings on them.
# File lib/rocket_job/sliced/slices.rb, line 131
#
# Returns +all.completed+ (presumably the slices in the completed state).
# Written out by hand because Forwardable generates invalid warnings here.
def completed
  scope = all
  scope.completed
end
create(params = {})
click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 33
#
# Builds a slice from +params+ via #new and saves it with +save+.
# The slice is returned regardless of whether +save+ succeeded.
def create(params = {})
  new(params).tap(&:save)
end
create!(params = {})
click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 39
#
# Builds a slice from +params+ via #new and saves it with +save!+.
# Any exception raised by +save!+ propagates to the caller.
# Returns the saved slice.
def create!(params = {})
  new(params).tap(&:save!)
end
create_indexes()
click to toggle source
Creates the unique `{state: 1, _id: 1}` index used by find_and_modify, but only when an index named `state_1__id_1` is not already present.
# File lib/rocket_job/sliced/slices.rb, line 112
#
# Creates the unique {state: 1, _id: 1} index, unless an index named
# "state_1__id_1" already exists.
#
# If listing the indexes raises Mongo::Error::OperationFailure, the index is
# treated as missing and is created.
# Returns the result of +create_one+, or nil when the index already exists.
def create_indexes
  already_present =
    begin
      all.collection.indexes.any? { |index| index["name"] == "state_1__id_1" }
    rescue Mongo::Error::OperationFailure
      false
    end
  return if already_present

  all.collection.indexes.create_one({state: 1, _id: 1}, unique: true)
end
drop()
click to toggle source
Drop this collection when it is no longer needed
# File lib/rocket_job/sliced/slices.rb, line 126
#
# Drops the underlying Mongo collection once these slices are no longer needed.
# Returns whatever the collection's +drop+ returns.
def drop
  collection = all.collection
  collection.drop
end
each(&block)
click to toggle source
Returns output slices in the order of their id which is usually the order in which they were written.
# File lib/rocket_job/sliced/slices.rb, line 47
#
# Yields each slice sorted by ascending id, which is usually the order in
# which the slices were written. The block is forwarded unchanged.
def each(&block)
  ordered = all.sort(id: 1)
  ordered.each(&block)
end
failed()
click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 135
#
# Returns +all.failed+ (presumably the slices in the failed state).
def failed
  scope = all
  scope.failed
end
first()
click to toggle source
Mongoid does not apply any ordering by default, so an explicit sort on `_id` is added (hence `rubocop:disable Style/RedundantSort`).
# File lib/rocket_job/sliced/slices.rb, line 149
#
# Returns the slice with the lowest _id.
# Mongoid does not apply ordering by itself, so the sort is added explicitly.
def first
  ascending = all.sort("_id" => 1)
  ascending.first
end
group_exceptions()
click to toggle source
Returns [Array<Struct>] the exceptions from failed slices, grouped by exception class name, together with the unique exception messages for each class.
Each struct consists of:
class_name: [String] Exception class name.
count: [Integer] Number of exceptions with this class.
messages: [Array<String>] Unique list of error messages.
# File lib/rocket_job/sliced/slices.rb, line 171
#
# Summarises the exceptions recorded on failed slices.
#
# Runs an aggregation over slices whose state is "failed" and groups them by
# exception class name.
#
# Returns [Array<Struct>] with members:
#   class_name: [String]        Exception class name.
#   count:      [Integer]       Number of failed slices with this exception class.
#   messages:   [Array<String>] Unique exception messages for this class.
def group_exceptions
  exception_group = Struct.new(:class_name, :count, :messages)

  pipeline = [
    {"$match" => {state: "failed"}},
    {
      "$group" => {
        _id:      {error_class: "$exception.class_name"},
        messages: {"$addToSet" => "$exception.message"},
        count:    {"$sum" => 1}
      }
    }
  ]

  all.collection.aggregate(pipeline).map do |group|
    exception_group.new(group["_id"]["error_class"], group["count"], group["messages"])
  end
end
insert(slice, input_slice = nil)
click to toggle source
Insert a new slice into the collection
Returns [RocketJob::Sliced::Slice] the slice that was written, or that was skipped as an already-processed duplicate.
Parameters
slice [RocketJob::Sliced::Slice | Array] The slice to write to the slices collection. If slice is an Array, it is converted to a Slice before being inserted into the slices collection.
input_slice [RocketJob::Sliced::Slice] The input slice to which this slice corresponds. Its id (and first_record_number) are copied across to the new slice.
If the insert results in a duplicate record, it is ignored so that jobs which failed in the middle of processing can be restarted. A warning is logged that the slice has already been processed.
Note:
`slice_size` is not enforced: all of the records present in the slice are written as a single slice to the slices collection, however many there are.
# File lib/rocket_job/sliced/slices.rb, line 73
#
# Inserts a new slice into the collection.
#
# Parameters
#   slice [RocketJob::Sliced::Slice | Array]
#     Slice to write; an Array is wrapped in a new slice via #new first.
#   input_slice [RocketJob::Sliced::Slice]
#     Optional input slice this output corresponds to; its id and
#     first_record_number are copied onto the slice being written.
#
# A Mongo::Error::OperationFailure whose message contains "E11000" (duplicate
# key) is logged as a warning and otherwise ignored, to support restarting
# jobs that failed part-way through. Any other OperationFailure is re-raised.
#
# Returns the slice that was written (or skipped as a duplicate).
def insert(slice, input_slice = nil)
  record = slice.is_a?(Slice) ? slice : new(records: slice)

  # Keep the output slice aligned with the input slice it was produced from.
  if input_slice
    record.id                  = input_slice.id
    record.first_record_number = input_slice.first_record_number
  end

  begin
    record.save!
  rescue Mongo::Error::OperationFailure => e
    # Duplicate key: the slice was already written before a restart.
    raise e unless e.message.include?("E11000")

    logger.warn "Skipped already processed slice# #{record.id}"
  end

  record
end
Also aliased as: <<
insert_many(slices)
click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 93
#
# Bulk-inserts +slices+ by converting each one with +as_document+ and passing
# the documents to the collection's +insert_many+ in a single call.
# Returns the collection's +insert_many+ result.
def insert_many(slices)
  all.collection.insert_many(slices.map(&:as_document))
end
last()
click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 153
#
# Returns the slice with the highest _id, sorting in descending order
# explicitly since Mongoid applies no ordering by itself.
def last
  descending = all.sort("_id" => -1)
  descending.first
end
new(params = {})
click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 29
#
# Builds (but does not save) a +slice_class+ instance from +params+.
# +collection_name+ is always set to this collection's name, overriding any
# value supplied in +params+. The caller's hash is not modified.
def new(params = {})
  attributes = params.merge(collection_name: collection_name)
  slice_class.new(attributes)
end
queued()
click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 139
#
# Returns +all.queued+ (presumably the slices in the queued state).
def queued
  scope = all
  scope.queued
end
running()
click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 143
#
# Returns +all.running+ (presumably the slices in the running state).
def running
  scope = all
  scope.running
end