salsa 0.7.1
Loading...
Searching...
No Matches
TaskExecutorForkZmq.cc
1#include "TaskExecutorForkZmq.hh"
2#include "TaskState.hh"
3namespace Salsa {
4TaskExecutorForkZmq::TaskExecutorForkZmq(zactor_t * actor) : TaskExecutor(), mpZActor(actor)
5{
9}
11{
15 if (mpZActor) {
16 zactor_destroy(&mpZActor);
17 }
18}
19bool TaskExecutorForkZmq::run(std::string worker, std::string upstream)
20{
24
25 if (mpTaskState == nullptr) return false;
26
27 if (pipe() == nullptr) return false;
28
29 zmsg_t * pOutMsg = zmsg_new();
30 zmsg_addstrf(pOutMsg, "%s", mpTaskState->task()->data().c_str());
31 zmsg_addstrf(pOutMsg, "%d", mpTaskState->task()->clientid());
32 zmsg_addstrf(pOutMsg, "%d", mpTaskState->task()->groupid());
33 zmsg_addstr(pOutMsg, worker.c_str());
34 zmsg_addstr(pOutMsg, upstream.c_str());
35 zmsg_addstr(pOutMsg, mpTaskState->task()->jobid().c_str());
36 for (int iPos = 0; iPos < mpTaskState->task()->logtargets_size(); iPos++) {
37 if (iPos == 0) zmsg_addstrf(pOutMsg, "%s", "logs");
38 zmsg_addstrf(pOutMsg, "%s", mpTaskState->task()->logtargets(iPos).c_str());
39 }
40 for (int iPos = 0; iPos < mpTaskState->task()->envs_size(); iPos++) {
41 if (iPos == 0) zmsg_addstrf(pOutMsg, "%s", "envs");
42 zmsg_addstrf(pOutMsg, "%s", mpTaskState->task()->envs(iPos).c_str());
43 }
44
45 zsock_send(pipe(), "m", pOutMsg);
46 zmsg_destroy(&pOutMsg);
47
48 return true;
49}
51{
55 return mpZActor;
56}
57
58bool TaskExecutorForkZmq::handlePipe(std::vector<std::string> & extra)
59{
63
64 zmsg_t * pMessage = zmsg_recv(pipe());
65 if (zframe_streq(zmsg_first(pMessage), "$PID")) {
66 char * pPidStr = zframe_strdup(zmsg_next(pMessage));
67
68 uint32_t pid = static_cast<uint32_t>(strtoul(pPidStr, nullptr, 0));
69 mpTaskState->pid(pid);
70 // mpTaskState->state(TaskState::EState::running);
71 std::string payload;
72 mpTaskState->task()->SerializeToString(&payload);
73 SPD_DEBUG("JOB [{}:{}] PID [{}] started", mpTaskState->task()->jobid(), mpTaskState->task()->taskid(), pPidStr);
74
75 free(pPidStr);
76 }
77 else if (zframe_streq(zmsg_first(pMessage), "$EXIT")) {
78 char * pExitStatusStr = zframe_strdup(zmsg_next(pMessage));
79 uint32_t exitStatus = static_cast<uint32_t>(strtoul(pExitStatusStr, nullptr, 0));
80 free(pExitStatusStr);
81 mpTaskState->task()->set_returncode(exitStatus);
82
83 SPD_DEBUG("JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]", mpTaskState->task()->jobid(),
84 mpTaskState->task()->taskid(), mpTaskState->pid(), mpTaskState->task()->returncode(),
85 mpTaskState->state() == TaskState::killed);
86
87 // mpTaskState->state(TaskState::EState::idle);
88 // mpTaskState->pid(0);
89
90 // std::string payload;
91 // mpTaskState->task()->SerializeToString(&payload);
92 char * pWkUUID = zframe_strdup(zmsg_next(pMessage));
93 extra.push_back(pWkUUID);
94 char * pUpstream = zframe_strdup(zmsg_next(pMessage));
95 extra.push_back(pUpstream);
96
97 free(pWkUUID);
98 free(pUpstream);
99 }
100 zmsg_destroy(&pMessage);
101
102 return true;
103}
104} // namespace Salsa
virtual bool handlePipe(std::vector< std::string > &extra)
Handle pipe.
virtual void * pipe() const
virtual bool run(std::string worker, std::string upstream)
Run task.
TaskExecutorForkZmq(zactor_t *pActor=nullptr)
zactor_t * mpZActor
ZMQ Actor pointer.
Base TaskExecutor class.
TaskState * mpTaskState
Task state.
void state(EState s)
Definition TaskState.cc:36
TaskInfo * task() const
Definition TaskState.cc:66
void pid(uint32_t pid)
Definition TaskState.cc:51