class WorkflowRb::WorkflowHost

Public Class Methods

new() click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 12
def initialize
  @persistence = MemoryPersistenceProvider.new
  @queue_provider = SingleNodeQueueProvider.new
  @lock_provider = SingleNodeLockProvider.new
  @registry = WorkflowRegistry.new
  @is_shutdown = true;
  @logger = Logger.new(STDOUT)
  @logger.level = Logger::WARN
  @thread_count = Etc.nprocessors
  @threads = []
  @poll_interval = 5
  @poll_tick = 0
end

Public Instance Methods

publish_event(event_name, event_key, event_data) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 104
def publish_event(event_name, event_key, event_data)
  if @is_shutdown
    raise Exception 'Host is not running'
  end
  @logger.info("Publishing event #{event_name} #{event_key}")
  subs = @persistence.get_subscriptions(event_name, event_key)
  subs.each do |sub|
    pub = EventPublication.new
    pub.id = SecureRandom.uuid
    pub.event_data = event_data
    pub.event_key = event_key
    pub.event_name = event_name
    pub.step_id = sub.step_id
    pub.workflow_id = sub.workflow_id
    @queue_provider.queue_for_publish(pub)
    @persistence.terminate_subscription(sub.id)
  end
end
register_workflow(workflow_class) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 34
def register_workflow(workflow_class)
  builder = WorkflowRb::WorkflowBuilder.new
  workflow_obj = workflow_class.new
  workflow_obj.build(builder)
  definition = builder.build(workflow_class::ID, workflow_class::VERSION, workflow_class::DATA_CLASS)
  @registry.register_workflow(definition)
end
resume_workflow(id) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 144
def resume_workflow(id)
  if @lock_provider.acquire_lock(id)
    begin
      workflow = @persistence.get_workflow_instance(id)
      if workflow.status == WorkflowStatus::SUSPENDED
        workflow.status = WorkflowStatus::RUNNABLE
        @persistence.persist_workflow(workflow)
        return true
      else
        return false
      end
    rescue Exception => e
      @logger.error("#{e.message} #{e.backtrace}")
    ensure
      @lock_provider.release_lock(id)
    end
  else
    false
  end
end
start() click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 71
def start
  if (@is_shutdown)
    @is_shutdown = false;
    @logger.info('Starting worker thread pool')

    @thread_count.times do
      @threads << Thread.new { run_workflows }
    end

    @threads << Thread.new { run_publications }
    @threads << Thread.new { house_keeping }

  end
end
start_workflow(definition_id, version, data = nil) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 42
def start_workflow(definition_id, version, data = nil)
  wf_def = @registry.get_definition(definition_id, version)

  wf = WorkflowInstance.new
  wf.definition_id = definition_id
  wf.version = version
  wf.next_execution = Time.new
  wf.create_time = Time.new
  wf.status = WorkflowStatus::RUNNABLE

  if data
    wf.data = data
  else
    if wf_def.data_class
      wf.data = wf_def.data_class.new
    end
  end

  ep = ExecutionPointer.new
  ep.active = true
  ep.step_id = wf_def.initial_step
  ep.concurrent_fork = 1
  wf.execution_pointers << ep

  id = @persistence.create_new_workflow(wf)
  @queue_provider.queue_for_processing(id)
  id
end
stop() click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 86
def stop
  @is_shutdown = true;
  @logger.info('Stopping worker thread pool')
  @threads.each do |thread|
    thread.join(10)
  end
end
subscribe_event(workflow_id, step_id, event_name, event_key) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 94
def subscribe_event(workflow_id, step_id, event_name, event_key)
  @logger.info("Subscribing to event #{event_name} #{event_key} for workflow #{workflow_id} step #{step_id}")
  sub = EventSubscription.new
  sub.workflow_id = workflow_id
  sub.step_id = step_id
  sub.event_name = event_name
  sub.event_key = event_key
  @persistence.create_subscription(sub)
end
suspend_workflow(id) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 123
def suspend_workflow(id)
  if @lock_provider.acquire_lock(id)
    begin
      workflow = @persistence.get_workflow_instance(id)
      if workflow.status == WorkflowStatus::RUNNABLE
        workflow.status = WorkflowStatus::SUSPENDED
        @persistence.persist_workflow(workflow)
        return true
      else
        return false
      end
    rescue Exception => e
      @logger.error("#{e.message} #{e.backtrace}")
    ensure
      @lock_provider.release_lock(id)
    end
  else
    false
  end
end
terminate_workflow(id) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 165
def terminate_workflow(id)
  if @lock_provider.acquire_lock(id)
    begin
      workflow = @persistence.get_workflow_instance(id)
      workflow.status = WorkflowStatus::TERMINATED
      @persistence.persist_workflow(workflow)
      return true
    rescue Exception => e
      @logger.error("#{e.message} #{e.backtrace}")
    ensure
      @lock_provider.release_lock(id)
    end
  else
    false
  end
end
use_logger(logger) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 26
def use_logger(logger)
  @logger = logger
end
use_persistence(persistence) click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 30
def use_persistence(persistence)
  @persistence = persistence
end

Private Instance Methods

house_keeping() click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 257
def house_keeping
  while (!@is_shutdown)
    begin
      if (@poll_tick >= @poll_interval)
        @poll_tick = 0
        @logger.debug('Polling for runnable instances')
        @persistence.get_runnable_instances.each do |item|
          @queue_provider.queue_for_processing(item)
        end
      end
      @poll_tick += 1
      sleep(1)
    rescue Exception => e
      @logger.error("#{e.message} #{e.backtrace}")
    end
  end
end
run_publications() click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 216
def run_publications
  while not @is_shutdown
    begin
      pub = @queue_provider.dequeue_for_publish
      if (pub)
        if @lock_provider.acquire_lock(pub.workflow_id)
          begin
            workflow = @persistence.get_workflow_instance(pub.workflow_id)
            pointers = workflow.execution_pointers.select { |ep| ep.event_name == pub.event_name and ep.event_key == pub.event_key and not ep.event_published}
            pointers.each do |pointer|
              pointer.event_data = pub.event_data
              pointer.event_published = true
              pointer.active = true
            end
            workflow.next_execution = Time.new
            @persistence.persist_workflow(workflow)
          rescue Exception => e
            @logger.error(e)
            @persistence.create_unpublished_event(pub)
          ensure
            @lock_provider.release_lock(pub.workflow_id)
            @queue_provider.queue_for_processing(pub.workflow_id)
          end

          if workflow.next_execution
            if (workflow.status == WorkflowStatus::RUNNABLE) and (workflow.next_execution <= Time.new)
              @queue_provider.queue_for_processing(pub.workflow_id)
            end
          end
        else
          @logger.info("Workflow #{workflow_id} is locked")
        end
      else
        sleep(0.5) #no work
      end
    rescue Exception => e
      @logger.error("#{e.message} #{e.backtrace}")
    end
  end
end
run_workflows() click to toggle source
# File lib/workflow_rb/services/workflow_host.rb, line 184
def run_workflows
  executor = WorkflowExecutor.new(@registry, @persistence, self, @logger)
  while not @is_shutdown
    begin
      workflow_id = @queue_provider.dequeue_for_processing
      if (workflow_id)

        if @lock_provider.acquire_lock(workflow_id)
          begin
          workflow = @persistence.get_workflow_instance(workflow_id)
          executor.execute(workflow)
          ensure
            @lock_provider.release_lock(workflow_id)
          end

          if workflow.next_execution
            if (workflow.status == WorkflowStatus::RUNNABLE) and (workflow.next_execution <= Time.new)
              @queue_provider.queue_for_processing(workflow_id)
            end
          end
        else
          @logger.info("Workflow #{workflow_id} is locked")
        end
      else
        sleep(0.2) #no work
      end
    rescue Exception => e
      @logger.error("#{e.message} #{e.backtrace}")
    end
  end
end