salsa  0.7.1
NodeManager.cc
1 #include <json/json.h>
2 #include "NodeManager.hh"
3 namespace Salsa {
5  : Object()
6  , mFinishedJobTimeout(24 * 3600)
7 {
11  srand(static_cast<uint32_t>(time(nullptr))); // seed with time since epoch
12 }
13 
15 {
19 
20  for (auto job : mJobs) {
21  if (mpTaskPool) {
22  mpTaskPool->terminateJob(job.second);
23  }
24  delete job.second;
25  }
26  // terminateJobAll();
27  mJobs.clear();
28  delete mpTaskPool;
29  delete mpPublisher;
30 }
31 
32 void NodeManager::print(std::string /*opt*/) const
33 {
37 
38  SPD_TRACE("mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ", mFeeders.size(), mConsumers.size(),
39  mWorkers.size(), mJobs.size());
40 
41  if (mJobs.size() > 0) {
42  SPD_DEBUG("= JOBS =======================");
43  for (auto j : mJobs) {
44  j.second->print();
45  }
46  SPD_DEBUG("==============================");
47  }
48  else {
49  SPD_DEBUG("= NO JOBS ====================");
50  }
51 
52  if (mpTaskPool) {
53  mpTaskPool->print();
54  }
55 }
56 
57 void NodeManager::addConsumer(std::string uuid, std::shared_ptr<Socket> pSocket)
58 {
62 
63  mConsumers.emplace(uuid, std::make_shared<Consumer>(uuid, pSocket, this));
64 }
65 
66 void NodeManager::addFeeder(std::string uuid, std::shared_ptr<Socket> pSocket)
67 {
71 
72  mFeeders.emplace(uuid, std::make_shared<Feeder>(uuid, pSocket, this));
73 }
74 
75 void NodeManager::addWorker(std::string uuid, std::shared_ptr<Socket> pSocket)
76 {
80 
81  mWorkers.emplace(uuid, std::make_shared<Worker>(uuid, pSocket, this));
82 }
83 
84 Socket * NodeManager::onEnter(std::string self, std::string fromType, Message * msg, std::vector<std::string> & out)
85 {
89 
90  SPD_TRACE("NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]", self, msg->uuid(), fromType,
91  static_cast<void *>(msg));
92 
93  // TODO Implement map<std::string /* self */, Node::ENodeType?>
94  auto pFeeder = feeder(self);
95  if (pFeeder) {
96  SPD_DEBUG("::onEnter FEEDER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
97  SPD_INFO("FEEDER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
98  if (fromType == "CONSUMER") {
99  pFeeder->addClient(msg->uuid(), fromType);
100  pFeeder->onEnter(msg, out, fromType);
101  }
102  else if (fromType == "WORKER") {
103  pFeeder->addClient(msg->uuid(), fromType);
104  pFeeder->onEnter(msg, out, fromType);
105  }
106  else if (fromType == "DISCOVERY") {
107  // We fully ignoring it
108  SPD_DEBUG("DISCOVERY is here");
109  }
110  else {
111  pFeeder->addOther(msg->uuid(), fromType);
112  }
113 
114  return pFeeder->pipe().get();
115  }
116 
117  auto pConsumer = consumer(self);
118  if (pConsumer) {
119  SPD_DEBUG("::onEnter CONSUMER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
120  SPD_INFO("CONSUMER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
121  if (fromType == "FEEDER") {
122  pConsumer->addClient(msg->uuid(), fromType);
123  pConsumer->onEnter(msg, out, fromType);
124  }
125  else {
126  pConsumer->addOther(msg->uuid(), fromType);
127  }
128 
129  return pConsumer->pipe().get();
130  }
131 
132  auto pWorker = worker(self);
133  if (pWorker) {
134  SPD_DEBUG("::onEnter WORKER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
135  SPD_INFO("WORKER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
136  if (fromType == "FEEDER") {
137  pWorker->addClient(msg->uuid(), fromType);
138  pWorker->onEnter(msg, out, fromType);
139  }
140  else {
141  pWorker->addOther(msg->uuid(), fromType);
142  }
143 
144  return pWorker->pipe().get();
145  }
146 
147  return nullptr;
148 }
149 Socket * NodeManager::onExit(std::string self, Message * msg, std::vector<std::string> & out)
150 {
154 
155  SPD_TRACE("NodeManager::onExit self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
156 
157  auto pWorker = worker(self);
158  if (pWorker) {
159  SPD_DEBUG("::onExit WORKER [{}] client on network [{}] has left", self, msg->uuid());
160  SPD_INFO("WORKER [{}] => [{}]", self, msg->uuid());
161  pWorker->onExit(msg, out);
162  pWorker->removeClient(msg->uuid());
163  return pWorker->pipe().get();
164  }
165 
166  auto pFeeder = feeder(self);
167  if (pFeeder) {
168  SPD_DEBUG("::onExit FEEDER [{}] client on network [{}] has left", self, msg->uuid());
169  SPD_INFO("FEEDER [{}] => [{}]", self, msg->uuid());
170  pFeeder->onExit(msg, out);
171  pFeeder->removeClient(msg->uuid());
172  return pFeeder->pipe().get();
173  }
174 
175  auto pConsumer = consumer(self);
176  if (pConsumer) {
177  SPD_DEBUG("::onExit CONSUMER [{}] client on network [{}] has left", self, msg->uuid());
178  SPD_INFO("CONSUMER [{}] => [{}]", self, msg->uuid());
179  pConsumer->onExit(msg, out);
180  pConsumer->removeClient(msg->uuid());
181  return pConsumer->pipe().get();
182  }
183 
184  return nullptr;
185 }
186 
187 Socket * NodeManager::onWhisper(std::string self, Message * msg, std::vector<std::string> & out)
188 {
192 
193  SPD_TRACE("NodeManager::onWhisper self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
194 
195  auto pFeeder = feeder(self);
196  if (pFeeder) {
197  SPD_TRACE("::onWhisper() FEEDER [{}] from [{}] has msg", self, msg->uuid());
198  pFeeder->onWhisper(msg, out);
199  return pFeeder->pipe().get();
200  }
201 
202  auto pConsumer = consumer(self);
203  if (pConsumer) {
204  SPD_TRACE("::onWhisper() CONSUMER [{}] from [{}] has msg", self, msg->uuid());
205  pConsumer->onWhisper(msg, out);
206  return pConsumer->pipe().get();
207  }
208 
209  auto pWorker = worker(self);
210  if (pWorker) {
211  SPD_TRACE("::onWhisper() WORKER [{}] from [{}] has msg", self, msg->uuid());
212  pWorker->onWhisper(msg, out);
213  return pWorker->pipe().get();
214  }
215 
216  return nullptr;
217 }
218 
219 bool NodeManager::sendWhisper(Socket * /*s*/, std::string /*to*/, std::vector<std::string> & /*v*/)
220 {
224 
225  return true;
226 }
227 
228 void NodeManager::addTask(TaskInfo * pTaskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType type)
229 {
233 
234  Salsa::Job * pJob = nullptr;
235  auto search = mJobs.find(pTaskInfo->jobid());
236  if (search != mJobs.end()) {
237  // if pJob was found
238  pJob = search->second;
239  }
240  else {
241  // if pJob was not found
242  pJob = new Salsa::Job(pTaskInfo->jobid());
243  pJob->consumer(cuuid);
244  pJob->feeder(fuuid);
245  mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
246  mActiveJobs.push_back(pTaskInfo->jobid());
247  // TODO : now we need to tell all feeders that theat they should subscribe to workers
248  SPD_TRACE("Looping feeders");
249  for (auto feeder : mFeeders) {
251  SPD_TRACE("Subscribe to feeder [{}]", feeder.first);
252  feeder.second->subscribe(pTaskInfo->jobid());
253  }
254  }
255 
256  SPD_TRACE("::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
257  pJob->addTask(pTaskInfo->taskid(), pTaskInfo, type);
258 }
259 
261 {
265  TaskInfo * pTaskInfo = nullptr;
266 
267  SPD_TRACE("mActiveJobs.size() [{}]", mActiveJobs.size());
268  while (mActiveJobs.size() > 0 && pTaskInfo == nullptr) {
269  size_t index = static_cast<size_t>(rand()) % mActiveJobs.size();
270  std::string jobstr = mActiveJobs[index];
271  auto iJob = mJobs.find(jobstr);
272  if (iJob != mJobs.end()) {
273  pTaskInfo = iJob->second->nextTask();
274  if (pTaskInfo) {
275  SPD_TRACE("getNextTask FEEDER [{}] JOB [{}:{}]", iJob->first, pTaskInfo->jobid(), pTaskInfo->taskid());
276  return pTaskInfo;
277  }
278  }
279 
280  // removing jobstring from mActiveJobs
281  mActiveJobs.erase(std::remove(begin(mActiveJobs), end(mActiveJobs), jobstr), end(mActiveJobs));
282  }
283 
284  SPD_TRACE("::getNextTask No pTaskInfo found");
285  return nullptr;
286 }
287 
288 void NodeManager::resultTask(TaskInfo * pTask)
289 {
293 
294  Job * pJob = job(pTask->jobid());
295  if (pJob == nullptr) {
296  delete pTask;
297  return;
298  }
299 
300  SPD_TRACE("TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
301  // search->second->moveTask(pTask->taskid(), pTask, Salsa::Job::running, Salsa::Job::done);
302 
303  // If job has no consumer we end (assuming that it is SUBMITTER)
304  if (pJob->consumer().empty()) {
305  // TODO : Fix done and failed
306  auto sourceQueue = (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) ? Job::assigned : Job::running;
307  if (pTask->returncode() == 0) {
308  pJob->moveTask(pTask->taskid(), sourceQueue, Salsa::Job::done);
309  }
310  else {
311  pJob->moveTask(pTask->taskid(), sourceQueue, Salsa::Job::failed);
312  }
313 
314  if (pJob->isFinished()) {
316  mFinishedJobs.push_back(pJob->uuid());
317  }
318 
319  resultTaskToExternal(pJob, pTask);
320 
321  print();
322  // TODO we need to think what to do with TaskInfo object in highest level
323  delete pTask;
324  return;
325  }
326 
327  if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
328  SPD_WARN("Task [{}] duplicate found in [assigned] queue!", pTask->taskid());
329  pJob->removeTask(pTask->taskid(), Salsa::Job::assigned);
330  }
331  else {
332  SPD_WARN("Task [{}] duplicate found in [running] queue!", pTask->taskid());
333  pJob->removeTask(pTask->taskid(), Salsa::Job::running);
334  }
335 
336  std::shared_ptr<Consumer> pConsumer = consumer(pJob->consumer());
337  std::vector<std::string> out = {"TASK_RESULT"};
338  // out.push_back("TASK_RESULT");
339  std::string payload;
340  pTask->SerializeToString(&payload);
341  out.push_back(payload);
342  uint32_t slots = nSlots();
343 
344  // delete pTask;
345 
346  // TODO only for testing, REMOVE IT later
347  // - Well, what about #ifdef DEBUG ?
348  if (getenv("SALSA_FAKE")) slots *= 10;
349 
350  if (pJob->size(Job::pending) < slots) {
351  if (pJob->haveMoreTasks()) {
352  SPD_TRACE("We are requesting new tasks [{}] haveMoreTasks [{}]", slots, pJob->haveMoreTasks());
353  out.push_back("&");
354  out.push_back("SENDTASKS");
355  out.push_back(fmt::format("{}", slots));
356  }
357  }
358 
359  sendWhisper(pConsumer->pipe().get(), pJob->feeder(), out);
360 }
361 
362 void NodeManager::terminateJob(std::string uuid)
363 {
367 
368  SPD_TRACE("Terminating job from client [{}]", uuid);
369 
370  auto iJob = mJobs.find(uuid);
371  if (iJob != mJobs.end()) {
372  if (mpTaskPool) {
373  mpTaskPool->terminateJob(iJob->second);
374  }
375 
376  for (auto f : mFeeders) {
377  f.second->terminateJob(uuid);
378  }
379 
380  mFinishedJobs.erase(std::remove(begin(mFinishedJobs), end(mFinishedJobs), uuid), end(mFinishedJobs));
381 
382  SPD_TRACE("Removing job [{}]", uuid);
383  delete iJob->second;
384  iJob->second = nullptr;
385  mJobs.erase(iJob);
386  }
387  SPD_TRACE("NodeManager::terminateJob print()");
388  print();
389 }
390 
392 {
396 
397  if (mFinishedJobs.size() == 0) return false;
398 
399  SPD_DEBUG("Checking finished jobs [{}] to be removed ...", mFinishedJobs.size());
400 
401  std::chrono::time_point<std::chrono::system_clock> curTime = std::chrono::system_clock::now();
402  uint64_t curTimeEpoch = std::chrono::duration_cast<std::chrono::seconds>(curTime.time_since_epoch()).count();
403  std::vector<std::string> cleanUUID;
404  for (auto js : mFinishedJobs) {
405  auto j = job(js);
406  if (j == nullptr) continue;
407  if (curTimeEpoch - j->timeFinished() > mFinishedJobTimeout) {
408  SPD_DEBUG("Terminating finished job. Time : diff[{}] timeout[{}]", curTimeEpoch - j->timeFinished(),
410  cleanUUID.push_back(js);
411  }
412  }
413  if (!cleanUUID.size()) return false;
414 
415  for (auto u : cleanUUID) {
416  terminateJob(u);
417  }
418 
419  return true;
420 }
421 
422 void NodeManager::terminateAllJobs(bool finishedonly)
423 {
427  if (mJobs.size() == 0) return;
428 
429  std::vector<std::string> cleanUUID;
430  if (finishedonly)
431  for (auto job : mFinishedJobs) cleanUUID.push_back(job);
432 
433  else {
434  for (auto job : mJobs) cleanUUID.push_back(job.first);
435  }
436 
437  for (auto u : cleanUUID) {
438  SPD_DEBUG("Terminating [{}]", u);
439  terminateJob(u);
440  }
441 }
442 
443 std::shared_ptr<Feeder> NodeManager::feeder(std::string uuid) const
444 {
449  auto search = mFeeders.find(uuid);
450  if (search != mFeeders.end()) {
451  return search->second;
452  }
453  else {
454  return nullptr;
455  }
456 }
457 std::shared_ptr<Consumer> NodeManager::consumer(std::string uuid) const
458 {
463  auto search = mConsumers.find(uuid);
464  if (search != mConsumers.end()) {
465  return search->second;
466  }
467  return nullptr;
468 }
469 std::shared_ptr<Worker> NodeManager::worker(std::string uuid) const
470 {
475  auto search = mWorkers.find(uuid);
476  if (search != mWorkers.end()) {
477  return search->second;
478  }
479  return nullptr;
480 }
481 
482 Job * NodeManager::job(std::string uuid)
483 {
488  auto search = mJobs.find(uuid);
489  if (search != mJobs.end()) {
490  return search->second;
491  }
492  return nullptr;
493 }
494 
496 {
500  if (mpTaskPool == nullptr) {
501  mpTaskPool = new TaskPool(this);
502  }
503 }
504 
505 bool NodeManager::handleTaskPool(void * /*p*/)
506 {
510  return false;
511 }
512 
514 {
518  return mpTaskPool;
519 }
520 
522 {
526  return mActiveJobs.size() > 0;
527 }
528 
529 void NodeManager::jobs(std::string client_uuid, std::vector<std::string> & jobs) const
530 {
534 
535  for (auto search : mJobs) {
536  if (search.second != nullptr && search.second->feeder() == client_uuid) {
537  jobs.push_back(search.first);
538  }
539  }
540 }
541 int32_t NodeManager::nSlots(double mult) const
542 {
546 
547  int32_t num = 0;
548  for (auto feeder : mFeeders) {
549  num += feeder.second->nodeInfo()->slots();
550  }
551  return num * mult;
552 }
553 
554 void NodeManager::noMoreTasks(std::string /*jobUUID*/)
555 {
559 
560  (void)(0);
561  // auto search = mJobs.find(jobUUID);
562  // if (search != mJobs.end()) {
563  // return search->second->haveMoreTasks();
564  //}
565 }
566 
568 {
572 
573  bool rc = false;
574  for (auto job : mJobs) {
575  if (job.second->Job::size(Job::pending)) {
576  // job.second->haveMoreTasks(true);
577  bool isActiveJob = false;
578  for (auto pActiveJobUUID : mActiveJobs) {
579  if (pActiveJobUUID == job.first) {
580  isActiveJob = true;
581  break;
582  }
583  }
584 
585  if (!isActiveJob) {
586  mActiveJobs.push_back(job.first);
587  }
588 
589  rc = true;
590  }
591  }
592  return rc;
593 }
594 
595 bool NodeManager::haveMoreTasks(std::string jobUUID)
596 {
600  auto found = mJobs.find(jobUUID);
601  if (found != mJobs.end()) {
602  return found->second->haveMoreTasks();
603  }
604  else {
605  return false;
606  }
607 }
608 
610 {
614  mpPublisher = pPublisher;
615 }
616 
618 {
622  return mpPublisher;
623 }
624 
625 bool NodeManager::publish(std::string id, bool force) const
626 {
630  if (!mpPublisher) return false;
631 
632  bool changed = false;
633  bool jobJustFinished = false;
634  Json::Value json;
635  Json::Value & json_jobs = json["jobs"];
636  json_jobs = Json::arrayValue;
637  json["version"] =
638  fmt::format("v{}.{}.{}-{}", salsa_VERSION_MAJOR(salsa_VERSION), salsa_VERSION_MINOR(salsa_VERSION),
639  salsa_VERSION_PATCH(salsa_VERSION), salsa_VERSION_TWEAK);
640  // json_node = Json::arrayValue;
641  if (mJobs.size() > 0) {
642  for (auto job : mJobs) {
643  if (job.second->changed()) changed = true;
644  if (job.second->isJustFinished()) jobJustFinished = true;
645  }
646 
647  SPD_DEBUG("force=[{}] changed=[{}] jobJustFinished=[{}]", force, changed, jobJustFinished);
648 
649  if (!force && !changed && !jobJustFinished) return false;
650 
651  for (auto job : mJobs) {
652  job.second->json(json_jobs);
653  }
654  }
655 
656  // Fill node info
657  // mClusterAlias = "mycluster";
658  std::string name = fmt::format("{}:{}", id, mClusterAlias);
659  auto f = feeder(id);
660  if (f) {
661  json["node"] = f->jsonValueNodeInfo();
662  if (!f->nodeInfo()->name().empty()) name = f->nodeInfo()->name();
663  }
664 
665  Json::StreamWriterBuilder wBuilder;
666  wBuilder["indentation"] = "";
667  std::string data = Json::writeString(wBuilder, json);
668 
669  SPD_DEBUG("Publish sub [salsa:{}] id [{}] data [{}] ", name, name, data);
670  mpPublisher->publish(id, name, data, jobJustFinished);
671 
672  for (auto job : mJobs) {
673  job.second->changed(false);
674  }
675 
676  return true;
677 }
678 
679 } // namespace Salsa
Job class.
Definition: Job.hh:16
void consumer(std::string uuid)
Definition: Job.cc:238
bool haveMoreTasks() const
Task statuses.
Definition: Job.cc:272
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition: Job.cc:32
void json(Json::Value &json)
Definition: Job.cc:173
bool changed() const
Returns if job info was changed.
Definition: Job.hh:71
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:56
size_t size(EQueueType t=all) const
Definition: Job.cc:207
void feeder(std::string uuid)
Definition: Job.cc:255
bool isJustFinished()
Returns if job was just finished.
Definition: Job.cc:296
bool removeTask(uint32_t id, EQueueType from)
Definition: Job.cc:96
EQueueType
Queue types.
Definition: Job.hh:19
bool isFinished()
Returns whether the job is finished.
Definition: Job.cc:280
std::string uuid() const
returns UUID
Definition: Job.hh:28
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:145
Base Message class.
Definition: Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handle return of task and send it to external client.
Definition: NodeManager.hh:66
Publisher * mpPublisher
Publisher.
Definition: NodeManager.hh:100
virtual bool publish(std::string id, bool force=false) const
Definition: NodeManager.cc:625
virtual bool terminateFinishedJobs()
Definition: NodeManager.cc:391
std::vector< std::string > mActiveJobs
List of active jobs.
Definition: NodeManager.hh:93
std::shared_ptr< Feeder > feeder(std::string uuid) const
Definition: NodeManager.cc:443
std::string mClusterAlias
Cluster alias.
Definition: NodeManager.hh:91
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:219
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:187
virtual void terminateAllJobs(bool finishedonly=false)
Definition: NodeManager.cc:422
std::shared_ptr< Worker > worker(std::string uuid) const
Definition: NodeManager.cc:469
Job * job(std::string uuid)
Definition: NodeManager.cc:482
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:66
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
Definition: NodeManager.hh:97
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:149
virtual bool haveMoreTasks()
Definition: NodeManager.cc:567
TaskPool * taskPool()
Get NM's task pool.
Definition: NodeManager.cc:513
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:362
virtual bool handleTaskPool(void *p)
Definition: NodeManager.cc:505
virtual Publisher * publisher() const
Definition: NodeManager.cc:617
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
Definition: NodeManager.hh:95
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:228
std::shared_ptr< Consumer > consumer(std::string uuid) const
Definition: NodeManager.cc:457
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
Definition: NodeManager.hh:96
virtual void resultTask(TaskInfo *task)
Definition: NodeManager.cc:288
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
Definition: NodeManager.cc:529
std::map< std::string, Job * > mJobs
List of jobs.
Definition: NodeManager.hh:92
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:75
bool hasJobs() const
Definition: NodeManager.cc:521
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
Definition: NodeManager.hh:98
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:84
virtual void addTaskSlot()
Definition: NodeManager.cc:495
std::vector< std::string > mFinishedJobs
List of finished jobs.
Definition: NodeManager.hh:94
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:57
virtual ~NodeManager()
Definition: NodeManager.cc:14
int32_t nSlots(double mult=1.0) const
Definition: NodeManager.cc:541
virtual void noMoreTasks(std::string jobUUID)
Definition: NodeManager.cc:554
TaskInfo * getNextTask()
Definition: NodeManager.cc:260
TaskPool * mpTaskPool
Task pool.
Definition: NodeManager.hh:99
void print(std::string opt="") const
Definition: NodeManager.cc:32
Base Salsa Object class.
Definition: Object.hh:15
Base Publisher class.
Definition: Publisher.hh:14
virtual void publish(std::string id, std::string name, std::string data, bool force=true)=0
Publish TODO publish what?
Base Socket class.
Definition: Socket.hh:15
Base salsa TaskPool class.
Definition: TaskPool.hh:18
void print(bool verbose=false) const
Definition: TaskPool.cc:128
bool terminateJob(Job *pJob)
Definition: TaskPool.cc:99