salsa 0.7.1
Loading...
Searching...
No Matches
NodeManagerZyre.cc
1#include "NodeManagerZyre.hh"
2#include "MessageZyre.hh"
3#include "PublisherZmq.hh"
4#include "SocketZyre.hh"
5#include "TaskExecutorFake.hh"
6#include "TaskExecutorForkZmq.hh"
7namespace Salsa {
20
21Socket * NodeManagerZyre::onEnter(std::string self, std::string fromType, Message * pMsg,
22 std::vector<std::string> & values)
23{
27
28 Socket * pSocket = NodeManager::onEnter(self, fromType, pMsg, values);
29
30 if (pSocket) {
31 sendWhisper(pSocket, pMsg->uuid(), values);
32 }
33
34 return pSocket;
35}
36
37Socket * NodeManagerZyre::onExit(std::string self, Message * pMsg, std::vector<std::string> & values)
38{
42
43 Socket * pSocket = NodeManager::onExit(self, pMsg, values);
44
45 if (pSocket) {
46 sendWhisper(pSocket, pMsg->uuid(), values);
47 }
48 return pSocket;
49}
50
51Socket * NodeManagerZyre::onWhisper(std::string self, Message * pMsg, std::vector<std::string> & values)
52{
56
57 Socket * pSocket = NodeManager::onWhisper(self, pMsg, values);
58
59 if (pSocket) {
60 sendWhisper(pSocket, pMsg->uuid(), values);
61 }
62 return pSocket;
63}
64
// Creates one task slot: an executor plus its TaskState, registered in the
// task pool under the executor's pipe.
//
// NOTE(review): this listing is incomplete. The function signature line
// (original line 65) and the declaration of pExec inside the SALSA_FAKE branch
// (original line 75) are not present here; recover them from the real source
// before editing this function.
66{
70
// The task pool is created lazily on first use.
71 if (mpTaskPool == nullptr) mpTaskPool = new TaskPool(this);
72
// Any value of the SALSA_FAKE environment variable (even empty) selects this
// branch, since only the pointer returned by getenv() is tested.
73 if (getenv("SALSA_FAKE")) {
74 SPD_DEBUG("Fake jobs");
// pExec comes from the missing line 75 — presumably a TaskExecutorFake, given
// the include at the top of the file; TODO confirm.
76 TaskState * pState = new TaskState(pExec);
77 pExec->taskState(pState);
78 mpTaskPool->add(pState->executor()->pipe(), pState);
79 }
80 else {
// Start a zactor running SalsaActorForkFn and wrap it in a fork-based executor.
81 zactor_t * pActor = zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr);
82 TaskExecutor * pExec = new TaskExecutorForkZmq(pActor);
83 TaskState * pState = new TaskState(pExec);
84 pExec->taskState(pState);
// Unlike the fake branch, the executor pipe is also added to the node's zmq
// poller before it is registered in the task pool.
85 mpNodeZyre->pollerZmq()->add(static_cast<zactor_t *>(pState->executor()->pipe()));
86 mpTaskPool->add(pState->executor()->pipe(), pState);
87 }
88}
89
90void NodeManagerZyre::runTask(TaskState * pTaskState, std::string wk, std::string upstream)
91{
95
96 SPD_TRACE("Task [{}:{}] wk [{}] upstream [{}]", pTaskState->task()->jobid(), pTaskState->task()->taskid(), wk,
97 upstream);
98 pTaskState->executor()->run(wk, upstream);
99}
100
// Builds a TASK_RESULT message for a finished task and sends it on the
// external socket selected by job->submitterSocketIndex().
//
// Frames, in order: a duplicate of the job's submitter socket id frame, an
// empty string, "TASK_RESULT", the task's job id, the task id and the return
// code (the last two formatted as text with fmt::format).
//
// NOTE(review): the function signature line (original line 101) is missing
// from this listing; the body relies on parameters named `job` and `task`.
102{
106 SPD_TRACE("Sending results to external taskid [{}:{}] rc[{}]", task->jobid(), task->taskid(), task->returncode());
107
108 zmsg_t * msg_out = zmsg_new();
// The id frame is duplicated so the frame held by the job is not handed over
// to the message (the non-duplicating variant is kept commented out below).
109 zmsg_add(msg_out, zframe_dup(static_cast<zframe_t *>(job->submitterSocketID())));
110 // zmsg_add(msg_out, static_cast<zframe_t *>(job->submitterSocketID()));
111 zmsg_addstr(msg_out, "");
112 zmsg_addstr(msg_out, "TASK_RESULT");
113 zmsg_addstr(msg_out, task->jobid().data());
114 zmsg_addstr(msg_out, fmt::format("{}", task->taskid()).data());
115 zmsg_addstr(msg_out, fmt::format("{}", task->returncode()).data());
// The result of zmsg_send() is not checked.
116 zmsg_send(&msg_out, mpNodeZyre->socketExternal(job->submitterSocketIndex()));
// NOTE(review): relies on CZMQ semantics — zmsg_send() is expected to null the
// pointer on success, so this destroy only frees the message when the send
// failed. Confirm against the CZMQ version in use.
117 zmsg_destroy(&msg_out);
118}
119
// Forwards a pipe to the task pool for handling.
//
// Returns false when no task pool has been created yet; otherwise returns
// whatever mpTaskPool->handlePipe(pPipe) returns.
//
// NOTE(review): the function signature line (original line 120) is missing
// from this listing; the body relies on a parameter named `pPipe`.
121{
125
126 if (mpTaskPool == nullptr) return false;
127
128 return mpTaskPool->handlePipe(pPipe);
129}
130
131bool NodeManagerZyre::sendWhisper(Socket * pSocket, std::string to, std::vector<std::string> & vec)
132{
136
137 if (vec.size()) {
138 SocketZyre * pSocketZyre = static_cast<SocketZyre *>(pSocket);
139 zmsg_t * pMsg = nullptr;
140
141 if (vec[0] == "&") vec.erase(vec.begin());
142 for (auto str : vec) {
143 if (pMsg == nullptr) {
144 pMsg = zmsg_new();
145 }
146 if (str == "&") {
147 zyre_whisper(pSocketZyre->zyre(), to.c_str(), &pMsg);
148 zmsg_destroy(&pMsg);
149 pMsg = nullptr;
150 }
151 else {
152 zmsg_addstr(pMsg, str.c_str());
153 }
154 }
155 zyre_whisper(pSocketZyre->zyre(), to.c_str(), &pMsg);
156 zmsg_destroy(&pMsg);
157 return true;
158 }
159 return false;
160}
161
162} // namespace Salsa
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
Definition ActorZmq.cc:57
Job class.
Definition Job.hh:16
Base Message class.
Definition Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
NodeZyre * mpNodeZyre
Current zyre node.
virtual void runTask(TaskState *pTaskState, std::string wk, std::string upstream)
Run task interface.
virtual bool handleTaskPool(void *pPool)
virtual void resultTaskToExternal(Job *job, TaskInfo *task)
Handle return of task and send it to external client.
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual bool sendWhisper(Socket *pSocket, std::string to, std::vector< std::string > &vect)
NodeManagerZyre(NodeZyre *pNodeZyre)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
Job * job(std::string uuid)
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
TaskPool * mpTaskPool
Task pool.
salsa node class
Definition NodeZyre.hh:20
Salsa zyre socket class.
Definition SocketZyre.hh:18
virtual zyre_t * zyre() const
Returns zyre pointer.
Definition SocketZyre.hh:42
Base Socket class.
Definition Socket.hh:15
TaskExecutorFake class.
TaskExecutorForkZmq class.
Base TaskExecutor class.
virtual void * pipe() const
virtual bool run(std::string, std::string)=0
Run task.
void taskState(TaskState *pTS)
Base salsa TaskPool class.
Definition TaskPool.hh:18
Base salsa TaskState class.
Definition TaskState.hh:16
TaskExecutor * executor()
Definition TaskState.cc:83
TaskInfo * task() const
Definition TaskState.cc:66