class Cadence::Client::ThriftClient
Constants
- DEFAULT_OPTIONS
- WORKFLOW_ID_REUSE_POLICY
Attributes
identity[R]
mutex[R]
options[R]
url[R]
Public Class Methods
new(host, port, identity, options = {})
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 20 def initialize(host, port, identity, options = {}) @url = "http://#{host}:#{port}" @identity = identity @options = DEFAULT_OPTIONS.merge(options) @mutex = Mutex.new end
Public Instance Methods
count_workflow_executions()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 299 def count_workflow_executions raise NotImplementedError end
deprecate_domain(name:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 58 def deprecate_domain(name:) request = CadenceThrift::DeprecateDomainRequest.new(name: name) send_request('DeprecateDomain', request) end
describe_domain(name:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 38 def describe_domain(name:) request = CadenceThrift::DescribeDomainRequest.new(name: name) send_request('DescribeDomain', request) end
describe_task_list(domain:, task_list:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 330 def describe_task_list(domain:, task_list:) request = CadenceThrift::DescribeTaskListRequest.new( domain: domain, taskList: CadenceThrift::TaskList.new( name: task_list ), taskListType: CadenceThrift::TaskListType::Decision, includeTaskListStatus: true ) send_request('DescribeTaskList', request) end
describe_workflow_execution(domain:, workflow_id:, run_id:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 319 def describe_workflow_execution(domain:, workflow_id:, run_id:) request = CadenceThrift::DescribeWorkflowExecutionRequest.new( domain: domain, execution: CadenceThrift::WorkflowExecution.new( workflowId: workflow_id, runId: run_id ) ) send_request('DescribeWorkflowExecution', request) end
get_search_attributes()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 303 def get_search_attributes raise NotImplementedError end
get_workflow_execution_history(domain:, workflow_id:, run_id:, next_page_token: nil)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 105 def get_workflow_execution_history(domain:, workflow_id:, run_id:, next_page_token: nil) request = CadenceThrift::GetWorkflowExecutionHistoryRequest.new( domain: domain, execution: CadenceThrift::WorkflowExecution.new( workflowId: workflow_id, runId: run_id ), nextPageToken: next_page_token ) send_request('GetWorkflowExecutionHistory', request) end
list_archived_workflow_executions()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 291 def list_archived_workflow_executions raise NotImplementedError end
list_closed_workflow_executions()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 283 def list_closed_workflow_executions raise NotImplementedError end
list_domains(page_size:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 43 def list_domains(page_size:) request = CadenceThrift::ListDomainsRequest.new(pageSize: page_size) send_request('ListDomains', request) end
list_open_workflow_executions()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 279 def list_open_workflow_executions raise NotImplementedError end
list_workflow_executions()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 287 def list_workflow_executions raise NotImplementedError end
poll_for_activity_task(domain:, task_list:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 148 def poll_for_activity_task(domain:, task_list:) request = CadenceThrift::PollForActivityTaskRequest.new( identity: identity, domain: domain, taskList: CadenceThrift::TaskList.new( name: task_list ) ) send_request('PollForActivityTask', request) end
poll_for_decision_task(domain:, task_list:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 118 def poll_for_decision_task(domain:, task_list:) request = CadenceThrift::PollForDecisionTaskRequest.new( identity: identity, domain: domain, taskList: CadenceThrift::TaskList.new( name: task_list ) ) send_request('PollForDecisionTask', request) end
query_workflow()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 315 def query_workflow raise NotImplementedError end
record_activity_task_heartbeat(task_token:, details: nil)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 159 def record_activity_task_heartbeat(task_token:, details: nil) request = CadenceThrift::RecordActivityTaskHeartbeatRequest.new( taskToken: task_token, details: JSON.serialize(details), identity: identity ) send_request('RecordActivityTaskHeartbeat', request) end
record_activity_task_heartbeat_by_id()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 168 def record_activity_task_heartbeat_by_id raise NotImplementedError end
register_domain(name:, description: nil, global: false, metrics: false, retention_period: 10)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 27 def register_domain(name:, description: nil, global: false, metrics: false, retention_period: 10) request = CadenceThrift::RegisterDomainRequest.new( name: name, description: description, emitMetric: metrics, isGlobalDomain: global, workflowExecutionRetentionPeriodInDays: retention_period ) send_request('RegisterDomain', request) end
request_cancel_workflow_execution()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 229 def request_cancel_workflow_execution raise NotImplementedError end
reset_sticky_task_list()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 311 def reset_sticky_task_list raise NotImplementedError end
reset_workflow_execution(domain:, workflow_id:, run_id:, reason:, decision_task_event_id:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 251 def reset_workflow_execution(domain:, workflow_id:, run_id:, reason:, decision_task_event_id:) request = CadenceThrift::ResetWorkflowExecutionRequest.new( domain: domain, workflowExecution: CadenceThrift::WorkflowExecution.new( workflowId: workflow_id, runId: run_id ), reason: reason, decisionFinishEventId: decision_task_event_id, requestId: SecureRandom.uuid ) send_request('ResetWorkflowExecution', request) end
respond_activity_task_canceled(task_token:, details: nil)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 216 def respond_activity_task_canceled(task_token:, details: nil) request = CadenceThrift::RespondActivityTaskCanceledRequest.new( taskToken: task_token, details: JSON.serialize(details), identity: identity ) send_request('RespondActivityTaskCanceled', request) end
respond_activity_task_canceled_by_id()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 225 def respond_activity_task_canceled_by_id raise NotImplementedError end
respond_activity_task_completed(task_token:, result:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 172 def respond_activity_task_completed(task_token:, result:) request = CadenceThrift::RespondActivityTaskCompletedRequest.new( identity: identity, taskToken: task_token, result: JSON.serialize(result) ) send_request('RespondActivityTaskCompleted', request) end
respond_activity_task_completed_by_id(domain:, activity_id:, workflow_id:, run_id:, result:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 181 def respond_activity_task_completed_by_id(domain:, activity_id:, workflow_id:, run_id:, result:) request = CadenceThrift::RespondActivityTaskCompletedByIDRequest.new( identity: identity, domain: domain, workflowID: workflow_id, runID: run_id, activityID: activity_id, result: JSON.serialize(result) ) send_request('RespondActivityTaskCompletedByID', request) end
respond_activity_task_failed(task_token:, reason:, details: nil)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 193 def respond_activity_task_failed(task_token:, reason:, details: nil) request = CadenceThrift::RespondActivityTaskFailedRequest.new( identity: identity, taskToken: task_token, reason: reason, details: JSON.serialize(details) ) send_request('RespondActivityTaskFailed', request) end
respond_activity_task_failed_by_id(domain:, activity_id:, workflow_id:, run_id:, reason:, details: nil)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 203 def respond_activity_task_failed_by_id(domain:, activity_id:, workflow_id:, run_id:, reason:, details: nil) request = CadenceThrift::RespondActivityTaskFailedByIDRequest.new( identity: identity, domain: domain, workflowID: workflow_id, runID: run_id, activityID: activity_id, reason: reason, details: JSON.serialize(details) ) send_request('RespondActivityTaskFailedByID', request) end
respond_decision_task_completed(task_token:, decisions:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 129 def respond_decision_task_completed(task_token:, decisions:) request = CadenceThrift::RespondDecisionTaskCompletedRequest.new( identity: identity, taskToken: task_token, decisions: Array(decisions) ) send_request('RespondDecisionTaskCompleted', request) end
respond_decision_task_failed(task_token:, cause:, details: nil)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 138 def respond_decision_task_failed(task_token:, cause:, details: nil) request = CadenceThrift::RespondDecisionTaskFailedRequest.new( identity: identity, taskToken: task_token, cause: cause, details: JSON.serialize(details) ) send_request('RespondDecisionTaskFailed', request) end
respond_query_task_completed()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 307 def respond_query_task_completed raise NotImplementedError end
scan_workflow_executions()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 295 def scan_workflow_executions raise NotImplementedError end
signal_with_start_workflow_execution()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 247 def signal_with_start_workflow_execution raise NotImplementedError end
signal_workflow_execution(domain:, workflow_id:, run_id:, signal:, input: nil)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 233 def signal_workflow_execution(domain:, workflow_id:, run_id:, signal:, input: nil) request = CadenceThrift::SignalWorkflowExecutionRequest.new( domain: domain, workflowExecution: CadenceThrift::WorkflowExecution.new( workflowId: workflow_id, runId: run_id ), signalName: signal, input: JSON.serialize(input), identity: identity ) send_request('SignalWorkflowExecution', request) end
start_workflow_execution( domain:, workflow_id:, workflow_name:, task_list:, input: nil, execution_timeout:, task_timeout:, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil )
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 63 def start_workflow_execution( domain:, workflow_id:, workflow_name:, task_list:, input: nil, execution_timeout:, task_timeout:, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil ) request = CadenceThrift::StartWorkflowExecutionRequest.new( identity: identity, domain: domain, workflowType: CadenceThrift::WorkflowType.new( name: workflow_name ), workflowId: workflow_id, taskList: CadenceThrift::TaskList.new( name: task_list ), input: JSON.serialize(input), executionStartToCloseTimeoutSeconds: execution_timeout, taskStartToCloseTimeoutSeconds: task_timeout, requestId: SecureRandom.uuid, header: CadenceThrift::Header.new( fields: headers ), cronSchedule: cron_schedule ) if workflow_id_reuse_policy policy = WORKFLOW_ID_REUSE_POLICY[workflow_id_reuse_policy] raise Client::ArgumentError, 'Unknown workflow_id_reuse_policy specified' unless policy request.workflowIdReusePolicy = policy end send_request('StartWorkflowExecution', request) end
terminate_workflow_execution(domain:, workflow_id:, run_id:, reason:, details: nil)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 265 def terminate_workflow_execution(domain:, workflow_id:, run_id:, reason:, details: nil) request = CadenceThrift::TerminateWorkflowExecutionRequest.new( domain: domain, workflowExecution: CadenceThrift::WorkflowExecution.new( workflowId: workflow_id, runId: run_id ), reason: reason, details: JSON.serialize(details), identity: identity ) send_request('TerminateWorkflowExecution', request) end
update_domain(name:, description:)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 48 def update_domain(name:, description:) request = CadenceThrift::UpdateDomainRequest.new( name: name, updateInfo: CadenceThrift::UpdateDomainRequest.new( description: description ) ) send_request('UpdateDomain', request) end
Private Instance Methods
connection()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 357 def connection @connection ||= begin protocol = Thrift::BinaryProtocol.new(transport) CadenceThrift::WorkflowService::Client.new(protocol) end end
send_request(name, request)
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 364 def send_request(name, request) start_time = Time.now # synchronize these calls because transport headers are mutated result = mutex.synchronize do transport.add_headers 'Rpc-Procedure' => "WorkflowService::#{name}" connection.public_send(name, request) end time_diff_ms = ((Time.now - start_time) * 1000).round Cadence.metrics.timing('request.latency', time_diff_ms, request_name: name) result end
transport()
click to toggle source
# File lib/cadence/client/thrift_client.rb, line 346 def transport @transport ||= Thrift::HTTPClientTransport.new(url).tap do |http| http.add_headers( 'Rpc-Caller' => 'ruby-client', 'Rpc-Encoding' => 'thrift', 'Rpc-Service' => 'cadence-proxy', 'Context-TTL-MS' => (options[:polling_ttl] * 1_000).to_s ) end end