2#include "NodeManager.hh"
6 , mFinishedJobTimeout(24 * 3600)
11 srand(
static_cast<uint32_t
>(time(
nullptr)));
38 SPD_TRACE(
"mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ",
mFeeders.size(),
mConsumers.size(),
41 if (
mJobs.size() > 0) {
42 SPD_DEBUG(
"= JOBS =======================");
43 for (
auto j :
mJobs) {
46 SPD_DEBUG(
"==============================");
49 SPD_DEBUG(
"= NO JOBS ====================");
63 mConsumers.emplace(uuid, std::make_shared<Consumer>(uuid, pSocket,
this));
72 mFeeders.emplace(uuid, std::make_shared<Feeder>(uuid, pSocket,
this));
81 mWorkers.emplace(uuid, std::make_shared<Worker>(uuid, pSocket,
this));
90 SPD_TRACE(
"NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]", self, msg->
uuid(), fromType,
91 static_cast<void *
>(msg));
94 auto pFeeder =
feeder(self);
96 SPD_DEBUG(
"::onEnter FEEDER [{}] has client on network [{}] type [{}]", self, msg->
uuid(), fromType);
97 SPD_INFO(
"FEEDER [{}] <= [{}] [{}]", self, msg->
uuid(), fromType);
98 if (fromType ==
"CONSUMER") {
99 pFeeder->addClient(msg->
uuid(), fromType);
100 pFeeder->onEnter(msg, out, fromType);
102 else if (fromType ==
"WORKER") {
103 pFeeder->addClient(msg->
uuid(), fromType);
104 pFeeder->onEnter(msg, out, fromType);
106 else if (fromType ==
"DISCOVERY") {
108 SPD_DEBUG(
"DISCOVERY is here");
111 pFeeder->addOther(msg->
uuid(), fromType);
114 return pFeeder->pipe().get();
119 SPD_DEBUG(
"::onEnter CONSUMER [{}] has client on network [{}] type [{}]", self, msg->
uuid(), fromType);
120 SPD_INFO(
"CONSUMER [{}] <= [{}] [{}]", self, msg->
uuid(), fromType);
121 if (fromType ==
"FEEDER") {
122 pConsumer->addClient(msg->
uuid(), fromType);
123 pConsumer->onEnter(msg, out, fromType);
126 pConsumer->addOther(msg->
uuid(), fromType);
129 return pConsumer->pipe().get();
132 auto pWorker =
worker(self);
134 SPD_DEBUG(
"::onEnter WORKER [{}] has client on network [{}] type [{}]", self, msg->
uuid(), fromType);
135 SPD_INFO(
"WORKER [{}] <= [{}] [{}]", self, msg->
uuid(), fromType);
136 if (fromType ==
"FEEDER") {
137 pWorker->addClient(msg->
uuid(), fromType);
138 pWorker->onEnter(msg, out, fromType);
141 pWorker->addOther(msg->
uuid(), fromType);
144 return pWorker->pipe().get();
155 SPD_TRACE(
"NodeManager::onExit self [{}] from [{}] msg [{}]", self, msg->
uuid(),
static_cast<void *
>(msg));
157 auto pWorker =
worker(self);
159 SPD_DEBUG(
"::onExit WORKER [{}] client on network [{}] has left", self, msg->
uuid());
160 SPD_INFO(
"WORKER [{}] => [{}]", self, msg->
uuid());
161 pWorker->onExit(msg, out);
162 pWorker->removeClient(msg->
uuid());
163 return pWorker->pipe().get();
166 auto pFeeder =
feeder(self);
168 SPD_DEBUG(
"::onExit FEEDER [{}] client on network [{}] has left", self, msg->
uuid());
169 SPD_INFO(
"FEEDER [{}] => [{}]", self, msg->
uuid());
170 pFeeder->onExit(msg, out);
171 pFeeder->removeClient(msg->
uuid());
172 return pFeeder->pipe().get();
177 SPD_DEBUG(
"::onExit CONSUMER [{}] client on network [{}] has left", self, msg->
uuid());
178 SPD_INFO(
"CONSUMER [{}] => [{}]", self, msg->
uuid());
179 pConsumer->onExit(msg, out);
180 pConsumer->removeClient(msg->
uuid());
181 return pConsumer->pipe().get();
193 SPD_TRACE(
"NodeManager::onWhisper self [{}] from [{}] msg [{}]", self, msg->
uuid(),
static_cast<void *
>(msg));
195 auto pFeeder =
feeder(self);
197 SPD_TRACE(
"::onWhisper() FEEDER [{}] from [{}] has msg", self, msg->
uuid());
198 pFeeder->onWhisper(msg, out);
199 return pFeeder->pipe().get();
204 SPD_TRACE(
"::onWhisper() CONSUMER [{}] from [{}] has msg", self, msg->
uuid());
205 pConsumer->onWhisper(msg, out);
206 return pConsumer->pipe().get();
209 auto pWorker =
worker(self);
211 SPD_TRACE(
"::onWhisper() WORKER [{}] from [{}] has msg", self, msg->
uuid());
212 pWorker->onWhisper(msg, out);
213 return pWorker->pipe().get();
235 auto search =
mJobs.find(pTaskInfo->jobid());
236 if (search !=
mJobs.end()) {
238 pJob = search->second;
245 mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
248 SPD_TRACE(
"Looping feeders");
251 SPD_TRACE(
"Subscribe to feeder [{}]",
feeder.first);
252 feeder.second->subscribe(pTaskInfo->jobid());
256 SPD_TRACE(
"::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
257 pJob->
addTask(pTaskInfo->taskid(), pTaskInfo, type);
265 TaskInfo * pTaskInfo =
nullptr;
267 SPD_TRACE(
"mActiveJobs.size() [{}]",
mActiveJobs.size());
268 while (
mActiveJobs.size() > 0 && pTaskInfo ==
nullptr) {
269 size_t index =
static_cast<size_t>(rand()) %
mActiveJobs.size();
271 auto iJob =
mJobs.find(jobstr);
272 if (iJob !=
mJobs.end()) {
273 pTaskInfo = iJob->second->nextTask();
275 SPD_TRACE(
"getNextTask FEEDER [{}] JOB [{}:{}]", iJob->first, pTaskInfo->jobid(), pTaskInfo->taskid());
284 SPD_TRACE(
"::getNextTask No pTaskInfo found");
294 Job * pJob =
job(pTask->jobid());
295 if (pJob ==
nullptr) {
300 SPD_TRACE(
"TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
306 auto sourceQueue = (pJob->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) ? Job::assigned : Job::running;
307 if (pTask->returncode() == 0) {
308 pJob->
moveTask(pTask->taskid(), sourceQueue, Salsa::Job::done);
311 pJob->
moveTask(pTask->taskid(), sourceQueue, Salsa::Job::failed);
327 if (pJob->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
328 SPD_WARN(
"Task [{}] duplicate found in [assigned] queue!", pTask->taskid());
329 pJob->
removeTask(pTask->taskid(), Salsa::Job::assigned);
332 SPD_WARN(
"Task [{}] duplicate found in [running] queue!", pTask->taskid());
333 pJob->
removeTask(pTask->taskid(), Salsa::Job::running);
337 std::vector<std::string> out = {
"TASK_RESULT"};
340 pTask->SerializeToString(&payload);
341 out.push_back(payload);
342 uint32_t slots =
nSlots();
348 if (getenv(
"SALSA_FAKE")) slots *= 10;
350 if (pJob->
size(Job::pending) < slots) {
352 SPD_TRACE(
"We are requesting new tasks [{}] haveMoreTasks [{}]", slots, pJob->
haveMoreTasks());
354 out.push_back(
"SENDTASKS");
355 out.push_back(fmt::format(
"{}", slots));
368 SPD_TRACE(
"Terminating job from client [{}]", uuid);
370 auto iJob =
mJobs.find(uuid);
371 if (iJob !=
mJobs.end()) {
377 f.second->terminateJob(uuid);
382 SPD_TRACE(
"Removing job [{}]", uuid);
384 iJob->second =
nullptr;
387 SPD_TRACE(
"NodeManager::terminateJob print()");
399 SPD_DEBUG(
"Checking finished jobs [{}] to be removed ...",
mFinishedJobs.size());
401 std::chrono::time_point<std::chrono::system_clock> curTime = std::chrono::system_clock::now();
402 uint64_t curTimeEpoch = std::chrono::duration_cast<std::chrono::seconds>(curTime.time_since_epoch()).count();
403 std::vector<std::string> cleanUUID;
406 if (j ==
nullptr)
continue;
408 SPD_DEBUG(
"Terminating finished job. Time : diff[{}] timeout[{}]", curTimeEpoch - j->timeFinished(),
410 cleanUUID.push_back(js);
413 if (!cleanUUID.size())
return false;
415 for (
auto u : cleanUUID) {
427 if (
mJobs.size() == 0)
return;
429 std::vector<std::string> cleanUUID;
434 for (
auto job :
mJobs) cleanUUID.push_back(
job.first);
437 for (
auto u : cleanUUID) {
438 SPD_DEBUG(
"Terminating [{}]", u);
451 return search->second;
465 return search->second;
477 return search->second;
488 auto search =
mJobs.find(uuid);
489 if (search !=
mJobs.end()) {
490 return search->second;
535 for (
auto search :
mJobs) {
536 if (search.second !=
nullptr && search.second->feeder() == client_uuid) {
537 jobs.push_back(search.first);
549 num +=
feeder.second->nodeInfo()->slots();
575 if (
job.second->Job::size(Job::pending)) {
577 bool isActiveJob =
false;
579 if (pActiveJobUUID ==
job.first) {
601 if (found !=
mJobs.end()) {
602 return found->second->haveMoreTasks();
632 bool changed =
false;
633 bool jobJustFinished =
false;
635 Json::Value & json_jobs = json[
"jobs"];
636 json_jobs = Json::arrayValue;
638 fmt::format(
"v{}.{}.{}-{}", salsa_VERSION_MAJOR(salsa_VERSION), salsa_VERSION_MINOR(salsa_VERSION),
639 salsa_VERSION_PATCH(salsa_VERSION), salsa_VERSION_TWEAK);
641 if (
mJobs.size() > 0) {
647 SPD_DEBUG(
"force=[{}] changed=[{}] jobJustFinished=[{}]", force, changed, jobJustFinished);
649 if (!force && !changed && !jobJustFinished)
return false;
661 json[
"node"] = f->jsonValueNodeInfo();
662 if (!f->nodeInfo()->name().empty()) name = f->nodeInfo()->name();
665 Json::StreamWriterBuilder wBuilder;
666 wBuilder[
"indentation"] =
"";
667 std::string data = Json::writeString(wBuilder, json);
669 SPD_DEBUG(
"Publish sub [salsa:{}] id [{}] data [{}] ", name, name, data);
void consumer(std::string uuid)
bool haveMoreTasks() const
Task statuses.
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
void json(Json::Value &json)
bool changed() const
Returns whether the job info was changed.
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
size_t size(EQueueType t=all) const
void feeder(std::string uuid)
bool isJustFinished()
Returns whether the job was just finished.
bool removeTask(uint32_t id, EQueueType from)
bool isFinished()
Returns whether the job is finished.
std::string uuid() const
Returns the UUID.
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
virtual std::string uuid() const =0
Returns node uuid.
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handle return of task and send it to external client.
Publisher * mpPublisher
Publisher.
virtual bool publish(std::string id, bool force=false) const
virtual bool terminateFinishedJobs()
std::vector< std::string > mActiveJobs
List of active jobs.
std::shared_ptr< Feeder > feeder(std::string uuid) const
std::string mClusterAlias
Cluster alias.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
virtual void terminateAllJobs(bool finishedonly=false)
std::shared_ptr< Worker > worker(std::string uuid) const
Job * job(std::string uuid)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual bool haveMoreTasks()
TaskPool * taskPool()
Get NM's task pool.
virtual void terminateJob(std::string uuid)
virtual bool handleTaskPool(void *p)
virtual Publisher * publisher() const
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
std::shared_ptr< Consumer > consumer(std::string uuid) const
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
virtual void resultTask(TaskInfo *task)
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
std::map< std::string, Job * > mJobs
List of jobs.
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
virtual void addTaskSlot()
std::vector< std::string > mFinishedJobs
List of finished jobs.
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
int32_t nSlots(double mult=1.0) const
virtual void noMoreTasks(std::string jobUUID)
TaskPool * mpTaskPool
Task pool.
void print(std::string opt="") const
virtual void publish(std::string id, std::string name, std::string data, bool force=true)=0
Publish. TODO: document what is published.
Base salsa TaskPool class.
TaskState * find(void *p) const
void print(bool verbose=false) const
bool terminateJob(Job *pJob)