ndmspc v1.2.0-0.1.rc7
Loading...
Searching...
No Matches
NDimensionalIpcRunner.cxx
#include <algorithm>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <iomanip>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#include <zmq.h>
#include "NUtils.h"
#include <TSystem.h>
#include "NDimensionalIpcRunner.h"
#include "NGnThreadData.h"
#include "NThreadData.h"
16
17namespace Ndmspc {
18
19namespace {
// Set to 1 by the SIGINT handler below and polled by the worker task loop.
// Declared volatile sig_atomic_t so it can be written from a signal handler.
volatile sig_atomic_t gWorkerInterrupted = 0;

/// SIGINT (Ctrl+C) handler for worker processes: it only raises the flag and
/// leaves all real shutdown work to the code that polls gWorkerInterrupted.
void WorkerSigIntHandler(int /*signum*/)
{
  gWorkerInterrupted = 1;
}
27
/// Joins task identifiers into a single comma-separated string.
/// An empty input yields an empty string; no trailing separator is written.
std::string SerializeTaskIds(const std::vector<std::string> & taskIds)
{
  std::ostringstream joined;
  const char *       separator = "";
  for (const std::string & id : taskIds) {
    joined << separator << id;
    separator = ","; // every element after the first is preceded by a comma
  }
  return joined.str();
}
37
/// Parses a batch payload of the form "id:c0,c1,...;id:c0,...".
///
/// Empty entries (between ';') and empty coordinate fields (between ',') are
/// skipped. An entry without ':' or with nothing before or after the ':' makes
/// the function return false immediately; entries parsed up to that point stay
/// in `tasks`.
///
/// @param payload raw batch string
/// @param tasks   output list of (task id, coordinates); cleared on entry
/// @return true when at least one task was parsed and no entry was malformed
/// @throws std::invalid_argument / std::out_of_range propagated from std::stoi
///         when a coordinate field is not a valid int
bool ParseTaskBatchPayload(const std::string & payload, std::vector<std::pair<std::string, std::vector<int>>> & tasks)
{
  // Splits `text` on `delimiter`, dropping empty pieces.
  const auto splitNonEmpty = [](const std::string & text, char delimiter) {
    std::vector<std::string> parts;
    std::string::size_type   from = 0;
    while (from < text.size()) {
      std::string::size_type to = text.find(delimiter, from);
      if (to == std::string::npos) to = text.size();
      if (to > from) parts.push_back(text.substr(from, to - from));
      from = to + 1;
    }
    return parts;
  };

  tasks.clear();
  for (const std::string & entry : splitNonEmpty(payload, ';')) {
    const std::string::size_type colon = entry.find(':');
    const bool hasId = (colon != std::string::npos) && (colon != 0);
    if (!hasId || colon + 1 >= entry.size()) {
      return false;
    }

    std::vector<int> coords;
    for (const std::string & field : splitNonEmpty(entry.substr(colon + 1), ',')) {
      coords.push_back(std::stoi(field));
    }
    tasks.emplace_back(entry.substr(0, colon), std::move(coords));
  }
  return !tasks.empty();
}
62} // namespace
63
64bool NDimensionalIpcRunner::SendFrames(void * socket, const std::vector<std::string> & frames)
65{
66 for (size_t i = 0; i < frames.size(); ++i) {
67 int flags = (i + 1 < frames.size()) ? ZMQ_SNDMORE : 0;
68 if (zmq_send(socket, frames[i].data(), frames[i].size(), flags) < 0) {
69 return false;
70 }
71 }
72 return true;
73}
74
75bool NDimensionalIpcRunner::ReceiveFrames(void * socket, std::vector<std::string> & outFrames)
76{
77 outFrames.clear();
78 while (true) {
79 zmq_msg_t msg;
80 zmq_msg_init(&msg);
81 int rc = zmq_msg_recv(&msg, socket, 0);
82 if (rc < 0) {
83 zmq_msg_close(&msg);
84 return false;
85 }
86 outFrames.emplace_back(static_cast<const char *>(zmq_msg_data(&msg)), static_cast<size_t>(rc));
87 int more = zmq_msg_more(&msg);
88 zmq_msg_close(&msg);
89 if (!more) break;
90 }
91 return true;
92}
93
94std::string NDimensionalIpcRunner::BuildWorkerIdentity(size_t workerIndex)
95{
96 std::ostringstream oss;
97 oss << "wk_" << std::setw(6) << std::setfill('0') << workerIndex;
98 return oss.str();
99}
100
101std::string NDimensionalIpcRunner::SerializeCoords(const std::vector<int> & coords)
102{
103 std::ostringstream oss;
104 for (size_t i = 0; i < coords.size(); ++i) {
105 if (i != 0) oss << ',';
106 oss << coords[i];
107 }
108 return oss.str();
109}
110
111std::string NDimensionalIpcRunner::SerializeIds(const std::vector<Long64_t> & ids)
112{
113 std::ostringstream oss;
114 for (size_t i = 0; i < ids.size(); ++i) {
115 if (i != 0) oss << ',';
116 oss << ids[i];
117 }
118 return oss.str();
119}
120
121int NDimensionalIpcRunner::WorkerLoop(const std::string & endpoint, size_t workerIndex, NThreadData * worker)
122{
123 std::ostringstream threadName;
124 threadName << "ipc_" << std::setw(6) << std::setfill('0') << workerIndex;
125 NLogger::SetThreadName(threadName.str());
126
127 void * ctx = zmq_ctx_new();
128 if (!ctx) {
129 return 1;
130 }
131
132 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
133 if (!dealer) {
134 zmq_ctx_term(ctx);
135 return 1;
136 }
137
138 const std::string identity = BuildWorkerIdentity(workerIndex);
139 if (zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size()) != 0) {
140 zmq_close(dealer);
141 zmq_ctx_term(ctx);
142 return 1;
143 }
144
145 int timeoutMs = 1000;
146 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
147
148 if (zmq_connect(dealer, endpoint.c_str()) != 0) {
149 zmq_close(dealer);
150 zmq_ctx_term(ctx);
151 return 1;
152 }
153
154 if (!SendFrames(dealer, {"READY"})) {
155 zmq_close(dealer);
156 zmq_ctx_term(ctx);
157 return 1;
158 }
159
160 NLogPrint("Worker %zu: connected to %s, ready for tasks", workerIndex, endpoint.c_str());
161
162 int rc = TaskLoop(dealer, workerIndex, worker);
163
164 zmq_close(dealer);
165 zmq_ctx_term(ctx);
166 return rc;
167}
168
169int NDimensionalIpcRunner::TaskLoop(void * dealer, size_t workerIndex, NThreadData * worker)
170{
171 // Install signal handler for Ctrl+C
172 gWorkerInterrupted = 0;
173 struct sigaction sa;
174 struct sigaction oldSa;
175 memset(&sa, 0, sizeof(sa));
176 sa.sa_handler = WorkerSigIntHandler;
177 sigemptyset(&sa.sa_mask);
178 sa.sa_flags = 0;
179 sigaction(SIGINT, &sa, &oldSa);
180
181 bool finishedOk = true;
182 bool aborted = false;
183 bool wasInterrupted = false;
184 size_t tasksProcessed = 0;
185 size_t lastReportedProgress = 0;
186 const bool showWorkerProgress = []() {
187 const char * env = gSystem->Getenv("NDMSPC_WORKER_PROGRESS");
188 if (!env || env[0] == '\0') return false;
189 std::string value(env);
190 std::transform(value.begin(), value.end(), value.begin(),
191 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
192 return (value == "1" || value == "true" || value == "yes" || value == "on");
193 }();
194
195 const size_t progressReportInterval = []() -> size_t {
196 const char * env = gSystem->Getenv("NDMSPC_WORKER_PROGRESS_INTERVAL");
197 if (!env || env[0] == '\0') return 50UL; // default: report every 50 tasks
198 try {
199 int val = std::stoi(env);
200 return (val > 0) ? static_cast<size_t>(val) : 50UL;
201 } catch (...) {
202 return 50UL;
203 }
204 }();
205
206 // Non-blocking check: poll the socket and consume a STOP frame if present.
207 // Used between sub-tasks in a batch to react to supervisor abort quickly.
208 auto checkAbort = [&]() -> bool {
209 // Check for Ctrl+C first
210 if (gWorkerInterrupted) {
211 if (!aborted) {
212 // Send shutdown notification on first detection
213 SendFrames(dealer, {"SHUTDOWN", "interrupted", std::to_string(tasksProcessed)});
214 }
215 aborted = true;
216 return true;
217 }
218 zmq_pollitem_t item = {dealer, 0, ZMQ_POLLIN, 0};
219 if (zmq_poll(&item, 1, 0) <= 0) return false;
220 std::vector<std::string> peek;
221 if (!ReceiveFrames(dealer, peek)) return false;
222 if (!peek.empty() && peek[0] == "STOP") {
223 aborted = (peek.size() >= 2 && peek[1] == "abort");
224 if (aborted) NLogPrint("Worker %zu: received abort from supervisor, stopping ...", workerIndex);
225 return true;
226 }
227 return false; // unexpected frame — ignore, will be handled in main loop
228 };
229
230 while (true) {
231 // Check for Ctrl+C
232 if (gWorkerInterrupted) {
233 if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(""); }
234 NLogPrint("Worker %zu: interrupted by user (Ctrl+C), exiting ...", workerIndex);
235 NLogPrint("Worker %zu: interrupted by user (Ctrl+C) after processing %zu tasks, shutting down...", workerIndex, tasksProcessed);
236 // Notify supervisor before exiting
237 SendFrames(dealer, {"SHUTDOWN", "interrupted", std::to_string(tasksProcessed)});
238 aborted = true;
239 wasInterrupted = true;
240 finishedOk = false;
241 break;
242 }
243
244 std::vector<std::string> frames;
245 if (!ReceiveFrames(dealer, frames)) {
246 if (errno == EAGAIN || errno == EWOULDBLOCK) {
247 // Check for interruption on timeout
248 if (gWorkerInterrupted) {
249 if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(""); }
250 NLogPrint("Worker %zu: interrupted by user (Ctrl+C), exiting ...", workerIndex);
251 NLogPrint("Worker %zu: interrupted by user (Ctrl+C) after processing %zu tasks, shutting down...", workerIndex, tasksProcessed);
252 // Notify supervisor before exiting
253 SendFrames(dealer, {"SHUTDOWN", "interrupted", std::to_string(tasksProcessed)});
254 aborted = true;
255 wasInterrupted = true;
256 finishedOk = false;
257 break;
258 }
259 continue;
260 }
261 finishedOk = false;
262 break;
263 }
264 if (frames.empty()) continue;
265
266 const std::string & cmd = frames[0];
267 if (cmd == "STOP") {
268 if (showWorkerProgress && tasksProcessed > 0) { NLogPrint(""); }
269 aborted = (frames.size() >= 2 && frames[1] == "abort");
270 if (aborted) {
271 NLogPrint("Worker %zu: received abort from supervisor, stopping ...", workerIndex);
272 } else {
273 // NLogPrint("Worker %zu: received STOP, processed %zu tasks total", workerIndex, tasksProcessed);
274 }
275 break;
276 }
277 if (cmd == "SETDEF") {
278 if (frames.size() < 2) {
279 finishedOk = false;
280 break;
281 }
282 if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
283 gnWorker->SetCurrentDefinitionName(frames[1]);
284 continue;
285 }
286 finishedOk = false;
287 break;
288 }
289 if (cmd == "SETIDS") {
290 if (frames.size() < 2) {
291 finishedOk = false;
292 break;
293 }
294 if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
295 gnWorker->SyncCurrentDefinitionIds(ParseIds(frames[1]));
296 continue;
297 }
298 finishedOk = false;
299 break;
300 }
301 if (cmd != "TASK" && cmd != "TASKB") {
302 finishedOk = false;
303 break;
304 }
305
306 std::string errTaskId;
307 try {
308 if (cmd == "TASK") {
309 if (frames.size() < 3) {
310 finishedOk = false;
311 break;
312 }
313
314 const std::string taskId = frames[1];
315 errTaskId = taskId;
316 std::vector<int> coords = ParseCoords(frames[2]);
317 ++tasksProcessed;
318 if (showWorkerProgress) {
319 NLogPrint("Worker %zu: processing tasks [done: %zu]", workerIndex, tasksProcessed);
320 }
321 // Report progress at configured interval for visibility
322 // Always show first task to confirm worker is actively processing
323 if (tasksProcessed == 1 || tasksProcessed - lastReportedProgress >= progressReportInterval) {
324 // NLogPrint("Worker %zu: processed %zu tasks", workerIndex, tasksProcessed);
325 lastReportedProgress = tasksProcessed;
326 }
327 worker->Process(coords);
328 if (!SendFrames(dealer, {"ACK", taskId})) {
329 finishedOk = false;
330 break;
331 }
332 }
333 else {
334 if (frames.size() < 2) {
335 finishedOk = false;
336 break;
337 }
338
339 std::vector<std::pair<std::string, std::vector<int>>> batchTasks;
340 if (!ParseTaskBatchPayload(frames[1], batchTasks)) {
341 finishedOk = false;
342 break;
343 }
344
345 std::vector<std::string> ackedTaskIds;
346 ackedTaskIds.reserve(batchTasks.size());
347 tasksProcessed += batchTasks.size();
348 if (showWorkerProgress) {
349 NLogPrint("Worker %zu: processing tasks [done: %zu]", workerIndex, tasksProcessed);
350 }
351 // Report progress at configured interval for visibility
352 // Always show first task to confirm worker is actively processing
353 if (tasksProcessed == 1 || tasksProcessed - lastReportedProgress >= progressReportInterval) {
354 // NLogPrint("Worker %zu: processed %zu tasks", workerIndex, tasksProcessed);
355 lastReportedProgress = tasksProcessed;
356 }
357 for (const auto & task : batchTasks) {
358 if (checkAbort()) { finishedOk = false; break; }
359 errTaskId = task.first;
360 worker->Process(task.second);
361 ackedTaskIds.push_back(task.first);
362 }
363 if (aborted) break;
364
365 if (!SendFrames(dealer, {"ACKB", SerializeTaskIds(ackedTaskIds)})) {
366 finishedOk = false;
367 break;
368 }
369 }
370 }
371 catch (const std::exception & ex) {
372 NLogPrint("Worker %zu: ERROR processing task %s: %s", workerIndex, errTaskId.c_str(), ex.what());
373 SendFrames(dealer, {"ERR", errTaskId.empty() ? "0" : errTaskId, ex.what()});
374 finishedOk = false;
375 break;
376 }
377 catch (...) {
378 NLogPrint("Worker %zu: ERROR processing task %s: unknown exception", workerIndex, errTaskId.c_str());
379 SendFrames(dealer, {"ERR", errTaskId.empty() ? "0" : errTaskId, "unknown worker exception"});
380 finishedOk = false;
381 break;
382 }
383 }
384
385 if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
386 // Capture local tmp file path before closing (storage object path is cleared after Close).
387 std::string localTmpFile;
388 if (gnWorker->GetHnSparseBase() && gnWorker->GetHnSparseBase()->GetStorageTree()) {
389 localTmpFile = gnWorker->GetHnSparseBase()->GetStorageTree()->GetFileName();
390 }
391
392 if (aborted) {
393 // Supervisor aborted — skip end function and copy, but close the file handle
394 // so we can delete it cleanly.
395 NLogPrint("Worker %zu: aborting, skipping post-processing.", workerIndex);
396 if (gnWorker->GetHnSparseBase()) {
397 gnWorker->GetHnSparseBase()->Close(false); // false = don't save
398 }
399 } else {
400 NLogDebug("Worker %zu finished processing, executing end function and closing file if open ...", workerIndex);
401 gnWorker->ExecuteEndFunction();
402 if (gnWorker->GetHnSparseBase()) {
403 gnWorker->GetHnSparseBase()->Close(true);
404 }
405
406 // TCP mode: copy local result file to shared results dir so supervisor can merge.
407 const std::string & resultsFilename = gnWorker->GetResultsFilename();
408 if (!resultsFilename.empty()) {
409 if (!localTmpFile.empty() && localTmpFile != resultsFilename) {
410 const std::string resultsDir = std::string(gSystem->GetDirName(resultsFilename.c_str()));
411 NUtils::CreateDirectory(resultsDir);
412 NLogPrint("Worker %zu copying '%s' -> '%s' ...", workerIndex, localTmpFile.c_str(), resultsFilename.c_str());
413 if (!NUtils::Cp(localTmpFile, resultsFilename, kFALSE)) {
414 NLogError("Worker %zu: failed to copy '%s' to '%s'", workerIndex, localTmpFile.c_str(),
415 resultsFilename.c_str());
416 }
417 }
418 }
419 }
420
421 // Delete the local tmp file only when a distinct results file was set.
422 // An empty resultsFilename means localTmpFile IS the results file (same path),
423 // so it must be kept for the supervisor to merge.
424 const std::string & resultsFilenameForDelete = gnWorker->GetResultsFilename();
425 const bool hasDistinctResultsFile = !resultsFilenameForDelete.empty() && resultsFilenameForDelete != localTmpFile;
426 if (!localTmpFile.empty() && hasDistinctResultsFile) {
427 NLogPrint("Worker %zu: removing local tmp file '%s'", workerIndex, localTmpFile.c_str());
428 gSystem->Unlink(localTmpFile.c_str());
429 }
430 }
431
432 if (!aborted) {
433 // Signal master that this worker has finished writing its file.
434 // For TCP mode, master waits for this DONE before starting to merge.
435 // For IPC (fork) mode, master uses WaitForChildProcesses instead; the DONE
436 // message stays unread in the ZMQ buffer which is harmless.
437 SendFrames(dealer, {"DONE"});
438 NLogPrint("Worker %zu: completed successfully, processed %zu tasks total", workerIndex, tasksProcessed);
439 } else if (wasInterrupted) {
440 // Interrupted by Ctrl+C - already printed message above, don't print duplicate
441 } else if (!finishedOk) {
442 NLogPrint("Worker %zu: exited with error after processing %zu tasks", workerIndex, tasksProcessed);
443 }
444
445 // Drop any unsent/undelivered messages immediately so zmq_close/zmq_ctx_term
446 // do not hang at shutdown.
447 int linger = 0;
448 zmq_setsockopt(dealer, ZMQ_LINGER, &linger, sizeof(linger));
449
450 // Restore original signal handler
451 sigaction(SIGINT, &oldSa, nullptr);
452
453 return finishedOk ? 0 : 1;
454}
455
456bool NDimensionalIpcRunner::WaitForChildProcesses(const std::vector<pid_t> & pids, int timeoutMs)
457{
458 bool allExitedCleanly = true;
459 const bool useTimeout = (timeoutMs >= 0);
460
461 for (pid_t pid : pids) {
462 if (pid <= 0) continue;
463
464 int status = 0;
465 pid_t rc = -1;
466 if (!useTimeout) {
467 rc = waitpid(pid, &status, 0);
468 }
469 else {
470 const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
471 while (std::chrono::steady_clock::now() < deadline) {
472 rc = waitpid(pid, &status, WNOHANG);
473 if (rc == pid || rc < 0) {
474 break;
475 }
476 std::this_thread::sleep_for(std::chrono::milliseconds(10));
477 }
478 if (rc == 0) {
479 allExitedCleanly = false;
480 continue;
481 }
482 }
483
484 if (rc < 0) {
485 allExitedCleanly = false;
486 continue;
487 }
488
489 if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
490 allExitedCleanly = false;
491 }
492 }
493 return allExitedCleanly;
494}
495
496void NDimensionalIpcRunner::CleanupChildProcesses(const std::vector<pid_t> & pids)
497{
498 for (pid_t pid : pids) {
499 if (pid <= 0) continue;
500 int status = 0;
501 pid_t rc = waitpid(pid, &status, WNOHANG);
502 if (rc == pid) {
503 continue;
504 }
505 if (rc < 0) {
506 continue;
507 }
508
509 kill(pid, SIGTERM);
510 rc = waitpid(pid, &status, WNOHANG);
511 if (rc == pid || rc < 0) {
512 continue;
513 }
514
515 if (!WaitForChildProcesses({pid}, 1500)) {
516 kill(pid, SIGKILL);
517
518 // Never block indefinitely here: if a child is stuck in uninterruptible I/O
519 // state, waitpid(..., 0) can hang forever and stall process shutdown.
520 const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(1500);
521 while (std::chrono::steady_clock::now() < deadline) {
522 rc = waitpid(pid, &status, WNOHANG);
523 if (rc == pid || rc < 0) break;
524 std::this_thread::sleep_for(std::chrono::milliseconds(10));
525 }
526 if (rc == 0) {
527 NLogWarning("NDimensionalIpcRunner::CleanupChildProcesses: child pid=%d did not exit after SIGKILL; continuing shutdown", pid);
528 }
529 }
530 }
531}
532
533std::vector<int> NDimensionalIpcRunner::ParseCoords(const std::string & coordsStr)
534{
535 std::vector<int> coords;
536 std::stringstream ss(coordsStr);
537 std::string token;
538 while (std::getline(ss, token, ',')) {
539 if (token.empty()) continue;
540 coords.push_back(std::stoi(token));
541 }
542 return coords;
543}
544
545std::vector<Long64_t> NDimensionalIpcRunner::ParseIds(const std::string & idsStr)
546{
547 std::vector<Long64_t> ids;
548 std::stringstream ss(idsStr);
549 std::string token;
550 while (std::getline(ss, token, ',')) {
551 if (token.empty()) continue;
552 ids.push_back(static_cast<Long64_t>(std::stoll(token)));
553 }
554 return ids;
555}
556
557} // namespace Ndmspc
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Definition NLogger.cxx:132
Thread-local data object for NDMSPC processing.
Definition NThreadData.h:21
static bool CreateDirectory(const std::string &path)
Definition NUtils.cxx:688
static int Cp(std::string source, std::string destination, Bool_t progressbar=kTRUE)
Copy a file from source to destination.
Definition NUtils.cxx:155
Global callback function for libwebsockets client events.