48 SPD_TRACE(
"Salsa::NodeZyre::init()<-");
62 char * pPubUrl = getenv(
"SALSA_PUB_URL");
70 char * pTimeout = getenv(
"SALSA_FINISHED_JOB_TIMEOUT");
74 char * pCheckTimeout = getenv(
"SALSA_FINISHED_JOB_CHECK_TIMEOUT");
83 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"CONSUMER")
84 mpNodeManager->addConsumer(zyre_uuid(socket->zyre()), socket);
86 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"FEEDER") {
91 f->nodeInfo()->set_hostname(
hostname());
95 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"WORKER") {
98 w->nodeInfo()->set_hostname(
hostname());
107 SPD_TRACE(
"Salsa::NodeZyre::init()->");
117 SPD_TRACE(
"Salsa::NodeZyre::exec()<-");
119 void * pPointer =
nullptr;
120 std::shared_ptr<SocketZyre> pSocket =
nullptr;
121 zsock_t * pZmqSock =
nullptr;
123 int64_t last_time = zclock_time();
126 SPD_TRACE(
"Actor::wait()");
133 SPD_TRACE(
"Signal from pipe={}",
static_cast<void *
>(
mpPipe));
139 SPD_TRACE(
"Searching ZMQ inSocket=[{}] zmqSocket[{}]",
static_cast<void *
>(pPointer),
140 static_cast<void *
>(pZmqSocket));
141 if (pZmqSocket == pPointer) {
142 pZmqSock = pZmqSocket;
148 SPD_TRACE(
"HANDLING zmq socket [{}]",
static_cast<void *
>(pZmqSock));
149 zmsg_t * pMsg = zmsg_recv(pZmqSock);
157 SPD_TRACE(
"Searching ZYRE socket={} in net={} socket={}",
static_cast<void *
>(pPointer),
158 static_cast<void *
>(pNet.get()),
static_cast<void *
>(pNet->socket()));
159 if (pNet && pNet->socket() && pPointer == pNet->socket()) {
170 SPD_ERROR(
"Socket comming from unknown network : socket={}", pPointer);
176 Message * pMsg = pSocket->pull();
178 SPD_ERROR(
"Message from socket={} is null", pPointer);
183 SPD_TRACE(
"Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
184 static_cast<void *
>(pSocket.get()),
static_cast<void *
>(pMsg),
static_cast<int>(
type));
186 bool doPublish =
true;
187 bool forcePublish =
false;
188 bool cleanedJobs =
false;
189 std::vector<std::string> values;
191 if (
type == Message::ENTER) {
192 const char * pHeader =
193 zyre_event_header(
static_cast<MessageZyre *
>(pMsg)->zyreEvent(),
"X-SALSA-NODE-TYPE");
195 if (pHeader) snt = pHeader;
197 SPD_DEBUG(
"[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid(), snt);
198 mpNodeManager->onEnter(zyre_uuid(pSocket->zyre()), snt, pMsg, values);
201 else if (
type == Message::EXIT) {
202 SPD_TRACE(
"[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
203 mpNodeManager->onExit(zyre_uuid(pSocket->zyre()), pMsg, values);
206 else if (
type == Message::EVASIVE) {
207 SPD_TRACE(
"[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
210 else if (
type == Message::WHISPER) {
211 SPD_TRACE(
"[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
212 mpNodeManager->onWhisper(zyre_uuid(pSocket->zyre()), pMsg, values);
216 cur_time = zclock_time();
221 last_time = zclock_time();
225 SPD_TRACE(
"Trying to publishing from [{}] [force={}] [cleanedJobs={}] ...",
226 zyre_uuid(
sockets()[0]->zyre()), forcePublish, cleanedJobs);
228 if (wasPublished) last_time = zclock_time();
238 SPD_TRACE(
"Salsa::NodeZyre::exec()->");
295 zframe_t * pID = zmsg_pop(pMsg);
296 char * pCmd = zmsg_popstr(pMsg);
297 if (!strcmp(pCmd,
"TASK")) {
300 if (getenv(
"SALSA_LOG_DIR")) logdir = getenv(
"SALSA_LOG_DIR");
303 char * pPayload_str = zmsg_popstr(pMsg);
304 TaskInfo * pTaskInfo =
nullptr;
305 while (pPayload_str) {
306 std::string payload = pPayload_str;
309 pTaskInfo =
new TaskInfo();
312 if (!pTaskInfo->ParseFromString(payload)) {
313 SPD_ERROR(
"Message does not contain ProtoBuf message!");
317 SPD_DEBUG(
"TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
318 if (task_count == 0) {
319 if (!logdir.empty()) {
320 logdir = fmt::format(
"{}/{}", logdir, pTaskInfo->jobid());
321 if (!zsys_file_exists(logdir.data())) zsys_dir_create(logdir.data());
324 if (pTaskInfo->logtargets_size() > 0 && pTaskInfo->logtargets()[0] ==
"default") {
326 pTaskInfo->add_logtargets(
327 fmt::format(
"file://{}/ndm-{:010d}.log", logdir.data(), pTaskInfo->taskid()));
329 mpNodeManager->addTask(pTaskInfo,
"",
"", Salsa::Job::pending);
331 pPayload_str = zmsg_popstr(pMsg);
355 zmsg_t * pMsgOut = zmsg_new();
356 zmsg_add(pMsgOut, pID);
357 zmsg_addstr(pMsgOut,
"");
358 zmsg_addstr(pMsgOut,
"TASK_ADDED");
359 zmsg_addstr(pMsgOut, fmt::format(
"{}", task_count).data());
360 zmsg_addstr(pMsgOut, fmt::format(
"{}", logdir).data());
361 zmsg_send(&pMsgOut, pSocket);
362 zmsg_destroy(&pMsgOut);
381 else if (!strcmp(pCmd,
"AUTH")) {
383 SPD_DEBUG(
"Checking AUTH ...");
384 zmsg_t * pMsgOut = zmsg_new();
385 zmsg_add(pMsgOut, pID);
386 zmsg_addstr(pMsgOut,
"");
387 zmsg_addstr(pMsgOut,
"AUTH");
388 zmsg_addstr(pMsgOut,
"OK");
391 rdr = zyre_uuid(
sockets()[0]->zyre());
392 zmsg_addstr(pMsgOut, rdr.data());
394 fmt::format(
"v{}.{}.{}-{}", salsa_VERSION_MAJOR(salsa_VERSION), salsa_VERSION_MINOR(salsa_VERSION),
395 salsa_VERSION_PATCH(salsa_VERSION), salsa_VERSION_TWEAK)
398 zmsg_send(&pMsgOut, pSocket);
399 SPD_DEBUG(
"Sent AUTH OK {} ...",
static_cast<void *
>(pSocket));
403 else if (!strcmp(pCmd,
"JOB_DEL_ID")) {
404 char * pJobUUID_str = zmsg_popstr(pMsg);
405 std::string jobUUID = pJobUUID_str;
409 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
412 zmsg_t * pMsgOut = zmsg_new();
413 zmsg_add(pMsgOut, pID);
414 zmsg_addstr(pMsgOut,
"");
415 zmsg_addstr(pMsgOut, pCmd);
416 zmsg_addstr(pMsgOut,
"OK");
417 zmsg_send(&pMsgOut, pSocket);
418 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
420 else if (!strcmp(pCmd,
"JOB_DEL_FINISHED")) {
422 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
425 zmsg_t * pMsgOut = zmsg_new();
426 zmsg_add(pMsgOut, pID);
427 zmsg_addstr(pMsgOut,
"");
428 zmsg_addstr(pMsgOut, pCmd);
429 zmsg_addstr(pMsgOut,
"OK");
430 zmsg_send(&pMsgOut, pSocket);
431 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
433 else if (!strcmp(pCmd,
"JOB_DEL_ALL")) {
435 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
438 zmsg_t * pMsgOut = zmsg_new();
439 zmsg_add(pMsgOut, pID);
440 zmsg_addstr(pMsgOut,
"");
441 zmsg_addstr(pMsgOut, pCmd);
442 zmsg_addstr(pMsgOut,
"OK");
443 zmsg_send(&pMsgOut, pSocket);
444 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
446 else if (!strcmp(pCmd,
"WORKER_COUNT")) {
448 zmsg_t * pMsgOut = zmsg_new();
449 zmsg_add(pMsgOut, pID);
450 zmsg_addstr(pMsgOut,
"");
451 zmsg_addstr(pMsgOut, pCmd);
452 zmsg_addstr(pMsgOut, std::to_string(
mpNodeManager->nSlots()).data());
453 zmsg_send(&pMsgOut, pSocket);
454 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
457 zmsg_t * pMsgOut = zmsg_new();
458 zmsg_add(pMsgOut, pID);
459 zmsg_addstr(pMsgOut,
"");
460 zmsg_addstr(pMsgOut, pCmd);
461 zmsg_addstr(pMsgOut,
"FAILED");
462 zmsg_send(&pMsgOut, pSocket);
463 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));