salsa  0.4.15
TaskPool.cc
1 #include "TaskPool.hh"
2 #include "Job.hh"
3 #include "NodeManager.hh"
4 #include "TaskExecutor.hh"
5 namespace Salsa {
6 
7 TaskPool::TaskPool(NodeManager * pNodeManager) : Object(), mpNodeManager(pNodeManager)
8 {
12 }
14 {
18  for (auto task : mTasks) {
19  delete task.second;
20  }
21  mTasks.clear();
22 }
23 
24 void TaskPool::add(void * pPointer, TaskState * pTaskState)
25 {
29  if (pPointer == nullptr) return;
30 
31  mTasks.insert(std::make_pair(pPointer, pTaskState));
32  pTaskState->id(mTasks.size());
33 }
34 
35 TaskState * TaskPool::find(void * pPointer) const
36 {
41  auto found = mTasks.find(pPointer);
42  if (found != mTasks.end()) {
43  return found->second;
44  }
45  return nullptr;
46 }
47 
48 TaskState * TaskPool::findById(uint32_t id) const
49 {
53 
54  for (auto task : mTasks) {
55  if (task.second->id() == id) return task.second;
56  }
57 
58  return nullptr;
59 }
60 
62 {
66 
67  for (auto task : mTasks) {
68  if (task.second->state() == TaskState::idle) return task.second;
69  }
70 
71  return nullptr;
72 }
73 
75 {
79  uint32_t nFreeSlots = 0;
80  for (auto task : mTasks) {
81  if (task.second->state() == TaskState::idle) nFreeSlots++;
82  }
83 
84  return nFreeSlots;
85 }
86 
87 void TaskPool::changeState(uint32_t id, TaskState::EState state)
88 {
92  if (id == 0) return;
93 
94  for (auto task : mTasks) {
95  if (task.second->id() == id) task.second->state(state);
96  }
97 }
98 
100 {
104 
105  if (mTasks.size() == 0) return true;
106 
107  pJob->print();
108  std::vector<TaskInfo *> runningTasks;
109  pJob->tasks(runningTasks, Job::EQueueType::running);
110 
111  for (auto runningTask : runningTasks) {
112  for (auto task : mTasks) {
113  if (task.second->state() == TaskState::killed) {
114  continue;
115  }
116  else if (task.second->task() == runningTask) {
117  task.second->killTask();
118  break;
119  }
120  }
121  }
122  // job->tasks(v, Job::EQueueType::pending);
123  // job->tasks(v, Job::EQueueType::assigned);
124 
125  return false;
126 }
127 
128 void TaskPool::print(bool verbose) const
129 {
133  uint32_t stat[TaskState::all] = {};
134 
135  for (auto task : mTasks) {
136  stat[task.second->state()]++;
137  task.second->print(verbose);
138  }
139  SPD_DEBUG("TaskPool I[{}] A[{}] R[{}]", stat[TaskState::idle], stat[TaskState::assigned], stat[TaskState::running]);
140 }
141 
142 bool TaskPool::handlePipe(void * pPipe)
143 {
147 
148  TaskState * pTaskState = find(pPipe);
149  if (pTaskState == nullptr) {
150  SPD_ERROR("pTaskState by actor [{}] is null!!!", static_cast<void *>(pPipe));
151  return false;
152  }
153  if (pTaskState->executor() == nullptr) {
154  SPD_ERROR("pTaskState->executor() by actor [{}] is null!!!", static_cast<void *>(pPipe));
155  return false;
156  }
157  if (pTaskState->executor()->pipe() == nullptr) {
158  SPD_ERROR("pTaskState->executor()->pipe() by actor [{}] is null!!!", static_cast<void *>(pPipe));
159  return false;
160  }
161 
162  std::vector<std::string> extra;
163  pTaskState->executor()->handlePipe(extra);
164  TaskState::EState state = pTaskState->state();
165  if (state == TaskState::assigned) {
166  pTaskState->state(TaskState::running);
167  // handle mJobs
168  // Nothing for now i think
169  }
170  else if (state == TaskState::running || state == TaskState::killed) {
171  pTaskState->state(TaskState::idle);
172  pTaskState->pid(0);
173 
174  std::string wkUUID = extra[0];
175  std::string upstream = extra[1];
176  // handle mJobs
177 
178  Job * pJob = mpNodeManager->job(pTaskState->task()->jobid());
179  if (pJob != nullptr) {
180  SPD_TRACE("TASK ENDED JOB [{}:{}]", pTaskState->task()->jobid(), pTaskState->task()->taskid());
181  pJob->removeTask(pTaskState->task()->taskid(), Salsa::Job::running);
182  }
183  std::vector<std::string> out;
184  out.push_back("TASK_RESULT");
185  std::string payload;
186  pTaskState->task()->SerializeToString(&payload);
187  out.push_back(payload);
188  pTaskState = findFreeTask();
189  if (pTaskState && pTaskState->id() > 0) {
190  SPD_TRACE("AFTER TASK_RESULT sending reserving task [{}]", pTaskState->id());
191  out.push_back("&");
192  out.push_back("FREESLOT");
193  out.push_back(fmt::format("{}", pTaskState->id()));
194  pTaskState->state(TaskState::assigned);
195  }
196 
197  SPD_TRACE("Searching to worker [{}]", wkUUID);
198  std::shared_ptr<Salsa::Worker> pWorker = mpNodeManager->worker(wkUUID);
199  if (pWorker) {
200  SPD_TRACE("Sending via pWorker [{}] to feeder [{}]", wkUUID, upstream);
201  mpNodeManager->sendWhisper(pWorker->pipe().get(), upstream, out);
202  }
203  }
204  print();
205 
206  return true;
207 }
208 
209 } // namespace Salsa
Job class.
Definition: Job.hh:16
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition: Job.cc:121
void print() const
Definition: Job.cc:153
bool removeTask(uint32_t id, EQueueType from)
Definition: Job.cc:89
NodeManager class.
Definition: NodeManager.hh:22
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:217
std::shared_ptr< Worker > worker(std::string uuid) const
Definition: NodeManager.cc:467
Job * job(std::string uuid)
Definition: NodeManager.cc:480
Base Salsa Object class.
Definition: Object.hh:15
virtual void * pipe() const
Definition: TaskExecutor.cc:15
virtual bool handlePipe(std::vector< std::string > &)=0
Handle pipe.
TaskState * find(void *p) const
Definition: TaskPool.cc:35
TaskPool(NodeManager *pNM)
Definition: TaskPool.cc:7
void print(bool verbose=false) const
Definition: TaskPool.cc:128
NodeManager * mpNodeManager
Node manager.
Definition: TaskPool.hh:38
void changeState(uint32_t id, TaskState::EState state)
Definition: TaskPool.cc:87
uint32_t nSlotFree()
Definition: TaskPool.cc:74
void add(void *p, TaskState *t)
Definition: TaskPool.cc:24
TaskState * findFreeTask() const
Definition: TaskPool.cc:61
std::map< void *, TaskState * > mTasks
List of task slots.
Definition: TaskPool.hh:37
bool handlePipe(void *pPipe)
Definition: TaskPool.cc:142
bool terminateJob(Job *pJob)
Definition: TaskPool.cc:99
virtual ~TaskPool()
Definition: TaskPool.cc:13
TaskState * findById(uint32_t id) const
Definition: TaskPool.cc:48
Base salsa TaskState class.
Definition: TaskState.hh:16
TaskExecutor * executor()
Definition: TaskState.cc:83
void state(EState s)
Definition: TaskState.cc:36
TaskInfo * task() const
Definition: TaskState.cc:66
void id(uint32_t id)
Definition: TaskState.cc:21
void pid(uint32_t pid)
Definition: TaskState.cc:51
EState
Status of task.
Definition: TaskState.hh:19