1 #include "NodeManager.hh" 18 for (
auto j :
mJobs) {
36 SPD_DEBUG(
"mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ",
mFeeders.size(),
mConsumers.size(),
38 if (
mJobs.size() > 0) {
39 SPD_INFO(
"========== JOBS ===========");
40 for (
auto j :
mJobs) {
43 SPD_INFO(
"===========================");
46 SPD_INFO(
"========= NO JOBS =========");
58 mConsumers.insert(std::make_pair(uuid, std::make_shared<Consumer>(uuid, s,
this)));
65 mFeeders.insert(std::make_pair(uuid, std::make_shared<Feeder>(uuid, s,
this)));
73 mWorkers.insert(std::make_pair(uuid, std::make_shared<Worker>(uuid, s,
this)));
82 SPD_TRACE(
"NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]",
self, msg->
uuid(), fromType,
83 static_cast<void *
>(msg));
85 auto pFeeder =
feeder(
self);
87 SPD_DEBUG(
"::onEnter FEEDER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
88 SPD_INFO(
"FEEDER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
89 if (fromType ==
"CONSUMER") {
90 pFeeder->addClient(msg->
uuid(), fromType);
91 pFeeder->onEnter(msg, out, fromType);
93 else if (fromType ==
"WORKER") {
94 pFeeder->addClient(msg->
uuid(), fromType);
95 pFeeder->onEnter(msg, out, fromType);
97 else if (fromType ==
"DISCOVERY") {
99 SPD_DEBUG(
"DISCOVERY is here");
102 pFeeder->addOther(msg->
uuid(), fromType);
105 return pFeeder->pipe().get();
110 SPD_DEBUG(
"::onEnter CONSUMER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
111 SPD_INFO(
"CONSUMER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
112 if (fromType ==
"FEEDER") {
113 pConsumer->addClient(msg->
uuid(), fromType);
114 pConsumer->onEnter(msg, out, fromType);
117 pConsumer->addOther(msg->
uuid(), fromType);
120 return pConsumer->pipe().get();
123 auto pWorker =
worker(
self);
125 SPD_DEBUG(
"::onEnter WORKER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
126 SPD_INFO(
"WORKER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
127 if (fromType ==
"FEEDER") {
128 pWorker->addClient(msg->
uuid(), fromType);
129 pWorker->onEnter(msg, out, fromType);
132 pWorker->addOther(msg->
uuid(), fromType);
134 pWorker->pipe().get();
136 return pWorker->pipe().get();
147 SPD_TRACE(
"NodeManager::onExit self [{}] from [{}] msg [{}]",
self, msg->
uuid(),
static_cast<void *
>(msg));
149 auto pFeeder =
feeder(
self);
151 SPD_DEBUG(
"::onExit FEEDER [{}] client on network [{}] has left",
self, msg->
uuid());
152 SPD_INFO(
"FEEDER [{}] => [{}]",
self, msg->
uuid());
153 pFeeder->onExit(msg, out);
154 pFeeder->removeClient(msg->
uuid());
155 return pFeeder->pipe().get();
160 SPD_DEBUG(
"::onExit CONSUMER [{}] client on network [{}] has left",
self, msg->
uuid());
161 SPD_INFO(
"CONSUMER [{}] => [{}]",
self, msg->
uuid());
162 pConsumer->onExit(msg, out);
163 pConsumer->removeClient(msg->
uuid());
164 return pConsumer->pipe().get();
167 auto pWorker =
worker(
self);
169 SPD_DEBUG(
"::onExit WORKER [{}] client on network [{}] has left",
self, msg->
uuid());
170 SPD_INFO(
"WORKER [{}] => [{}]",
self, msg->
uuid());
171 pWorker->onExit(msg, out);
172 pWorker->removeClient(msg->
uuid());
173 return pWorker->pipe().get();
185 SPD_TRACE(
"NodeManager::onWhisper self [{}] from [{}] msg [{}]",
self, msg->
uuid(),
static_cast<void *
>(msg));
187 auto pFeeder =
feeder(
self);
189 SPD_TRACE(
"::onWhisper() FEEDER [{}] from [{}] has msg",
self, msg->
uuid());
190 pFeeder->onWhisper(msg, out);
191 return pFeeder->pipe().get();
196 SPD_TRACE(
"::onWhisper() CONSUMER [{}] from [{}] has msg",
self, msg->
uuid());
197 pConsumer->onWhisper(msg, out);
198 return pConsumer->pipe().get();
201 auto pWorker =
worker(
self);
203 SPD_TRACE(
"::onWhisper() WORKER [{}] from [{}] has msg",
self, msg->
uuid());
204 pWorker->onWhisper(msg, out);
205 return pWorker->pipe().get();
227 auto search =
mJobs.find(pTaskInfo->jobid());
228 if (search !=
mJobs.end()) {
230 pJob = search->second;
237 mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
240 SPD_TRACE(
"Looping feeders");
243 SPD_TRACE(
"Subscribe to feeder [{}]",
feeder.first);
244 feeder.second->subscribe(pTaskInfo->jobid());
248 SPD_TRACE(
"::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
249 pJob->
addTask(pTaskInfo->taskid(), pTaskInfo, type);
257 TaskInfo * pTaskInfo =
nullptr;
258 SPD_TRACE(
"mActiveJobs.size() [{}]",
mActiveJobs.size());
259 while (
mActiveJobs.size() > 0 && pTaskInfo ==
nullptr) {
262 auto search =
mJobs.find(jobstr);
263 if (search !=
mJobs.end()) {
264 pTaskInfo = search->second->nextJob();
266 SPD_TRACE(
"getNextTask FEEDER [{}] JOB [{}:{}]", search->first, pTaskInfo->jobid(),
267 pTaskInfo->taskid());
274 SPD_TRACE(
"::getNextTask No pTaskInfo found");
284 Job * pJob =
job(pTask->jobid());
285 if (pJob ==
nullptr) {
290 SPD_TRACE(
"TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
296 if (pJob->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned))
297 pJob->
moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::done);
299 pJob->
moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::done);
306 if (pJob->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned))
307 pJob->
removeTask(pTask->taskid(), Salsa::Job::assigned);
309 pJob->
removeTask(pTask->taskid(), Salsa::Job::running);
312 std::vector<std::string> out;
313 out.push_back(
"TASK_RESULT");
315 pTask->SerializeToString(&payload);
316 out.push_back(payload);
317 uint32_t slots =
nSlots();
323 if (getenv(
"SALSA_FAKE")) slots *= 10;
325 if (pJob->
size(Job::pending) < slots) {
327 SPD_TRACE(
"We are requesting new tasks [{}] haveMotreTasks [{}]", slots, pJob->
haveMoreTasks());
329 out.push_back(
"SENDTASKS");
330 out.push_back(fmt::format(
"{}", slots));
343 SPD_TRACE(
"Terminating job from client [{}]", uuid);
345 auto search =
mJobs.find(uuid);
346 if (search !=
mJobs.end()) {
352 f.second->terminateJob(uuid);
355 SPD_TRACE(
"Removing job [{}]", uuid);
356 delete search->second;
357 search->second =
nullptr;
360 SPD_TRACE(
"NodeManager::terminateJob print()");
369 if (
mJobs.size() == 0)
return;
371 std::vector<std::string> clean_uuids;
372 for (
auto j :
mJobs) clean_uuids.push_back(j.first);
374 for (
auto u : clean_uuids) {
375 SPD_DEBUG(
"Terminating [{}]", u);
389 return search->second;
401 return search->second;
413 return search->second;
424 auto search =
mJobs.find(uuid);
425 if (search !=
mJobs.end()) {
426 return search->second;
471 for (
auto search :
mJobs) {
472 if (search.second !=
nullptr && search.second->feeder() == client_uuid) {
473 jobs.push_back(search.first);
485 num +=
feeder.second->nodeInfo()->slots();
495 auto search =
mJobs.find(jobUUID);
496 if (search !=
mJobs.end()) {
497 return search->second->haveMoreTasks(
false);
509 if (
job.second->Job::size(Job::pending)) {
511 bool isActiveJob =
false;
513 if (pActiveJobUUID ==
job.first) {
534 auto found =
mJobs.find(jobUUID);
535 if (found !=
mJobs.end()) {
536 return found->second->haveMoreTasks();
564 if (
mJobs.size() == 0)
return;
566 std::string group =
"SalsaTasks";
569 job.second->
json(json[
"tasks"]);
572 Json::StreamWriterBuilder wBuilder;
573 wBuilder[
"indentation"] =
"";
574 std::string data = Json::writeString(wBuilder, json);
577 SPD_TRACE(
"Publish name [{}] id [{}] data [{}] ", group,
id, data);
std::vector< std::string > mActiveJobs
List of active jobs.
size_t size(QueueType t=all) const
void consumer(std::string uuid)
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
bool isTaskInQueue(uint32_t id, QueueType type) const
Check task presence in certain queue.
std::shared_ptr< Worker > worker(std::string uuid) const
bool terminateJob(Job *pJob)
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual void terminateJob(std::string uuid)
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
virtual void addTaskSlot()
void noMoreTasks(std::string jobUUID)
int32_t nSlots(double mult=1.0) const
void resultTask(TaskInfo *task)
void feeder(std::string uuid)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
void print(std::string opt="") const
std::map< std::string, Job * > mJobs
List of jobs.
virtual Publisher * publisher() const
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
void print(bool verbose=false) const
virtual void publish(std::string id) const
bool haveMoreTasks() const
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::QueueType t=Salsa::Job::pending)
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
TaskPool * mpTaskPool
Task pool.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
void json(Json::Value &json)
Publisher * mpPublisher
Publisher.
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
virtual bool handleTaskPool(void *p)
TaskPool * taskPool()
Get NM's task pool.
bool removeTask(uint32_t id, QueueType from)
virtual void terminateJobAll()
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Base salsa TaskPool class
virtual void publish(std::string group, std::string id, std::string data)=0
Publish TODO publish what?
std::shared_ptr< Feeder > feeder(std::string uuid) const
bool moveTask(uint32_t id, QueueType from, QueueType to)
bool addTask(uint32_t id, TaskInfo *pJob, QueueType type)
std::shared_ptr< Consumer > consumer(std::string uuid) const