12 #include "NDimensionalIpcRunner.h"
13 #include "NGnThreadData.h"
14 #include "NThreadData.h"
/// Joins task identifiers into a single comma-separated string.
///
/// No escaping is performed, so an identifier that itself contains ',' cannot
/// be recovered unambiguously by the receiver.
///
/// @param taskIds Identifiers to join, in order.
/// @return "id0,id1,...", or an empty string when `taskIds` is empty.
std::string SerializeTaskIds(const std::vector<std::string> & taskIds)
{
  std::string joined;
  for (size_t i = 0; i < taskIds.size(); ++i) {
    // The separator goes between elements only: never leading, never trailing.
    if (i != 0) joined += ',';
    joined += taskIds[i];
  }
  return joined;
}
/// Parses a batch payload of the form "id:c0,c1,...;id:c0,c1,...".
///
/// Tasks are separated by ';'. Inside a task the first ':' splits the task id
/// from a comma-separated list of integer coordinates. Empty task tokens and
/// empty coordinate tokens are skipped.
///
/// @param payload Raw batch payload.
/// @param tasks   Output list of (task id, coordinates). It is cleared on
///                entry, so the return value reflects this payload only. If a
///                malformed task is hit, entries parsed before it are left in
///                place.
/// @return true when at least one task was parsed and no task was malformed;
///         false for an empty payload or when a task has no ':', an empty id,
///         or nothing after the ':'.
/// @throws std::invalid_argument / std::out_of_range from std::stoi when a
///         coordinate token is not a valid int.
bool ParseTaskBatchPayload(const std::string & payload,
                           std::vector<std::pair<std::string, std::vector<int>>> & tasks)
{
  tasks.clear();

  std::stringstream batchStream(payload);
  std::string       taskToken;
  while (std::getline(batchStream, taskToken, ';')) {
    if (taskToken.empty()) continue;

    // A valid token needs a non-empty id before ':' and something after it.
    const size_t sep = taskToken.find(':');
    if (sep == std::string::npos || sep == 0 || sep + 1 >= taskToken.size()) {
      return false;
    }

    const std::string taskId = taskToken.substr(0, sep);
    const std::string coords = taskToken.substr(sep + 1);

    std::vector<int>  parsedCoords;
    std::stringstream coordStream(coords);
    std::string       coordToken;
    while (std::getline(coordStream, coordToken, ',')) {
      if (coordToken.empty()) continue;
      parsedCoords.push_back(std::stoi(coordToken));
    }

    tasks.emplace_back(taskId, std::move(parsedCoords));
  }

  return !tasks.empty();
}
55 bool NDimensionalIpcRunner::SendFrames(
void * socket,
const std::vector<std::string> & frames)
57 for (
size_t i = 0; i < frames.size(); ++i) {
58 int flags = (i + 1 < frames.size()) ? ZMQ_SNDMORE : 0;
59 if (zmq_send(socket, frames[i].data(), frames[i].size(), flags) < 0) {
// Receives message parts from `socket` and stores each one in `outFrames`
// as a std::string.
// NOTE(review): the message init/close calls, the loop over parts and the
// error path are not visible in this excerpt; callers treat a false return
// as "nothing received" (see the errno check in TaskLoop).
66 bool NDimensionalIpcRunner::ReceiveFrames(
void * socket, std::vector<std::string> & outFrames)
// Receive one message part with no flags; rc is used below as the number
// of bytes in the part.
72 int rc = zmq_msg_recv(&msg, socket, 0);
// Copy exactly rc bytes of the part's payload, so embedded NUL bytes are
// preserved in the resulting string.
77 outFrames.emplace_back(
static_cast<const char *
>(zmq_msg_data(&msg)),
static_cast<size_t>(rc));
// Ask ZeroMQ whether further parts of the same multipart message follow.
78 int more = zmq_msg_more(&msg);
85 std::string NDimensionalIpcRunner::BuildWorkerIdentity(
size_t workerIndex)
87 std::ostringstream oss;
88 oss <<
"wk_" << std::setw(3) << std::setfill(
'0') << workerIndex;
92 std::string NDimensionalIpcRunner::SerializeCoords(
const std::vector<int> & coords)
94 std::ostringstream oss;
95 for (
size_t i = 0; i < coords.size(); ++i) {
96 if (i != 0) oss <<
',';
102 std::string NDimensionalIpcRunner::SerializeIds(
const std::vector<Long64_t> & ids)
104 std::ostringstream oss;
105 for (
size_t i = 0; i < ids.size(); ++i) {
106 if (i != 0) oss <<
',';
// Worker-side entry point: creates a ZeroMQ DEALER socket, connects it to
// `endpoint`, announces itself with a "READY" frame and then hands control
// to TaskLoop.
//   endpoint    - ZeroMQ endpoint passed to zmq_connect.
//   workerIndex - index used for the thread name and the socket identity.
//   worker      - worker object forwarded to TaskLoop.
// NOTE(review): the error-handling branches, the socket/context teardown and
// the final return are not visible in this excerpt.
112 int NDimensionalIpcRunner::WorkerLoop(
const std::string & endpoint,
size_t workerIndex, NThreadData * worker)
// Build a name of the form "ipc_NNN" (index zero-padded to three digits).
// NOTE(review): presumably applied as the thread name; the call that uses it
// is not visible here.
114 std::ostringstream threadName;
115 threadName <<
"ipc_" << std::setw(3) << std::setfill(
'0') << workerIndex;
118 void * ctx = zmq_ctx_new();
121 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
// Set an explicit identity ("wk_NNN") on the socket before connecting.
127 const std::string identity = BuildWorkerIdentity(workerIndex);
128 if (zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size()) != 0) {
// Receives time out after 1000 ms; TaskLoop retries when a receive fails with
// EAGAIN/EWOULDBLOCK. The return value of this setsockopt call is not checked.
134 int timeoutMs = 1000;
135 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
137 if (zmq_connect(dealer, endpoint.c_str()) != 0) {
// Announce this worker to the peer with a single-frame "READY" message.
143 if (!SendFrames(dealer, {
"READY"})) {
// Process commands until TaskLoop returns; rc holds its result (0 or 1).
149 int rc = TaskLoop(dealer, workerIndex, worker);
// Command loop of a worker. Reads multipart messages from `dealer`, where the
// first frame is the command:
//   "STOP"   - stop; a second frame equal to "abort" marks the run as aborted.
//   "SETDEF" - frames[1] is a definition name, forwarded to NGnThreadData workers.
//   "SETIDS" - frames[1] is a comma-separated id list, forwarded to NGnThreadData workers.
//   "TASK"   - frames[1] is the task id, frames[2] the coordinates; answered with "ACK".
//   "TASKB"  - frames[1] is a batch payload; answered with "ACKB" and the processed ids.
// Exceptions raised while processing are reported with an "ERR" message.
// After the loop, NGnThreadData workers run their post-processing (or skip it
// on abort), a "DONE" frame is sent, and the function returns 0 when
// finishedOk is still true and 1 otherwise.
// NOTE(review): many lines of this function (loop header, several branch
// bodies, try block start) are not visible in this excerpt.
156 int NDimensionalIpcRunner::TaskLoop(
void * dealer,
size_t workerIndex, NThreadData * worker)
158 bool finishedOk =
true;
159 bool aborted =
false;
160 size_t tasksProcessed = 0;
// Progress output is opt-in through the NDMSPC_WORKER_PROGRESS environment
// variable; it is enabled when the value, compared case-insensitively, is
// "1", "true", "yes" or "on". Evaluated once, when TaskLoop starts.
161 const bool showWorkerProgress = []() {
162 const char * env = gSystem->Getenv(
"NDMSPC_WORKER_PROGRESS");
163 if (!env || env[0] ==
'\0')
return false;
164 std::string value(env);
// The unsigned char parameter keeps std::tolower away from negative char values.
165 std::transform(value.begin(), value.end(), value.begin(),
166 [](
unsigned char c) { return static_cast<char>(std::tolower(c)); });
167 return (value ==
"1" || value ==
"true" || value ==
"yes" || value ==
"on");
// checkAbort: polls the socket with a zero timeout and, if a message is
// pending, receives it. When that message is "STOP" it records whether the
// second frame is "abort".
// NOTE(review): a pending message that is not "STOP" is still consumed by
// this receive; what happens to it, and the lambda's return value after a
// "STOP", are not visible here — confirm that no command can be dropped.
172 auto checkAbort = [&]() ->
bool {
173 zmq_pollitem_t item = {dealer, 0, ZMQ_POLLIN, 0};
174 if (zmq_poll(&item, 1, 0) <= 0)
return false;
175 std::vector<std::string> peek;
176 if (!ReceiveFrames(dealer, peek))
return false;
177 if (!peek.empty() && peek[0] ==
"STOP") {
178 aborted = (peek.size() >= 2 && peek[1] ==
"abort");
179 if (aborted) NLogPrint(
"Worker %zu: received abort from supervisor, stopping ...", workerIndex);
// Main receive step. A failed receive with EAGAIN/EWOULDBLOCK means nothing
// arrived in time, so the loop simply tries again.
186 std::vector<std::string> frames;
187 if (!ReceiveFrames(dealer, frames)) {
188 if (errno == EAGAIN || errno == EWOULDBLOCK)
continue;
192 if (frames.empty())
continue;
194 const std::string & cmd = frames[0];
// NOTE(review): the condition guarding the next lines is not visible;
// presumably this is the "STOP" branch. It ends the progress line with a
// newline (if any progress was printed) and records an abort request.
196 if (showWorkerProgress && tasksProcessed > 0) { fprintf(stdout,
"\n"); fflush(stdout); }
197 aborted = (frames.size() >= 2 && frames[1] ==
"abort");
199 NLogPrint(
"Worker %zu: received abort from supervisor, stopping ...", workerIndex);
// "SETDEF": needs a payload frame; only NGnThreadData workers act on it.
203 if (cmd ==
"SETDEF") {
204 if (frames.size() < 2) {
208 if (
auto * gnWorker =
dynamic_cast<NGnThreadData *
>(worker)) {
209 gnWorker->SetCurrentDefinitionName(frames[1]);
// "SETIDS": needs a payload frame; the id list is parsed with ParseIds and
// handed to NGnThreadData workers.
215 if (cmd ==
"SETIDS") {
216 if (frames.size() < 2) {
220 if (
auto * gnWorker =
dynamic_cast<NGnThreadData *
>(worker)) {
221 gnWorker->SyncCurrentDefinitionIds(ParseIds(frames[1]));
// Anything other than "TASK"/"TASKB" at this point is not a task command
// (the handling of that case is not visible here).
227 if (cmd !=
"TASK" && cmd !=
"TASKB") {
// Id of the task being processed, used by the catch handlers below to tell
// the peer which task failed ("0" is sent when it is empty).
232 std::string errTaskId;
// Single task: requires command, task id and coordinates frames.
235 if (frames.size() < 3) {
240 const std::string taskId = frames[1];
242 std::vector<int> coords = ParseCoords(frames[2]);
244 if (showWorkerProgress) {
245 fprintf(stdout,
"\rWorker %zu: processing tasks [done: %zu] ", workerIndex, tasksProcessed);
248 worker->Process(coords);
249 if (!SendFrames(dealer, {
"ACK", taskId})) {
// Batch of tasks: requires command and payload frames.
255 if (frames.size() < 2) {
260 std::vector<std::pair<std::string, std::vector<int>>> batchTasks;
261 if (!ParseTaskBatchPayload(frames[1], batchTasks)) {
266 std::vector<std::string> ackedTaskIds;
267 ackedTaskIds.reserve(batchTasks.size());
// The counter is advanced by the whole batch before any task in it has run,
// so the "[done: N]" progress figure counts received rather than completed tasks.
268 tasksProcessed += batchTasks.size();
269 if (showWorkerProgress) {
270 fprintf(stdout,
"\rWorker %zu: processing tasks [done: %zu] ", workerIndex, tasksProcessed);
// Check for a stop request before each task; on stop the rest of the batch
// is skipped and only the tasks already processed are acknowledged.
273 for (
const auto & task : batchTasks) {
274 if (checkAbort()) { finishedOk =
false;
break; }
275 errTaskId = task.first;
276 worker->Process(task.second);
277 ackedTaskIds.push_back(task.first);
281 if (!SendFrames(dealer, {
"ACKB", SerializeTaskIds(ackedTaskIds)})) {
// Report failures to the peer as "ERR", <task id or "0">, <message>. The
// result of these SendFrames calls is not checked.
287 catch (
const std::exception & ex) {
288 SendFrames(dealer, {
"ERR", errTaskId.empty() ?
"0" : errTaskId, ex.what()});
293 SendFrames(dealer, {
"ERR", errTaskId.empty() ?
"0" : errTaskId,
"unknown worker exception"});
// Post-processing, only for NGnThreadData workers.
299 if (
auto * gnWorker =
dynamic_cast<NGnThreadData *
>(worker)) {
// Remember the storage tree's file name now, before the storage is closed below.
301 std::string localTmpFile;
302 if (gnWorker->GetHnSparseBase() && gnWorker->GetHnSparseBase()->GetStorageTree()) {
303 localTmpFile = gnWorker->GetHnSparseBase()->GetStorageTree()->GetFileName();
// Abort path: close with `false` and skip the end function.
// NOTE(review): the enclosing condition is not visible; presumably `aborted`.
309 NLogPrint(
"Worker %zu: aborting, skipping post-processing.", workerIndex);
310 if (gnWorker->GetHnSparseBase()) {
311 gnWorker->GetHnSparseBase()->Close(
false);
// Normal path: run the end function, then close with `true`.
314 NLogDebug(
"Worker %zu finished processing, executing end function and closing file if open ...", workerIndex);
315 gnWorker->ExecuteEndFunction();
316 if (gnWorker->GetHnSparseBase()) {
317 gnWorker->GetHnSparseBase()->Close(
true);
// If a results file name is configured and differs from the local file,
// copy the local file there (progress bar disabled via kFALSE).
// NOTE(review): resultsDir is computed here; its use (presumably creating
// the directory) is not visible in this excerpt.
321 const std::string & resultsFilename = gnWorker->GetResultsFilename();
322 if (!resultsFilename.empty()) {
323 if (!localTmpFile.empty() && localTmpFile != resultsFilename) {
324 const std::string resultsDir = std::string(gSystem->GetDirName(resultsFilename.c_str()));
326 NLogPrint(
"Worker %zu copying '%s' -> '%s' ...", workerIndex, localTmpFile.c_str(), resultsFilename.c_str());
327 if (!
NUtils::Cp(localTmpFile, resultsFilename, kFALSE)) {
328 NLogError(
"Worker %zu: failed to copy '%s' to '%s'", workerIndex, localTmpFile.c_str(),
329 resultsFilename.c_str());
// Remove the local file only when a distinct results file name exists, so
// the local file is never deleted when it is itself the results file.
// NOTE(review): whether a failed copy above prevents this unlink is not
// visible here — confirm the local file is kept when the copy fails.
338 const std::string & resultsFilenameForDelete = gnWorker->GetResultsFilename();
339 const bool hasDistinctResultsFile = !resultsFilenameForDelete.empty() && resultsFilenameForDelete != localTmpFile;
340 if (!localTmpFile.empty() && hasDistinctResultsFile) {
341 NLogPrint(
"Worker %zu: removing local tmp file '%s'", workerIndex, localTmpFile.c_str());
342 gSystem->Unlink(localTmpFile.c_str());
// Tell the peer this worker is finished (send result not checked).
351 SendFrames(dealer, {
"DONE"});
// Set the socket's linger period (the value of `linger` is defined on a line
// not visible here).
357 zmq_setsockopt(dealer, ZMQ_LINGER, &linger,
sizeof(linger));
359 return finishedOk ? 0 : 1;
// Waits for the given child processes and reports whether all of them
// exited normally with status 0.
//   pids      - child process ids; entries <= 0 are skipped.
//   timeoutMs - a negative value disables the timeout (useTimeout is false);
//               otherwise the per-child wait budget in milliseconds.
// Returns true only if no child was flagged as failed.
// NOTE(review): the branch structure between the blocking and the polling
// wait is only partly visible in this excerpt.
362 bool NDimensionalIpcRunner::WaitForChildProcesses(
const std::vector<pid_t> & pids,
int timeoutMs)
364 bool allExitedCleanly =
true;
365 const bool useTimeout = (timeoutMs >= 0);
367 for (pid_t pid : pids) {
368 if (pid <= 0)
continue;
// Blocking wait (options == 0); presumably the no-timeout path.
373 rc = waitpid(pid, &status, 0);
// Timed path: poll with WNOHANG every 10 ms until the child is reaped,
// waitpid fails, or the deadline passes. The deadline is computed per child,
// so the total wait can reach pids.size() * timeoutMs.
376 const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
377 while (std::chrono::steady_clock::now() < deadline) {
378 rc = waitpid(pid, &status, WNOHANG);
379 if (rc == pid || rc < 0) {
382 std::this_thread::sleep_for(std::chrono::milliseconds(10));
385 allExitedCleanly =
false;
391 allExitedCleanly =
false;
// A child counts as clean only if it exited normally with exit status 0;
// termination by signal or a non-zero status marks the whole wait as failed.
395 if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
396 allExitedCleanly =
false;
399 return allExitedCleanly;
// Makes a best effort to reap every child in `pids` during shutdown, waiting
// a bounded time per child and logging a warning for any child that still
// has not exited.
//   pids - child process ids; entries <= 0 are skipped.
// NOTE(review): the lines that signal the child are not visible in this
// excerpt; the warning text below indicates SIGKILL is sent before the
// final wait.
402 void NDimensionalIpcRunner::CleanupChildProcesses(
const std::vector<pid_t> & pids)
404 for (pid_t pid : pids) {
405 if (pid <= 0)
continue;
// Non-blocking check: has the child already exited?
407 pid_t rc = waitpid(pid, &status, WNOHANG);
416 rc = waitpid(pid, &status, WNOHANG);
417 if (rc == pid || rc < 0) {
// Give the child up to 1500 ms to exit. WaitForChildProcesses also returns
// false for a child that exited with a non-zero status, not only on timeout.
421 if (!WaitForChildProcesses({pid}, 1500)) {
// Final bounded wait: poll every 10 ms for up to 1500 ms.
426 const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(1500);
427 while (std::chrono::steady_clock::now() < deadline) {
428 rc = waitpid(pid, &status, WNOHANG);
429 if (rc == pid || rc < 0)
break;
430 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Shutdown continues even if the child could not be reaped.
433 NLogWarning(
"NDimensionalIpcRunner::CleanupChildProcesses: child pid=%d did not exit after SIGKILL; continuing shutdown", pid);
439 std::vector<int> NDimensionalIpcRunner::ParseCoords(
const std::string & coordsStr)
441 std::vector<int> coords;
442 std::stringstream ss(coordsStr);
444 while (std::getline(ss, token,
',')) {
445 if (token.empty())
continue;
446 coords.push_back(std::stoi(token));
451 std::vector<Long64_t> NDimensionalIpcRunner::ParseIds(
const std::string & idsStr)
453 std::vector<Long64_t> ids;
454 std::stringstream ss(idsStr);
456 while (std::getline(ss, token,
',')) {
457 if (token.empty())
continue;
458 ids.push_back(
static_cast<Long64_t
>(std::stoll(token)));
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
static bool CreateDirectory(const std::string &path)
static int Cp(std::string source, std::string destination, Bool_t progressbar=kTRUE)
Copy a file from source to destination.