6 #include "NodeManagerZyre.hh" 7 #include "PublisherZmq.hh" 25 SPD_TRACE(
"### Destroy NodeZyre [{}] ###",
mpNodeInfo->name());
31 zsock_destroy(&pSocket);
46 SPD_TRACE(
"Salsa::NodeZyre::init()<-");
62 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"CONSUMER")
65 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"FEEDER") {
68 char * pPubUrl = getenv(
"SALSA_PUB_URL");
75 char * pTimeout = getenv(
"SALSA_FINISHED_JOB_TIMEOUT");
79 char * pCheckTimeout = getenv(
"SALSA_FINISHED_JOB_CHECK_TIMEOUT");
86 f->nodeInfo()->set_hostname(
hostname());
90 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"WORKER") {
93 w->nodeInfo()->set_hostname(
hostname());
102 SPD_TRACE(
"Salsa::NodeZyre::init()->");
112 SPD_TRACE(
"Salsa::NodeZyre::exec()<-");
114 void * pPointer =
nullptr;
115 std::shared_ptr<SocketZyre> pSocket =
nullptr;
116 zsock_t * pZmqSock =
nullptr;
118 int64_t last_time = zclock_time();
121 SPD_TRACE(
"Actor::wait()");
128 SPD_TRACE(
"Signal from pipe={}", static_cast<void *>(
mpPipe));
134 SPD_TRACE(
"Searching ZMQ inSocket=[{}] zmqSocket[{}]", static_cast<void *>(pPointer),
135 static_cast<void *>(pZmqSocket));
136 if (pZmqSocket == pPointer) {
137 pZmqSock = pZmqSocket;
143 SPD_TRACE(
"HANDLING zmq socket [{}]", static_cast<void *>(pZmqSock));
144 zmsg_t * pMsg = zmsg_recv(pZmqSock);
152 SPD_TRACE(
"Searching ZYRE socket={} in net={} socket={}", static_cast<void *>(pPointer),
153 static_cast<void *>(pNet.get()), static_cast<void *>(pNet->socket()));
154 if (pNet && pNet->socket() && pPointer == pNet->socket()) {
165 SPD_ERROR(
"Socket comming from unknown network : socket={}", pPointer);
171 Message * pMsg = pSocket->pull();
173 SPD_ERROR(
"Message from socket={} is null", pPointer);
178 SPD_TRACE(
"Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
179 static_cast<void *>(pSocket.get()), static_cast<void *>(pMsg), type);
181 bool doPublish =
true;
182 std::vector<std::string> values;
184 if (type == Message::ENTER) {
185 const char * pHeader =
186 zyre_event_header(static_cast<MessageZyre *>(pMsg)->zyreEvent(),
"X-SALSA-NODE-TYPE");
188 if (pHeader) snt = pHeader;
189 SPD_TRACE(
"[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid(), snt);
193 else if (type == Message::EXIT) {
194 SPD_TRACE(
"[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
198 else if (type == Message::EVASIVE) {
199 SPD_TRACE(
"[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
202 else if (type == Message::WHISPER) {
203 SPD_TRACE(
"[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
208 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
212 cur_time = zclock_time();
214 SPD_TRACE(
"Poller expired. Doing finished job cleaning ...");
216 last_time = zclock_time();
226 SPD_TRACE(
"Salsa::NodeZyre::exec()->");
236 SPD_TRACE(
"Salsa::NodeZyre::finish()<-");
238 SPD_TRACE(
"Salsa::NodeZyre::finish()->");
250 auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
252 pNode->parent(shared_from_this());
283 zframe_t * pID = zmsg_pop(pMsg);
284 char * pCmd = zmsg_popstr(pMsg);
285 if (!strcmp(pCmd,
"TASK")) {
288 if (getenv(
"SALSA_LOG_DIR")) logdir = getenv(
"SALSA_LOG_DIR");
291 char * pPayload_str = zmsg_popstr(pMsg);
292 TaskInfo * pTaskInfo =
nullptr;
293 while (pPayload_str) {
294 std::string payload = pPayload_str;
297 pTaskInfo =
new TaskInfo();
300 if (!pTaskInfo->ParseFromString(payload)) {
301 SPD_ERROR(
"Message does not contain ProtoBuf message!");
305 SPD_DEBUG(
"TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
306 if (task_count == 0) {
307 if (!logdir.empty()) {
308 logdir = fmt::format(
"{}/{}", logdir, pTaskInfo->jobid());
309 if (!zsys_file_exists(logdir.data())) zsys_dir_create(logdir.data());
312 if (pTaskInfo->logtargets_size() > 0 && pTaskInfo->logtargets()[0] ==
"default") {
314 pTaskInfo->add_logtargets(
315 fmt::format(
"file://{}/ndm-{:010d}.log", logdir.data(), pTaskInfo->taskid()));
319 pPayload_str = zmsg_popstr(pMsg);
343 zmsg_t * pMsgOut = zmsg_new();
344 zmsg_add(pMsgOut, pID);
345 zmsg_addstr(pMsgOut,
"");
346 zmsg_addstr(pMsgOut,
"TASK_ADDED");
347 zmsg_addstr(pMsgOut, fmt::format(
"{}", task_count).data());
348 zmsg_addstr(pMsgOut, fmt::format(
"{}", logdir).data());
349 zmsg_send(&pMsgOut, pSocket);
350 zmsg_destroy(&pMsgOut);
369 else if (!strcmp(pCmd,
"AUTH")) {
371 SPD_DEBUG(
"Checking AUTH ...");
372 zmsg_t * pMsgOut = zmsg_new();
373 zmsg_add(pMsgOut, pID);
374 zmsg_addstr(pMsgOut,
"");
375 zmsg_addstr(pMsgOut,
"AUTH");
376 zmsg_addstr(pMsgOut,
"OK");
379 rdr = zyre_uuid(
sockets()[0]->zyre());
380 zmsg_addstr(pMsgOut, rdr.data());
382 fmt::format(
"v{}.{}.{}-{}", SALSA_VERSION_MAJOR(SALSA_VERSION), SALSA_VERSION_MINOR(SALSA_VERSION),
383 SALSA_VERSION_PATCH(SALSA_VERSION), SALSA_VERSION_RELEASE)
386 zmsg_send(&pMsgOut, pSocket);
387 SPD_DEBUG(
"Sent AUTH OK {} ...", static_cast<void *>(pSocket));
391 else if (!strcmp(pCmd,
"JOB_DEL_ID")) {
392 char * pJobUUID_str = zmsg_popstr(pMsg);
393 std::string jobUUID = pJobUUID_str;
397 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
400 zmsg_t * pMsgOut = zmsg_new();
401 zmsg_add(pMsgOut, pID);
402 zmsg_addstr(pMsgOut,
"");
403 zmsg_addstr(pMsgOut, pCmd);
404 zmsg_addstr(pMsgOut,
"OK");
405 zmsg_send(&pMsgOut, pSocket);
406 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
408 else if (!strcmp(pCmd,
"JOB_DEL_FINISHED")) {
410 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
413 zmsg_t * pMsgOut = zmsg_new();
414 zmsg_add(pMsgOut, pID);
415 zmsg_addstr(pMsgOut,
"");
416 zmsg_addstr(pMsgOut, pCmd);
417 zmsg_addstr(pMsgOut,
"OK");
418 zmsg_send(&pMsgOut, pSocket);
419 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
421 else if (!strcmp(pCmd,
"JOB_DEL_ALL")) {
423 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
426 zmsg_t * pMsgOut = zmsg_new();
427 zmsg_add(pMsgOut, pID);
428 zmsg_addstr(pMsgOut,
"");
429 zmsg_addstr(pMsgOut, pCmd);
430 zmsg_addstr(pMsgOut,
"OK");
431 zmsg_send(&pMsgOut, pSocket);
432 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
434 else if (!strcmp(pCmd,
"WORKER_COUNT")) {
436 zmsg_t * pMsgOut = zmsg_new();
437 zmsg_add(pMsgOut, pID);
438 zmsg_addstr(pMsgOut,
"");
439 zmsg_addstr(pMsgOut, pCmd);
441 zmsg_send(&pMsgOut, pSocket);
442 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
445 zmsg_t * pMsgOut = zmsg_new();
446 zmsg_add(pMsgOut, pID);
447 zmsg_addstr(pMsgOut,
"");
448 zmsg_addstr(pMsgOut, pCmd);
449 zmsg_addstr(pMsgOut,
"FAILED");
450 zmsg_send(&pMsgOut, pSocket);
451 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
PollerZmq * mpPoller
Internal poller.
virtual int init()
First function.
virtual ~NodeZyre()
Destruct Zyre node.
virtual int init()
First function.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
int submitterSocketIndex() const
Returns submitter socket index.
virtual int exec()
Main function.
std::string mJobInfoClientUrl
JobInfo url for client (salsa-broker --out ...)
std::shared_ptr< Worker > worker(std::string uuid) const
virtual int finish()
Last function.
bool mTerminated
Flag if actor should be terminated.
virtual void terminateJob(std::string uuid)
int mJobCheckTimeout
Job check timeout.
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
std::string mJobInfoGroupName
JobInfo Group name.
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
ZeroMQ implementation of salsa actor class
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
virtual void publish(std::string id, bool force=false) const
int32_t nSlots(double mult=1.0) const
uint64_t finishedJobTimeout() const
Returns finished job timeout.
void addSocket(std::shared_ptr< SocketZyre > socket)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
void handleExternalZmq(zmsg_t *pMsg, zsock_t *pSocket)
zsock_t * mpPipe
Zmq pipe socket.
void print(std::string opt="") const
std::string mJobInfoBrokerUrl
JobInfo broker url (salsa-broker --in ...)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
NodeInfo * mpNodeInfo
Node Info.
virtual void add(SocketZyre *pSocket)
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
std::string hostname() const
Returns node hostname.
std::string mSubmitClientUrl
Submit url for client.
virtual bool handleTaskPool(void *pPool)
virtual EEventType event() const =0
Returns node event type.
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
std::vector< std::shared_ptr< SocketZyre > > sockets() const
virtual void publisher(Publisher *p)
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
virtual bool terminateFinishedJobs()
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
std::shared_ptr< Feeder > feeder(std::string uuid) const
void * submitterSocketID() const
Returns submitter socket identity.
EEventType
Node event type.
virtual void terminateAllJobs(bool finishedonly=false)
NodeManagerZyre * mpNodeManager
Job manager.