salsa 0.7.1
Loading...
Searching...
No Matches
NodeManager.cc
1#include <json/json.h>
2#include "NodeManager.hh"
3namespace Salsa {
5 : Object()
6 , mFinishedJobTimeout(24 * 3600)
7{
11 srand(static_cast<uint32_t>(time(nullptr))); // seed with time since epoch
12}
13
15{
19
20 for (auto job : mJobs) {
21 if (mpTaskPool) {
22 mpTaskPool->terminateJob(job.second);
23 }
24 delete job.second;
25 }
26 // terminateJobAll();
27 mJobs.clear();
28 delete mpTaskPool;
29 delete mpPublisher;
30}
31
32void NodeManager::print(std::string /*opt*/) const
33{
37
38 SPD_TRACE("mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ", mFeeders.size(), mConsumers.size(),
39 mWorkers.size(), mJobs.size());
40
41 if (mJobs.size() > 0) {
42 SPD_DEBUG("= JOBS =======================");
43 for (auto j : mJobs) {
44 j.second->print();
45 }
46 SPD_DEBUG("==============================");
47 }
48 else {
49 SPD_DEBUG("= NO JOBS ====================");
50 }
51
52 if (mpTaskPool) {
53 mpTaskPool->print();
54 }
55}
56
57void NodeManager::addConsumer(std::string uuid, std::shared_ptr<Socket> pSocket)
58{
62
63 mConsumers.emplace(uuid, std::make_shared<Consumer>(uuid, pSocket, this));
64}
65
66void NodeManager::addFeeder(std::string uuid, std::shared_ptr<Socket> pSocket)
67{
71
72 mFeeders.emplace(uuid, std::make_shared<Feeder>(uuid, pSocket, this));
73}
74
75void NodeManager::addWorker(std::string uuid, std::shared_ptr<Socket> pSocket)
76{
80
81 mWorkers.emplace(uuid, std::make_shared<Worker>(uuid, pSocket, this));
82}
83
84Socket * NodeManager::onEnter(std::string self, std::string fromType, Message * msg, std::vector<std::string> & out)
85{
89
90 SPD_TRACE("NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]", self, msg->uuid(), fromType,
91 static_cast<void *>(msg));
92
93 // TODO Implement map<std::string /* self */, Node::ENodeType?>
94 auto pFeeder = feeder(self);
95 if (pFeeder) {
96 SPD_DEBUG("::onEnter FEEDER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
97 SPD_INFO("FEEDER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
98 if (fromType == "CONSUMER") {
99 pFeeder->addClient(msg->uuid(), fromType);
100 pFeeder->onEnter(msg, out, fromType);
101 }
102 else if (fromType == "WORKER") {
103 pFeeder->addClient(msg->uuid(), fromType);
104 pFeeder->onEnter(msg, out, fromType);
105 }
106 else if (fromType == "DISCOVERY") {
107 // We fully ignoring it
108 SPD_DEBUG("DISCOVERY is here");
109 }
110 else {
111 pFeeder->addOther(msg->uuid(), fromType);
112 }
113
114 return pFeeder->pipe().get();
115 }
116
117 auto pConsumer = consumer(self);
118 if (pConsumer) {
119 SPD_DEBUG("::onEnter CONSUMER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
120 SPD_INFO("CONSUMER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
121 if (fromType == "FEEDER") {
122 pConsumer->addClient(msg->uuid(), fromType);
123 pConsumer->onEnter(msg, out, fromType);
124 }
125 else {
126 pConsumer->addOther(msg->uuid(), fromType);
127 }
128
129 return pConsumer->pipe().get();
130 }
131
132 auto pWorker = worker(self);
133 if (pWorker) {
134 SPD_DEBUG("::onEnter WORKER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
135 SPD_INFO("WORKER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
136 if (fromType == "FEEDER") {
137 pWorker->addClient(msg->uuid(), fromType);
138 pWorker->onEnter(msg, out, fromType);
139 }
140 else {
141 pWorker->addOther(msg->uuid(), fromType);
142 }
143
144 return pWorker->pipe().get();
145 }
146
147 return nullptr;
148}
149Socket * NodeManager::onExit(std::string self, Message * msg, std::vector<std::string> & out)
150{
154
155 SPD_TRACE("NodeManager::onExit self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
156
157 auto pWorker = worker(self);
158 if (pWorker) {
159 SPD_DEBUG("::onExit WORKER [{}] client on network [{}] has left", self, msg->uuid());
160 SPD_INFO("WORKER [{}] => [{}]", self, msg->uuid());
161 pWorker->onExit(msg, out);
162 pWorker->removeClient(msg->uuid());
163 return pWorker->pipe().get();
164 }
165
166 auto pFeeder = feeder(self);
167 if (pFeeder) {
168 SPD_DEBUG("::onExit FEEDER [{}] client on network [{}] has left", self, msg->uuid());
169 SPD_INFO("FEEDER [{}] => [{}]", self, msg->uuid());
170 pFeeder->onExit(msg, out);
171 pFeeder->removeClient(msg->uuid());
172 return pFeeder->pipe().get();
173 }
174
175 auto pConsumer = consumer(self);
176 if (pConsumer) {
177 SPD_DEBUG("::onExit CONSUMER [{}] client on network [{}] has left", self, msg->uuid());
178 SPD_INFO("CONSUMER [{}] => [{}]", self, msg->uuid());
179 pConsumer->onExit(msg, out);
180 pConsumer->removeClient(msg->uuid());
181 return pConsumer->pipe().get();
182 }
183
184 return nullptr;
185}
186
187Socket * NodeManager::onWhisper(std::string self, Message * msg, std::vector<std::string> & out)
188{
192
193 SPD_TRACE("NodeManager::onWhisper self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
194
195 auto pFeeder = feeder(self);
196 if (pFeeder) {
197 SPD_TRACE("::onWhisper() FEEDER [{}] from [{}] has msg", self, msg->uuid());
198 pFeeder->onWhisper(msg, out);
199 return pFeeder->pipe().get();
200 }
201
202 auto pConsumer = consumer(self);
203 if (pConsumer) {
204 SPD_TRACE("::onWhisper() CONSUMER [{}] from [{}] has msg", self, msg->uuid());
205 pConsumer->onWhisper(msg, out);
206 return pConsumer->pipe().get();
207 }
208
209 auto pWorker = worker(self);
210 if (pWorker) {
211 SPD_TRACE("::onWhisper() WORKER [{}] from [{}] has msg", self, msg->uuid());
212 pWorker->onWhisper(msg, out);
213 return pWorker->pipe().get();
214 }
215
216 return nullptr;
217}
218
219bool NodeManager::sendWhisper(Socket * /*s*/, std::string /*to*/, std::vector<std::string> & /*v*/)
220{
224
225 return true;
226}
227
228void NodeManager::addTask(TaskInfo * pTaskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType type)
229{
233
234 Salsa::Job * pJob = nullptr;
235 auto search = mJobs.find(pTaskInfo->jobid());
236 if (search != mJobs.end()) {
237 // if pJob was found
238 pJob = search->second;
239 }
240 else {
241 // if pJob was not found
242 pJob = new Salsa::Job(pTaskInfo->jobid());
243 pJob->consumer(cuuid);
244 pJob->feeder(fuuid);
245 mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
246 mActiveJobs.push_back(pTaskInfo->jobid());
247 // TODO : now we need to tell all feeders that theat they should subscribe to workers
248 SPD_TRACE("Looping feeders");
249 for (auto feeder : mFeeders) {
251 SPD_TRACE("Subscribe to feeder [{}]", feeder.first);
252 feeder.second->subscribe(pTaskInfo->jobid());
253 }
254 }
255
256 SPD_TRACE("::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
257 pJob->addTask(pTaskInfo->taskid(), pTaskInfo, type);
258}
259
261{
265 TaskInfo * pTaskInfo = nullptr;
266
267 SPD_TRACE("mActiveJobs.size() [{}]", mActiveJobs.size());
268 while (mActiveJobs.size() > 0 && pTaskInfo == nullptr) {
269 size_t index = static_cast<size_t>(rand()) % mActiveJobs.size();
270 std::string jobstr = mActiveJobs[index];
271 auto iJob = mJobs.find(jobstr);
272 if (iJob != mJobs.end()) {
273 pTaskInfo = iJob->second->nextTask();
274 if (pTaskInfo) {
275 SPD_TRACE("getNextTask FEEDER [{}] JOB [{}:{}]", iJob->first, pTaskInfo->jobid(), pTaskInfo->taskid());
276 return pTaskInfo;
277 }
278 }
279
280 // removing jobstring from mActiveJobs
281 mActiveJobs.erase(std::remove(begin(mActiveJobs), end(mActiveJobs), jobstr), end(mActiveJobs));
282 }
283
284 SPD_TRACE("::getNextTask No pTaskInfo found");
285 return nullptr;
286}
287
288void NodeManager::resultTask(TaskInfo * pTask)
289{
293
294 Job * pJob = job(pTask->jobid());
295 if (pJob == nullptr) {
296 delete pTask;
297 return;
298 }
299
300 SPD_TRACE("TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
301 // search->second->moveTask(pTask->taskid(), pTask, Salsa::Job::running, Salsa::Job::done);
302
303 // If job has no consumer we end (assuming that it is SUBMITTER)
304 if (pJob->consumer().empty()) {
305 // TODO : Fix done and failed
306 auto sourceQueue = (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) ? Job::assigned : Job::running;
307 if (pTask->returncode() == 0) {
308 pJob->moveTask(pTask->taskid(), sourceQueue, Salsa::Job::done);
309 }
310 else {
311 pJob->moveTask(pTask->taskid(), sourceQueue, Salsa::Job::failed);
312 }
313
314 if (pJob->isFinished()) {
316 mFinishedJobs.push_back(pJob->uuid());
317 }
318
319 resultTaskToExternal(pJob, pTask);
320
321 print();
322 // TODO we need to think what to do with TaskInfo object in highest level
323 delete pTask;
324 return;
325 }
326
327 if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
328 SPD_WARN("Task [{}] duplicate found in [assigned] queue!", pTask->taskid());
329 pJob->removeTask(pTask->taskid(), Salsa::Job::assigned);
330 }
331 else {
332 SPD_WARN("Task [{}] duplicate found in [running] queue!", pTask->taskid());
333 pJob->removeTask(pTask->taskid(), Salsa::Job::running);
334 }
335
336 std::shared_ptr<Consumer> pConsumer = consumer(pJob->consumer());
337 std::vector<std::string> out = {"TASK_RESULT"};
338 // out.push_back("TASK_RESULT");
339 std::string payload;
340 pTask->SerializeToString(&payload);
341 out.push_back(payload);
342 uint32_t slots = nSlots();
343
344 // delete pTask;
345
346 // TODO only for testing, REMOVE IT later
347 // - Well, what about #ifdef DEBUG ?
348 if (getenv("SALSA_FAKE")) slots *= 10;
349
350 if (pJob->size(Job::pending) < slots) {
351 if (pJob->haveMoreTasks()) {
352 SPD_TRACE("We are requesting new tasks [{}] haveMoreTasks [{}]", slots, pJob->haveMoreTasks());
353 out.push_back("&");
354 out.push_back("SENDTASKS");
355 out.push_back(fmt::format("{}", slots));
356 }
357 }
358
359 sendWhisper(pConsumer->pipe().get(), pJob->feeder(), out);
360}
361
362void NodeManager::terminateJob(std::string uuid)
363{
367
368 SPD_TRACE("Terminating job from client [{}]", uuid);
369
370 auto iJob = mJobs.find(uuid);
371 if (iJob != mJobs.end()) {
372 if (mpTaskPool) {
373 mpTaskPool->terminateJob(iJob->second);
374 }
375
376 for (auto f : mFeeders) {
377 f.second->terminateJob(uuid);
378 }
379
380 mFinishedJobs.erase(std::remove(begin(mFinishedJobs), end(mFinishedJobs), uuid), end(mFinishedJobs));
381
382 SPD_TRACE("Removing job [{}]", uuid);
383 delete iJob->second;
384 iJob->second = nullptr;
385 mJobs.erase(iJob);
386 }
387 SPD_TRACE("NodeManager::terminateJob print()");
388 print();
389}
390
392{
396
397 if (mFinishedJobs.size() == 0) return false;
398
399 SPD_DEBUG("Checking finished jobs [{}] to be removed ...", mFinishedJobs.size());
400
401 std::chrono::time_point<std::chrono::system_clock> curTime = std::chrono::system_clock::now();
402 uint64_t curTimeEpoch = std::chrono::duration_cast<std::chrono::seconds>(curTime.time_since_epoch()).count();
403 std::vector<std::string> cleanUUID;
404 for (auto js : mFinishedJobs) {
405 auto j = job(js);
406 if (j == nullptr) continue;
407 if (curTimeEpoch - j->timeFinished() > mFinishedJobTimeout) {
408 SPD_DEBUG("Terminating finished job. Time : diff[{}] timeout[{}]", curTimeEpoch - j->timeFinished(),
410 cleanUUID.push_back(js);
411 }
412 }
413 if (!cleanUUID.size()) return false;
414
415 for (auto u : cleanUUID) {
416 terminateJob(u);
417 }
418
419 return true;
420}
421
422void NodeManager::terminateAllJobs(bool finishedonly)
423{
427 if (mJobs.size() == 0) return;
428
429 std::vector<std::string> cleanUUID;
430 if (finishedonly)
431 for (auto job : mFinishedJobs) cleanUUID.push_back(job);
432
433 else {
434 for (auto job : mJobs) cleanUUID.push_back(job.first);
435 }
436
437 for (auto u : cleanUUID) {
438 SPD_DEBUG("Terminating [{}]", u);
439 terminateJob(u);
440 }
441}
442
443std::shared_ptr<Feeder> NodeManager::feeder(std::string uuid) const
444{
449 auto search = mFeeders.find(uuid);
450 if (search != mFeeders.end()) {
451 return search->second;
452 }
453 else {
454 return nullptr;
455 }
456}
457std::shared_ptr<Consumer> NodeManager::consumer(std::string uuid) const
458{
463 auto search = mConsumers.find(uuid);
464 if (search != mConsumers.end()) {
465 return search->second;
466 }
467 return nullptr;
468}
469std::shared_ptr<Worker> NodeManager::worker(std::string uuid) const
470{
475 auto search = mWorkers.find(uuid);
476 if (search != mWorkers.end()) {
477 return search->second;
478 }
479 return nullptr;
480}
481
482Job * NodeManager::job(std::string uuid)
483{
488 auto search = mJobs.find(uuid);
489 if (search != mJobs.end()) {
490 return search->second;
491 }
492 return nullptr;
493}
494
496{
500 if (mpTaskPool == nullptr) {
501 mpTaskPool = new TaskPool(this);
502 }
503}
504
506{
510 return false;
511}
512
514{
518 return mpTaskPool;
519}
520
522{
526 return mActiveJobs.size() > 0;
527}
528
529void NodeManager::jobs(std::string client_uuid, std::vector<std::string> & jobs) const
530{
534
535 for (auto search : mJobs) {
536 if (search.second != nullptr && search.second->feeder() == client_uuid) {
537 jobs.push_back(search.first);
538 }
539 }
540}
541int32_t NodeManager::nSlots(double mult) const
542{
546
547 int32_t num = 0;
548 for (auto feeder : mFeeders) {
549 num += feeder.second->nodeInfo()->slots();
550 }
551 return num * mult;
552}
553
554void NodeManager::noMoreTasks(std::string /*jobUUID*/)
555{
559
560 (void)(0);
561 // auto search = mJobs.find(jobUUID);
562 // if (search != mJobs.end()) {
563 // return search->second->haveMoreTasks();
564 //}
565}
566
568{
572
573 bool rc = false;
574 for (auto job : mJobs) {
575 if (job.second->Job::size(Job::pending)) {
576 // job.second->haveMoreTasks(true);
577 bool isActiveJob = false;
578 for (auto pActiveJobUUID : mActiveJobs) {
579 if (pActiveJobUUID == job.first) {
580 isActiveJob = true;
581 break;
582 }
583 }
584
585 if (!isActiveJob) {
586 mActiveJobs.push_back(job.first);
587 }
588
589 rc = true;
590 }
591 }
592 return rc;
593}
594
595bool NodeManager::haveMoreTasks(std::string jobUUID)
596{
600 auto found = mJobs.find(jobUUID);
601 if (found != mJobs.end()) {
602 return found->second->haveMoreTasks();
603 }
604 else {
605 return false;
606 }
607}
608
610{
614 mpPublisher = pPublisher;
615}
616
618{
622 return mpPublisher;
623}
624
625bool NodeManager::publish(std::string id, bool force) const
626{
630 if (!mpPublisher) return false;
631
632 bool changed = false;
633 bool jobJustFinished = false;
634 Json::Value json;
635 Json::Value & json_jobs = json["jobs"];
636 json_jobs = Json::arrayValue;
637 json["version"] =
638 fmt::format("v{}.{}.{}-{}", salsa_VERSION_MAJOR(salsa_VERSION), salsa_VERSION_MINOR(salsa_VERSION),
639 salsa_VERSION_PATCH(salsa_VERSION), salsa_VERSION_TWEAK);
640 // json_node = Json::arrayValue;
641 if (mJobs.size() > 0) {
642 for (auto job : mJobs) {
643 if (job.second->changed()) changed = true;
644 if (job.second->isJustFinished()) jobJustFinished = true;
645 }
646
647 SPD_DEBUG("force=[{}] changed=[{}] jobJustFinished=[{}]", force, changed, jobJustFinished);
648
649 if (!force && !changed && !jobJustFinished) return false;
650
651 for (auto job : mJobs) {
652 job.second->json(json_jobs);
653 }
654 }
655
656 // Fill node info
657 // mClusterAlias = "mycluster";
658 std::string name = fmt::format("{}:{}", id, mClusterAlias);
659 auto f = feeder(id);
660 if (f) {
661 json["node"] = f->jsonValueNodeInfo();
662 if (!f->nodeInfo()->name().empty()) name = f->nodeInfo()->name();
663 }
664
665 Json::StreamWriterBuilder wBuilder;
666 wBuilder["indentation"] = "";
667 std::string data = Json::writeString(wBuilder, json);
668
669 SPD_DEBUG("Publish sub [salsa:{}] id [{}] data [{}] ", name, name, data);
670 mpPublisher->publish(id, name, data, jobJustFinished);
671
672 for (auto job : mJobs) {
673 job.second->changed(false);
674 }
675
676 return true;
677}
678
679} // namespace Salsa
Job class.
Definition Job.hh:16
void consumer(std::string uuid)
Definition Job.cc:238
bool haveMoreTasks() const
Task statuses.
Definition Job.cc:272
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition Job.cc:32
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition Job.cc:56
size_t size(EQueueType t=all) const
Definition Job.cc:207
void feeder(std::string uuid)
Definition Job.cc:255
bool removeTask(uint32_t id, EQueueType from)
Definition Job.cc:96
EQueueType
Queue types.
Definition Job.hh:19
bool isFinished()
Returns whether the job is finished.
Definition Job.cc:280
std::string uuid() const
returns UUID
Definition Job.hh:28
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition Job.cc:145
Base Message class.
Definition Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handle return of task and send it to external client.
Publisher * mpPublisher
Publisher.
virtual bool publish(std::string id, bool force=false) const
virtual bool terminateFinishedJobs()
std::vector< std::string > mActiveJobs
List of active jobs.
std::shared_ptr< Feeder > feeder(std::string uuid) const
std::string mClusterAlias
Cluster alias.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
virtual void terminateAllJobs(bool finishedonly=false)
std::shared_ptr< Worker > worker(std::string uuid) const
Job * job(std::string uuid)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual bool haveMoreTasks()
TaskPool * taskPool()
Get NM's task pool.
virtual void terminateJob(std::string uuid)
virtual bool handleTaskPool(void *p)
virtual Publisher * publisher() const
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
std::shared_ptr< Consumer > consumer(std::string uuid) const
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
virtual void resultTask(TaskInfo *task)
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
std::map< std::string, Job * > mJobs
List of jobs.
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
bool hasJobs() const
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
virtual void addTaskSlot()
std::vector< std::string > mFinishedJobs
List of finished jobs.
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
virtual ~NodeManager()
int32_t nSlots(double mult=1.0) const
virtual void noMoreTasks(std::string jobUUID)
TaskInfo * getNextTask()
TaskPool * mpTaskPool
Task pool.
void print(std::string opt="") const
Base Publisher class.
Definition Publisher.hh:14
Base Socket class.
Definition Socket.hh:15
Base salsa TaskPool class.
Definition TaskPool.hh:18