class WCC::Contentful::SyncEngine

The SyncEngine keeps the currently configured store up to date using the Sync API. It is available on the WCC::Contentful::Services instance, and the application is responsible for periodically calling `next` in order to hit the Sync API and update the store.

If you have mounted the WCC::Contentful::Engine, AND the configured store is one that can be synced (i.e. it responds to `:index`), then the WCC::Contentful::WebhookController will call next automatically anytime a webhook is received. Otherwise you should hook up to the Webhook events and call the sync engine via your initializer:

# Run a sync whenever a webhook event is received. SyncEngine#next accepts the
# keyword `up_to_id:` (the ID to look for in the sync response).
WCC::Contentful::Events.subscribe(proc do |event|
  WCC::Contentful::Services.instance.sync_engine.next(up_to_id: event.dig('sys', 'id'))
end, with: :call)

Attributes

client[R]
store[R]

Public Class Methods

new(state: nil, store: nil, client: nil, key: nil) click to toggle source
# File lib/wcc/contentful/sync_engine.rb, line 41
# Builds a sync engine from an initial state, a store, or both.
#
# @param state [String, Hash, nil] a raw sync token, or an already wrapped
#   state hash whose `sys.type` is "token". When given, it takes precedence
#   over any state read from the store.
# @param store [#index, #index?, #find, nil] the store to keep in sync.
# @param client [Object, nil] the client used to call the Sync API; falls back
#   to `WCC::Contentful::Services.instance.client` when omitted.
# @param key [String, nil] the key under which the sync state is stored;
#   defaults to "sync:<object_id>".
# @raise [ArgumentError] if the store does not implement the Store interface,
#   if the state is not a String or Hash, if the state's `sys.type` is not
#   "token", or if neither a state nor a store was provided.
def initialize(state: nil, store: nil, client: nil, key: nil)
  @state_key = key || "sync:#{object_id}"
  @client = client || WCC::Contentful::Services.instance.client
  @mutex = Mutex.new

  if store
    unless %i[index index? find].all? { |m| store.respond_to?(m) }
      raise ArgumentError, ':store param must implement the Store interface'
    end

    @store = store
    @state = read_state if should_sync?
  end
  if state
    # Validate the raw argument BEFORE wrapping it: token_wrapper_factory turns
    # every non-Hash value into a Hash, so checking afterwards can never fail.
    unless state.is_a?(String) || state.is_a?(Hash)
      raise ArgumentError, ':state param must be a String or Hash'
    end

    @state = token_wrapper_factory(state)
    unless @state.dig('sys', 'type') == 'token'
      raise ArgumentError, ':state param must be of sys.type = "token"'
    end
  end
  raise ArgumentError, 'either :state or :store must be provided' unless @state || @store
end

Public Instance Methods

emit_event(event) click to toggle source
# File lib/wcc/contentful/sync_engine.rb, line 102
# Broadcasts a single event, using the event's `sys.type` as the broadcast name.
#
# @param event [#dig] the event to publish.
# @raise [ArgumentError] if the event has no (or a blank) `sys.type`.
def emit_event(event)
  event_type = event.dig('sys', 'type')
  if event_type.present?
    broadcast(event_type, event)
  else
    raise ArgumentError, "Unknown event type #{event}"
  end
end
emit_sync_complete(events) click to toggle source
# File lib/wcc/contentful/sync_engine.rb, line 109
# Wraps the given events in a SyncComplete event (with this engine as its
# source) and broadcasts it under the name 'SyncComplete'.
#
# @param events [Array] the events produced by one sync pass.
def emit_sync_complete(events)
  broadcast(
    'SyncComplete',
    WCC::Contentful::Event::SyncComplete.new(events, source: self)
  )
end
next(up_to_id: nil) { |event| ... } click to toggle source

Gets the next increment of data from the Sync API. If the configured store responds to `:index`, that will be called with each item in the Sync response to update the store. If a block is passed, that block will be evaluated with each item in the response.

@param [String] up_to_id An ID to look for in the response. The method returns true if the ID was found or no up_to_id was given, false if the ID did not come back.

@return [Array] A `[Boolean, Integer]` tuple where the first value is whether the ID was found, and the second value is the number of items returned.
# File lib/wcc/contentful/sync_engine.rb, line 73
# Fetches the next increment from the Sync API and processes every item in it.
#
# While holding the engine's mutex, for each item in the response: the item is
# indexed into the store (when the store reports `index?`), converted into an
# event, yielded to the block if one was given, and emitted. Afterwards the new
# sync token is merged into the state and the state is written back. Once the
# mutex is released a sync-complete event is emitted with all events.
#
# @param up_to_id [String, nil] an ID to look for among the returned items.
# @yield [event] each event built from a response item.
# @return [Array] `[found, count]` - `found` is true when no `up_to_id` was
#   given or an item with that ID came back; `count` is the number of events.
def next(up_to_id: nil)
  target_seen = up_to_id.nil?
  collected = []

  @mutex.synchronize do
    # Lazily resolve the state: stored state first, otherwise an empty token.
    @state ||= read_state || token_wrapper_factory(nil)

    response = client.sync(sync_token: @state['token'])
    response.items.each do |raw_item|
      item_id = raw_item.dig('sys', 'id')
      target_seen ||= item_id == up_to_id

      store.index(raw_item) if store&.index?
      sync_event = WCC::Contentful::Event.from_raw(raw_item, source: self)
      yield(sync_event) if block_given?
      emit_event(sync_event)
      collected << sync_event
    end

    @state = @state.merge('token' => response.next_sync_token)
    write_state
  end

  emit_sync_complete(collected)

  [target_seen, collected.length]
end
should_sync?() click to toggle source
# File lib/wcc/contentful/sync_engine.rb, line 37
# Reports whether the engine has a store that wants items indexed.
#
# @return [Object, nil] the store's `index?` answer, or nil when no store is set.
def should_sync?
  backing_store = store
  backing_store.nil? ? nil : backing_store.index?
end
state() click to toggle source
# File lib/wcc/contentful/sync_engine.rb, line 30
# Returns a frozen snapshot of the sync state.
#
# The snapshot is a shallow copy of the current state, so freezing it does not
# freeze the engine's own hash. When no state exists yet, a freshly wrapped
# nil token is returned instead.
#
# @return [Hash] the frozen state hash.
def state
  snapshot = @state&.dup
  snapshot ||= token_wrapper_factory(nil)
  snapshot.freeze
end

Private Instance Methods

read_state() click to toggle source
# File lib/wcc/contentful/sync_engine.rb, line 116
# Looks up the persisted sync state in the store under the engine's state key.
#
# @return [Object, nil] the stored value passed through token_wrapper_factory,
#   or nil when there is no store or nothing was found.
def read_state
  persisted = store&.find(@state_key)
  return unless persisted

  # backwards compat - migrate existing state
  token_wrapper_factory(persisted)
end
token_wrapper_factory(state) click to toggle source
# File lib/wcc/contentful/sync_engine.rb, line 127
# Normalizes a raw sync token or a state hash into a token-wrapper hash.
#
# A non-Hash value is wrapped as `{ 'token' => value }`. A hash that lacks a
# 'sys' entry gets one identifying it as a token stored under the engine's
# state key. The argument itself is never modified: a hash that already has
# 'sys' is returned as-is, otherwise a new hash is returned. This keeps the
# caller's hash untouched and also works for frozen hashes.
#
# @param state [Hash, Object, nil] the raw token or state hash.
# @return [Hash] the state hash including a 'sys' entry.
def token_wrapper_factory(state)
  state = { 'token' => state } unless state.is_a? Hash
  return state if state['sys']

  state.merge('sys' => { 'id' => @state_key, 'type' => 'token' })
end
write_state() click to toggle source
# File lib/wcc/contentful/sync_engine.rb, line 123
# Persists the current sync state by indexing it into the store.
# Does nothing when there is no store or the store's `index?` is falsy.
#
# @return [Object, nil] whatever the store's `index` returns, or nil when skipped.
def write_state
  return unless store&.index?

  store.index(@state)
end