13#include "NDimensionalIpcRunner.h"
14#include "NGnThreadData.h"
15#include "NThreadData.h"
/// Interrupt flag shared between the SIGINT handler and the worker task loop.
/// It is written only by WorkerSigIntHandler and reset/polled by the task loop.
/// `volatile sig_atomic_t` is used because it is the object type that may be
/// written from an asynchronous signal handler.
volatile sig_atomic_t gWorkerInterrupted = 0;

/// SIGINT handler for worker processes.
///
/// Only records that an interrupt happened; all logging and messaging is left
/// to the code that polls gWorkerInterrupted, so the handler stays
/// async-signal-safe.
///
/// @param signum Signal number (unused).
void WorkerSigIntHandler(int /*signum*/)
{
  gWorkerInterrupted = 1;
}
/// Joins task identifiers into a single comma-separated string.
///
/// @param taskIds Identifiers to join; they are emitted verbatim (no escaping),
///                so an identifier containing ',' cannot be told apart from two
///                identifiers after joining.
/// @return "id0,id1,..." or an empty string when `taskIds` is empty.
std::string SerializeTaskIds(const std::vector<std::string> & taskIds)
{
  std::ostringstream oss;
  for (size_t i = 0; i < taskIds.size(); ++i) {
    if (i != 0) oss << ','; // separator goes between elements only
    oss << taskIds[i];
  }
  return oss.str();
}
/// Parses a batch payload of the form "id:c0,c1,...;id:c0,c1,...".
///
/// Tasks are separated by ';', the task id is separated from its coordinates
/// by the first ':', and coordinates are separated by ','. Empty task tokens
/// and empty coordinate tokens are skipped.
///
/// The payload is rejected as a whole (returns false, `tasks` left empty) when
///  - a task token has no ':', an empty id, or nothing after the ':', or
///  - a coordinate is not a complete base-10 int (non-numeric text, trailing
///    characters such as "12abc", or a value outside the range of int).
/// Malformed numbers are reported through the return value rather than by
/// letting std::stoi's exceptions escape a bool-returning parser.
///
/// @param payload Raw batch string.
/// @param tasks   Output; cleared first, then filled with (taskId, coords) pairs.
/// @return true if at least one task was parsed and nothing was malformed.
bool ParseTaskBatchPayload(const std::string & payload, std::vector<std::pair<std::string, std::vector<int>>> & tasks)
{
  tasks.clear();

  std::stringstream batchStream(payload);
  std::string       taskToken;
  while (std::getline(batchStream, taskToken, ';')) {
    if (taskToken.empty()) continue;

    const size_t sep = taskToken.find(':');
    if (sep == std::string::npos || sep == 0 || sep + 1 >= taskToken.size()) {
      tasks.clear();
      return false;
    }

    std::string       taskId = taskToken.substr(0, sep);
    std::vector<int>  parsedCoords;
    std::stringstream coordStream(taskToken.substr(sep + 1));
    std::string       coordToken;
    while (std::getline(coordStream, coordToken, ',')) {
      if (coordToken.empty()) continue;
      try {
        size_t    consumed = 0;
        const int value    = std::stoi(coordToken, &consumed);
        // std::stoi stops at the first non-digit; require the whole token to be used.
        if (consumed != coordToken.size()) {
          tasks.clear();
          return false;
        }
        parsedCoords.push_back(value);
      }
      catch (const std::exception &) {
        // std::invalid_argument (not a number) or std::out_of_range (overflow).
        tasks.clear();
        return false;
      }
    }
    tasks.emplace_back(std::move(taskId), std::move(parsedCoords));
  }
  return !tasks.empty();
}
64bool NDimensionalIpcRunner::SendFrames(
void * socket,
const std::vector<std::string> & frames)
66 for (
size_t i = 0; i < frames.size(); ++i) {
67 int flags = (i + 1 < frames.size()) ? ZMQ_SNDMORE : 0;
68 if (zmq_send(socket, frames[i].data(), frames[i].size(), flags) < 0) {
// Receives message parts from `socket` and appends each one to `outFrames`.
// NOTE(review): this listing omits several lines of the body (message
// init/close, the loop and the error path), so only the visible steps are
// described here.
75bool NDimensionalIpcRunner::ReceiveFrames(
void * socket, std::vector<std::string> & outFrames)
// Receive with flags == 0; rc is used below as the number of bytes to copy.
81 int rc = zmq_msg_recv(&msg, socket, 0);
// Copy the part's payload as a byte string of length rc (may contain '\0').
86 outFrames.emplace_back(
static_cast<const char *
>(zmq_msg_data(&msg)),
static_cast<size_t>(rc));
// Ask ZeroMQ whether more parts of the same multipart message follow.
87 int more = zmq_msg_more(&msg);
94std::string NDimensionalIpcRunner::BuildWorkerIdentity(
size_t workerIndex)
96 std::ostringstream oss;
97 oss <<
"wk_" << std::setw(6) << std::setfill(
'0') << workerIndex;
101std::string NDimensionalIpcRunner::SerializeCoords(
const std::vector<int> & coords)
103 std::ostringstream oss;
104 for (
size_t i = 0; i < coords.size(); ++i) {
105 if (i != 0) oss <<
',';
111std::string NDimensionalIpcRunner::SerializeIds(
const std::vector<Long64_t> & ids)
113 std::ostringstream oss;
114 for (
size_t i = 0; i < ids.size(); ++i) {
115 if (i != 0) oss <<
',';
// Worker entry point: creates a ZeroMQ DEALER socket, connects it to
// `endpoint`, announces itself with a READY message and then runs TaskLoop.
// NOTE(review): the error branches and the socket/context teardown are not
// visible in this listing, so they are not described here.
121int NDimensionalIpcRunner::WorkerLoop(
const std::string & endpoint,
size_t workerIndex,
NThreadData * worker)
// Name of the form "ipc_NNNNNN" (index zero-padded to six digits); how it is
// applied is not visible here.
123 std::ostringstream threadName;
124 threadName <<
"ipc_" << std::setw(6) << std::setfill(
'0') << workerIndex;
// One ZeroMQ context and one DEALER socket per worker.
127 void * ctx = zmq_ctx_new();
132 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
// The socket identity is the string produced by BuildWorkerIdentity.
138 const std::string identity = BuildWorkerIdentity(workerIndex);
139 if (zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size()) != 0) {
// Receive timeout of 1000 ms; TaskLoop checks errno for EAGAIN/EWOULDBLOCK
// after a failed receive. The result of this zmq_setsockopt call is not checked.
145 int timeoutMs = 1000;
146 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
148 if (zmq_connect(dealer, endpoint.c_str()) != 0) {
// Announce the worker with a single-frame "READY" message.
154 if (!SendFrames(dealer, {
"READY"})) {
160 NLogPrint(
"Worker %zu: connected to %s, ready for tasks", workerIndex, endpoint.c_str());
// Hand the connected socket over to the command-processing loop.
162 int rc = TaskLoop(dealer, workerIndex, worker);
// Command loop of a worker process.
//
// Installs WorkerSigIntHandler for SIGINT (the previous action is restored
// before returning), then receives multipart messages on `dealer` and acts on
// the command in the first frame. Commands visible in this listing: SETDEF,
// SETIDS, TASK and TASKB, plus STOP with an optional "abort" second frame.
// Replies visible here: ACK <taskId>, ACKB <id,id,...>, ERR <taskId> <text>,
// SHUTDOWN interrupted <count>, and DONE.
// Returns 0 when finishedOk is still true at the end, 1 otherwise.
//
// NOTE(review): many lines of the body (braces, loop header, try, several
// branch bodies) are missing from this listing; comments below describe only
// what the visible lines establish.
169int NDimensionalIpcRunner::TaskLoop(
void * dealer,
size_t workerIndex,
NThreadData * worker)
// Reset the interrupt flag, then install the SIGINT handler, saving the
// previous action in oldSa.
172 gWorkerInterrupted = 0;
174 struct sigaction oldSa;
175 memset(&sa, 0,
sizeof(sa));
176 sa.sa_handler = WorkerSigIntHandler;
177 sigemptyset(&sa.sa_mask);
179 sigaction(SIGINT, &sa, &oldSa);
181 bool finishedOk =
true;
182 bool aborted =
false;
183 bool wasInterrupted =
false;
184 size_t tasksProcessed = 0;
185 size_t lastReportedProgress = 0;
// Per-task progress lines are enabled only when NDMSPC_WORKER_PROGRESS is
// "1", "true", "yes" or "on" (compared case-insensitively).
186 const bool showWorkerProgress = []() {
187 const char * env = gSystem->Getenv(
"NDMSPC_WORKER_PROGRESS");
188 if (!env || env[0] ==
'\0')
return false;
189 std::string value(env);
190 std::transform(value.begin(), value.end(), value.begin(),
191 [](
unsigned char c) { return static_cast<char>(std::tolower(c)); });
192 return (value ==
"1" || value ==
"true" || value ==
"yes" || value ==
"on");
// Reporting interval from NDMSPC_WORKER_PROGRESS_INTERVAL: 50 when the
// variable is unset or empty, and 50 when the parsed value is not positive.
195 const size_t progressReportInterval = []() ->
size_t {
196 const char * env = gSystem->Getenv(
"NDMSPC_WORKER_PROGRESS_INTERVAL");
197 if (!env || env[0] ==
'\0')
return 50UL;
199 int val = std::stoi(env);
200 return (val > 0) ?
static_cast<size_t>(val) : 50UL;
// checkAbort is called between the tasks of a batch. On a pending interrupt
// it sends SHUTDOWN/interrupted/<count>; otherwise it polls the socket with a
// zero timeout and, if a message is waiting, reads it and looks for "STOP"
// (second frame "abort" sets `aborted`).
208 auto checkAbort = [&]() ->
bool {
210 if (gWorkerInterrupted) {
213 SendFrames(dealer, {
"SHUTDOWN",
"interrupted", std::to_string(tasksProcessed)});
218 zmq_pollitem_t item = {dealer, 0, ZMQ_POLLIN, 0};
219 if (zmq_poll(&item, 1, 0) <= 0)
return false;
220 std::vector<std::string> peek;
221 if (!ReceiveFrames(dealer, peek))
return false;
222 if (!peek.empty() && peek[0] ==
"STOP") {
223 aborted = (peek.size() >= 2 && peek[1] ==
"abort");
224 if (aborted) NLogPrint(
"Worker %zu: received abort from supervisor, stopping ...", workerIndex);
// Interrupt check before receiving the next message.
// NOTE(review): the two consecutive log lines below report the same event;
// the second one is a superset of the first.
232 if (gWorkerInterrupted) {
233 if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(
""); }
234 NLogPrint(
"Worker %zu: interrupted by user (Ctrl+C), exiting ...", workerIndex);
235 NLogPrint(
"Worker %zu: interrupted by user (Ctrl+C) after processing %zu tasks, shutting down...", workerIndex, tasksProcessed);
237 SendFrames(dealer, {
"SHUTDOWN",
"interrupted", std::to_string(tasksProcessed)});
239 wasInterrupted =
true;
// A failed receive with EAGAIN/EWOULDBLOCK is treated separately from other
// failures; on that path the interrupt flag is checked again.
244 std::vector<std::string> frames;
245 if (!ReceiveFrames(dealer, frames)) {
246 if (errno == EAGAIN || errno == EWOULDBLOCK) {
248 if (gWorkerInterrupted) {
249 if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(
""); }
250 NLogPrint(
"Worker %zu: interrupted by user (Ctrl+C), exiting ...", workerIndex);
251 NLogPrint(
"Worker %zu: interrupted by user (Ctrl+C) after processing %zu tasks, shutting down...", workerIndex, tasksProcessed);
253 SendFrames(dealer, {
"SHUTDOWN",
"interrupted", std::to_string(tasksProcessed)});
255 wasInterrupted =
true;
264 if (frames.empty())
continue;
// The first frame is the command name.
266 const std::string & cmd = frames[0];
268 if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(
""); }
269 aborted = (frames.size() >= 2 && frames[1] ==
"abort");
271 NLogPrint(
"Worker %zu: received abort from supervisor, stopping ...", workerIndex);
// SETDEF: frames[1] is passed to SetCurrentDefinitionName; applied only when
// the worker is an NGnThreadData.
277 if (cmd ==
"SETDEF") {
278 if (frames.size() < 2) {
282 if (
auto * gnWorker =
dynamic_cast<NGnThreadData *
>(worker)) {
283 gnWorker->SetCurrentDefinitionName(frames[1]);
// SETIDS: frames[1] is parsed with ParseIds and passed to
// SyncCurrentDefinitionIds; applied only when the worker is an NGnThreadData.
289 if (cmd ==
"SETIDS") {
290 if (frames.size() < 2) {
294 if (
auto * gnWorker =
dynamic_cast<NGnThreadData *
>(worker)) {
295 gnWorker->SyncCurrentDefinitionIds(ParseIds(frames[1]));
// Commands other than TASK and TASKB take this branch (body not shown here).
301 if (cmd !=
"TASK" && cmd !=
"TASKB") {
// Task id reported by the catch handlers below in ERR replies ("0" when empty).
306 std::string errTaskId;
// Single task: frames[1] is the task id, frames[2] the comma-separated coordinates.
309 if (frames.size() < 3) {
314 const std::string taskId = frames[1];
316 std::vector<int> coords = ParseCoords(frames[2]);
318 if (showWorkerProgress) {
319 NLogPrint(
"Worker %zu: processing tasks [done: %zu]", workerIndex, tasksProcessed);
323 if (tasksProcessed == 1 || tasksProcessed - lastReportedProgress >= progressReportInterval) {
324 NLogPrint(
"Worker %zu: processed %zu tasks", workerIndex, tasksProcessed);
325 lastReportedProgress = tasksProcessed;
327 worker->Process(coords);
328 if (!SendFrames(dealer, {
"ACK", taskId})) {
// Batch: frames[1] is parsed by ParseTaskBatchPayload into (id, coords) pairs.
334 if (frames.size() < 2) {
339 std::vector<std::pair<std::string, std::vector<int>>> batchTasks;
340 if (!ParseTaskBatchPayload(frames[1], batchTasks)) {
345 std::vector<std::string> ackedTaskIds;
346 ackedTaskIds.reserve(batchTasks.size());
// NOTE(review): the counter is advanced by the whole batch size before any
// task of the batch has run, so if checkAbort() breaks out of the loop below,
// tasksProcessed (and every message that reports it) includes tasks that
// were never processed.
347 tasksProcessed += batchTasks.size();
348 if (showWorkerProgress) {
349 NLogPrint(
"Worker %zu: processing tasks [done: %zu]", workerIndex, tasksProcessed);
353 if (tasksProcessed == 1 || tasksProcessed - lastReportedProgress >= progressReportInterval) {
354 NLogPrint(
"Worker %zu: processed %zu tasks", workerIndex, tasksProcessed);
355 lastReportedProgress = tasksProcessed;
// Process the batch in order, checking for interrupt/STOP before each task;
// only tasks that completed are collected for the ACKB reply.
357 for (
const auto & task : batchTasks) {
358 if (checkAbort()) { finishedOk =
false;
break; }
359 errTaskId = task.first;
360 worker->Process(task.second);
361 ackedTaskIds.push_back(task.first);
365 if (!SendFrames(dealer, {
"ACKB", SerializeTaskIds(ackedTaskIds)})) {
// Failures are logged and reported to the supervisor as ERR <taskId> <text>.
371 catch (
const std::exception & ex) {
372 NLogPrint(
"Worker %zu: ERROR processing task %s: %s", workerIndex, errTaskId.c_str(), ex.what());
373 SendFrames(dealer, {
"ERR", errTaskId.empty() ?
"0" : errTaskId, ex.what()});
378 NLogPrint(
"Worker %zu: ERROR processing task %s: unknown exception", workerIndex, errTaskId.c_str());
379 SendFrames(dealer, {
"ERR", errTaskId.empty() ?
"0" : errTaskId,
"unknown worker exception"});
// Finalization for NGnThreadData workers: remember the storage tree's file
// name, then either close without post-processing (Close(false)) or run the
// end function and Close(true).
385 if (
auto * gnWorker =
dynamic_cast<NGnThreadData *
>(worker)) {
387 std::string localTmpFile;
388 if (gnWorker->GetHnSparseBase() && gnWorker->GetHnSparseBase()->GetStorageTree()) {
389 localTmpFile = gnWorker->GetHnSparseBase()->GetStorageTree()->GetFileName();
395 NLogPrint(
"Worker %zu: aborting, skipping post-processing.", workerIndex);
396 if (gnWorker->GetHnSparseBase()) {
397 gnWorker->GetHnSparseBase()->Close(
false);
400 NLogDebug(
"Worker %zu finished processing, executing end function and closing file if open ...", workerIndex);
401 gnWorker->ExecuteEndFunction();
402 if (gnWorker->GetHnSparseBase()) {
403 gnWorker->GetHnSparseBase()->Close(
true);
// If a results filename is set and differs from the local file, copy the
// local file to it.
407 const std::string & resultsFilename = gnWorker->GetResultsFilename();
408 if (!resultsFilename.empty()) {
409 if (!localTmpFile.empty() && localTmpFile != resultsFilename) {
410 const std::string resultsDir = std::string(gSystem->GetDirName(resultsFilename.c_str()));
412 NLogPrint(
"Worker %zu copying '%s' -> '%s' ...", workerIndex, localTmpFile.c_str(), resultsFilename.c_str());
413 if (!
NUtils::Cp(localTmpFile, resultsFilename, kFALSE)) {
414 NLogError(
"Worker %zu: failed to copy '%s' to '%s'", workerIndex, localTmpFile.c_str(),
415 resultsFilename.c_str());
// The local file is unlinked only when a distinct results file name exists.
424 const std::string & resultsFilenameForDelete = gnWorker->GetResultsFilename();
425 const bool hasDistinctResultsFile = !resultsFilenameForDelete.empty() && resultsFilenameForDelete != localTmpFile;
426 if (!localTmpFile.empty() && hasDistinctResultsFile) {
427 NLogPrint(
"Worker %zu: removing local tmp file '%s'", workerIndex, localTmpFile.c_str());
428 gSystem->Unlink(localTmpFile.c_str());
// Final status: DONE is sent on the success path; the other outcomes are
// distinguished by wasInterrupted and finishedOk.
437 SendFrames(dealer, {
"DONE"});
438 NLogPrint(
"Worker %zu: completed successfully, processed %zu tasks total", workerIndex, tasksProcessed);
439 }
else if (wasInterrupted) {
441 }
else if (!finishedOk) {
442 NLogPrint(
"Worker %zu: exited with error after processing %zu tasks", workerIndex, tasksProcessed);
// Set the socket's linger option (the value of `linger` is not visible here).
448 zmq_setsockopt(dealer, ZMQ_LINGER, &linger,
sizeof(linger));
// Restore the SIGINT action that was active before this loop.
451 sigaction(SIGINT, &oldSa,
nullptr);
453 return finishedOk ? 0 : 1;
// Waits for the given child processes and reports whether all of them exited
// normally with status 0.
//
// pids      - child process ids; entries <= 0 are skipped.
// timeoutMs - a value >= 0 enables timeout mode; a negative value disables it.
// Returns allExitedCleanly, which starts true and is cleared on the failure
// paths below.
// NOTE(review): the branch structure is only partly visible in this listing;
// presumably the blocking waitpid is used without a timeout and the polling
// loop with one - confirm against the full source.
456bool NDimensionalIpcRunner::WaitForChildProcesses(
const std::vector<pid_t> & pids,
int timeoutMs)
458 bool allExitedCleanly =
true;
459 const bool useTimeout = (timeoutMs >= 0);
461 for (pid_t pid : pids) {
462 if (pid <= 0)
continue;
// Blocking wait.
467 rc = waitpid(pid, &status, 0);
// Timed wait: poll with WNOHANG every 10 ms until the deadline passes.
470 const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
471 while (std::chrono::steady_clock::now() < deadline) {
472 rc = waitpid(pid, &status, WNOHANG);
473 if (rc == pid || rc < 0) {
476 std::this_thread::sleep_for(std::chrono::milliseconds(10));
479 allExitedCleanly =
false;
485 allExitedCleanly =
false;
// A child that did not exit normally, or exited with a non-zero status,
// makes the overall result false.
489 if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
490 allExitedCleanly =
false;
493 return allExitedCleanly;
// Makes sure the given child processes are reaped during shutdown.
//
// pids - child process ids; entries <= 0 are skipped.
// NOTE(review): the lines that send signals to the child are not visible in
// this listing; the warning text below indicates a SIGKILL step, but its
// exact placement should be confirmed against the full source.
496void NDimensionalIpcRunner::CleanupChildProcesses(
const std::vector<pid_t> & pids)
498 for (pid_t pid : pids) {
499 if (pid <= 0)
continue;
// Non-blocking check of the child's state.
501 pid_t rc = waitpid(pid, &status, WNOHANG);
510 rc = waitpid(pid, &status, WNOHANG);
511 if (rc == pid || rc < 0) {
// Give the child up to 1500 ms to exit.
515 if (!WaitForChildProcesses({pid}, 1500)) {
// Second grace period of 1500 ms, polling every 10 ms.
520 const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(1500);
521 while (std::chrono::steady_clock::now() < deadline) {
522 rc = waitpid(pid, &status, WNOHANG);
523 if (rc == pid || rc < 0)
break;
524 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Shutdown continues even if the child could not be reaped.
527 NLogWarning(
"NDimensionalIpcRunner::CleanupChildProcesses: child pid=%d did not exit after SIGKILL; continuing shutdown", pid);
533std::vector<int> NDimensionalIpcRunner::ParseCoords(
const std::string & coordsStr)
535 std::vector<int> coords;
536 std::stringstream ss(coordsStr);
538 while (std::getline(ss, token,
',')) {
539 if (token.empty())
continue;
540 coords.push_back(std::stoi(token));
545std::vector<Long64_t> NDimensionalIpcRunner::ParseIds(
const std::string & idsStr)
547 std::vector<Long64_t> ids;
548 std::stringstream ss(idsStr);
550 while (std::getline(ss, token,
',')) {
551 if (token.empty())
continue;
552 ids.push_back(
static_cast<Long64_t
>(std::stoll(token)));
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Thread-local data object for NDMSPC processing.
static bool CreateDirectory(const std::string &path)
static int Cp(std::string source, std::string destination, Bool_t progressbar=kTRUE)
Copy a file from source to destination.
Global callback function for libwebsockets client events.