2 #include "NodeInfo.pb.h" 3 #include "NodeManager.hh" 4 #include "TaskInfo.pb.h" 12 std::string
const numCoresEnvName =
"SALSA_WORKERS_COUNT";
14 mNumCores = std::thread::hardware_concurrency();
15 if (std::getenv(numCoresEnvName.c_str())) {
17 std::string numCoresStr = std::getenv(numCoresEnvName.c_str());
18 if (!numCoresStr.empty()) {
19 SPD_INFO(
"Using SALSA_WORKERS_COUNT env to set number of cores [{}] ...", numCoresStr);
20 std::size_t found = numCoresStr.find(
'm');
23 if (found != std::string::npos) {
24 numCoresStr.erase(numCoresStr.begin() + found);
25 tmpCores = std::stoi(numCoresStr) / 1000;
28 tmpCores = std::stoi(numCoresStr);
32 mNumCores =
static_cast<unsigned int>(tmpCores);
35 SPD_ERROR(
"Provided env variable [{}] is zero/negative! Value: [{}]!!!", numCoresEnvName, tmpCores);
36 throw std::exception();
40 catch (std::exception &)
43 SPD_ERROR(
"An exception occured while trying to parse env var [{}]", numCoresEnvName);
50 for (uint32_t iSlot = 0; iSlot <
mNumCores; iSlot++) {
51 SPD_TRACE(
"Worker [{}] slot [{}]",
mUUID, iSlot);
86 if (type ==
"FEEDER") {
87 out.push_back(
"NODEINFO");
90 out.push_back(payload);
106 std::vector<std::string> inContent = inMsg->
content();
108 if (inContent[0] ==
"SUB") {
110 if (ts && ts->
id() > 0) {
111 SPD_TRACE(
"AFTER SUB reserving task [{}]", ts->
id());
112 out.push_back(
"FREESLOT");
113 out.push_back(fmt::format(
"{}", ts->
id()));
114 ts->
state(TaskState::assigned);
118 else if (inContent[0] ==
"TASK") {
119 std::string payload = inContent[1];
120 uint32_t
id =
static_cast<uint32_t
>(strtoul(inContent[2].c_str(),
nullptr, 0));
122 SPD_TRACE(
"Searching in task pool for id [{}] and found state [{}] ",
id, ts->
state());
124 TaskInfo * task = ts->
task();
126 task =
new TaskInfo();
130 if (!task->ParseFromString(payload)) {
131 SPD_ERROR(
"Message does not contain ProtoBuf message!");
132 for (
auto s : inContent) {
133 SPD_ERROR(
"::onWhisper inMSG [{}]", s);
137 SPD_TRACE(
"[{}] TASK from [{}] JOB [{}:{}] started",
mUUID, inMsg->
uuid(), task->jobid(), task->taskid());
140 if (ts->
state() != TaskState::assigned)
141 SPD_ERROR(
"Task [{}:{}] is not assigned and it should be. Problem with reservation !!!!", task->jobid(),
147 if (!getenv(
"SALSA_FAST")) {
148 out.push_back(
"TASK_IS_RUNNING");
149 out.push_back(payload);
153 if (ts && ts->
id() > 0) {
154 SPD_TRACE(
"AFTER TASK reserving task [{}]", ts->
id());
157 out.push_back(
"FREESLOT");
158 out.push_back(fmt::format(
"{}", ts->
id()));
159 ts->
state(TaskState::assigned);
162 else if (inContent[0] ==
"NOMORETASKS") {
164 SPD_TRACE(
"Releasing reservation [{}] because of no more jobs", inContent[1]);
165 uint32_t
id =
static_cast<uint32_t
>(strtoul(inContent[1].c_str(),
nullptr, 0));
169 else if (inContent[0] ==
"TERMINATEJOB") {
172 SPD_INFO(
"WORKER [{}] has finished job [{}]",
mUUID, inContent[1]);
std::string mUUID
Self UUID.
Base salsa TaskState class
uint32_t mNumCores
Number of cores.
NodeManager * mpNodeManager
Node Manager.
Worker(std::string uuid, std::shared_ptr< Socket > pPipe, NodeManager *pNM)
virtual void terminateJob(std::string uuid)
virtual void addTaskSlot()
TaskState * findFreeTask() const
void changeState(uint32_t id, TaskState::EState state)
virtual std::string uuid() const =0
Returns node uuid.
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Three horsemen of apocalypse.
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
virtual void runTask(TaskState *ts, std::string wk, std::string upstream)=0
Run task interface.
NodeInfo * mpNodeInfo
Node Info.
void print(bool verbose=false) const
virtual std::vector< std::string > & content()=0
Returns vector of partial messages as strings.
TaskPool * taskPool()
Get NM's task pool.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
uint32_t numCores() const
TaskState * findById(uint32_t id) const