class Tengine::Core::Event

Attributes

kernel[RW]

Public Class Methods

find_or_create_then_update_with_block(condition, retry_max = 60, wtimeout = 10240) { |the_event| ... } click to toggle source

@yield [event] Yields the (possibly new) event. @yieldparam [Tengine::Core::Event] event The event in question. @yieldreturn [Boolean] Return false, and it will just break the execution. Otherwise, it tries to update the event. @param [Hash] condition Criteria to find a document. @param [Numeric] retry_max (60) Maximum number of retry attempts to save the event. @param [Numeric] wtimeout (10240) Write timeout, ignored for earlier mongodb. @return [Tengine::Core::Event] The event in question if update succeeded, false if retry_max reached, or nil if the block exited with false. @raise [Mongo::OperationFailure] Any exceptions that happened inside will be propagated outside.

# File lib/tengine/core/event.rb, line 75
  def self.find_or_create_then_update_with_block condition, retry_max = 60, wtimeout = 10240
    # * とある条件を満たすイベントがあれば、それを上書きしたい。
    # * なければ、新規作成したい。
    # * でもアトミックにやりたい。
    # * ないとおもって新規作成しようとしたら裏でイベントが生えていたら、上書きモードでやり直したい。
    # * あるとおもって上書きしようとしたら裏でイベントが消えていたら、新規作成モードでやり直したい。
    # * という要求をできるだけ高速に処理したい。

    the_event = nil
    retries = -1
    results = nil

    safemode = Tengine::Core::SafeUpdatable.safemode(collection, wtimeout)

    while true do
      return false if retries >= retry_max # retryしすぎ

      retries += 1
      # あればとってくる
      if the_event and not the_event.new_record?
        the_event.reload
      else
        the_event = where(condition).first || new(condition)
      end

      return nil if not yield(the_event) # ユザーによる意図的な中断

      hash = the_event.as_document.dup # <- dup ?
      hash.delete "_id"
      hash['lock_version'] = the_event.lock_version + 1
      hash['created_at'] ||= Time.at(Time.now.to_i)
      hash['updated_at'] = Time.at(Time.now.to_i)

      results = nil
      begin
        # Can't, no results returned...
        # results = with(safe: safemode).where(
        #   key: the_event.key, lock_version: the_event.lock_version
        # ).update(
        #   "$set" => hash,
        #   flags: [ :upsert ]
        # )
        mongo_session.with(safe: safemode) do |ss|
          col = ss[collection.name]
          results = ss.context.update(
            col.database.name,
            col.name,
            { key: the_event.key, lock_version: the_event.lock_version },
            { "$set" => hash },
            flags: [ :upsert ]
          )
        end
      rescue Moped::Errors::OperationFailure => e
        # upsert = trueだがindexのunique制約があるので重複したkeyは
        # 作成不可、lock_versionの更新失敗はこちらに来る。これは意
        # 図した動作なのでraiseしない。
        Tengine.logger.debug "retrying due to mongodb error #{e}"
        # lock_versionが存在しない可能性(そのような古いDBを引きずっている等)
        mongo_session.with(safe: safemode) do |ss|
          col = ss[collection.name]
          results = ss.context.update(
            col.database.name,
            col.name,
            { "$query" => { key: the_event.key, lock_version: { "$exists" => false} } },
            { "$set" => { lock_version: -(2**63) } },
          )
        end
        # again
      else
        if results["error"]
          raise Moped::Errors::OperationFailure, results["error"]
        elsif results["upserted"]
          # *hack* _idを消してupsertしたので、このとき_idは新しくなっている
          the_event._id = results["upserted"]
          the_event.reload
          return the_event
        else
          the_event.reload
          return the_event
        end
      end
    end
rescue Exception
p $!
  end

Public Instance Methods

to_hash() click to toggle source
# File lib/tengine/core/event.rb, line 58
def to_hash
  ret = attributes.dup # <- dup ?
  ret.delete "_id"
  ret
end