6 #include "NodeManagerZyre.hh"
7 #include "PublisherZmq.hh"
9 using namespace fmt::literals;
12 NodeZyre::NodeZyre(std::string name)
27 SPD_TRACE(
"### Destroy NodeZyre [{}] ###",
mpNodeInfo->name());
33 zsock_destroy(&pSocket);
48 SPD_TRACE(
"Salsa::NodeZyre::init()<-");
62 char * pPubUrl = getenv(
"SALSA_PUB_URL");
70 char * pTimeout = getenv(
"SALSA_FINISHED_JOB_TIMEOUT");
74 char * pCheckTimeout = getenv(
"SALSA_FINISHED_JOB_CHECK_TIMEOUT");
83 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"CONSUMER")
86 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"FEEDER") {
91 f->nodeInfo()->set_hostname(
hostname());
95 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"WORKER") {
98 w->nodeInfo()->set_hostname(
hostname());
107 SPD_TRACE(
"Salsa::NodeZyre::init()->");
117 SPD_TRACE(
"Salsa::NodeZyre::exec()<-");
119 void * pPointer =
nullptr;
120 std::shared_ptr<SocketZyre> pSocket =
nullptr;
121 zsock_t * pZmqSock =
nullptr;
123 int64_t last_time = zclock_time();
126 SPD_TRACE(
"Actor::wait()");
133 SPD_TRACE(
"Signal from pipe={}",
static_cast<void *
>(
mpPipe));
139 SPD_TRACE(
"Searching ZMQ inSocket=[{}] zmqSocket[{}]",
static_cast<void *
>(pPointer),
140 static_cast<void *
>(pZmqSocket));
141 if (pZmqSocket == pPointer) {
142 pZmqSock = pZmqSocket;
148 SPD_TRACE(
"HANDLING zmq socket [{}]",
static_cast<void *
>(pZmqSock));
149 zmsg_t * pMsg = zmsg_recv(pZmqSock);
157 SPD_TRACE(
"Searching ZYRE socket={} in net={} socket={}",
static_cast<void *
>(pPointer),
158 static_cast<void *
>(pNet.get()),
static_cast<void *
>(pNet->socket()));
159 if (pNet && pNet->socket() && pPointer == pNet->socket()) {
170 SPD_ERROR(
"Socket comming from unknown network : socket={}", pPointer);
176 Message * pMsg = pSocket->pull();
178 SPD_ERROR(
"Message from socket={} is null", pPointer);
183 SPD_TRACE(
"Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
184 static_cast<void *
>(pSocket.get()),
static_cast<void *
>(pMsg),
static_cast<int>(
type));
186 bool doPublish =
true;
187 bool forcePublish =
false;
188 bool cleanedJobs =
false;
189 std::vector<std::string> values;
191 if (
type == Message::ENTER) {
192 const char * pHeader =
193 zyre_event_header(
static_cast<MessageZyre *
>(pMsg)->zyreEvent(),
"X-SALSA-NODE-TYPE");
195 if (pHeader) snt = pHeader;
197 SPD_DEBUG(
"[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid(), snt);
201 else if (
type == Message::EXIT) {
202 SPD_TRACE(
"[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
206 else if (
type == Message::EVASIVE) {
207 SPD_TRACE(
"[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
210 else if (
type == Message::WHISPER) {
211 SPD_TRACE(
"[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
216 cur_time = zclock_time();
221 last_time = zclock_time();
225 SPD_TRACE(
"Trying to publishing from [{}] [force={}] [cleanedJobs={}] ...",
226 zyre_uuid(
sockets()[0]->zyre()), forcePublish, cleanedJobs);
228 if (wasPublished) last_time = zclock_time();
238 SPD_TRACE(
"Salsa::NodeZyre::exec()->");
248 SPD_TRACE(
"Salsa::NodeZyre::finish()<-");
250 SPD_TRACE(
"Salsa::NodeZyre::finish()->");
262 auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
264 pNode->parent(shared_from_this());
295 zframe_t * pID = zmsg_pop(pMsg);
296 char * pCmd = zmsg_popstr(pMsg);
297 if (!strcmp(pCmd,
"TASK")) {
300 if (getenv(
"SALSA_LOG_DIR")) logdir = getenv(
"SALSA_LOG_DIR");
303 char * pPayload_str = zmsg_popstr(pMsg);
304 TaskInfo * pTaskInfo =
nullptr;
305 while (pPayload_str) {
306 std::string payload = pPayload_str;
309 pTaskInfo =
new TaskInfo();
312 if (!pTaskInfo->ParseFromString(payload)) {
313 SPD_ERROR(
"Message does not contain ProtoBuf message!");
317 SPD_DEBUG(
"TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
318 if (task_count == 0) {
319 if (!logdir.empty()) {
320 logdir = fmt::format(
"{}/{}", logdir, pTaskInfo->jobid());
321 if (!zsys_file_exists(logdir.data())) zsys_dir_create(logdir.data());
324 if (pTaskInfo->logtargets_size() > 0 && pTaskInfo->logtargets()[0] ==
"default") {
326 pTaskInfo->add_logtargets(
327 fmt::format(
"file://{}/ndm-{:010d}.log", logdir.data(), pTaskInfo->taskid()));
331 pPayload_str = zmsg_popstr(pMsg);
355 zmsg_t * pMsgOut = zmsg_new();
356 zmsg_add(pMsgOut, pID);
357 zmsg_addstr(pMsgOut,
"");
358 zmsg_addstr(pMsgOut,
"TASK_ADDED");
359 zmsg_addstr(pMsgOut, fmt::format(
"{}", task_count).data());
360 zmsg_addstr(pMsgOut, fmt::format(
"{}", logdir).data());
361 zmsg_send(&pMsgOut, pSocket);
362 zmsg_destroy(&pMsgOut);
381 else if (!strcmp(pCmd,
"AUTH")) {
383 SPD_DEBUG(
"Checking AUTH ...");
384 zmsg_t * pMsgOut = zmsg_new();
385 zmsg_add(pMsgOut, pID);
386 zmsg_addstr(pMsgOut,
"");
387 zmsg_addstr(pMsgOut,
"AUTH");
388 zmsg_addstr(pMsgOut,
"OK");
391 rdr = zyre_uuid(
sockets()[0]->zyre());
392 zmsg_addstr(pMsgOut, rdr.data());
394 fmt::format(
"v{}.{}.{}-{}", salsa_VERSION_MAJOR(salsa_VERSION), salsa_VERSION_MINOR(salsa_VERSION),
395 salsa_VERSION_PATCH(salsa_VERSION), salsa_VERSION_TWEAK)
398 zmsg_send(&pMsgOut, pSocket);
399 SPD_DEBUG(
"Sent AUTH OK {} ...",
static_cast<void *
>(pSocket));
403 else if (!strcmp(pCmd,
"JOB_DEL_ID")) {
404 char * pJobUUID_str = zmsg_popstr(pMsg);
405 std::string jobUUID = pJobUUID_str;
409 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
412 zmsg_t * pMsgOut = zmsg_new();
413 zmsg_add(pMsgOut, pID);
414 zmsg_addstr(pMsgOut,
"");
415 zmsg_addstr(pMsgOut, pCmd);
416 zmsg_addstr(pMsgOut,
"OK");
417 zmsg_send(&pMsgOut, pSocket);
418 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
420 else if (!strcmp(pCmd,
"JOB_DEL_FINISHED")) {
422 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
425 zmsg_t * pMsgOut = zmsg_new();
426 zmsg_add(pMsgOut, pID);
427 zmsg_addstr(pMsgOut,
"");
428 zmsg_addstr(pMsgOut, pCmd);
429 zmsg_addstr(pMsgOut,
"OK");
430 zmsg_send(&pMsgOut, pSocket);
431 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
433 else if (!strcmp(pCmd,
"JOB_DEL_ALL")) {
435 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
438 zmsg_t * pMsgOut = zmsg_new();
439 zmsg_add(pMsgOut, pID);
440 zmsg_addstr(pMsgOut,
"");
441 zmsg_addstr(pMsgOut, pCmd);
442 zmsg_addstr(pMsgOut,
"OK");
443 zmsg_send(&pMsgOut, pSocket);
444 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
446 else if (!strcmp(pCmd,
"WORKER_COUNT")) {
448 zmsg_t * pMsgOut = zmsg_new();
449 zmsg_add(pMsgOut, pID);
450 zmsg_addstr(pMsgOut,
"");
451 zmsg_addstr(pMsgOut, pCmd);
453 zmsg_send(&pMsgOut, pSocket);
454 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
457 zmsg_t * pMsgOut = zmsg_new();
458 zmsg_add(pMsgOut, pID);
459 zmsg_addstr(pMsgOut,
"");
460 zmsg_addstr(pMsgOut, pCmd);
461 zmsg_addstr(pMsgOut,
"FAILED");
462 zmsg_send(&pMsgOut, pSocket);
463 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd,
static_cast<void *
>(pSocket));
ZeroMQ implementation of salsa actor class.
virtual int init()
First function.
bool mTerminated
Flag if actor should be terminated.
PollerZmq * mpPoller
Internal poller.
zsock_t * mpPipe
Zmq pipe socket.
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
void * submitterSocketID() const
Returns submitter socket identity.
int submitterSocketIndex() const
Returns submitter socket index.
Salsa zyre message class.
virtual std::string uuid() const =0
Returns node uuid.
virtual EEventType event() const =0
Returns node event type.
EEventType
Node event type.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
virtual bool handleTaskPool(void *pPool)
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual bool publish(std::string id, bool force=false) const
virtual bool terminateFinishedJobs()
std::shared_ptr< Feeder > feeder(std::string uuid) const
virtual void terminateAllJobs(bool finishedonly=false)
std::shared_ptr< Worker > worker(std::string uuid) const
virtual void publisher(Publisher *p)
Job * job(std::string uuid)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
virtual void terminateJob(std::string uuid)
void clusterAlias(std::string n)
Sets Cluster alias.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
int32_t nSlots(double mult=1.0) const
uint64_t finishedJobTimeout() const
Returns finished job timeout.
void print(std::string opt="") const
std::string type()
Returns type of current node.
std::string mJobInfoGroupName
JobInfo Group name.
int mJobCheckTimeout
Job check timeout.
void addSocket(std::shared_ptr< SocketZyre > socket)
std::string mJobInfoClientUrl
JobInfo url for client (salsa-broker –out ...)
std::vector< std::shared_ptr< SocketZyre > > sockets() const
virtual int init()
First function.
virtual int exec()
Main function.
std::string mType
Current node type.
std::string mSubmitClientUrl
Submit url for client.
virtual int finish()
Last function.
NodeManagerZyre * mpNodeManager
Job manager.
std::string mClusterAlias
Cluster alias.
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
void handleExternalZmq(zmsg_t *pMsg, zsock_t *pSocket)
virtual ~NodeZyre()
Destruct Zyre node.
std::string mJobInfoBrokerUrl
JobInfo broker url (salsa-broker –in ...)
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
std::string hostname() const
Returns node hostname.
NodeInfo * mpNodeInfo
Node Info.
virtual void add(SocketZyre *pSocket)