// NDimensionalExecutor.cxx — ndmspc v1.2.0-0.1.rc4
// N-dimensional coordinate iteration with serial, thread-pool and
// IPC/TCP multi-process execution backends.
#include <string>
#include <algorithm>
#include <cctype>
#include <chrono>
#include <cerrno>
#include <csignal>
#include <iomanip>
#include <set>
#include <sstream>
#include <unordered_map>
#include <sys/wait.h>
#include <unistd.h>
#include <zmq.h>
#include <THnSparse.h>
#include <TAxis.h>
#include <TROOT.h>
#include <TSystem.h>
#include "NDimensionalExecutor.h"
#include "NDimensionalIpcRunner.h"
#include "NGnThreadData.h"
#include "NUtils.h"
21
22namespace Ndmspc {
23
24namespace {
// Handler-visible state: flag set on SIGINT plus a fixed-size table of IPC
// worker PIDs to hard-kill so no orphan processes survive a user Ctrl-C.
volatile sig_atomic_t gIpcSigIntRequested = 0;
volatile sig_atomic_t gIpcChildCount = 0;
pid_t gIpcChildPids[1024] = {0};

// SIGINT handler. Records the interrupt request and SIGKILLs every
// registered child; uses only async-signal-safe operations (kill).
void IpcSigIntHandler(int)
{
   gIpcSigIntRequested = 1;
   const sig_atomic_t registered = gIpcChildCount;
   for (sig_atomic_t idx = 0; idx < registered; ++idx) {
      const pid_t child = gIpcChildPids[idx];
      if (child > 0) {
         // User interrupt should not leave orphan IPC workers.
         kill(child, SIGKILL);
      }
   }
}
40
41void InstallIpcSigIntHandler(const std::vector<pid_t> & childPids, struct sigaction & oldAction, bool & hasOldAction)
42{
43 gIpcSigIntRequested = 0;
44 gIpcChildCount = 0;
45 const size_t maxChildren = sizeof(gIpcChildPids) / sizeof(gIpcChildPids[0]);
46 const size_t count = std::min(maxChildren, childPids.size());
47 for (size_t i = 0; i < count; ++i) {
48 gIpcChildPids[i] = childPids[i];
49 }
50 for (size_t i = count; i < maxChildren; ++i) {
51 gIpcChildPids[i] = 0;
52 }
53 gIpcChildCount = static_cast<sig_atomic_t>(count);
54
55 struct sigaction sa;
56 memset(&sa, 0, sizeof(sa));
57 sa.sa_handler = IpcSigIntHandler;
58 sigemptyset(&sa.sa_mask);
59 sa.sa_flags = 0;
60
61 if (sigaction(SIGINT, &sa, &oldAction) == 0) {
62 hasOldAction = true;
63 }
64}
65
66void RestoreIpcSigIntHandler(const struct sigaction & oldAction, bool hasOldAction)
67{
68 if (hasOldAction) {
69 sigaction(SIGINT, &oldAction, nullptr);
70 }
71 gIpcChildCount = 0;
72 gIpcSigIntRequested = 0;
73}
74} // namespace
75
77 void * ctx{nullptr};
78 void * router{nullptr};
79 bool isTcp{false};
80 std::string endpointPath;
81 std::string endpoint;
82 std::vector<pid_t> childPids;
83 std::unordered_map<std::string, size_t> identityToWorker;
84 std::vector<std::string> workerIdentityVec; // ordered list for round-robin
85 // TCP late-joiner support: stored so new workers can be initialised mid-run
86 std::string jobDir;
87 std::string treeName;
88 std::vector<NThreadData *> * workerObjects{nullptr};
89 size_t maxWorkers{0};
90 std::string currentDefName;
91 std::vector<Long64_t> currentDefIds;
92 bool hasCurrentDefIds{false};
93 struct sigaction oldSigIntAction{};
94 bool hasOldSigIntAction{false};
95 // Bootstrap configuration sent to workers on first contact
96 std::string macroList; // comma-separated macro paths to load on worker
97 std::string macroParams; // parameter list forwarded to TMacro::Exec on worker
98 std::string tmpDir; // supervisor's NDMSPC_TMP_DIR (fallback for workers)
99 std::string tmpResultsDir; // supervisor's NDMSPC_TMP_RESULTS_DIR
100 size_t bootstrapNextIdx{0}; // auto-assigned index counter for BOOTSTRAP
101 std::vector<std::string> pendingReadyIdentities; // READY messages consumed while waiting for ACK
102};
103
104// --- Private Increment Logic ---
106{
107 for (int i = fNumDimensions - 1; i >= 0; --i) {
108 fCurrentCoords[i]++;
109 if (fCurrentCoords[i] <= fMaxBounds[i]) {
110 return true;
111 }
113 }
114 return false;
115}
116
/// Constructs an executor iterating the inclusive hyper-rectangle
/// [minBounds[i], maxBounds[i]] for every dimension i.
/// @param minBounds lower bound per dimension (inclusive)
/// @param maxBounds upper bound per dimension (inclusive)
/// @throws std::invalid_argument if the vectors differ in size, are empty,
///         or any min exceeds its corresponding max
NDimensionalExecutor::NDimensionalExecutor(const std::vector<int> & minBounds, const std::vector<int> & maxBounds)
    : fMinBounds(minBounds), fMaxBounds(maxBounds)
{

   if (fMinBounds.size() != fMaxBounds.size()) {
      throw std::invalid_argument("Min and max bounds vectors must have the same size.");
   }
   if (fMinBounds.empty()) {
      throw std::invalid_argument("Bounds vectors cannot be empty.");
   }

   fNumDimensions = fMinBounds.size();

   // Per-dimension sanity check: each range must be non-empty.
   for (size_t i = 0; i < fNumDimensions; ++i) {
      if (fMinBounds[i] > fMaxBounds[i]) {
         throw std::invalid_argument("Min bound (" + std::to_string(fMinBounds[i]) +
                                     ") cannot be greater than max bound (" + std::to_string(fMaxBounds[i]) +
                                     ") for dimension " + std::to_string(i));
      }
   }

}
144
145void NDimensionalExecutor::SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds)
146{
147 fMinBounds = minBounds;
148 fMaxBounds = maxBounds;
149
150 if (fMinBounds.size() != fMaxBounds.size()) {
151 throw std::invalid_argument("Min and max bounds vectors must have the same size.");
152 }
153 if (fMinBounds.empty()) {
154 throw std::invalid_argument("Bounds vectors cannot be empty.");
155 }
156
157 fNumDimensions = fMinBounds.size();
158 for (size_t i = 0; i < fNumDimensions; ++i) {
159 if (fMinBounds[i] > fMaxBounds[i]) {
160 throw std::invalid_argument("Min bound (" + std::to_string(fMinBounds[i]) +
161 ") cannot be greater than max bound (" + std::to_string(fMaxBounds[i]) +
162 ") for dimension " + std::to_string(i));
163 }
164 }
165
168}
169
/// Constructs an executor over a THnSparse histogram.
/// @param hist       source histogram; must be non-null
/// @param onlyfilled when true, iterate a single dimension over the linear
///                   filled-bin index range [0, GetNbins()]; when false,
///                   iterate every axis over [0, GetNbins()] per dimension
/// @throws std::invalid_argument on null histogram, or (onlyfilled) when the
///         histogram contains no filled bins
/// NOTE(review): ranges start at 0, which for per-axis iteration is ROOT's
/// underflow bin — confirm that including bin 0 is intended.
NDimensionalExecutor::NDimensionalExecutor(THnSparse * hist, bool onlyfilled)
{
   if (hist == nullptr) {
      throw std::invalid_argument("THnSparse pointer cannot be null.");
   }

   if (onlyfilled) {
      // Check if the histogram is filled
      if (hist->GetNbins() <= 0) {
         throw std::invalid_argument("THnSparse histogram is empty.");
      }

      fMinBounds.push_back(0);
      fMaxBounds.push_back(hist->GetNbins());
   }
   else {
      // loop over all dimensions
      for (int i = 0; i < hist->GetNdimensions(); ++i) {
         fMinBounds.push_back(0);
         fMaxBounds.push_back(hist->GetAxis(i)->GetNbins());
      }
   }
   fNumDimensions = fMinBounds.size();
}
199
200NDimensionalExecutor::~NDimensionalExecutor() = default;
201
202void NDimensionalExecutor::Execute(const std::function<void(const std::vector<int> & coords)> & func)
203{
207
208 if (fNumDimensions == 0) {
209 return;
210 }
211 fCurrentCoords = fMinBounds; // Reset state
212 do {
213 func(fCurrentCoords);
214 } while (Increment());
215}
216
217// --- Template Implementation for ExecuteParallel ---
224template <typename TObject>
226 const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
227 std::vector<TObject> & thread_objects)
228{
229 if (fNumDimensions == 0) {
230 return;
231 }
232 size_t threads_to_use = thread_objects.size();
233 if (threads_to_use == 0) {
234 throw std::invalid_argument("Thread objects vector cannot be empty.");
235 }
236
237 std::vector<std::thread> workers;
238 std::queue<std::function<void(TObject &)>> tasks;
239 std::mutex queue_mutex;
240 std::condition_variable condition_producer;
241 std::condition_variable condition_consumer;
242 std::atomic<size_t> active_tasks = 0;
243 std::atomic<bool> stop_pool = false;
244 // Optional: Store first exception encountered in workers
245 std::exception_ptr first_exception = nullptr;
246 std::mutex exception_mutex;
247
248 // Worker thread logic: fetch and execute tasks, handle exceptions, signal completion.
249 auto worker_logic = [&](TObject & my_object) {
250 NThreadData * md = (NThreadData *)&my_object;
251
252 std::ostringstream oss;
253 oss << "wk_" << std::setw(3) << std::setfill('0') << md->GetAssignedIndex();
254
255 NLogger::SetThreadName(oss.str());
256 while (true) {
257 std::function<void(TObject &)> task_payload;
258 bool task_acquired = false; // Track if we actually got a task this iteration
259
260 try {
261 { // Lock scope for queue access
262 std::unique_lock<std::mutex> lock(queue_mutex);
263 condition_producer.wait(lock, [&] { return stop_pool || !tasks.empty(); });
264
265 // Check stop condition *after* waking up
266 if (stop_pool && tasks.empty()) {
267 break; // Exit the while loop normally
268 }
269 // If stopping but tasks remain, continue processing them
270
271 // Only proceed if not stopping or if tasks are still present
272 if (!tasks.empty()) {
273 task_payload = std::move(tasks.front());
274 tasks.pop();
275 task_acquired = true; // We got a task
276 }
277 else {
278 // Spurious wakeup or stop_pool=true with empty queue
279 continue; // Go back to wait
280 }
281 } // Mutex unlocked
282
283 // Execute the task if we acquired one
284 if (task_acquired) {
285 task_payload(my_object); // Execute task with assigned object
286 }
287 }
288 catch (...) {
289 // --- Exception Handling ---
290 { // Lock to safely store the first exception
291 std::lock_guard<std::mutex> lock(exception_mutex);
292 if (!first_exception) {
293 first_exception = std::current_exception(); // Store it
294 }
295 }
296 // Signal pool to stop immediately on any error
297 {
298 std::unique_lock<std::mutex> lock(queue_mutex);
299 stop_pool = true;
300 }
301 condition_producer.notify_all(); // Wake all threads to check stop flag
302
303 // *** Crucial Fix: Decrement active_tasks even on exception ***
304 // Check if we actually acquired a task before decrementing
305 if (task_acquired) {
306 if (--active_tasks == 0 && stop_pool) {
307 // Also notify consumer here in case this was the last task
308 condition_consumer.notify_one();
309 }
310 }
311 // Decide whether to exit the worker or try processing remaining tasks
312 // For simplicity, let's exit the worker on error.
313 return; // Exit worker thread immediately on error
314 }
315
316 // --- Normal Task Completion ---
317 // Decrement active task count *after* successful execution
318 // Check if we actually acquired and processed a task
319 if (task_acquired) {
320 if (--active_tasks == 0 && stop_pool) {
321 condition_consumer.notify_one();
322 }
323 }
324 } // End of while loop
325 }; // End of worker_logic lambda
326
327 // --- Start Worker Threads ---
328 workers.reserve(threads_to_use);
329 for (size_t i = 0; i < threads_to_use; ++i) {
330 workers.emplace_back(worker_logic, std::ref(thread_objects[i]));
331 }
332
333 // --- Main Thread: Iterate and Enqueue Tasks ---
334 try {
336 do {
337 // Check if pool was stopped prematurely (e.g., by an exception in a worker)
338 // Lock needed to safely check stop_pool
339 {
340 std::unique_lock<std::mutex> lock(queue_mutex);
341 if (stop_pool) break;
342 }
343
344 std::vector<int> coords_copy = fCurrentCoords;
345 {
346 std::unique_lock<std::mutex> lock(queue_mutex);
347 // Double check stop_pool after acquiring lock
348 if (stop_pool) break;
349
350 active_tasks++;
351 tasks.emplace([func, coords_copy](TObject & obj) { func(coords_copy, obj); });
352 }
353 condition_producer.notify_one();
354 } while (Increment());
355 }
356 catch (...) {
357 // Exception during iteration/enqueueing
358 {
359 std::unique_lock<std::mutex> lock(queue_mutex);
360 stop_pool = true; // Signal workers to stop
361 if (!first_exception) { // Store exception if none from workers yet
362 first_exception = std::current_exception();
363 }
364 }
365 condition_producer.notify_all();
366 // Proceed to join threads
367 }
368
369 // --- Signal Workers to Stop (if not already stopped by error) ---
370 {
371 std::unique_lock<std::mutex> lock(queue_mutex);
372 stop_pool = true;
373 }
374 condition_producer.notify_all();
375
376 // --- Wait for Tasks to Complete ---
377 {
378 std::unique_lock<std::mutex> lock(queue_mutex);
379 condition_consumer.wait(lock, [&] { return stop_pool && active_tasks == 0; });
380 }
381
382 // --- Join Worker Threads ---
383 for (std::thread & worker : workers) {
384 if (worker.joinable()) {
385 worker.join();
386 }
387 }
388
389 // --- Check for and rethrow exception from workers ---
390 if (first_exception) {
391 std::rethrow_exception(first_exception);
392 }
393}
394
395size_t NDimensionalExecutor::ExecuteParallelProcessIpc(std::vector<NThreadData *> & workerObjects,
396 size_t processCount)
397{
398 StartProcessIpc(workerObjects, processCount);
399 try {
400 size_t acked = ExecuteCurrentBoundsProcessIpc();
401 FinishProcessIpc();
402 return acked;
403 }
404 catch (...) {
405 FinishProcessIpc();
406 throw;
407 }
408}
409
/// Registers a TCP worker that announced itself with READY.
/// Parses the worker index from the identity, sends an INIT frame with the
/// session context, and waits (bounded by NDMSPC_WORKER_TIMEOUT, default
/// 30 s) for the matching ACK. READY frames from *other* workers seen while
/// waiting are queued in pendingReadyIdentities; BOOTSTRAP requests are
/// answered inline.
/// @param identity ZMQ routing identity of the worker (e.g. "wk_001")
/// @return true when the worker ACKed and was registered
bool NDimensionalExecutor::InitTcpWorker(const std::string & identity)
{
   // Extract worker index from identity string (e.g. "wk_001")
   // Derive the prefix by stripping trailing digits from a sample identity
   const std::string sample = NDimensionalIpcRunner::BuildWorkerIdentity(0);
   size_t numLen = 0;
   while (numLen < sample.size() && std::isdigit((unsigned char)sample[sample.size() - 1 - numLen]))
      ++numLen;
   const size_t prefixLen = sample.size() - numLen; // e.g. strlen("wk_") == 3
   if (identity.size() <= prefixLen) return false;
   size_t workerIdx = 0;
   try {
      workerIdx = std::stoul(identity.substr(prefixLen));
   }
   catch (...) {
      NLogWarning("NDimensionalExecutor::InitTcpWorker: cannot parse index from identity '%s'", identity.c_str());
      return false;
   }
   // Reject indices beyond the session's fixed worker-object capacity.
   if (workerIdx >= fIpcSession->maxWorkers) {
      NLogWarning("NDimensionalExecutor::InitTcpWorker: worker index %zu >= maxWorkers %zu, ignoring",
                  workerIdx, fIpcSession->maxWorkers);
      return false;
   }
   // Ignore re-announcements of an already registered identity.
   if (fIpcSession->identityToWorker.count(identity)) {
      NLogWarning("NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, ignoring duplicate READY",
                  identity.c_str());
      return false;
   }

   // Session id = supervisor PID; lets workers distinguish supervisor runs.
   const std::string sessionId = std::to_string(getpid());
   if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router,
                                          {identity, "INIT", std::to_string(workerIdx), sessionId,
                                           fIpcSession->jobDir, fIpcSession->treeName,
                                           fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
      NLogError("NDimensionalExecutor::InitTcpWorker: failed to send INIT to '%s'", identity.c_str());
      return false;
   }

   int initTimeoutSec = 30;
   if (const char * env = gSystem->Getenv("NDMSPC_WORKER_TIMEOUT")) {
      try {
         initTimeoutSec = std::max(1, std::stoi(env));
      }
      catch (...) {
         NLogWarning("NDimensionalExecutor::InitTcpWorker: Invalid NDMSPC_WORKER_TIMEOUT='%s', using default=%d", env,
                     initTimeoutSec);
      }
   }
   const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(initTimeoutSec);
   bool acked = false;
   while (!acked) {
      std::vector<std::string> ackFrames;
      if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, ackFrames)) {
         // EAGAIN/EWOULDBLOCK = receive timeout: retry until the deadline;
         // any other errno aborts the wait.
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
            if (std::chrono::steady_clock::now() > initDeadline) break;
            continue;
         }
         break;
      }
      // Another worker bootstrapping mid-wait: serve it and keep waiting.
      if (ackFrames.size() >= 2 && ackFrames[1] == "BOOTSTRAP") {
         HandleBootstrap(ackFrames[0]);
         continue;
      }
      // READY from a different worker: park it (deduplicated) for later.
      if (ackFrames.size() >= 2 && ackFrames[1] == "READY" && ackFrames[0] != identity) {
         if (!fIpcSession->identityToWorker.count(ackFrames[0]) &&
             std::find(fIpcSession->pendingReadyIdentities.begin(), fIpcSession->pendingReadyIdentities.end(),
                       ackFrames[0]) == fIpcSession->pendingReadyIdentities.end()) {
            fIpcSession->pendingReadyIdentities.push_back(ackFrames[0]);
         }
         continue;
      }
      if (ackFrames.size() >= 2 && ackFrames[0] == identity && ackFrames[1] == "ACK") {
         acked = true;
      }
   }
   if (!acked) {
      NLogError("NDimensionalExecutor::InitTcpWorker: worker '%s' did not ACK INIT", identity.c_str());
      return false;
   }

   // Registration: map identity -> slot and append to the round-robin list.
   fIpcSession->identityToWorker[identity] = workerIdx;
   fIpcSession->workerIdentityVec.push_back(identity);
   NLogInfo("NDimensionalExecutor::InitTcpWorker: worker '%s' (idx=%zu) joined [total: %zu]",
            identity.c_str(), workerIdx, fIpcSession->workerIdentityVec.size());
   return true;
}
496
497bool NDimensionalExecutor::HandleBootstrap(const std::string & identity)
498{
499 if (!fIpcSession || !fIpcSession->isTcp) return false;
500 const size_t assignedIdx = fIpcSession->bootstrapNextIdx++;
501 NLogDebug("NDimensionalExecutor::HandleBootstrap: assigning index %zu to worker '%s'", assignedIdx,
502 identity.c_str());
503 return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
504 {identity, "CONFIG", std::to_string(assignedIdx),
505 fIpcSession->macroList, fIpcSession->tmpDir,
506 fIpcSession->tmpResultsDir,
507 fIpcSession->macroParams});
508}
509
510void NDimensionalExecutor::StartProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount,
511 const std::string & tcpBindEndpoint, const std::string & jobDir,
512 const std::string & treeName, const std::string & macroList,
513 const std::string & tmpDir, const std::string & tmpResultsDir,
514 const std::string & macroParams)
515{
516 if (workerObjects.empty()) {
517 throw std::invalid_argument("Worker objects vector cannot be empty.");
518 }
519 if (fIpcSession) {
520 throw std::runtime_error("IPC session is already active.");
521 }
522
523 const size_t processesToUse = std::max<size_t>(1, std::min(processCount, workerObjects.size()));
524 NLogInfo("NDimensionalExecutor::StartProcessIpc: requested=%zu, workerObjects=%zu, spawning=%zu", processCount,
525 workerObjects.size(), processesToUse);
526 const auto nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
527 std::chrono::high_resolution_clock::now().time_since_epoch())
528 .count();
529 fIpcSession = std::make_unique<IpcSession>();
530
531 const bool isTcp = !tcpBindEndpoint.empty();
532 fIpcSession->isTcp = isTcp;
533
534 if (isTcp) {
535 fIpcSession->endpoint = tcpBindEndpoint;
536 fIpcSession->endpointPath.clear();
537 } else {
538 fIpcSession->endpointPath = "/tmp/ndmspc_ipc_" + std::to_string(getpid()) + "_" + std::to_string(nowNs) + ".sock";
539 fIpcSession->endpoint = "ipc://" + fIpcSession->endpointPath;
540 ::unlink(fIpcSession->endpointPath.c_str());
541 }
542
543 fIpcSession->ctx = zmq_ctx_new();
544 if (!fIpcSession->ctx) {
545 fIpcSession.reset();
546 throw std::runtime_error("Failed to create ZeroMQ context.");
547 }
548
549 fIpcSession->router = zmq_socket(fIpcSession->ctx, ZMQ_ROUTER);
550 if (!fIpcSession->router) {
551 zmq_ctx_term(fIpcSession->ctx);
552 fIpcSession.reset();
553 throw std::runtime_error("Failed to create ZeroMQ ROUTER socket.");
554 }
555
556 int timeoutMs = 1000;
557 zmq_setsockopt(fIpcSession->router, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
558
559 if (zmq_bind(fIpcSession->router, fIpcSession->endpoint.c_str()) != 0) {
560 const std::string err = zmq_strerror(zmq_errno());
561 zmq_close(fIpcSession->router);
562 zmq_ctx_term(fIpcSession->ctx);
563 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
564 fIpcSession.reset();
565 throw std::runtime_error("Failed to bind endpoint '" + fIpcSession->endpoint + "': " + err);
566 }
567
568 fIpcSession->identityToWorker.clear();
569 fIpcSession->identityToWorker.reserve(processesToUse);
570 fIpcSession->workerIdentityVec.clear();
571 fIpcSession->pendingReadyIdentities.clear();
572
573 if (!isTcp) {
574 // IPC/fork mode: pre-seed the map as before so WorkerLoop identities match
575 for (size_t i = 0; i < processesToUse; ++i) {
576 fIpcSession->identityToWorker[NDimensionalIpcRunner::BuildWorkerIdentity(i)] = i;
577 }
578 } else {
579 // TCP mode: store context for late-joining workers
580 fIpcSession->jobDir = jobDir;
581 fIpcSession->treeName = treeName;
582 fIpcSession->workerObjects = &workerObjects;
583 fIpcSession->maxWorkers = processesToUse;
584 fIpcSession->macroList = macroList;
585 fIpcSession->tmpDir = tmpDir;
586 fIpcSession->tmpResultsDir = tmpResultsDir;
587 fIpcSession->macroParams = macroParams;
588 }
589
590 if (!isTcp) {
591 fIpcSession->childPids.assign(processesToUse, -1);
592 for (size_t i = 0; i < processesToUse; ++i) {
593 pid_t pid = fork();
594 if (pid < 0) {
595 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
596 zmq_close(fIpcSession->router);
597 zmq_ctx_term(fIpcSession->ctx);
598 ::unlink(fIpcSession->endpointPath.c_str());
599 fIpcSession.reset();
600 throw std::runtime_error("Failed to fork worker process.");
601 }
602 if (pid == 0) {
603 zmq_close(fIpcSession->router);
604 zmq_ctx_term(fIpcSession->ctx);
605 const int rc = NDimensionalIpcRunner::WorkerLoop(fIpcSession->endpoint, i, workerObjects[i]);
606 _exit(rc == 0 ? 0 : 1);
607 }
608 fIpcSession->childPids[i] = pid;
609 }
610 }
611
612 // --- Wait for initial workers ---
613 // IPC (fork): wait for all; TCP: wait until at least one connects or timeout.
614 int readyTimeoutSec = isTcp ? 300 : 30;
615 if (isTcp) {
616 if (const char * env = gSystem->Getenv("NDMSPC_WORKER_TIMEOUT")) {
617 try { readyTimeoutSec = std::stoi(env); } catch (...) {}
618 }
619 NLogInfo("NDimensionalExecutor::StartProcessIpc: waiting up to %d s for TCP workers (max %zu) ...",
620 readyTimeoutSec, processesToUse);
621 }
622 const auto readyDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(readyTimeoutSec);
623
624 // Processing and merge assume a fixed worker set. In TCP mode, starting after
625 // only one READY leaves late workers partially initialised and produces
626 // incomplete or corrupt merge inputs.
627 const size_t readyTarget = processesToUse;
628
629 while (fIpcSession->workerIdentityVec.size() < readyTarget) {
630 if (isTcp && !fIpcSession->pendingReadyIdentities.empty()) {
631 const std::string identity = fIpcSession->pendingReadyIdentities.front();
632 fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
633 InitTcpWorker(identity);
634 continue;
635 }
636
637 std::vector<std::string> frames;
638 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
639 if (errno == EAGAIN || errno == EWOULDBLOCK) {
640 if (std::chrono::steady_clock::now() > readyDeadline) {
641 if (!isTcp) {
642 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
643 zmq_close(fIpcSession->router);
644 zmq_ctx_term(fIpcSession->ctx);
645 ::unlink(fIpcSession->endpointPath.c_str());
646 fIpcSession.reset();
647 throw std::runtime_error("Timeout while waiting for IPC workers to become ready.");
648 }
649 zmq_close(fIpcSession->router);
650 zmq_ctx_term(fIpcSession->ctx);
651 fIpcSession.reset();
652 throw std::runtime_error("Timeout: no TCP workers connected.");
653 }
654 continue;
655 }
656 if (!isTcp) NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
657 zmq_close(fIpcSession->router);
658 zmq_ctx_term(fIpcSession->ctx);
659 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
660 fIpcSession.reset();
661 throw std::runtime_error("Failed to receive READY message from worker.");
662 }
663 if (frames.size() < 2) continue;
664 const std::string & identity = frames[0];
665 const std::string & cmd = frames[1];
666 if (isTcp && cmd == "BOOTSTRAP") {
667 HandleBootstrap(identity);
668 continue;
669 }
670 if (cmd != "READY") continue;
671
672 if (isTcp) {
673 InitTcpWorker(identity);
674 } else {
675 // IPC/fork: just register (already pre-seeded)
676 if (fIpcSession->identityToWorker.count(identity)) {
677 if (std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity)
678 == fIpcSession->workerIdentityVec.end()) {
679 fIpcSession->workerIdentityVec.push_back(identity);
680 NLogInfo("NDimensionalExecutor::StartProcessIpc: worker '%s' ready (%zu / %zu)", identity.c_str(),
681 fIpcSession->workerIdentityVec.size(), processesToUse);
682 }
683 }
684 }
685 }
686
687 if (!isTcp) {
688 InstallIpcSigIntHandler(fIpcSession->childPids, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
689 } else {
690 // TCP mode: install handler with no child PIDs — just sets gIpcSigIntRequested
691 // so the dispatch loop can break cleanly and FinishProcessIpc sends STOP to workers.
692 InstallIpcSigIntHandler({}, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
693 }
694}
695
696size_t NDimensionalExecutor::ExecuteCurrentBoundsProcessIpc(const std::string & definitionName,
697 const std::vector<Long64_t> * definitionIds,
698 const std::function<void(size_t, size_t)> & progressCallback)
699{
700 if (!fIpcSession) {
701 throw std::runtime_error("IPC session is not active.");
702 }
703 if (fNumDimensions == 0) {
704 return 0;
705 }
706
707 // Save current definition so late-joining workers can catch up
708 fIpcSession->currentDefName = definitionName;
709 fIpcSession->currentDefIds = definitionIds ? *definitionIds : std::vector<Long64_t>{};
710 fIpcSession->hasCurrentDefIds = (definitionIds != nullptr);
711
712 if (!definitionName.empty()) {
713 for (const auto & identity : fIpcSession->workerIdentityVec) {
714 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "SETDEF", definitionName})) {
715 throw std::runtime_error("Failed to send IPC SETDEF message to worker '" + identity + "'.");
716 }
717 if (definitionIds != nullptr) {
718 if (!NDimensionalIpcRunner::SendFrames(
719 fIpcSession->router, {identity, "SETIDS", NDimensionalIpcRunner::SerializeIds(*definitionIds)})) {
720 throw std::runtime_error("Failed to send IPC SETIDS message to worker '" + identity + "'.");
721 }
722 }
723 }
724 }
725
726 size_t ipcBatchSize = 1;
727 if (const char * envBatchSize = gSystem->Getenv("NDMSPC_IPC_BATCH_SIZE")) {
728 try {
729 ipcBatchSize = std::max<size_t>(1, static_cast<size_t>(std::stoll(envBatchSize)));
730 }
731 catch (...) {
732 NLogWarning("NGnTree::Process: Invalid NDMSPC_IPC_BATCH_SIZE='%s', using default=%zu", envBatchSize,
733 ipcBatchSize);
734 }
735 }
736
737 size_t nextTaskId = 0;
738 size_t dispatchMessageId = 0;
739 size_t outstanding = 0;
740 size_t outstandingMessages = 0;
741 size_t acked = 0;
742 size_t nextSchedulerLogAck = 200;
743 std::string firstError;
744 std::set<size_t> pendingTasks;
745
746 int stallTimeoutSec = 120;
747 if (const char * envStallTimeout = gSystem->Getenv("NDMSPC_IPC_STALL_TIMEOUT")) {
748 try {
749 stallTimeoutSec = std::max(5, std::stoi(envStallTimeout));
750 }
751 catch (...) {
752 NLogWarning("NGnTree::Process: Invalid NDMSPC_IPC_STALL_TIMEOUT='%s', using default=%d", envStallTimeout,
753 stallTimeoutSec);
754 }
755 }
756 auto lastProgress = std::chrono::steady_clock::now();
757
759 bool hasMore = true;
760
761 // maxInFlightMessages is recalculated dynamically as workers join
762 size_t totalTasks = 1;
763 for (size_t i = 0; i < fNumDimensions; ++i) {
764 totalTasks *= static_cast<size_t>(fMaxBounds[i] - fMinBounds[i] + 1);
765 }
766 auto isUserInterrupted = []() {
767 if (gIpcSigIntRequested != 0) return true;
768 return (gROOT && gROOT->IsInterrupted());
769 };
770 // Helper: send SETDEF/SETIDS catchup to a newly-joined worker
771 auto sendCatchup = [&](const std::string & identity) {
772 if (!fIpcSession->currentDefName.empty()) {
773 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "SETDEF", fIpcSession->currentDefName});
774 if (fIpcSession->hasCurrentDefIds) {
775 NDimensionalIpcRunner::SendFrames(
776 fIpcSession->router, {identity, "SETIDS", NDimensionalIpcRunner::SerializeIds(fIpcSession->currentDefIds)});
777 }
778 }
779 };
780
781 while ((hasMore || outstanding > 0) && firstError.empty()) {
782 if (isUserInterrupted()) {
783 firstError = "Interrupted by user";
784 break;
785 }
786
787 const size_t currentWorkers = fIpcSession->workerIdentityVec.size();
788 const size_t maxInFlightMessages = std::max<size_t>(currentWorkers * 8, 8);
789
790 while (hasMore && outstandingMessages < maxInFlightMessages && firstError.empty()) {
791 if (fIpcSession->workerIdentityVec.empty()) break; // no workers yet — wait
792 const size_t workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
793 const std::string identity = fIpcSession->workerIdentityVec[workerSlot];
794 std::vector<std::pair<size_t, std::string>> batchTasks;
795 const size_t nw = fIpcSession->workerIdentityVec.size();
796 const size_t remainingTasks = (nextTaskId < totalTasks) ? (totalTasks - nextTaskId) : 0;
797 const size_t adaptiveBatchSize = std::max<size_t>(
798 1, std::min(ipcBatchSize, std::max<size_t>(1, (remainingTasks + nw - 1) / nw)));
799 batchTasks.reserve(adaptiveBatchSize);
800
801 while (hasMore && outstanding < maxInFlightMessages * ipcBatchSize && batchTasks.size() < adaptiveBatchSize) {
802 batchTasks.emplace_back(nextTaskId, NDimensionalIpcRunner::SerializeCoords(fCurrentCoords));
803 pendingTasks.insert(nextTaskId);
804 ++nextTaskId;
805 ++outstanding;
806
807 if (!Increment()) {
808 hasMore = false;
809 }
810
811 if (isUserInterrupted()) {
812 firstError = "Interrupted by user";
813 break;
814 }
815 }
816
817 if (!firstError.empty()) {
818 break;
819 }
820
821 if (batchTasks.empty()) {
822 continue;
823 }
824
825 if (batchTasks.size() == 1) {
826 const std::string taskId = std::to_string(batchTasks[0].first);
827 const std::string coords = batchTasks[0].second;
828 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "TASK", taskId, coords})) {
829 firstError = "Failed to send IPC TASK message to worker '" + identity + "'.";
830 break;
831 }
832 }
833 else {
834 std::ostringstream payload;
835 for (size_t i = 0; i < batchTasks.size(); ++i) {
836 if (i != 0) payload << ';';
837 payload << batchTasks[i].first << ':' << batchTasks[i].second;
838 }
839 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "TASKB", payload.str()})) {
840 firstError = "Failed to send IPC TASKB message to worker '" + identity + "'.";
841 break;
842 }
843 }
844 ++dispatchMessageId;
845 ++outstandingMessages;
846 }
847
848 if (outstanding == 0 && fIpcSession->workerIdentityVec.empty()) continue;
849 if (outstanding == 0 && !hasMore) continue;
850
851 std::vector<std::string> frames;
852 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
853 if (errno == EINTR || isUserInterrupted()) {
854 firstError = "Interrupted by user";
855 break;
856 }
857 if (errno != EAGAIN && errno != EWOULDBLOCK) {
858 firstError = "Failed to receive IPC ACK/ERR from worker.";
859 break;
860 }
861
862 for (size_t i = 0; i < fIpcSession->childPids.size(); ++i) {
863 int status = 0;
864 pid_t rc = waitpid(fIpcSession->childPids[i], &status, WNOHANG);
865 if (rc == fIpcSession->childPids[i]) {
866 firstError = "Worker process " + std::to_string(i) + " exited unexpectedly.";
867 break;
868 }
869 }
870 if (firstError.empty() && outstanding > 0 && stallTimeoutSec > 0) {
871 const auto now = std::chrono::steady_clock::now();
872 const auto stallSecs = std::chrono::duration_cast<std::chrono::seconds>(now - lastProgress).count();
873 if (stallSecs >= stallTimeoutSec) {
874 firstError = "No IPC/TCP ACK progress for " + std::to_string(stallSecs) + "s with " +
875 std::to_string(outstanding) + " pending tasks.";
876 }
877 }
878 continue;
879 }
880
881 if (frames.size() < 2) continue;
882
883 // Handle late-joining TCP worker
884 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] == "READY") {
885 if (InitTcpWorker(frames[0])) {
886 sendCatchup(frames[0]);
887 lastProgress = std::chrono::steady_clock::now();
888 }
889 continue;
890 }
891
892 // Handle bootstrapping worker requesting config
893 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] == "BOOTSTRAP") {
894 HandleBootstrap(frames[0]);
895 lastProgress = std::chrono::steady_clock::now();
896 continue;
897 }
898
899 if (frames.size() < 3) {
900 firstError = "Malformed IPC message received from worker.";
901 break;
902 }
903
904 const std::string & cmd = frames[1];
905 if (cmd == "ACK") {
906 size_t taskId = 0;
907 try {
908 taskId = static_cast<size_t>(std::stoull(frames[2]));
909 }
910 catch (...) {
911 firstError = "Malformed IPC task id received from worker.";
912 break;
913 }
914
915 if (pendingTasks.find(taskId) == pendingTasks.end()) {
916 firstError = "Received duplicate or unknown IPC task id " + std::to_string(taskId) + ".";
917 break;
918 }
919
920 pendingTasks.erase(taskId);
921 if (outstanding == 0) {
922 firstError = "IPC outstanding counter underflow while processing ACK.";
923 break;
924 }
925 if (outstandingMessages == 0) {
926 firstError = "IPC message outstanding counter underflow while processing ACK.";
927 break;
928 }
929 --outstanding;
930 --outstandingMessages;
931 ++acked;
932 lastProgress = std::chrono::steady_clock::now();
933 const size_t activeWorkersNow = std::min(fIpcSession->workerIdentityVec.size(), outstandingMessages);
934 if (progressCallback) {
935 progressCallback(acked, activeWorkersNow);
936 }
937 if (acked >= nextSchedulerLogAck) {
938 NLogDebug("NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pendingTasks=%zu",
939 acked, totalTasks, activeWorkersNow, outstandingMessages, pendingTasks.size());
940 nextSchedulerLogAck += 200;
941 }
942 continue;
943 }
944
945 if (cmd == "ACKB") {
946 if (frames.size() < 3 || frames[2].empty()) {
947 firstError = "Malformed IPC ACKB payload received from worker.";
948 break;
949 }
950
951 if (outstandingMessages == 0) {
952 firstError = "IPC message outstanding counter underflow while processing ACKB.";
953 break;
954 }
955 --outstandingMessages;
956
957 std::stringstream ackStream(frames[2]);
958 std::string ackToken;
959 while (std::getline(ackStream, ackToken, ',')) {
960 if (ackToken.empty()) continue;
961 size_t ackTaskId = 0;
962 try {
963 ackTaskId = static_cast<size_t>(std::stoull(ackToken));
964 }
965 catch (...) {
966 firstError = "Malformed IPC ACKB task id received from worker.";
967 break;
968 }
969
970 if (pendingTasks.find(ackTaskId) == pendingTasks.end()) {
971 firstError = "Received duplicate or unknown IPC task id " + std::to_string(ackTaskId) + ".";
972 break;
973 }
974
975 pendingTasks.erase(ackTaskId);
976 if (outstanding == 0) {
977 firstError = "IPC outstanding counter underflow while processing ACKB.";
978 break;
979 }
980 --outstanding;
981 ++acked;
982 lastProgress = std::chrono::steady_clock::now();
983 const size_t activeWorkersNow = std::min(fIpcSession->workerIdentityVec.size(), outstandingMessages);
984 if (progressCallback) {
985 progressCallback(acked, activeWorkersNow);
986 }
987 if (acked >= nextSchedulerLogAck) {
988 NLogDebug(
989 "NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pendingTasks=%zu",
990 acked, totalTasks, activeWorkersNow, outstandingMessages, pendingTasks.size());
991 nextSchedulerLogAck += 200;
992 }
993 }
994
995 if (!firstError.empty()) {
996 break;
997 }
998 continue;
999 }
1000
1001 if (cmd == "ERR") {
1002 size_t taskId = 0;
1003 try {
1004 taskId = static_cast<size_t>(std::stoull(frames[2]));
1005 }
1006 catch (...) {
1007 firstError = "Malformed IPC task id received from worker.";
1008 break;
1009 }
1010 std::string errMsg = (frames.size() >= 4) ? frames[3] : "worker error";
1011 firstError = "Worker reported error for task " + std::to_string(taskId) + ": " + errMsg;
1012 break;
1013 }
1014
1015 firstError = "Unknown IPC command from worker: " + cmd;
1016 break;
1017 }
1018
1019 if (!firstError.empty()) {
1020 throw std::runtime_error(firstError);
1021 }
1022
1023 if (!pendingTasks.empty()) {
1024 throw std::runtime_error("IPC execution finished with pending tasks still unacknowledged.");
1025 }
1026
1027 return acked;
1028}
1029
1030void NDimensionalExecutor::FinishProcessIpc(bool abort)
1031{
1032 if (!fIpcSession) {
1033 return;
1034 }
1035
1036 const std::string stopReason = abort ? "abort" : "ok";
1037 for (const auto & it : fIpcSession->identityToWorker) {
1038 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {it.first, "STOP", stopReason});
1039 }
1040
1041 if (!fIpcSession->isTcp) {
1042 const bool exitedCleanly = NDimensionalIpcRunner::WaitForChildProcesses(fIpcSession->childPids, 1500);
1043 if (!exitedCleanly) {
1044 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
1045 }
1046 } else if (!abort) {
1047 // TCP mode normal finish: wait for DONE from all workers before merging
1048 const size_t nWorkers = fIpcSession->identityToWorker.size();
1049 std::set<std::string> doneWorkers;
1050 const auto doneDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
1051 while (doneWorkers.size() < nWorkers) {
1052 const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(doneDeadline - std::chrono::steady_clock::now()).count();
1053 if (remaining <= 0) {
1054 NLogWarning("NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1055 doneWorkers.size(), nWorkers);
1056 break;
1057 }
1058 zmq_pollitem_t item = {fIpcSession->router, 0, ZMQ_POLLIN, 0};
1059 const int rc = zmq_poll(&item, 1, static_cast<long>(remaining));
1060 if (rc <= 0) {
1061 NLogWarning("NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1062 doneWorkers.size(), nWorkers);
1063 break;
1064 }
1065 std::vector<std::string> frames;
1066 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames) || frames.size() < 2) continue;
1067 if (frames[1] == "DONE") {
1068 doneWorkers.insert(frames[0]);
1069 NLogDebug("NDimensionalExecutor::FinishProcessIpc: Worker '%s' sent DONE (%zu/%zu)", frames[0].c_str(),
1070 doneWorkers.size(), nWorkers);
1071 } else if (frames[1] == "READY") {
1072 // Worker arrived after all tasks were dispatched — send STOP immediately.
1073 NLogInfo("NDimensionalExecutor::FinishProcessIpc: Late worker '%s' arrived, sending STOP", frames[0].c_str());
1074 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {frames[0], "STOP", "ok"});
1075 } else if (frames[1] == "BOOTSTRAP") {
1076 // Worker is still bootstrapping — reply with CONFIG and a STOP will follow via READY.
1077 HandleBootstrap(frames[0]);
1078 }
1079 }
1080 }
1081 // Bound socket close time to avoid hangs at process end when peers disappear
1082 // or when STOP/DONE frames are still queued.
1083 if (fIpcSession->router) {
1084 int lingerMs = 0;
1085 if (fIpcSession->isTcp && abort) {
1086 // In TCP abort, allow a short grace period to flush STOP frames.
1087 lingerMs = 2000;
1088 }
1089 zmq_setsockopt(fIpcSession->router, ZMQ_LINGER, &lingerMs, sizeof(lingerMs));
1090 }
1091
1092 if (fIpcSession->router) {
1093 zmq_close(fIpcSession->router);
1094 }
1095 if (fIpcSession->ctx) {
1096 zmq_ctx_term(fIpcSession->ctx);
1097 }
1098 if (!fIpcSession->isTcp && !fIpcSession->endpointPath.empty()) {
1099 ::unlink(fIpcSession->endpointPath.c_str());
1100 }
1101 // Capture registered worker indices before releasing the session.
1103 if (fIpcSession) {
1104 for (const auto & kv : fIpcSession->identityToWorker) {
1105 fRegisteredWorkerIndices.insert(kv.second);
1106 }
1107 }
1108 RestoreIpcSigIntHandler(fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
1109 fIpcSession.reset();
1110}
1111
1113 const std::function<void(const std::vector<int> & coords, NGnThreadData & thread_object)> & func,
1114 std::vector<NGnThreadData> & thread_objects);
1115
1116} // namespace Ndmspc
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / workerIdentityVec.
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFIG message.
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
Thread-local data object for NDMSPC processing.
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Definition NLogger.cxx:132
Thread-local data object for NDMSPC processing.
Definition NThreadData.h:21
size_t GetAssignedIndex() const
Get the assigned index for the thread.
Definition NThreadData.h:96
Global callback function for libwebsockets client events.