// NDimensionalExecutor.cxx — part of ndmspc v1.2.0-0.1.rc7
#include <algorithm>
#include <atomic>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <cstdio>
#include <cstring>
#include <functional>
#include <iomanip>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include <sys/wait.h>
#include <unistd.h>
#include <zmq.h>
#include <THnSparse.h>
#include <TAxis.h>
#include <TROOT.h>
#include <TSystem.h>
#include "NDimensionalExecutor.h"
#include "NDimensionalIpcRunner.h"
#include "NGnThreadData.h"
#include "NUtils.h"
23
24namespace Ndmspc {
25
namespace {
// Set to 1 by the SIGINT handler; polled by the dispatch loops so they can
// break out cleanly after a user interrupt.
volatile sig_atomic_t gIpcSigIntRequested = 0;
// Number of valid entries in gIpcChildPids. sig_atomic_t so the handler can
// read it safely.
volatile sig_atomic_t gIpcChildCount = 0;
// Fixed-size PID table: the signal handler must not allocate, so capacity is
// a compile-time constant (extra workers beyond 1024 are simply not tracked).
pid_t gIpcChildPids[1024] = {0};

// SIGINT handler: records the interrupt request and hard-kills every known
// forked IPC worker child. Only the async-signal-safe kill() is called here.
void IpcSigIntHandler(int)
{
   gIpcSigIntRequested = 1;
   const sig_atomic_t count = gIpcChildCount;
   for (sig_atomic_t i = 0; i < count; ++i) {
      if (gIpcChildPids[i] > 0) {
         // User interrupt should not leave orphan IPC workers.
         kill(gIpcChildPids[i], SIGKILL);
      }
   }
}

// Publishes childPids into the handler-visible global table (truncated to the
// table capacity) and installs IpcSigIntHandler for SIGINT. On success the
// previous disposition is saved into oldAction and hasOldAction is set so it
// can be restored later via RestoreIpcSigIntHandler. TCP mode passes an empty
// vector: the handler then only raises the interrupt flag.
void InstallIpcSigIntHandler(const std::vector<pid_t> & childPids, struct sigaction & oldAction, bool & hasOldAction)
{
   gIpcSigIntRequested = 0;
   gIpcChildCount = 0;
   const size_t maxChildren = sizeof(gIpcChildPids) / sizeof(gIpcChildPids[0]);
   const size_t count = std::min(maxChildren, childPids.size());
   for (size_t i = 0; i < count; ++i) {
      gIpcChildPids[i] = childPids[i];
   }
   // Clear any stale PIDs left over from a previous session.
   for (size_t i = count; i < maxChildren; ++i) {
      gIpcChildPids[i] = 0;
   }
   // Publish the count only after the table is fully written, so the handler
   // never reads a partially-updated slot.
   gIpcChildCount = static_cast<sig_atomic_t>(count);

   struct sigaction sa;
   memset(&sa, 0, sizeof(sa));
   sa.sa_handler = IpcSigIntHandler;
   sigemptyset(&sa.sa_mask);
   sa.sa_flags = 0;

   if (sigaction(SIGINT, &sa, &oldAction) == 0) {
      hasOldAction = true;
   }
}

// Restores the SIGINT disposition saved by InstallIpcSigIntHandler (if any)
// and resets the handler-visible globals.
void RestoreIpcSigIntHandler(const struct sigaction & oldAction, bool hasOldAction)
{
   if (hasOldAction) {
      sigaction(SIGINT, &oldAction, nullptr);
   }
   gIpcChildCount = 0;
   gIpcSigIntRequested = 0;
}
} // namespace
77
79 void * ctx{nullptr};
80 void * router{nullptr};
81 bool isTcp{false};
82 std::string endpointPath;
83 std::string endpoint;
84 std::vector<pid_t> childPids;
85 std::unordered_map<std::string, size_t> identityToWorker;
86 std::vector<std::string> workerIdentityVec; // ordered list for round-robin
87 // TCP late-joiner support: stored so new workers can be initialised mid-run
88 std::string jobDir;
89 std::string treeName;
90 std::vector<NThreadData *> * workerObjects{nullptr};
91 size_t maxWorkers{0};
92 std::string currentDefName;
93 std::vector<Long64_t> currentDefIds;
94 bool hasCurrentDefIds{false};
95 struct sigaction oldSigIntAction{};
96 bool hasOldSigIntAction{false};
97 // Bootstrap configuration sent to workers on first contact
98 std::string macroList; // comma-separated macro paths to load on worker
99 std::string macroParams; // parameter list forwarded to TMacro::Exec on worker
100 std::string tmpDir; // supervisor's NDMSPC_TMP_DIR (fallback for workers)
101 std::string tmpResultsDir; // supervisor's NDMSPC_TMP_RESULTS_DIR
102 size_t bootstrapNextIdx{0}; // auto-assigned index counter for BOOTSTRAP
103 std::unordered_map<std::string, size_t> bootstrapAssignments; // BOOTSTRAP identity -> assigned slot
104 std::vector<std::string> pendingReadyIdentities; // READY messages consumed while waiting for ACK
105 // Task state management: unified handling of pending, running, and done tasks
106 NTaskStateManager taskStateManager;
107 std::unordered_map<std::string, std::set<size_t>> workerTaskHistory; // tasks assigned to each worker in current definition
108 std::set<std::string> earlyDoneWorkers; // workers that sent DONE before FinishProcessIpc started waiting
109 // TCP worker activity tracking for failure detection
110 std::unordered_map<std::string, std::chrono::steady_clock::time_point> workerLastActivity; // identity -> last ACK time
111 std::set<std::string> failedTcpWorkers; // identities of TCP workers that have failed
112};
113
114// --- Private Increment Logic ---
116{
117 for (int i = fNumDimensions - 1; i >= 0; --i) {
118 fCurrentCoords[i]++;
119 if (fCurrentCoords[i] <= fMaxBounds[i]) {
120 return true;
121 }
123 }
124 return false;
125}
126
// Constructs an executor iterating the inclusive hyper-rectangle
// [minBounds[i], maxBounds[i]] for every dimension i.
//
// Throws std::invalid_argument when the two vectors differ in size, are
// empty, or any minimum exceeds its maximum.
NDimensionalExecutor::NDimensionalExecutor(const std::vector<int> & minBounds, const std::vector<int> & maxBounds)
   : fMinBounds(minBounds), fMaxBounds(maxBounds)
{

   if (fMinBounds.size() != fMaxBounds.size()) {
      throw std::invalid_argument("Min and max bounds vectors must have the same size.");
   }
   if (fMinBounds.empty()) {
      throw std::invalid_argument("Bounds vectors cannot be empty.");
   }

   fNumDimensions = fMinBounds.size();

   // Per-dimension sanity check: min must not exceed max.
   for (size_t i = 0; i < fNumDimensions; ++i) {
      if (fMinBounds[i] > fMaxBounds[i]) {
         throw std::invalid_argument("Min bound (" + std::to_string(fMinBounds[i]) +
                                     ") cannot be greater than max bound (" + std::to_string(fMaxBounds[i]) +
                                     ") for dimension " + std::to_string(i));
      }
   }

}
154
155void NDimensionalExecutor::SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds)
156{
157 fMinBounds = minBounds;
158 fMaxBounds = maxBounds;
159
160 if (fMinBounds.size() != fMaxBounds.size()) {
161 throw std::invalid_argument("Min and max bounds vectors must have the same size.");
162 }
163 if (fMinBounds.empty()) {
164 throw std::invalid_argument("Bounds vectors cannot be empty.");
165 }
166
167 fNumDimensions = fMinBounds.size();
168 for (size_t i = 0; i < fNumDimensions; ++i) {
169 if (fMinBounds[i] > fMaxBounds[i]) {
170 throw std::invalid_argument("Min bound (" + std::to_string(fMinBounds[i]) +
171 ") cannot be greater than max bound (" + std::to_string(fMaxBounds[i]) +
172 ") for dimension " + std::to_string(i));
173 }
174 }
175
178}
179
// Constructs an executor from a THnSparse histogram.
//
// onlyfilled == true: a single pseudo-dimension over the linear filled-bin
// index, bounds 0..GetNbins().
// onlyfilled == false: one dimension per histogram axis, bounds
// 0..GetNbins() of that axis.
//
// NOTE(review): bounds are inclusive (Increment uses <=), so both modes also
// visit the upper index GetNbins(); in axis mode index 0 is ROOT's underflow
// bin. Confirm against the consumers that this range is intended.
//
// Throws std::invalid_argument for a null histogram, or (when onlyfilled) an
// empty one.
NDimensionalExecutor::NDimensionalExecutor(THnSparse * hist, bool onlyfilled)
{
   if (hist == nullptr) {
      throw std::invalid_argument("THnSparse pointer cannot be null.");
   }

   if (onlyfilled) {
      // Check if the histogram is filled
      if (hist->GetNbins() <= 0) {
         throw std::invalid_argument("THnSparse histogram is empty.");
      }

      fMinBounds.push_back(0);
      fMaxBounds.push_back(hist->GetNbins());
   }
   else {
      // loop over all dimensions
      for (int i = 0; i < hist->GetNdimensions(); ++i) {
         fMinBounds.push_back(0);
         fMaxBounds.push_back(hist->GetAxis(i)->GetNbins());
      }
   }
   fNumDimensions = fMinBounds.size();
}
209
// Defaulted out-of-line so the std::unique_ptr member (fIpcSession, created
// via std::make_unique<IpcSession>() below) is destroyed here, where
// IpcSession is a complete type.
NDimensionalExecutor::~NDimensionalExecutor() = default;
211
212void NDimensionalExecutor::Execute(const std::function<void(const std::vector<int> & coords)> & func)
213{
217
218 if (fNumDimensions == 0) {
219 return;
220 }
221 fCurrentCoords = fMinBounds; // Reset state
222 do {
223 func(fCurrentCoords);
224 } while (Increment());
225}
226
227// --- Template Implementation for ExecuteParallel ---
234template <typename TObject>
236 const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
237 std::vector<TObject> & thread_objects)
238{
239 if (fNumDimensions == 0) {
240 return;
241 }
242 size_t threads_to_use = thread_objects.size();
243 if (threads_to_use == 0) {
244 throw std::invalid_argument("Thread objects vector cannot be empty.");
245 }
246
247 std::vector<std::thread> workers;
248 std::queue<std::function<void(TObject &)>> tasks;
249 std::mutex queue_mutex;
250 std::condition_variable condition_producer;
251 std::condition_variable condition_consumer;
252 std::atomic<size_t> active_tasks = 0;
253 std::atomic<bool> stop_pool = false;
254 // Optional: Store first exception encountered in workers
255 std::exception_ptr first_exception = nullptr;
256 std::mutex exception_mutex;
257
258 // Worker thread logic: fetch and execute tasks, handle exceptions, signal completion.
259 auto worker_logic = [&](TObject & my_object) {
260 NThreadData * md = (NThreadData *)&my_object;
261
262 std::ostringstream oss;
263 oss << "wk_" << std::setw(6) << std::setfill('0') << md->GetAssignedIndex();
264
265 NLogger::SetThreadName(oss.str());
266 while (true) {
267 std::function<void(TObject &)> task_payload;
268 bool task_acquired = false; // Track if we actually got a task this iteration
269
270 try {
271 { // Lock scope for queue access
272 std::unique_lock<std::mutex> lock(queue_mutex);
273 condition_producer.wait(lock, [&] { return stop_pool || !tasks.empty(); });
274
275 // Check stop condition *after* waking up
276 if (stop_pool && tasks.empty()) {
277 break; // Exit the while loop normally
278 }
279 // If stopping but tasks remain, continue processing them
280
281 // Only proceed if not stopping or if tasks are still present
282 if (!tasks.empty()) {
283 task_payload = std::move(tasks.front());
284 tasks.pop();
285 task_acquired = true; // We got a task
286 }
287 else {
288 // Spurious wakeup or stop_pool=true with empty queue
289 continue; // Go back to wait
290 }
291 } // Mutex unlocked
292
293 // Execute the task if we acquired one
294 if (task_acquired) {
295 task_payload(my_object); // Execute task with assigned object
296 }
297 }
298 catch (...) {
299 // --- Exception Handling ---
300 { // Lock to safely store the first exception
301 std::lock_guard<std::mutex> lock(exception_mutex);
302 if (!first_exception) {
303 first_exception = std::current_exception(); // Store it
304 }
305 }
306 // Signal pool to stop immediately on any error
307 {
308 std::unique_lock<std::mutex> lock(queue_mutex);
309 stop_pool = true;
310 }
311 condition_producer.notify_all(); // Wake all threads to check stop flag
312
313 // *** Crucial Fix: Decrement active_tasks even on exception ***
314 // Check if we actually acquired a task before decrementing
315 if (task_acquired) {
316 if (--active_tasks == 0 && stop_pool) {
317 // Also notify consumer here in case this was the last task
318 condition_consumer.notify_one();
319 }
320 }
321 // Decide whether to exit the worker or try processing remaining tasks
322 // For simplicity, let's exit the worker on error.
323 return; // Exit worker thread immediately on error
324 }
325
326 // --- Normal Task Completion ---
327 // Decrement active task count *after* successful execution
328 // Check if we actually acquired and processed a task
329 if (task_acquired) {
330 if (--active_tasks == 0 && stop_pool) {
331 condition_consumer.notify_one();
332 }
333 }
334 } // End of while loop
335 }; // End of worker_logic lambda
336
337 // --- Start Worker Threads ---
338 workers.reserve(threads_to_use);
339 for (size_t i = 0; i < threads_to_use; ++i) {
340 workers.emplace_back(worker_logic, std::ref(thread_objects[i]));
341 }
342
343 // --- Main Thread: Iterate and Enqueue Tasks ---
344 try {
346 do {
347 // Check if pool was stopped prematurely (e.g., by an exception in a worker)
348 // Lock needed to safely check stop_pool
349 {
350 std::unique_lock<std::mutex> lock(queue_mutex);
351 if (stop_pool) break;
352 }
353
354 std::vector<int> coords_copy = fCurrentCoords;
355 {
356 std::unique_lock<std::mutex> lock(queue_mutex);
357 // Double check stop_pool after acquiring lock
358 if (stop_pool) break;
359
360 active_tasks++;
361 tasks.emplace([func, coords_copy](TObject & obj) { func(coords_copy, obj); });
362 }
363 condition_producer.notify_one();
364 } while (Increment());
365 }
366 catch (...) {
367 // Exception during iteration/enqueueing
368 {
369 std::unique_lock<std::mutex> lock(queue_mutex);
370 stop_pool = true; // Signal workers to stop
371 if (!first_exception) { // Store exception if none from workers yet
372 first_exception = std::current_exception();
373 }
374 }
375 condition_producer.notify_all();
376 // Proceed to join threads
377 }
378
379 // --- Signal Workers to Stop (if not already stopped by error) ---
380 {
381 std::unique_lock<std::mutex> lock(queue_mutex);
382 stop_pool = true;
383 }
384 condition_producer.notify_all();
385
386 // --- Wait for Tasks to Complete ---
387 {
388 std::unique_lock<std::mutex> lock(queue_mutex);
389 condition_consumer.wait(lock, [&] { return stop_pool && active_tasks == 0; });
390 }
391
392 // --- Join Worker Threads ---
393 for (std::thread & worker : workers) {
394 if (worker.joinable()) {
395 worker.join();
396 }
397 }
398
399 // --- Check for and rethrow exception from workers ---
400 if (first_exception) {
401 std::rethrow_exception(first_exception);
402 }
403}
404
405size_t NDimensionalExecutor::ExecuteParallelProcessIpc(std::vector<NThreadData *> & workerObjects,
406 size_t processCount)
407{
408 StartProcessIpc(workerObjects, processCount);
409 try {
410 size_t acked = ExecuteCurrentBoundsProcessIpc();
411 FinishProcessIpc();
412 return acked;
413 }
414 catch (...) {
415 FinishProcessIpc();
416 throw;
417 }
418}
419
// Registers a TCP worker that announced READY: derives the worker index from
// its identity, sends an INIT frame with the session context, then waits
// (bounded by NDMSPC_WORKER_TIMEOUT, default 30 s) for the matching ACK.
// While waiting, BOOTSTRAP requests from other workers are answered inline
// and foreign READY messages are buffered in pendingReadyIdentities so they
// are not lost. Returns true once the worker is fully registered.
bool NDimensionalExecutor::InitTcpWorker(const std::string & identity)
{
   // Extract worker index from identity string (e.g. "wk_001")
   // Derive the prefix by stripping trailing digits from a sample identity
   const std::string sample = NDimensionalIpcRunner::BuildWorkerIdentity(0);
   size_t numLen = 0;
   while (numLen < sample.size() && std::isdigit((unsigned char)sample[sample.size() - 1 - numLen]))
      ++numLen;
   const size_t prefixLen = sample.size() - numLen; // e.g. strlen("wk_") == 3
   if (identity.size() <= prefixLen) return false;
   size_t workerIdx = 0;
   try {
      workerIdx = std::stoul(identity.substr(prefixLen));
   }
   catch (...) {
      NLogWarning("NDimensionalExecutor::InitTcpWorker: cannot parse index from identity '%s'", identity.c_str());
      return false;
   }
   // Reject indices beyond the session capacity or identities already known.
   if (workerIdx >= fIpcSession->maxWorkers) {
      NLogWarning("NDimensionalExecutor::InitTcpWorker: worker index %zu >= maxWorkers %zu, ignoring",
                  workerIdx, fIpcSession->maxWorkers);
      return false;
   }
   if (fIpcSession->identityToWorker.count(identity)) {
      NLogWarning("NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, ignoring duplicate READY",
                  identity.c_str());
      return false;
   }

   // The supervisor PID doubles as a session identifier for the worker.
   const std::string sessionId = std::to_string(getpid());
   if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router,
                                          {identity, "INIT", std::to_string(workerIdx), sessionId,
                                           fIpcSession->jobDir, fIpcSession->treeName,
                                           fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
      NLogError("NDimensionalExecutor::InitTcpWorker: failed to send INIT to '%s'", identity.c_str());
      return false;
   }

   int initTimeoutSec = 30;
   if (const char * env = gSystem->Getenv("NDMSPC_WORKER_TIMEOUT")) {
      try {
         initTimeoutSec = std::max(1, std::stoi(env));
      }
      catch (...) {
         NLogWarning("NDimensionalExecutor::InitTcpWorker: Invalid NDMSPC_WORKER_TIMEOUT='%s', using default=%d", env,
                     initTimeoutSec);
      }
   }
   // Wait for this worker's ACK; other traffic arriving meanwhile is either
   // serviced (BOOTSTRAP) or buffered (READY from other identities).
   const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(initTimeoutSec);
   bool acked = false;
   while (!acked) {
      std::vector<std::string> ackFrames;
      if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, ackFrames)) {
         // EAGAIN/EWOULDBLOCK = receive timeout tick; keep polling until the
         // deadline. Any other error aborts the handshake.
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
            if (std::chrono::steady_clock::now() > initDeadline) break;
            continue;
         }
         break;
      }
      if (ackFrames.size() >= 2 && ackFrames[1] == "BOOTSTRAP") {
         HandleBootstrap(ackFrames[0]);
         continue;
      }
      if (ackFrames.size() >= 2 && ackFrames[1] == "READY" && ackFrames[0] != identity) {
         // Buffer READY from other workers (deduplicated) for later handling.
         if (!fIpcSession->identityToWorker.count(ackFrames[0]) &&
             std::find(fIpcSession->pendingReadyIdentities.begin(), fIpcSession->pendingReadyIdentities.end(),
                       ackFrames[0]) == fIpcSession->pendingReadyIdentities.end()) {
            fIpcSession->pendingReadyIdentities.push_back(ackFrames[0]);
         }
         continue;
      }
      if (ackFrames.size() >= 2 && ackFrames[0] == identity && ackFrames[1] == "ACK") {
         acked = true;
      }
   }
   if (!acked) {
      NLogError("NDimensionalExecutor::InitTcpWorker: worker '%s' did not ACK INIT", identity.c_str());
      return false;
   }

   // Check for duplicate identity (defensive - shouldn't happen with proper cleanup)
   if (fIpcSession->identityToWorker.count(identity)) {
      NLogWarning("NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, replacing", identity.c_str());
      // Remove from vector if present
      auto it = std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity);
      if (it != fIpcSession->workerIdentityVec.end()) {
         fIpcSession->workerIdentityVec.erase(it);
      }
   }

   fIpcSession->identityToWorker[identity] = workerIdx;
   fIpcSession->workerIdentityVec.push_back(identity);
   // Initialize activity tracking for TCP worker
   fIpcSession->workerLastActivity[identity] = std::chrono::steady_clock::now();
   NLogInfo("NDimensionalExecutor::InitTcpWorker: worker '%s' (idx=%zu) joined [total: %zu]",
            identity.c_str(), workerIdx, fIpcSession->workerIdentityVec.size());
   return true;
}
518
519bool NDimensionalExecutor::HandleBootstrap(const std::string & identity)
520{
521 if (!fIpcSession || !fIpcSession->isTcp) return false;
522
523 // Repeat BOOTSTRAP from the same identity should be idempotent.
524 auto existing = fIpcSession->bootstrapAssignments.find(identity);
525 if (existing != fIpcSession->bootstrapAssignments.end()) {
526 const size_t assignedIdx = existing->second;
527 return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
528 {identity, "CONFIG", std::to_string(assignedIdx),
529 fIpcSession->macroList, fIpcSession->tmpDir,
530 fIpcSession->tmpResultsDir,
531 fIpcSession->macroParams});
532 }
533
534 if (fIpcSession->bootstrapNextIdx >= fIpcSession->maxWorkers) {
535 NLogWarning("NDimensionalExecutor::HandleBootstrap: rejecting worker '%s' (capacity reached: %zu)",
536 identity.c_str(), fIpcSession->maxWorkers);
537 return NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "REJECT", "capacity"});
538 }
539
540 const size_t assignedIdx = fIpcSession->bootstrapNextIdx++;
541
542 fIpcSession->bootstrapAssignments[identity] = assignedIdx;
543
544 NLogDebug("NDimensionalExecutor::HandleBootstrap: assigning index %zu to worker '%s'", assignedIdx,
545 identity.c_str());
546 return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
547 {identity, "CONFIG", std::to_string(assignedIdx),
548 fIpcSession->macroList, fIpcSession->tmpDir,
549 fIpcSession->tmpResultsDir,
550 fIpcSession->macroParams});
551}
552
553void NDimensionalExecutor::StartProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount,
554 const std::string & tcpBindEndpoint, const std::string & jobDir,
555 const std::string & treeName, const std::string & macroList,
556 const std::string & tmpDir, const std::string & tmpResultsDir,
557 const std::string & macroParams)
558{
559 if (workerObjects.empty()) {
560 throw std::invalid_argument("Worker objects vector cannot be empty.");
561 }
562 if (fIpcSession) {
563 throw std::runtime_error("IPC session is already active.");
564 }
565
566 const size_t processesToUse = std::max<size_t>(1, std::min(processCount, workerObjects.size()));
567 NLogInfo("NDimensionalExecutor::StartProcessIpc: requested=%zu, workerObjects=%zu, spawning=%zu", processCount,
568 workerObjects.size(), processesToUse);
569 const auto nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
570 std::chrono::high_resolution_clock::now().time_since_epoch())
571 .count();
572 fIpcSession = std::make_unique<IpcSession>();
573
574 const bool isTcp = !tcpBindEndpoint.empty();
575 fIpcSession->isTcp = isTcp;
576
577 if (isTcp) {
578 fIpcSession->endpoint = tcpBindEndpoint;
579 fIpcSession->endpointPath.clear();
580 } else {
581 fIpcSession->endpointPath = "/tmp/ndmspc_ipc_" + std::to_string(getpid()) + "_" + std::to_string(nowNs) + ".sock";
582 fIpcSession->endpoint = "ipc://" + fIpcSession->endpointPath;
583 ::unlink(fIpcSession->endpointPath.c_str());
584 }
585
586 fIpcSession->ctx = zmq_ctx_new();
587 if (!fIpcSession->ctx) {
588 fIpcSession.reset();
589 throw std::runtime_error("Failed to create ZeroMQ context.");
590 }
591
592 fIpcSession->router = zmq_socket(fIpcSession->ctx, ZMQ_ROUTER);
593 if (!fIpcSession->router) {
594 zmq_ctx_term(fIpcSession->ctx);
595 fIpcSession.reset();
596 throw std::runtime_error("Failed to create ZeroMQ ROUTER socket.");
597 }
598
599 int timeoutMs = 1000;
600 zmq_setsockopt(fIpcSession->router, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
601
602 if (zmq_bind(fIpcSession->router, fIpcSession->endpoint.c_str()) != 0) {
603 const std::string err = zmq_strerror(zmq_errno());
604 zmq_close(fIpcSession->router);
605 zmq_ctx_term(fIpcSession->ctx);
606 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
607 fIpcSession.reset();
608 throw std::runtime_error("Failed to bind endpoint '" + fIpcSession->endpoint + "': " + err);
609 }
610
611 fIpcSession->identityToWorker.clear();
612 fIpcSession->identityToWorker.reserve(processesToUse);
613 fIpcSession->workerIdentityVec.clear();
614 fIpcSession->pendingReadyIdentities.clear();
615
616 if (!isTcp) {
617 // IPC/fork mode: pre-seed the map as before so WorkerLoop identities match
618 for (size_t i = 0; i < processesToUse; ++i) {
619 fIpcSession->identityToWorker[NDimensionalIpcRunner::BuildWorkerIdentity(i)] = i;
620 }
621 } else {
622 // TCP mode: store context for late-joining workers
623 fIpcSession->jobDir = jobDir;
624 fIpcSession->treeName = treeName;
625 fIpcSession->workerObjects = &workerObjects;
626 fIpcSession->maxWorkers = processesToUse;
627 fIpcSession->macroList = macroList;
628 fIpcSession->tmpDir = tmpDir;
629 fIpcSession->tmpResultsDir = tmpResultsDir;
630 fIpcSession->macroParams = macroParams;
631 fIpcSession->bootstrapAssignments.clear();
632 }
633
634 if (!isTcp) {
635 fIpcSession->childPids.assign(processesToUse, -1);
636 for (size_t i = 0; i < processesToUse; ++i) {
637 pid_t pid = fork();
638 if (pid < 0) {
639 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
640 zmq_close(fIpcSession->router);
641 zmq_ctx_term(fIpcSession->ctx);
642 ::unlink(fIpcSession->endpointPath.c_str());
643 fIpcSession.reset();
644 throw std::runtime_error("Failed to fork worker process.");
645 }
646 if (pid == 0) {
647 zmq_close(fIpcSession->router);
648 zmq_ctx_term(fIpcSession->ctx);
649 const int rc = NDimensionalIpcRunner::WorkerLoop(fIpcSession->endpoint, i, workerObjects[i]);
650 _exit(rc == 0 ? 0 : 1);
651 }
652 fIpcSession->childPids[i] = pid;
653 }
654 }
655
656 // --- Wait for initial workers ---
657 // IPC (fork): wait for all; TCP: wait until at least one connects or timeout.
658 int readyTimeoutSec = isTcp ? 300 : 30;
659 if (isTcp) {
660 if (const char * env = gSystem->Getenv("NDMSPC_WORKER_TIMEOUT")) {
661 try { readyTimeoutSec = std::stoi(env); } catch (...) {}
662 }
663 NLogInfo("NDimensionalExecutor::StartProcessIpc: waiting up to %d s for TCP workers (max %zu) ...",
664 readyTimeoutSec, processesToUse);
665 }
666 const auto readyDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(readyTimeoutSec);
667
668 // IPC/fork mode needs the full fixed worker set before dispatch starts.
669 // TCP mode is dynamic: start as soon as the first worker is ready, and let
670 // additional workers join through the late-READY path during execution.
671 const size_t readyTarget = isTcp ? 1 : processesToUse;
672
673 while (fIpcSession->workerIdentityVec.size() < readyTarget) {
674 if (isTcp && !fIpcSession->pendingReadyIdentities.empty()) {
675 const std::string identity = fIpcSession->pendingReadyIdentities.front();
676 fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
677 InitTcpWorker(identity);
678 continue;
679 }
680
681 std::vector<std::string> frames;
682 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
683 if (errno == EAGAIN || errno == EWOULDBLOCK) {
684 if (std::chrono::steady_clock::now() > readyDeadline) {
685 if (!isTcp) {
686 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
687 zmq_close(fIpcSession->router);
688 zmq_ctx_term(fIpcSession->ctx);
689 ::unlink(fIpcSession->endpointPath.c_str());
690 fIpcSession.reset();
691 throw std::runtime_error("Timeout while waiting for IPC workers to become ready.");
692 }
693 zmq_close(fIpcSession->router);
694 zmq_ctx_term(fIpcSession->ctx);
695 fIpcSession.reset();
696 throw std::runtime_error("Timeout: no TCP workers connected.");
697 }
698 continue;
699 }
700 if (!isTcp) NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
701 zmq_close(fIpcSession->router);
702 zmq_ctx_term(fIpcSession->ctx);
703 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
704 fIpcSession.reset();
705 throw std::runtime_error("Failed to receive READY message from worker.");
706 }
707 if (frames.size() < 2) continue;
708 const std::string & identity = frames[0];
709 const std::string & cmd = frames[1];
710 if (isTcp && cmd == "BOOTSTRAP") {
711 HandleBootstrap(identity);
712 continue;
713 }
714 if (cmd != "READY") continue;
715
716 if (isTcp) {
717 InitTcpWorker(identity);
718 } else {
719 // IPC/fork: just register (already pre-seeded)
720 if (fIpcSession->identityToWorker.count(identity)) {
721 if (std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity)
722 == fIpcSession->workerIdentityVec.end()) {
723 fIpcSession->workerIdentityVec.push_back(identity);
724 NLogInfo("NDimensionalExecutor::StartProcessIpc: worker '%s' ready (%zu / %zu)", identity.c_str(),
725 fIpcSession->workerIdentityVec.size(), processesToUse);
726 }
727 }
728 }
729 }
730
731 // In TCP mode the startup loop exits as soon as the first worker is ready.
732 // Any further READY messages that arrived (and were buffered in
733 // pendingReadyIdentities during InitTcpWorker's ACK wait) must be drained
734 // here so all already-connected workers are fully initialised before the
735 // dispatch loop starts — otherwise their tasks would never be scheduled.
736 if (isTcp) {
737 while (!fIpcSession->pendingReadyIdentities.empty()) {
738 const std::string id = fIpcSession->pendingReadyIdentities.front();
739 fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
740 InitTcpWorker(id);
741 }
742 }
743
744 if (!isTcp) {
745 InstallIpcSigIntHandler(fIpcSession->childPids, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
746 } else {
747 // TCP mode: install handler with no child PIDs — just sets gIpcSigIntRequested
748 // so the dispatch loop can break cleanly and FinishProcessIpc sends STOP to workers.
749 InstallIpcSigIntHandler({}, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
750 }
751}
752
753// Centralized worker failure cleanup: recovers tasks, removes worker from tracking, logs, updates progress.
754// This eliminates duplication across TCP send failure, TCP timeout, and IPC crash handlers.
755size_t NDimensionalExecutor::HandleWorkerFailure(const std::string & failedIdentity,
756 const std::string & failureReason,
757 size_t & outstanding,
758 size_t & acked)
759{
760 if (!fIpcSession) {
761 return 0;
762 }
763
764 size_t redistributedCount = 0;
765 size_t replayedDoneCount = 0;
766 size_t replayedLiveCount = 0;
767
768 auto historyIt = fIpcSession->workerTaskHistory.find(failedIdentity);
769 if (historyIt != fIpcSession->workerTaskHistory.end()) {
770 for (const size_t taskId : historyIt->second) {
771 const bool wasDone = fIpcSession->taskStateManager.IsDone(taskId);
772 if (!fIpcSession->taskStateManager.RequeueTask(taskId)) {
773 continue;
774 }
775 ++redistributedCount;
776 if (wasDone) {
777 ++replayedDoneCount;
778 } else {
779 ++replayedLiveCount;
780 }
781 }
782 fIpcSession->workerTaskHistory.erase(historyIt);
783 } else {
784 const auto recovered = fIpcSession->taskStateManager.RecoverWorkerTasks(failedIdentity);
785 redistributedCount = recovered.size();
786 replayedLiveCount = redistributedCount;
787 }
788
789 if (replayedLiveCount > 0) {
790 const size_t dec = std::min(outstanding, replayedLiveCount);
791 outstanding -= dec;
792 }
793 if (replayedDoneCount > 0) {
794 const size_t dec = std::min(acked, replayedDoneCount);
795 acked -= dec;
796 }
797
798 // Remove worker from all tracking structures
799 auto identityIt = std::find(fIpcSession->workerIdentityVec.begin(),
800 fIpcSession->workerIdentityVec.end(),
801 failedIdentity);
802 if (identityIt != fIpcSession->workerIdentityVec.end()) {
803 fIpcSession->workerIdentityVec.erase(identityIt);
804 }
805 fIpcSession->identityToWorker.erase(failedIdentity);
806 fIpcSession->workerLastActivity.erase(failedIdentity);
807 fIpcSession->failedTcpWorkers.erase(failedIdentity);
808
809 // Log worker removal
810 if (redistributedCount > 0) {
811 if (failureReason == "send_failure") {
812 NLogWarning("TCP worker '%s' removed due to send failure. Redistributing %zu task(s). Remaining workers: %zu",
813 failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
814 } else if (failureReason == "timeout") {
815 NLogWarning("TCP worker '%s' inactive/disconnected (%zu tasks pending). Redistributing to remaining %zu workers.",
816 failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
817 } else if (failureReason == "crash") {
818 NLogWarning("Worker process '%s' exited unexpectedly. Redistributing %zu task(s) to remaining %zu workers.",
819 failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
820 } else if (failureReason == "interrupted") {
821 NLogWarning("Worker '%s' interrupted. Replaying %zu task(s) on remaining %zu worker(s).",
822 failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
823 } else {
824 NLogWarning("Worker '%s' failed (%s). Redistributing %zu task(s). Remaining workers: %zu",
825 failedIdentity.c_str(), failureReason.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
826 }
827 }
828
829 return redistributedCount;
830}
831
832size_t NDimensionalExecutor::ExecuteCurrentBoundsProcessIpc(const std::string & definitionName,
833 const std::vector<Long64_t> * definitionIds,
834 const std::function<void(const ExecutionProgress&)> & progressCallback)
835{
836 if (!fIpcSession) {
837 throw std::runtime_error("IPC session is not active.");
838 }
839 if (fNumDimensions == 0) {
840 return 0;
841 }
842
843 fIpcSession->taskStateManager.Clear();
844 fIpcSession->workerTaskHistory.clear();
845
846 // Save current definition so late-joining workers can catch up
847 fIpcSession->currentDefName = definitionName;
848 fIpcSession->currentDefIds = definitionIds ? *definitionIds : std::vector<Long64_t>{};
849 fIpcSession->hasCurrentDefIds = (definitionIds != nullptr);
850
851 if (!definitionName.empty()) {
852 std::vector<std::string> failedWorkers;
853 for (const auto & identity : fIpcSession->workerIdentityVec) {
854 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "SETDEF", definitionName})) {
855 if (fIpcSession->isTcp) {
856 NLogWarning("Failed to send SETDEF to TCP worker '%s', marking as failed", identity.c_str());
857 failedWorkers.push_back(identity);
858 continue; // Skip SETIDS for this worker and continue with others
859 } else {
860 throw std::runtime_error("Failed to send IPC SETDEF message to worker '" + identity + "'.");
861 }
862 }
863 if (definitionIds != nullptr) {
864 if (!NDimensionalIpcRunner::SendFrames(
865 fIpcSession->router, {identity, "SETIDS", NDimensionalIpcRunner::SerializeIds(*definitionIds)})) {
866 if (fIpcSession->isTcp) {
867 NLogWarning("Failed to send SETIDS to TCP worker '%s', marking as failed", identity.c_str());
868 failedWorkers.push_back(identity);
869 continue;
870 } else {
871 throw std::runtime_error("Failed to send IPC SETIDS message to worker '" + identity + "'.");
872 }
873 }
874 }
875 }
876
877 // Mark failed TCP workers for removal
878 if (fIpcSession->isTcp && !failedWorkers.empty()) {
879 for (const auto & identity : failedWorkers) {
880 fIpcSession->failedTcpWorkers.insert(identity);
881 }
882 NLogWarning("Marked %zu TCP worker(s) as failed during SETDEF/SETIDS", failedWorkers.size());
883 }
884 }
885
886 size_t ipcBatchSize = 1;
887 if (const char * envBatchSize = gSystem->Getenv("NDMSPC_IPC_BATCH_SIZE")) {
888 try {
889 ipcBatchSize = std::max<size_t>(1, static_cast<size_t>(std::stoll(envBatchSize)));
890 }
891 catch (...) {
892 NLogWarning("NGnTree::Process: Invalid NDMSPC_IPC_BATCH_SIZE='%s', using default=%zu", envBatchSize,
893 ipcBatchSize);
894 }
895 }
896
897 size_t nextTaskId = 0;
898 size_t dispatchMessageId = 0;
899 size_t outstanding = 0;
900 size_t outstandingMessages = 0;
901 size_t acked = 0;
902 size_t nextSchedulerLogAck = 200;
903 std::string firstError;
904 // Late-joining TCP workers: identity → assigned worker index.
905 // INIT is sent immediately on READY but the ACK is handled asynchronously
906 // in the main receive loop so we never block on socket reads mid-dispatch.
907 std::unordered_map<std::string, size_t> pendingInitWorkers;
908
909 int stallTimeoutSec = 120;
910 if (const char * envStallTimeout = gSystem->Getenv("NDMSPC_IPC_STALL_TIMEOUT")) {
911 try {
912 stallTimeoutSec = std::max(5, std::stoi(envStallTimeout));
913 }
914 catch (...) {
915 NLogWarning("NGnTree::Process: Invalid NDMSPC_IPC_STALL_TIMEOUT='%s', using default=%d", envStallTimeout,
916 stallTimeoutSec);
917 }
918 }
919 auto lastProgress = std::chrono::steady_clock::now();
920
922 bool hasMore = true;
923
924 // maxInFlightMessages is recalculated dynamically as workers join
925 size_t totalTasks = 1;
926 for (size_t i = 0; i < fNumDimensions; ++i) {
927 totalTasks *= static_cast<size_t>(fMaxBounds[i] - fMinBounds[i] + 1);
928 }
929 auto isUserInterrupted = []() {
930 if (gIpcSigIntRequested != 0) return true;
931 return (gROOT && gROOT->IsInterrupted());
932 };
933 // Helper: send SETDEF/SETIDS catchup to a newly-joined worker
934 auto sendCatchup = [&](const std::string & identity) {
935 if (!fIpcSession->currentDefName.empty()) {
936 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "SETDEF", fIpcSession->currentDefName});
937 if (fIpcSession->hasCurrentDefIds) {
938 NDimensionalIpcRunner::SendFrames(
939 fIpcSession->router, {identity, "SETIDS", NDimensionalIpcRunner::SerializeIds(fIpcSession->currentDefIds)});
940 }
941 }
942 };
943
944 while ((hasMore || outstanding > 0 || fIpcSession->taskStateManager.HasPending()) && firstError.empty()) {
945 // STATE MACHINE FOR TASK DISTRIBUTION:
946 // pending → dispatch → running → ACK → done
947 //
948 // Key counters:
949 // - hasMore: More tasks available from coordinate iterator?
950 // - outstanding: Number of tasks sent but not yet ACKed
951 // - pending: Tasks queued, not yet assigned to workers
952 // - running: Tasks assigned to workers, awaiting ACK
953 // - done: Tasks fully processed and ACKed
954 // - acked: Cumulative count of tasks ACKed (used for progress callback)
955 //
956 // SAFE EXIT CONDITIONS:
957 // 1. hasMore=false && outstanding=0 && pending=0 → All work complete (SUCCESS)
958 // 2. No workers remain && (outstanding>0 || pending>0) → FAIL (work lost)
959 // 3. User interrupt signal (Ctrl+C) → FAIL with cleanup
960 // 4. Stall timeout (no progress for N seconds) → FAIL (deadlock)
961 //
962 // WORKER FAILURE RECOVERY:
963 // On TCP timeout, IPC crash, or send failure:
964 // - Call RecoverWorkerTasks(worker) → Returns pending tasks, requeues running tasks
965 // - Tasks become pending again and are re-dispatched to remaining workers
966 // - No loss of work, but may increase total execution time
967 // - No retry loop to avoid infinite loops with persistent failures
968
969 if (isUserInterrupted()) {
970 firstError = "Interrupted by user";
971 break;
972 }
973
974 // Check if all workers have failed - exit early to avoid infinite loop
975 if (fIpcSession->workerIdentityVec.empty() && (outstanding > 0 || fIpcSession->taskStateManager.HasPending() || hasMore)) {
976 if (fIpcSession->isTcp) {
977 firstError = "No workers available. All TCP workers have disconnected/failed.";
978 } else {
979 firstError = "No workers available. All worker processes have exited/failed.";
980 }
981 break;
982 }
983
984 // Allow multiple batches to be in flight for better parallelism
985 // Each worker can have up to this many batches pending
986 const size_t maxInFlightMessages = std::max<size_t>(4, fIpcSession->workerIdentityVec.size());
987
988 while ((hasMore || fIpcSession->taskStateManager.HasPending()) && outstandingMessages < maxInFlightMessages && firstError.empty()) {
989 if (fIpcSession->workerIdentityVec.empty()) break; // no workers yet — wait
990
991 // Skip workers that have been marked as failed
992 size_t workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
993 std::string identity = fIpcSession->workerIdentityVec[workerSlot];
994
995 // In TCP mode, skip failed workers and find next available worker
996 if (fIpcSession->isTcp) {
997 size_t attempts = 0;
998 while (fIpcSession->failedTcpWorkers.count(identity) && attempts < fIpcSession->workerIdentityVec.size()) {
999 ++dispatchMessageId;
1000 ++attempts;
1001 workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
1002 identity = fIpcSession->workerIdentityVec[workerSlot];
1003 }
1004 // If all workers are failed, break to trigger cleanup
1005 if (attempts >= fIpcSession->workerIdentityVec.size()) {
1006 break;
1007 }
1008 }
1009
1010 std::vector<std::pair<size_t, std::vector<int>>> batchTasks;
1011 const size_t nw = fIpcSession->workerIdentityVec.size();
1012 const size_t remainingTasks = (nextTaskId < totalTasks) ? (totalTasks - nextTaskId) : 0;
1013 const size_t adaptiveBatchSize = std::max<size_t>(
1014 1, std::min(ipcBatchSize, std::max<size_t>(1, (remainingTasks + nw - 1) / nw)));
1015 batchTasks.reserve(adaptiveBatchSize);
1016
1017 // First, dispatch redistributed (pending) tasks from failed workers
1018 // These were added back to pending state by RecoverWorkerTasks or MarkFailed
1019 size_t reprocessedCount = 0;
1020 size_t redistPerBatch = 1;
1021 if (adaptiveBatchSize > 1 && fIpcSession->workerIdentityVec.size() > 0) {
1022 redistPerBatch = std::max<size_t>(1, adaptiveBatchSize / fIpcSession->workerIdentityVec.size());
1023 }
1024 size_t redistAdded = 0;
1025 while (fIpcSession->taskStateManager.HasPending() && outstanding < maxInFlightMessages * ipcBatchSize &&
1026 batchTasks.size() < adaptiveBatchSize && redistAdded < redistPerBatch) {
1027 size_t taskId = 0;
1028 std::vector<int> coords;
1029 if (!fIpcSession->taskStateManager.ClaimNextPendingForWorker(identity, taskId, coords)) {
1030 break;
1031 }
1032 batchTasks.emplace_back(taskId, coords);
1033 fIpcSession->workerTaskHistory[identity].insert(taskId);
1034 ++redistAdded;
1035 ++outstanding;
1036
1037 if (isUserInterrupted()) {
1038 firstError = "Interrupted by user";
1039 break;
1040 }
1041 }
1042
1043 if (reprocessedCount > 0) {
1044 NLogDebug("Redistributing %zu previously-completed tasks (acked counter decremented)", reprocessedCount);
1045 }
1046
1047 // Then, dispatch new tasks if space available
1048 while (hasMore && outstanding < maxInFlightMessages * ipcBatchSize && batchTasks.size() < adaptiveBatchSize) {
1049 fIpcSession->taskStateManager.AddPending(nextTaskId, fCurrentCoords);
1050 size_t taskId = 0;
1051 std::vector<int> payload;
1052 if (!fIpcSession->taskStateManager.ClaimNextPendingForWorker(identity, taskId, payload)) {
1053 firstError = "Failed to claim pending task for worker dispatch.";
1054 break;
1055 }
1056 batchTasks.emplace_back(taskId, payload);
1057 fIpcSession->workerTaskHistory[identity].insert(taskId);
1058 ++nextTaskId;
1059 ++outstanding;
1060
1061 if (!Increment()) {
1062 hasMore = false;
1063 }
1064
1065 if (isUserInterrupted()) {
1066 firstError = "Interrupted by user";
1067 break;
1068 }
1069 }
1070
1071 if (!firstError.empty()) {
1072 break;
1073 }
1074
1075 if (batchTasks.empty()) {
1076 continue;
1077 }
1078
1079 // Log assigned task coordinates to supervisor console for debugging
1080 for (const auto & task : batchTasks) {
1081 const std::string coordsStr = NDimensionalIpcRunner::SerializeCoords(task.second);
1082 NLogInfo("NDimensionalExecutor: Assign task %zu coords=%s -> worker '%s'", task.first, coordsStr.c_str(), identity.c_str());
1083 }
1084
1085 // Initialize/update activity tracking for TCP workers
1086 if (fIpcSession->isTcp) {
1087 fIpcSession->workerLastActivity[identity] = std::chrono::steady_clock::now();
1088 }
1089
1090 if (batchTasks.size() == 1) {
1091 const std::string taskId = std::to_string(batchTasks[0].first);
1092 const std::string coords = NDimensionalIpcRunner::SerializeCoords(batchTasks[0].second);
1093 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "TASK", taskId, coords})) {
1094 if (fIpcSession->isTcp) {
1095 // TCP worker likely disconnected - mark for redistribution
1096 NLogWarning("Failed to send TASK to TCP worker '%s', marking as failed", identity.c_str());
1097
1098 fIpcSession->failedTcpWorkers.insert(identity);
1099 // Put task back in redistribution queue (mark as failed to return to pending)
1100 for (const auto & task : batchTasks) {
1101 fIpcSession->taskStateManager.MarkFailed(task.first);
1102 }
1103 break; // Break inner loop to skip this worker and retry on next iteration
1104 } else {
1105 firstError = "Failed to send IPC TASK message to worker '" + identity + "'.";
1106 break;
1107 }
1108 }
1109 }
1110 else {
1111 std::ostringstream payload;
1112 for (size_t i = 0; i < batchTasks.size(); ++i) {
1113 if (i != 0) payload << ';';
1114 payload << batchTasks[i].first << ':' << NDimensionalIpcRunner::SerializeCoords(batchTasks[i].second);
1115 }
1116 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "TASKB", payload.str()})) {
1117 if (fIpcSession->isTcp) {
1118 // TCP worker likely disconnected - mark for redistribution
1119 NLogWarning("Failed to send TASKB to TCP worker '%s', marking as failed", identity.c_str());
1120
1121 fIpcSession->failedTcpWorkers.insert(identity);
1122 // Put tasks back in redistribution queue (mark as failed to return to pending)
1123 for (const auto & task : batchTasks) {
1124 fIpcSession->taskStateManager.MarkFailed(task.first);
1125 }
1126 break; // Break inner loop to skip this worker and retry on next iteration
1127 } else {
1128 firstError = "Failed to send IPC TASKB message to worker '" + identity + "'.";
1129 break;
1130 }
1131 }
1132 }
1133 ++dispatchMessageId;
1134 ++outstandingMessages;
1135 }
1136
1137 // Clean up any TCP workers that failed during send attempts
1138 if (fIpcSession->isTcp && !fIpcSession->failedTcpWorkers.empty()) {
1139 std::vector<std::string> workersToRemove;
1140 for (const auto & identity : fIpcSession->workerIdentityVec) {
1141 if (fIpcSession->failedTcpWorkers.count(identity)) {
1142 workersToRemove.push_back(identity);
1143 }
1144 }
1145
1146 for (const auto & failedIdentity : workersToRemove) {
1147 HandleWorkerFailure(failedIdentity, "send_failure", outstanding, acked);
1148
1149 // Update progress bar to reflect worker count change
1150 if (progressCallback) {
1151 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1152 fIpcSession->taskStateManager.RunningCount(),
1153 fIpcSession->taskStateManager.DoneCount(),
1154 fIpcSession->workerIdentityVec.size()};
1155 progressCallback(progress);
1156 }
1157 }
1158
1159 // If no workers remain, fail
1160 if (fIpcSession->workerIdentityVec.empty()) {
1161 firstError = "No workers available. All TCP workers have disconnected/failed.";
1162
1163 }
1164 }
1165
1166 if (outstanding == 0 && fIpcSession->workerIdentityVec.empty()) continue;
1167 if (outstanding == 0 && !hasMore && !fIpcSession->taskStateManager.HasPending()) continue;
1168
1169 std::vector<std::string> frames;
1170 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
1171 if (errno == EINTR || isUserInterrupted()) {
1172 firstError = "Interrupted by user";
1173 break;
1174 }
1175 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1176 // In TCP mode, receive errors can occur when workers disconnect
1177 // Let the activity timeout detection handle this gracefully
1178 if (!fIpcSession->isTcp) {
1179 firstError = "Failed to receive IPC ACK/ERR from worker.";
1180 break;
1181 }
1182 // TCP mode: treat as timeout and continue to worker failure detection
1183 }
1184
1185 // Check for worker failures and redistribute their tasks
1186 // For TCP mode: check for inactive workers (no ACKs received)
1187 if (fIpcSession->isTcp && outstanding > 0) {
1188 const auto now = std::chrono::steady_clock::now();
1189 int tcpWorkerTimeoutSec = 30; // Default timeout for TCP worker inactivity
1190 if (const char * envTcpTimeout = gSystem->Getenv("NDMSPC_TCP_WORKER_TIMEOUT")) {
1191 try {
1192 tcpWorkerTimeoutSec = std::max(10, std::stoi(envTcpTimeout));
1193 } catch (...) {}
1194 }
1195
1196 std::vector<std::string> inactiveWorkers;
1197 for (const auto & identity : fIpcSession->workerIdentityVec) {
1198 auto activityIt = fIpcSession->workerLastActivity.find(identity);
1199 if (activityIt != fIpcSession->workerLastActivity.end()) {
1200 auto inactiveSecs = std::chrono::duration_cast<std::chrono::seconds>(now - activityIt->second).count();
1201 if (inactiveSecs >= tcpWorkerTimeoutSec) {
1202 // Check if this worker has pending tasks - if so, it should have responded by now
1203 bool hasPendingTasks = !fIpcSession->taskStateManager.GetWorkerTasks(identity).empty();
1204
1205 // Mark as inactive if:
1206 // 1. It has pending tasks (should have responded by now), OR
1207 // 2. It's been idle for more than 2x the timeout (likely disconnected)
1208 if (hasPendingTasks || inactiveSecs >= tcpWorkerTimeoutSec * 2) {
1209 inactiveWorkers.push_back(identity);
1210 }
1211 }
1212 }
1213 }
1214
1215 // Redistribute tasks from inactive TCP workers
1216 for (const auto & failedIdentity : inactiveWorkers) {
1217 if (fIpcSession->failedTcpWorkers.count(failedIdentity)) continue; // already handled
1218 fIpcSession->failedTcpWorkers.insert(failedIdentity);
1219
1220 HandleWorkerFailure(failedIdentity, "timeout", outstanding, acked);
1221
1222 // Reset progress timer since we're actively handling worker failure
1223 lastProgress = std::chrono::steady_clock::now();
1224
1225 // Update progress bar to reflect worker count change
1226 if (progressCallback) {
1227 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1228 fIpcSession->taskStateManager.RunningCount(),
1229 fIpcSession->taskStateManager.DoneCount(),
1230 fIpcSession->workerIdentityVec.size()};
1231 progressCallback(progress);
1232 }
1233
1234 // If no workers remain, fail
1235 if (fIpcSession->workerIdentityVec.empty()) {
1236 firstError = "All TCP workers have disconnected/failed. No workers available to continue processing.";
1237
1238 break;
1239 }
1240 }
1241 }
1242
1243 // For IPC mode: check forked child processes
1244 for (size_t i = 0; i < fIpcSession->childPids.size(); ++i) {
1245 int status = 0;
1246 pid_t rc = waitpid(fIpcSession->childPids[i], &status, WNOHANG);
1247 if (rc == fIpcSession->childPids[i]) {
1248 // Worker process exited - redistribute its pending tasks
1249 std::string failedIdentity = NDimensionalIpcRunner::BuildWorkerIdentity(i);
1250
1251 HandleWorkerFailure(failedIdentity, "crash", outstanding, acked);
1252
1253 // Reset progress timer since we're actively redistributing
1254 lastProgress = std::chrono::steady_clock::now();
1255
1256 // Update progress bar to reflect worker count change
1257 if (progressCallback) {
1258 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1259 fIpcSession->taskStateManager.RunningCount(),
1260 fIpcSession->taskStateManager.DoneCount(),
1261 fIpcSession->workerIdentityVec.size()};
1262 progressCallback(progress);
1263 }
1264
1265 // If no workers remain, fail
1266 if (fIpcSession->workerIdentityVec.empty()) {
1267 firstError = "No workers available. All worker processes have exited/failed.";
1268
1269 break;
1270 }
1271 }
1272 }
1273 if (firstError.empty() && outstanding > 0 && stallTimeoutSec > 0) {
1274 const auto now = std::chrono::steady_clock::now();
1275 const auto stallSecs = std::chrono::duration_cast<std::chrono::seconds>(now - lastProgress).count();
1276 if (stallSecs >= stallTimeoutSec) {
1277 const size_t activeWorkers = fIpcSession->workerIdentityVec.size();
1278 if (activeWorkers == 0) {
1279 firstError = "No workers available. All workers have disconnected/failed with " +
1280 std::to_string(outstanding) + " pending tasks remaining.";
1281 } else {
1282 firstError = "No IPC/TCP ACK progress for " + std::to_string(stallSecs) + "s with " +
1283 std::to_string(outstanding) + " pending tasks (active workers: " +
1284 std::to_string(activeWorkers) + ").";
1285 }
1286 }
1287 }
1288 continue;
1289 }
1290
1291 if (frames.size() < 2) continue;
1292
1293 // Handle late-joining TCP worker: send INIT immediately but do NOT block
1294 // waiting for the ACK — the ACK is handled below as a 2-frame message so
1295 // task ACKs from active workers are never dropped.
1296 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] == "READY") {
1297 const std::string & lateId = frames[0];
1298 if (!fIpcSession->identityToWorker.count(lateId) && !pendingInitWorkers.count(lateId)) {
1299 const std::string prefix = "wk_";
1300 const size_t prefixLen = prefix.size();
1301 if (lateId.size() > prefixLen && lateId.substr(0, prefixLen) == prefix) {
1302 size_t workerIdx = std::numeric_limits<size_t>::max();
1303 try { workerIdx = std::stoul(lateId.substr(prefixLen)); } catch (...) {}
1304 if (workerIdx < fIpcSession->maxWorkers) {
1305 const std::string sessionId = std::to_string(getpid());
1306 if (NDimensionalIpcRunner::SendFrames(fIpcSession->router,
1307 {lateId, "INIT", std::to_string(workerIdx), sessionId,
1308 fIpcSession->jobDir, fIpcSession->treeName,
1309 fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
1310 pendingInitWorkers[lateId] = workerIdx;
1311 NLogDebug("NDimensionalExecutor: late worker '%s' sent INIT, awaiting ACK", lateId.c_str());
1312 }
1313 }
1314 }
1315 }
1316 lastProgress = std::chrono::steady_clock::now();
1317 continue;
1318 }
1319
1320 // Handle INIT ACK from a late-joining worker (2-frame: identity + "ACK").
1321 if (fIpcSession->isTcp && frames.size() == 2 && frames[1] == "ACK") {
1322 auto pit = pendingInitWorkers.find(frames[0]);
1323 if (pit != pendingInitWorkers.end()) {
1324 const std::string & id = pit->first;
1325 const size_t idx = pit->second;
1326
1327 // Check for duplicate identity (defensive - shouldn't happen with proper cleanup)
1328 if (fIpcSession->identityToWorker.count(id)) {
1329 NLogWarning("NDimensionalExecutor: late worker '%s' already registered, replacing", id.c_str());
1330 // Remove from vector if present
1331 auto it = std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), id);
1332 if (it != fIpcSession->workerIdentityVec.end()) {
1333 fIpcSession->workerIdentityVec.erase(it);
1334 }
1335 }
1336
1337 fIpcSession->identityToWorker[id] = idx;
1338 fIpcSession->workerIdentityVec.push_back(id);
1339 // Initialize activity tracking for this new TCP worker
1340 fIpcSession->workerLastActivity[id] = std::chrono::steady_clock::now();
1341 NLogInfo("NDimensionalExecutor: late worker '%s' (idx=%zu) joined [total: %zu]",
1342 id.c_str(), idx, fIpcSession->workerIdentityVec.size());
1343 // Log in-flight task distribution across all workers so the startup imbalance is visible
1344 for (const auto & wid : fIpcSession->workerIdentityVec) {
1345 const size_t inFlight = fIpcSession->taskStateManager.GetWorkerTasks(wid).size();
1346 NLogInfo("NDimensionalExecutor: in-flight distribution: worker '%s' has %zu pending task(s)", wid.c_str(), inFlight);
1347 }
1348 sendCatchup(id);
1349 pendingInitWorkers.erase(pit);
1350 lastProgress = std::chrono::steady_clock::now();
1351 continue;
1352 }
1353 // Not a pending-init ACK — fall through to the malformed-message guard.
1354 }
1355
1356 // Handle bootstrapping worker requesting config
1357 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] == "BOOTSTRAP") {
1358 HandleBootstrap(frames[0]);
1359 lastProgress = std::chrono::steady_clock::now();
1360 continue;
1361 }
1362
1363 // Handle worker shutdown notification (e.g., Ctrl+C)
1364 if (frames.size() >= 2 && frames[1] == "SHUTDOWN") {
1365 const std::string & workerIdentity = frames[0];
1366 const std::string reason = (frames.size() >= 3) ? frames[2] : "unknown";
1367 const std::string tasksCompleted = (frames.size() >= 4) ? frames[3] : "?";
1368
1369 NLogWarning("Worker '%s' reported shutdown: %s (completed %s tasks)", workerIdentity.c_str(), reason.c_str(), tasksCompleted.c_str());
1370
1371 if (fIpcSession->isTcp) {
1372 fIpcSession->failedTcpWorkers.insert(workerIdentity);
1373 }
1374
1375 HandleWorkerFailure(workerIdentity, "interrupted", outstanding, acked);
1376
1377 if (progressCallback) {
1378 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1379 fIpcSession->taskStateManager.RunningCount(),
1380 fIpcSession->taskStateManager.DoneCount(),
1381 fIpcSession->workerIdentityVec.size()};
1382 progressCallback(progress);
1383 }
1384
1385 // If no workers remain, fail
1386 if (fIpcSession->workerIdentityVec.empty()) {
1387 if (fIpcSession->isTcp) {
1388 firstError = "No workers available. All TCP workers have shut down.";
1389 } else {
1390 firstError = "No workers available. All worker processes have shut down.";
1391 }
1392
1393 break;
1394 }
1395
1396 lastProgress = std::chrono::steady_clock::now();
1397 continue;
1398 }
1399
1400 if (frames.size() == 2 && frames[1] == "DONE") {
1401 const std::string & workerIdentity = frames[0];
1402 if (fIpcSession->identityToWorker.count(workerIdentity)) {
1403 fIpcSession->earlyDoneWorkers.insert(workerIdentity);
1404 NLogDebug("NDimensionalExecutor::IPC: Worker '%s' sent DONE before FinishProcessIpc; deferring",
1405 workerIdentity.c_str());
1406 } else {
1407 NLogWarning("NDimensionalExecutor::IPC: ignoring DONE from unknown worker '%s'", workerIdentity.c_str());
1408 }
1409 lastProgress = std::chrono::steady_clock::now();
1410 continue;
1411 }
1412
1413 if (frames.size() < 3) {
1414 // A 2-frame ACK that isn't a pending-init reply is unexpected.
1415 if (frames.size() == 2 && frames[1] == "ACK")
1416 NLogWarning("NDimensionalExecutor: unexpected 2-frame ACK from '%s', ignoring", frames[0].c_str());
1417 else
1418 firstError = "Malformed IPC message received from worker.";
1419 continue;
1420 }
1421
1422 const std::string & cmd = frames[1];
1423 if (cmd == "ACK") {
1424 const std::string & workerIdentity = frames[0];
1425 size_t taskId = 0;
1426 try {
1427 taskId = static_cast<size_t>(std::stoull(frames[2]));
1428 }
1429 catch (...) {
1430 firstError = "Malformed IPC task id received from worker.";
1431 break;
1432 }
1433
1434 // Mark task as done using TaskStateManager
1435 if (!fIpcSession->taskStateManager.MarkDone(taskId)) {
1436 firstError = "Received ACK for unknown or already-done task " + std::to_string(taskId) + ".";
1437 break;
1438 }
1439
1440 // Update TCP worker activity tracking
1441 if (fIpcSession->isTcp) {
1442 fIpcSession->workerLastActivity[workerIdentity] = std::chrono::steady_clock::now();
1443 }
1444
1445 if (outstanding == 0) {
1446 firstError = "IPC outstanding counter underflow while processing ACK.";
1447 break;
1448 }
1449 if (outstandingMessages == 0) {
1450 firstError = "IPC message outstanding counter underflow while processing ACK.";
1451 break;
1452 }
1453 --outstanding;
1454 --outstandingMessages;
1455 ++acked;
1456 lastProgress = std::chrono::steady_clock::now();
1457 const size_t activeWorkersNow = fIpcSession->workerIdentityVec.size();
1458 if (progressCallback) {
1459 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1460 fIpcSession->taskStateManager.RunningCount(),
1461 fIpcSession->taskStateManager.DoneCount(),
1462 activeWorkersNow};
1463 progressCallback(progress);
1464 }
1465 if (acked >= nextSchedulerLogAck) {
1466 NLogDebug("NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pending=%zu running=%zu done=%zu",
1467 acked, totalTasks, activeWorkersNow, outstandingMessages,
1468 fIpcSession->taskStateManager.PendingCount(),
1469 fIpcSession->taskStateManager.RunningCount(),
1470 fIpcSession->taskStateManager.DoneCount());
1471 nextSchedulerLogAck += 200;
1472 }
1473 continue;
1474 }
1475
1476 if (cmd == "ACKB") {
1477 const std::string & workerIdentity = frames[0];
1478 if (frames.size() < 3 || frames[2].empty()) {
1479 firstError = "Malformed IPC ACKB payload received from worker.";
1480 break;
1481 }
1482
1483 if (outstandingMessages == 0) {
1484 firstError = "IPC message outstanding counter underflow while processing ACKB.";
1485 break;
1486 }
1487 --outstandingMessages;
1488
1489 std::stringstream ackStream(frames[2]);
1490 std::string ackToken;
1491 while (std::getline(ackStream, ackToken, ',')) {
1492 if (ackToken.empty()) continue;
1493 size_t ackTaskId = 0;
1494 try {
1495 ackTaskId = static_cast<size_t>(std::stoull(ackToken));
1496 }
1497 catch (...) {
1498 firstError = "Malformed IPC ACKB task id received from worker.";
1499 break;
1500 }
1501
1502 // Mark task as done using TaskStateManager
1503 if (!fIpcSession->taskStateManager.MarkDone(ackTaskId)) {
1504 firstError = "Received ACKB for unknown or already-done task " + std::to_string(ackTaskId) + ".";
1505 break;
1506 }
1507
1508 // Update TCP worker activity tracking
1509 if (fIpcSession->isTcp) {
1510 fIpcSession->workerLastActivity[workerIdentity] = std::chrono::steady_clock::now();
1511 }
1512
1513 if (outstanding == 0) {
1514 firstError = "IPC outstanding counter underflow while processing ACKB.";
1515 break;
1516 }
1517 --outstanding;
1518 ++acked;
1519 lastProgress = std::chrono::steady_clock::now();
1520 const size_t activeWorkersNow = fIpcSession->workerIdentityVec.size();
1521 if (progressCallback) {
1522 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1523 fIpcSession->taskStateManager.RunningCount(),
1524 fIpcSession->taskStateManager.DoneCount(),
1525 activeWorkersNow};
1526 progressCallback(progress);
1527 }
1528 if (acked >= nextSchedulerLogAck) {
1529 NLogDebug(
1530 "NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pending=%zu running=%zu done=%zu",
1531 acked, totalTasks, activeWorkersNow, outstandingMessages,
1532 fIpcSession->taskStateManager.PendingCount(),
1533 fIpcSession->taskStateManager.RunningCount(),
1534 fIpcSession->taskStateManager.DoneCount());
1535 nextSchedulerLogAck += 200;
1536 }
1537 }
1538
1539 if (!firstError.empty()) {
1540 break;
1541 }
1542 continue;
1543 }
1544
1545 if (cmd == "ERR") {
1546 size_t taskId = 0;
1547 try {
1548 taskId = static_cast<size_t>(std::stoull(frames[2]));
1549 }
1550 catch (...) {
1551 firstError = "Malformed IPC task id received from worker.";
1552 break;
1553 }
1554 std::string errMsg = (frames.size() >= 4) ? frames[3] : "worker error";
1555 firstError = "Worker reported error for task " + std::to_string(taskId) + ": " + errMsg;
1556 break;
1557 }
1558
1559 firstError = "Unknown IPC command from worker: " + cmd;
1560 break;
1561 }
1562
1563 if (!firstError.empty()) {
1564 throw std::runtime_error(firstError);
1565 }
1566
1567 // All tasks should be either done or somehow lost - check state
1568 const size_t pendingCount = fIpcSession->taskStateManager.PendingCount();
1569 const size_t runningCount = fIpcSession->taskStateManager.RunningCount();
1570 if (pendingCount > 0 || runningCount > 0) {
1571 throw std::runtime_error("IPC execution finished with " + std::to_string(pendingCount) + " pending and " +
1572 std::to_string(runningCount) + " running tasks still unacknowledged.");
1573 }
1574
1575 return acked;
1576}
1577
1578void NDimensionalExecutor::FinishProcessIpc(bool abort)
1579{
1580 if (!fIpcSession) {
1581 return;
1582 }
1583
1584 const std::string stopReason = abort ? "abort" : "ok";
1585 for (const auto & it : fIpcSession->identityToWorker) {
1586 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {it.first, "STOP", stopReason});
1587 }
1588
1589 if (!fIpcSession->isTcp) {
1590 const bool exitedCleanly = NDimensionalIpcRunner::WaitForChildProcesses(fIpcSession->childPids, 1500);
1591 if (!exitedCleanly) {
1592 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
1593 }
1594 } else if (!abort) {
1595 // TCP mode normal finish: wait for DONE from all workers before merging
1596 const size_t nWorkers = fIpcSession->identityToWorker.size();
1597 std::set<std::string> doneWorkers;
1598 for (const auto & workerIdentity : fIpcSession->earlyDoneWorkers) {
1599 if (fIpcSession->identityToWorker.count(workerIdentity)) {
1600 doneWorkers.insert(workerIdentity);
1601 }
1602 }
1603 const auto doneDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
1604 while (doneWorkers.size() < nWorkers) {
1605 const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(doneDeadline - std::chrono::steady_clock::now()).count();
1606 if (remaining <= 0) {
1607 NLogWarning("NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1608 doneWorkers.size(), nWorkers);
1609 break;
1610 }
1611 zmq_pollitem_t item = {fIpcSession->router, 0, ZMQ_POLLIN, 0};
1612 const int rc = zmq_poll(&item, 1, static_cast<long>(remaining));
1613 if (rc <= 0) {
1614 NLogWarning("NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1615 doneWorkers.size(), nWorkers);
1616 break;
1617 }
1618 std::vector<std::string> frames;
1619 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames) || frames.size() < 2) continue;
1620 if (frames[1] == "DONE") {
1621 const std::string & workerIdentity = frames[0];
1622 doneWorkers.insert(workerIdentity);
1623
1624 NLogDebug("NDimensionalExecutor::FinishProcessIpc: Worker '%s' sent DONE (%zu/%zu)", workerIdentity.c_str(),
1625 doneWorkers.size(), nWorkers);
1626 } else if (frames[1] == "READY") {
1627 // Worker arrived after all tasks were dispatched — send STOP immediately.
1628 NLogInfo("NDimensionalExecutor::FinishProcessIpc: Late worker '%s' arrived, sending STOP", frames[0].c_str());
1629 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {frames[0], "STOP", "ok"});
1630 } else if (frames[1] == "BOOTSTRAP") {
1631 // Worker is still bootstrapping — reply with CONFIG and a STOP will follow via READY.
1632 HandleBootstrap(frames[0]);
1633 }
1634 }
1635 }
1636 // Bound socket close time to avoid hangs at process end when peers disappear
1637 // or when STOP/DONE frames are still queued.
1638 if (fIpcSession->router) {
1639 int lingerMs = 0;
1640 if (fIpcSession->isTcp && abort) {
1641 // In TCP abort, allow a short grace period to flush STOP frames.
1642 lingerMs = 2000;
1643 }
1644 zmq_setsockopt(fIpcSession->router, ZMQ_LINGER, &lingerMs, sizeof(lingerMs));
1645 }
1646
1647 if (fIpcSession->router) {
1648 zmq_close(fIpcSession->router);
1649 }
1650 if (fIpcSession->ctx) {
1651 zmq_ctx_term(fIpcSession->ctx);
1652 }
1653 if (!fIpcSession->isTcp && !fIpcSession->endpointPath.empty()) {
1654 ::unlink(fIpcSession->endpointPath.c_str());
1655 }
1656 // Capture registered worker indices before releasing the session.
1658 if (fIpcSession) {
1659 for (const auto & kv : fIpcSession->identityToWorker) {
1660 fRegisteredWorkerIndices.insert(kv.second);
1661 }
1662 }
1663 RestoreIpcSigIntHandler(fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
1664 fIpcSession.reset();
1665}
1666
1668 const std::function<void(const std::vector<int> & coords, NGnThreadData & thread_object)> & func,
1669 std::vector<NGnThreadData> & thread_objects);
1670
1671} // namespace Ndmspc
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in the identityToWorker / worker-registration state.
size_t HandleWorkerFailure(const std::string &failedIdentity, const std::string &failureReason, size_t &outstanding, size_t &acked)
Centralized worker failure handling: recovers the failed worker's tasks, removes the worker, and updates execution state.
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFIG message.
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
Thread-local data object for NDMSPC processing.
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Definition NLogger.cxx:132
Manages task lifecycle: pending → running → done/failed.
Thread-local data object for NDMSPC processing.
Definition NThreadData.h:21
size_t GetAssignedIndex() const
Get the assigned index for the thread.
Definition NThreadData.h:96
Global callback function for libwebsockets client events.
Execution progress metrics for IPC-based distributed processing.