27 if (
pipe() ==
nullptr)
return false;
29 zmsg_t * pOutMsg = zmsg_new();
30 zmsg_addstrf(pOutMsg,
"%s",
mpTaskState->task()->data().c_str());
31 zmsg_addstrf(pOutMsg,
"%d",
mpTaskState->task()->clientid());
32 zmsg_addstrf(pOutMsg,
"%d",
mpTaskState->task()->groupid());
33 zmsg_addstr(pOutMsg, worker.c_str());
34 zmsg_addstr(pOutMsg, upstream.c_str());
35 zmsg_addstr(pOutMsg,
mpTaskState->task()->jobid().c_str());
36 for (
int iPos = 0; iPos <
mpTaskState->task()->logtargets_size(); iPos++) {
37 if (iPos == 0) zmsg_addstrf(pOutMsg,
"%s",
"logs");
38 zmsg_addstrf(pOutMsg,
"%s",
mpTaskState->task()->logtargets(iPos).c_str());
40 for (
int iPos = 0; iPos <
mpTaskState->task()->envs_size(); iPos++) {
41 if (iPos == 0) zmsg_addstrf(pOutMsg,
"%s",
"envs");
42 zmsg_addstrf(pOutMsg,
"%s",
mpTaskState->task()->envs(iPos).c_str());
45 zsock_send(
pipe(),
"m", pOutMsg);
46 zmsg_destroy(&pOutMsg);
64 zmsg_t * pMessage = zmsg_recv(
pipe());
65 if (zframe_streq(zmsg_first(pMessage),
"$PID")) {
66 char * pPidStr = zframe_strdup(zmsg_next(pMessage));
68 uint32_t pid =
static_cast<uint32_t
>(strtoul(pPidStr,
nullptr, 0));
73 SPD_DEBUG(
"JOB [{}:{}] PID [{}] started",
mpTaskState->task()->jobid(),
mpTaskState->task()->taskid(), pPidStr);
77 else if (zframe_streq(zmsg_first(pMessage),
"$EXIT")) {
78 char * pExitStatusStr = zframe_strdup(zmsg_next(pMessage));
79 uint32_t exitStatus =
static_cast<uint32_t
>(strtoul(pExitStatusStr,
nullptr, 0));
83 SPD_DEBUG(
"JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]",
mpTaskState->task()->jobid(),
92 char * pWkUUID = zframe_strdup(zmsg_next(pMessage));
93 extra.push_back(pWkUUID);
94 char * pUpstream = zframe_strdup(zmsg_next(pMessage));
95 extra.push_back(pUpstream);
100 zmsg_destroy(&pMessage);