2 #include "NodeManager.hh"
9 srand(
static_cast<uint32_t
>(time(
nullptr)));
36 SPD_TRACE(
"mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ",
mFeeders.size(),
mConsumers.size(),
39 if (
mJobs.size() > 0) {
40 SPD_DEBUG(
"= JOBS =======================");
41 for (
auto j :
mJobs) {
44 SPD_DEBUG(
"==============================");
47 SPD_DEBUG(
"= NO JOBS ====================");
61 mConsumers.emplace(uuid, std::make_shared<Consumer>(uuid, pSocket,
this));
70 mFeeders.emplace(uuid, std::make_shared<Feeder>(uuid, pSocket,
this));
79 mWorkers.emplace(uuid, std::make_shared<Worker>(uuid, pSocket,
this));
88 SPD_TRACE(
"NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]",
self, msg->
uuid(), fromType,
89 static_cast<void *
>(msg));
92 auto pFeeder =
feeder(
self);
94 SPD_DEBUG(
"::onEnter FEEDER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
95 SPD_INFO(
"FEEDER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
96 if (fromType ==
"CONSUMER") {
97 pFeeder->addClient(msg->
uuid(), fromType);
98 pFeeder->onEnter(msg, out, fromType);
100 else if (fromType ==
"WORKER") {
101 pFeeder->addClient(msg->
uuid(), fromType);
102 pFeeder->onEnter(msg, out, fromType);
104 else if (fromType ==
"DISCOVERY") {
106 SPD_DEBUG(
"DISCOVERY is here");
109 pFeeder->addOther(msg->
uuid(), fromType);
112 return pFeeder->pipe().get();
117 SPD_DEBUG(
"::onEnter CONSUMER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
118 SPD_INFO(
"CONSUMER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
119 if (fromType ==
"FEEDER") {
120 pConsumer->addClient(msg->
uuid(), fromType);
121 pConsumer->onEnter(msg, out, fromType);
124 pConsumer->addOther(msg->
uuid(), fromType);
127 return pConsumer->pipe().get();
130 auto pWorker =
worker(
self);
132 SPD_DEBUG(
"::onEnter WORKER [{}] has client on network [{}] type [{}]",
self, msg->
uuid(), fromType);
133 SPD_INFO(
"WORKER [{}] <= [{}] [{}]",
self, msg->
uuid(), fromType);
134 if (fromType ==
"FEEDER") {
135 pWorker->addClient(msg->
uuid(), fromType);
136 pWorker->onEnter(msg, out, fromType);
139 pWorker->addOther(msg->
uuid(), fromType);
142 return pWorker->pipe().get();
153 SPD_TRACE(
"NodeManager::onExit self [{}] from [{}] msg [{}]",
self, msg->
uuid(),
static_cast<void *
>(msg));
155 auto pWorker =
worker(
self);
157 SPD_DEBUG(
"::onExit WORKER [{}] client on network [{}] has left",
self, msg->
uuid());
158 SPD_INFO(
"WORKER [{}] => [{}]",
self, msg->
uuid());
159 pWorker->onExit(msg, out);
160 pWorker->removeClient(msg->
uuid());
161 return pWorker->pipe().get();
164 auto pFeeder =
feeder(
self);
166 SPD_DEBUG(
"::onExit FEEDER [{}] client on network [{}] has left",
self, msg->
uuid());
167 SPD_INFO(
"FEEDER [{}] => [{}]",
self, msg->
uuid());
168 pFeeder->onExit(msg, out);
169 pFeeder->removeClient(msg->
uuid());
170 return pFeeder->pipe().get();
175 SPD_DEBUG(
"::onExit CONSUMER [{}] client on network [{}] has left",
self, msg->
uuid());
176 SPD_INFO(
"CONSUMER [{}] => [{}]",
self, msg->
uuid());
177 pConsumer->onExit(msg, out);
178 pConsumer->removeClient(msg->
uuid());
179 return pConsumer->pipe().get();
191 SPD_TRACE(
"NodeManager::onWhisper self [{}] from [{}] msg [{}]",
self, msg->
uuid(),
static_cast<void *
>(msg));
193 auto pFeeder =
feeder(
self);
195 SPD_TRACE(
"::onWhisper() FEEDER [{}] from [{}] has msg",
self, msg->
uuid());
196 pFeeder->onWhisper(msg, out);
197 return pFeeder->pipe().get();
202 SPD_TRACE(
"::onWhisper() CONSUMER [{}] from [{}] has msg",
self, msg->
uuid());
203 pConsumer->onWhisper(msg, out);
204 return pConsumer->pipe().get();
207 auto pWorker =
worker(
self);
209 SPD_TRACE(
"::onWhisper() WORKER [{}] from [{}] has msg",
self, msg->
uuid());
210 pWorker->onWhisper(msg, out);
211 return pWorker->pipe().get();
233 auto search =
mJobs.find(pTaskInfo->jobid());
234 if (search !=
mJobs.end()) {
236 pJob = search->second;
243 mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
246 SPD_TRACE(
"Looping feeders");
249 SPD_TRACE(
"Subscribe to feeder [{}]",
feeder.first);
250 feeder.second->subscribe(pTaskInfo->jobid());
254 SPD_TRACE(
"::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
255 pJob->
addTask(pTaskInfo->taskid(), pTaskInfo, type);
263 TaskInfo * pTaskInfo =
nullptr;
265 SPD_TRACE(
"mActiveJobs.size() [{}]",
mActiveJobs.size());
266 while (
mActiveJobs.size() > 0 && pTaskInfo ==
nullptr) {
267 size_t index =
static_cast<size_t>(rand()) %
mActiveJobs.size();
269 auto iJob =
mJobs.find(jobstr);
270 if (iJob !=
mJobs.end()) {
271 pTaskInfo = iJob->second->nextTask();
273 SPD_TRACE(
"getNextTask FEEDER [{}] JOB [{}:{}]", iJob->first, pTaskInfo->jobid(), pTaskInfo->taskid());
282 SPD_TRACE(
"::getNextTask No pTaskInfo found");
292 Job * pJob =
job(pTask->jobid());
293 if (pJob ==
nullptr) {
298 SPD_TRACE(
"TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
304 auto sourceQueue = (pJob->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) ? Job::assigned : Job::running;
305 if (pTask->returncode() == 0) {
306 pJob->
moveTask(pTask->taskid(), sourceQueue, Salsa::Job::done);
309 pJob->
moveTask(pTask->taskid(), sourceQueue, Salsa::Job::failed);
325 if (pJob->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
326 SPD_WARN(
"Task [{}] duplicate found in [assigned] queue!", pTask->taskid());
327 pJob->
removeTask(pTask->taskid(), Salsa::Job::assigned);
330 SPD_WARN(
"Task [{}] duplicate found in [running] queue!", pTask->taskid());
331 pJob->
removeTask(pTask->taskid(), Salsa::Job::running);
335 std::vector<std::string> out = {
"TASK_RESULT"};
338 pTask->SerializeToString(&payload);
339 out.push_back(payload);
340 uint32_t slots =
nSlots();
346 if (getenv(
"SALSA_FAKE")) slots *= 10;
348 if (pJob->
size(Job::pending) < slots) {
350 SPD_TRACE(
"We are requesting new tasks [{}] haveMoreTasks [{}]", slots, pJob->
haveMoreTasks());
352 out.push_back(
"SENDTASKS");
353 out.push_back(fmt::format(
"{}", slots));
366 SPD_TRACE(
"Terminating job from client [{}]", uuid);
368 auto iJob =
mJobs.find(uuid);
369 if (iJob !=
mJobs.end()) {
375 f.second->terminateJob(uuid);
380 SPD_TRACE(
"Removing job [{}]", uuid);
382 iJob->second =
nullptr;
385 SPD_TRACE(
"NodeManager::terminateJob print()");
397 SPD_DEBUG(
"Checking finished jobs [{}] to be removed ...",
mFinishedJobs.size());
399 std::chrono::time_point<std::chrono::system_clock> curTime = std::chrono::system_clock::now();
400 uint64_t curTimeEpoch = std::chrono::duration_cast<std::chrono::seconds>(curTime.time_since_epoch()).count();
401 std::vector<std::string> cleanUUID;
404 if (j ==
nullptr)
continue;
406 SPD_DEBUG(
"Terminating finished job. Time : diff[{}] timeout[{}]", curTimeEpoch - j->timeFinished(),
408 cleanUUID.push_back(js);
411 if (!cleanUUID.size())
return false;
413 for (
auto u : cleanUUID) {
425 if (
mJobs.size() == 0)
return;
427 std::vector<std::string> cleanUUID;
432 for (
auto job :
mJobs) cleanUUID.push_back(
job.first);
435 for (
auto u : cleanUUID) {
436 SPD_DEBUG(
"Terminating [{}]", u);
449 return search->second;
463 return search->second;
475 return search->second;
486 auto search =
mJobs.find(uuid);
487 if (search !=
mJobs.end()) {
488 return search->second;
533 for (
auto search :
mJobs) {
534 if (search.second !=
nullptr && search.second->feeder() == client_uuid) {
535 jobs.push_back(search.first);
547 num +=
feeder.second->nodeInfo()->slots();
573 if (
job.second->Job::size(Job::pending)) {
575 bool isActiveJob =
false;
577 if (pActiveJobUUID ==
job.first) {
598 auto found =
mJobs.find(jobUUID);
599 if (found !=
mJobs.end()) {
600 return found->second->haveMoreTasks();
630 bool changed =
false;
632 Json::Value & json_jobs = json[
"jobs"];
633 json_jobs = Json::arrayValue;
635 fmt::format(
"v{}.{}.{}-{}", salsa_VERSION_MAJOR(salsa_VERSION), salsa_VERSION_MINOR(salsa_VERSION),
636 salsa_VERSION_PATCH(salsa_VERSION), salsa_VERSION_TWEAK);
638 if (
mJobs.size() > 0) {
643 if (!force && !changed)
return;
651 std::string name = id;
654 json[
"node"] = f->jsonValueNodeInfo();
655 if (!f->nodeInfo()->name().empty()) name = f->nodeInfo()->name();
658 Json::StreamWriterBuilder wBuilder;
659 wBuilder[
"indentation"] =
"";
660 std::string data = Json::writeString(wBuilder, json);
663 SPD_TRACE(
"Publish sub [salsa:{}] id [{}] data [{}] ",
id, name, data);
void consumer(std::string uuid)
bool haveMoreTasks() const
Task statuses.
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
void json(Json::Value &json)
bool changed() const
Returns whether the job info was changed.
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
size_t size(EQueueType t=all) const
void feeder(std::string uuid)
bool removeTask(uint32_t id, EQueueType from)
bool isFinished()
Returns whether the job is finished.
std::string uuid() const
Returns the UUID.
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
virtual std::string uuid() const =0
Returns node uuid.
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handle return of task and send it to external client.
Publisher * mpPublisher
Publisher.
virtual bool terminateFinishedJobs()
std::vector< std::string > mActiveJobs
List of active jobs.
std::shared_ptr< Feeder > feeder(std::string uuid) const
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
virtual void terminateAllJobs(bool finishedonly=false)
virtual void publish(std::string id, bool force=false) const
std::shared_ptr< Worker > worker(std::string uuid) const
Job * job(std::string uuid)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual bool haveMoreTasks()
TaskPool * taskPool()
Get NM's task pool.
virtual void terminateJob(std::string uuid)
virtual bool handleTaskPool(void *p)
virtual Publisher * publisher() const
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
std::shared_ptr< Consumer > consumer(std::string uuid) const
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
virtual void resultTask(TaskInfo *task)
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
std::map< std::string, Job * > mJobs
List of jobs.
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
virtual void addTaskSlot()
std::vector< std::string > mFinishedJobs
List of finished jobs.
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
int32_t nSlots(double mult=1.0) const
virtual void noMoreTasks(std::string jobUUID)
TaskPool * mpTaskPool
Task pool.
void print(std::string opt="") const
virtual void publish(std::string id, std::string name, std::string data)=0
Publish. TODO: document what is published.
Base salsa TaskPool class.
void print(bool verbose=false) const
bool terminateJob(Job *pJob)