28 zsock_signal(pPipe, 0);
32 SPD_TRACE(
"SalsaActorFn::init() <-");
34 if ((ret = pActor->
init())) {
35 SPD_ERROR(
"init() failed! [{}]", ret);
38 SPD_TRACE(
"SalsaActorFn::init()->");
41 SPD_TRACE(
"SalsaActorFn::exec() <-");
42 if ((ret = pActor->
exec())) {
43 SPD_ERROR(
"exec() failed! [{}]", ret);
46 SPD_TRACE(
"SalsaActorFn::exec() ->");
49 SPD_TRACE(
"SalsaActorFn::finish() <-");
50 if ((ret = pActor->
finish())) {
51 SPD_ERROR(
"finish() failed! [{}]", ret);
54 SPD_TRACE(
"SalsaActorFn::finish() ->");
61 zsock_signal(pPipe, 0);
67 zmsg_t * pReceived = zmsg_recv(pPipe);
69 SPD_WARN(
"PA: pReceived == <nullptr> (Exec interrupted)");
74 zframe_t * pFrame = zmsg_first(pReceived);
77 if (zframe_streq(pFrame,
"$TERM")) {
78 SPD_TRACE(
"PA: Terminate received");
79 zmsg_destroy(&pReceived);
84 char * pCommand = zframe_strdup(pFrame);
85 SPD_TRACE(
"PA: got Command [{}]", pCommand);
86 pFrame = zmsg_next(pReceived);
89 char * pUID = zframe_strdup(pFrame);
90 SPD_TRACE(
"PA: got UID [{}]", pUID);
91 pFrame = zmsg_next(pReceived);
94 char * pGID = zframe_strdup(pFrame);
95 SPD_TRACE(
"PA: got GID [{}]", pGID);
96 pFrame = zmsg_next(pReceived);
99 char * pWorker = zframe_strdup(pFrame);
100 SPD_TRACE(
"PA: got Woker [{}]", pWorker);
101 pFrame = zmsg_next(pReceived);
104 char * pUpsream = zframe_strdup(pFrame);
105 SPD_TRACE(
"PA: got Upstream [{}]", pUpsream);
106 pFrame = zmsg_next(pReceived);
109 char * pClient = zframe_strdup(pFrame);
110 SPD_TRACE(
"PA: got Client [{}]", pClient);
111 pFrame = zmsg_next(pReceived);
113 std::string pMessage_str;
114 std::string pLoop_str;
119 char * pLoop = zframe_strdup(pFrame);
122 SPD_TRACE(
"PA: got str logs [{}]", pLoop);
124 std::string pMessage_str;
125 if (pLoop_str ==
"logs") {
126 while ((pFrame = zmsg_next(pReceived)) !=
nullptr) {
127 char * pMessage = zframe_strdup(pFrame);
128 pMessage_str = pMessage;
129 if (pMessage_str ==
"envs") {
133 SPD_TRACE(
"PA: Adding log target [{}]", pMessage);
135 zstr_free(&pMessage);
143 std::vector<std::string> envs;
144 if (pLoop_str ==
"envs") {
145 while ((pFrame = zmsg_next(pReceived)) !=
nullptr) {
146 char * pMessage = zframe_strdup(pFrame);
147 SPD_TRACE(
"PA: Adding env [{}]", pMessage);
148 envs.push_back(pMessage);
149 zstr_free(&pMessage);
152 char * envp[envs.size()];
154 for (
auto s : envs) {
155 char * cstr =
new char[s.length() + 1];
156 strcpy(cstr, s.c_str());
165 SPD_TRACE(
"PA: Destroying message [{}]", static_cast<void *>(pReceived));
166 zmsg_destroy(&pReceived);
170 SPD_TRACE(
"PA: Creating logger");
172 log.
spd()->info(
"Running [{}]", pCommand);
174 SPD_TRACE(
"PA: Waiting for pipes...");
176 if (pipe2(pipefd, O_NONBLOCK)) {
177 SPD_ERROR(
"FAILED to receive pipes!");
179 SPD_TRACE(
"PA: Got pipes [{}, {}]", pipefd[0], pipefd[1]);
188 SPD_TRACE(
"PA Child: uid [{}]->[{}] guid [{}]->[{}]", getuid(), pUID, getgid(), pGID);
189 if (setgid(atoi(pGID)) == -1) {
190 SPD_ERROR(
"Problem setting GUI to process !!! ");
193 if (setuid(atoi(pUID)) == -1) {
194 SPD_ERROR(
"Problem setting UID to process !!! ");
198 SPD_TRACE(
"PA Child: uid [{}] guid [{}]", getuid(), getgid());
201 SPD_TRACE(
"PA Child: Running command [{}]", pCommand);
203 unsigned int iCount = 0;
204 char ** ppCommand =
nullptr;
205 char * tmp = std::strtok(pCommand,
" ");
207 SPD_TRACE(
"PA Child: Tokenizing");
210 ppCommand =
static_cast<char **
>(realloc(ppCommand, (iCount + 2) *
sizeof(
char **)));
211 ppCommand[iCount++] = strdup(tmp);
212 tmp = std::strtok(
nullptr,
" ");
214 ppCommand[iCount] =
nullptr;
219 SPD_TRACE(
"PA Child: Configuring pipes");
224 dup2(pipefd[1], STDOUT_FILENO);
225 dup2(pipefd[1], STDERR_FILENO);
228 if (execvpe(ppCommand[0], ppCommand, envp) == -1) {
238 SPD_TRACE(
"PA Parent: Sending PID [{}] to parent", pid);
240 zmsg_t * pTx = zmsg_new();
241 zmsg_addstr(pTx,
"$PID");
242 zmsg_addstrf(pTx,
"%d", pid);
243 zmsg_addstr(pTx, pUpsream);
244 zmsg_addstr(pTx, pClient);
245 zsock_send(pPipe,
"m", pTx);
255 SPD_TRACE(
"PA Parent: Running command...");
258 waitpid(pid, &stat, WUNTRACED);
259 if (WIFEXITED(stat) || WIFSIGNALED(stat)) {
260 zstr_sendf(pWatcherActor,
"$EXIT");
265 zactor_destroy(&pWatcherActor);
268 int rc = WEXITSTATUS(stat);
270 if (stat == 9) rc = 137;
272 SPD_TRACE(
"PA Parent: Exit [{}] rc [{}]", stat, rc);
275 zmsg_t * pTx = zmsg_new();
276 zmsg_addstr(pTx,
"$EXIT");
277 zmsg_addstrf(pTx,
"%d", rc);
278 zmsg_addstr(pTx, pWorker);
279 zmsg_addstr(pTx, pUpsream);
280 zmsg_addstr(pTx, pClient);
281 zsock_send(pPipe,
"m", pTx);
284 log.
spd()->info(
"Process exited with status [{}]", stat);
288 SPD_ERROR(
"PA Parent: fork() failure!");
290 zmsg_t * pTx = zmsg_new();
291 zmsg_addstr(pTx,
"$FORKFAIL");
292 zsock_send(pPipe,
"m", pTx);
307 SPD_TRACE(
"PA: Terminating persistent actor");
313 zsock_signal(pPipe, 0);
315 Log & commandLogger = *(
static_cast<Log *
>(pLogger));
322 int fd = commandLogger.
fd();
323 const int LIMIT = PIPE_BUF;
324 char buffer[LIMIT + 1];
325 std::memset(buffer, 0, LIMIT + 1);
327 zpoller_t * pPoller = zpoller_new(
nullptr);
328 zpoller_add(pPoller, pPipe);
329 zpoller_add(pPoller, &fd);
333 void * pRecvSock = zpoller_wait(pPoller, -1);
334 if (pRecvSock == pPipe) {
335 char * pMsg = zstr_recv(pPipe);
336 std::string recvMsg = pMsg;
338 if (recvMsg ==
"$EXIT") {
342 else if (pRecvSock == &fd) {
343 ssize_t readRet = read(fd, buffer, LIMIT);
345 if (buffer[0] !=
'\0') {
346 commandLogger.
write(buffer);
347 memset(buffer, 0,
sizeof(buffer));
353 zpoller_remove(pPoller, pPipe);
354 zpoller_remove(pPoller, &fd);
355 zpoller_destroy(&pPoller);
365 SPD_TRACE(
"ActorZmq::pipe()<-");
366 mpPipe =
static_cast<zsock_t *
>(pPipe);
375 SPD_TRACE(
"ActorZmq::pipe()->");
384 SPD_TRACE(
"ActorZmq::init()<-");
389 SPD_TRACE(
"ActorZmq::init()->");
399 SPD_TRACE(
"ActorZmq::exec()<-");
406 SPD_WARN(
"ActorZmq::exec() : Other socket from ActorZmq class");
411 SPD_TRACE(
"ActorZmq::exec()->");
421 SPD_TRACE(
"ActorZmq::finish()<-");
422 SPD_TRACE(
"ActorZmq::finish()->");
432 SPD_ERROR(
"Poller is nullptr!");
437 SPD_TRACE(
"ActorZmq::exec(): pEvent [{}] mpPipe [{}]", static_cast<void *>(pEvent), static_cast<void *>(
mpPipe));
440 zmsg_t * pMsg = zmsg_recv(
mpPipe);
445 char * pCommand = zmsg_popstr(pMsg);
447 if (streq(pCommand,
"$TERM")) {
448 SPD_TRACE(
"ActorZmq::exec(): received $TERM");
452 SPD_ERROR(
"ActorZmq::exec(): invalid message to actor msg: [{}]", pCommand);
455 zstr_free(&pCommand);
459 SPD_TRACE(
"ActorZmq::exec(): Poller expired timeout [{}]...",
mTimeout);
462 SPD_TRACE(
"ActorZmq::exec(): Poller terminated ...");
PollerZmq * mpPoller
Internal poller.
std::shared_ptr< spdlog::logger > spd()
Get SPDLOG logger handle.
virtual int init()
First function.
virtual void pipe(void *pipe)
Setter for pipe.
static void signalHandler(int signalNumber)
Setter for salsa interruption.
virtual int finish()
Last function.
void fd(int newFD)
Set FD of pipe to watch.
bool mTerminated
Flag if actor should be terminated.
static void actorProcwaitSupport_(zsock_t *pipe, void *argv)
Support actor method (used for PID waiting)
int write(char const *)
Write to logger.
ZeroMQ implementation of salsa actor class.
zsock_t * mpPipe
Zmq pipe socket.
PollerZmq * pollerZmq() const
zpoller_t * poller() const
virtual void add(SocketZyre *pSocket)
static void SalsaActorFn(zsock_t *pPipe, void *pArgv)
zpoller_t * poller() const
Returns Poller.
virtual int exec()
Main function.
static std::sig_atomic_t interrupted()
Returns whether salsa is interrupted.
int create()
Create SPDLOG logger.
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
int mTimeout
Poller timeout.
bool terminated() const
Flag if actor should be terminated.
virtual void * wait(int timeout=-1)
Waiting for socket.
int add(std::string)
Add output sink (file, console, zmq) for SPDLOG.