salsa  0.4.15
NodeManagerZyre.cc
1 #include "NodeManagerZyre.hh"
2 #include "MessageZyre.hh"
3 #include "PublisherZmq.hh"
4 #include "SocketZyre.hh"
5 #include "TaskExecutorFake.hh"
6 #include "TaskExecutorForkZmq.hh"
7 namespace Salsa {
9 {
13 }
15 {
19 }
20 
21 Socket * NodeManagerZyre::onEnter(std::string self, std::string fromType, Message * pMsg,
22  std::vector<std::string> & values)
23 {
27 
28  Socket * pSocket = NodeManager::onEnter(self, fromType, pMsg, values);
29 
30  if (pSocket) {
31  sendWhisper(pSocket, pMsg->uuid(), values);
32  }
33 
34  return pSocket;
35 }
36 
37 Socket * NodeManagerZyre::onExit(std::string self, Message * pMsg, std::vector<std::string> & values)
38 {
42 
43  Socket * pSocket = NodeManager::onExit(self, pMsg, values);
44 
45  if (pSocket) {
46  sendWhisper(pSocket, pMsg->uuid(), values);
47  }
48  return pSocket;
49 }
50 
51 Socket * NodeManagerZyre::onWhisper(std::string self, Message * pMsg, std::vector<std::string> & values)
52 {
56 
57  Socket * pSocket = NodeManager::onWhisper(self, pMsg, values);
58 
59  if (pSocket) {
60  sendWhisper(pSocket, pMsg->uuid(), values);
61  }
62  return pSocket;
63 }
64 
66 {
70 
71  if (mpTaskPool == nullptr) mpTaskPool = new TaskPool(this);
72 
73  if (getenv("SALSA_FAKE")) {
74  SPD_DEBUG("Fake jobs");
76  TaskState * pState = new TaskState(pExec);
77  pExec->taskState(pState);
78  mpTaskPool->add(pState->executor()->pipe(), pState);
79  }
80  else {
81  zactor_t * pActor = zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr);
82  TaskExecutor * pExec = new TaskExecutorForkZmq(pActor);
83  TaskState * pState = new TaskState(pExec);
84  pExec->taskState(pState);
85  mpNodeZyre->pollerZmq()->add(static_cast<zactor_t *>(pState->executor()->pipe()));
86  mpTaskPool->add(pState->executor()->pipe(), pState);
87  }
88 }
89 
90 void NodeManagerZyre::runTask(TaskState * pTaskState, std::string wk, std::string upstream)
91 {
95 
96  SPD_TRACE("Task [{}:{}] wk [{}] upstream [{}]", pTaskState->task()->jobid(), pTaskState->task()->taskid(), wk,
97  upstream);
98  pTaskState->executor()->run(wk, upstream);
99 }
100 
101 void NodeManagerZyre::resultTaskToExternal(Job * job, TaskInfo * task)
102 {
106  SPD_TRACE("Sending results to external taskid [{}:{}] rc[{}]", task->jobid(), task->taskid(), task->returncode());
107 
108  zmsg_t * msg_out = zmsg_new();
109  zmsg_add(msg_out, zframe_dup(static_cast<zframe_t *>(job->submitterSocketID())));
110  // zmsg_add(msg_out, static_cast<zframe_t *>(job->submitterSocketID()));
111  zmsg_addstr(msg_out, "");
112  zmsg_addstr(msg_out, "TASK_RESULT");
113  zmsg_addstr(msg_out, task->jobid().data());
114  zmsg_addstr(msg_out, fmt::format("{}", task->taskid()).data());
115  zmsg_addstr(msg_out, fmt::format("{}", task->returncode()).data());
116  zmsg_send(&msg_out, mpNodeZyre->socketExternal(job->submitterSocketIndex()));
117  zmsg_destroy(&msg_out);
118 }
119 
121 {
125 
126  if (mpTaskPool == nullptr) return false;
127 
128  return mpTaskPool->handlePipe(pPipe);
129 }
130 
131 bool NodeManagerZyre::sendWhisper(Socket * pSocket, std::string to, std::vector<std::string> & vec)
132 {
136 
137  if (vec.size()) {
138  SocketZyre * pSocketZyre = static_cast<SocketZyre *>(pSocket);
139  zmsg_t * pMsg = nullptr;
140 
141  if (vec[0] == "&") vec.erase(vec.begin());
142  for (auto str : vec) {
143  if (pMsg == nullptr) {
144  pMsg = zmsg_new();
145  }
146  if (str == "&") {
147  zyre_whisper(pSocketZyre->zyre(), to.c_str(), &pMsg);
148  zmsg_destroy(&pMsg);
149  pMsg = nullptr;
150  }
151  else {
152  zmsg_addstr(pMsg, str.c_str());
153  }
154  }
155  zyre_whisper(pSocketZyre->zyre(), to.c_str(), &pMsg);
156  zmsg_destroy(&pMsg);
157  return true;
158  }
159  return false;
160 }
161 
162 } // namespace Salsa
PollerZmq * pollerZmq() const
Definition: ActorZmq.cc:480
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
Definition: ActorZmq.cc:57
Job class.
Definition: Job.hh:16
void * submitterSocketID() const
Returns submitter socket identity.
Definition: Job.hh:78
int submitterSocketIndex() const
Returns submitter socket index.
Definition: Job.hh:73
Base Message class.
Definition: Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
NodeZyre * mpNodeZyre
Current zyre node.
virtual void runTask(TaskState *pTaskState, std::string wk, std::string upstream)
Run task interface.
virtual void addTaskSlot()
virtual bool handleTaskPool(void *pPool)
virtual void resultTaskToExternal(Job *job, TaskInfo *task)
Handle return of task and send it to external client.
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual bool sendWhisper(Socket *pSocket, std::string to, std::vector< std::string > &vect)
NodeManagerZyre(NodeZyre *pNodeZyre)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
NodeManager class.
Definition: NodeManager.hh:22
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:185
Job * job(std::string uuid)
Definition: NodeManager.cc:480
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:147
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:82
TaskPool * mpTaskPool
Task pool.
Definition: NodeManager.hh:95
salsa node class
Definition: NodeZyre.hh:20
zsock_t * socketExternal(int i)
Returns external socket.
Definition: NodeZyre.hh:36
virtual void add(SocketZyre *pSocket)
Definition: PollerZmq.cc:45
Salsa zyre socket class.
Definition: SocketZyre.hh:18
virtual zyre_t * zyre() const
Returns zyre pointer.
Definition: SocketZyre.hh:42
Base Socket class.
Definition: Socket.hh:15
TaskExecutorFake class.
TaskExecutorForkZmq class.
Base TaskExecutor class.
Definition: TaskExecutor.hh:14
virtual void * pipe() const
Definition: TaskExecutor.cc:15
virtual bool run(std::string, std::string)=0
Run task.
void taskState(TaskState *pTS)
Definition: TaskExecutor.cc:22
Base salsa TaskPool class.
Definition: TaskPool.hh:18
void add(void *p, TaskState *t)
Definition: TaskPool.cc:24
bool handlePipe(void *pPipe)
Definition: TaskPool.cc:142
Base salsa TaskState class.
Definition: TaskState.hh:16
TaskExecutor * executor()
Definition: TaskState.cc:83
TaskInfo * task() const
Definition: TaskState.cc:66