3 #include "NodeManager.hh" 40 SPD_TRACE(
"::onExit inMSG [{}]", pInMsg->
uuid());
46 if (pInMsg->
uuid() == node.uuid()) {
50 slots += node.slots();
59 mpNodeInfo->mutable_hosts()->DeleteSubrange(iPos, 1);
63 SPD_WARN(
"WORKER [{}] exit. Moving task [{}:{}] to Job::pending", pInMsg->
uuid(), pTask->jobid(),
67 if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::running)) {
68 pJob->
moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::pending);
70 else if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
71 pJob->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
74 if (!getenv(
"SALSA_FAST")) {
75 if (!pJob->consumer().empty()) {
77 std::vector<std::string> outData;
78 outData.push_back(
"JOBRESUBMITED");
80 pTask->SerializeToString(&payload);
81 outData.push_back(payload);
106 std::vector<std::string> inContent = pInMsg->
content();
108 SPD_TRACE(
"::onWhisper inMSG [{}]", inContent[0]);
109 if (inContent[0] ==
"FREESLOT") {
111 SPD_TRACE(
"Searching for task in one of jobs");
114 if (pTask ==
nullptr) {
115 SPD_TRACE(
"Sending back NOMORETASKS");
116 out.push_back(
"NOMORETASKS");
117 out.push_back(inContent[1]);
120 out.push_back(
"TASK");
122 pTask->SerializeToString(&payload);
123 out.push_back(payload);
124 out.push_back(inContent[1]);
126 SPD_TRACE(
"mWorkerTasks[{}] vector size [{}]", pInMsg->
uuid(),
mWorkerTasks[pInMsg->
uuid()].size());
129 else if (inContent[0] ==
"TASK_IS_RUNNING") {
130 SPD_TRACE(
"TASK_IS_RUNNING");
132 std::string payload = inContent[1];
133 TaskInfo * pTask =
new TaskInfo();
135 if (!pTask->ParseFromString(payload)) {
136 SPD_ERROR(
"Message does not contain ProtoBuf message!");
137 for (
auto s : inContent) {
138 SPD_ERROR(
"::onWhisper inMSG [{}]", s);
146 if (job->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
147 job->
moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::running);
152 std::vector<std::string> output;
153 output.push_back(inContent[0]);
154 output.push_back(inContent[1]);
163 else if (inContent[0] ==
"TASK_RESULT") {
164 std::string payload = inContent[1];
165 TaskInfo * pTask =
new TaskInfo();
167 if (!pTask->ParseFromString(payload)) {
168 SPD_ERROR(
"Message does not contain ProtoBuf message!");
169 for (
auto line : inContent) {
170 SPD_ERROR(
"::onWhisper inMSG [{}]", line);
182 else if (inContent[0] ==
"NODEINFO") {
183 std::string payload = inContent[1];
184 Salsa::NodeInfo * pNI =
mpNodeInfo->add_hosts();
185 if (!pNI->ParseFromString(payload)) {
186 SPD_ERROR(
"[NodeInfo] Message does not contain ProtoBuf message!");
206 SPD_INFO(
"Client [{}] started",
uuid);
207 SPD_TRACE(
"Feeders -> [{}]",
mClients.size());
209 std::vector<std::string> out;
210 out.push_back(
"SUB");
236 if (pTask->taskid() == pTaskInfo->taskid()) {
252 std::vector<Salsa::TaskInfo *> tasks;
253 job->
tasks(tasks, Job::pending,
false);
254 job->tasks(tasks, Job::assigned,
false);
255 job->tasks(tasks, Job::running,
false);
256 for (
auto pTask : tasks) {
257 SPD_TRACE(
"removeWorkerTask [{}]", pTask->taskid());
262 std::vector<std::string> out;
263 out.push_back(
"TERMINATEJOB");
267 SPD_INFO(
"JOB [{}] has finished",
uuid);
std::string mUUID
Self UUID.
NodeInfo * nodeInfo() const
std::map< std::string, std::vector< TaskInfo * > > mWorkerTasks
Worker tasks.
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
void consumer(std::string uuid)
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
void terminateJob(std::string uuid)
virtual void upadateJsonValueNodeInfo()
NodeManager * mpNodeManager
Node Manager.
virtual bool haveMoreTasks()
void tasks(std::vector< TaskInfo *> &v, EQueueType type, bool clear=true)
virtual void resultTask(TaskInfo *task)
void feeder(std::string uuid)
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
void subscribe(std::string uuid)
void print(std::string opt="") const
std::map< std::string, std::string > mClients
List of clients.
std::shared_ptr< Socket > pipe() const
TODO Returns distributor's pipe?
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
NodeInfo * mpNodeInfo
Node Info.
std::string uuid() const
Returns distributor's UUID.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
virtual std::vector< std::string > & content()=0
Retursn vector of partial messages as strings.
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
void removeWorkerTask(TaskInfo *pTI)
std::shared_ptr< Consumer > consumer(std::string uuid) const