salsa  0.3.0
NodeManager.cc
1 #include "NodeManager.hh"
2 
3 #include <json/json.h>
4 namespace Salsa {
6 {
10  srand(time(nullptr)); // seed with time since epoch
11 }
13 {
17 
18  for (auto j : mJobs) {
19  if (mpTaskPool) {
20  mpTaskPool->terminateJob(j.second);
21  }
22  delete j.second;
23  }
24  // terminateJobAll();
25  mJobs.clear();
26  delete mpTaskPool;
27  delete mpPublisher;
28 }
29 
30 void NodeManager::print(std::string /*opt*/) const
31 {
35 
36  SPD_DEBUG("mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ", mFeeders.size(), mConsumers.size(),
37  mWorkers.size(), mJobs.size());
38  if (mJobs.size() > 0) {
39  SPD_INFO("========== JOBS ===========");
40  for (auto j : mJobs) {
41  j.second->print();
42  }
43  SPD_INFO("===========================");
44  }
45  else {
46  SPD_INFO("========= NO JOBS =========");
47  }
48  if (mpTaskPool) {
49  mpTaskPool->print();
50  }
51 }
52 
53 void NodeManager::addConsumer(std::string uuid, std::shared_ptr<Socket> s)
54 {
58  mConsumers.insert(std::make_pair(uuid, std::make_shared<Consumer>(uuid, s, this)));
59 }
60 void NodeManager::addFeeder(std::string uuid, std::shared_ptr<Socket> s)
61 {
65  mFeeders.insert(std::make_pair(uuid, std::make_shared<Feeder>(uuid, s, this)));
66 }
67 void NodeManager::addWorker(std::string uuid, std::shared_ptr<Socket> s)
68 {
72 
73  mWorkers.insert(std::make_pair(uuid, std::make_shared<Worker>(uuid, s, this)));
74 }
75 
76 Socket * NodeManager::onEnter(std::string self, std::string fromType, Message * msg, std::vector<std::string> & out)
77 {
81 
82  SPD_TRACE("NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]", self, msg->uuid(), fromType,
83  static_cast<void *>(msg));
84 
85  auto pFeeder = feeder(self);
86  if (pFeeder) {
87  SPD_DEBUG("::onEnter FEEDER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
88  SPD_INFO("FEEDER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
89  if (fromType == "CONSUMER") {
90  pFeeder->addClient(msg->uuid(), fromType);
91  pFeeder->onEnter(msg, out, fromType);
92  }
93  else if (fromType == "WORKER") {
94  pFeeder->addClient(msg->uuid(), fromType);
95  pFeeder->onEnter(msg, out, fromType);
96  }
97  else if (fromType == "DISCOVERY") {
98  // We fully ignoring it
99  SPD_DEBUG("DISCOVERY is here");
100  }
101  else {
102  pFeeder->addOther(msg->uuid(), fromType);
103  }
104 
105  return pFeeder->pipe().get();
106  }
107 
108  auto pConsumer = consumer(self);
109  if (pConsumer) {
110  SPD_DEBUG("::onEnter CONSUMER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
111  SPD_INFO("CONSUMER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
112  if (fromType == "FEEDER") {
113  pConsumer->addClient(msg->uuid(), fromType);
114  pConsumer->onEnter(msg, out, fromType);
115  }
116  else {
117  pConsumer->addOther(msg->uuid(), fromType);
118  }
119 
120  return pConsumer->pipe().get();
121  }
122 
123  auto pWorker = worker(self);
124  if (pWorker) {
125  SPD_DEBUG("::onEnter WORKER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
126  SPD_INFO("WORKER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
127  if (fromType == "FEEDER") {
128  pWorker->addClient(msg->uuid(), fromType);
129  pWorker->onEnter(msg, out, fromType);
130  }
131  else {
132  pWorker->addOther(msg->uuid(), fromType);
133  }
134  pWorker->pipe().get();
135 
136  return pWorker->pipe().get();
137  }
138 
139  return nullptr;
140 }
141 Socket * NodeManager::onExit(std::string self, Message * msg, std::vector<std::string> & out)
142 {
146 
147  SPD_TRACE("NodeManager::onExit self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
148 
149  auto pFeeder = feeder(self);
150  if (pFeeder) {
151  SPD_DEBUG("::onExit FEEDER [{}] client on network [{}] has left", self, msg->uuid());
152  SPD_INFO("FEEDER [{}] => [{}]", self, msg->uuid());
153  pFeeder->onExit(msg, out);
154  pFeeder->removeClient(msg->uuid());
155  return pFeeder->pipe().get();
156  }
157 
158  auto pConsumer = consumer(self);
159  if (pConsumer) {
160  SPD_DEBUG("::onExit CONSUMER [{}] client on network [{}] has left", self, msg->uuid());
161  SPD_INFO("CONSUMER [{}] => [{}]", self, msg->uuid());
162  pConsumer->onExit(msg, out);
163  pConsumer->removeClient(msg->uuid());
164  return pConsumer->pipe().get();
165  }
166 
167  auto pWorker = worker(self);
168  if (pWorker) {
169  SPD_DEBUG("::onExit WORKER [{}] client on network [{}] has left", self, msg->uuid());
170  SPD_INFO("WORKER [{}] => [{}]", self, msg->uuid());
171  pWorker->onExit(msg, out);
172  pWorker->removeClient(msg->uuid());
173  return pWorker->pipe().get();
174  }
175 
176  return nullptr;
177 }
178 
179 Socket * NodeManager::onWhisper(std::string self, Message * msg, std::vector<std::string> & out)
180 {
184 
185  SPD_TRACE("NodeManager::onWhisper self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
186 
187  auto pFeeder = feeder(self);
188  if (pFeeder) {
189  SPD_TRACE("::onWhisper() FEEDER [{}] from [{}] has msg", self, msg->uuid());
190  pFeeder->onWhisper(msg, out);
191  return pFeeder->pipe().get();
192  }
193 
194  auto pConsumer = consumer(self);
195  if (pConsumer) {
196  SPD_TRACE("::onWhisper() CONSUMER [{}] from [{}] has msg", self, msg->uuid());
197  pConsumer->onWhisper(msg, out);
198  return pConsumer->pipe().get();
199  }
200 
201  auto pWorker = worker(self);
202  if (pWorker) {
203  SPD_TRACE("::onWhisper() WORKER [{}] from [{}] has msg", self, msg->uuid());
204  pWorker->onWhisper(msg, out);
205  return pWorker->pipe().get();
206  }
207 
208  return nullptr;
209 }
210 
211 bool NodeManager::sendWhisper(Socket * /*s*/, std::string /*to*/, std::vector<std::string> & /*v*/)
212 {
216 
217  return true;
218 }
219 
220 void NodeManager::addTask(TaskInfo * pTaskInfo, std::string cuuid, std::string fuuid, Salsa::Job::QueueType type)
221 {
225 
226  Salsa::Job * pJob = nullptr;
227  auto search = mJobs.find(pTaskInfo->jobid());
228  if (search != mJobs.end()) {
229  // if pJob was found
230  pJob = search->second;
231  }
232  else {
233  // if pJob was not found
234  pJob = new Salsa::Job(pTaskInfo->jobid());
235  pJob->consumer(cuuid);
236  pJob->feeder(fuuid);
237  mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
238  mActiveJobs.push_back(pTaskInfo->jobid());
239  // TODO : now we need to tell all feeders that theat they should subscribe to workers
240  SPD_TRACE("Looping feeders");
241  for (auto feeder : mFeeders) {
243  SPD_TRACE("Subscribe to feeder [{}]", feeder.first);
244  feeder.second->subscribe(pTaskInfo->jobid());
245  }
246  }
247 
248  SPD_TRACE("::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
249  pJob->addTask(pTaskInfo->taskid(), pTaskInfo, type);
250 }
251 
253 {
257  TaskInfo * pTaskInfo = nullptr;
258  SPD_TRACE("mActiveJobs.size() [{}]", mActiveJobs.size());
259  while (mActiveJobs.size() > 0 && pTaskInfo == nullptr) {
260  size_t index = rand() % mActiveJobs.size();
261  std::string jobstr = mActiveJobs[index];
262  auto search = mJobs.find(jobstr);
263  if (search != mJobs.end()) {
264  pTaskInfo = search->second->nextJob();
265  if (pTaskInfo) {
266  SPD_TRACE("getNextTask FEEDER [{}] JOB [{}:{}]", search->first, pTaskInfo->jobid(),
267  pTaskInfo->taskid());
268  return pTaskInfo;
269  }
270  }
271  // removing jobstring from mActiveJobs
272  mActiveJobs.erase(std::remove(begin(mActiveJobs), end(mActiveJobs), jobstr), end(mActiveJobs));
273  }
274  SPD_TRACE("::getNextTask No pTaskInfo found");
275  return nullptr;
276 }
277 
278 void NodeManager::resultTask(TaskInfo * pTask)
279 {
283 
284  Job * pJob = job(pTask->jobid());
285  if (pJob == nullptr) {
286  delete pTask;
287  return;
288  }
289 
290  SPD_TRACE("TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
291  // search->second->moveTask(pTask->taskid(), pTask, Salsa::Job::running, Salsa::Job::done);
292 
293  // If job has no consumer we end (assuming that it is SUBMITTER)
294  if (pJob->consumer().empty()) {
295  // TODO : Fix done and failed
296  if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned))
297  pJob->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::done);
298  else
299  pJob->moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::done);
300  print();
301  // TODO we need to think what to do with TaskInfo object in highest level
302  delete pTask;
303  return;
304  }
305 
306  if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned))
307  pJob->removeTask(pTask->taskid(), Salsa::Job::assigned);
308  else
309  pJob->removeTask(pTask->taskid(), Salsa::Job::running);
310 
311  std::shared_ptr<Consumer> pConsumer = consumer(pJob->consumer());
312  std::vector<std::string> out;
313  out.push_back("TASK_RESULT");
314  std::string payload;
315  pTask->SerializeToString(&payload);
316  out.push_back(payload);
317  uint32_t slots = nSlots();
318 
319  // delete pTask;
320 
321  // TODO only for testing, REMOVE IT later
322  // - Well, what about #ifdef DEBUG ?
323  if (getenv("SALSA_FAKE")) slots *= 10;
324 
325  if (pJob->size(Job::pending) < slots) {
326  if (pJob->haveMoreTasks()) {
327  SPD_TRACE("We are requesting new tasks [{}] haveMotreTasks [{}]", slots, pJob->haveMoreTasks());
328  out.push_back("&");
329  out.push_back("SENDTASKS");
330  out.push_back(fmt::format("{}", slots));
331  }
332  }
333 
334  sendWhisper(pConsumer->pipe().get(), pJob->feeder(), out);
335 }
336 
337 void NodeManager::terminateJob(std::string uuid)
338 {
342 
343  SPD_TRACE("Terminating job from client [{}]", uuid);
344 
345  auto search = mJobs.find(uuid);
346  if (search != mJobs.end()) {
347  if (mpTaskPool) {
348  mpTaskPool->terminateJob(search->second);
349  }
350 
351  for (auto f : mFeeders) {
352  f.second->terminateJob(uuid);
353  }
354 
355  SPD_TRACE("Removing job [{}]", uuid);
356  delete search->second;
357  search->second = nullptr;
358  mJobs.erase(search);
359  }
360  SPD_TRACE("NodeManager::terminateJob print()");
361  print();
362 }
363 
365 {
369  if (mJobs.size() == 0) return;
370 
371  std::vector<std::string> clean_uuids;
372  for (auto j : mJobs) clean_uuids.push_back(j.first);
373 
374  for (auto u : clean_uuids) {
375  SPD_DEBUG("Terminating [{}]", u);
376 
377  terminateJob(u);
378  }
379 }
380 
381 std::shared_ptr<Feeder> NodeManager::feeder(std::string uuid) const
382 {
387  auto search = mFeeders.find(uuid);
388  if (search != mFeeders.end()) {
389  return search->second;
390  }
391  return nullptr;
392 }
393 std::shared_ptr<Consumer> NodeManager::consumer(std::string uuid) const
394 {
399  auto search = mConsumers.find(uuid);
400  if (search != mConsumers.end()) {
401  return search->second;
402  }
403  return nullptr;
404 }
405 std::shared_ptr<Worker> NodeManager::worker(std::string uuid) const
406 {
411  auto search = mWorkers.find(uuid);
412  if (search != mWorkers.end()) {
413  return search->second;
414  }
415  return nullptr;
416 }
417 
418 Job * NodeManager::job(std::string uuid)
419 {
424  auto search = mJobs.find(uuid);
425  if (search != mJobs.end()) {
426  return search->second;
427  }
428  return nullptr;
429 }
430 
432 {
436  if (mpTaskPool == nullptr) {
437  mpTaskPool = new TaskPool(this);
438  }
439 }
440 
441 bool NodeManager::handleTaskPool(void * /*p*/)
442 {
446  return false;
447 }
448 
450 {
454  return mpTaskPool;
455 }
456 
458 {
462  return mActiveJobs.size() > 0;
463 }
464 
465 void NodeManager::jobs(std::string client_uuid, std::vector<std::string> & jobs) const
466 {
470 
471  for (auto search : mJobs) {
472  if (search.second != nullptr && search.second->feeder() == client_uuid) {
473  jobs.push_back(search.first);
474  }
475  }
476 }
477 int32_t NodeManager::nSlots(double mult) const
478 {
482 
483  int32_t num = 0;
484  for (auto feeder : mFeeders) {
485  num += feeder.second->nodeInfo()->slots();
486  }
487  return num * mult;
488 }
489 
490 void NodeManager::noMoreTasks(std::string jobUUID)
491 {
495  auto search = mJobs.find(jobUUID);
496  if (search != mJobs.end()) {
497  return search->second->haveMoreTasks(false);
498  }
499 }
500 
502 {
506 
507  bool rc = false;
508  for (auto job : mJobs) {
509  if (job.second->Job::size(Job::pending)) {
510  job.second->haveMoreTasks(true);
511  bool isActiveJob = false;
512  for (auto pActiveJobUUID : mActiveJobs) {
513  if (pActiveJobUUID == job.first) {
514  isActiveJob = true;
515  break;
516  }
517  }
518 
519  if (!isActiveJob) {
520  mActiveJobs.push_back(job.first);
521  }
522 
523  rc = true;
524  }
525  }
526  return rc;
527 }
528 
529 bool NodeManager::haveMoreTasks(std::string jobUUID)
530 {
534  auto found = mJobs.find(jobUUID);
535  if (found != mJobs.end()) {
536  return found->second->haveMoreTasks();
537  }
538  return false;
539 }
540 
542 {
546  mpPublisher = pPublisher;
547 }
548 
550 {
554  return mpPublisher;
555 }
556 
557 void NodeManager::publish(std::string id) const
558 {
562  if (!mpPublisher) return;
563 
564  if (mJobs.size() == 0) return;
565 
566  std::string group = "SalsaTasks";
567  Json::Value json;
568  for (auto job : mJobs) {
569  job.second->json(json["tasks"]);
570  }
571 
572  Json::StreamWriterBuilder wBuilder;
573  wBuilder["indentation"] = "";
574  std::string data = Json::writeString(wBuilder, json);
575 
576  print();
577  SPD_TRACE("Publish name [{}] id [{}] data [{}] ", group, id, data);
578  mpPublisher->publish(group, id, data);
579 }
580 
581 } // namespace Salsa
std::vector< std::string > mActiveJobs
List of active jobs.
Definition: NodeManager.hh:79
Base Message class
Definition: Message.hh:15
size_t size(QueueType t=all) const
Definition: Job.cc:160
void consumer(std::string uuid)
Definition: Job.cc:189
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
Definition: NodeManager.cc:465
bool isTaskInQueue(uint32_t id, QueueType type) const
Check task presence in certain queue.
Definition: Job.cc:119
std::shared_ptr< Worker > worker(std::string uuid) const
Definition: NodeManager.cc:405
bool terminateJob(Job *pJob)
Definition: TaskPool.cc:99
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:141
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:337
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
Definition: NodeManager.hh:81
virtual void addTaskSlot()
Definition: NodeManager.cc:431
void noMoreTasks(std::string jobUUID)
Definition: NodeManager.cc:490
Job class
Definition: Job.hh:16
int32_t nSlots(double mult=1.0) const
Definition: NodeManager.cc:477
void resultTask(TaskInfo *task)
Definition: NodeManager.cc:278
void feeder(std::string uuid)
Definition: Job.cc:205
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:60
virtual std::string uuid() const =0
Returns node uuid.
Base Publisher class
Definition: Publisher.hh:14
Job * job(std::string uuid)
Definition: NodeManager.cc:418
bool hasJobs() const
Definition: NodeManager.cc:457
void print(std::string opt="") const
Definition: NodeManager.cc:30
std::map< std::string, Job * > mJobs
List of jobs.
Definition: NodeManager.hh:78
virtual Publisher * publisher() const
Definition: NodeManager.cc:549
TaskInfo * getNextTask()
Definition: NodeManager.cc:252
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:179
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
Definition: NodeManager.hh:80
Definition: Actor.cc:2
virtual ~NodeManager()
Definition: NodeManager.cc:12
void print(bool verbose=false) const
Definition: TaskPool.cc:128
virtual void publish(std::string id) const
Definition: NodeManager.cc:557
QueueType
Queue types.
Definition: Job.hh:19
Base Socket class
Definition: Socket.hh:15
bool haveMoreTasks() const
Definition: Job.cc:221
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::QueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:220
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:53
TaskPool * mpTaskPool
Task pool.
Definition: NodeManager.hh:83
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:211
void json(Json::Value &json)
Definition: Job.cc:144
Publisher * mpPublisher
Publisher.
Definition: NodeManager.hh:84
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:67
virtual bool handleTaskPool(void *p)
Definition: NodeManager.cc:441
TaskPool * taskPool()
Get NodeManager's task pool.
Definition: NodeManager.cc:449
Base Salsa Object class
Definition: Object.hh:15
bool removeTask(uint32_t id, QueueType from)
Definition: Job.cc:76
virtual void terminateJobAll()
Definition: NodeManager.cc:364
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
Definition: NodeManager.hh:82
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:76
Base salsa TaskPool class
Definition: TaskPool.hh:18
virtual void publish(std::string group, std::string id, std::string data)=0
Publish TODO publish what?
std::shared_ptr< Feeder > feeder(std::string uuid) const
Definition: NodeManager.cc:381
bool moveTask(uint32_t id, QueueType from, QueueType to)
Definition: Job.cc:45
bool addTask(uint32_t id, TaskInfo *pJob, QueueType type)
Definition: Job.cc:25
std::shared_ptr< Consumer > consumer(std::string uuid) const
Definition: NodeManager.cc:393