salsa  0.4.0
 All Classes Functions Variables Typedefs Enumerations Pages
NodeZyre.cc
1 #include <algorithm>
2 #include <cstdlib>
3 #include <vector>
4 
5 #include "Job.hh"
6 #include "NodeManagerZyre.hh"
7 #include "PublisherZmq.hh"
8 #include "NodeZyre.hh"
9 using namespace fmt::literals;
10 
11 namespace Salsa {
12 NodeZyre::NodeZyre(std::string name) : Node(name), ActorZmq()
13 {
17 }
18 
/// Destroy the Zyre node. Only the body is shown here: the destructor's
/// signature line (listed at NodeZyre.cc:19) is not rendered in this listing.
///
/// Drops all Zyre sockets, destroys every raw ZeroMQ socket registered via
/// addSocket(zsock_t *), and deletes the node manager that init() allocates.
20 {
24 
25  SPD_TRACE("### Destroy NodeZyre [{}] ###", mpNodeInfo->name());
26 
27  // clearing socket
28  mSockets.clear();
29 
 // zsock_destroy() receives the address of the loop-local copy, so only that
 // copy is nulled; mZmqSockets keeps stale pointers. This is harmless here
 // because the member vector is destroyed together with this object.
30  for (auto pSocket : mZmqSockets) {
31  zsock_destroy(&pSocket);
32  }
33 
 // mpNodeManager is allocated with `new` in init(); release it here.
34  if (mpNodeManager) {
35  delete mpNodeManager;
36  mpNodeManager = nullptr;
37  }
38 }
39 
/// Initialize the node. Only the body is shown: the signature (presumably
/// `int NodeZyre::init()`, listed at NodeZyre.cc:40) and a few body lines are
/// not rendered in this listing.
///
/// Registers every Zyre socket and every raw ZeroMQ socket with the poller,
/// lazily creates the NodeManagerZyre, and classifies each Zyre socket by its
/// "X-SALSA-NODE-TYPE" header as CONSUMER, FEEDER or WORKER.
///
/// Environment variables read while handling a FEEDER socket:
///   SALSA_PUB_URL                     overrides mJobInfoBrokerUrl (only while no publisher is set)
///   SALSA_FINISHED_JOB_TIMEOUT        forwarded to finishedJobTimeout() via atol()
///   SALSA_FINISHED_JOB_CHECK_TIMEOUT  stored in mJobCheckTimeout via atoi()
///
/// @return 1 if no poller is available, 0 otherwise.
41 {
45 
46  SPD_TRACE("Salsa::NodeZyre::init()<-");
47 
50 
51  if (!mpPoller) {
52  return 1;
53  }
54 
 // Created only once; the destructor deletes it.
55  if (mpNodeManager == nullptr) {
56  mpNodeManager = new NodeManagerZyre(this);
57  }
58 
59  for (auto socket : mSockets) {
60  mpPoller->add(socket.get());
61 
62  if (socket->header("X-SALSA-NODE-TYPE") == "CONSUMER")
63  mpNodeManager->addConsumer(zyre_uuid(socket->zyre()), socket);
64 
65  if (socket->header("X-SALSA-NODE-TYPE") == "FEEDER") {
66  mpNodeManager->addFeeder(zyre_uuid(socket->zyre()), socket);
67  if (!mpNodeManager->publisher()) {
68  char * pPubUrl = getenv("SALSA_PUB_URL");
69  if (pPubUrl) {
70  mJobInfoBrokerUrl = pPubUrl;
71  }
72  SPD_INFO("JobInfo broker url [{}]", mJobInfoBrokerUrl);
 // NOTE(review): the statement that actually installs the publisher is not
 // rendered here; presumably it builds a PublisherZmq from mJobInfoBrokerUrl
 // (PublisherZmq.hh is included) -- confirm against the real source.
74  }
 // NOTE(review): both timeouts are re-read for every FEEDER socket, and
 // atol()/atoi() silently yield 0 for non-numeric values.
75  char * pTimeout = getenv("SALSA_FINISHED_JOB_TIMEOUT");
76  if (pTimeout) {
77  mpNodeManager->finishedJobTimeout(atol(pTimeout));
78  }
79  char * pCheckTimeout = getenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT");
80  if (pCheckTimeout) {
81  mJobCheckTimeout = atoi(pCheckTimeout);
82  }
83 
 // The feeder was registered just above, so it is dereferenced without a null check.
84  auto f = mpNodeManager->feeder(zyre_uuid(socket->zyre()));
85  f->nodeInfo()->set_name(mJobInfoGroupName);
86  f->nodeInfo()->set_hostname(hostname());
87  f->nodeInfo()->set_submiturl(mSubmitClientUrl);
88  }
89 
90  if (socket->header("X-SALSA-NODE-TYPE") == "WORKER") {
91  mpNodeManager->addWorker(zyre_uuid(socket->zyre()), socket);
92  auto w = mpNodeManager->worker(zyre_uuid(socket->zyre()));
93  w->nodeInfo()->set_hostname(hostname());
94  }
95  }
96 
 // Raw ZeroMQ sockets (registered via addSocket(zsock_t *)) are polled as well.
97  for (auto socket : mZmqSockets) {
98  mpPoller->add(socket);
99  }
100 
101  // mpNodeManager->print();
102  SPD_TRACE("Salsa::NodeZyre::init()->");
103  return 0;
104 }
105 
/// Main event loop. Only the body is shown: the signature (presumably
/// `int NodeZyre::exec()`, listed at NodeZyre.cc:106) is not rendered in this
/// listing.
///
/// Blocks in ActorZmq::wait() until a socket is ready, then dispatches:
///   - the actor pipe: ignored;
///   - a raw ZeroMQ socket from mZmqSockets: one message is received, handed to
///     handleExternalZmq() and destroyed;
///   - otherwise mSockets is searched for a matching Zyre socket. If none
///     matches, the node manager's handleTaskPool() may claim the pointer;
///     if it does not, an error is logged and the event is dropped.
///   - a matching Zyre socket: one event is pulled and routed to
///     onEnter/onExit/onWhisper. The state is then published unless the event
///     was EVASIVE.
/// After each handled Zyre event, finished jobs are cleaned up when more than
/// mJobCheckTimeout has elapsed (zclock_time() units, i.e. milliseconds)
/// since the last cleanup.
///
/// @return 0 once the loop ends (terminated, interrupted, or wait() returned null).
107 {
111 
112  SPD_TRACE("Salsa::NodeZyre::exec()<-");
113 
114  void * pPointer = nullptr;
115  std::shared_ptr<SocketZyre> pSocket = nullptr;
116  zsock_t * pZmqSock = nullptr;
117  int64_t cur_time;
118  int64_t last_time = zclock_time();
119 
120  while (!mTerminated && !Salsa::Actor::interrupted()) {
121  SPD_TRACE("Actor::wait()");
122  pPointer = ActorZmq::wait();
123  if (!pPointer) {
124  break;
125  }
126 
127  if (pPointer == mpPipe) {
128  SPD_TRACE("Signal from pipe={}", static_cast<void *>(mpPipe));
129  // We are not reacting to pipe events for now
130  continue;
131  }
132  else {
 // First try the raw ZeroMQ sockets registered via addSocket(zsock_t *).
133  for (auto pZmqSocket : mZmqSockets) {
134  SPD_TRACE("Searching ZMQ inSocket=[{}] zmqSocket[{}]", static_cast<void *>(pPointer),
135  static_cast<void *>(pZmqSocket));
136  if (pZmqSocket == pPointer) {
137  pZmqSock = pZmqSocket;
138  break;
139  }
140  }
141 
142  if (pZmqSock) {
143  SPD_TRACE("HANDLING zmq socket [{}]", static_cast<void *>(pZmqSock));
 // NOTE(review): zmsg_recv() can return NULL (e.g. when interrupted);
 // handleExternalZmq() does not check for a NULL message.
144  zmsg_t * pMsg = zmsg_recv(pZmqSock);
145  handleExternalZmq(pMsg, pZmqSock);
146  zmsg_destroy(&pMsg);
147  pZmqSock = nullptr;
148  continue;
149  }
150 
 // Then look for the Zyre socket whose underlying socket fired.
151  for (auto pNet : mSockets) {
152  SPD_TRACE("Searching ZYRE socket={} in net={} socket={}", static_cast<void *>(pPointer),
153  static_cast<void *>(pNet.get()), static_cast<void *>(pNet->socket()));
154  if (pNet && pNet->socket() && pPointer == pNet->socket()) {
155  pSocket = pNet;
156  break;
157  }
158  }
159 
 // Not a Zyre socket: give the node manager's task pool a chance to claim it.
160  if (!pSocket) {
161  if (mpNodeManager->handleTaskPool(pPointer)) continue;
162  }
163 
164  if (!pSocket) {
165  SPD_ERROR("Socket comming from unknown network : socket={}", pPointer);
166  continue;
167  }
168 
169  // ==================================================
170 
 // pMsg is heap-allocated by pull() and deleted at the end of this iteration.
171  Message * pMsg = pSocket->pull();
172  if (!pMsg) {
173  SPD_ERROR("Message from socket={} is null", pPointer);
174  continue;
175  }
176 
177  Message::EEventType type = pMsg->event();
178  SPD_TRACE("Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
179  static_cast<void *>(pSocket.get()), static_cast<void *>(pMsg), type);
180 
181  bool doPublish = true;
182  std::vector<std::string> values;
183 
184  if (type == Message::ENTER) {
185  const char * pHeader =
186  zyre_event_header(static_cast<MessageZyre *>(pMsg)->zyreEvent(), "X-SALSA-NODE-TYPE");
187  std::string snt;
188  if (pHeader) snt = pHeader;
189  SPD_TRACE("[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid(), snt);
190  mpNodeManager->onEnter(zyre_uuid(pSocket->zyre()), snt, pMsg, values);
191  // doPublish = false;
192  }
193  else if (type == Message::EXIT) {
194  SPD_TRACE("[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
195  mpNodeManager->onExit(zyre_uuid(pSocket->zyre()), pMsg, values);
196  // doPublish = false;
197  }
198  else if (type == Message::EVASIVE) {
199  SPD_TRACE("[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
200  doPublish = false;
201  }
202  else if (type == Message::WHISPER) {
203  SPD_TRACE("[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
204  mpNodeManager->onWhisper(zyre_uuid(pSocket->zyre()), pMsg, values);
205  }
206 
 // NOTE(review): sockets() returns a copy of the whole vector on every call,
 // and [0] assumes at least one Zyre socket is registered.
207  if (doPublish) {
208  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
209  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()));
210  }
211 
 // NOTE(review): this cleanup check only runs after a Zyre event. Pipe,
 // ZeroMQ and task-pool events `continue` before reaching it.
212  cur_time = zclock_time();
213  if ((cur_time - last_time) > mJobCheckTimeout) {
214  SPD_TRACE("Poller expired. Doing finished job cleaning ...");
215  if (mpNodeManager->terminateFinishedJobs()) mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()));
216  last_time = zclock_time();
217  }
218 
219  delete pMsg;
220  pSocket = nullptr;
221  pZmqSock = nullptr;
222  }
223 
224  } // END WHILE not terminated
225 
226  SPD_TRACE("Salsa::NodeZyre::exec()->");
227  return 0;
228 }
229 
/// Last step of the actor lifecycle. Only the body is shown: the signature
/// (presumably `int NodeZyre::finish()`, listed at NodeZyre.cc:230) is not
/// rendered in this listing. It only emits the entry/exit trace messages.
///
/// @return Always 0.
231 {
235 
236  SPD_TRACE("Salsa::NodeZyre::finish()<-");
237 
238  SPD_TRACE("Salsa::NodeZyre::finish()->");
239 
240  return 0;
241 }
242 
243 void NodeZyre::addSocket(std::shared_ptr<SocketZyre> socket)
244 {
248 
249  if (socket) {
250  auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
251 
252  pNode->parent(shared_from_this());
253  Node::add(pNode);
254  mSockets.push_back(socket);
255  }
256 }
257 
258 std::vector<std::shared_ptr<SocketZyre>> NodeZyre::sockets() const
259 {
263  return mSockets;
264 }
265 
266 void NodeZyre::addSocket(zsock_t * socket)
267 {
271  if (socket) {
272  mZmqSockets.push_back(socket);
273  }
274 }
275 
/// Handle one request that arrived on a raw ZeroMQ socket.
///
/// Request frames, as consumed here: [id][command][args...]. The id frame is
/// echoed back as the first frame of the reply, followed by an empty frame.
/// Commands:
///   TASK             every remaining frame is a serialized TaskInfo; each is
///                    queued as pending. Reply: "TASK_ADDED", count, log dir.
///   AUTH             reply: "AUTH", "OK", uuid of the first Zyre socket,
///                    version string, mJobInfoClientUrl.
///   JOB_DEL_ID       next frame is a job uuid; the job is terminated and the
///                    state is force-published. Reply: command, "OK".
///   JOB_DEL_FINISHED / JOB_DEL_ALL
///                    force-publish and reply command, "OK". The termination
///                    call is on a line not rendered in this listing
///                    (presumably terminateAllJobs(), referenced on this page).
///   WORKER_COUNT     reply: command, nSlots().
///   anything else    reply: command, "FAILED".
///
/// @param pMsg    request message; frames are popped from it, but it is not destroyed here.
/// @param pSocket socket the request arrived on; all replies are sent on it.
276 void NodeZyre::handleExternalZmq(zmsg_t * pMsg, zsock_t * pSocket)
277 {
282 
 // NOTE(review): if the message holds only the id frame, zmsg_popstr() returns
 // NULL and the strcmp() calls below are undefined behaviour.
283  zframe_t * pID = zmsg_pop(pMsg);
284  char * pCmd = zmsg_popstr(pMsg);
285  if (!strcmp(pCmd, "TASK")) {
286 
287  std::string logdir;
288  if (getenv("SALSA_LOG_DIR")) logdir = getenv("SALSA_LOG_DIR");
289 
290  int task_count = 0;
291  char * pPayload_str = zmsg_popstr(pMsg);
292  TaskInfo * pTaskInfo = nullptr;
293  while (pPayload_str) {
294  std::string payload = pPayload_str;
295  free(pPayload_str);
296 
297  pTaskInfo = new TaskInfo();
298  {
299 
 // NOTE(review): this early return leaks pTaskInfo, pCmd and pID, and the
 // client never receives a reply.
300  if (!pTaskInfo->ParseFromString(payload)) {
301  SPD_ERROR("Message does not contain ProtoBuf message!");
302  return;
303  }
304  }
305  SPD_DEBUG("TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
 // The per-job log directory is derived from the first task's job id only.
306  if (task_count == 0) {
307  if (!logdir.empty()) {
308  logdir = fmt::format("{}/{}", logdir, pTaskInfo->jobid());
309  if (!zsys_file_exists(logdir.data())) zsys_dir_create(logdir.data());
310  }
311  }
312  if (pTaskInfo->logtargets_size() > 0 && pTaskInfo->logtargets()[0] == "default") {
313  if (!logdir.empty())
314  pTaskInfo->add_logtargets(
315  fmt::format("file://{}/ndm-{:010d}.log", logdir.data(), pTaskInfo->taskid()));
316  }
 // Presumably ownership of pTaskInfo passes to the node manager here
 // (hence the commented-out delete below) -- confirm in NodeManager::addTask.
317  mpNodeManager->addTask(pTaskInfo, "", "", Salsa::Job::pending);
318  task_count++;
319  pPayload_str = zmsg_popstr(pMsg);
320  }
321 
 // NOTE(review): if TASK carried no payload frames, pTaskInfo is still
 // nullptr here and is dereferenced.
322  Job * job = mpNodeManager->job(pTaskInfo->jobid());
323  if (job) {
 // Only the first submitter's identity is remembered for the job.
324  if (job->submitterSocketID() == nullptr) job->submitterSocketID(zframe_dup(pID));
326  if (mZmqSockets.size() == 1) {
327  job->submitterSocketIndex(0);
328  }
329  else {
330  int i = 0;
331  for (auto s : mZmqSockets) {
332  if (s == pSocket) {
333  job->submitterSocketIndex(i);
334  break;
335  }
336  i++;
337  }
338  }
339 
340  // SPD_INFO("{} {}", job->submitterSocketIndex(), job->submitterSocketID());
341  }
342 
 // zmsg_add() takes ownership of pID (CZMQ), so the frame must not be
 // destroyed afterwards. zmsg_send() nulls pMsgOut, making the following
 // zmsg_destroy() a no-op safety net.
343  zmsg_t * pMsgOut = zmsg_new();
344  zmsg_add(pMsgOut, pID);
345  zmsg_addstr(pMsgOut, "");
346  zmsg_addstr(pMsgOut, "TASK_ADDED");
347  zmsg_addstr(pMsgOut, fmt::format("{}", task_count).data());
348  zmsg_addstr(pMsgOut, fmt::format("{}", logdir).data());
349  zmsg_send(&pMsgOut, pSocket);
350  zmsg_destroy(&pMsgOut);
351 
352  // zmsg_t * pMsgOut = zmsg_new();
353  // zframe_destroy(&pID);
354  // pID = static_cast<zframe_t *>(job->submitterSocketID());
355  // zmsg_add(pMsgOut, pID);
356  // zmsg_addstr(pMsgOut, "");
357  // zmsg_addstr(pMsgOut, "TASK_ADDED");
358  // zmsg_addstr(pMsgOut, fmt::format("{}", task_count).data());
359  // // zmsg_send(&pMsgOut, pSocket);
360  // zmsg_send(&pMsgOut, mZmqSockets[job->submitterSocketIndex()]);
361  // zmsg_destroy(&pMsgOut);
362 
363  pID = nullptr;
364 
365  mpNodeManager->print();
366 
367  // delete pTaskInfo;
368  }
369  else if (!strcmp(pCmd, "AUTH")) {
370 
371  SPD_DEBUG("Checking AUTH ...");
372  zmsg_t * pMsgOut = zmsg_new();
373  zmsg_add(pMsgOut, pID);
374  zmsg_addstr(pMsgOut, "");
375  zmsg_addstr(pMsgOut, "AUTH");
376  zmsg_addstr(pMsgOut, "OK");
 // NOTE(review): sockets()[0] assumes at least one Zyre socket is registered.
378  std::string rdr;
379  rdr = zyre_uuid(sockets()[0]->zyre());
380  zmsg_addstr(pMsgOut, rdr.data());
381  zmsg_addstr(pMsgOut,
382  fmt::format("v{}.{}.{}-{}", SALSA_VERSION_MAJOR(SALSA_VERSION), SALSA_VERSION_MINOR(SALSA_VERSION),
383  SALSA_VERSION_PATCH(SALSA_VERSION), SALSA_VERSION_RELEASE)
384  .data());
385  zmsg_addstr(pMsgOut, mJobInfoClientUrl.data());
386  zmsg_send(&pMsgOut, pSocket);
387  SPD_DEBUG("Sent AUTH OK {} ...", static_cast<void *>(pSocket));
388  pID = nullptr;
389  }
390 
391  else if (!strcmp(pCmd, "JOB_DEL_ID")) {
 // NOTE(review): a missing uuid frame yields NULL, and constructing a
 // std::string from NULL is undefined behaviour.
392  char * pJobUUID_str = zmsg_popstr(pMsg);
393  std::string jobUUID = pJobUUID_str;
394  free(pJobUUID_str);
395 
396  mpNodeManager->terminateJob(jobUUID);
397  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
398  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
399 
400  zmsg_t * pMsgOut = zmsg_new();
401  zmsg_add(pMsgOut, pID);
402  zmsg_addstr(pMsgOut, "");
403  zmsg_addstr(pMsgOut, pCmd);
404  zmsg_addstr(pMsgOut, "OK");
405  zmsg_send(&pMsgOut, pSocket);
406  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
407  }
408  else if (!strcmp(pCmd, "JOB_DEL_FINISHED")) {
410  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
411  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
412 
413  zmsg_t * pMsgOut = zmsg_new();
414  zmsg_add(pMsgOut, pID);
415  zmsg_addstr(pMsgOut, "");
416  zmsg_addstr(pMsgOut, pCmd);
417  zmsg_addstr(pMsgOut, "OK");
418  zmsg_send(&pMsgOut, pSocket);
419  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
420  }
421  else if (!strcmp(pCmd, "JOB_DEL_ALL")) {
423  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
424  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
425 
426  zmsg_t * pMsgOut = zmsg_new();
427  zmsg_add(pMsgOut, pID);
428  zmsg_addstr(pMsgOut, "");
429  zmsg_addstr(pMsgOut, pCmd);
430  zmsg_addstr(pMsgOut, "OK");
431  zmsg_send(&pMsgOut, pSocket);
432  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
433  }
434  else if (!strcmp(pCmd, "WORKER_COUNT")) {
435 
436  zmsg_t * pMsgOut = zmsg_new();
437  zmsg_add(pMsgOut, pID);
438  zmsg_addstr(pMsgOut, "");
439  zmsg_addstr(pMsgOut, pCmd);
440  zmsg_addstr(pMsgOut, std::to_string(mpNodeManager->nSlots()).data());
441  zmsg_send(&pMsgOut, pSocket);
442  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
443  }
444  else {
 // Unknown command. NOTE(review): the reply says "FAILED", but the debug log
 // below still reports "OK".
445  zmsg_t * pMsgOut = zmsg_new();
446  zmsg_add(pMsgOut, pID);
447  zmsg_addstr(pMsgOut, "");
448  zmsg_addstr(pMsgOut, pCmd);
449  zmsg_addstr(pMsgOut, "FAILED");
450  zmsg_send(&pMsgOut, pSocket);
451  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
452  }
453  free(pCmd);
454  // if (pID) zframe_destroy(&pID);
455 }
456 
457 } // namespace Salsa
PollerZmq * mpPoller
Internal poller.
Definition: ActorZmq.hh:49
virtual int init()
First function.
Definition: ActorZmq.cc:378
virtual ~NodeZyre()
Destruct Zyre node.
Definition: NodeZyre.cc:19
virtual int init()
First function.
Definition: NodeZyre.cc:40
std::shared_ptr< Worker > worker(std::string uuid) const
Definition: NodeManager.cc:467
std::string hostname() const
Returns node hostname.
Definition: Node.hh:37
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
Base Message class.
Definition: Message.hh:15
Base PublisherZmq class.
Definition: PublisherZmq.hh:17
std::shared_ptr< Feeder > feeder(std::string uuid) const
Definition: NodeManager.cc:441
std::vector< std::shared_ptr< SocketZyre > > sockets() const
Definition: NodeZyre.cc:258
virtual int exec()
Main function.
Definition: NodeZyre.cc:106
std::string mJobInfoClientUrl
JobInfo url for client (salsa-broker --out ...)
Definition: NodeZyre.hh:63
virtual int finish()
Last function.
Definition: NodeZyre.cc:230
bool mTerminated
Flag if actor should be terminated.
Definition: ActorZmq.hh:50
Base Node class.
Definition: Node.hh:22
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:360
int mJobCheckTimeout
Job check timeout.
Definition: NodeZyre.hh:65
int submitterSocketIndex() const
Returns submitter socket index.
Definition: Job.hh:73
Job class.
Definition: Job.hh:16
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
Definition: NodeZyre.hh:59
virtual void * wait()
Definition: ActorZmq.cc:426
uint64_t finishedJobTimeout() const
Returns finished job timeout.
Definition: NodeManager.hh:55
std::string mJobInfoGroupName
JobInfo Group name.
Definition: NodeZyre.hh:61
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
ZeroMQ implementation of salsa actor class.
Definition: ActorZmq.hh:19
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
Definition: Node.hh:53
void addSocket(std::shared_ptr< SocketZyre > socket)
Definition: NodeZyre.cc:243
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:64
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
Definition: NodeManager.cc:480
void handleExternalZmq(zmsg_t *pMsg, zsock_t *pSocket)
Definition: NodeZyre.cc:276
zsock_t * mpPipe
Zmq pipe socket.
Definition: ActorZmq.hh:48
std::string mJobInfoBrokerUrl
JobInfo broker url (salsa-broker --in ...)
Definition: NodeZyre.hh:62
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
NodeInfo * mpNodeInfo
Node Info.
Definition: Node.hh:72
virtual void add(SocketZyre *pSocket)
Definition: PollerZmq.cc:45
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
Definition: NodeZyre.hh:58
std::string mSubmitClientUrl
Submit url for client.
Definition: NodeZyre.hh:64
virtual bool handleTaskPool(void *pPool)
virtual EEventType event() const =0
Returns node event type.
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:55
int32_t nSlots(double mult=1.0) const
Definition: NodeManager.cc:539
virtual void publisher(Publisher *p)
Definition: NodeManager.cc:607
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:73
virtual bool terminateFinishedJobs()
Definition: NodeManager.cc:389
void print(std::string opt="") const
Definition: NodeManager.cc:30
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
Definition: Actor.hh:35
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:226
virtual void publish(std::string id, bool force=false) const
Definition: NodeManager.cc:623
NodeManagerZyre class.
EEventType
Node event type.
Definition: Message.hh:18
virtual void terminateAllJobs(bool finishedonly=false)
Definition: NodeManager.cc:420
void * submitterSocketID() const
Returns submitter socket identity.
Definition: Job.hh:78
NodeManagerZyre * mpNodeManager
Job manager.
Definition: NodeZyre.hh:60