class WCC::Contentful::SyncEngine
The SyncEngine
is used to keep the currently configured store up to date using the Sync API. It is available on the WCC::Contentful::Services
instance, and the application is responsible to periodically call next
in order to hit the sync API and update the store.
If you have mounted the WCC::Contentful::Engine
, AND the configured store is one that can be synced (i.e. it responds to `:index`), then the WCC::Contentful::WebhookController will call next
automatically anytime a webhook is received. Otherwise you should hook up to the Webhook events and call the sync engine via your initializer:
WCC::Contentful::Events.subscribe(proc do |event| WCC::Contentful::Services.instance.sync_engine.next(up_to: event.dig('sys', 'id')) end, with: :call)
Attributes
Public Class Methods
# File lib/wcc/contentful/sync_engine.rb, line 41 def initialize(state: nil, store: nil, client: nil, key: nil) @state_key = key || "sync:#{object_id}" @client = client || WCC::Contentful::Services.instance.client @mutex = Mutex.new if store unless %i[index index? find].all? { |m| store.respond_to?(m) } raise ArgumentError, ':store param must implement the Store interface' end @store = store @state = read_state if should_sync? end if state @state = token_wrapper_factory(state) raise ArgumentError, ':state param must be a String or Hash' unless @state.is_a? Hash unless @state.dig('sys', 'type') == 'token' raise ArgumentError, ':state param must be of sys.type = "token"' end end raise ArgumentError, 'either :state or :store must be provided' unless @state || @store end
Public Instance Methods
# File lib/wcc/contentful/sync_engine.rb, line 102 def emit_event(event) type = event.dig('sys', 'type') raise ArgumentError, "Unknown event type #{event}" unless type.present? broadcast(type, event) end
# File lib/wcc/contentful/sync_engine.rb, line 109 def emit_sync_complete(events) event = WCC::Contentful::Event::SyncComplete.new(events, source: self) broadcast('SyncComplete', event) end
Gets the next increment of data from the Sync API. If the configured store responds to `:index`, that will be called with each item in the Sync response to update the store. If a block is passed, that block will be evaluated with each item in the response. @param [String] up_to_id An ID to look for in the response. The method returns
true if the ID was found or no up_to_id was given, false if the ID did not come back.
@return [Array] A `[Boolean, Integer]` tuple where the first value is whether the ID was found,
and the second value is the number of items returned.
# File lib/wcc/contentful/sync_engine.rb, line 73 def next(up_to_id: nil) id_found = up_to_id.nil? all_events = [] @mutex.synchronize do @state ||= read_state || token_wrapper_factory(nil) next_sync_token = @state['token'] sync_resp = client.sync(sync_token: next_sync_token) sync_resp.items.each do |item| id = item.dig('sys', 'id') id_found ||= id == up_to_id store.index(item) if store&.index? event = WCC::Contentful::Event.from_raw(item, source: self) yield(event) if block_given? emit_event(event) all_events << event end @state = @state.merge('token' => sync_resp.next_sync_token) write_state end emit_sync_complete(all_events) [id_found, all_events.length] end
# File lib/wcc/contentful/sync_engine.rb, line 37 def should_sync? store&.index? end
# File lib/wcc/contentful/sync_engine.rb, line 30 def state (@state&.dup || token_wrapper_factory(nil)).freeze end
Private Instance Methods
# File lib/wcc/contentful/sync_engine.rb, line 116 def read_state return unless found = store&.find(@state_key) # backwards compat - migrate existing state token_wrapper_factory(found) end
# File lib/wcc/contentful/sync_engine.rb, line 127 def token_wrapper_factory(state) state = { 'token' => state } unless state.is_a? Hash state.merge!('sys' => { 'id' => @state_key, 'type' => 'token' }) unless state['sys'] state end
# File lib/wcc/contentful/sync_engine.rb, line 123 def write_state store.index(@state) if store&.index? end