salsa  0.4.15
TaskExecutorForkZmq.cc
1 #include "TaskExecutorForkZmq.hh"
2 #include "TaskState.hh"
3 namespace Salsa {
4 TaskExecutorForkZmq::TaskExecutorForkZmq(zactor_t * actor) : TaskExecutor(), mpZActor(actor)
5 {
9 }
// NOTE(review): the signature line for this body is absent from this listing;
// presumably this is the TaskExecutorForkZmq destructor - confirm against the
// original file.
// When an actor pointer is held, its address is handed to zactor_destroy().
11 {
15  if (mpZActor) {
16  zactor_destroy(&mpZActor);
17  }
18 }
19 bool TaskExecutorForkZmq::run(std::string worker, std::string upstream)
20 {
24 
25  if (mpTaskState == nullptr) return false;
26 
27  if (pipe() == nullptr) return false;
28 
29  zmsg_t * pOutMsg = zmsg_new();
30  zmsg_addstrf(pOutMsg, "%s", mpTaskState->task()->data().c_str());
31  zmsg_addstrf(pOutMsg, "%d", mpTaskState->task()->clientid());
32  zmsg_addstrf(pOutMsg, "%d", mpTaskState->task()->groupid());
33  zmsg_addstr(pOutMsg, worker.c_str());
34  zmsg_addstr(pOutMsg, upstream.c_str());
35  zmsg_addstr(pOutMsg, mpTaskState->task()->jobid().c_str());
36  for (int iPos = 0; iPos < mpTaskState->task()->logtargets_size(); iPos++) {
37  if (iPos == 0) zmsg_addstrf(pOutMsg, "%s", "logs");
38  zmsg_addstrf(pOutMsg, "%s", mpTaskState->task()->logtargets(iPos).c_str());
39  }
40  for (int iPos = 0; iPos < mpTaskState->task()->envs_size(); iPos++) {
41  if (iPos == 0) zmsg_addstrf(pOutMsg, "%s", "envs");
42  zmsg_addstrf(pOutMsg, "%s", mpTaskState->task()->envs(iPos).c_str());
43  }
44 
45  zsock_send(pipe(), "m", pOutMsg);
46  zmsg_destroy(&pOutMsg);
47 
48  return true;
49 }
// NOTE(review): the signature line for this body is absent from this listing;
// presumably this is TaskExecutorForkZmq::pipe() - confirm against the
// original file.
// Returns the stored actor pointer (mpZActor) as-is.
51 {
55  return mpZActor;
56 }
57 
58 bool TaskExecutorForkZmq::handlePipe(std::vector<std::string> & extra)
59 {
63 
64  zmsg_t * pMessage = zmsg_recv(pipe());
65  if (zframe_streq(zmsg_first(pMessage), "$PID")) {
66  char * pPidStr = zframe_strdup(zmsg_next(pMessage));
67 
68  uint32_t pid = static_cast<uint32_t>(strtoul(pPidStr, nullptr, 0));
69  mpTaskState->pid(pid);
70  // mpTaskState->state(TaskState::EState::running);
71  std::string payload;
72  mpTaskState->task()->SerializeToString(&payload);
73  SPD_DEBUG("JOB [{}:{}] PID [{}] started", mpTaskState->task()->jobid(), mpTaskState->task()->taskid(), pPidStr);
74 
75  free(pPidStr);
76  }
77  else if (zframe_streq(zmsg_first(pMessage), "$EXIT")) {
78  char * pExitStatusStr = zframe_strdup(zmsg_next(pMessage));
79  uint32_t exitStatus = static_cast<uint32_t>(strtoul(pExitStatusStr, nullptr, 0));
80  free(pExitStatusStr);
81  mpTaskState->task()->set_returncode(exitStatus);
82 
83  SPD_DEBUG("JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]", mpTaskState->task()->jobid(),
84  mpTaskState->task()->taskid(), mpTaskState->pid(), mpTaskState->task()->returncode(),
85  mpTaskState->state() == TaskState::killed);
86 
87  // mpTaskState->state(TaskState::EState::idle);
88  // mpTaskState->pid(0);
89 
90  // std::string payload;
91  // mpTaskState->task()->SerializeToString(&payload);
92  char * pWkUUID = zframe_strdup(zmsg_next(pMessage));
93  extra.push_back(pWkUUID);
94  char * pUpstream = zframe_strdup(zmsg_next(pMessage));
95  extra.push_back(pUpstream);
96 
97  free(pWkUUID);
98  free(pUpstream);
99  }
100  zmsg_destroy(&pMessage);
101 
102  return true;
103 }
104 } // namespace Salsa
virtual bool handlePipe(std::vector< std::string > &extra)
Handle pipe.
virtual void * pipe() const
virtual bool run(std::string worker, std::string upstream)
Run task.
TaskExecutorForkZmq(zactor_t *pActor=nullptr)
zactor_t * mpZActor
ZMQ Actor pointer.
Base TaskExecutor class.
Definition: TaskExecutor.hh:14
TaskState * mpTaskState
Task state.
Definition: TaskExecutor.hh:31
void state(EState s)
Definition: TaskState.cc:36
TaskInfo * task() const
Definition: TaskState.cc:66
void pid(uint32_t pid)
Definition: TaskState.cc:51