12 std::string
const numCoresEnvName =
"SALSA_WORKERS_COUNT";
14 mNumCores = std::thread::hardware_concurrency();
15 if (std::getenv(numCoresEnvName.c_str())) {
17 std::string numCoresStr = std::getenv(numCoresEnvName.c_str());
18 if (!numCoresStr.empty()) {
19 SPD_INFO(
"Using SALSA_WORKERS_COUNT env to set number of cores [{}] ...", numCoresStr);
20 std::size_t found = numCoresStr.find(
'm');
23 if (found != std::string::npos) {
24 numCoresStr.erase(numCoresStr.begin() + found);
25 tmpCores = std::stoi(numCoresStr) / 1000;
28 tmpCores = std::stoi(numCoresStr);
32 mNumCores =
static_cast<unsigned int>(tmpCores);
35 SPD_ERROR(
"Provided env variable [{}] is zero/negative! Value: [{}]!!!", numCoresEnvName, tmpCores);
36 throw std::exception();
40 catch (std::exception &)
43 SPD_ERROR(
"An exception occured while trying to parse env var [{}]", numCoresEnvName);
50 for (uint32_t iSlot = 0; iSlot <
mNumCores; iSlot++) {
51 SPD_TRACE(
"Worker [{}] slot [{}]",
mUUID, iSlot);
106 std::vector<std::string> inContent = inMsg->
content();
108 if (inContent[0] ==
"SUB") {
110 if (ts && ts->
id() > 0) {
111 SPD_TRACE(
"AFTER SUB reserving task [{}]", ts->
id());
112 out.push_back(
"FREESLOT");
113 out.push_back(fmt::format(
"{}", ts->
id()));
114 ts->
state(TaskState::assigned);
118 else if (inContent[0] ==
"TASK") {
119 std::string payload = inContent[1];
120 uint32_t
id =
static_cast<uint32_t
>(strtoul(inContent[2].c_str(),
nullptr, 0));
122 SPD_TRACE(
"Searching in task pool for id [{}] and found state [{}] ",
id,
static_cast<int>(ts->
state()));
124 TaskInfo * task = ts->
task();
126 task =
new TaskInfo();
130 if (!task->ParseFromString(payload)) {
131 SPD_ERROR(
"Message does not contain ProtoBuf message!");
132 for (
auto s : inContent) {
133 SPD_ERROR(
"::onWhisper inMSG [{}]", s);
137 SPD_TRACE(
"[{}] TASK from [{}] JOB [{}:{}] started",
mUUID, inMsg->
uuid(), task->jobid(), task->taskid());
140 if (ts->
state() != TaskState::assigned)
141 SPD_ERROR(
"Task [{}:{}] is not assigned and it should be. Problem with reservation !!!!", task->jobid(),
147 if (!getenv(
"SALSA_FAST")) {
148 out.push_back(
"TASK_IS_RUNNING");
149 out.push_back(payload);
153 if (ts && ts->
id() > 0) {
154 SPD_TRACE(
"AFTER TASK reserving task [{}]", ts->
id());
157 out.push_back(
"FREESLOT");
158 out.push_back(fmt::format(
"{}", ts->
id()));
159 ts->
state(TaskState::assigned);
162 else if (inContent[0] ==
"NOMORETASKS") {
164 SPD_TRACE(
"Releasing reservation [{}] because of no more jobs", inContent[1]);
165 uint32_t
id =
static_cast<uint32_t
>(strtoul(inContent[1].c_str(),
nullptr, 0));
169 else if (inContent[0] ==
"TERMINATEJOB") {
172 SPD_INFO(
"WORKER [{}] has finished job [{}]",
mUUID, inContent[1]);