3 #include <NodeManager.hh> 40 SPD_TRACE(
"::onExit inMSG [{}]", pInMsg->
uuid());
46 if (pInMsg->
uuid() == node.uuid()) {
50 slots += node.slots();
59 mpNodeInfo->mutable_hosts()->DeleteSubrange(iPos, 1);
63 SPD_WARN(
"WORKER [{}] exit. Moving task [{}:{}] to Job::pending", pInMsg->
uuid(), pTask->jobid(),
67 if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::running)) {
68 pJob->
moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::pending);
70 else if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
71 pJob->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
74 if (!getenv(
"SALSA_FAST")) {
75 if (!pJob->consumer().empty()) {
77 std::vector<std::string> outData;
78 outData.push_back(
"JOBRESUBMITED");
80 pTask->SerializeToString(&payload);
81 outData.push_back(payload);
104 std::vector<std::string> inContent = pInMsg->
content();
106 SPD_TRACE(
"::onWhisper inMSG [{}]", inContent[0]);
107 if (inContent[0] ==
"FREESLOT") {
108 SPD_TRACE(
"Searching for task in one of jobs");
110 if (pTask ==
nullptr) {
111 SPD_TRACE(
"Sending back NOMORETASKS");
112 out.push_back(
"NOMORETASKS");
113 out.push_back(inContent[1]);
116 out.push_back(
"TASK");
118 pTask->SerializeToString(&payload);
119 out.push_back(payload);
120 out.push_back(inContent[1]);
122 SPD_TRACE(
"mWorkerTasks[{}] vector size [{}]", pInMsg->
uuid(),
mWorkerTasks[pInMsg->
uuid()].size());
125 else if (inContent[0] ==
"TASK_IS_RUNNING") {
126 SPD_TRACE(
"TASK_IS_RUNNING");
128 std::string payload = inContent[1];
129 TaskInfo * pTask =
new TaskInfo();
131 if (!pTask->ParseFromString(payload)) {
132 SPD_ERROR(
"Message does not contain ProtoBuf message!");
133 for (
auto s : inContent) {
134 SPD_ERROR(
"::onWhisper inMSG [{}]", s);
142 if (job->
isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
143 job->
moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::running);
148 std::vector<std::string> output;
149 output.push_back(inContent[0]);
150 output.push_back(inContent[1]);
159 else if (inContent[0] ==
"TASK_RESULT") {
160 std::string payload = inContent[1];
161 TaskInfo * pTask =
new TaskInfo();
163 if (!pTask->ParseFromString(payload)) {
164 SPD_ERROR(
"Message does not contain ProtoBuf message!");
165 for (
auto s : inContent) {
166 SPD_ERROR(
"::onWhisper inMSG [{}]", s);
178 else if (inContent[0] ==
"NODEINFO") {
179 std::string payload = inContent[1];
180 Salsa::NodeInfo * pNI =
mpNodeInfo->add_hosts();
181 if (!pNI->ParseFromString(payload)) {
182 SPD_ERROR(
"[NodeInfo] Message does not contain ProtoBuf message!");
200 SPD_INFO(
"Client [{}] started",
uuid);
201 SPD_TRACE(
"Feeders -> [{}]",
mClients.size());
203 std::vector<std::string> out;
204 out.push_back(
"SUB");
230 if (pTask->taskid() == pTaskInfo->taskid()) {
246 std::vector<Salsa::TaskInfo *> tasks;
247 job->
tasks(tasks, Job::pending,
false);
248 job->tasks(tasks, Job::assigned,
false);
249 job->tasks(tasks, Job::running,
false);
250 for (
auto pTask : tasks) {
251 SPD_TRACE(
"removeWorkerTask [{}]", pTask->taskid());
256 std::vector<std::string> out;
257 out.push_back(
"TERMINATEJOB");
261 SPD_INFO(
"JOB [{}] has finished",
uuid);
std::string mUUID
Self UUID.
NodeInfo * nodeInfo() const
std::map< std::string, std::vector< TaskInfo * > > mWorkerTasks
Worker tasks.
void consumer(std::string uuid)
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
bool isTaskInQueue(uint32_t id, QueueType type) const
Check task presence in certain queue.
void terminateJob(std::string uuid)
NodeManager * mpNodeManager
Node Manager.
void resultTask(TaskInfo *task)
void feeder(std::string uuid)
virtual std::string uuid() const =0
Returns node uuid.
void tasks(std::vector< TaskInfo *> &v, QueueType type, bool clear=true)
Job * job(std::string uuid)
void subscribe(std::string uuid)
void print(std::string opt="") const
std::map< std::string, std::string > mClients
List of clients.
std::shared_ptr< Socket > pipe() const
TODO Returns distributor's pipe?
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
NodeInfo * mpNodeInfo
Node Info.
std::string uuid() const
Returns distributor's UUID.
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
virtual std::vector< std::string > & content()=0
Returns vector of partial messages as strings.
bool moveTask(uint32_t id, QueueType from, QueueType to)
void removeWorkerTask(TaskInfo *pTI)
std::shared_ptr< Consumer > consumer(std::string uuid) const