class RocketJob::Sliced::Slice
A slice is an Array of Records, along with meta-data that is used or set during processing of the individual records
Note: Do not create instances of this model directly, go via Slice#new
so that the correct collection name is used.
Example:
slice = RocketJob::Sliced::Slice.new slice << 'first' slice << 'second' second = slice.at(1) # The [] operator is for retrieving attributes: slice['state']
Public Class Methods
Returns whether this is a specialized binary slice for creating binary data from each slice that is downloaded without conversion into output files.
# File lib/rocket_job/sliced/slice.rb, line 99 def self.binary_format end
For binary formats only, format the supplied records into the binary format for this slice
# File lib/rocket_job/sliced/slice.rb, line 103 def self.to_binary(_records) raise NotImplementedError end
Public Instance Methods
Returns [Hash] the slice as a Hash for storage purposes Compresses / Encrypts the slice according to the job setting
# File lib/rocket_job/sliced/slice.rb, line 146 def as_attributes attrs = super attrs["records"] = serialize_records if @records attrs end
Returns [Integer] the record number of the record currently being processed relative to the entire file.
# File lib/rocket_job/sliced/slice.rb, line 124 def current_record_number first_record_number + (processing_record_number || 1) - 1 end
Fail this slice if an exception occurs during processing.
# File lib/rocket_job/sliced/slice.rb, line 157 def fail_on_exception!(re_raise_exceptions = false, &block) SemanticLogger.named_tagged(slice: id.to_s, &block) rescue Exception => e SemanticLogger.named_tagged(slice: id.to_s) do if failed? || !may_fail? exception = JobException.from_exception(e) exception.worker_name = worker_name save! unless new_record? || destroyed? elsif new_record? || destroyed? fail(e) else fail!(e) end raise e if re_raise_exceptions end end
Returns the failed record. Returns [nil] if there is no failed record
# File lib/rocket_job/sliced/slice.rb, line 140 def failed_record at(processing_record_number - 1) if exception && processing_record_number end
# File lib/rocket_job/sliced/slice.rb, line 152 def inspect "#{super[0...-1]}, records: #{@records.inspect}, collection_name: #{collection_name.inspect}>" end
`records` array has special handling so that it can be modified in place instead of having to replace the entire array every time. For example, when appending lines with `<<`.
# File lib/rocket_job/sliced/slice.rb, line 109 def records @records ||= [] end
Replace the records within this slice
# File lib/rocket_job/sliced/slice.rb, line 114 def records=(records) raise(ArgumentError, "Cannot assign type: #{records.class.name} to records") unless records.is_a?(Array) @records = records end
Before Fail save the exception to this slice.
# File lib/rocket_job/sliced/slice.rb, line 129 def set_exception(exc = nil) if exc self.exception = JobException.from_exception(exc) exception.worker_name = worker_name end self.failure_count = failure_count.to_i + 1 self.worker_name = nil end
Private Instance Methods
Always add records to any updates.
# File lib/rocket_job/sliced/slice.rb, line 177 def atomic_updates(*args) r = super(*args) (r["$set"] ||= {})["records"] = serialize_records if @records r end
# File lib/rocket_job/sliced/slice.rb, line 183 def parse_records @records = attributes.delete("records") end
# File lib/rocket_job/sliced/slice.rb, line 187 def serialize_records records.mongoize end
Before Start
# File lib/rocket_job/sliced/slice.rb, line 192 def set_started_at self.started_at = Time.now end