salsa  0.7.1
Feeder.cc
1 #include "Feeder.hh"
2 #include "Job.hh"
3 #include "NodeManager.hh"
4 
5 using namespace fmt::literals;
6 
7 namespace Salsa {
8 Feeder::Feeder(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * pNM) : Distributor(uuid, pipe, pNM)
9 {
13  mpNodeInfo->set_uuid(mUUID);
14 }
15 
17 {
21 }
22 
23 void Feeder::onEnter(Message * /*pInMsg*/, std::vector<std::string> & out, std::string type)
24 {
28 
29  if (type == "WORKER" && mpNodeManager->hasJobs()) {
30  out.push_back("SUB");
31  }
32 }
33 
/// Handles the exit of a peer node.
///
/// If the sender's uuid is found in mpNodeInfo->hosts():
///  - that host entry is removed and the total slot count is set to the sum of
///    the slots of the remaining hosts;
///  - every task recorded for the sender in mWorkerTasks is moved back to its
///    job's pending queue if it is in the running or assigned queue, and, unless
///    the SALSA_FAST environment variable is set, a {"JOBRESUBMITED", <serialized
///    task>} whisper is sent through the job consumer's pipe to pJob->feeder();
///  - the sender is dropped from mWorkerTasks and mClients, and subscribe() is
///    called with its uuid.
///
/// @param pInMsg exit message; only its uuid() is used
/// @param out    unused
///
/// NOTE(review): listing lines 92 and 95-96 are missing here, so the visible braces
/// do not balance and the scope of the final SPD_INFO cannot be determined from
/// this view.
34 void Feeder::onExit(Message * pInMsg, std::vector<std::string> & /*out*/)
35 {
39 
40  SPD_TRACE("::onExit inMSG [{}]", pInMsg->uuid());
41 
42  uint32_t slots = 0; // sum of slots over all hosts other than the sender
43  uint32_t iPos = 0; // index of the sender within hosts() (count of hosts before the match)
44  bool found = false;
45  for (auto & node : mpNodeInfo->hosts()) { // NOTE(review): assumes a uuid appears at most once in hosts() — TODO confirm
46  if (pInMsg->uuid() == node.uuid()) {
47  found = true;
48  }
49  else {
50  slots += node.slots();
51  if (!found) {
52  iPos++;
53  }
54  }
55  }
56 
57  // TODO: Continue if worker (Better check is needed)
58  if (found) { // if node is found in local reg
59  mpNodeInfo->mutable_hosts()->DeleteSubrange(iPos, 1);
60  mpNodeInfo->set_slots(slots);
61 
62  for (auto pTask : mWorkerTasks[pInMsg->uuid()]) { // tasks handed to this sender by onWhisper("FREESLOT")
63  SPD_WARN("WORKER [{}] exit. Moving task [{}:{}] to Job::pending", pInMsg->uuid(), pTask->jobid(),
64  pTask->taskid());
65  Job * pJob = mpNodeManager->job(pTask->jobid());
66  if (pJob) {
67  if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::running)) {
68  pJob->moveTask(pTask->taskid(), Salsa::Job::running, Salsa::Job::pending);
69  }
70  else if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
71  pJob->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::pending);
72  }
73 
74  if (!getenv("SALSA_FAST")) { // NOTE(review): re-evaluated for every task; could be hoisted out of the loop
75  if (!pJob->consumer().empty()) {
76  std::shared_ptr<Consumer> pConsumer = mpNodeManager->consumer(pJob->consumer()); // NOTE(review): not null-checked before pipe() below
77  std::vector<std::string> outData;
78  outData.push_back("JOBRESUBMITED");
79  std::string payload;
80  pTask->SerializeToString(&payload);
81  outData.push_back(payload);
82  mpNodeManager->sendWhisper(pConsumer->pipe().get(), pJob->feeder(), outData);
83  }
84  }
85  }
86  }
87 
88  mWorkerTasks[pInMsg->uuid()].clear();
89  mWorkerTasks.erase(pInMsg->uuid());
90  mClients.erase(pInMsg->uuid());
91 
93  subscribe(pInMsg->uuid()); // sends a "SUB" whisper to every remaining entry of mClients
94  }
97  SPD_INFO("Workers [{}] slots [{}]", mpNodeInfo->hosts_size(), mpNodeInfo->slots());
98  }
99 }
100 void Feeder::onWhisper(Message * pInMsg, std::vector<std::string> & out)
101 {
105 
106  std::vector<std::string> inContent = pInMsg->content();
107 
108  SPD_TRACE("::onWhisper inMSG [{}]", inContent[0]);
109  if (inContent[0] == "FREESLOT") { // from consumer
110  // Signal arrives from consumer. Indicates that consumer has free exec slot.
111  SPD_TRACE("Searching for task in one of jobs");
112  TaskInfo * pTask = mpNodeManager->getNextTask();
113  // If there's task available, send it out
114  if (pTask == nullptr) {
115  SPD_TRACE("Sending back NOMORETASKS");
116  out.push_back("NOMORETASKS");
117  out.push_back(inContent[1]);
118  }
119  else {
120  out.push_back("TASK");
121  std::string payload;
122  pTask->SerializeToString(&payload);
123  out.push_back(payload);
124  out.push_back(inContent[1]);
125  mWorkerTasks[pInMsg->uuid()].push_back(pTask);
126  SPD_TRACE("mWorkerTasks[{}] vector size [{}]", pInMsg->uuid(), mWorkerTasks[pInMsg->uuid()].size());
127  }
128  }
129  else if (inContent[0] == "TASK_IS_RUNNING") { // from consumer
130  SPD_TRACE("TASK_IS_RUNNING");
131 
132  std::string payload = inContent[1];
133  TaskInfo * pTask = new TaskInfo();
134  {
135  if (!pTask->ParseFromString(payload)) {
136  SPD_ERROR("Message does not contain ProtoBuf message!");
137  for (auto s : inContent) {
138  SPD_ERROR("::onWhisper inMSG [{}]", s);
139  }
140  return;
141  }
142  }
143 
144  Job * job = mpNodeManager->job(pTask->jobid());
145  if (job) {
146  if (job->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
147  job->moveTask(pTask->taskid(), Salsa::Job::assigned, Salsa::Job::running);
148  }
149 
150  if (!job->consumer().empty()) {
151  std::shared_ptr<Consumer> pConsumer = mpNodeManager->consumer(job->consumer());
152  std::vector<std::string> output;
153  output.push_back(inContent[0]);
154  output.push_back(inContent[1]);
155  mpNodeManager->sendWhisper(pConsumer->pipe().get(), job->feeder(), output);
156  }
157  else {
158  delete pTask;
159  }
160  }
161  mpNodeManager->print();
162  }
163  else if (inContent[0] == "TASK_RESULT") { // from consumer
164  std::string payload = inContent[1];
165  TaskInfo * pTask = new TaskInfo();
166  {
167  if (!pTask->ParseFromString(payload)) {
168  SPD_ERROR("Message does not contain ProtoBuf message!");
169  for (auto line : inContent) {
170  SPD_ERROR("::onWhisper inMSG [{}]", line);
171  }
172  return;
173  }
174  }
175 
176  removeWorkerTask(pTask, pInMsg->uuid());
177 
178  // task is deleted in next line
179  mpNodeManager->resultTask(pTask);
180  // TODO possible memleak? @mvala
181  }
182  else if (inContent[0] == "NODEINFO") { // from consumer
183  std::string payload = inContent[1];
184  Salsa::NodeInfo * pNI = mpNodeInfo->add_hosts();
185  if (!pNI->ParseFromString(payload)) {
186  SPD_ERROR("[NodeInfo] Message does not contain ProtoBuf message!");
187  }
188  uint32_t slots = 0;
189 
190  for (auto nodeInfo : mpNodeInfo->hosts()) {
191  slots += nodeInfo.slots();
192  }
193  mpNodeInfo->set_slots(slots);
195 
196  SPD_INFO("Workers [{}] slots [{}]", mpNodeInfo->hosts_size(), mpNodeInfo->slots());
197  }
198 }
199 
200 void Feeder::subscribe(std::string uuid)
201 {
205 
206  SPD_INFO("Client [{}] started", uuid);
207  SPD_TRACE("Feeders -> [{}]", mClients.size());
208  for (auto client : mClients) {
209  std::vector<std::string> out;
210  out.push_back("SUB");
211  mpNodeManager->sendWhisper(pipe().get(), client.first, out);
212  }
213 }
214 
215 void Feeder::removeWorkerTask(TaskInfo * pTaskInfo)
216 {
220 
221  for (auto wkTask : mWorkerTasks) {
222  removeWorkerTask(pTaskInfo, wkTask.first);
223  }
224 }
225 
226 void Feeder::removeWorkerTask(TaskInfo * pTaskInfo, std::string uuid)
227 {
231  SPD_TRACE("mWorkerTasks[{}].size() [{}]", uuid, mWorkerTasks[uuid].size());
232  if (mWorkerTasks[uuid].size() == 0) return;
233 
234  int iPos = 0;
235  for (auto pTask : mWorkerTasks[uuid]) {
236  if (pTask->taskid() == pTaskInfo->taskid()) {
237  mWorkerTasks[uuid].erase(mWorkerTasks[uuid].begin() + iPos);
238  return;
239  }
240  iPos++;
241  }
242 }
243 
244 void Feeder::terminateJob(std::string uuid)
245 {
249 
250  auto job = mpNodeManager->job(uuid);
251 
252  std::vector<Salsa::TaskInfo *> tasks;
253  job->tasks(tasks, Job::pending, false);
254  job->tasks(tasks, Job::assigned, false);
255  job->tasks(tasks, Job::running, false);
256  for (auto pTask : tasks) {
257  SPD_TRACE("removeWorkerTask [{}]", pTask->taskid());
258  removeWorkerTask(pTask);
259  }
260 
261  for (auto client : mClients) {
262  std::vector<std::string> out;
263  out.push_back("TERMINATEJOB");
264  out.push_back(uuid);
265  mpNodeManager->sendWhisper(pipe().get(), client.first, out);
266  }
267  SPD_INFO("JOB [{}] has finished", uuid);
268 }
269 
270 } // namespace Salsa
Base Distributor class.
Definition: Distributor.hh:19
std::map< std::string, std::string > mClients
List of clients.
Definition: Distributor.hh:56
NodeInfo * nodeInfo() const
Definition: Distributor.cc:92
virtual void upadateJsonValueNodeInfo()
Definition: Distributor.cc:100
std::string mUUID
Self UUID.
Definition: Distributor.hh:54
NodeManager * mpNodeManager
Node Manager.
Definition: Distributor.hh:58
std::shared_ptr< Socket > pipe() const
TODO Returns distributor's pipe?
Definition: Distributor.cc:77
NodeInfo * mpNodeInfo
Node Info.
Definition: Distributor.hh:59
std::string uuid() const
Returns distributor's UUID.
Definition: Distributor.cc:84
virtual ~Feeder()
Definition: Feeder.cc:16
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
Definition: Feeder.cc:23
void terminateJob(std::string uuid)
Definition: Feeder.cc:244
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
Definition: Feeder.cc:34
void removeWorkerTask(TaskInfo *pTI)
Definition: Feeder.cc:215
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
Definition: Feeder.cc:100
std::map< std::string, std::vector< TaskInfo * > > mWorkerTasks
Worker tasks.
Definition: Feeder.hh:33
void subscribe(std::string uuid)
Definition: Feeder.cc:200
Job class.
Definition: Job.hh:16
void consumer(std::string uuid)
Definition: Job.cc:238
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition: Job.cc:130
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:56
void feeder(std::string uuid)
Definition: Job.cc:255
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:145
Base Message class.
Definition: Message.hh:15
virtual std::vector< std::string > & content()=0
Returns the vector of partial messages as strings.
virtual std::string uuid() const =0
Returns node uuid.
NodeManager class.
Definition: NodeManager.hh:22
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:219
Job * job(std::string uuid)
Definition: NodeManager.cc:482
virtual bool haveMoreTasks()
Definition: NodeManager.cc:567
std::shared_ptr< Consumer > consumer(std::string uuid) const
Definition: NodeManager.cc:457
virtual void resultTask(TaskInfo *task)
Definition: NodeManager.cc:288
bool hasJobs() const
Definition: NodeManager.cc:521
TaskInfo * getNextTask()
Definition: NodeManager.cc:260
void print(std::string opt="") const
Definition: NodeManager.cc:32