salsa  0.7.1
NodeZyre.cc
1 #include <algorithm>
2 #include <cstdlib>
3 #include <vector>
4 
5 #include "Job.hh"
6 #include "NodeManagerZyre.hh"
7 #include "PublisherZmq.hh"
8 #include "NodeZyre.hh"
9 using namespace fmt::literals;
10 
11 namespace Salsa {
12 NodeZyre::NodeZyre(std::string name)
13  : Node(name)
14  , ActorZmq()
15 {
19 }
20 
22 {
26 
27  SPD_TRACE("### Destroy NodeZyre [{}] ###", mpNodeInfo->name());
28 
29  // clearing socket
30  mSockets.clear();
31 
32  for (auto pSocket : mZmqSockets) {
33  zsock_destroy(&pSocket);
34  }
35 
36  if (mpNodeManager) {
37  delete mpNodeManager;
38  mpNodeManager = nullptr;
39  }
40 }
41 
43 {
47 
48  SPD_TRACE("Salsa::NodeZyre::init()<-");
49 
52 
53  if (!mpPoller) {
54  return 1;
55  }
56 
57  if (mpNodeManager == nullptr) {
58  mpNodeManager = new NodeManagerZyre(this);
60 
61  if (!mType.compare("FEEDER") && !mpNodeManager->publisher()) {
62  char * pPubUrl = getenv("SALSA_PUB_URL");
63  if (pPubUrl) {
64  mJobInfoBrokerUrl = pPubUrl;
65  }
66  // SPD_INFO("JobInfo broker url [{}]", mJobInfoBrokerUrl);
68  SPD_INFO("[{}] will publish to url=[{}]", mType, mJobInfoBrokerUrl);
69  }
70  char * pTimeout = getenv("SALSA_FINISHED_JOB_TIMEOUT");
71  if (pTimeout) {
72  mpNodeManager->finishedJobTimeout(atol(pTimeout));
73  }
74  char * pCheckTimeout = getenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT");
75  if (pCheckTimeout) {
76  mJobCheckTimeout = atoi(pCheckTimeout);
77  }
78  }
79 
80  for (auto socket : mSockets) {
81  mpPoller->add(socket.get());
82 
83  if (socket->header("X-SALSA-NODE-TYPE") == "CONSUMER")
84  mpNodeManager->addConsumer(zyre_uuid(socket->zyre()), socket);
85 
86  if (socket->header("X-SALSA-NODE-TYPE") == "FEEDER") {
87  mpNodeManager->addFeeder(zyre_uuid(socket->zyre()), socket);
88 
89  auto f = mpNodeManager->feeder(zyre_uuid(socket->zyre()));
90  f->nodeInfo()->set_name(mJobInfoGroupName);
91  f->nodeInfo()->set_hostname(hostname());
92  f->nodeInfo()->set_submiturl(mSubmitClientUrl);
93  }
94 
95  if (socket->header("X-SALSA-NODE-TYPE") == "WORKER") {
96  mpNodeManager->addWorker(zyre_uuid(socket->zyre()), socket);
97  auto w = mpNodeManager->worker(zyre_uuid(socket->zyre()));
98  w->nodeInfo()->set_hostname(hostname());
99  }
100  }
101 
102  for (auto socket : mZmqSockets) {
103  mpPoller->add(socket);
104  }
105 
106  // mpNodeManager->print();
107  SPD_TRACE("Salsa::NodeZyre::init()->");
108  return 0;
109 }
110 
112 {
116 
117  SPD_TRACE("Salsa::NodeZyre::exec()<-");
118 
119  void * pPointer = nullptr;
120  std::shared_ptr<SocketZyre> pSocket = nullptr;
121  zsock_t * pZmqSock = nullptr;
122  int64_t cur_time;
123  int64_t last_time = zclock_time();
124 
125  while (!mTerminated && !Salsa::Actor::interrupted()) {
126  SPD_TRACE("Actor::wait()");
127  pPointer = ActorZmq::wait();
128  if (!pPointer) {
129  break;
130  }
131 
132  if (pPointer == mpPipe) {
133  SPD_TRACE("Signal from pipe={}", static_cast<void *>(mpPipe));
134  // We are not reacting to pipe events for now
135  continue;
136  }
137  else {
138  for (auto pZmqSocket : mZmqSockets) {
139  SPD_TRACE("Searching ZMQ inSocket=[{}] zmqSocket[{}]", static_cast<void *>(pPointer),
140  static_cast<void *>(pZmqSocket));
141  if (pZmqSocket == pPointer) {
142  pZmqSock = pZmqSocket;
143  break;
144  }
145  }
146 
147  if (pZmqSock) {
148  SPD_TRACE("HANDLING zmq socket [{}]", static_cast<void *>(pZmqSock));
149  zmsg_t * pMsg = zmsg_recv(pZmqSock);
150  handleExternalZmq(pMsg, pZmqSock);
151  zmsg_destroy(&pMsg);
152  pZmqSock = nullptr;
153  continue;
154  }
155 
156  for (auto pNet : mSockets) {
157  SPD_TRACE("Searching ZYRE socket={} in net={} socket={}", static_cast<void *>(pPointer),
158  static_cast<void *>(pNet.get()), static_cast<void *>(pNet->socket()));
159  if (pNet && pNet->socket() && pPointer == pNet->socket()) {
160  pSocket = pNet;
161  break;
162  }
163  }
164 
165  if (!pSocket) {
166  if (mpNodeManager->handleTaskPool(pPointer)) continue;
167  }
168 
169  if (!pSocket) {
170  SPD_ERROR("Socket comming from unknown network : socket={}", pPointer);
171  continue;
172  }
173 
174  // ==================================================
175 
176  Message * pMsg = pSocket->pull();
177  if (!pMsg) {
178  SPD_ERROR("Message from socket={} is null", pPointer);
179  continue;
180  }
181 
182  Message::EEventType type = pMsg->event();
183  SPD_TRACE("Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
184  static_cast<void *>(pSocket.get()), static_cast<void *>(pMsg), static_cast<int>(type));
185 
186  bool doPublish = true;
187  bool forcePublish = false;
188  bool cleanedJobs = false;
189  std::vector<std::string> values;
190 
191  if (type == Message::ENTER) {
192  const char * pHeader =
193  zyre_event_header(static_cast<MessageZyre *>(pMsg)->zyreEvent(), "X-SALSA-NODE-TYPE");
194  std::string snt;
195  if (pHeader) snt = pHeader;
196 
197  SPD_DEBUG("[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid(), snt);
198  mpNodeManager->onEnter(zyre_uuid(pSocket->zyre()), snt, pMsg, values);
199  // doPublish = false;
200  }
201  else if (type == Message::EXIT) {
202  SPD_TRACE("[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
203  mpNodeManager->onExit(zyre_uuid(pSocket->zyre()), pMsg, values);
204  // doPublish = false;
205  }
206  else if (type == Message::EVASIVE) {
207  SPD_TRACE("[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
208  doPublish = false;
209  }
210  else if (type == Message::WHISPER) {
211  SPD_TRACE("[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
212  mpNodeManager->onWhisper(zyre_uuid(pSocket->zyre()), pMsg, values);
213  // forcePublish = true;
214  }
215 
216  cur_time = zclock_time();
217  if ((cur_time - last_time) > mJobCheckTimeout) {
218  forcePublish = true;
219  doPublish = true;
220  cleanedJobs = mpNodeManager->terminateFinishedJobs();
221  last_time = zclock_time();
222  }
223 
224  if (doPublish) {
225  SPD_TRACE("Trying to publishing from [{}] [force={}] [cleanedJobs={}] ...",
226  zyre_uuid(sockets()[0]->zyre()), forcePublish, cleanedJobs);
227  bool wasPublished = mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), forcePublish);
228  if (wasPublished) last_time = zclock_time();
229  }
230 
231  delete pMsg;
232  pSocket = nullptr;
233  pZmqSock = nullptr;
234  }
235 
236  } // END WHILE not terminated
237 
238  SPD_TRACE("Salsa::NodeZyre::exec()->");
239  return 0;
240 }
241 
243 {
247 
248  SPD_TRACE("Salsa::NodeZyre::finish()<-");
249 
250  SPD_TRACE("Salsa::NodeZyre::finish()->");
251 
252  return 0;
253 }
254 
255 void NodeZyre::addSocket(std::shared_ptr<SocketZyre> socket)
256 {
260 
261  if (socket) {
262  auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
263 
264  pNode->parent(shared_from_this());
265  Node::add(pNode);
266  mSockets.push_back(socket);
267  }
268 }
269 
270 std::vector<std::shared_ptr<SocketZyre>> NodeZyre::sockets() const
271 {
275  return mSockets;
276 }
277 
278 void NodeZyre::addSocket(zsock_t * socket)
279 {
283  if (socket) {
284  mZmqSockets.push_back(socket);
285  }
286 }
287 
288 void NodeZyre::handleExternalZmq(zmsg_t * pMsg, zsock_t * pSocket)
289 {
294 
295  zframe_t * pID = zmsg_pop(pMsg);
296  char * pCmd = zmsg_popstr(pMsg);
297  if (!strcmp(pCmd, "TASK")) {
298 
299  std::string logdir;
300  if (getenv("SALSA_LOG_DIR")) logdir = getenv("SALSA_LOG_DIR");
301 
302  int task_count = 0;
303  char * pPayload_str = zmsg_popstr(pMsg);
304  TaskInfo * pTaskInfo = nullptr;
305  while (pPayload_str) {
306  std::string payload = pPayload_str;
307  free(pPayload_str);
308 
309  pTaskInfo = new TaskInfo();
310  {
311 
312  if (!pTaskInfo->ParseFromString(payload)) {
313  SPD_ERROR("Message does not contain ProtoBuf message!");
314  return;
315  }
316  }
317  SPD_DEBUG("TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
318  if (task_count == 0) {
319  if (!logdir.empty()) {
320  logdir = fmt::format("{}/{}", logdir, pTaskInfo->jobid());
321  if (!zsys_file_exists(logdir.data())) zsys_dir_create(logdir.data());
322  }
323  }
324  if (pTaskInfo->logtargets_size() > 0 && pTaskInfo->logtargets()[0] == "default") {
325  if (!logdir.empty())
326  pTaskInfo->add_logtargets(
327  fmt::format("file://{}/ndm-{:010d}.log", logdir.data(), pTaskInfo->taskid()));
328  }
329  mpNodeManager->addTask(pTaskInfo, "", "", Salsa::Job::pending);
330  task_count++;
331  pPayload_str = zmsg_popstr(pMsg);
332  }
333 
334  Job * job = mpNodeManager->job(pTaskInfo->jobid());
335  if (job) {
336  if (job->submitterSocketID() == nullptr) job->submitterSocketID(zframe_dup(pID));
338  if (mZmqSockets.size() == 1) {
339  job->submitterSocketIndex(0);
340  }
341  else {
342  int i = 0;
343  for (auto s : mZmqSockets) {
344  if (s == pSocket) {
345  job->submitterSocketIndex(i);
346  break;
347  }
348  i++;
349  }
350  }
351 
352  // SPD_INFO("{} {}", job->submitterSocketIndex(), job->submitterSocketID());
353  }
354 
355  zmsg_t * pMsgOut = zmsg_new();
356  zmsg_add(pMsgOut, pID);
357  zmsg_addstr(pMsgOut, "");
358  zmsg_addstr(pMsgOut, "TASK_ADDED");
359  zmsg_addstr(pMsgOut, fmt::format("{}", task_count).data());
360  zmsg_addstr(pMsgOut, fmt::format("{}", logdir).data());
361  zmsg_send(&pMsgOut, pSocket);
362  zmsg_destroy(&pMsgOut);
363 
364  // zmsg_t * pMsgOut = zmsg_new();
365  // zframe_destroy(&pID);
366  // pID = static_cast<zframe_t *>(job->submitterSocketID());
367  // zmsg_add(pMsgOut, pID);
368  // zmsg_addstr(pMsgOut, "");
369  // zmsg_addstr(pMsgOut, "TASK_ADDED");
370  // zmsg_addstr(pMsgOut, fmt::format("{}", task_count).data());
371  // // zmsg_send(&pMsgOut, pSocket);
372  // zmsg_send(&pMsgOut, mZmqSockets[job->submitterSocketIndex()]);
373  // zmsg_destroy(&pMsgOut);
374 
375  pID = nullptr;
376 
377  mpNodeManager->print();
378 
379  // delete pTaskInfo;
380  }
381  else if (!strcmp(pCmd, "AUTH")) {
382 
383  SPD_DEBUG("Checking AUTH ...");
384  zmsg_t * pMsgOut = zmsg_new();
385  zmsg_add(pMsgOut, pID);
386  zmsg_addstr(pMsgOut, "");
387  zmsg_addstr(pMsgOut, "AUTH");
388  zmsg_addstr(pMsgOut, "OK");
390  std::string rdr;
391  rdr = zyre_uuid(sockets()[0]->zyre());
392  zmsg_addstr(pMsgOut, rdr.data());
393  zmsg_addstr(pMsgOut,
394  fmt::format("v{}.{}.{}-{}", salsa_VERSION_MAJOR(salsa_VERSION), salsa_VERSION_MINOR(salsa_VERSION),
395  salsa_VERSION_PATCH(salsa_VERSION), salsa_VERSION_TWEAK)
396  .data());
397  zmsg_addstr(pMsgOut, mJobInfoClientUrl.data());
398  zmsg_send(&pMsgOut, pSocket);
399  SPD_DEBUG("Sent AUTH OK {} ...", static_cast<void *>(pSocket));
400  pID = nullptr;
401  }
402 
403  else if (!strcmp(pCmd, "JOB_DEL_ID")) {
404  char * pJobUUID_str = zmsg_popstr(pMsg);
405  std::string jobUUID = pJobUUID_str;
406  free(pJobUUID_str);
407 
408  mpNodeManager->terminateJob(jobUUID);
409  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
410  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
411 
412  zmsg_t * pMsgOut = zmsg_new();
413  zmsg_add(pMsgOut, pID);
414  zmsg_addstr(pMsgOut, "");
415  zmsg_addstr(pMsgOut, pCmd);
416  zmsg_addstr(pMsgOut, "OK");
417  zmsg_send(&pMsgOut, pSocket);
418  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
419  }
420  else if (!strcmp(pCmd, "JOB_DEL_FINISHED")) {
422  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
423  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
424 
425  zmsg_t * pMsgOut = zmsg_new();
426  zmsg_add(pMsgOut, pID);
427  zmsg_addstr(pMsgOut, "");
428  zmsg_addstr(pMsgOut, pCmd);
429  zmsg_addstr(pMsgOut, "OK");
430  zmsg_send(&pMsgOut, pSocket);
431  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
432  }
433  else if (!strcmp(pCmd, "JOB_DEL_ALL")) {
435  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
436  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
437 
438  zmsg_t * pMsgOut = zmsg_new();
439  zmsg_add(pMsgOut, pID);
440  zmsg_addstr(pMsgOut, "");
441  zmsg_addstr(pMsgOut, pCmd);
442  zmsg_addstr(pMsgOut, "OK");
443  zmsg_send(&pMsgOut, pSocket);
444  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
445  }
446  else if (!strcmp(pCmd, "WORKER_COUNT")) {
447 
448  zmsg_t * pMsgOut = zmsg_new();
449  zmsg_add(pMsgOut, pID);
450  zmsg_addstr(pMsgOut, "");
451  zmsg_addstr(pMsgOut, pCmd);
452  zmsg_addstr(pMsgOut, std::to_string(mpNodeManager->nSlots()).data());
453  zmsg_send(&pMsgOut, pSocket);
454  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
455  }
456  else {
457  zmsg_t * pMsgOut = zmsg_new();
458  zmsg_add(pMsgOut, pID);
459  zmsg_addstr(pMsgOut, "");
460  zmsg_addstr(pMsgOut, pCmd);
461  zmsg_addstr(pMsgOut, "FAILED");
462  zmsg_send(&pMsgOut, pSocket);
463  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
464  }
465  free(pCmd);
466  // if (pID) zframe_destroy(&pID);
467 }
468 
469 } // namespace Salsa
ZeroMQ implementation of salsa actor class.
Definition: ActorZmq.hh:19
virtual int init()
First function.
Definition: ActorZmq.cc:378
virtual void * wait()
Definition: ActorZmq.cc:426
bool mTerminated
Flag if actor should be terminated.
Definition: ActorZmq.hh:50
PollerZmq * mpPoller
Internal poller.
Definition: ActorZmq.hh:49
zsock_t * mpPipe
Zmq pipe socket.
Definition: ActorZmq.hh:48
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
Definition: Actor.hh:35
Job class.
Definition: Job.hh:16
void * submitterSocketID() const
Returns submitter socket identity.
Definition: Job.hh:81
int submitterSocketIndex() const
Returns submitter socket index.
Definition: Job.hh:76
Salsa zyre message class.
Definition: MessageZyre.hh:16
Base Message class.
Definition: Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual EEventType event() const =0
Returns node event type.
EEventType
Node event type.
Definition: Message.hh:18
NodeManagerZyre class.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
virtual bool handleTaskPool(void *pPool)
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual bool publish(std::string id, bool force=false) const
Definition: NodeManager.cc:625
virtual bool terminateFinishedJobs()
Definition: NodeManager.cc:391
std::shared_ptr< Feeder > feeder(std::string uuid) const
Definition: NodeManager.cc:443
virtual void terminateAllJobs(bool finishedonly=false)
Definition: NodeManager.cc:422
std::shared_ptr< Worker > worker(std::string uuid) const
Definition: NodeManager.cc:469
virtual void publisher(Publisher *p)
Definition: NodeManager.cc:609
Job * job(std::string uuid)
Definition: NodeManager.cc:482
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:66
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:362
void clusterAlias(std::string n)
Sets Cluster alias.
Definition: NodeManager.hh:86
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:228
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:75
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:57
int32_t nSlots(double mult=1.0) const
Definition: NodeManager.cc:541
uint64_t finishedJobTimeout() const
Returns finished job timeout.
Definition: NodeManager.hh:55
void print(std::string opt="") const
Definition: NodeManager.cc:32
std::string type()
Returns type of current node.
Definition: NodeZyre.hh:43
std::string mJobInfoGroupName
JobInfo Group name.
Definition: NodeZyre.hh:71
int mJobCheckTimeout
Job check timeout.
Definition: NodeZyre.hh:75
void addSocket(std::shared_ptr< SocketZyre > socket)
Definition: NodeZyre.cc:255
std::string mJobInfoClientUrl
JobInfo url for client (salsa-broker --out ...)
Definition: NodeZyre.hh:73
std::vector< std::shared_ptr< SocketZyre > > sockets() const
Definition: NodeZyre.cc:270
virtual int init()
First function.
Definition: NodeZyre.cc:42
virtual int exec()
Main function.
Definition: NodeZyre.cc:111
std::string mType
Current node type.
Definition: NodeZyre.hh:69
std::string mSubmitClientUrl
Submit url for client.
Definition: NodeZyre.hh:74
virtual int finish()
Last function.
Definition: NodeZyre.cc:242
NodeManagerZyre * mpNodeManager
Job manager.
Definition: NodeZyre.hh:68
std::string mClusterAlias
Cluster alias.
Definition: NodeZyre.hh:70
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
Definition: NodeZyre.hh:66
void handleExternalZmq(zmsg_t *pMsg, zsock_t *pSocket)
Definition: NodeZyre.cc:288
virtual ~NodeZyre()
Destruct Zyre node.
Definition: NodeZyre.cc:21
std::string mJobInfoBrokerUrl
JobInfo broker url (salsa-broker --in ...)
Definition: NodeZyre.hh:72
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
Definition: NodeZyre.hh:67
Base Node class.
Definition: Node.hh:23
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
Definition: Node.hh:53
std::string hostname() const
Returns node hostname.
Definition: Node.hh:37
NodeInfo * mpNodeInfo
Node Info.
Definition: Node.hh:72
virtual void add(SocketZyre *pSocket)
Definition: PollerZmq.cc:45
Base PublisherZmq class.
Definition: PublisherZmq.hh:17