ndmspc  v1.2.0-0.1.rc3
NDimensionalExecutor.cxx
#include <algorithm>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstring>
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include <sys/wait.h>
#include <unistd.h>
#include <zmq.h>
#include <THnSparse.h>
#include <TAxis.h>
#include <TROOT.h>
#include <TSystem.h>
#include "NDimensionalExecutor.h"
#include "NDimensionalIpcRunner.h"
#include "NGnThreadData.h"
#include "NUtils.h"
21 
22 namespace Ndmspc {
23 
24 namespace {
25 volatile sig_atomic_t gIpcSigIntRequested = 0;
26 volatile sig_atomic_t gIpcChildCount = 0;
27 pid_t gIpcChildPids[1024] = {0};
28 
29 void IpcSigIntHandler(int)
30 {
31  gIpcSigIntRequested = 1;
32  const sig_atomic_t count = gIpcChildCount;
33  for (sig_atomic_t i = 0; i < count; ++i) {
34  if (gIpcChildPids[i] > 0) {
35  // User interrupt should not leave orphan IPC workers.
36  kill(gIpcChildPids[i], SIGKILL);
37  }
38  }
39 }
40 
41 void InstallIpcSigIntHandler(const std::vector<pid_t> & childPids, struct sigaction & oldAction, bool & hasOldAction)
42 {
43  gIpcSigIntRequested = 0;
44  gIpcChildCount = 0;
45  const size_t maxChildren = sizeof(gIpcChildPids) / sizeof(gIpcChildPids[0]);
46  const size_t count = std::min(maxChildren, childPids.size());
47  for (size_t i = 0; i < count; ++i) {
48  gIpcChildPids[i] = childPids[i];
49  }
50  for (size_t i = count; i < maxChildren; ++i) {
51  gIpcChildPids[i] = 0;
52  }
53  gIpcChildCount = static_cast<sig_atomic_t>(count);
54 
55  struct sigaction sa;
56  memset(&sa, 0, sizeof(sa));
57  sa.sa_handler = IpcSigIntHandler;
58  sigemptyset(&sa.sa_mask);
59  sa.sa_flags = 0;
60 
61  if (sigaction(SIGINT, &sa, &oldAction) == 0) {
62  hasOldAction = true;
63  }
64 }
65 
66 void RestoreIpcSigIntHandler(const struct sigaction & oldAction, bool hasOldAction)
67 {
68  if (hasOldAction) {
69  sigaction(SIGINT, &oldAction, nullptr);
70  }
71  gIpcChildCount = 0;
72  gIpcSigIntRequested = 0;
73 }
74 } // namespace
75 
77  void * ctx{nullptr};
78  void * router{nullptr};
79  bool isTcp{false};
80  std::string endpointPath;
81  std::string endpoint;
82  std::vector<pid_t> childPids;
83  std::unordered_map<std::string, size_t> identityToWorker;
84  std::vector<std::string> workerIdentityVec; // ordered list for round-robin
85  // TCP late-joiner support: stored so new workers can be initialised mid-run
86  std::string jobDir;
87  std::string treeName;
88  std::vector<NThreadData *> * workerObjects{nullptr};
89  size_t maxWorkers{0};
90  std::string currentDefName;
91  std::vector<Long64_t> currentDefIds;
92  bool hasCurrentDefIds{false};
93  struct sigaction oldSigIntAction{};
94  bool hasOldSigIntAction{false};
95  // Bootstrap configuration sent to workers on first contact
96  std::string macroList; // comma-separated macro paths to load on worker
97  std::string tmpDir; // supervisor's NDMSPC_TMP_DIR (fallback for workers)
98  std::string tmpResultsDir; // supervisor's NDMSPC_TMP_RESULTS_DIR
99  size_t bootstrapNextIdx{0}; // auto-assigned index counter for BOOTSTRAP
100  std::vector<std::string> pendingReadyIdentities; // READY messages consumed while waiting for ACK
101 };
102 
103 // --- Private Increment Logic ---
105 {
106  for (int i = fNumDimensions - 1; i >= 0; --i) {
107  fCurrentCoords[i]++;
108  if (fCurrentCoords[i] <= fMaxBounds[i]) {
109  return true;
110  }
111  fCurrentCoords[i] = fMinBounds[i];
112  }
113  return false;
114 }
115 
116 NDimensionalExecutor::NDimensionalExecutor(const std::vector<int> & minBounds, const std::vector<int> & maxBounds)
117  : fMinBounds(minBounds), fMaxBounds(maxBounds)
118 {
122 
123  if (fMinBounds.size() != fMaxBounds.size()) {
124  throw std::invalid_argument("Min and max bounds vectors must have the same size.");
125  }
126  if (fMinBounds.empty()) {
127  throw std::invalid_argument("Bounds vectors cannot be empty.");
128  }
129 
130  fNumDimensions = fMinBounds.size();
131 
132  for (size_t i = 0; i < fNumDimensions; ++i) {
133  if (fMinBounds[i] > fMaxBounds[i]) {
134  throw std::invalid_argument("Min bound (" + std::to_string(fMinBounds[i]) +
135  ") cannot be greater than max bound (" + std::to_string(fMaxBounds[i]) +
136  ") for dimension " + std::to_string(i));
137  }
138  }
139 
142 }
143 
144 void NDimensionalExecutor::SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds)
145 {
146  fMinBounds = minBounds;
147  fMaxBounds = maxBounds;
148 
149  if (fMinBounds.size() != fMaxBounds.size()) {
150  throw std::invalid_argument("Min and max bounds vectors must have the same size.");
151  }
152  if (fMinBounds.empty()) {
153  throw std::invalid_argument("Bounds vectors cannot be empty.");
154  }
155 
156  fNumDimensions = fMinBounds.size();
157  for (size_t i = 0; i < fNumDimensions; ++i) {
158  if (fMinBounds[i] > fMaxBounds[i]) {
159  throw std::invalid_argument("Min bound (" + std::to_string(fMinBounds[i]) +
160  ") cannot be greater than max bound (" + std::to_string(fMaxBounds[i]) +
161  ") for dimension " + std::to_string(i));
162  }
163  }
164 
167 }
168 
// Construct bounds from a ROOT THnSparse histogram.
//
// onlyfilled == true : iterate the filled (sparse) bins as one linear
//   dimension with inclusive range [0, GetNbins()].
//   NOTE(review): THnSparse filled-bin indices run 0..GetNbins()-1, so the
//   inclusive upper bound GetNbins() looks one past the last filled bin —
//   confirm whether callers rely on that extra index.
// onlyfilled == false : one dimension per histogram axis, each with
//   inclusive range [0, GetNbins()].
//   NOTE(review): ROOT axis bins are numbered 1..GetNbins() with bin 0 being
//   underflow; this range therefore includes underflow and excludes overflow
//   (GetNbins()+1) — verify that is intentional.
//
// @throws std::invalid_argument if hist is null or (onlyfilled) empty
NDimensionalExecutor::NDimensionalExecutor(THnSparse * hist, bool onlyfilled)
{
   if (hist == nullptr) {
      throw std::invalid_argument("THnSparse pointer cannot be null.");
   }

   if (onlyfilled) {
      // Check if the histogram is filled
      if (hist->GetNbins() <= 0) {
         throw std::invalid_argument("THnSparse histogram is empty.");
      }

      fMinBounds.push_back(0);
      fMaxBounds.push_back(hist->GetNbins());
   }
   else {
      // loop over all dimensions
      for (int i = 0; i < hist->GetNdimensions(); ++i) {
         fMinBounds.push_back(0);
         fMaxBounds.push_back(hist->GetAxis(i)->GetNbins());
      }
   }
   fNumDimensions = fMinBounds.size();
}
198 
// Defaulted here rather than in the header — presumably so the
// std::unique_ptr<IpcSession> member (fIpcSession) can be destroyed where
// IpcSession is a complete type; it is only defined in this .cxx file.
NDimensionalExecutor::~NDimensionalExecutor() = default;
200 
201 void NDimensionalExecutor::Execute(const std::function<void(const std::vector<int> & coords)> & func)
202 {
206 
207  if (fNumDimensions == 0) {
208  return;
209  }
210  fCurrentCoords = fMinBounds; // Reset state
211  do {
212  func(fCurrentCoords);
213  } while (Increment());
214 }
215 
// --- Template Implementation for ExecuteParallel ---
//
// Thread-pool execution over the bounded coordinate box: one worker thread
// per element of thread_objects, each worker permanently bound to "its"
// TObject, and the main thread producing one task per coordinate into a
// shared queue. The first exception thrown (by a worker or by the producer)
// stops the pool and is rethrown to the caller after all threads joined.
//
// NOTE(review): the function-name line of this definition is truncated in
// this revision — only the template introducer and the parameter list are
// visible. The parameters show the contract: func(coords, thread_object) is
// invoked once per coordinate; thread_objects.size() fixes the thread count.
// NOTE(review): unlike Execute(), no reset of fCurrentCoords to fMinBounds
// is visible here before the produce loop — confirm the starting point.
template <typename TObject>
   const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
   std::vector<TObject> & thread_objects)
{
   if (fNumDimensions == 0) {
      return;
   }
   size_t threads_to_use = thread_objects.size();
   if (threads_to_use == 0) {
      throw std::invalid_argument("Thread objects vector cannot be empty.");
   }

   std::vector<std::thread> workers;
   std::queue<std::function<void(TObject &)>> tasks;
   std::mutex queue_mutex;                    // guards tasks, stop_pool transitions
   std::condition_variable condition_producer; // producer -> workers: "task available / stop"
   std::condition_variable condition_consumer; // workers -> main: "all tasks drained"
   std::atomic<size_t> active_tasks = 0;
   std::atomic<bool> stop_pool = false;
   // Optional: Store first exception encountered in workers
   std::exception_ptr first_exception = nullptr;
   std::mutex exception_mutex;

   // Worker thread logic: fetch and execute tasks, handle exceptions, signal completion.
   auto worker_logic = [&](TObject & my_object) {
      // NOTE(review): C-style downcast assumes every TObject is (derived
      // from) NThreadData — only used to label the thread for logging.
      NThreadData * md = (NThreadData *)&my_object;

      std::ostringstream oss;
      oss << "wk_" << std::setw(3) << std::setfill('0') << md->GetAssignedIndex();

      NLogger::SetThreadName(oss.str());
      while (true) {
         std::function<void(TObject &)> task_payload;
         bool task_acquired = false; // Track if we actually got a task this iteration

         try {
            { // Lock scope for queue access
               std::unique_lock<std::mutex> lock(queue_mutex);
               condition_producer.wait(lock, [&] { return stop_pool || !tasks.empty(); });

               // Check stop condition *after* waking up
               if (stop_pool && tasks.empty()) {
                  break; // Exit the while loop normally
               }
               // If stopping but tasks remain, continue processing them

               // Only proceed if not stopping or if tasks are still present
               if (!tasks.empty()) {
                  task_payload = std::move(tasks.front());
                  tasks.pop();
                  task_acquired = true; // We got a task
               }
               else {
                  // Spurious wakeup or stop_pool=true with empty queue
                  continue; // Go back to wait
               }
            } // Mutex unlocked

            // Execute the task if we acquired one
            if (task_acquired) {
               task_payload(my_object); // Execute task with assigned object
            }
         }
         catch (...) {
            // --- Exception Handling ---
            { // Lock to safely store the first exception
               std::lock_guard<std::mutex> lock(exception_mutex);
               if (!first_exception) {
                  first_exception = std::current_exception(); // Store it
               }
            }
            // Signal pool to stop immediately on any error
            {
               std::unique_lock<std::mutex> lock(queue_mutex);
               stop_pool = true;
            }
            condition_producer.notify_all(); // Wake all threads to check stop flag

            // *** Crucial Fix: Decrement active_tasks even on exception ***
            // Check if we actually acquired a task before decrementing
            if (task_acquired) {
               if (--active_tasks == 0 && stop_pool) {
                  // Also notify consumer here in case this was the last task
                  condition_consumer.notify_one();
               }
            }
            // Decide whether to exit the worker or try processing remaining tasks
            // For simplicity, let's exit the worker on error.
            return; // Exit worker thread immediately on error
         }

         // --- Normal Task Completion ---
         // Decrement active task count *after* successful execution
         // Check if we actually acquired and processed a task
         if (task_acquired) {
            if (--active_tasks == 0 && stop_pool) {
               condition_consumer.notify_one();
            }
         }
      } // End of while loop
   }; // End of worker_logic lambda

   // --- Start Worker Threads ---
   workers.reserve(threads_to_use);
   for (size_t i = 0; i < threads_to_use; ++i) {
      workers.emplace_back(worker_logic, std::ref(thread_objects[i]));
   }

   // --- Main Thread: Iterate and Enqueue Tasks ---
   try {
      do {
         // Check if pool was stopped prematurely (e.g., by an exception in a worker)
         // Lock needed to safely check stop_pool
         {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop_pool) break;
         }

         // Coordinates are copied per task: fCurrentCoords mutates on every
         // Increment(), so each closure must own its snapshot.
         std::vector<int> coords_copy = fCurrentCoords;
         {
            std::unique_lock<std::mutex> lock(queue_mutex);
            // Double check stop_pool after acquiring lock
            if (stop_pool) break;

            active_tasks++;
            tasks.emplace([func, coords_copy](TObject & obj) { func(coords_copy, obj); });
         }
         condition_producer.notify_one();
      } while (Increment());
   }
   catch (...) {
      // Exception during iteration/enqueueing
      {
         std::unique_lock<std::mutex> lock(queue_mutex);
         stop_pool = true; // Signal workers to stop
         if (!first_exception) { // Store exception if none from workers yet
            first_exception = std::current_exception();
         }
      }
      condition_producer.notify_all();
      // Proceed to join threads
   }

   // --- Signal Workers to Stop (if not already stopped by error) ---
   {
      std::unique_lock<std::mutex> lock(queue_mutex);
      stop_pool = true;
   }
   condition_producer.notify_all();

   // --- Wait for Tasks to Complete ---
   {
      std::unique_lock<std::mutex> lock(queue_mutex);
      condition_consumer.wait(lock, [&] { return stop_pool && active_tasks == 0; });
   }

   // --- Join Worker Threads ---
   for (std::thread & worker : workers) {
      if (worker.joinable()) {
         worker.join();
      }
   }

   // --- Check for and rethrow exception from workers ---
   if (first_exception) {
      std::rethrow_exception(first_exception);
   }
}
393 
394 size_t NDimensionalExecutor::ExecuteParallelProcessIpc(std::vector<NThreadData *> & workerObjects,
395  size_t processCount)
396 {
397  StartProcessIpc(workerObjects, processCount);
398  try {
399  size_t acked = ExecuteCurrentBoundsProcessIpc();
400  FinishProcessIpc();
401  return acked;
402  }
403  catch (...) {
404  FinishProcessIpc();
405  throw;
406  }
407 }
408 
409 bool NDimensionalExecutor::InitTcpWorker(const std::string & identity)
410 {
411  // Extract worker index from identity string (e.g. "wk_001")
412  // Derive the prefix by stripping trailing digits from a sample identity
413  const std::string sample = NDimensionalIpcRunner::BuildWorkerIdentity(0);
414  size_t numLen = 0;
415  while (numLen < sample.size() && std::isdigit((unsigned char)sample[sample.size() - 1 - numLen]))
416  ++numLen;
417  const size_t prefixLen = sample.size() - numLen; // e.g. strlen("wk_") == 3
418  if (identity.size() <= prefixLen) return false;
419  size_t workerIdx = 0;
420  try {
421  workerIdx = std::stoul(identity.substr(prefixLen));
422  }
423  catch (...) {
424  NLogWarning("NDimensionalExecutor::InitTcpWorker: cannot parse index from identity '%s'", identity.c_str());
425  return false;
426  }
427  if (workerIdx >= fIpcSession->maxWorkers) {
428  NLogWarning("NDimensionalExecutor::InitTcpWorker: worker index %zu >= maxWorkers %zu, ignoring",
429  workerIdx, fIpcSession->maxWorkers);
430  return false;
431  }
432  if (fIpcSession->identityToWorker.count(identity)) {
433  NLogWarning("NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, ignoring duplicate READY",
434  identity.c_str());
435  return false;
436  }
437 
438  const std::string sessionId = std::to_string(getpid());
439  if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router,
440  {identity, "INIT", std::to_string(workerIdx), sessionId,
441  fIpcSession->jobDir, fIpcSession->treeName,
442  fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
443  NLogError("NDimensionalExecutor::InitTcpWorker: failed to send INIT to '%s'", identity.c_str());
444  return false;
445  }
446 
447  int initTimeoutSec = 30;
448  if (const char * env = gSystem->Getenv("NDMSPC_WORKER_TIMEOUT")) {
449  try {
450  initTimeoutSec = std::max(1, std::stoi(env));
451  }
452  catch (...) {
453  NLogWarning("NDimensionalExecutor::InitTcpWorker: Invalid NDMSPC_WORKER_TIMEOUT='%s', using default=%d", env,
454  initTimeoutSec);
455  }
456  }
457  const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(initTimeoutSec);
458  bool acked = false;
459  while (!acked) {
460  std::vector<std::string> ackFrames;
461  if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, ackFrames)) {
462  if (errno == EAGAIN || errno == EWOULDBLOCK) {
463  if (std::chrono::steady_clock::now() > initDeadline) break;
464  continue;
465  }
466  break;
467  }
468  if (ackFrames.size() >= 2 && ackFrames[1] == "BOOTSTRAP") {
469  HandleBootstrap(ackFrames[0]);
470  continue;
471  }
472  if (ackFrames.size() >= 2 && ackFrames[1] == "READY" && ackFrames[0] != identity) {
473  if (!fIpcSession->identityToWorker.count(ackFrames[0]) &&
474  std::find(fIpcSession->pendingReadyIdentities.begin(), fIpcSession->pendingReadyIdentities.end(),
475  ackFrames[0]) == fIpcSession->pendingReadyIdentities.end()) {
476  fIpcSession->pendingReadyIdentities.push_back(ackFrames[0]);
477  }
478  continue;
479  }
480  if (ackFrames.size() >= 2 && ackFrames[0] == identity && ackFrames[1] == "ACK") {
481  acked = true;
482  }
483  }
484  if (!acked) {
485  NLogError("NDimensionalExecutor::InitTcpWorker: worker '%s' did not ACK INIT", identity.c_str());
486  return false;
487  }
488 
489  fIpcSession->identityToWorker[identity] = workerIdx;
490  fIpcSession->workerIdentityVec.push_back(identity);
491  NLogInfo("NDimensionalExecutor::InitTcpWorker: worker '%s' (idx=%zu) joined [total: %zu]",
492  identity.c_str(), workerIdx, fIpcSession->workerIdentityVec.size());
493  return true;
494 }
495 
496 bool NDimensionalExecutor::HandleBootstrap(const std::string & identity)
497 {
498  if (!fIpcSession || !fIpcSession->isTcp) return false;
499  const size_t assignedIdx = fIpcSession->bootstrapNextIdx++;
500  NLogDebug("NDimensionalExecutor::HandleBootstrap: assigning index %zu to worker '%s'", assignedIdx,
501  identity.c_str());
502  return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
503  {identity, "CONFIG", std::to_string(assignedIdx),
504  fIpcSession->macroList, fIpcSession->tmpDir,
505  fIpcSession->tmpResultsDir});
506 }
507 
// Open an IPC session: create the ZeroMQ ROUTER endpoint, launch (or await)
// the workers, and wait until the expected worker set is ready.
//
// Two transports:
//  - IPC (tcpBindEndpoint empty): fork() one child per worker; children run
//    NDimensionalIpcRunner::WorkerLoop over a unix-domain socket in /tmp.
//  - TCP (tcpBindEndpoint set): bind the given endpoint and wait for external
//    workers to connect; jobDir/treeName/macroList/tmpDir/tmpResultsDir are
//    stored in the session so late joiners can be configured mid-run.
//
// Every failure path tears down whatever was created so far (children,
// socket, context, socket file) before throwing — the teardown ORDER
// (children -> zmq_close -> zmq_ctx_term -> unlink) is deliberate and
// repeated on each path.
//
// @throws std::invalid_argument on empty workerObjects
// @throws std::runtime_error    on an already-active session, zmq failures,
//                               fork failure, or worker-ready timeout
void NDimensionalExecutor::StartProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount,
                                           const std::string & tcpBindEndpoint, const std::string & jobDir,
                                           const std::string & treeName, const std::string & macroList,
                                           const std::string & tmpDir, const std::string & tmpResultsDir)
{
   if (workerObjects.empty()) {
      throw std::invalid_argument("Worker objects vector cannot be empty.");
   }
   if (fIpcSession) {
      throw std::runtime_error("IPC session is already active.");
   }

   // Never spawn more processes than worker objects, and at least one.
   const size_t processesToUse = std::max<size_t>(1, std::min(processCount, workerObjects.size()));
   NLogInfo("NDimensionalExecutor::StartProcessIpc: requested=%zu, workerObjects=%zu, spawning=%zu", processCount,
            workerObjects.size(), processesToUse);
   // Timestamp makes the unix-socket path unique even if the PID is reused.
   const auto nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
                         std::chrono::high_resolution_clock::now().time_since_epoch())
                         .count();
   fIpcSession = std::make_unique<IpcSession>();

   const bool isTcp = !tcpBindEndpoint.empty();
   fIpcSession->isTcp = isTcp;

   if (isTcp) {
      fIpcSession->endpoint = tcpBindEndpoint;
      fIpcSession->endpointPath.clear();
   } else {
      fIpcSession->endpointPath = "/tmp/ndmspc_ipc_" + std::to_string(getpid()) + "_" + std::to_string(nowNs) + ".sock";
      fIpcSession->endpoint = "ipc://" + fIpcSession->endpointPath;
      // Remove any stale socket file before binding.
      ::unlink(fIpcSession->endpointPath.c_str());
   }

   fIpcSession->ctx = zmq_ctx_new();
   if (!fIpcSession->ctx) {
      fIpcSession.reset();
      throw std::runtime_error("Failed to create ZeroMQ context.");
   }

   fIpcSession->router = zmq_socket(fIpcSession->ctx, ZMQ_ROUTER);
   if (!fIpcSession->router) {
      zmq_ctx_term(fIpcSession->ctx);
      fIpcSession.reset();
      throw std::runtime_error("Failed to create ZeroMQ ROUTER socket.");
   }

   // 1 s receive timeout: ReceiveFrames returns EAGAIN so the wait loops
   // below can poll deadlines and child liveness.
   int timeoutMs = 1000;
   zmq_setsockopt(fIpcSession->router, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));

   if (zmq_bind(fIpcSession->router, fIpcSession->endpoint.c_str()) != 0) {
      const std::string err = zmq_strerror(zmq_errno());
      zmq_close(fIpcSession->router);
      zmq_ctx_term(fIpcSession->ctx);
      if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
      fIpcSession.reset();
      throw std::runtime_error("Failed to bind endpoint '" + fIpcSession->endpoint + "': " + err);
   }

   fIpcSession->identityToWorker.clear();
   fIpcSession->identityToWorker.reserve(processesToUse);
   fIpcSession->workerIdentityVec.clear();
   fIpcSession->pendingReadyIdentities.clear();

   if (!isTcp) {
      // IPC/fork mode: pre-seed the map as before so WorkerLoop identities match
      for (size_t i = 0; i < processesToUse; ++i) {
         fIpcSession->identityToWorker[NDimensionalIpcRunner::BuildWorkerIdentity(i)] = i;
      }
   } else {
      // TCP mode: store context for late-joining workers
      fIpcSession->jobDir = jobDir;
      fIpcSession->treeName = treeName;
      fIpcSession->workerObjects = &workerObjects;
      fIpcSession->maxWorkers = processesToUse;
      fIpcSession->macroList = macroList;
      fIpcSession->tmpDir = tmpDir;
      fIpcSession->tmpResultsDir = tmpResultsDir;
   }

   if (!isTcp) {
      fIpcSession->childPids.assign(processesToUse, -1);
      for (size_t i = 0; i < processesToUse; ++i) {
         pid_t pid = fork();
         if (pid < 0) {
            // fork failed: kill the children already spawned, then clean up.
            NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
            zmq_close(fIpcSession->router);
            zmq_ctx_term(fIpcSession->ctx);
            ::unlink(fIpcSession->endpointPath.c_str());
            fIpcSession.reset();
            throw std::runtime_error("Failed to fork worker process.");
         }
         if (pid == 0) {
            // Child: drop the inherited supervisor socket/context, run the
            // worker loop, and _exit without running parent-side destructors.
            zmq_close(fIpcSession->router);
            zmq_ctx_term(fIpcSession->ctx);
            const int rc = NDimensionalIpcRunner::WorkerLoop(fIpcSession->endpoint, i, workerObjects[i]);
            _exit(rc == 0 ? 0 : 1);
         }
         fIpcSession->childPids[i] = pid;
      }
   }

   // --- Wait for initial workers ---
   // IPC (fork): wait for all; TCP: wait until at least one connects or timeout.
   int readyTimeoutSec = isTcp ? 300 : 30;
   if (isTcp) {
      if (const char * env = gSystem->Getenv("NDMSPC_WORKER_TIMEOUT")) {
         try { readyTimeoutSec = std::stoi(env); } catch (...) {}
      }
      NLogInfo("NDimensionalExecutor::StartProcessIpc: waiting up to %d s for TCP workers (max %zu) ...",
               readyTimeoutSec, processesToUse);
   }
   const auto readyDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(readyTimeoutSec);

   // Processing and merge assume a fixed worker set. In TCP mode, starting after
   // only one READY leaves late workers partially initialised and produces
   // incomplete or corrupt merge inputs.
   const size_t readyTarget = processesToUse;

   while (fIpcSession->workerIdentityVec.size() < readyTarget) {
      // READY frames parked by InitTcpWorker() are drained first.
      if (isTcp && !fIpcSession->pendingReadyIdentities.empty()) {
         const std::string identity = fIpcSession->pendingReadyIdentities.front();
         fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
         InitTcpWorker(identity);
         continue;
      }

      std::vector<std::string> frames;
      if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // Receive timeout: check the ready deadline, otherwise keep polling.
            if (std::chrono::steady_clock::now() > readyDeadline) {
               if (!isTcp) {
                  NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
                  zmq_close(fIpcSession->router);
                  zmq_ctx_term(fIpcSession->ctx);
                  ::unlink(fIpcSession->endpointPath.c_str());
                  fIpcSession.reset();
                  throw std::runtime_error("Timeout while waiting for IPC workers to become ready.");
               }
               zmq_close(fIpcSession->router);
               zmq_ctx_term(fIpcSession->ctx);
               fIpcSession.reset();
               throw std::runtime_error("Timeout: no TCP workers connected.");
            }
            continue;
         }
         // Hard receive error: full teardown.
         if (!isTcp) NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
         zmq_close(fIpcSession->router);
         zmq_ctx_term(fIpcSession->ctx);
         if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
         fIpcSession.reset();
         throw std::runtime_error("Failed to receive READY message from worker.");
      }
      if (frames.size() < 2) continue;
      const std::string & identity = frames[0];
      const std::string & cmd = frames[1];
      if (isTcp && cmd == "BOOTSTRAP") {
         HandleBootstrap(identity);
         continue;
      }
      if (cmd != "READY") continue;

      if (isTcp) {
         InitTcpWorker(identity);
      } else {
         // IPC/fork: just register (already pre-seeded)
         if (fIpcSession->identityToWorker.count(identity)) {
            if (std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity)
                == fIpcSession->workerIdentityVec.end()) {
               fIpcSession->workerIdentityVec.push_back(identity);
               NLogInfo("NDimensionalExecutor::StartProcessIpc: worker '%s' ready (%zu / %zu)", identity.c_str(),
                        fIpcSession->workerIdentityVec.size(), processesToUse);
            }
         }
      }
   }

   if (!isTcp) {
      InstallIpcSigIntHandler(fIpcSession->childPids, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
   } else {
      // TCP mode: install handler with no child PIDs — just sets gIpcSigIntRequested
      // so the dispatch loop can break cleanly and FinishProcessIpc sends STOP to workers.
      InstallIpcSigIntHandler({}, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
   }
}
691 
692 size_t NDimensionalExecutor::ExecuteCurrentBoundsProcessIpc(const std::string & definitionName,
693  const std::vector<Long64_t> * definitionIds,
694  const std::function<void(size_t, size_t)> & progressCallback)
695 {
696  if (!fIpcSession) {
697  throw std::runtime_error("IPC session is not active.");
698  }
699  if (fNumDimensions == 0) {
700  return 0;
701  }
702 
703  // Save current definition so late-joining workers can catch up
704  fIpcSession->currentDefName = definitionName;
705  fIpcSession->currentDefIds = definitionIds ? *definitionIds : std::vector<Long64_t>{};
706  fIpcSession->hasCurrentDefIds = (definitionIds != nullptr);
707 
708  if (!definitionName.empty()) {
709  for (const auto & identity : fIpcSession->workerIdentityVec) {
710  if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "SETDEF", definitionName})) {
711  throw std::runtime_error("Failed to send IPC SETDEF message to worker '" + identity + "'.");
712  }
713  if (definitionIds != nullptr) {
714  if (!NDimensionalIpcRunner::SendFrames(
715  fIpcSession->router, {identity, "SETIDS", NDimensionalIpcRunner::SerializeIds(*definitionIds)})) {
716  throw std::runtime_error("Failed to send IPC SETIDS message to worker '" + identity + "'.");
717  }
718  }
719  }
720  }
721 
722  size_t ipcBatchSize = 1;
723  if (const char * envBatchSize = gSystem->Getenv("NDMSPC_IPC_BATCH_SIZE")) {
724  try {
725  ipcBatchSize = std::max<size_t>(1, static_cast<size_t>(std::stoll(envBatchSize)));
726  }
727  catch (...) {
728  NLogWarning("NGnTree::Process: Invalid NDMSPC_IPC_BATCH_SIZE='%s', using default=%zu", envBatchSize,
729  ipcBatchSize);
730  }
731  }
732 
733  size_t nextTaskId = 0;
734  size_t dispatchMessageId = 0;
735  size_t outstanding = 0;
736  size_t outstandingMessages = 0;
737  size_t acked = 0;
738  size_t nextSchedulerLogAck = 200;
739  std::string firstError;
740  std::set<size_t> pendingTasks;
741 
742  int stallTimeoutSec = 120;
743  if (const char * envStallTimeout = gSystem->Getenv("NDMSPC_IPC_STALL_TIMEOUT")) {
744  try {
745  stallTimeoutSec = std::max(5, std::stoi(envStallTimeout));
746  }
747  catch (...) {
748  NLogWarning("NGnTree::Process: Invalid NDMSPC_IPC_STALL_TIMEOUT='%s', using default=%d", envStallTimeout,
749  stallTimeoutSec);
750  }
751  }
752  auto lastProgress = std::chrono::steady_clock::now();
753 
755  bool hasMore = true;
756 
757  // maxInFlightMessages is recalculated dynamically as workers join
758  size_t totalTasks = 1;
759  for (size_t i = 0; i < fNumDimensions; ++i) {
760  totalTasks *= static_cast<size_t>(fMaxBounds[i] - fMinBounds[i] + 1);
761  }
762  auto isUserInterrupted = []() {
763  if (gIpcSigIntRequested != 0) return true;
764  return (gROOT && gROOT->IsInterrupted());
765  };
766  // Helper: send SETDEF/SETIDS catchup to a newly-joined worker
767  auto sendCatchup = [&](const std::string & identity) {
768  if (!fIpcSession->currentDefName.empty()) {
769  NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "SETDEF", fIpcSession->currentDefName});
770  if (fIpcSession->hasCurrentDefIds) {
771  NDimensionalIpcRunner::SendFrames(
772  fIpcSession->router, {identity, "SETIDS", NDimensionalIpcRunner::SerializeIds(fIpcSession->currentDefIds)});
773  }
774  }
775  };
776 
777  while ((hasMore || outstanding > 0) && firstError.empty()) {
778  if (isUserInterrupted()) {
779  firstError = "Interrupted by user";
780  break;
781  }
782 
783  const size_t currentWorkers = fIpcSession->workerIdentityVec.size();
784  const size_t maxInFlightMessages = std::max<size_t>(currentWorkers * 8, 8);
785 
786  while (hasMore && outstandingMessages < maxInFlightMessages && firstError.empty()) {
787  if (fIpcSession->workerIdentityVec.empty()) break; // no workers yet — wait
788  const size_t workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
789  const std::string identity = fIpcSession->workerIdentityVec[workerSlot];
790  std::vector<std::pair<size_t, std::string>> batchTasks;
791  const size_t nw = fIpcSession->workerIdentityVec.size();
792  const size_t remainingTasks = (nextTaskId < totalTasks) ? (totalTasks - nextTaskId) : 0;
793  const size_t adaptiveBatchSize = std::max<size_t>(
794  1, std::min(ipcBatchSize, std::max<size_t>(1, (remainingTasks + nw - 1) / nw)));
795  batchTasks.reserve(adaptiveBatchSize);
796 
797  while (hasMore && outstanding < maxInFlightMessages * ipcBatchSize && batchTasks.size() < adaptiveBatchSize) {
798  batchTasks.emplace_back(nextTaskId, NDimensionalIpcRunner::SerializeCoords(fCurrentCoords));
799  pendingTasks.insert(nextTaskId);
800  ++nextTaskId;
801  ++outstanding;
802 
803  if (!Increment()) {
804  hasMore = false;
805  }
806 
807  if (isUserInterrupted()) {
808  firstError = "Interrupted by user";
809  break;
810  }
811  }
812 
813  if (!firstError.empty()) {
814  break;
815  }
816 
817  if (batchTasks.empty()) {
818  continue;
819  }
820 
821  if (batchTasks.size() == 1) {
822  const std::string taskId = std::to_string(batchTasks[0].first);
823  const std::string coords = batchTasks[0].second;
824  if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "TASK", taskId, coords})) {
825  firstError = "Failed to send IPC TASK message to worker '" + identity + "'.";
826  break;
827  }
828  }
829  else {
830  std::ostringstream payload;
831  for (size_t i = 0; i < batchTasks.size(); ++i) {
832  if (i != 0) payload << ';';
833  payload << batchTasks[i].first << ':' << batchTasks[i].second;
834  }
835  if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity, "TASKB", payload.str()})) {
836  firstError = "Failed to send IPC TASKB message to worker '" + identity + "'.";
837  break;
838  }
839  }
840  ++dispatchMessageId;
841  ++outstandingMessages;
842  }
843 
844  if (outstanding == 0 && fIpcSession->workerIdentityVec.empty()) continue;
845  if (outstanding == 0 && !hasMore) continue;
846 
847  std::vector<std::string> frames;
848  if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
849  if (errno == EINTR || isUserInterrupted()) {
850  firstError = "Interrupted by user";
851  break;
852  }
853  if (errno != EAGAIN && errno != EWOULDBLOCK) {
854  firstError = "Failed to receive IPC ACK/ERR from worker.";
855  break;
856  }
857 
858  for (size_t i = 0; i < fIpcSession->childPids.size(); ++i) {
859  int status = 0;
860  pid_t rc = waitpid(fIpcSession->childPids[i], &status, WNOHANG);
861  if (rc == fIpcSession->childPids[i]) {
862  firstError = "Worker process " + std::to_string(i) + " exited unexpectedly.";
863  break;
864  }
865  }
866  if (firstError.empty() && outstanding > 0 && stallTimeoutSec > 0) {
867  const auto now = std::chrono::steady_clock::now();
868  const auto stallSecs = std::chrono::duration_cast<std::chrono::seconds>(now - lastProgress).count();
869  if (stallSecs >= stallTimeoutSec) {
870  firstError = "No IPC/TCP ACK progress for " + std::to_string(stallSecs) + "s with " +
871  std::to_string(outstanding) + " pending tasks.";
872  }
873  }
874  continue;
875  }
876 
877  if (frames.size() < 2) continue;
878 
879  // Handle late-joining TCP worker
880  if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] == "READY") {
881  if (InitTcpWorker(frames[0])) {
882  sendCatchup(frames[0]);
883  lastProgress = std::chrono::steady_clock::now();
884  }
885  continue;
886  }
887 
888  // Handle bootstrapping worker requesting config
889  if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] == "BOOTSTRAP") {
890  HandleBootstrap(frames[0]);
891  lastProgress = std::chrono::steady_clock::now();
892  continue;
893  }
894 
895  if (frames.size() < 3) {
896  firstError = "Malformed IPC message received from worker.";
897  break;
898  }
899 
900  const std::string & cmd = frames[1];
901  if (cmd == "ACK") {
902  size_t taskId = 0;
903  try {
904  taskId = static_cast<size_t>(std::stoull(frames[2]));
905  }
906  catch (...) {
907  firstError = "Malformed IPC task id received from worker.";
908  break;
909  }
910 
911  if (pendingTasks.find(taskId) == pendingTasks.end()) {
912  firstError = "Received duplicate or unknown IPC task id " + std::to_string(taskId) + ".";
913  break;
914  }
915 
916  pendingTasks.erase(taskId);
917  if (outstanding == 0) {
918  firstError = "IPC outstanding counter underflow while processing ACK.";
919  break;
920  }
921  if (outstandingMessages == 0) {
922  firstError = "IPC message outstanding counter underflow while processing ACK.";
923  break;
924  }
925  --outstanding;
926  --outstandingMessages;
927  ++acked;
928  lastProgress = std::chrono::steady_clock::now();
929  const size_t activeWorkersNow = std::min(fIpcSession->workerIdentityVec.size(), outstandingMessages);
930  if (progressCallback) {
931  progressCallback(acked, activeWorkersNow);
932  }
933  if (acked >= nextSchedulerLogAck) {
934  NLogDebug("NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pendingTasks=%zu",
935  acked, totalTasks, activeWorkersNow, outstandingMessages, pendingTasks.size());
936  nextSchedulerLogAck += 200;
937  }
938  continue;
939  }
940 
941  if (cmd == "ACKB") {
942  if (frames.size() < 3 || frames[2].empty()) {
943  firstError = "Malformed IPC ACKB payload received from worker.";
944  break;
945  }
946 
947  if (outstandingMessages == 0) {
948  firstError = "IPC message outstanding counter underflow while processing ACKB.";
949  break;
950  }
951  --outstandingMessages;
952 
953  std::stringstream ackStream(frames[2]);
954  std::string ackToken;
955  while (std::getline(ackStream, ackToken, ',')) {
956  if (ackToken.empty()) continue;
957  size_t ackTaskId = 0;
958  try {
959  ackTaskId = static_cast<size_t>(std::stoull(ackToken));
960  }
961  catch (...) {
962  firstError = "Malformed IPC ACKB task id received from worker.";
963  break;
964  }
965 
966  if (pendingTasks.find(ackTaskId) == pendingTasks.end()) {
967  firstError = "Received duplicate or unknown IPC task id " + std::to_string(ackTaskId) + ".";
968  break;
969  }
970 
971  pendingTasks.erase(ackTaskId);
972  if (outstanding == 0) {
973  firstError = "IPC outstanding counter underflow while processing ACKB.";
974  break;
975  }
976  --outstanding;
977  ++acked;
978  lastProgress = std::chrono::steady_clock::now();
979  const size_t activeWorkersNow = std::min(fIpcSession->workerIdentityVec.size(), outstandingMessages);
980  if (progressCallback) {
981  progressCallback(acked, activeWorkersNow);
982  }
983  if (acked >= nextSchedulerLogAck) {
984  NLogDebug(
985  "NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pendingTasks=%zu",
986  acked, totalTasks, activeWorkersNow, outstandingMessages, pendingTasks.size());
987  nextSchedulerLogAck += 200;
988  }
989  }
990 
991  if (!firstError.empty()) {
992  break;
993  }
994  continue;
995  }
996 
997  if (cmd == "ERR") {
998  size_t taskId = 0;
999  try {
1000  taskId = static_cast<size_t>(std::stoull(frames[2]));
1001  }
1002  catch (...) {
1003  firstError = "Malformed IPC task id received from worker.";
1004  break;
1005  }
1006  std::string errMsg = (frames.size() >= 4) ? frames[3] : "worker error";
1007  firstError = "Worker reported error for task " + std::to_string(taskId) + ": " + errMsg;
1008  break;
1009  }
1010 
1011  firstError = "Unknown IPC command from worker: " + cmd;
1012  break;
1013  }
1014 
1015  if (!firstError.empty()) {
1016  throw std::runtime_error(firstError);
1017  }
1018 
1019  if (!pendingTasks.empty()) {
1020  throw std::runtime_error("IPC execution finished with pending tasks still unacknowledged.");
1021  }
1022 
1023  return acked;
1024 }
1025 
1026 void NDimensionalExecutor::FinishProcessIpc(bool abort)
1027 {
1028  if (!fIpcSession) {
1029  return;
1030  }
1031 
1032  const std::string stopReason = abort ? "abort" : "ok";
1033  for (const auto & it : fIpcSession->identityToWorker) {
1034  NDimensionalIpcRunner::SendFrames(fIpcSession->router, {it.first, "STOP", stopReason});
1035  }
1036 
1037  if (!fIpcSession->isTcp) {
1038  const bool exitedCleanly = NDimensionalIpcRunner::WaitForChildProcesses(fIpcSession->childPids, 1500);
1039  if (!exitedCleanly) {
1040  NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
1041  }
1042  } else if (!abort) {
1043  // TCP mode normal finish: wait for DONE from all workers before merging
1044  const size_t nWorkers = fIpcSession->identityToWorker.size();
1045  std::set<std::string> doneWorkers;
1046  const auto doneDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
1047  while (doneWorkers.size() < nWorkers) {
1048  const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(doneDeadline - std::chrono::steady_clock::now()).count();
1049  if (remaining <= 0) {
1050  NLogWarning("NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1051  doneWorkers.size(), nWorkers);
1052  break;
1053  }
1054  zmq_pollitem_t item = {fIpcSession->router, 0, ZMQ_POLLIN, 0};
1055  const int rc = zmq_poll(&item, 1, static_cast<long>(remaining));
1056  if (rc <= 0) {
1057  NLogWarning("NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1058  doneWorkers.size(), nWorkers);
1059  break;
1060  }
1061  std::vector<std::string> frames;
1062  if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames) || frames.size() < 2) continue;
1063  if (frames[1] == "DONE") {
1064  doneWorkers.insert(frames[0]);
1065  NLogDebug("NDimensionalExecutor::FinishProcessIpc: Worker '%s' sent DONE (%zu/%zu)", frames[0].c_str(),
1066  doneWorkers.size(), nWorkers);
1067  } else if (frames[1] == "READY") {
1068  // Worker arrived after all tasks were dispatched — send STOP immediately.
1069  NLogInfo("NDimensionalExecutor::FinishProcessIpc: Late worker '%s' arrived, sending STOP", frames[0].c_str());
1070  NDimensionalIpcRunner::SendFrames(fIpcSession->router, {frames[0], "STOP", "ok"});
1071  } else if (frames[1] == "BOOTSTRAP") {
1072  // Worker is still bootstrapping — reply with CONFIG and a STOP will follow via READY.
1073  HandleBootstrap(frames[0]);
1074  }
1075  }
1076  }
1077  // Bound socket close time to avoid hangs at process end when peers disappear
1078  // or when STOP/DONE frames are still queued.
1079  if (fIpcSession->router) {
1080  int lingerMs = 0;
1081  if (fIpcSession->isTcp && abort) {
1082  // In TCP abort, allow a short grace period to flush STOP frames.
1083  lingerMs = 2000;
1084  }
1085  zmq_setsockopt(fIpcSession->router, ZMQ_LINGER, &lingerMs, sizeof(lingerMs));
1086  }
1087 
1088  if (fIpcSession->router) {
1089  zmq_close(fIpcSession->router);
1090  }
1091  if (fIpcSession->ctx) {
1092  zmq_ctx_term(fIpcSession->ctx);
1093  }
1094  if (!fIpcSession->isTcp && !fIpcSession->endpointPath.empty()) {
1095  ::unlink(fIpcSession->endpointPath.c_str());
1096  }
1097  // Capture registered worker indices before releasing the session.
1098  fRegisteredWorkerIndices.clear();
1099  if (fIpcSession) {
1100  for (const auto & kv : fIpcSession->identityToWorker) {
1101  fRegisteredWorkerIndices.insert(kv.second);
1102  }
1103  }
1104  RestoreIpcSigIntHandler(fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
1105  fIpcSession.reset();
1106 }
1107 
// Explicit instantiation of the parallel-execution template for NGnThreadData
// thread-local objects, so callers linking against this translation unit do
// not need the template definition.
template void NDimensionalExecutor::ExecuteParallel<NGnThreadData>(
    const std::function<void(const std::vector<int> & coords, NGnThreadData & thread_object)> & func,
    std::vector<NGnThreadData> & thread_objects);
1111 
1112 } // namespace Ndmspc
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / workerIdentityVec.
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFIG message.
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Definition: NLogger.cxx:132
Thread-local data object for NDMSPC processing.
Definition: NThreadData.h:21
size_t GetAssignedIndex() const
Get the assigned index for the thread.
Definition: NThreadData.h:96