class Cadence::Client::ThriftClient

Constants

DEFAULT_OPTIONS
WORKFLOW_ID_REUSE_POLICY

Attributes

identity[R]
mutex[R]
options[R]
url[R]

Public Class Methods

new(host, port, identity, options = {}) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 20
# Sets up client state; no connection is opened here.
#
# @param host host part of the service URL
# @param port port part of the service URL
# @param identity value later exposed through the +identity+ reader
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
def initialize(host, port, identity, options = {})
  authority = "#{host}:#{port}"

  @mutex = Mutex.new
  @options = DEFAULT_OPTIONS.merge(options)
  @identity = identity
  @url = "http://#{authority}"
end

Public Instance Methods

count_workflow_executions() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 299
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def count_workflow_executions
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
deprecate_domain(name:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 58
# Issues the 'DeprecateDomain' RPC.
#
# @param name value placed in the request's +name+ field
# @return the value returned by send_request
def deprecate_domain(name:)
  send_request('DeprecateDomain', CadenceThrift::DeprecateDomainRequest.new(name: name))
end
describe_domain(name:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 38
# Issues the 'DescribeDomain' RPC.
#
# @param name value placed in the request's +name+ field
# @return the value returned by send_request
def describe_domain(name:)
  lookup = CadenceThrift::DescribeDomainRequest.new(name: name)

  send_request('DescribeDomain', lookup)
end
describe_task_list(domain:, task_list:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 330
# Issues the 'DescribeTaskList' RPC, always asking for task list status.
#
# The task list type used to be fixed to Decision; it is now a keyword that
# defaults to Decision, so existing callers are unaffected.
#
# @param domain value for the request's +domain+ field
# @param task_list name wrapped in a TaskList struct
# @param task_list_type a CadenceThrift::TaskListType value
#   (defaults to CadenceThrift::TaskListType::Decision)
# @return the value returned by send_request
def describe_task_list(domain:, task_list:, task_list_type: CadenceThrift::TaskListType::Decision)
  request = CadenceThrift::DescribeTaskListRequest.new(
    domain: domain,
    taskList: CadenceThrift::TaskList.new(
      name: task_list
    ),
    taskListType: task_list_type,
    includeTaskListStatus: true
  )
  send_request('DescribeTaskList', request)
end
describe_workflow_execution(domain:, workflow_id:, run_id:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 319
# Issues the 'DescribeWorkflowExecution' RPC for one workflow run.
#
# @param domain value for the request's +domain+ field
# @param workflow_id goes into the WorkflowExecution's +workflowId+
# @param run_id goes into the WorkflowExecution's +runId+
# @return the value returned by send_request
def describe_workflow_execution(domain:, workflow_id:, run_id:)
  execution = CadenceThrift::WorkflowExecution.new(workflowId: workflow_id, runId: run_id)
  description_request = CadenceThrift::DescribeWorkflowExecutionRequest.new(
    domain: domain,
    execution: execution
  )

  send_request('DescribeWorkflowExecution', description_request)
end
get_search_attributes() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 303
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def get_search_attributes
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
get_workflow_execution_history(domain:, workflow_id:, run_id:, next_page_token: nil) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 105
# Issues the 'GetWorkflowExecutionHistory' RPC for one workflow run.
#
# @param domain value for the request's +domain+ field
# @param workflow_id goes into the WorkflowExecution's +workflowId+
# @param run_id goes into the WorkflowExecution's +runId+
# @param next_page_token forwarded as +nextPageToken+ (nil by default)
# @return the value returned by send_request
def get_workflow_execution_history(domain:, workflow_id:, run_id:, next_page_token: nil)
  execution = CadenceThrift::WorkflowExecution.new(workflowId: workflow_id, runId: run_id)
  history_request = CadenceThrift::GetWorkflowExecutionHistoryRequest.new(
    domain: domain,
    execution: execution,
    nextPageToken: next_page_token
  )
  send_request('GetWorkflowExecutionHistory', history_request)
end
list_archived_workflow_executions() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 291
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def list_archived_workflow_executions
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
list_closed_workflow_executions() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 283
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def list_closed_workflow_executions
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
list_domains(page_size:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 43
# Issues the 'ListDomains' RPC.
#
# @param page_size forwarded as the request's +pageSize+ field
# @return the value returned by send_request
def list_domains(page_size:)
  send_request('ListDomains', CadenceThrift::ListDomainsRequest.new(pageSize: page_size))
end
list_open_workflow_executions() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 279
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def list_open_workflow_executions
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
list_workflow_executions() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 287
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def list_workflow_executions
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
poll_for_activity_task(domain:, task_list:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 148
# Issues the 'PollForActivityTask' RPC.
#
# The request carries this client's identity together with the domain and a
# TaskList struct wrapping the given name.
#
# @param domain value for the request's +domain+ field
# @param task_list name wrapped in a TaskList struct
# @return the value returned by send_request
def poll_for_activity_task(domain:, task_list:)
  polled_list = CadenceThrift::TaskList.new(name: task_list)
  poll = CadenceThrift::PollForActivityTaskRequest.new(
    identity: identity,
    domain: domain,
    taskList: polled_list
  )

  send_request('PollForActivityTask', poll)
end
poll_for_decision_task(domain:, task_list:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 118
# Issues the 'PollForDecisionTask' RPC.
#
# The request carries this client's identity together with the domain and a
# TaskList struct wrapping the given name.
#
# @param domain value for the request's +domain+ field
# @param task_list name wrapped in a TaskList struct
# @return the value returned by send_request
def poll_for_decision_task(domain:, task_list:)
  polled_list = CadenceThrift::TaskList.new(name: task_list)
  poll = CadenceThrift::PollForDecisionTaskRequest.new(
    identity: identity,
    domain: domain,
    taskList: polled_list
  )

  send_request('PollForDecisionTask', poll)
end
query_workflow() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 315
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def query_workflow
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
record_activity_task_heartbeat(task_token:, details: nil) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 159
# Issues the 'RecordActivityTaskHeartbeat' RPC.
#
# @param task_token forwarded as the request's +taskToken+
# @param details passed through JSON.serialize before being sent (nil by default)
# @return the value returned by send_request
def record_activity_task_heartbeat(task_token:, details: nil)
  serialized_details = JSON.serialize(details)
  heartbeat = CadenceThrift::RecordActivityTaskHeartbeatRequest.new(
    taskToken: task_token,
    details: serialized_details,
    identity: identity
  )

  send_request('RecordActivityTaskHeartbeat', heartbeat)
end
record_activity_task_heartbeat_by_id() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 168
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def record_activity_task_heartbeat_by_id
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
register_domain(name:, description: nil, global: false, metrics: false, retention_period: 10) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 27
# Issues the 'RegisterDomain' RPC.
#
# @param name forwarded as +name+
# @param description forwarded as +description+ (nil by default)
# @param global forwarded as +isGlobalDomain+ (false by default)
# @param metrics forwarded as +emitMetric+ (false by default)
# @param retention_period forwarded as +workflowExecutionRetentionPeriodInDays+ (10 by default)
# @return the value returned by send_request
def register_domain(name:, description: nil, global: false, metrics: false, retention_period: 10)
  registration = CadenceThrift::RegisterDomainRequest.new(
    name: name,
    description: description,
    workflowExecutionRetentionPeriodInDays: retention_period,
    isGlobalDomain: global,
    emitMetric: metrics
  )

  send_request('RegisterDomain', registration)
end
request_cancel_workflow_execution() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 229
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def request_cancel_workflow_execution
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
reset_sticky_task_list() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 311
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def reset_sticky_task_list
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
reset_workflow_execution(domain:, workflow_id:, run_id:, reason:, decision_task_event_id:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 251
# Issues the 'ResetWorkflowExecution' RPC for one workflow run.
#
# A fresh SecureRandom UUID is attached as the +requestId+ on every call.
#
# @param domain value for the request's +domain+ field
# @param workflow_id goes into the WorkflowExecution's +workflowId+
# @param run_id goes into the WorkflowExecution's +runId+
# @param reason forwarded as +reason+
# @param decision_task_event_id forwarded as +decisionFinishEventId+
# @return the value returned by send_request
def reset_workflow_execution(domain:, workflow_id:, run_id:, reason:, decision_task_event_id:)
  target_run = CadenceThrift::WorkflowExecution.new(workflowId: workflow_id, runId: run_id)
  reset = CadenceThrift::ResetWorkflowExecutionRequest.new(
    domain: domain,
    workflowExecution: target_run,
    reason: reason,
    decisionFinishEventId: decision_task_event_id,
    requestId: SecureRandom.uuid
  )

  send_request('ResetWorkflowExecution', reset)
end
respond_activity_task_canceled(task_token:, details: nil) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 216
# Issues the 'RespondActivityTaskCanceled' RPC.
#
# @param task_token forwarded as the request's +taskToken+
# @param details passed through JSON.serialize before being sent (nil by default)
# @return the value returned by send_request
def respond_activity_task_canceled(task_token:, details: nil)
  serialized_details = JSON.serialize(details)
  cancellation = CadenceThrift::RespondActivityTaskCanceledRequest.new(
    taskToken: task_token,
    details: serialized_details,
    identity: identity
  )

  send_request('RespondActivityTaskCanceled', cancellation)
end
respond_activity_task_canceled_by_id() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 225
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def respond_activity_task_canceled_by_id
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
respond_activity_task_completed(task_token:, result:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 172
# Issues the 'RespondActivityTaskCompleted' RPC.
#
# @param task_token forwarded as the request's +taskToken+
# @param result passed through JSON.serialize before being sent
# @return the value returned by send_request
def respond_activity_task_completed(task_token:, result:)
  serialized_result = JSON.serialize(result)
  completion = CadenceThrift::RespondActivityTaskCompletedRequest.new(
    identity: identity,
    taskToken: task_token,
    result: serialized_result
  )

  send_request('RespondActivityTaskCompleted', completion)
end
respond_activity_task_completed_by_id(domain:, activity_id:, workflow_id:, run_id:, result:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 181
# Issues the 'RespondActivityTaskCompletedByID' RPC, addressing the activity
# by domain / workflow / run / activity ids rather than by task token.
#
# @param domain forwarded as +domain+
# @param activity_id forwarded as +activityID+
# @param workflow_id forwarded as +workflowID+
# @param run_id forwarded as +runID+
# @param result passed through JSON.serialize before being sent
# @return the value returned by send_request
def respond_activity_task_completed_by_id(domain:, activity_id:, workflow_id:, run_id:, result:)
  serialized_result = JSON.serialize(result)
  completion = CadenceThrift::RespondActivityTaskCompletedByIDRequest.new(
    identity: identity,
    domain: domain,
    workflowID: workflow_id,
    runID: run_id,
    activityID: activity_id,
    result: serialized_result
  )

  send_request('RespondActivityTaskCompletedByID', completion)
end
respond_activity_task_failed(task_token:, reason:, details: nil) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 193
# Issues the 'RespondActivityTaskFailed' RPC.
#
# @param task_token forwarded as the request's +taskToken+
# @param reason forwarded as +reason+
# @param details passed through JSON.serialize before being sent (nil by default)
# @return the value returned by send_request
def respond_activity_task_failed(task_token:, reason:, details: nil)
  serialized_details = JSON.serialize(details)
  failure = CadenceThrift::RespondActivityTaskFailedRequest.new(
    identity: identity,
    taskToken: task_token,
    reason: reason,
    details: serialized_details
  )

  send_request('RespondActivityTaskFailed', failure)
end
respond_activity_task_failed_by_id(domain:, activity_id:, workflow_id:, run_id:, reason:, details: nil) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 203
# Issues the 'RespondActivityTaskFailedByID' RPC, addressing the activity by
# domain / workflow / run / activity ids rather than by task token.
#
# @param domain forwarded as +domain+
# @param activity_id forwarded as +activityID+
# @param workflow_id forwarded as +workflowID+
# @param run_id forwarded as +runID+
# @param reason forwarded as +reason+
# @param details passed through JSON.serialize before being sent (nil by default)
# @return the value returned by send_request
def respond_activity_task_failed_by_id(domain:, activity_id:, workflow_id:, run_id:, reason:, details: nil)
  serialized_details = JSON.serialize(details)
  failure = CadenceThrift::RespondActivityTaskFailedByIDRequest.new(
    identity: identity,
    domain: domain,
    workflowID: workflow_id,
    runID: run_id,
    activityID: activity_id,
    reason: reason,
    details: serialized_details
  )

  send_request('RespondActivityTaskFailedByID', failure)
end
respond_decision_task_completed(task_token:, decisions:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 129
# Issues the 'RespondDecisionTaskCompleted' RPC.
#
# @param task_token forwarded as the request's +taskToken+
# @param decisions coerced with Kernel#Array, so nil, a single decision or a
#   list are all accepted
# @return the value returned by send_request
def respond_decision_task_completed(task_token:, decisions:)
  decision_list = Array(decisions)
  completion = CadenceThrift::RespondDecisionTaskCompletedRequest.new(
    identity: identity,
    taskToken: task_token,
    decisions: decision_list
  )

  send_request('RespondDecisionTaskCompleted', completion)
end
respond_decision_task_failed(task_token:, cause:, details: nil) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 138
# Issues the 'RespondDecisionTaskFailed' RPC.
#
# @param task_token forwarded as the request's +taskToken+
# @param cause forwarded as +cause+
# @param details passed through JSON.serialize before being sent (nil by default)
# @return the value returned by send_request
def respond_decision_task_failed(task_token:, cause:, details: nil)
  serialized_details = JSON.serialize(details)
  failure = CadenceThrift::RespondDecisionTaskFailedRequest.new(
    identity: identity,
    taskToken: task_token,
    cause: cause,
    details: serialized_details
  )

  send_request('RespondDecisionTaskFailed', failure)
end
respond_query_task_completed() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 307
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def respond_query_task_completed
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
scan_workflow_executions() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 295
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def scan_workflow_executions
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
signal_with_start_workflow_execution() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 247
# Placeholder: this client does not implement the call.
#
# @raise [NotImplementedError] always; the message names the method
def signal_with_start_workflow_execution
  raise NotImplementedError, "#{self.class.name}##{__method__} is not implemented"
end
signal_workflow_execution(domain:, workflow_id:, run_id:, signal:, input: nil) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 233
# Issues the 'SignalWorkflowExecution' RPC for one workflow run.
#
# @param domain forwarded as +domain+
# @param workflow_id goes into the WorkflowExecution's +workflowId+
# @param run_id goes into the WorkflowExecution's +runId+
# @param signal forwarded as +signalName+
# @param input passed through JSON.serialize before being sent (nil by default)
# @return the value returned by send_request
def signal_workflow_execution(domain:, workflow_id:, run_id:, signal:, input: nil)
  target_run = CadenceThrift::WorkflowExecution.new(workflowId: workflow_id, runId: run_id)
  serialized_input = JSON.serialize(input)
  signal_request = CadenceThrift::SignalWorkflowExecutionRequest.new(
    domain: domain,
    workflowExecution: target_run,
    signalName: signal,
    input: serialized_input,
    identity: identity
  )

  send_request('SignalWorkflowExecution', signal_request)
end
start_workflow_execution( domain:, workflow_id:, workflow_name:, task_list:, input: nil, execution_timeout:, task_timeout:, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil ) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 63
# Issues the 'StartWorkflowExecution' RPC.
#
# The request is fully assembled first (including a fresh SecureRandom UUID as
# +requestId+); only afterwards is an optional reuse policy looked up in
# WORKFLOW_ID_REUSE_POLICY and attached.
#
# @param domain forwarded as +domain+
# @param workflow_id forwarded as +workflowId+
# @param workflow_name name wrapped in a WorkflowType struct
# @param task_list name wrapped in a TaskList struct
# @param input passed through JSON.serialize before being sent (nil by default)
# @param execution_timeout forwarded as +executionStartToCloseTimeoutSeconds+
# @param task_timeout forwarded as +taskStartToCloseTimeoutSeconds+
# @param workflow_id_reuse_policy key into WORKFLOW_ID_REUSE_POLICY; skipped when falsy
# @param headers wrapped in a Header struct as its +fields+
# @param cron_schedule forwarded as +cronSchedule+
# @return the value returned by send_request
# @raise [Client::ArgumentError] when a reuse policy is given but has no
#   truthy entry in WORKFLOW_ID_REUSE_POLICY
def start_workflow_execution(
  domain:,
  workflow_id:,
  workflow_name:,
  task_list:,
  input: nil,
  execution_timeout:,
  task_timeout:,
  workflow_id_reuse_policy: nil,
  headers: nil,
  cron_schedule: nil
)
  workflow_type = CadenceThrift::WorkflowType.new(name: workflow_name)
  target_list = CadenceThrift::TaskList.new(name: task_list)
  serialized_input = JSON.serialize(input)
  request_id = SecureRandom.uuid
  header = CadenceThrift::Header.new(fields: headers)

  start_request = CadenceThrift::StartWorkflowExecutionRequest.new(
    identity: identity,
    domain: domain,
    workflowType: workflow_type,
    workflowId: workflow_id,
    taskList: target_list,
    input: serialized_input,
    executionStartToCloseTimeoutSeconds: execution_timeout,
    taskStartToCloseTimeoutSeconds: task_timeout,
    requestId: request_id,
    header: header,
    cronSchedule: cron_schedule
  )

  if workflow_id_reuse_policy
    # An unknown key yields nil from the lookup, which trips the raise before
    # the setter is ever invoked.
    start_request.workflowIdReusePolicy =
      WORKFLOW_ID_REUSE_POLICY[workflow_id_reuse_policy] ||
      raise(Client::ArgumentError, 'Unknown workflow_id_reuse_policy specified')
  end

  send_request('StartWorkflowExecution', start_request)
end
terminate_workflow_execution(domain:, workflow_id:, run_id:, reason:, details: nil) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 265
# Issues the 'TerminateWorkflowExecution' RPC for one workflow run.
#
# @param domain forwarded as +domain+
# @param workflow_id goes into the WorkflowExecution's +workflowId+
# @param run_id goes into the WorkflowExecution's +runId+
# @param reason forwarded as +reason+
# @param details passed through JSON.serialize before being sent (nil by default)
# @return the value returned by send_request
def terminate_workflow_execution(domain:, workflow_id:, run_id:, reason:, details: nil)
  target_run = CadenceThrift::WorkflowExecution.new(workflowId: workflow_id, runId: run_id)
  serialized_details = JSON.serialize(details)
  termination = CadenceThrift::TerminateWorkflowExecutionRequest.new(
    domain: domain,
    workflowExecution: target_run,
    reason: reason,
    details: serialized_details,
    identity: identity
  )

  send_request('TerminateWorkflowExecution', termination)
end
update_domain(name:, description:) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 48
# Issues the 'UpdateDomain' RPC to change a domain's description.
#
# The description travels in a nested UpdateDomainInfo struct under the
# request's +updatedInfo+ field. The previous code nested a second
# UpdateDomainRequest under +updateInfo+, which does not match the Cadence IDL
# (UpdateDomainRequest has no +description+ field).
#
# @param name forwarded as the request's +name+
# @param description new description, placed in the UpdateDomainInfo struct
# @return the value returned by send_request
def update_domain(name:, description:)
  request = CadenceThrift::UpdateDomainRequest.new(
    name: name,
    updatedInfo: CadenceThrift::UpdateDomainInfo.new(
      description: description
    )
  )
  send_request('UpdateDomain', request)
end

Private Instance Methods

connection() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 357
# Memoized WorkflowService client speaking the binary protocol over +transport+.
#
# @return the client built on first use and reused afterwards
def connection
  return @connection if @connection

  binary_protocol = Thrift::BinaryProtocol.new(transport)
  @connection = CadenceThrift::WorkflowService::Client.new(binary_protocol)
end
send_request(name, request) click to toggle source
# File lib/cadence/client/thrift_client.rb, line 364
# Performs one RPC on the shared connection and reports its latency.
#
# Latency is measured with the monotonic clock so that wall-clock adjustments
# cannot skew the reported value or make it negative. The metric is only
# emitted when the call returns normally; if it raises, the exception
# propagates before the timing line is reached.
#
# @param name [String] procedure name; used both for the 'Rpc-Procedure'
#   header and as the method invoked on the connection
# @param request the request object handed to the connection
# @return whatever the connection returns for the call
def send_request(name, request)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # synchronize these calls because transport headers are mutated
  result = mutex.synchronize do
    transport.add_headers 'Rpc-Procedure' => "WorkflowService::#{name}"
    connection.public_send(name, request)
  end

  elapsed_ms = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at) * 1000).round
  Cadence.metrics.timing('request.latency', elapsed_ms, request_name: name)

  result
end
transport() click to toggle source
# File lib/cadence/client/thrift_client.rb, line 346
# Memoized HTTP transport pointed at +url+, pre-loaded with the static RPC headers.
#
# The 'Context-TTL-MS' header is options[:polling_ttl] multiplied by 1000
# and rendered as a string.
#
# @return the transport built on first use and reused afterwards
def transport
  return @transport if @transport

  http = Thrift::HTTPClientTransport.new(url)
  ttl_ms = options[:polling_ttl] * 1_000
  http.add_headers(
    'Rpc-Caller' => 'ruby-client',
    'Rpc-Encoding' => 'thrift',
    'Rpc-Service' => 'cadence-proxy',
    'Context-TTL-MS' => ttl_ms.to_s
  )
  @transport = http
end