salsa  0.4.0
 All Classes Functions Variables Typedefs Enumerations Pages
Worker.cc
1 #include "Worker.hh"
2 #include "NodeInfo.pb.h"
3 #include "NodeManager.hh"
4 #include "TaskInfo.pb.h"
5 namespace Salsa {
/// Constructs a worker and decides how many cores (slots) it advertises.
///
/// The uuid, pipe and node manager are forwarded unchanged to the Distributor
/// base. The core count defaults to std::thread::hardware_concurrency() and can
/// be overridden through the SALSA_WORKERS_COUNT environment variable.
///
/// Throws whatever std::stoi throws for an unparsable value, and
/// std::exception when the parsed value is zero or negative; in both cases the
/// error is logged and the exception is rethrown to the caller.
6 Worker::Worker(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm) : Distributor(uuid, pipe, nm)
7 {
11 
12  std::string const numCoresEnvName = "SALSA_WORKERS_COUNT";
13 
// Default used when the environment variable is absent or empty.
// NOTE(review): hardware_concurrency() is allowed to return 0 when the value
// cannot be determined; that case is not handled here.
14  mNumCores = std::thread::hardware_concurrency();
15  if (std::getenv(numCoresEnvName.c_str())) {
16  try {
17  std::string numCoresStr = std::getenv(numCoresEnvName.c_str());
18  if (!numCoresStr.empty()) {
19  SPD_INFO("Using SALSA_WORKERS_COUNT env to set number of cores [{}] ...", numCoresStr);
// A value containing 'm' is divided by 1000 after the first 'm' is removed
// (presumably a millicore notation such as "2000m" -- TODO confirm).
20  std::size_t found = numCoresStr.find('m');
21  int tmpCores = 0;
22 
23  if (found != std::string::npos) {
24  numCoresStr.erase(numCoresStr.begin() + found);
// Integer division: anything below 1000 becomes 0 and is rejected below.
25  tmpCores = std::stoi(numCoresStr) / 1000;
26  }
27  else {
28  tmpCores = std::stoi(numCoresStr);
29  }
30 
31  if (tmpCores > 0) {
32  mNumCores = static_cast<unsigned int>(tmpCores);
33  }
34  else {
35  SPD_ERROR("Provided env variable [{}] is zero/negative! Value: [{}]!!!", numCoresEnvName, tmpCores);
// Thrown inside the try block, so it is handled by the catch clause below.
36  throw std::exception();
37  }
38  }
39  }
40  catch (std::exception &) // std::stoi
41  {
42  // Log the failure, then propagate the original exception to the caller.
43  SPD_ERROR("An exception occured while trying to parse env var [{}]", numCoresEnvName);
44  throw;
45  }
46  }
47 
48  SPD_INFO("WORKER [{}] has {} cores", mUUID, mNumCores);
49 
// One iteration per core.
// NOTE(review): the embedded listing numbers jump from 51 to 53, so a statement
// may be missing from this loop body in this dump (the page index mentions
// NodeManager::addTaskSlot, which no visible line calls) -- verify against the
// real file.
50  for (uint32_t iSlot = 0; iSlot < mNumCores; iSlot++) {
51  SPD_TRACE("Worker [{}] slot [{}]", mUUID, iSlot);
53  }
// Publish identity and capacity in the node info record.
54  mpNodeInfo->set_uuid(mUUID);
55  mpNodeInfo->set_slots(mNumCores);
56 }
57 
// NOTE(review): the signature line for this empty body is missing from this
// listing (the embedded numbering skips 58). The page index places
// "virtual ~Worker()" at Worker.cc:58, so this is presumably the destructor
// body -- confirm against the real file.
59 {
63 }
64 
65 uint32_t Worker::numCores() const
66 {
70  return mNumCores;
71 }
72 
73 // void Worker::print() const
74 // {
75 // ///
76 // /// Prints Worker information
77 // ///
78 // }
79 
80 void Worker::onEnter(Message * /*inMsg*/, std::vector<std::string> & out, std::string type)
81 {
85 
86  if (type == "FEEDER") {
87  out.push_back("NODEINFO");
88  std::string payload;
89  mpNodeInfo->SerializeToString(&payload);
90  out.push_back(payload);
91  }
92 }
93 void Worker::onExit(Message * /*inMsg*/, std::vector<std::string> & /*out*/)
94 {
98 }
99 
/// Handles a direct (whisper) message by dispatching on its first part.
///
/// Recognised commands, as matched below: "SUB", "TASK", "NOMORETASKS" and
/// "TERMINATEJOB". Any other first part is ignored. Reply parts are appended
/// to `out`.
///
/// NOTE(review): the embedded listing numbers skip 109, 116, 152 and 167, so
/// some statements of this function are not visible in this dump; comments
/// below mark the affected places.
100 void Worker::onWhisper(Message * inMsg, std::vector<std::string> & out)
101 {
105 
// Local copy of the message parts.
// NOTE(review): inContent[0] (and [1]/[2] further down) are indexed without
// checking the vector size.
106  std::vector<std::string> inContent = inMsg->content();
107 
108  if (inContent[0] == "SUB") {
// NOTE(review): `ts` has no visible declaration in this branch (listing line
// 109 is missing). The page index mentions TaskPool::findFreeTask, so it is
// presumably obtained from there -- confirm against the real file.
// When a task state with a positive id is available, request work for it with
// FREESLOT + id and mark it assigned.
110  if (ts && ts->id() > 0) {
111  SPD_TRACE("AFTER SUB reserving task [{}]", ts->id());
112  out.push_back("FREESLOT");
113  out.push_back(fmt::format("{}", ts->id()));
114  ts->state(TaskState::assigned);
115  }
117  }
118  else if (inContent[0] == "TASK") {
// Part 1 is the serialized task, part 2 the task-pool id as text.
119  std::string payload = inContent[1];
// Base 0 lets strtoul accept decimal, octal (0...) and hex (0x...) notation.
120  uint32_t id = static_cast<uint32_t>(strtoul(inContent[2].c_str(), nullptr, 0));
121  TaskState * ts = mpNodeManager->taskPool()->findById(id);
// NOTE(review): `ts` is dereferenced here and below without a null check;
// whether findById can return nullptr is not visible from this file.
122  SPD_TRACE("Searching in task pool for id [{}] and found state [{}] ", id, ts->state());
123 
// Attach a fresh TaskInfo when the state has none yet.
// NOTE(review): ownership of the object allocated with `new` is not visible
// here -- presumably taken over by TaskState; confirm.
124  TaskInfo * task = ts->task();
125  if (!task) {
126  task = new TaskInfo();
127  ts->task(task);
128  }
129 
// On a payload that does not parse, log every message part and give up
// without appending anything to `out`.
130  if (!task->ParseFromString(payload)) {
131  SPD_ERROR("Message does not contain ProtoBuf message!");
132  for (auto s : inContent) {
133  SPD_ERROR("::onWhisper inMSG [{}]", s);
134  }
135  return;
136  }
137  SPD_TRACE("[{}] TASK from [{}] JOB [{}:{}] started", mUUID, inMsg->uuid(), task->jobid(), task->taskid());
138 
139  // TODO : check if it is assigned (it should be already by reservation)
// Only logs the inconsistency; the task is still added and run below.
140  if (ts->state() != TaskState::assigned)
141  SPD_ERROR("Task [{}:{}] is not assigned and it should be. Problem with reservation !!!!", task->jobid(),
142  task->taskid());
143 
144  mpNodeManager->addTask(task, mUUID, inMsg->uuid(), Salsa::Job::running);
145  mpNodeManager->runTask(ts, mUUID, inMsg->uuid());
146 
// The TASK_IS_RUNNING reply is skipped whenever SALSA_FAST is set in the
// environment (any value).
147  if (!getenv("SALSA_FAST")) {
148  out.push_back("TASK_IS_RUNNING");
149  out.push_back(payload);
150  }
151 
// NOTE(review): listing line 152 is missing; `ts` is presumably re-assigned to
// another free task before this check -- confirm against the real file.
153  if (ts && ts->id() > 0) {
154  SPD_TRACE("AFTER TASK reserving task [{}]", ts->id());
155  // TODO if free core available then do
// "&" is pushed ahead of the next command (presumably a separator between
// batched commands -- TODO confirm).
156  out.push_back("&");
157  out.push_back("FREESLOT");
158  out.push_back(fmt::format("{}", ts->id()));
159  ts->state(TaskState::assigned);
160  }
161  }
162  else if (inContent[0] == "NOMORETASKS") {
163  // Unsubscribe worker from feeder if needed
164  SPD_TRACE("Releasing reservation [{}] because of no more jobs", inContent[1]);
// Part 1 carries the id of the reserved task state; it is put back to idle.
165  uint32_t id = static_cast<uint32_t>(strtoul(inContent[1].c_str(), nullptr, 0));
166  mpNodeManager->taskPool()->changeState(id, TaskState::idle);
168  }
169  else if (inContent[0] == "TERMINATEJOB") {
170  // terminate job
// Part 1 is passed to NodeManager::terminateJob as the job identifier.
171  mpNodeManager->terminateJob(inContent[1]);
172  SPD_INFO("WORKER [{}] has finished job [{}]", mUUID, inContent[1]);
173  }
174 }
175 
176 } // namespace Salsa
std::string mUUID
Self UUID.
Definition: Distributor.hh:54
Base Message class.
Definition: Message.hh:15
Base Distributor class.
Definition: Distributor.hh:19
Base salsa TaskState class.
Definition: TaskState.hh:16
uint32_t mNumCores
Number of cores.
Definition: Worker.hh:28
NodeManager * mpNodeManager
Node Manager.
Definition: Distributor.hh:58
Worker(std::string uuid, std::shared_ptr< Socket > pPipe, NodeManager *pNM)
Definition: Worker.cc:6
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:360
virtual void addTaskSlot()
Definition: NodeManager.cc:493
virtual ~Worker()
Definition: Worker.cc:58
NodeManager class.
Definition: NodeManager.hh:22
TaskState * findFreeTask() const
Definition: TaskPool.cc:61
void changeState(uint32_t id, TaskState::EState state)
Definition: TaskPool.cc:87
void state(EState s)
Definition: TaskState.cc:35
virtual std::string uuid() const =0
Returns node uuid.
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
Definition: Worker.cc:93
uint32_t numCores() const
Definition: Worker.cc:65
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
Definition: Worker.cc:80
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
Definition: Worker.cc:100
virtual void runTask(TaskState *ts, std::string wk, std::string upstream)=0
Run task interface.
TaskInfo * task() const
Definition: TaskState.cc:65
NodeInfo * mpNodeInfo
Node Info.
Definition: Distributor.hh:59
virtual std::vector< std::string > & content()=0
Returns vector of partial messages as strings.
TaskPool * taskPool()
Get NM's task pool.
Definition: NodeManager.cc:511
void print(bool verbose=false) const
Definition: TaskPool.cc:128
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:226
void id(uint32_t id)
Definition: TaskState.cc:20
TaskState * findById(uint32_t id) const
Definition: TaskPool.cc:48