salsa 0.7.1
Loading...
Searching...
No Matches
Worker.cc
1#include "Worker.hh"
2#include "NodeInfo.pb.h"
3#include "NodeManager.hh"
4#include "TaskInfo.pb.h"
5namespace Salsa {
/// Constructs a Worker and determines how many cores (slots) it reports.
///
/// `uuid`, `pipe` and `nm` are forwarded unchanged to the Distributor base constructor.
///
/// mNumCores starts as std::thread::hardware_concurrency() and is overridden by the
/// SALSA_WORKERS_COUNT environment variable when that variable is set and non-empty:
///  - a value containing 'm' has the first 'm' removed and the remaining number is
///    integer-divided by 1000 (presumably a "millicores" notation -- TODO confirm);
///  - any other value is parsed as a plain integer with std::stoi.
///
/// Throws: whatever std::stoi throws on an unparsable value, and std::exception when the
/// parsed value is zero or negative. Both are logged and rethrown by the catch block.
///
/// NOTE(review): this listing skips some original line numbers (8-10, 52), so statements
/// may be missing here -- in particular the slot loop body; verify against the real file.
6Worker::Worker(std::string uuid, std::shared_ptr<Socket> pipe, NodeManager * nm) : Distributor(uuid, pipe, nm)
7{
11
12 std::string const numCoresEnvName = "SALSA_WORKERS_COUNT";
13
// Default used when the environment variable is absent or empty.
14 mNumCores = std::thread::hardware_concurrency();
15 if (std::getenv(numCoresEnvName.c_str())) {
16 try {
17 std::string numCoresStr = std::getenv(numCoresEnvName.c_str());
18 if (!numCoresStr.empty()) {
19 SPD_INFO("Using SALSA_WORKERS_COUNT env to set number of cores [{}] ...", numCoresStr);
20 std::size_t found = numCoresStr.find('m');
21 int tmpCores = 0;
22
23 if (found != std::string::npos) {
// Only the single character at `found` is erased; integer division means any
// value below 1000 yields 0 and ends up in the zero/negative error path below.
24 numCoresStr.erase(numCoresStr.begin() + found);
25 tmpCores = std::stoi(numCoresStr) / 1000;
26 }
27 else {
28 tmpCores = std::stoi(numCoresStr);
29 }
30
31 if (tmpCores > 0) {
32 mNumCores = static_cast<unsigned int>(tmpCores);
33 }
34 else {
35 SPD_ERROR("Provided env variable [{}] is zero/negative! Value: [{}]!!!", numCoresEnvName, tmpCores);
// Thrown inside the try block, so the handler below logs a second message
// before rethrowing it.
36 throw std::exception();
37 }
38 }
39 }
40 catch (std::exception &) // std::stoi
41 {
// The handler does rethrow (see `throw;` below), so construction fails on a bad value.
42 // Perhaps it would be better to (re-)throw exception?
43 SPD_ERROR("An exception occured while trying to parse env var [{}]", numCoresEnvName);
44 throw;
45 }
46 }
47
48 SPD_INFO("WORKER [{}] has {} cores", mUUID, mNumCores);
49
// One iteration per core; only the trace call is visible in this listing.
50 for (uint32_t iSlot = 0; iSlot < mNumCores; iSlot++) {
51 SPD_TRACE("Worker [{}] slot [{}]", mUUID, iSlot);
53 }
// Record this worker's identity and slot count in its node info.
54 mpNodeInfo->set_uuid(mUUID);
55 mpNodeInfo->set_slots(mNumCores);
56}
57
64
/// Returns the number of cores stored in mNumCores.
65uint32_t Worker::numCores() const
66{
70 return mNumCores;
71}
72
73// void Worker::print() const
74// {
75// ///
76// /// Prints Worker information
77// ///
78// }
79
/// Handles a peer entering: when `type` is "FEEDER", appends a two-part reply to `out`
/// consisting of the tag "NODEINFO" followed by mpNodeInfo serialized to a string.
/// For any other `type` nothing is appended. The incoming message is not used.
80void Worker::onEnter(Message * /*inMsg*/, std::vector<std::string> & out, std::string type)
81{
85
86 if (type == "FEEDER") {
87 out.push_back("NODEINFO");
88 std::string payload;
// The return value of SerializeToString is not checked; the payload is pushed regardless.
89 mpNodeInfo->SerializeToString(&payload);
90 out.push_back(payload);
91 }
92}
/// Handles a peer leaving. No statements are visible in this listing; both parameters
/// are unnamed and unused.
93void Worker::onExit(Message * /*inMsg*/, std::vector<std::string> & /*out*/)
94{
98}
99
/// Handles a direct message and appends any reply parts to `out`.
///
/// The first element of the message content selects the action:
///  - "SUB":          reserve a task slot and answer "FREESLOT" + slot id.
///  - "TASK":         parse the TaskInfo payload (content[1]) for slot id content[2],
///                    register and run it, optionally answer "TASK_IS_RUNNING", then
///                    try to reserve another slot.
///  - "NOMORETASKS":  set the task with id content[1] back to idle.
///  - "TERMINATEJOB": terminate the job named by content[1].
/// Any other first element is ignored.
///
/// NOTE(review): the visible code indexes inContent[0..2] without checking its size.
/// NOTE(review): this listing skips original lines 109, 116, 121, 152 and 167; the
/// declarations/assignments of `ts` are on those lines and are not shown here --
/// presumably lookups in the node manager's task pool; verify against the real file.
100void Worker::onWhisper(Message * inMsg, std::vector<std::string> & out)
101{
105
// Takes a copy of the message parts.
106 std::vector<std::string> inContent = inMsg->content();
107
108 if (inContent[0] == "SUB") {
// `ts` is declared on a line missing from this listing (see note above).
110 if (ts && ts->id() > 0) {
111 SPD_TRACE("AFTER SUB reserving task [{}]", ts->id());
112 out.push_back("FREESLOT");
113 out.push_back(fmt::format("{}", ts->id()));
// Mark the slot as reserved for the upcoming task.
114 ts->state(TaskState::assigned);
115 }
117 }
118 else if (inContent[0] == "TASK") {
119 std::string payload = inContent[1];
// Base 0 lets strtoul accept decimal, octal ("0...") or hex ("0x...") input;
// conversion errors are not detected (end pointer is nullptr).
120 uint32_t id = static_cast<uint32_t>(strtoul(inContent[2].c_str(), nullptr, 0));
// `ts` is dereferenced here without a visible null check.
122 SPD_TRACE("Searching in task pool for id [{}] and found state [{}] ", id, static_cast<int>(ts->state()));
123
// Use the TaskInfo already attached to the slot, or create one and attach it.
124 TaskInfo * task = ts->task();
125 if (!task) {
126 task = new TaskInfo();
127 ts->task(task);
128 }
129
130 if (!task->ParseFromString(payload)) {
131 SPD_ERROR("Message does not contain ProtoBuf message!");
// Dump every received part to help diagnose the malformed message.
132 for (auto s : inContent) {
133 SPD_ERROR("::onWhisper inMSG [{}]", s);
134 }
// Nothing is appended to `out` and the slot state is left as it was.
135 return;
136 }
137 SPD_TRACE("[{}] TASK from [{}] JOB [{}:{}] started", mUUID, inMsg->uuid(), task->jobid(), task->taskid());
138
139 // TODO : check if it is assigned (it should be already by reservation)
// The mismatch is only reported; the task is still added and run below.
140 if (ts->state() != TaskState::assigned)
141 SPD_ERROR("Task [{}:{}] is not assigned and it should be. Problem with reservation !!!!", task->jobid(),
142 task->taskid());
143
144 mpNodeManager->addTask(task, mUUID, inMsg->uuid(), Salsa::Job::running);
145 mpNodeManager->runTask(ts, mUUID, inMsg->uuid());
146
// Setting the SALSA_FAST environment variable (any value) suppresses this reply.
147 if (!getenv("SALSA_FAST")) {
148 out.push_back("TASK_IS_RUNNING");
149 out.push_back(payload);
150 }
151
// `ts` is presumably reassigned on the missing line 152 before this check -- TODO confirm.
153 if (ts && ts->id() > 0) {
154 SPD_TRACE("AFTER TASK reserving task [{}]", ts->id());
155 // TODO if free core available then do
// "&" is pushed ahead of the next reply; presumably a separator between
// messages -- verify against the code that consumes `out`.
156 out.push_back("&");
157 out.push_back("FREESLOT");
158 out.push_back(fmt::format("{}", ts->id()));
159 ts->state(TaskState::assigned);
160 }
161 }
162 else if (inContent[0] == "NOMORETASKS") {
163 // Unsubscribe worker from feeder if needed
164 SPD_TRACE("Releasing reservation [{}] because of no more jobs", inContent[1]);
165 uint32_t id = static_cast<uint32_t>(strtoul(inContent[1].c_str(), nullptr, 0));
// Release the reservation so the slot can be offered again.
166 mpNodeManager->taskPool()->changeState(id, TaskState::idle);
168 }
169 else if (inContent[0] == "TERMINATEJOB") {
170 // terminate job
171 mpNodeManager->terminateJob(inContent[1]);
172 SPD_INFO("WORKER [{}] has finished job [{}]", mUUID, inContent[1]);
173 }
174}
175
176} // namespace Salsa
Base Distributor class.
std::string mUUID
Self UUID.
NodeManager * mpNodeManager
Node Manager.
NodeInfo * mpNodeInfo
Node Info.
Base Message class.
Definition Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual std::vector< std::string > & content()=0
Returns vector of partial messages as strings.
NodeManager class.
TaskPool * taskPool()
Get NM's task pool.
virtual void terminateJob(std::string uuid)
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
virtual void runTask(TaskState *ts, std::string wk, std::string upstream)=0
Run task interface.
virtual void addTaskSlot()
void print(bool verbose=false) const
Definition TaskPool.cc:128
void changeState(uint32_t id, TaskState::EState state)
Definition TaskPool.cc:87
TaskState * findFreeTask() const
Definition TaskPool.cc:61
TaskState * findById(uint32_t id) const
Definition TaskPool.cc:48
Base salsa TaskState class.
Definition TaskState.hh:16
void state(EState s)
Definition TaskState.cc:36
TaskInfo * task() const
Definition TaskState.cc:66
void id(uint32_t id)
Definition TaskState.cc:21
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
Definition Worker.cc:100
virtual ~Worker()
Definition Worker.cc:58
uint32_t mNumCores
Number of cores.
Definition Worker.hh:28
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
Definition Worker.cc:93
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
Definition Worker.cc:80
Worker(std::string uuid, std::shared_ptr< Socket > pPipe, NodeManager *pNM)
Definition Worker.cc:6
uint32_t numCores() const
Definition Worker.cc:65