salsa 0.7.1
Loading...
Searching...
No Matches
ActorZmq.cc
1#include "ActorZmq.hh"
2
3namespace Salsa {
5{
9
10 mpPoller = new PollerZmq();
11}
13{
17
18 // Why no smart pointer? Because this one is fully managed within current class.
19 delete mpPoller;
20}
21
22void ActorZmq::SalsaActorFn(zsock_t * pPipe, void * pArg)
23{
27
28 zsock_signal(pPipe, 0);
29 ActorZmq * pActor = static_cast<Salsa::ActorZmq *>(pArg);
30 pActor->pipe(pPipe);
31
32 SPD_TRACE("SalsaActorFn::init() <-");
33 int ret = 0;
34 if ((ret = pActor->init())) {
35 SPD_ERROR("init() failed! [{}]", ret);
36 return;
37 }
38 SPD_TRACE("SalsaActorFn::init()->");
39
40 if (!Salsa::Actor::interrupted() && !pActor->terminated()) {
41 SPD_TRACE("SalsaActorFn::exec() <-");
42 if ((ret = pActor->exec())) {
43 SPD_ERROR("exec() failed! [{}]", ret);
44 return;
45 }
46 SPD_TRACE("SalsaActorFn::exec() ->");
47 }
48
49 SPD_TRACE("SalsaActorFn::finish() <-");
50 if ((ret = pActor->finish())) {
51 SPD_ERROR("finish() failed! [{}]", ret);
52 return;
53 }
54 SPD_TRACE("SalsaActorFn::finish() ->");
55}
56
57void ActorZmq::SalsaActorForkFn(zsock_t * pPipe, void *)
58{
59 // _/(;_;)\_
60 // AT+OK
61 zsock_signal(pPipe, 0);
62
63 pid_t pid = 0;
64
65 // PA will be running indefinitely, until interrupted
66 while (true) {
67 zmsg_t * pReceived = zmsg_recv(pPipe);
68 if (!pReceived) {
69 SPD_WARN("PA: pReceived == <nullptr> (Exec interrupted)");
70 break;
71 }
72
73 // read first frame
74 zframe_t * pFrame = zmsg_first(pReceived);
75
76 // Terminate on signal
77 if (zframe_streq(pFrame, "$TERM")) {
78 SPD_TRACE("PA: Terminate received");
79 zmsg_destroy(&pReceived);
80 break; // TERMINATE persistent actor
81 }
82 else {
83 // GET command from message
84 char * pCommand = zframe_strdup(pFrame);
85 SPD_TRACE("PA: got Command [{}]", pCommand);
86 pFrame = zmsg_next(pReceived);
87
88 // GET UID from message
89 char * pUID = zframe_strdup(pFrame);
90 SPD_TRACE("PA: got UID [{}]", pUID);
91 pFrame = zmsg_next(pReceived);
92
93 // GET GID from message
94 char * pGID = zframe_strdup(pFrame);
95 SPD_TRACE("PA: got GID [{}]", pGID);
96 pFrame = zmsg_next(pReceived);
97
98 // GET Upsream uuid from message
99 char * pWorker = zframe_strdup(pFrame);
100 SPD_TRACE("PA: got Woker [{}]", pWorker);
101 pFrame = zmsg_next(pReceived);
102
103 // GET Upsream uuid from message
104 char * pUpsream = zframe_strdup(pFrame);
105 SPD_TRACE("PA: got Upstream [{}]", pUpsream);
106 pFrame = zmsg_next(pReceived);
107
108 // GET Client uuid from message
109 char * pClient = zframe_strdup(pFrame);
110 SPD_TRACE("PA: got Client [{}]", pClient);
111 pFrame = zmsg_next(pReceived);
112
113 std::string pMessage_str;
114 std::string pLoop_str;
115 // GET targets from message
116 Salsa::Log log;
117 if (pFrame) {
118
119 char * pLoop = zframe_strdup(pFrame);
120 pLoop_str = pLoop;
121 zstr_free(&pLoop);
122 SPD_TRACE("PA: got str logs [{}]", pLoop);
123
124 std::string pMessage_str;
125 if (pLoop_str == "logs") {
126 while ((pFrame = zmsg_next(pReceived)) != nullptr) {
127 char * pMessage = zframe_strdup(pFrame);
128 pMessage_str = pMessage;
129 if (pMessage_str == "envs") {
130 pLoop_str = "envs";
131 break;
132 }
133 SPD_TRACE("PA: Adding log target [{}]", pMessage);
134 log.add(pMessage);
135 zstr_free(&pMessage);
136 }
138 // if (log.empty())
139 // log.add("");
140 }
141 }
142
143 std::vector<std::string> envs;
144 if (pLoop_str == "envs") {
145 while ((pFrame = zmsg_next(pReceived)) != nullptr) {
146 char * pMessage = zframe_strdup(pFrame);
147 SPD_TRACE("PA: Adding env [{}]", pMessage);
148 envs.push_back(pMessage);
149 zstr_free(&pMessage);
150 }
151 }
152 char * envp[envs.size()];
153 int i = 0;
154 for (auto s : envs) {
155 char * cstr = new char[s.length() + 1];
156 strcpy(cstr, s.c_str());
157 // do stuff
158 envp[i] = cstr;
159 // delete[] cstr;
160 i++;
161 }
162 envp[i] = NULL;
163
164 // Destroy message and go on with your life
165 SPD_TRACE("PA: Destroying message [{}]", static_cast<void *>(pReceived));
166 zmsg_destroy(&pReceived);
167
168 // Initialization DONE ---------------------------------------------
169
170 SPD_TRACE("PA: Creating logger");
171 log.create();
172 log.spd()->info("Running [{}]", pCommand);
173
174 SPD_TRACE("PA: Waiting for pipes...");
175 int pipefd[2];
176 if (pipe2(pipefd, O_NONBLOCK)) {
177 SPD_ERROR("FAILED to receive pipes!"); // TODO Inform manager about pipe failure ?
178 }
179 SPD_TRACE("PA: Got pipes [{}, {}]", pipefd[0], pipefd[1]);
180
181 // = = = = = = = = = = FORK = = = = = = = = = =
182 pid = fork();
183 if (pid == 0) {
184
185 // TODO this needs to be improved if we cannot set UID
186 // TODO Check if uid is equal to process uid (in non-root case)
187 if (getuid() == 0) {
188 SPD_TRACE("PA Child: uid [{}]->[{}] guid [{}]->[{}]", getuid(), pUID, getgid(), pGID);
189 if (setgid(atoi(pGID)) == -1) {
190 SPD_ERROR("Problem setting GUI to process !!! ");
191 return;
192 }
193 if (setuid(atoi(pUID)) == -1) {
194 SPD_ERROR("Problem setting UID to process !!! ");
195 return;
196 }
197
198 SPD_TRACE("PA Child: uid [{}] guid [{}]", getuid(), getgid());
199 }
200
201 SPD_TRACE("PA Child: Running command [{}]", pCommand);
202 // FORK Child handler
203 unsigned int iCount = 0;
204 char ** ppCommand = nullptr;
205 char * tmp = std::strtok(pCommand, " ");
206
207 SPD_TRACE("PA Child: Tokenizing");
208 do {
209 // iCount + 2 because 1) iCounter an iterator and 2) we need nullptr at the end
210 ppCommand = static_cast<char **>(realloc(ppCommand, (iCount + 2) * sizeof(char **)));
211 ppCommand[iCount++] = strdup(tmp);
212 tmp = std::strtok(nullptr, " ");
213 } while (tmp);
214 ppCommand[iCount] = nullptr;
215
216 // SPD_TRACE("PA Child: Sleeping for 100ms");
217 // std::this_thread::sleep_for(std::chrono::milliseconds(100));
218
219 SPD_TRACE("PA Child: Configuring pipes");
220
221 // After these lines you'll be unable to log anything to console, so don't even
222 // try. It's literally a waste of time.
223 close(pipefd[0]);
224 dup2(pipefd[1], STDOUT_FILENO);
225 dup2(pipefd[1], STDERR_FILENO);
226 close(pipefd[1]);
227
228 if (execvpe(ppCommand[0], ppCommand, envp) == -1) {
229 // int const err = errno;
230 // SPD_ERROR("PA failed to execute command! Error: [{}]", strerror(err));
231 // *facepalm*
232 exit(127);
233 }
234 }
235 else if (pid > 0) {
236 // FORK Parent handler
237 // Send PID to parent
238 SPD_TRACE("PA Parent: Sending PID [{}] to parent", pid);
239 {
240 zmsg_t * pTx = zmsg_new();
241 zmsg_addstr(pTx, "$PID");
242 zmsg_addstrf(pTx, "%d", pid);
243 zmsg_addstr(pTx, pUpsream);
244 zmsg_addstr(pTx, pClient);
245 zsock_send(pPipe, "m", pTx);
246 zmsg_destroy(&pTx);
247 }
248
249 int stat = -1;
250 close(pipefd[1]);
251
252 log.fd(pipefd[0]);
253 zactor_t * pWatcherActor = zactor_new(actorProcwaitSupport_, &log);
254
255 SPD_TRACE("PA Parent: Running command...");
256 // Read from pipe until child dies
257 while (true) {
258 waitpid(pid, &stat, WUNTRACED);
259 if (WIFEXITED(stat) || WIFSIGNALED(stat)) {
260 zstr_sendf(pWatcherActor, "$EXIT");
261 break;
262 }
263 }
264
265 zactor_destroy(&pWatcherActor);
266
267 close(pipefd[0]);
268 int rc = WEXITSTATUS(stat);
269 // In case of kill -9 : returning 137
270 if (stat == 9) rc = 137;
271
272 SPD_TRACE("PA Parent: Exit [{}] rc [{}]", stat, rc);
273 {
274
275 zmsg_t * pTx = zmsg_new();
276 zmsg_addstr(pTx, "$EXIT");
277 zmsg_addstrf(pTx, "%d", rc);
278 zmsg_addstr(pTx, pWorker);
279 zmsg_addstr(pTx, pUpsream);
280 zmsg_addstr(pTx, pClient);
281 zsock_send(pPipe, "m", pTx);
282 zmsg_destroy(&pTx);
283 }
284 log.spd()->info("Process exited with status [{}]", stat);
285 }
286 else {
287 // FORK Failed handler
288 SPD_ERROR("PA Parent: fork() failure!");
289 {
290 zmsg_t * pTx = zmsg_new();
291 zmsg_addstr(pTx, "$FORKFAIL");
292 zsock_send(pPipe, "m", pTx);
293 zmsg_destroy(&pTx);
294 }
295 } // END FORK handling
296
297 free(pCommand);
298 free(pUID);
299 free(pGID);
300 free(pWorker);
301 free(pUpsream);
302 free(pClient);
303
304 } // END execute command
305 } // END while (true)
306
307 SPD_TRACE("PA: Terminating persistent actor");
308 return;
309}
310
311void ActorZmq::actorProcwaitSupport_(zsock_t * pPipe, void * pLogger)
312{
313 zsock_signal(pPipe, 0);
314
315 Log & commandLogger = *(static_cast<Log *>(pLogger));
316 // 3) 2)1)
317 // Since this is kind of hard to read:
318 // 1) Cast pLogger to Log *
319 // 2) Get its value
320 // 3) Set reference to it
321
322 int fd = commandLogger.fd();
323 const int LIMIT = PIPE_BUF;
324 char buffer[LIMIT + 1];
325 std::memset(buffer, 0, LIMIT + 1);
326
327 zpoller_t * pPoller = zpoller_new(nullptr);
328 zpoller_add(pPoller, pPipe);
329 zpoller_add(pPoller, &fd);
330
331 while (true) {
332 // Possible death race condition... I'm looking at you Valgrind
333 void * pRecvSock = zpoller_wait(pPoller, -1);
334 if (pRecvSock == pPipe) {
335 char * pMsg = zstr_recv(pPipe);
336 std::string recvMsg = pMsg;
337 free(pMsg);
338 if (recvMsg == "$EXIT") {
339 break;
340 }
341 }
342 else if (pRecvSock == &fd) {
343 ssize_t readRet = read(fd, buffer, LIMIT);
344 if (readRet > 0) {
345 if (buffer[0] != '\0') {
346 commandLogger.write(buffer);
347 memset(buffer, 0, sizeof(buffer));
348 }
349 }
350 }
351 }
352
353 zpoller_remove(pPoller, pPipe);
354 zpoller_remove(pPoller, &fd);
355 zpoller_destroy(&pPoller);
356 return;
357}
358
// Setter for the actor pipe.
//
// Stores pPipe (cast to zsock_t *) in mpPipe and makes sure an internal
// PollerZmq exists, creating one when mpPoller is still null.
//
// pPipe: pipe socket to remember; it is cast to zsock_t * without any check.
359void ActorZmq::pipe(void * pPipe)
360{
364
365 SPD_TRACE("ActorZmq::pipe()<-");
366 mpPipe = static_cast<zsock_t *>(pPipe);
367
// Lazily create the poller so that later code can rely on it being present.
368 if (!mpPoller) {
369 mpPoller = new PollerZmq();
370 }
371
// NOTE(review): this listing jumps from original line 372 to 374, so the body
// of the following `if` is not visible here - presumably it registers the pipe
// with the poller; verify against the real source file.
372 if (mpPipe) {
374 }
375 SPD_TRACE("ActorZmq::pipe()->");
376}
377
379{
383
384 SPD_TRACE("ActorZmq::init()<-");
385 // Setting up signal handler
386 std::signal(SIGINT, Salsa::Actor::signalHandler);
387 std::signal(SIGTERM, Salsa::Actor::signalHandler);
388
389 SPD_TRACE("ActorZmq::init()->");
390 return 0;
391}
392
394{
398
399 SPD_TRACE("ActorZmq::exec()<-");
400
401 void * pEvent;
403 pEvent = wait();
404 if (pEvent) {
405 // handle other socket
406 SPD_WARN("ActorZmq::exec() : Other socket from ActorZmq class");
407 }
408 }
409
410 SPD_TRACE("ActorZmq::exec() : Salsa::interrupted() [{}]", Salsa::Actor::interrupted());
411 SPD_TRACE("ActorZmq::exec()->");
412 return 0;
413}
414
416{
420
421 SPD_TRACE("ActorZmq::finish()<-");
422 SPD_TRACE("ActorZmq::finish()->");
423 return 0;
424}
425
427{
431 if (!mpPoller) {
432 SPD_ERROR("Poller is nullptr!");
433 return nullptr;
434 }
435
436 void * pEvent = mpPoller->wait(mTimeout);
437 SPD_TRACE("ActorZmq::exec(): pEvent [{}] mpPipe [{}]", static_cast<void *>(pEvent), static_cast<void *>(mpPipe));
438
439 if (mpPipe && pEvent == mpPipe) {
440 zmsg_t * pMsg = zmsg_recv(mpPipe);
441 if (!pMsg) {
442 return nullptr;
443 }
444
445 char * pCommand = zmsg_popstr(pMsg);
446 zmsg_destroy(&pMsg);
447 if (streq(pCommand, "$TERM")) {
448 SPD_TRACE("ActorZmq::exec(): received $TERM");
449 mTerminated = true;
450 }
451 else {
452 SPD_ERROR("ActorZmq::exec(): invalid message to actor msg: [{}]", pCommand);
453 assert(false); // We should __not__ use assert here, because it's only used to debug...
454 }
455 zstr_free(&pCommand);
456 }
457 else {
458 if (zpoller_expired(mpPoller->poller())) {
459 SPD_TRACE("ActorZmq::exec(): Poller expired timeout [{}]...", mTimeout);
460 }
461 else if (zpoller_terminated(mpPoller->poller())) {
462 SPD_TRACE("ActorZmq::exec(): Poller terminated ...");
463 mTerminated = true;
464 }
465 else {
466 return pEvent;
467 }
468 }
469
470 return pEvent;
471}
472
473zpoller_t * ActorZmq::poller() const
474{
478 return mpPoller->poller();
479}
481{
485 return mpPoller;
486}
487} // namespace Salsa
ZeroMQ implementation of salsa actor class.
Definition ActorZmq.hh:19
virtual int init()
First function.
Definition ActorZmq.cc:378
virtual void * wait()
Definition ActorZmq.cc:426
int mTimeout
Poller timeout.
Definition ActorZmq.hh:51
virtual void pipe(void *pipe)
Setter for pipe.
Definition ActorZmq.cc:359
static void actorProcwaitSupport_(zsock_t *pipe, void *argv)
Support actor method (used for PID waiting)
Definition ActorZmq.cc:311
PollerZmq * pollerZmq() const
Definition ActorZmq.cc:480
bool mTerminated
Flag if actor should be terminated.
Definition ActorZmq.hh:50
bool terminated() const
Flag if actor should be terminated.
Definition ActorZmq.hh:40
zpoller_t * poller() const
Definition ActorZmq.cc:473
static void SalsaActorFn(zsock_t *pPipe, void *pArgv)
Definition ActorZmq.cc:22
PollerZmq * mpPoller
Internal poller.
Definition ActorZmq.hh:49
zsock_t * mpPipe
Zmq pipe socket.
Definition ActorZmq.hh:48
virtual int exec()
Main function.
Definition ActorZmq.cc:393
virtual int finish()
Last function.
Definition ActorZmq.cc:415
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
Definition ActorZmq.cc:57
virtual ~ActorZmq()
Definition ActorZmq.cc:12
Base salsa actor class.
Definition Actor.hh:17
static void signalHandler(int signalNumber)
Setter for salsa interruption.
Definition Actor.cc:19
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
Definition Actor.hh:35
Definition Log.hh:19
std::shared_ptr< spdlog::logger > spd()
Get SPDLOG logger handle.
Definition Log.hh:39
int write(char const *)
Write to logger.
Definition Log.cc:64
int add(std::string)
Add output sink (file, console, zmq) for SPDLOG.
Definition Log.cc:12
void fd(int newFD)
Set FD of pipe to watch.
Definition Log.hh:44
int create()
Create SPDLOG logger.
Definition Log.cc:45
salsa node class
Definition PollerZmq.hh:16
virtual void add(SocketZyre *pSocket)
Definition PollerZmq.cc:45
zpoller_t * poller() const
Returns Poller.
Definition PollerZmq.hh:27
virtual void * wait(int timeout=-1)
Waiting for socket.
Definition PollerZmq.cc:56