salsa 0.7.1
Loading...
Searching...
No Matches
NodeZyre.cc
1#include <algorithm>
2#include <cstdlib>
3#include <vector>
4
5#include "Job.hh"
6#include "NodeManagerZyre.hh"
7#include "PublisherZmq.hh"
8#include "NodeZyre.hh"
9using namespace fmt::literals;
10
11namespace Salsa {
12NodeZyre::NodeZyre(std::string name)
13 : Node(name)
14 , ActorZmq()
15{
19}
20
// Body of NodeZyre::~NodeZyre(); the signature line (listing line 21) is not present in this listing.
// Tears the node down in three steps: drops the zyre socket list, destroys the raw zmq sockets,
// then deletes the node manager.
22{
26
27 SPD_TRACE("### Destroy NodeZyre [{}] ###", mpNodeInfo->name());
28
29 // clearing socket
30 mSockets.clear();
31
// NOTE(review): pSocket is a by-value copy of each element, so zsock_destroy() can only reset the
// copy; the pointers stored in mZmqSockets are not reset and the vector is not cleared here.
32 for (auto pSocket : mZmqSockets) {
33 zsock_destroy(&pSocket);
34 }
35
// The manager is deleted by this node and the pointer reset afterwards.
36 if (mpNodeManager) {
37 delete mpNodeManager;
38 mpNodeManager = nullptr;
39 }
40}
41
// Body of NodeZyre::init(); the signature line (listing line 42) is not present in this listing.
// Reads configuration from the environment, registers every zyre socket with the node manager
// according to its X-SALSA-NODE-TYPE header, and adds all sockets to the poller.
// Returns 1 when no poller is available, 0 otherwise.
43{
47
48 SPD_TRACE("Salsa::NodeZyre::init()<-");
49
// NOTE(review): listing lines 50-51 are absent here; nothing visible in this body creates mpPoller
// before it is tested below.
52
53 if (!mpPoller) {
54 return 1;
55 }
56
// NOTE(review): listing lines 58-59 are absent. As shown, mpNodeManager is dereferenced a few lines
// below while this branch has just established that it is null; presumably the missing lines
// create the manager — confirm against the real file before touching this branch.
57 if (mpNodeManager == nullptr) {
60
// compare() returns 0 on equality, so this branch runs for a FEEDER node whose manager has no publisher.
61 if (!mType.compare("FEEDER") && !mpNodeManager->publisher()) {
// SALSA_PUB_URL, when set, replaces the current value of mJobInfoBrokerUrl.
62 char * pPubUrl = getenv("SALSA_PUB_URL");
63 if (pPubUrl) {
64 mJobInfoBrokerUrl = pPubUrl;
65 }
66 // SPD_INFO("JobInfo broker url [{}]", mJobInfoBrokerUrl);
// NOTE(review): listing line 67 is absent here.
68 SPD_INFO("[{}] will publish to url=[{}]", mType, mJobInfoBrokerUrl);
69 }
// Both timeouts below are only read inside the `mpNodeManager == nullptr` branch.
// atol()/atoi() do no validation, so a non-numeric value silently becomes 0.
70 char * pTimeout = getenv("SALSA_FINISHED_JOB_TIMEOUT");
71 if (pTimeout) {
72 mpNodeManager->finishedJobTimeout(atol(pTimeout));
73 }
74 char * pCheckTimeout = getenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT");
75 if (pCheckTimeout) {
76 mJobCheckTimeout = atoi(pCheckTimeout);
77 }
78 }
79
// Every zyre socket is polled and registered under its own zyre uuid. The role comes from the
// socket's X-SALSA-NODE-TYPE header; sockets with any other header value are polled only.
80 for (auto socket : mSockets) {
81 mpPoller->add(socket.get());
82
83 if (socket->header("X-SALSA-NODE-TYPE") == "CONSUMER")
84 mpNodeManager->addConsumer(zyre_uuid(socket->zyre()), socket);
85
86 if (socket->header("X-SALSA-NODE-TYPE") == "FEEDER") {
87 mpNodeManager->addFeeder(zyre_uuid(socket->zyre()), socket);
88
// The feeder just added is looked up again to fill in its node info.
89 auto f = mpNodeManager->feeder(zyre_uuid(socket->zyre()));
90 f->nodeInfo()->set_name(mJobInfoGroupName);
91 f->nodeInfo()->set_hostname(hostname());
92 f->nodeInfo()->set_submiturl(mSubmitClientUrl);
93 }
94
95 if (socket->header("X-SALSA-NODE-TYPE") == "WORKER") {
96 mpNodeManager->addWorker(zyre_uuid(socket->zyre()), socket);
97 auto w = mpNodeManager->worker(zyre_uuid(socket->zyre()));
98 w->nodeInfo()->set_hostname(hostname());
99 }
100 }
101
// Raw zmq sockets are polled as well; exec() hands their traffic to handleExternalZmq().
102 for (auto socket : mZmqSockets) {
103 mpPoller->add(socket);
104 }
105
106 // mpNodeManager->print();
107 SPD_TRACE("Salsa::NodeZyre::init()->");
108 return 0;
109}
110
// Body of NodeZyre::exec(); the signature line (listing line 111) is not present in this listing.
// Event loop of the node. Each iteration waits for a ready socket and dispatches it to one of:
//   - the actor pipe (ignored),
//   - a raw zmq socket (handleExternalZmq),
//   - the node manager's task pool (handleTaskPool),
//   - a zyre socket, whose message is routed to the node manager by event type.
// Returns 0 once the loop has ended.
112{
116
117 SPD_TRACE("Salsa::NodeZyre::exec()<-");
118
// These locals are declared outside the loop, so they keep their values from one iteration to the next.
119 void * pPointer = nullptr;
120 std::shared_ptr<SocketZyre> pSocket = nullptr;
121 zsock_t * pZmqSock = nullptr;
122 int64_t cur_time;
123 int64_t last_time = zclock_time();
124
// NOTE(review): listing line 125 is absent here. The break/continue statements below and the
// closing "END WHILE not terminated" comment show that a while-loop header belongs at this point.
126 SPD_TRACE("Actor::wait()");
127 pPointer = ActorZmq::wait();
// A null result from wait() ends the loop.
128 if (!pPointer) {
129 break;
130 }
131
132 if (pPointer == mpPipe) {
133 SPD_TRACE("Signal from pipe={}", static_cast<void *>(mpPipe));
134 // We are not reacting to pipe events for now
135 continue;
136 }
137 else {
// First candidate: one of the raw zmq sockets.
138 for (auto pZmqSocket : mZmqSockets) {
139 SPD_TRACE("Searching ZMQ inSocket=[{}] zmqSocket[{}]", static_cast<void *>(pPointer),
140 static_cast<void *>(pZmqSocket));
141 if (pZmqSocket == pPointer) {
142 pZmqSock = pZmqSocket;
143 break;
144 }
145 }
146
// The received message is owned by this block: it is passed to the handler and destroyed right after.
147 if (pZmqSock) {
148 SPD_TRACE("HANDLING zmq socket [{}]", static_cast<void *>(pZmqSock));
149 zmsg_t * pMsg = zmsg_recv(pZmqSock);
150 handleExternalZmq(pMsg, pZmqSock);
151 zmsg_destroy(&pMsg);
152 pZmqSock = nullptr;
153 continue;
154 }
155
// Second candidate: a zyre socket, matched on its underlying socket handle.
// NOTE(review): the trace dereferences pNet (pNet.get(), pNet->socket()) before the `pNet &&`
// null check on the following line, so that check cannot protect the trace.
156 for (auto pNet : mSockets) {
157 SPD_TRACE("Searching ZYRE socket={} in net={} socket={}", static_cast<void *>(pPointer),
158 static_cast<void *>(pNet.get()), static_cast<void *>(pNet->socket()));
159 if (pNet && pNet->socket() && pPointer == pNet->socket()) {
160 pSocket = pNet;
161 break;
162 }
163 }
164
// Not a zyre socket: offer the pointer to the node manager's task pool.
165 if (!pSocket) {
166 if (mpNodeManager->handleTaskPool(pPointer)) continue;
167 }
168
169 if (!pSocket) {
170 SPD_ERROR("Socket comming from unknown network : socket={}", pPointer);
171 continue;
172 }
173
174 // ==================================================
175
176 Message * pMsg = pSocket->pull();
// NOTE(review): this `continue` skips the `pSocket = nullptr` reset at the end of the iteration.
// pSocket then stays set, so a following event from a pointer that matches no zyre socket would
// bypass handleTaskPool() and be pulled from the stale socket — confirm whether that is intended.
177 if (!pMsg) {
178 SPD_ERROR("Message from socket={} is null", pPointer);
179 continue;
180 }
181
// NOTE(review): listing line 182 is absent here. `type`, used below, is not declared anywhere in the
// visible text; presumably it is the event type of pMsg — confirm.
183 SPD_TRACE("Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
184 static_cast<void *>(pSocket.get()), static_cast<void *>(pMsg), static_cast<int>(type));
185
// doPublish defaults to true; only EVASIVE turns it off. `values` is filled by the node-manager
// callbacks but is not read again in this body, and cleanedJobs is used only in the trace below.
186 bool doPublish = true;
187 bool forcePublish = false;
188 bool cleanedJobs = false;
189 std::vector<std::string> values;
190
191 if (type == Message::ENTER) {
// The peer's node type comes from its zyre header; a missing header leaves `snt` empty.
192 const char * pHeader =
193 zyre_event_header(static_cast<MessageZyre *>(pMsg)->zyreEvent(), "X-SALSA-NODE-TYPE");
194 std::string snt;
195 if (pHeader) snt = pHeader;
196
197 SPD_DEBUG("[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid(), snt);
198 mpNodeManager->onEnter(zyre_uuid(pSocket->zyre()), snt, pMsg, values);
199 // doPublish = false;
200 }
201 else if (type == Message::EXIT) {
202 SPD_TRACE("[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
203 mpNodeManager->onExit(zyre_uuid(pSocket->zyre()), pMsg, values);
204 // doPublish = false;
205 }
206 else if (type == Message::EVASIVE) {
207 SPD_TRACE("[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
208 doPublish = false;
209 }
210 else if (type == Message::WHISPER) {
211 SPD_TRACE("[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
212 mpNodeManager->onWhisper(zyre_uuid(pSocket->zyre()), pMsg, values);
213 // forcePublish = true;
214 }
215
// Periodic housekeeping: once more than mJobCheckTimeout has elapsed since last_time, finished jobs
// are terminated and a publish is forced. This check is only reached on iterations that handled a
// zyre message; the pipe, zmq and task-pool paths `continue` before it.
// zclock_time() reports milliseconds in CZMQ, so mJobCheckTimeout is presumably in milliseconds — confirm.
216 cur_time = zclock_time();
217 if ((cur_time - last_time) > mJobCheckTimeout) {
218 forcePublish = true;
219 doPublish = true;
220 cleanedJobs = mpNodeManager->terminateFinishedJobs();
221 last_time = zclock_time();
222 }
223
// Publishing is keyed by the uuid of the first zyre socket. sockets() returns the list by value,
// and element 0 is accessed without a size check, so at least one zyre socket is assumed.
// A successful publish also restarts the housekeeping timer.
224 if (doPublish) {
225 SPD_TRACE("Trying to publishing from [{}] [force={}] [cleanedJobs={}] ...",
226 zyre_uuid(sockets()[0]->zyre()), forcePublish, cleanedJobs);
227 bool wasPublished = mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), forcePublish);
228 if (wasPublished) last_time = zclock_time();
229 }
230
// The message returned by pull() is deleted here, and the per-iteration selections are reset.
231 delete pMsg;
232 pSocket = nullptr;
233 pZmqSock = nullptr;
234 }
235
236 } // END WHILE not terminated
237
238 SPD_TRACE("Salsa::NodeZyre::exec()->");
239 return 0;
240}
241
// Body of NodeZyre::finish(); the signature line (listing line 242) is not present in this listing.
// Performs no work in the visible code: it traces entry and exit and returns 0.
243{
247
248 SPD_TRACE("Salsa::NodeZyre::finish()<-");
249
250 SPD_TRACE("Salsa::NodeZyre::finish()->");
251
252 return 0;
253}
254
255void NodeZyre::addSocket(std::shared_ptr<SocketZyre> socket)
256{
260
261 if (socket) {
262 auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
263
264 pNode->parent(shared_from_this());
265 Node::add(pNode);
266 mSockets.push_back(socket);
267 }
268}
269
270std::vector<std::shared_ptr<SocketZyre>> NodeZyre::sockets() const
271{
275 return mSockets;
276}
277
278void NodeZyre::addSocket(zsock_t * socket)
279{
283 if (socket) {
284 mZmqSockets.push_back(socket);
285 }
286}
287
// Handles one request received on a raw zmq socket and sends the reply on the same socket.
//
// pMsg    : incoming message; its first frame is popped as the sender identity and its second
//           as the command string. The caller in exec() destroys pMsg after this returns.
// pSocket : socket the request arrived on; every reply is sent through it.
//
// Commands handled: TASK, AUTH, JOB_DEL_ID, JOB_DEL_FINISHED, JOB_DEL_ALL, WORKER_COUNT.
// Any other command is answered with "FAILED". Every reply starts with the identity frame
// followed by an empty frame.
288void NodeZyre::handleExternalZmq(zmsg_t * pMsg, zsock_t * pSocket)
289{
294
// NOTE(review): zmsg_popstr() returns NULL when the message has no further frame; pCmd is then passed
// to strcmp() unchecked.
295 zframe_t * pID = zmsg_pop(pMsg);
296 char * pCmd = zmsg_popstr(pMsg);
297 if (!strcmp(pCmd, "TASK")) {
298
// Optional base directory for task logs, taken from the environment.
299 std::string logdir;
300 if (getenv("SALSA_LOG_DIR")) logdir = getenv("SALSA_LOG_DIR");
301
// Every remaining frame is one serialized TaskInfo.
302 int task_count = 0;
303 char * pPayload_str = zmsg_popstr(pMsg);
304 TaskInfo * pTaskInfo = nullptr;
305 while (pPayload_str) {
306 std::string payload = pPayload_str;
307 free(pPayload_str);
308
309 pTaskInfo = new TaskInfo();
310 {
311
// NOTE(review): this early return skips free(pCmd) at the end of the function, and neither pID nor the
// TaskInfo allocated just above is released on this path. No reply is sent either.
312 if (!pTaskInfo->ParseFromString(payload)) {
313 SPD_ERROR("Message does not contain ProtoBuf message!");
314 return;
315 }
316 }
317 SPD_DEBUG("TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
// The job id of the first task decides the per-job log directory, which is created if missing.
318 if (task_count == 0) {
319 if (!logdir.empty()) {
320 logdir = fmt::format("{}/{}", logdir, pTaskInfo->jobid());
321 if (!zsys_file_exists(logdir.data())) zsys_dir_create(logdir.data());
322 }
323 }
// A first log target of "default" gets a file target appended, provided a log directory is configured.
324 if (pTaskInfo->logtargets_size() > 0 && pTaskInfo->logtargets()[0] == "default") {
325 if (!logdir.empty())
326 pTaskInfo->add_logtargets(
327 fmt::format("file://{}/ndm-{:010d}.log", logdir.data(), pTaskInfo->taskid()));
328 }
// The task is handed to the node manager in the pending queue (see the commented-out delete below).
329 mpNodeManager->addTask(pTaskInfo, "", "", Salsa::Job::pending);
330 task_count++;
331 pPayload_str = zmsg_popstr(pMsg);
332 }
333
// NOTE(review): when the request carries no payload frame the loop never runs and pTaskInfo is still
// nullptr at this dereference.
334 Job * job = mpNodeManager->job(pTaskInfo->jobid());
335 if (job) {
// The submitter identity is recorded once per job, as a duplicate of the identity frame.
336 if (job->submitterSocketID() == nullptr) job->submitterSocketID(zframe_dup(pID));
// NOTE(review): listing line 337 is absent here.
// Remember which of the zmq sockets the job was submitted on.
338 if (mZmqSockets.size() == 1) {
339 job->submitterSocketIndex(0);
340 }
341 else {
342 int i = 0;
343 for (auto s : mZmqSockets) {
344 if (s == pSocket) {
345 job->submitterSocketIndex(i);
346 break;
347 }
348 i++;
349 }
350 }
351
352 // SPD_INFO("{} {}", job->submitterSocketIndex(), job->submitterSocketID());
353 }
354
// Reply: TASK_ADDED, the number of tasks accepted, and the log directory (possibly empty).
355 zmsg_t * pMsgOut = zmsg_new();
356 zmsg_add(pMsgOut, pID);
357 zmsg_addstr(pMsgOut, "");
358 zmsg_addstr(pMsgOut, "TASK_ADDED");
359 zmsg_addstr(pMsgOut, fmt::format("{}", task_count).data());
360 zmsg_addstr(pMsgOut, fmt::format("{}", logdir).data());
// zmsg_send() destroys the message and nulls the pointer on success (CZMQ), so the zmsg_destroy()
// that follows only has an effect when sending failed. This is the only branch that does so.
361 zmsg_send(&pMsgOut, pSocket);
362 zmsg_destroy(&pMsgOut);
363
364 // zmsg_t * pMsgOut = zmsg_new();
365 // zframe_destroy(&pID);
366 // pID = static_cast<zframe_t *>(job->submitterSocketID());
367 // zmsg_add(pMsgOut, pID);
368 // zmsg_addstr(pMsgOut, "");
369 // zmsg_addstr(pMsgOut, "TASK_ADDED");
370 // zmsg_addstr(pMsgOut, fmt::format("{}", task_count).data());
371 // // zmsg_send(&pMsgOut, pSocket);
372 // zmsg_send(&pMsgOut, mZmqSockets[job->submitterSocketIndex()]);
373 // zmsg_destroy(&pMsgOut);
374
// The identity frame was added to the reply above, so the local pointer is simply dropped.
375 pID = nullptr;
376
// NOTE(review): listing line 377 is absent here.
378
379 // delete pTaskInfo;
380 }
381 else if (!strcmp(pCmd, "AUTH")) {
382
// Reply: AUTH, OK, uuid of the first zyre socket, salsa version string, JobInfo client url.
383 SPD_DEBUG("Checking AUTH ...");
384 zmsg_t * pMsgOut = zmsg_new();
385 zmsg_add(pMsgOut, pID);
386 zmsg_addstr(pMsgOut, "");
387 zmsg_addstr(pMsgOut, "AUTH");
388 zmsg_addstr(pMsgOut, "OK");
// NOTE(review): listing line 389 is absent here.
// sockets() returns the list by value and element 0 is used unchecked, so at least one zyre socket
// is assumed to be attached.
390 std::string rdr;
391 rdr = zyre_uuid(sockets()[0]->zyre());
392 zmsg_addstr(pMsgOut, rdr.data());
393 zmsg_addstr(pMsgOut,
394 fmt::format("v{}.{}.{}-{}", salsa_VERSION_MAJOR(salsa_VERSION), salsa_VERSION_MINOR(salsa_VERSION),
395 salsa_VERSION_PATCH(salsa_VERSION), salsa_VERSION_TWEAK)
396 .data());
397 zmsg_addstr(pMsgOut, mJobInfoClientUrl.data());
398 zmsg_send(&pMsgOut, pSocket);
399 SPD_DEBUG("Sent AUTH OK {} ...", static_cast<void *>(pSocket));
400 pID = nullptr;
401 }
402
403 else if (!strcmp(pCmd, "JOB_DEL_ID")) {
// NOTE(review): zmsg_popstr() returns NULL when the job uuid frame is missing; a std::string is then
// constructed from a null pointer.
404 char * pJobUUID_str = zmsg_popstr(pMsg);
405 std::string jobUUID = pJobUUID_str;
406 free(pJobUUID_str);
407
// Terminate the named job, force a publish, then acknowledge.
408 mpNodeManager->terminateJob(jobUUID);
409 SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
410 mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
411
412 zmsg_t * pMsgOut = zmsg_new();
413 zmsg_add(pMsgOut, pID);
414 zmsg_addstr(pMsgOut, "");
415 zmsg_addstr(pMsgOut, pCmd);
416 zmsg_addstr(pMsgOut, "OK");
417 zmsg_send(&pMsgOut, pSocket);
418 SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
419 }
420 else if (!strcmp(pCmd, "JOB_DEL_FINISHED")) {
// NOTE(review): listing line 421 is absent here. As shown, this branch only forces a publish and
// acknowledges; presumably the missing line performs the actual job removal — confirm.
422 SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
423 mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
424
425 zmsg_t * pMsgOut = zmsg_new();
426 zmsg_add(pMsgOut, pID);
427 zmsg_addstr(pMsgOut, "");
428 zmsg_addstr(pMsgOut, pCmd);
429 zmsg_addstr(pMsgOut, "OK");
430 zmsg_send(&pMsgOut, pSocket);
431 SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
432 }
433 else if (!strcmp(pCmd, "JOB_DEL_ALL")) {
// NOTE(review): listing line 434 is absent here, with the same caveat as for JOB_DEL_FINISHED above.
435 SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
436 mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()), true);
437
438 zmsg_t * pMsgOut = zmsg_new();
439 zmsg_add(pMsgOut, pID);
440 zmsg_addstr(pMsgOut, "");
441 zmsg_addstr(pMsgOut, pCmd);
442 zmsg_addstr(pMsgOut, "OK");
443 zmsg_send(&pMsgOut, pSocket);
444 SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
445 }
446 else if (!strcmp(pCmd, "WORKER_COUNT")) {
447
// Reply: the command followed by the node manager's slot count as a decimal string.
448 zmsg_t * pMsgOut = zmsg_new();
449 zmsg_add(pMsgOut, pID);
450 zmsg_addstr(pMsgOut, "");
451 zmsg_addstr(pMsgOut, pCmd);
452 zmsg_addstr(pMsgOut, std::to_string(mpNodeManager->nSlots()).data());
453 zmsg_send(&pMsgOut, pSocket);
454 SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
455 }
456 else {
// Unknown command: echo it back with FAILED.
// NOTE(review): the debug line below still reports "OK" although the reply sent is "FAILED".
457 zmsg_t * pMsgOut = zmsg_new();
458 zmsg_add(pMsgOut, pID);
459 zmsg_addstr(pMsgOut, "");
460 zmsg_addstr(pMsgOut, pCmd);
461 zmsg_addstr(pMsgOut, "FAILED");
462 zmsg_send(&pMsgOut, pSocket);
463 SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
464 }
// pCmd came from zmsg_popstr() and is released here; pID is not destroyed because every branch has
// moved it into its reply message.
465 free(pCmd);
466 // if (pID) zframe_destroy(&pID);
467}
468
469} // namespace Salsa
ZeroMQ implementation of salsa actor class.
Definition ActorZmq.hh:19
virtual int init()
First function.
Definition ActorZmq.cc:378
virtual void * wait()
Definition ActorZmq.cc:426
bool mTerminated
Flag if actor should be terminated.
Definition ActorZmq.hh:50
PollerZmq * mpPoller
Internal poller.
Definition ActorZmq.hh:49
zsock_t * mpPipe
Zmq pipe socket.
Definition ActorZmq.hh:48
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
Definition Actor.hh:35
Job class.
Definition Job.hh:16
void * submitterSocketID() const
Returns submitter socket identity.
Definition Job.hh:81
int submitterSocketIndex() const
Returns submitter socket index.
Definition Job.hh:76
Salsa zyre message class.
Base Message class.
Definition Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual EEventType event() const =0
Returns node event type.
EEventType
Node event type.
Definition Message.hh:18
NodeManagerZyre class.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
virtual bool handleTaskPool(void *pPool)
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual bool publish(std::string id, bool force=false) const
virtual bool terminateFinishedJobs()
std::shared_ptr< Feeder > feeder(std::string uuid) const
virtual void terminateAllJobs(bool finishedonly=false)
std::shared_ptr< Worker > worker(std::string uuid) const
virtual void publisher(Publisher *p)
Job * job(std::string uuid)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
virtual void terminateJob(std::string uuid)
void clusterAlias(std::string n)
Sets Cluster alias.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
int32_t nSlots(double mult=1.0) const
uint64_t finishedJobTimeout() const
Returns finished job timeout.
void print(std::string opt="") const
std::string type()
Returns type of current node.
Definition NodeZyre.hh:43
std::string mJobInfoGroupName
JobInfo Group name.
Definition NodeZyre.hh:71
int mJobCheckTimeout
Job check timeout.
Definition NodeZyre.hh:75
void addSocket(std::shared_ptr< SocketZyre > socket)
Definition NodeZyre.cc:255
std::string mJobInfoClientUrl
JobInfo url for client (salsa-broker --out ...)
Definition NodeZyre.hh:73
std::vector< std::shared_ptr< SocketZyre > > sockets() const
Definition NodeZyre.cc:270
virtual int init()
First function.
Definition NodeZyre.cc:42
virtual int exec()
Main function.
Definition NodeZyre.cc:111
std::string mType
Current node type.
Definition NodeZyre.hh:69
std::string mSubmitClientUrl
Submit url for client.
Definition NodeZyre.hh:74
NodeZyre(std::string name="")
Construct Zyre node with provided name (and packetizer)
Definition NodeZyre.cc:12
virtual int finish()
Last function.
Definition NodeZyre.cc:242
NodeManagerZyre * mpNodeManager
Job manager.
Definition NodeZyre.hh:68
std::string mClusterAlias
Cluster alias.
Definition NodeZyre.hh:70
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
Definition NodeZyre.hh:66
void handleExternalZmq(zmsg_t *pMsg, zsock_t *pSocket)
Definition NodeZyre.cc:288
virtual ~NodeZyre()
Destruct Zyre node.
Definition NodeZyre.cc:21
std::string mJobInfoBrokerUrl
JobInfo broker url (salsa-broker --in ...)
Definition NodeZyre.hh:72
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
Definition NodeZyre.hh:67
Base Node class.
Definition Node.hh:23
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
Definition Node.hh:53
std::string hostname() const
Returns node hostname.
Definition Node.hh:37
NodeInfo * mpNodeInfo
Node Info.
Definition Node.hh:72
virtual void add(SocketZyre *pSocket)
Definition PollerZmq.cc:45
Base PublisherZmq class.