salsa 0.7.1
Loading...
Searching...
No Matches
Feeder.cc
#include "Feeder.hh"

#include <algorithm>
#include <memory>

#include "Job.hh"
#include "NodeManager.hh"
4
5using namespace fmt::literals;
6
7namespace Salsa {
8Feeder::Feeder(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * pNM) : Distributor(uuid, pipe, pNM)
9{
13 mpNodeInfo->set_uuid(mUUID);
14}
15
22
23void Feeder::onEnter(Message * /*pInMsg*/, std::vector<std::string> & out, std::string type)
24{
28
29 if (type == "WORKER" && mpNodeManager->hasJobs()) {
30 out.push_back("SUB");
31 }
32}
33
// Feeder::onExit - handles a node leaving.
// If the leaving node (pInMsg->uuid()) is one of mpNodeInfo->hosts(), it is removed from that
// list, the slot total is recomputed from the remaining hosts, and every task recorded for it
// in mWorkerTasks is moved back to Job::pending (from Job::running or Job::assigned). Unless
// the SALSA_FAST environment variable is set, the job's consumer (if any) is also sent a
// "JOBRESUBMITED" whisper carrying the serialized task. Afterwards the node's entries in
// mWorkerTasks and mClients are dropped and subscribe() is called with its uuid.
//
// NOTE(review): this listing is missing original lines 36-38, 92, 95 and 96, and the closing
// braces at lines 98/99 do not balance as shown, so whatever guards the final SPD_INFO is not
// visible here - confirm against the real Feeder.cc before editing this function.
34void Feeder::onExit(Message * pInMsg, std::vector<std::string> & /*out*/)
35{
39
40 SPD_TRACE("::onExit inMSG [{}]", pInMsg->uuid());
41
// Single pass over the hosts: slots sums every host except the leaving one, and iPos ends
// up as the index of the leaving host (the number of hosts that precede it).
42 uint32_t slots = 0;
43 uint32_t iPos = 0;
44 bool found = false;
45 for (auto & node : mpNodeInfo->hosts()) {
46 if (pInMsg->uuid() == node.uuid()) {
47 found = true;
48 }
49 else {
50 slots += node.slots();
51 if (!found) {
52 iPos++;
53 }
54 }
55 }
56
57 // TODO: Continue if worker (Better check is needed)
58 if (found) { // if node is found in local reg
// Drop the leaving host (index iPos) and store the slot total of the remaining hosts.
59 mpNodeInfo->mutable_hosts()->DeleteSubrange(iPos, 1);
60 mpNodeInfo->set_slots(slots);
61
// NOTE(review): operator[] inserts an empty entry when the node has no tasks recorded;
// that entry is erased again below.
62 for (auto pTask : mWorkerTasks[pInMsg->uuid()]) {
63 SPD_WARN("WORKER [{}] exit. Moving task [{}:{}] to Job::pending", pInMsg->uuid(), pTask->jobid(),
64 pTask->taskid());
65 Job * pJob = mpNodeManager->job(pTask->jobid());
66 if (pJob) {
// Requeue the task from whichever of running/assigned currently holds it.
67 if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::running)) {
68 pJob->moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::pending);
69 }
70 else if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
71 pJob->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
72 }
73
// Setting SALSA_FAST (any value) suppresses the consumer notification; getenv is
// evaluated once per task.
74 if (!getenv("SALSA_FAST")) {
75 if (!pJob->consumer().empty()) {
// NOTE(review): pConsumer is dereferenced below without a null check.
76 std::shared_ptr<Consumer> pConsumer = mpNodeManager->consumer(pJob->consumer());
77 std::vector<std::string> outData;
78 outData.push_back("JOBRESUBMITED");
79 std::string payload;
80 pTask->SerializeToString(&payload);
81 outData.push_back(payload);
82 mpNodeManager->sendWhisper(pConsumer->pipe().get(), pJob->feeder(), outData);
83 }
84 }
85 }
86 }
87
// Forget all bookkeeping for the leaving node.
88 mWorkerTasks[pInMsg->uuid()].clear();
89 mWorkerTasks.erase(pInMsg->uuid());
90 mClients.erase(pInMsg->uuid());
91
93 subscribe(pInMsg->uuid());
94 }
97 SPD_INFO("Workers [{}] slots [{}]", mpNodeInfo->hosts_size(), mpNodeInfo->slots());
98 }
99}
100void Feeder::onWhisper(Message * pInMsg, std::vector<std::string> & out)
101{
105
106 std::vector<std::string> inContent = pInMsg->content();
107
108 SPD_TRACE("::onWhisper inMSG [{}]", inContent[0]);
109 if (inContent[0] == "FREESLOT") { // from consumer
110 // Signal arrives from consumer. Indicates that consumer has free exec slot.
111 SPD_TRACE("Searching for task in one of jobs");
112 TaskInfo * pTask = mpNodeManager->getNextTask();
113 // If there's task available, send it out
114 if (pTask == nullptr) {
115 SPD_TRACE("Sending back NOMORETASKS");
116 out.push_back("NOMORETASKS");
117 out.push_back(inContent[1]);
118 }
119 else {
120 out.push_back("TASK");
121 std::string payload;
122 pTask->SerializeToString(&payload);
123 out.push_back(payload);
124 out.push_back(inContent[1]);
125 mWorkerTasks[pInMsg->uuid()].push_back(pTask);
126 SPD_TRACE("mWorkerTasks[{}] vector size [{}]", pInMsg->uuid(), mWorkerTasks[pInMsg->uuid()].size());
127 }
128 }
129 else if (inContent[0] == "TASK_IS_RUNNING") { // from consumer
130 SPD_TRACE("TASK_IS_RUNNING");
131
132 std::string payload = inContent[1];
133 TaskInfo * pTask = new TaskInfo();
134 {
135 if (!pTask->ParseFromString(payload)) {
136 SPD_ERROR("Message does not contain ProtoBuf message!");
137 for (auto s : inContent) {
138 SPD_ERROR("::onWhisper inMSG [{}]", s);
139 }
140 return;
141 }
142 }
143
144 Job * job = mpNodeManager->job(pTask->jobid());
145 if (job) {
146 if (job->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
147 job->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::running);
148 }
149
150 if (!job->consumer().empty()) {
151 std::shared_ptr<Consumer> pConsumer = mpNodeManager->consumer(job->consumer());
152 std::vector<std::string> output;
153 output.push_back(inContent[0]);
154 output.push_back(inContent[1]);
155 mpNodeManager->sendWhisper(pConsumer->pipe().get(), job->feeder(), output);
156 }
157 else {
158 delete pTask;
159 }
160 }
162 }
163 else if (inContent[0] == "TASK_RESULT") { // from consumer
164 std::string payload = inContent[1];
165 TaskInfo * pTask = new TaskInfo();
166 {
167 if (!pTask->ParseFromString(payload)) {
168 SPD_ERROR("Message does not contain ProtoBuf message!");
169 for (auto line : inContent) {
170 SPD_ERROR("::onWhisper inMSG [{}]", line);
171 }
172 return;
173 }
174 }
175
176 removeWorkerTask(pTask, pInMsg->uuid());
177
178 // task is deleted in next line
180 // TODO possible memleak? @mvala
181 }
182 else if (inContent[0] == "NODEINFO") { // from consumer
183 std::string payload = inContent[1];
184 Salsa::NodeInfo * pNI = mpNodeInfo->add_hosts();
185 if (!pNI->ParseFromString(payload)) {
186 SPD_ERROR("[NodeInfo] Message does not contain ProtoBuf message!");
187 }
188 uint32_t slots = 0;
189
190 for (auto nodeInfo : mpNodeInfo->hosts()) {
191 slots += nodeInfo.slots();
192 }
193 mpNodeInfo->set_slots(slots);
195
196 SPD_INFO("Workers [{}] slots [{}]", mpNodeInfo->hosts_size(), mpNodeInfo->slots());
197 }
198}
199
200void Feeder::subscribe(std::string uuid)
201{
205
206 SPD_INFO("Client [{}] started", uuid);
207 SPD_TRACE("Feeders -> [{}]", mClients.size());
208 for (auto client : mClients) {
209 std::vector<std::string> out;
210 out.push_back("SUB");
211 mpNodeManager->sendWhisper(pipe().get(), client.first, out);
212 }
213}
214
215void Feeder::removeWorkerTask(TaskInfo * pTaskInfo)
216{
220
221 for (auto wkTask : mWorkerTasks) {
222 removeWorkerTask(pTaskInfo, wkTask.first);
223 }
224}
225
226void Feeder::removeWorkerTask(TaskInfo * pTaskInfo, std::string uuid)
227{
231 SPD_TRACE("mWorkerTasks[{}].size() [{}]", uuid, mWorkerTasks[uuid].size());
232 if (mWorkerTasks[uuid].size() == 0) return;
233
234 int iPos = 0;
235 for (auto pTask : mWorkerTasks[uuid]) {
236 if (pTask->taskid() == pTaskInfo->taskid()) {
237 mWorkerTasks[uuid].erase(mWorkerTasks[uuid].begin() + iPos);
238 return;
239 }
240 iPos++;
241 }
242}
243
244void Feeder::terminateJob(std::string uuid)
245{
249
250 auto job = mpNodeManager->job(uuid);
251
252 std::vector<Salsa::TaskInfo *> tasks;
253 job->tasks(tasks, Job::pending, false);
254 job->tasks(tasks, Job::assigned, false);
255 job->tasks(tasks, Job::running, false);
256 for (auto pTask : tasks) {
257 SPD_TRACE("removeWorkerTask [{}]", pTask->taskid());
258 removeWorkerTask(pTask);
259 }
260
261 for (auto client : mClients) {
262 std::vector<std::string> out;
263 out.push_back("TERMINATEJOB");
264 out.push_back(uuid);
265 mpNodeManager->sendWhisper(pipe().get(), client.first, out);
266 }
267 SPD_INFO("JOB [{}] has finished", uuid);
268}
269
270} // namespace Salsa
Base Distributor class.
std::map< std::string, std::string > mClients
List of clients.
NodeInfo * nodeInfo() const
virtual void upadateJsonValueNodeInfo()
std::string mUUID
Self UUID.
NodeManager * mpNodeManager
Node Manager.
std::shared_ptr< Socket > pipe() const
TODO Returns distributor's pipe?
NodeInfo * mpNodeInfo
Node Info.
std::string uuid() const
Returns distributor's UUID.
virtual ~Feeder()
Definition Feeder.cc:16
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of the apocalypse.
Definition Feeder.cc:23
void terminateJob(std::string uuid)
Definition Feeder.cc:244
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
Definition Feeder.cc:34
void removeWorkerTask(TaskInfo *pTI)
Definition Feeder.cc:215
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
Definition Feeder.cc:100
std::map< std::string, std::vector< TaskInfo * > > mWorkerTasks
Worker tasks.
Definition Feeder.hh:33
void subscribe(std::string uuid)
Definition Feeder.cc:200
Feeder(std::string uuid, std::shared_ptr< Socket > pPipe, NodeManager *pNM)
Definition Feeder.cc:8
Job class.
Definition Job.hh:16
void consumer(std::string uuid)
Definition Job.cc:238
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition Job.cc:130
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition Job.cc:56
void feeder(std::string uuid)
Definition Job.cc:255
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition Job.cc:145
Base Message class.
Definition Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual std::vector< std::string > & content()=0
Returns the vector of partial messages as strings.
NodeManager class.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Job * job(std::string uuid)
virtual bool haveMoreTasks()
std::shared_ptr< Consumer > consumer(std::string uuid) const
virtual void resultTask(TaskInfo *task)
bool hasJobs() const
TaskInfo * getNextTask()
void print(std::string opt="") const