11#include <unordered_map>
19#include "NDimensionalExecutor.h"
20#include "NDimensionalIpcRunner.h"
21#include "NGnThreadData.h"
// Globals shared between the SIGINT handler and the IPC session code.
// They use sig_atomic_t / a fixed-size POD array so the handler can read
// them without locking or allocation (async-signal-safe access only).
27volatile sig_atomic_t gIpcSigIntRequested = 0;
// Number of valid entries in gIpcChildPids; published after the PIDs are written.
28volatile sig_atomic_t gIpcChildCount = 0;
// Child worker PIDs (fixed capacity 1024); unused slots are zeroed.
29pid_t gIpcChildPids[1024] = {0};
// SIGINT handler: records that the user interrupted the run and forwards a
// kill to every registered child worker process. Only async-signal-safe
// operations (flag store, kill(2)) are performed here.
31void IpcSigIntHandler(
int)
// Polled by the dispatch loop (see isUserInterrupted in ExecuteCurrentBoundsProcessIpc).
33 gIpcSigIntRequested = 1;
// Snapshot the count once; the PID slots are written before the handler is installed.
34 const sig_atomic_t count = gIpcChildCount;
35 for (sig_atomic_t i = 0; i < count; ++i) {
36 if (gIpcChildPids[i] > 0) {
// NOTE(review): SIGKILL gives workers no chance to clean up; presumably
// intentional so Ctrl-C terminates them immediately -- confirm.
38 kill(gIpcChildPids[i], SIGKILL);
// Installs IpcSigIntHandler for SIGINT and publishes the child PID list to
// the handler-visible globals above. The previously installed disposition is
// returned through oldAction/hasOldAction so RestoreIpcSigIntHandler can
// undo the installation later.
43void InstallIpcSigIntHandler(
const std::vector<pid_t> & childPids,
struct sigaction & oldAction,
bool & hasOldAction)
45 gIpcSigIntRequested = 0;
// Clamp to the fixed capacity of the global PID array (1024 slots).
47 const size_t maxChildren =
sizeof(gIpcChildPids) /
sizeof(gIpcChildPids[0]);
48 const size_t count = std::min(maxChildren, childPids.size());
49 for (
size_t i = 0; i < count; ++i) {
50 gIpcChildPids[i] = childPids[i];
// Clear the remaining slots so stale PIDs from a previous session cannot be killed.
52 for (
size_t i = count; i < maxChildren; ++i) {
// Publish the count only after all PID slots are written.
55 gIpcChildCount =
static_cast<sig_atomic_t
>(count);
58 memset(&sa, 0,
sizeof(sa));
59 sa.sa_handler = IpcSigIntHandler;
60 sigemptyset(&sa.sa_mask);
// Remember the old action only when sigaction succeeded; hasOldAction is
// presumably set inside this branch (elided from this view) -- confirm.
63 if (sigaction(SIGINT, &sa, &oldAction) == 0) {
// Restores the SIGINT disposition saved by InstallIpcSigIntHandler and
// clears the pending-interrupt flag.
68void RestoreIpcSigIntHandler(
const struct sigaction & oldAction,
bool hasOldAction)
71 sigaction(SIGINT, &oldAction,
nullptr);
74 gIpcSigIntRequested = 0;
// --- IPC session state (struct header elided from this view) ---
// ZeroMQ ROUTER socket used to talk to all workers (owned by the session).
80 void * router{
nullptr};
// Filesystem path of the ipc:// endpoint (empty when using tcp://).
82 std::string endpointPath;
// PIDs of forked local workers; unused in pure TCP mode.
84 std::vector<pid_t> childPids;
// Maps a worker's ZeroMQ identity frame to its assigned worker index.
85 std::unordered_map<std::string, size_t> identityToWorker;
// Identities of currently live/registered workers, in join order.
86 std::vector<std::string> workerIdentityVec;
// Non-owning pointer to the caller's per-worker objects.
90 std::vector<NThreadData *> * workerObjects{
nullptr};
// Current definition broadcast via SETDEF/SETIDS, replayed to late joiners.
92 std::string currentDefName;
93 std::vector<Long64_t> currentDefIds;
94 bool hasCurrentDefIds{
false};
// True once InstallIpcSigIntHandler saved the previous SIGINT action.
96 bool hasOldSigIntAction{
false};
// Bootstrap configuration sent to self-registering TCP workers.
98 std::string macroList;
99 std::string macroParams;
101 std::string tmpResultsDir;
// Next worker index handed out by HandleBootstrap, plus the sticky
// identity -> index assignments so reconnects keep their index.
102 size_t bootstrapNextIdx{0};
103 std::unordered_map<std::string, size_t> bootstrapAssignments;
// READY frames received while busy handling another worker's INIT.
104 std::vector<std::string> pendingReadyIdentities;
// Task IDs ever dispatched to each worker (used to replay on failure).
107 std::unordered_map<std::string, std::set<size_t>> workerTaskHistory;
108 std::set<std::string> earlyDoneWorkers;
// Per-worker last-seen timestamp for the TCP inactivity timeout.
110 std::unordered_map<std::string, std::chrono::steady_clock::time_point> workerLastActivity;
// TCP workers marked failed and awaiting removal/redistribution.
111 std::set<std::string> failedTcpWorkers;
// Bounds validation (the enclosing function header is elided from this
// view; presumably the NDimensionalExecutor constructor -- confirm):
// rejects mismatched vector sizes, empty bounds, and min > max per dimension.
135 throw std::invalid_argument(
"Min and max bounds vectors must have the same size.");
138 throw std::invalid_argument(
"Bounds vectors cannot be empty.");
// Per-dimension check; the surrounding loop/condition lines are elided here.
145 throw std::invalid_argument(
"Min bound (" + std::to_string(
fMinBounds[i]) +
146 ") cannot be greater than max bound (" + std::to_string(
fMaxBounds[i]) +
147 ") for dimension " + std::to_string(i));
// Replaces the executor's per-dimension bounds. Mirrors the constructor's
// validation: same-size vectors, non-empty, and min <= max per dimension.
// (The condition lines guarding each throw are elided from this view.)
155void NDimensionalExecutor::SetBounds(
const std::vector<int> & minBounds,
const std::vector<int> & maxBounds)
161 throw std::invalid_argument(
"Min and max bounds vectors must have the same size.");
164 throw std::invalid_argument(
"Bounds vectors cannot be empty.");
170 throw std::invalid_argument(
"Min bound (" + std::to_string(
fMinBounds[i]) +
171 ") cannot be greater than max bound (" + std::to_string(
fMaxBounds[i]) +
172 ") for dimension " + std::to_string(i));
// Derives bounds from a THnSparse histogram (enclosing function header
// elided from this view): rejects null/empty histograms, then sets the
// per-axis max bound to the axis bin count (min bound presumably set on
// the elided line before 202 -- confirm).
185 if (hist ==
nullptr) {
186 throw std::invalid_argument(
"THnSparse pointer cannot be null.");
191 if (hist->GetNbins() <= 0) {
192 throw std::invalid_argument(
"THnSparse histogram is empty.");
200 for (
int i = 0; i < hist->GetNdimensions(); ++i) {
202 fMaxBounds.push_back(hist->GetAxis(i)->GetNbins());
// Defaulted out-of-line so the unique_ptr<IpcSession> member can be
// destroyed where IpcSession is a complete type.
210NDimensionalExecutor::~NDimensionalExecutor() =
default;
// Thread-pool execution over the N-dimensional coordinate space: spawns one
// worker thread per entry in thread_objects, feeds coordinate tasks through
// a mutex-protected queue, and propagates the first exception raised by any
// task. Several scaffolding lines (try/catch, loop headers, braces) are
// elided from this view, so the exact control flow should be confirmed
// against the full file before modifying.
234template <
typename TObject>
236 const std::function<
void(
const std::vector<int> & coords, TObject & thread_object)> & func,
237 std::vector<TObject> & thread_objects)
242 size_t threads_to_use = thread_objects.size();
243 if (threads_to_use == 0) {
244 throw std::invalid_argument(
"Thread objects vector cannot be empty.");
// Shared pool state: task queue + two condvars (producer wakes workers,
// consumer signals drain completion) + atomic counters.
247 std::vector<std::thread> workers;
248 std::queue<std::function<void(TObject &)>> tasks;
249 std::mutex queue_mutex;
250 std::condition_variable condition_producer;
251 std::condition_variable condition_consumer;
252 std::atomic<size_t> active_tasks = 0;
253 std::atomic<bool> stop_pool =
false;
// First exception wins; later ones are dropped (guarded by exception_mutex).
255 std::exception_ptr first_exception =
nullptr;
256 std::mutex exception_mutex;
259 auto worker_logic = [&](TObject & my_object) {
// Builds a zero-padded "wk_NNNNNN" label from the assigned index
// (md's origin is elided from this view -- confirm).
262 std::ostringstream oss;
263 oss <<
"wk_" << std::setw(6) << std::setfill(
'0') << md->
GetAssignedIndex();
267 std::function<void(TObject &)> task_payload;
268 bool task_acquired =
false;
// Block until there is work or the pool is shutting down.
272 std::unique_lock<std::mutex> lock(queue_mutex);
273 condition_producer.wait(lock, [&] {
return stop_pool || !tasks.empty(); });
// Exit condition: shutdown requested and nothing left to drain.
276 if (stop_pool && tasks.empty()) {
282 if (!tasks.empty()) {
283 task_payload = std::move(tasks.front());
285 task_acquired =
true;
// Run the user callback outside the queue lock.
295 task_payload(my_object);
// Record only the first exception; others are intentionally dropped.
301 std::lock_guard<std::mutex> lock(exception_mutex);
302 if (!first_exception) {
303 first_exception = std::current_exception();
308 std::unique_lock<std::mutex> lock(queue_mutex);
311 condition_producer.notify_all();
// Signal the producer when the last in-flight task finishes during shutdown.
316 if (--active_tasks == 0 && stop_pool) {
318 condition_consumer.notify_one();
330 if (--active_tasks == 0 && stop_pool) {
331 condition_consumer.notify_one();
338 workers.reserve(threads_to_use);
339 for (
size_t i = 0; i < threads_to_use; ++i) {
// Each thread gets its own TObject by reference (no sharing between threads).
340 workers.emplace_back(worker_logic, std::ref(thread_objects[i]));
350 std::unique_lock<std::mutex> lock(queue_mutex);
351 if (stop_pool)
break;
356 std::unique_lock<std::mutex> lock(queue_mutex);
358 if (stop_pool)
break;
// Capture the coordinates by value so the task owns its own copy.
361 tasks.emplace([func, coords_copy](TObject & obj) { func(coords_copy, obj); });
363 condition_producer.notify_one();
369 std::unique_lock<std::mutex> lock(queue_mutex);
371 if (!first_exception) {
372 first_exception = std::current_exception();
375 condition_producer.notify_all();
// Shutdown: wake everyone, then wait for all in-flight tasks to drain.
381 std::unique_lock<std::mutex> lock(queue_mutex);
384 condition_producer.notify_all();
388 std::unique_lock<std::mutex> lock(queue_mutex);
389 condition_consumer.wait(lock, [&] {
return stop_pool && active_tasks == 0; });
393 for (std::thread & worker : workers) {
394 if (worker.joinable()) {
// Re-throw the first captured exception on the caller's thread.
400 if (first_exception) {
401 std::rethrow_exception(first_exception);
// Convenience wrapper fragment (enclosing function header elided from this
// view): starts an IPC session, runs the current bounds, and collects the
// number of acknowledged tasks.
408 StartProcessIpc(workerObjects, processCount);
410 size_t acked = ExecuteCurrentBoundsProcessIpc();
// InitTcpWorker body (function header elided from this view): parses the
// worker index out of a READY identity, sends INIT with the session config,
// waits for the matching ACK (buffering unrelated BOOTSTRAP/READY frames),
// then registers the worker in the session tables.
// Derive the identity's numeric-suffix layout from a sample identity so the
// parse tracks however BuildWorkerIdentity formats names.
424 const std::string sample = NDimensionalIpcRunner::BuildWorkerIdentity(0);
426 while (numLen < sample.size() && std::isdigit((
unsigned char)sample[sample.size() - 1 - numLen]))
428 const size_t prefixLen = sample.size() - numLen;
429 if (identity.size() <= prefixLen)
return false;
430 size_t workerIdx = 0;
// stoul can throw; the catch producing the warning below is elided here.
432 workerIdx = std::stoul(identity.substr(prefixLen));
435 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: cannot parse index from identity '%s'", identity.c_str());
438 if (workerIdx >= fIpcSession->maxWorkers) {
439 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: worker index %zu >= maxWorkers %zu, ignoring",
440 workerIdx, fIpcSession->maxWorkers);
// Duplicate READY from an already-registered identity is ignored.
443 if (fIpcSession->identityToWorker.count(identity)) {
444 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, ignoring duplicate READY",
// Session ID ties the worker to this coordinator process.
449 const std::string sessionId = std::to_string(getpid());
450 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router,
451 {identity,
"INIT", std::to_string(workerIdx), sessionId,
452 fIpcSession->jobDir, fIpcSession->treeName,
453 fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
454 NLogError(
"NDimensionalExecutor::InitTcpWorker: failed to send INIT to '%s'", identity.c_str());
// ACK wait deadline, overridable via NDMSPC_WORKER_TIMEOUT (seconds, min 1).
458 int initTimeoutSec = 30;
459 if (
const char * env = gSystem->Getenv(
"NDMSPC_WORKER_TIMEOUT")) {
461 initTimeoutSec = std::max(1, std::stoi(env));
464 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: Invalid NDMSPC_WORKER_TIMEOUT='%s', using default=%d", env,
468 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(initTimeoutSec);
// Drain frames until this identity ACKs or the deadline passes. The socket
// receive timeout surfaces as EAGAIN/EWOULDBLOCK (see ZMQ_RCVTIMEO setup).
471 std::vector<std::string> ackFrames;
472 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, ackFrames)) {
473 if (errno == EAGAIN || errno == EWOULDBLOCK) {
474 if (std::chrono::steady_clock::now() > initDeadline)
break;
479 if (ackFrames.size() >= 2 && ackFrames[1] ==
"BOOTSTRAP") {
// READY from a *different* worker: queue it so it is initialized later
// instead of being dropped.
483 if (ackFrames.size() >= 2 && ackFrames[1] ==
"READY" && ackFrames[0] != identity) {
484 if (!fIpcSession->identityToWorker.count(ackFrames[0]) &&
485 std::find(fIpcSession->pendingReadyIdentities.begin(), fIpcSession->pendingReadyIdentities.end(),
486 ackFrames[0]) == fIpcSession->pendingReadyIdentities.end()) {
487 fIpcSession->pendingReadyIdentities.push_back(ackFrames[0]);
491 if (ackFrames.size() >= 2 && ackFrames[0] == identity && ackFrames[1] ==
"ACK") {
496 NLogError(
"NDimensionalExecutor::InitTcpWorker: worker '%s' did not ACK INIT", identity.c_str());
// Registration: replace any stale entry for the same identity, then record
// the worker and stamp its activity time for the inactivity timeout.
501 if (fIpcSession->identityToWorker.count(identity)) {
502 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, replacing", identity.c_str());
504 auto it = std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity);
505 if (it != fIpcSession->workerIdentityVec.end()) {
506 fIpcSession->workerIdentityVec.erase(it);
510 fIpcSession->identityToWorker[identity] = workerIdx;
511 fIpcSession->workerIdentityVec.push_back(identity);
513 fIpcSession->workerLastActivity[identity] = std::chrono::steady_clock::now();
514 NLogInfo(
"NDimensionalExecutor::InitTcpWorker: worker '%s' (idx=%zu) joined [total: %zu]",
515 identity.c_str(), workerIdx, fIpcSession->workerIdentityVec.size());
// HandleBootstrap (header elided from this view): answers a TCP worker's
// BOOTSTRAP request with a CONFIG frame carrying its assigned index and the
// session's macro/tmp configuration. Assignments are sticky per identity so
// a reconnecting worker keeps its index; new workers beyond maxWorkers are
// rejected with a REJECT/capacity frame.
521 if (!fIpcSession || !fIpcSession->isTcp)
return false;
// Reconnect path: replay the previously assigned index.
524 auto existing = fIpcSession->bootstrapAssignments.find(identity);
525 if (existing != fIpcSession->bootstrapAssignments.end()) {
526 const size_t assignedIdx = existing->second;
527 return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
528 {identity,
"CONFIG", std::to_string(assignedIdx),
529 fIpcSession->macroList, fIpcSession->tmpDir,
530 fIpcSession->tmpResultsDir,
531 fIpcSession->macroParams});
// Capacity check before assigning a fresh index.
534 if (fIpcSession->bootstrapNextIdx >= fIpcSession->maxWorkers) {
535 NLogWarning(
"NDimensionalExecutor::HandleBootstrap: rejecting worker '%s' (capacity reached: %zu)",
536 identity.c_str(), fIpcSession->maxWorkers);
537 return NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"REJECT",
"capacity"});
540 const size_t assignedIdx = fIpcSession->bootstrapNextIdx++;
542 fIpcSession->bootstrapAssignments[identity] = assignedIdx;
544 NLogDebug(
"NDimensionalExecutor::HandleBootstrap: assigning index %zu to worker '%s'", assignedIdx,
546 return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
547 {identity,
"CONFIG", std::to_string(assignedIdx),
548 fIpcSession->macroList, fIpcSession->tmpDir,
549 fIpcSession->tmpResultsDir,
550 fIpcSession->macroParams});
// Starts an IPC worker session. Two modes:
//   - local (tcpBindEndpoint empty): binds a per-PID ipc:// socket and forks
//     one child per worker slot, each running NDimensionalIpcRunner::WorkerLoop;
//   - TCP (tcpBindEndpoint set): binds the given endpoint and waits for
//     external workers to BOOTSTRAP/READY.
// On every failure path the ZeroMQ socket/context are torn down and (in
// local mode) children are killed and the ipc socket file unlinked.
// Note: several scaffolding lines (fork call, error branches, braces) are
// elided from this view.
553void NDimensionalExecutor::StartProcessIpc(std::vector<NThreadData *> & workerObjects,
size_t processCount,
554 const std::string & tcpBindEndpoint,
const std::string & jobDir,
555 const std::string & treeName,
const std::string & macroList,
556 const std::string & tmpDir,
const std::string & tmpResultsDir,
557 const std::string & macroParams)
559 if (workerObjects.empty()) {
560 throw std::invalid_argument(
"Worker objects vector cannot be empty.");
563 throw std::runtime_error(
"IPC session is already active.");
// Spawn at most one process per worker object, at least one.
566 const size_t processesToUse = std::max<size_t>(1, std::min(processCount, workerObjects.size()));
567 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: requested=%zu, workerObjects=%zu, spawning=%zu", processCount,
568 workerObjects.size(), processesToUse);
// Nanosecond timestamp makes the ipc:// socket path unique per session.
569 const auto nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
570 std::chrono::high_resolution_clock::now().time_since_epoch())
572 fIpcSession = std::make_unique<IpcSession>();
574 const bool isTcp = !tcpBindEndpoint.empty();
575 fIpcSession->isTcp = isTcp;
578 fIpcSession->endpoint = tcpBindEndpoint;
579 fIpcSession->endpointPath.clear();
581 fIpcSession->endpointPath =
"/tmp/ndmspc_ipc_" + std::to_string(getpid()) +
"_" + std::to_string(nowNs) +
".sock";
582 fIpcSession->endpoint =
"ipc://" + fIpcSession->endpointPath;
// Remove any stale socket file before binding.
583 ::unlink(fIpcSession->endpointPath.c_str());
586 fIpcSession->ctx = zmq_ctx_new();
587 if (!fIpcSession->ctx) {
589 throw std::runtime_error(
"Failed to create ZeroMQ context.");
592 fIpcSession->router = zmq_socket(fIpcSession->ctx, ZMQ_ROUTER);
593 if (!fIpcSession->router) {
594 zmq_ctx_term(fIpcSession->ctx);
596 throw std::runtime_error(
"Failed to create ZeroMQ ROUTER socket.");
// 1 s receive timeout so the READY/dispatch loops can poll for deadlines
// (recv failures then surface as EAGAIN).
599 int timeoutMs = 1000;
600 zmq_setsockopt(fIpcSession->router, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
602 if (zmq_bind(fIpcSession->router, fIpcSession->endpoint.c_str()) != 0) {
603 const std::string err = zmq_strerror(zmq_errno());
604 zmq_close(fIpcSession->router);
605 zmq_ctx_term(fIpcSession->ctx);
606 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
608 throw std::runtime_error(
"Failed to bind endpoint '" + fIpcSession->endpoint +
"': " + err);
// Pre-populate identity -> index so local workers are known before READY.
611 fIpcSession->identityToWorker.clear();
612 fIpcSession->identityToWorker.reserve(processesToUse);
613 fIpcSession->workerIdentityVec.clear();
614 fIpcSession->pendingReadyIdentities.clear();
618 for (
size_t i = 0; i < processesToUse; ++i) {
619 fIpcSession->identityToWorker[NDimensionalIpcRunner::BuildWorkerIdentity(i)] = i;
623 fIpcSession->jobDir = jobDir;
624 fIpcSession->treeName = treeName;
625 fIpcSession->workerObjects = &workerObjects;
626 fIpcSession->maxWorkers = processesToUse;
627 fIpcSession->macroList = macroList;
628 fIpcSession->tmpDir = tmpDir;
629 fIpcSession->tmpResultsDir = tmpResultsDir;
630 fIpcSession->macroParams = macroParams;
631 fIpcSession->bootstrapAssignments.clear();
// Local mode: fork workers. The fork() call/branches are elided from this
// view; the error path below cleans up already-spawned children.
635 fIpcSession->childPids.assign(processesToUse, -1);
636 for (
size_t i = 0; i < processesToUse; ++i) {
639 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
640 zmq_close(fIpcSession->router);
641 zmq_ctx_term(fIpcSession->ctx);
642 ::unlink(fIpcSession->endpointPath.c_str());
644 throw std::runtime_error(
"Failed to fork worker process.");
// Child: drop the parent's ZeroMQ resources, run the worker loop, and
// _exit (no atexit handlers / stdio flush in the forked child).
647 zmq_close(fIpcSession->router);
648 zmq_ctx_term(fIpcSession->ctx);
649 const int rc = NDimensionalIpcRunner::WorkerLoop(fIpcSession->endpoint, i, workerObjects[i]);
650 _exit(rc == 0 ? 0 : 1);
652 fIpcSession->childPids[i] = pid;
// Wait for workers to report READY. TCP workers get a longer default
// window (300 s) since they may be scheduled remotely.
658 int readyTimeoutSec = isTcp ? 300 : 30;
660 if (
const char * env = gSystem->Getenv(
"NDMSPC_WORKER_TIMEOUT")) {
661 try { readyTimeoutSec = std::stoi(env); }
catch (...) {}
663 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: waiting up to %d s for TCP workers (max %zu) ...",
664 readyTimeoutSec, processesToUse);
666 const auto readyDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(readyTimeoutSec);
// TCP needs only one worker to proceed; local mode waits for all of them.
671 const size_t readyTarget = isTcp ? 1 : processesToUse;
673 while (fIpcSession->workerIdentityVec.size() < readyTarget) {
// Serve READY identities buffered during a previous InitTcpWorker first.
674 if (isTcp && !fIpcSession->pendingReadyIdentities.empty()) {
675 const std::string identity = fIpcSession->pendingReadyIdentities.front();
676 fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
681 std::vector<std::string> frames;
682 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
683 if (errno == EAGAIN || errno == EWOULDBLOCK) {
684 if (std::chrono::steady_clock::now() > readyDeadline) {
686 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
687 zmq_close(fIpcSession->router);
688 zmq_ctx_term(fIpcSession->ctx);
689 ::unlink(fIpcSession->endpointPath.c_str());
691 throw std::runtime_error(
"Timeout while waiting for IPC workers to become ready.");
693 zmq_close(fIpcSession->router);
694 zmq_ctx_term(fIpcSession->ctx);
696 throw std::runtime_error(
"Timeout: no TCP workers connected.");
700 if (!isTcp) NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
701 zmq_close(fIpcSession->router);
702 zmq_ctx_term(fIpcSession->ctx);
703 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
705 throw std::runtime_error(
"Failed to receive READY message from worker.");
707 if (frames.size() < 2)
continue;
708 const std::string & identity = frames[0];
709 const std::string & cmd = frames[1];
710 if (isTcp && cmd ==
"BOOTSTRAP") {
714 if (cmd !=
"READY")
continue;
// Local mode: the identity was pre-registered above; just record readiness.
720 if (fIpcSession->identityToWorker.count(identity)) {
721 if (std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity)
722 == fIpcSession->workerIdentityVec.end()) {
723 fIpcSession->workerIdentityVec.push_back(identity);
724 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: worker '%s' ready (%zu / %zu)", identity.c_str(),
725 fIpcSession->workerIdentityVec.size(), processesToUse);
// Drain any READY frames buffered while initializing the first worker.
737 while (!fIpcSession->pendingReadyIdentities.empty()) {
738 const std::string
id = fIpcSession->pendingReadyIdentities.front();
739 fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
// Install the SIGINT handler: with child PIDs in local mode, with an empty
// list in TCP mode (no local children to kill).
745 InstallIpcSigIntHandler(fIpcSession->childPids, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
749 InstallIpcSigIntHandler({}, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
// Worker-failure handler (function name/header elided from this view):
// requeues every task ever dispatched to the failed worker, adjusts the
// outstanding/acked counters, removes the worker from all session tables,
// and logs a reason-specific warning. Returns the number of redistributed
// tasks.
756 const std::string & failureReason,
757 size_t & outstanding,
764 size_t redistributedCount = 0;
765 size_t replayedDoneCount = 0;
766 size_t replayedLiveCount = 0;
// Preferred path: replay from the explicit per-worker task history,
// distinguishing already-done tasks from live ones.
768 auto historyIt = fIpcSession->workerTaskHistory.find(failedIdentity);
769 if (historyIt != fIpcSession->workerTaskHistory.end()) {
770 for (
const size_t taskId : historyIt->second) {
771 const bool wasDone = fIpcSession->taskStateManager.IsDone(taskId);
772 if (!fIpcSession->taskStateManager.RequeueTask(taskId)) {
775 ++redistributedCount;
782 fIpcSession->workerTaskHistory.erase(historyIt);
// Fallback: no history entry -- recover whatever the task-state manager
// still tracks as owned by this worker.
784 const auto recovered = fIpcSession->taskStateManager.RecoverWorkerTasks(failedIdentity);
785 redistributedCount = recovered.size();
786 replayedLiveCount = redistributedCount;
// Counter adjustments are clamped so they never underflow.
789 if (replayedLiveCount > 0) {
790 const size_t dec = std::min(outstanding, replayedLiveCount);
793 if (replayedDoneCount > 0) {
794 const size_t dec = std::min(acked, replayedDoneCount);
// Remove the worker from every session table.
799 auto identityIt = std::find(fIpcSession->workerIdentityVec.begin(),
800 fIpcSession->workerIdentityVec.end(),
802 if (identityIt != fIpcSession->workerIdentityVec.end()) {
803 fIpcSession->workerIdentityVec.erase(identityIt);
805 fIpcSession->identityToWorker.erase(failedIdentity);
806 fIpcSession->workerLastActivity.erase(failedIdentity);
807 fIpcSession->failedTcpWorkers.erase(failedIdentity);
// Reason-specific logging; only emitted when tasks were actually replayed.
810 if (redistributedCount > 0) {
811 if (failureReason ==
"send_failure") {
812 NLogWarning(
"TCP worker '%s' removed due to send failure. Redistributing %zu task(s). Remaining workers: %zu",
813 failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
814 }
else if (failureReason ==
"timeout") {
815 NLogWarning(
"TCP worker '%s' inactive/disconnected (%zu tasks pending). Redistributing to remaining %zu workers.",
816 failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
817 }
else if (failureReason ==
"crash") {
818 NLogWarning(
"Worker process '%s' exited unexpectedly. Redistributing %zu task(s) to remaining %zu workers.",
819 failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
820 }
else if (failureReason ==
"interrupted") {
821 NLogWarning(
"Worker '%s' interrupted. Replaying %zu task(s) on remaining %zu worker(s).",
822 failedIdentity.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
824 NLogWarning(
"Worker '%s' failed (%s). Redistributing %zu task(s). Remaining workers: %zu",
825 failedIdentity.c_str(), failureReason.c_str(), redistributedCount, fIpcSession->workerIdentityVec.size());
829 return redistributedCount;
// Main dispatch loop: broadcasts the definition (SETDEF/SETIDS), then hands
// out coordinate tasks to workers in round-robin batches over the ROUTER
// socket, tracking outstanding work, failed/inactive TCP workers, exited
// local children, and stall timeouts. This function continues past the end
// of the visible chunk; scaffolding lines (braces, some conditions, the
// receive-handling tail) are elided from this view.
832size_t NDimensionalExecutor::ExecuteCurrentBoundsProcessIpc(
const std::string & definitionName,
833 const std::vector<Long64_t> * definitionIds,
837 throw std::runtime_error(
"IPC session is not active.");
// Fresh run: clear per-run task state and per-worker history.
843 fIpcSession->taskStateManager.Clear();
844 fIpcSession->workerTaskHistory.clear();
// Remember the definition so late-joining workers can be caught up
// (see sendCatchup below).
847 fIpcSession->currentDefName = definitionName;
848 fIpcSession->currentDefIds = definitionIds ? *definitionIds : std::vector<Long64_t>{};
849 fIpcSession->hasCurrentDefIds = (definitionIds !=
nullptr);
// Broadcast SETDEF/SETIDS. In TCP mode a send failure marks the worker
// failed and continues; in local mode it is fatal.
851 if (!definitionName.empty()) {
852 std::vector<std::string> failedWorkers;
853 for (
const auto & identity : fIpcSession->workerIdentityVec) {
854 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"SETDEF", definitionName})) {
855 if (fIpcSession->isTcp) {
856 NLogWarning(
"Failed to send SETDEF to TCP worker '%s', marking as failed", identity.c_str());
857 failedWorkers.push_back(identity);
860 throw std::runtime_error(
"Failed to send IPC SETDEF message to worker '" + identity +
"'.");
863 if (definitionIds !=
nullptr) {
864 if (!NDimensionalIpcRunner::SendFrames(
865 fIpcSession->router, {identity,
"SETIDS", NDimensionalIpcRunner::SerializeIds(*definitionIds)})) {
866 if (fIpcSession->isTcp) {
867 NLogWarning(
"Failed to send SETIDS to TCP worker '%s', marking as failed", identity.c_str());
868 failedWorkers.push_back(identity);
871 throw std::runtime_error(
"Failed to send IPC SETIDS message to worker '" + identity +
"'.");
878 if (fIpcSession->isTcp && !failedWorkers.empty()) {
879 for (
const auto & identity : failedWorkers) {
880 fIpcSession->failedTcpWorkers.insert(identity);
882 NLogWarning(
"Marked %zu TCP worker(s) as failed during SETDEF/SETIDS", failedWorkers.size());
// Tunables from the environment; the try/catch around the parses that
// produces the warnings below is elided from this view.
886 size_t ipcBatchSize = 1;
887 if (
const char * envBatchSize = gSystem->Getenv(
"NDMSPC_IPC_BATCH_SIZE")) {
889 ipcBatchSize = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envBatchSize)));
892 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_IPC_BATCH_SIZE='%s', using default=%zu", envBatchSize,
// Dispatch bookkeeping: task/message counters and the first error string
// (non-empty firstError terminates the main loop).
897 size_t nextTaskId = 0;
898 size_t dispatchMessageId = 0;
899 size_t outstanding = 0;
900 size_t outstandingMessages = 0;
902 size_t nextSchedulerLogAck = 200;
903 std::string firstError;
907 std::unordered_map<std::string, size_t> pendingInitWorkers;
909 int stallTimeoutSec = 120;
910 if (
const char * envStallTimeout = gSystem->Getenv(
"NDMSPC_IPC_STALL_TIMEOUT")) {
912 stallTimeoutSec = std::max(5, std::stoi(envStallTimeout));
915 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_IPC_STALL_TIMEOUT='%s', using default=%d", envStallTimeout,
919 auto lastProgress = std::chrono::steady_clock::now();
925 size_t totalTasks = 1;
// Interrupt check combines the SIGINT flag set by IpcSigIntHandler with
// ROOT's own interrupt state.
929 auto isUserInterrupted = []() {
930 if (gIpcSigIntRequested != 0)
return true;
931 return (gROOT && gROOT->IsInterrupted());
// Replays the current SETDEF/SETIDS to a worker that joined mid-run.
934 auto sendCatchup = [&](
const std::string & identity) {
935 if (!fIpcSession->currentDefName.empty()) {
936 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"SETDEF", fIpcSession->currentDefName});
937 if (fIpcSession->hasCurrentDefIds) {
938 NDimensionalIpcRunner::SendFrames(
939 fIpcSession->router, {identity,
"SETIDS", NDimensionalIpcRunner::SerializeIds(fIpcSession->currentDefIds)});
// Main loop: run until no more coordinates, no outstanding tasks, nothing
// pending for redistribution -- or an error/interrupt occurs.
944 while ((hasMore || outstanding > 0 || fIpcSession->taskStateManager.HasPending()) && firstError.empty()) {
969 if (isUserInterrupted()) {
970 firstError =
"Interrupted by user";
975 if (fIpcSession->workerIdentityVec.empty() && (outstanding > 0 || fIpcSession->taskStateManager.HasPending() || hasMore)) {
976 if (fIpcSession->isTcp) {
977 firstError =
"No workers available. All TCP workers have disconnected/failed.";
979 firstError =
"No workers available. All worker processes have exited/failed.";
// Dispatch phase: keep at least 4 messages in flight (more with many workers).
986 const size_t maxInFlightMessages = std::max<size_t>(4, fIpcSession->workerIdentityVec.size());
988 while ((hasMore || fIpcSession->taskStateManager.HasPending()) && outstandingMessages < maxInFlightMessages && firstError.empty()) {
989 if (fIpcSession->workerIdentityVec.empty())
break;
// Round-robin worker selection; in TCP mode skip workers already
// marked failed (bounded by one full pass over the list).
992 size_t workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
993 std::string identity = fIpcSession->workerIdentityVec[workerSlot];
996 if (fIpcSession->isTcp) {
998 while (fIpcSession->failedTcpWorkers.count(identity) && attempts < fIpcSession->workerIdentityVec.size()) {
1001 workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
1002 identity = fIpcSession->workerIdentityVec[workerSlot];
1005 if (attempts >= fIpcSession->workerIdentityVec.size()) {
// Batch sizing adapts to the remaining work so the tail of the run is
// spread evenly across workers instead of clumped on one.
1010 std::vector<std::pair<size_t, std::vector<int>>> batchTasks;
1011 const size_t nw = fIpcSession->workerIdentityVec.size();
1012 const size_t remainingTasks = (nextTaskId < totalTasks) ? (totalTasks - nextTaskId) : 0;
1013 const size_t adaptiveBatchSize = std::max<size_t>(
1014 1, std::min(ipcBatchSize, std::max<size_t>(1, (remainingTasks + nw - 1) / nw)));
1015 batchTasks.reserve(adaptiveBatchSize);
// First fill the batch with redistributed (previously failed) tasks,
// rate-limited per batch so fresh work still flows.
1019 size_t reprocessedCount = 0;
1020 size_t redistPerBatch = 1;
1021 if (adaptiveBatchSize > 1 && fIpcSession->workerIdentityVec.size() > 0) {
1022 redistPerBatch = std::max<size_t>(1, adaptiveBatchSize / fIpcSession->workerIdentityVec.size());
1024 size_t redistAdded = 0;
1025 while (fIpcSession->taskStateManager.HasPending() && outstanding < maxInFlightMessages * ipcBatchSize &&
1026 batchTasks.size() < adaptiveBatchSize && redistAdded < redistPerBatch) {
1028 std::vector<int> coords;
1029 if (!fIpcSession->taskStateManager.ClaimNextPendingForWorker(identity, taskId, coords)) {
1032 batchTasks.emplace_back(taskId, coords);
1033 fIpcSession->workerTaskHistory[identity].insert(taskId);
1037 if (isUserInterrupted()) {
1038 firstError =
"Interrupted by user";
1043 if (reprocessedCount > 0) {
1044 NLogDebug(
"Redistributing %zu previously-completed tasks (acked counter decremented)", reprocessedCount);
// Then top up the batch with fresh coordinates from the iterator.
1048 while (hasMore && outstanding < maxInFlightMessages * ipcBatchSize && batchTasks.size() < adaptiveBatchSize) {
1049 fIpcSession->taskStateManager.AddPending(nextTaskId,
fCurrentCoords);
1051 std::vector<int> payload;
1052 if (!fIpcSession->taskStateManager.ClaimNextPendingForWorker(identity, taskId, payload)) {
1053 firstError =
"Failed to claim pending task for worker dispatch.";
1056 batchTasks.emplace_back(taskId, payload);
1057 fIpcSession->workerTaskHistory[identity].insert(taskId);
1065 if (isUserInterrupted()) {
1066 firstError =
"Interrupted by user";
1071 if (!firstError.empty()) {
1075 if (batchTasks.empty()) {
1080 for (
const auto & task : batchTasks) {
1081 const std::string coordsStr = NDimensionalIpcRunner::SerializeCoords(task.second);
1082 NLogInfo(
"NDimensionalExecutor: Assign task %zu coords=%s -> worker '%s'", task.first, coordsStr.c_str(), identity.c_str());
1086 if (fIpcSession->isTcp) {
1087 fIpcSession->workerLastActivity[identity] = std::chrono::steady_clock::now();
// Single task -> TASK frame; multiple -> TASKB with "id:coords;id:coords".
// A send failure marks the TCP worker failed and re-fails its batch;
// in local mode it aborts the run.
1090 if (batchTasks.size() == 1) {
1091 const std::string taskId = std::to_string(batchTasks[0].first);
1092 const std::string coords = NDimensionalIpcRunner::SerializeCoords(batchTasks[0].second);
1093 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"TASK", taskId, coords})) {
1094 if (fIpcSession->isTcp) {
1096 NLogWarning(
"Failed to send TASK to TCP worker '%s', marking as failed", identity.c_str());
1098 fIpcSession->failedTcpWorkers.insert(identity);
1100 for (
const auto & task : batchTasks) {
1101 fIpcSession->taskStateManager.MarkFailed(task.first);
1105 firstError =
"Failed to send IPC TASK message to worker '" + identity +
"'.";
1111 std::ostringstream payload;
1112 for (
size_t i = 0; i < batchTasks.size(); ++i) {
1113 if (i != 0) payload <<
';';
1114 payload << batchTasks[i].first <<
':' << NDimensionalIpcRunner::SerializeCoords(batchTasks[i].second);
1116 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"TASKB", payload.str()})) {
1117 if (fIpcSession->isTcp) {
1119 NLogWarning(
"Failed to send TASKB to TCP worker '%s', marking as failed", identity.c_str());
1121 fIpcSession->failedTcpWorkers.insert(identity);
1123 for (
const auto & task : batchTasks) {
1124 fIpcSession->taskStateManager.MarkFailed(task.first);
1128 firstError =
"Failed to send IPC TASKB message to worker '" + identity +
"'.";
1133 ++dispatchMessageId;
1134 ++outstandingMessages;
// Remove workers marked failed during this dispatch round and report
// progress to the optional callback.
1138 if (fIpcSession->isTcp && !fIpcSession->failedTcpWorkers.empty()) {
1139 std::vector<std::string> workersToRemove;
1140 for (
const auto & identity : fIpcSession->workerIdentityVec) {
1141 if (fIpcSession->failedTcpWorkers.count(identity)) {
1142 workersToRemove.push_back(identity);
1146 for (
const auto & failedIdentity : workersToRemove) {
1150 if (progressCallback) {
1151 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1152 fIpcSession->taskStateManager.RunningCount(),
1153 fIpcSession->taskStateManager.DoneCount(),
1154 fIpcSession->workerIdentityVec.size()};
1155 progressCallback(progress);
1160 if (fIpcSession->workerIdentityVec.empty()) {
1161 firstError =
"No workers available. All TCP workers have disconnected/failed.";
// Receive phase: skip when there is nothing to wait for.
1166 if (outstanding == 0 && fIpcSession->workerIdentityVec.empty())
continue;
1167 if (outstanding == 0 && !hasMore && !fIpcSession->taskStateManager.HasPending())
continue;
// EAGAIN/EWOULDBLOCK here is the 1 s ZMQ_RCVTIMEO expiring, not an error.
1169 std::vector<std::string> frames;
1170 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
1171 if (errno == EINTR || isUserInterrupted()) {
1172 firstError =
"Interrupted by user";
1175 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1178 if (!fIpcSession->isTcp) {
1179 firstError =
"Failed to receive IPC ACK/ERR from worker.";
// TCP inactivity sweep: a worker silent past the timeout is failed at
// once if it holds tasks, or after 2x the timeout otherwise.
1187 if (fIpcSession->isTcp && outstanding > 0) {
1188 const auto now = std::chrono::steady_clock::now();
1189 int tcpWorkerTimeoutSec = 30;
1190 if (
const char * envTcpTimeout = gSystem->Getenv(
"NDMSPC_TCP_WORKER_TIMEOUT")) {
1192 tcpWorkerTimeoutSec = std::max(10, std::stoi(envTcpTimeout));
1196 std::vector<std::string> inactiveWorkers;
1197 for (
const auto & identity : fIpcSession->workerIdentityVec) {
1198 auto activityIt = fIpcSession->workerLastActivity.find(identity);
1199 if (activityIt != fIpcSession->workerLastActivity.end()) {
1200 auto inactiveSecs = std::chrono::duration_cast<std::chrono::seconds>(now - activityIt->second).count();
1201 if (inactiveSecs >= tcpWorkerTimeoutSec) {
1203 bool hasPendingTasks = !fIpcSession->taskStateManager.GetWorkerTasks(identity).empty();
1208 if (hasPendingTasks || inactiveSecs >= tcpWorkerTimeoutSec * 2) {
1209 inactiveWorkers.push_back(identity);
1216 for (
const auto & failedIdentity : inactiveWorkers) {
1217 if (fIpcSession->failedTcpWorkers.count(failedIdentity))
continue;
1218 fIpcSession->failedTcpWorkers.insert(failedIdentity);
1223 lastProgress = std::chrono::steady_clock::now();
1226 if (progressCallback) {
1227 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1228 fIpcSession->taskStateManager.RunningCount(),
1229 fIpcSession->taskStateManager.DoneCount(),
1230 fIpcSession->workerIdentityVec.size()};
1231 progressCallback(progress);
1235 if (fIpcSession->workerIdentityVec.empty()) {
1236 firstError =
"All TCP workers have disconnected/failed. No workers available to continue processing.";
// Local mode: reap exited children (WNOHANG) and fail their identities.
1244 for (
size_t i = 0; i < fIpcSession->childPids.size(); ++i) {
1246 pid_t rc = waitpid(fIpcSession->childPids[i], &status, WNOHANG);
1247 if (rc == fIpcSession->childPids[i]) {
1249 std::string failedIdentity = NDimensionalIpcRunner::BuildWorkerIdentity(i);
1254 lastProgress = std::chrono::steady_clock::now();
1257 if (progressCallback) {
1258 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1259 fIpcSession->taskStateManager.RunningCount(),
1260 fIpcSession->taskStateManager.DoneCount(),
1261 fIpcSession->workerIdentityVec.size()};
1262 progressCallback(progress);
1266 if (fIpcSession->workerIdentityVec.empty()) {
1267 firstError =
"No workers available. All worker processes have exited/failed.";
// Stall watchdog: abort when no ACK progress was made for stallTimeoutSec
// while tasks are still outstanding.
1273 if (firstError.empty() && outstanding > 0 && stallTimeoutSec > 0) {
1274 const auto now = std::chrono::steady_clock::now();
1275 const auto stallSecs = std::chrono::duration_cast<std::chrono::seconds>(now - lastProgress).count();
1276 if (stallSecs >= stallTimeoutSec) {
1277 const size_t activeWorkers = fIpcSession->workerIdentityVec.size();
1278 if (activeWorkers == 0) {
1279 firstError =
"No workers available. All workers have disconnected/failed with " +
1280 std::to_string(outstanding) +
" pending tasks remaining.";
1282 firstError =
"No IPC/TCP ACK progress for " + std::to_string(stallSecs) +
"s with " +
1283 std::to_string(outstanding) +
" pending tasks (active workers: " +
1284 std::to_string(activeWorkers) +
").";
1291 if (frames.size() < 2)
continue;
1296 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] ==
"READY") {
1297 const std::string & lateId = frames[0];
1298 if (!fIpcSession->identityToWorker.count(lateId) && !pendingInitWorkers.count(lateId)) {
1299 const std::string prefix =
"wk_";
1300 const size_t prefixLen = prefix.size();
1301 if (lateId.size() > prefixLen && lateId.substr(0, prefixLen) == prefix) {
1302 size_t workerIdx = std::numeric_limits<size_t>::max();
1303 try { workerIdx = std::stoul(lateId.substr(prefixLen)); }
catch (...) {}
1304 if (workerIdx < fIpcSession->maxWorkers) {
1305 const std::string sessionId = std::to_string(getpid());
1306 if (NDimensionalIpcRunner::SendFrames(fIpcSession->router,
1307 {lateId,
"INIT", std::to_string(workerIdx), sessionId,
1308 fIpcSession->jobDir, fIpcSession->treeName,
1309 fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
1310 pendingInitWorkers[lateId] = workerIdx;
1311 NLogDebug(
"NDimensionalExecutor: late worker '%s' sent INIT, awaiting ACK", lateId.c_str());
1316 lastProgress = std::chrono::steady_clock::now();
1321 if (fIpcSession->isTcp && frames.size() == 2 && frames[1] ==
"ACK") {
1322 auto pit = pendingInitWorkers.find(frames[0]);
1323 if (pit != pendingInitWorkers.end()) {
1324 const std::string &
id = pit->first;
1325 const size_t idx = pit->second;
1328 if (fIpcSession->identityToWorker.count(
id)) {
1329 NLogWarning(
"NDimensionalExecutor: late worker '%s' already registered, replacing",
id.c_str());
1331 auto it = std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(),
id);
1332 if (it != fIpcSession->workerIdentityVec.end()) {
1333 fIpcSession->workerIdentityVec.erase(it);
1337 fIpcSession->identityToWorker[id] = idx;
1338 fIpcSession->workerIdentityVec.push_back(
id);
1340 fIpcSession->workerLastActivity[id] = std::chrono::steady_clock::now();
1341 NLogInfo(
"NDimensionalExecutor: late worker '%s' (idx=%zu) joined [total: %zu]",
1342 id.c_str(), idx, fIpcSession->workerIdentityVec.size());
1344 for (
const auto & wid : fIpcSession->workerIdentityVec) {
1345 const size_t inFlight = fIpcSession->taskStateManager.GetWorkerTasks(wid).size();
1346 NLogInfo(
"NDimensionalExecutor: in-flight distribution: worker '%s' has %zu pending task(s)", wid.c_str(), inFlight);
1349 pendingInitWorkers.erase(pit);
1350 lastProgress = std::chrono::steady_clock::now();
1357 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] ==
"BOOTSTRAP") {
1359 lastProgress = std::chrono::steady_clock::now();
1364 if (frames.size() >= 2 && frames[1] ==
"SHUTDOWN") {
1365 const std::string & workerIdentity = frames[0];
1366 const std::string reason = (frames.size() >= 3) ? frames[2] :
"unknown";
1367 const std::string tasksCompleted = (frames.size() >= 4) ? frames[3] :
"?";
1369 NLogWarning(
"Worker '%s' reported shutdown: %s (completed %s tasks)", workerIdentity.c_str(), reason.c_str(), tasksCompleted.c_str());
1371 if (fIpcSession->isTcp) {
1372 fIpcSession->failedTcpWorkers.insert(workerIdentity);
1377 if (progressCallback) {
1378 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1379 fIpcSession->taskStateManager.RunningCount(),
1380 fIpcSession->taskStateManager.DoneCount(),
1381 fIpcSession->workerIdentityVec.size()};
1382 progressCallback(progress);
1386 if (fIpcSession->workerIdentityVec.empty()) {
1387 if (fIpcSession->isTcp) {
1388 firstError =
"No workers available. All TCP workers have shut down.";
1390 firstError =
"No workers available. All worker processes have shut down.";
1396 lastProgress = std::chrono::steady_clock::now();
1400 if (frames.size() == 2 && frames[1] ==
"DONE") {
1401 const std::string & workerIdentity = frames[0];
1402 if (fIpcSession->identityToWorker.count(workerIdentity)) {
1403 fIpcSession->earlyDoneWorkers.insert(workerIdentity);
1404 NLogDebug(
"NDimensionalExecutor::IPC: Worker '%s' sent DONE before FinishProcessIpc; deferring",
1405 workerIdentity.c_str());
1407 NLogWarning(
"NDimensionalExecutor::IPC: ignoring DONE from unknown worker '%s'", workerIdentity.c_str());
1409 lastProgress = std::chrono::steady_clock::now();
1413 if (frames.size() < 3) {
1415 if (frames.size() == 2 && frames[1] ==
"ACK")
1416 NLogWarning(
"NDimensionalExecutor: unexpected 2-frame ACK from '%s', ignoring", frames[0].c_str());
1418 firstError =
"Malformed IPC message received from worker.";
1422 const std::string & cmd = frames[1];
1424 const std::string & workerIdentity = frames[0];
1427 taskId =
static_cast<size_t>(std::stoull(frames[2]));
1430 firstError =
"Malformed IPC task id received from worker.";
1435 if (!fIpcSession->taskStateManager.MarkDone(taskId)) {
1436 firstError =
"Received ACK for unknown or already-done task " + std::to_string(taskId) +
".";
1441 if (fIpcSession->isTcp) {
1442 fIpcSession->workerLastActivity[workerIdentity] = std::chrono::steady_clock::now();
1445 if (outstanding == 0) {
1446 firstError =
"IPC outstanding counter underflow while processing ACK.";
1449 if (outstandingMessages == 0) {
1450 firstError =
"IPC message outstanding counter underflow while processing ACK.";
1454 --outstandingMessages;
1456 lastProgress = std::chrono::steady_clock::now();
1457 const size_t activeWorkersNow = fIpcSession->workerIdentityVec.size();
1458 if (progressCallback) {
1459 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1460 fIpcSession->taskStateManager.RunningCount(),
1461 fIpcSession->taskStateManager.DoneCount(),
1463 progressCallback(progress);
1465 if (acked >= nextSchedulerLogAck) {
1466 NLogDebug(
"NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pending=%zu running=%zu done=%zu",
1467 acked, totalTasks, activeWorkersNow, outstandingMessages,
1468 fIpcSession->taskStateManager.PendingCount(),
1469 fIpcSession->taskStateManager.RunningCount(),
1470 fIpcSession->taskStateManager.DoneCount());
1471 nextSchedulerLogAck += 200;
1476 if (cmd ==
"ACKB") {
1477 const std::string & workerIdentity = frames[0];
1478 if (frames.size() < 3 || frames[2].empty()) {
1479 firstError =
"Malformed IPC ACKB payload received from worker.";
1483 if (outstandingMessages == 0) {
1484 firstError =
"IPC message outstanding counter underflow while processing ACKB.";
1487 --outstandingMessages;
1489 std::stringstream ackStream(frames[2]);
1490 std::string ackToken;
1491 while (std::getline(ackStream, ackToken,
',')) {
1492 if (ackToken.empty())
continue;
1493 size_t ackTaskId = 0;
1495 ackTaskId =
static_cast<size_t>(std::stoull(ackToken));
1498 firstError =
"Malformed IPC ACKB task id received from worker.";
1503 if (!fIpcSession->taskStateManager.MarkDone(ackTaskId)) {
1504 firstError =
"Received ACKB for unknown or already-done task " + std::to_string(ackTaskId) +
".";
1509 if (fIpcSession->isTcp) {
1510 fIpcSession->workerLastActivity[workerIdentity] = std::chrono::steady_clock::now();
1513 if (outstanding == 0) {
1514 firstError =
"IPC outstanding counter underflow while processing ACKB.";
1519 lastProgress = std::chrono::steady_clock::now();
1520 const size_t activeWorkersNow = fIpcSession->workerIdentityVec.size();
1521 if (progressCallback) {
1522 ExecutionProgress progress{acked, fIpcSession->taskStateManager.PendingCount(),
1523 fIpcSession->taskStateManager.RunningCount(),
1524 fIpcSession->taskStateManager.DoneCount(),
1526 progressCallback(progress);
1528 if (acked >= nextSchedulerLogAck) {
1530 "NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pending=%zu running=%zu done=%zu",
1531 acked, totalTasks, activeWorkersNow, outstandingMessages,
1532 fIpcSession->taskStateManager.PendingCount(),
1533 fIpcSession->taskStateManager.RunningCount(),
1534 fIpcSession->taskStateManager.DoneCount());
1535 nextSchedulerLogAck += 200;
1539 if (!firstError.empty()) {
1548 taskId =
static_cast<size_t>(std::stoull(frames[2]));
1551 firstError =
"Malformed IPC task id received from worker.";
1554 std::string errMsg = (frames.size() >= 4) ? frames[3] :
"worker error";
1555 firstError =
"Worker reported error for task " + std::to_string(taskId) +
": " + errMsg;
1559 firstError =
"Unknown IPC command from worker: " + cmd;
1563 if (!firstError.empty()) {
1564 throw std::runtime_error(firstError);
1568 const size_t pendingCount = fIpcSession->taskStateManager.PendingCount();
1569 const size_t runningCount = fIpcSession->taskStateManager.RunningCount();
1570 if (pendingCount > 0 || runningCount > 0) {
1571 throw std::runtime_error(
"IPC execution finished with " + std::to_string(pendingCount) +
" pending and " +
1572 std::to_string(runningCount) +
" running tasks still unacknowledged.");
1578void NDimensionalExecutor::FinishProcessIpc(
bool abort)
1584 const std::string stopReason = abort ?
"abort" :
"ok";
1585 for (
const auto & it : fIpcSession->identityToWorker) {
1586 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {it.first,
"STOP", stopReason});
1589 if (!fIpcSession->isTcp) {
1590 const bool exitedCleanly = NDimensionalIpcRunner::WaitForChildProcesses(fIpcSession->childPids, 1500);
1591 if (!exitedCleanly) {
1592 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
1594 }
else if (!abort) {
1596 const size_t nWorkers = fIpcSession->identityToWorker.size();
1597 std::set<std::string> doneWorkers;
1598 for (
const auto & workerIdentity : fIpcSession->earlyDoneWorkers) {
1599 if (fIpcSession->identityToWorker.count(workerIdentity)) {
1600 doneWorkers.insert(workerIdentity);
1603 const auto doneDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
1604 while (doneWorkers.size() < nWorkers) {
1605 const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(doneDeadline - std::chrono::steady_clock::now()).count();
1606 if (remaining <= 0) {
1607 NLogWarning(
"NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1608 doneWorkers.size(), nWorkers);
1611 zmq_pollitem_t item = {fIpcSession->router, 0, ZMQ_POLLIN, 0};
1612 const int rc = zmq_poll(&item, 1,
static_cast<long>(remaining));
1614 NLogWarning(
"NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1615 doneWorkers.size(), nWorkers);
1618 std::vector<std::string> frames;
1619 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames) || frames.size() < 2)
continue;
1620 if (frames[1] ==
"DONE") {
1621 const std::string & workerIdentity = frames[0];
1622 doneWorkers.insert(workerIdentity);
1624 NLogDebug(
"NDimensionalExecutor::FinishProcessIpc: Worker '%s' sent DONE (%zu/%zu)", workerIdentity.c_str(),
1625 doneWorkers.size(), nWorkers);
1626 }
else if (frames[1] ==
"READY") {
1628 NLogInfo(
"NDimensionalExecutor::FinishProcessIpc: Late worker '%s' arrived, sending STOP", frames[0].c_str());
1629 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {frames[0],
"STOP",
"ok"});
1630 }
else if (frames[1] ==
"BOOTSTRAP") {
1638 if (fIpcSession->router) {
1640 if (fIpcSession->isTcp && abort) {
1644 zmq_setsockopt(fIpcSession->router, ZMQ_LINGER, &lingerMs,
sizeof(lingerMs));
1647 if (fIpcSession->router) {
1648 zmq_close(fIpcSession->router);
1650 if (fIpcSession->ctx) {
1651 zmq_ctx_term(fIpcSession->ctx);
1653 if (!fIpcSession->isTcp && !fIpcSession->endpointPath.empty()) {
1654 ::unlink(fIpcSession->endpointPath.c_str());
1659 for (
const auto & kv : fIpcSession->identityToWorker) {
1663 RestoreIpcSigIntHandler(fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
1664 fIpcSession.reset();
1668 const std::function<
void(
const std::vector<int> & coords,
NGnThreadData & thread_object)> & func,
1669 std::vector<NGnThreadData> & thread_objects);
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / workerIdentityVec.
size_t HandleWorkerFailure(const std::string &failedIdentity, const std::string &failureReason, size_t &outstanding, size_t &acked)
Centralized worker failure handling: recovers tasks, removes worker, updates state....
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a confirmation message.
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
Thread-local data object for NDMSPC processing.
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Manages task lifecycle: pending → running → done/failed.
Thread-local data object for NDMSPC processing.
size_t GetAssignedIndex() const
Get the assigned index for the thread.
Global callback function for libwebsockets client events.
Execution progress metrics for IPC-based distributed processing.