9 #include <unordered_map>
13 #include <THnSparse.h>
17 #include "NDimensionalExecutor.h"
18 #include "NDimensionalIpcRunner.h"
19 #include "NGnThreadData.h"
// Process-global state shared with the asynchronous SIGINT handler below.
// volatile sig_atomic_t is used because it is the only type the C/C++
// standards guarantee is safe to read/write from a signal handler.
25 volatile sig_atomic_t gIpcSigIntRequested = 0;
26 volatile sig_atomic_t gIpcChildCount = 0;
// Fixed-capacity table of forked worker PIDs the handler may signal.
27 pid_t gIpcChildPids[1024] = {0};
// SIGINT handler: records that an interrupt was requested and forcibly
// terminates (SIGKILL) every registered child worker process so they do
// not outlive an interrupted parent.
// NOTE(review): this listing is missing intermediate lines (e.g. braces);
// verify against the original source file.
29 void IpcSigIntHandler(
int)
31 gIpcSigIntRequested = 1;
// Snapshot the count once so the loop bound is stable inside the handler.
32 const sig_atomic_t count = gIpcChildCount;
33 for (sig_atomic_t i = 0; i < count; ++i) {
34 if (gIpcChildPids[i] > 0) {
36 kill(gIpcChildPids[i], SIGKILL);
// Copies the child PIDs into the signal-handler-visible global table
// (truncating to its fixed capacity), then installs IpcSigIntHandler for
// SIGINT, saving the previous disposition into oldAction so it can be
// restored later by RestoreIpcSigIntHandler.
// NOTE(review): this listing is missing intermediate lines (the `sa`
// declaration used at original line 56 is not visible); verify against
// the original source file.
41 void InstallIpcSigIntHandler(
const std::vector<pid_t> & childPids,
struct sigaction & oldAction,
bool & hasOldAction)
43 gIpcSigIntRequested = 0;
// Capacity of the global PID table; extra children beyond this are not tracked.
45 const size_t maxChildren =
sizeof(gIpcChildPids) /
sizeof(gIpcChildPids[0])};
// Restores the SIGINT disposition that was saved by InstallIpcSigIntHandler
// and clears the interrupt-requested flag.
// NOTE(review): this listing is missing intermediate lines (presumably a
// check of hasOldAction before the sigaction call); verify against the
// original source file.
66 void RestoreIpcSigIntHandler(
const struct sigaction & oldAction,
bool hasOldAction)
69 sigaction(SIGINT, &oldAction,
nullptr);
72 gIpcSigIntRequested = 0;
// Members of the IPC session state (the enclosing struct/class declaration
// is not visible in this listing — presumably IpcSession; verify).
// Raw ZeroMQ ROUTER socket handle (zmq_socket); owned by the session.
78 void * router{
nullptr};
// Filesystem path of the ipc:// socket (empty in TCP mode).
80 std::string endpointPath;
// PIDs of forked worker processes (local IPC mode).
82 std::vector<pid_t> childPids;
// Maps a worker's ZeroMQ identity frame to its worker index.
83 std::unordered_map<std::string, size_t> identityToWorker;
// Identities of workers that completed registration, in join order.
84 std::vector<std::string> workerIdentityVec;
// Non-owning pointer to the caller's per-worker data objects.
88 std::vector<NThreadData *> * workerObjects{
nullptr};
// Definition name/ids last broadcast via SETDEF/SETIDS, replayed to
// late-joining workers (see sendCatchup in ExecuteCurrentBoundsProcessIpc).
90 std::string currentDefName;
91 std::vector<Long64_t> currentDefIds;
92 bool hasCurrentDefIds{
false};
// True once InstallIpcSigIntHandler saved the previous SIGINT action.
94 bool hasOldSigIntAction{
false};
// Configuration forwarded to TCP workers in CONFIG/INIT messages.
96 std::string macroList;
98 std::string tmpResultsDir;
// Next index handed out to a BOOTSTRAP-ing TCP worker.
99 size_t bootstrapNextIdx{0};
// READY identities seen while busy initializing another worker.
100 std::vector<std::string> pendingReadyIdentities;
// Constructor from per-dimension min/max bounds. Validates that both
// vectors are non-empty, equally sized, and that min <= max for every
// dimension; throws std::invalid_argument otherwise.
// NOTE(review): this listing is missing intermediate lines (the size
// comparison and the validation loop header are not visible); verify
// against the original source file.
117 : fMinBounds(minBounds), fMaxBounds(maxBounds)
124 throw std::invalid_argument(
"Min and max bounds vectors must have the same size.");
127 throw std::invalid_argument(
"Bounds vectors cannot be empty.");
134 throw std::invalid_argument(
"Min bound (" + std::to_string(
fMinBounds[i]) +
135 ") cannot be greater than max bound (" + std::to_string(
fMaxBounds[i]) +
136 ") for dimension " + std::to_string(i));
// Replaces the executor's bounds with new min/max vectors, applying the
// same validation as the constructor (equal sizes, non-empty, min <= max
// per dimension); throws std::invalid_argument on violation.
// NOTE(review): the visible validation reads the fMinBounds/fMaxBounds
// members rather than the minBounds/maxBounds arguments — confirm in the
// original source that members are assigned before this check (or that
// the arguments are validated first), since the intermediate lines are
// missing from this listing.
144 void NDimensionalExecutor::SetBounds(
const std::vector<int> & minBounds,
const std::vector<int> & maxBounds)
150 throw std::invalid_argument(
"Min and max bounds vectors must have the same size.");
153 throw std::invalid_argument(
"Bounds vectors cannot be empty.");
159 throw std::invalid_argument(
"Min bound (" + std::to_string(
fMinBounds[i]) +
160 ") cannot be greater than max bound (" + std::to_string(
fMaxBounds[i]) +
161 ") for dimension " + std::to_string(i));
// Constructor (fragment) taking a THnSparse histogram: rejects null or
// empty histograms, then derives per-dimension bounds from the histogram
// axes (max bound = number of bins on each axis).
// NOTE(review): this listing is missing intermediate lines (the function
// signature and the fMinBounds push is not visible); verify against the
// original source file.
174 if (hist ==
nullptr) {
175 throw std::invalid_argument(
"THnSparse pointer cannot be null.");
180 if (hist->GetNbins() <= 0) {
181 throw std::invalid_argument(
"THnSparse histogram is empty.");
// One bounds entry per histogram dimension.
189 for (
int i = 0; i < hist->GetNdimensions(); ++i) {
191 fMaxBounds.push_back(hist->GetAxis(i)->GetNbins());
// Defaulted out-of-line destructor; defined here (not in the header) so
// the unique_ptr<IpcSession> member can be destroyed with the complete
// IpcSession type visible.
199 NDimensionalExecutor::~NDimensionalExecutor() =
default;
// ExecuteParallel<TObject>: runs `func` over every coordinate of the
// N-dimensional space using a fixed thread pool — one worker thread per
// entry in `thread_objects`, each thread bound to its own TObject so no
// per-task synchronization of user state is needed. A producer loop
// enqueues one task per coordinate; workers drain the queue; the first
// exception thrown (by a task or the producer) is captured and rethrown
// on the calling thread after all workers are joined.
// NOTE(review): this listing is missing many intermediate lines (loop
// bodies, brace closures, the producer's coordinate iteration); verify
// against the original source file.
223 template <
typename TObject>
225 const std::function<
void(
const std::vector<int> & coords, TObject & thread_object)> & func,
226 std::vector<TObject> & thread_objects)
231 size_t threads_to_use = thread_objects.size();
232 if (threads_to_use == 0) {
233 throw std::invalid_argument(
"Thread objects vector cannot be empty.");
236 std::vector<std::thread> workers;
237 std::queue<std::function<void(TObject &)>> tasks;
238 std::mutex queue_mutex;
// Two condvars: producer signals "task available / stop", consumer
// signals "all tasks drained" back to the coordinator.
239 std::condition_variable condition_producer;
240 std::condition_variable condition_consumer;
241 std::atomic<size_t> active_tasks = 0;
242 std::atomic<bool> stop_pool =
false;
// Only the first exception is kept; later ones are dropped.
244 std::exception_ptr first_exception =
nullptr;
245 std::mutex exception_mutex;
248 auto worker_logic = [&](TObject & my_object) {
// Thread name "wk_NNN" built from the object's assigned index.
251 std::ostringstream oss;
252 oss <<
"wk_" << std::setw(3) << std::setfill(
'0') << md->
GetAssignedIndex();
256 std::function<void(TObject &)> task_payload;
257 bool task_acquired =
false;
261 std::unique_lock<std::mutex> lock(queue_mutex);
262 condition_producer.wait(lock, [&] {
return stop_pool || !tasks.empty(); });
// Stop requested and nothing left to do: exit the worker loop.
265 if (stop_pool && tasks.empty()) {
271 if (!tasks.empty()) {
272 task_payload = std::move(tasks.front());
274 task_acquired =
true;
// Run the user task outside the queue lock.
284 task_payload(my_object);
290 std::lock_guard<std::mutex> lock(exception_mutex);
291 if (!first_exception) {
292 first_exception = std::current_exception();
297 std::unique_lock<std::mutex> lock(queue_mutex);
300 condition_producer.notify_all();
// Last finishing task after stop: wake the coordinator.
305 if (--active_tasks == 0 && stop_pool) {
307 condition_consumer.notify_one();
319 if (--active_tasks == 0 && stop_pool) {
320 condition_consumer.notify_one();
327 workers.reserve(threads_to_use);
328 for (
size_t i = 0; i < threads_to_use; ++i) {
329 workers.emplace_back(worker_logic, std::ref(thread_objects[i]));
339 std::unique_lock<std::mutex> lock(queue_mutex);
340 if (stop_pool)
break;
345 std::unique_lock<std::mutex> lock(queue_mutex);
347 if (stop_pool)
break;
// Each task captures its own copy of the coordinates.
350 tasks.emplace([func, coords_copy](TObject & obj) { func(coords_copy, obj); });
352 condition_producer.notify_one();
358 std::unique_lock<std::mutex> lock(queue_mutex);
360 if (!first_exception) {
361 first_exception = std::current_exception();
364 condition_producer.notify_all();
370 std::unique_lock<std::mutex> lock(queue_mutex);
373 condition_producer.notify_all();
// Wait until stop is set and every in-flight task has finished.
377 std::unique_lock<std::mutex> lock(queue_mutex);
378 condition_consumer.wait(lock, [&] {
return stop_pool && active_tasks == 0; });
382 for (std::thread & worker : workers) {
383 if (worker.joinable()) {
// Surface the first captured exception on the calling thread.
389 if (first_exception) {
390 std::rethrow_exception(first_exception);
// Fragment (presumably of ExecuteParallelProcessIpc — TODO confirm):
// starts the multi-process IPC session, then executes the current bounds
// over it and collects the number of acknowledged tasks.
397 StartProcessIpc(workerObjects, processCount);
399 size_t acked = ExecuteCurrentBoundsProcessIpc();
// InitTcpWorker (fragment): registers a newly connected TCP worker.
// Parses the worker index from the numeric suffix of its identity (the
// prefix length is derived from a sample identity built for index 0),
// rejects out-of-range or duplicate identities, sends an INIT message
// with the session configuration, then waits (bounded by
// NDMSPC_WORKER_TIMEOUT, default 30 s) for the worker's ACK while
// buffering unrelated READY/BOOTSTRAP messages that arrive in between.
// NOTE(review): this listing is missing many intermediate lines (the
// function signature, returns, brace closures); verify against the
// original source file.
413 const std::string sample = NDimensionalIpcRunner::BuildWorkerIdentity(0);
// Count trailing digits of the sample to find where the index starts.
415 while (numLen < sample.size() && std::isdigit((
unsigned char)sample[sample.size() - 1 - numLen]))
417 const size_t prefixLen = sample.size() - numLen;
418 if (identity.size() <= prefixLen)
return false;
419 size_t workerIdx = 0;
421 workerIdx = std::stoul(identity.substr(prefixLen));
424 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: cannot parse index from identity '%s'", identity.c_str());
427 if (workerIdx >= fIpcSession->maxWorkers) {
428 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: worker index %zu >= maxWorkers %zu, ignoring",
429 workerIdx, fIpcSession->maxWorkers);
432 if (fIpcSession->identityToWorker.count(identity)) {
433 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, ignoring duplicate READY",
// Session id ties the worker to this coordinator process.
438 const std::string sessionId = std::to_string(getpid());
439 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router,
440 {identity,
"INIT", std::to_string(workerIdx), sessionId,
441 fIpcSession->jobDir, fIpcSession->treeName,
442 fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
443 NLogError(
"NDimensionalExecutor::InitTcpWorker: failed to send INIT to '%s'", identity.c_str());
// INIT ACK timeout, overridable via environment (minimum 1 s).
447 int initTimeoutSec = 30;
448 if (
const char * env = gSystem->Getenv(
"NDMSPC_WORKER_TIMEOUT")) {
450 initTimeoutSec = std::max(1, std::stoi(env));
453 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: Invalid NDMSPC_WORKER_TIMEOUT='%s', using default=%d", env,
457 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(initTimeoutSec);
460 std::vector<std::string> ackFrames;
461 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, ackFrames)) {
// Receive timed out (ZMQ_RCVTIMEO): keep polling until the deadline.
462 if (errno == EAGAIN || errno == EWOULDBLOCK) {
463 if (std::chrono::steady_clock::now() > initDeadline)
break;
468 if (ackFrames.size() >= 2 && ackFrames[1] ==
"BOOTSTRAP") {
// READY from a different worker: queue it for later registration.
472 if (ackFrames.size() >= 2 && ackFrames[1] ==
"READY" && ackFrames[0] != identity) {
473 if (!fIpcSession->identityToWorker.count(ackFrames[0]) &&
474 std::find(fIpcSession->pendingReadyIdentities.begin(), fIpcSession->pendingReadyIdentities.end(),
475 ackFrames[0]) == fIpcSession->pendingReadyIdentities.end()) {
476 fIpcSession->pendingReadyIdentities.push_back(ackFrames[0]);
480 if (ackFrames.size() >= 2 && ackFrames[0] == identity && ackFrames[1] ==
"ACK") {
485 NLogError(
"NDimensionalExecutor::InitTcpWorker: worker '%s' did not ACK INIT", identity.c_str());
// Registration succeeds only after the ACK arrived.
489 fIpcSession->identityToWorker[identity] = workerIdx;
490 fIpcSession->workerIdentityVec.push_back(identity);
491 NLogInfo(
"NDimensionalExecutor::InitTcpWorker: worker '%s' (idx=%zu) joined [total: %zu]",
492 identity.c_str(), workerIdx, fIpcSession->workerIdentityVec.size());
// HandleBootstrap (fragment): responds to a worker's BOOTSTRAP request in
// TCP mode. Assigns the next sequential worker index and replies with a
// CONFIG message carrying the macro list and directory configuration.
// Returns false when there is no active TCP session.
498 if (!fIpcSession || !fIpcSession->isTcp)
return false;
// Post-increment hands out 0,1,2,... across successive bootstraps.
499 const size_t assignedIdx = fIpcSession->bootstrapNextIdx++;
500 NLogDebug(
"NDimensionalExecutor::HandleBootstrap: assigning index %zu to worker '%s'", assignedIdx,
502 return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
503 {identity,
"CONFIG", std::to_string(assignedIdx),
504 fIpcSession->macroList, fIpcSession->tmpDir,
505 fIpcSession->tmpResultsDir});
// StartProcessIpc (fragment): sets up a multi-process IPC session.
// Creates a ZeroMQ ROUTER socket bound either to the caller-supplied TCP
// endpoint or to a freshly generated ipc:// UNIX socket path; in local
// (non-TCP) mode forks one worker child per slot which runs
// NDimensionalIpcRunner::WorkerLoop; then waits (bounded by
// NDMSPC_WORKER_TIMEOUT; default 300 s TCP / 30 s local) for all workers
// to report READY, and finally installs the SIGINT handler so Ctrl-C
// kills the children. Any failure path tears down socket/context (and
// children / socket file where applicable) before throwing.
// NOTE(review): this listing is missing many intermediate lines (fork()
// call, brace closures, several branches); verify against the original
// source file.
508 void NDimensionalExecutor::StartProcessIpc(std::vector<NThreadData *> & workerObjects,
size_t processCount,
509 const std::string & tcpBindEndpoint,
const std::string & jobDir,
510 const std::string & treeName,
const std::string & macroList,
511 const std::string & tmpDir,
const std::string & tmpResultsDir)
513 if (workerObjects.empty()) {
514 throw std::invalid_argument(
"Worker objects vector cannot be empty.");
517 throw std::runtime_error(
"IPC session is already active.");
// Clamp the worker count to [1, workerObjects.size()].
520 const size_t processesToUse = std::max<size_t>(1, std::min(processCount, workerObjects.size()));
521 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: requested=%zu, workerObjects=%zu, spawning=%zu", processCount,
522 workerObjects.size(), processesToUse);
// Timestamp used to make the ipc socket path unique per invocation.
523 const auto nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
524 std::chrono::high_resolution_clock::now().time_since_epoch())
526 fIpcSession = std::make_unique<IpcSession>();
// TCP mode is selected by a non-empty bind endpoint.
528 const bool isTcp = !tcpBindEndpoint.empty();
529 fIpcSession->isTcp = isTcp;
532 fIpcSession->endpoint = tcpBindEndpoint;
533 fIpcSession->endpointPath.clear();
535 fIpcSession->endpointPath =
"/tmp/ndmspc_ipc_" + std::to_string(getpid()) +
"_" + std::to_string(nowNs) +
".sock";
536 fIpcSession->endpoint =
"ipc://" + fIpcSession->endpointPath;
// Remove any stale socket file from a previous crashed run.
537 ::unlink(fIpcSession->endpointPath.c_str());
540 fIpcSession->ctx = zmq_ctx_new();
541 if (!fIpcSession->ctx) {
543 throw std::runtime_error(
"Failed to create ZeroMQ context.");
546 fIpcSession->router = zmq_socket(fIpcSession->ctx, ZMQ_ROUTER);
547 if (!fIpcSession->router) {
548 zmq_ctx_term(fIpcSession->ctx);
550 throw std::runtime_error(
"Failed to create ZeroMQ ROUTER socket.");
// 1 s receive timeout so the READY wait loop can poll its deadline.
553 int timeoutMs = 1000;
554 zmq_setsockopt(fIpcSession->router, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
556 if (zmq_bind(fIpcSession->router, fIpcSession->endpoint.c_str()) != 0) {
557 const std::string err = zmq_strerror(zmq_errno());
558 zmq_close(fIpcSession->router);
559 zmq_ctx_term(fIpcSession->ctx);
560 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
562 throw std::runtime_error(
"Failed to bind endpoint '" + fIpcSession->endpoint +
"': " + err);
565 fIpcSession->identityToWorker.clear();
566 fIpcSession->identityToWorker.reserve(processesToUse);
567 fIpcSession->workerIdentityVec.clear();
568 fIpcSession->pendingReadyIdentities.clear();
// Pre-register deterministic identities (local mode: identities are known).
572 for (
size_t i = 0; i < processesToUse; ++i) {
573 fIpcSession->identityToWorker[NDimensionalIpcRunner::BuildWorkerIdentity(i)] = i;
577 fIpcSession->jobDir = jobDir;
578 fIpcSession->treeName = treeName;
579 fIpcSession->workerObjects = &workerObjects;
580 fIpcSession->maxWorkers = processesToUse;
581 fIpcSession->macroList = macroList;
582 fIpcSession->tmpDir = tmpDir;
583 fIpcSession->tmpResultsDir = tmpResultsDir;
// Fork one child per worker slot (local mode). -1 marks "not forked yet".
587 fIpcSession->childPids.assign(processesToUse, -1);
588 for (
size_t i = 0; i < processesToUse; ++i) {
// Fork failure: kill already-spawned children and clean up before throwing.
591 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
592 zmq_close(fIpcSession->router);
593 zmq_ctx_term(fIpcSession->ctx);
594 ::unlink(fIpcSession->endpointPath.c_str());
596 throw std::runtime_error(
"Failed to fork worker process.");
// Child branch: drop inherited parent socket/context, run the worker
// loop, and _exit without running parent atexit handlers.
599 zmq_close(fIpcSession->router);
600 zmq_ctx_term(fIpcSession->ctx);
601 const int rc = NDimensionalIpcRunner::WorkerLoop(fIpcSession->endpoint, i, workerObjects[i]);
602 _exit(rc == 0 ? 0 : 1);
604 fIpcSession->childPids[i] = pid;
// TCP workers connect over the network, so allow a much longer default.
610 int readyTimeoutSec = isTcp ? 300 : 30;
612 if (
const char * env = gSystem->Getenv(
"NDMSPC_WORKER_TIMEOUT")) {
613 try { readyTimeoutSec = std::stoi(env); }
catch (...) {}
615 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: waiting up to %d s for TCP workers (max %zu) ...",
616 readyTimeoutSec, processesToUse);
618 const auto readyDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(readyTimeoutSec);
623 const size_t readyTarget = processesToUse;
625 while (fIpcSession->workerIdentityVec.size() < readyTarget) {
// Drain READY identities queued while another worker was initializing.
626 if (isTcp && !fIpcSession->pendingReadyIdentities.empty()) {
627 const std::string identity = fIpcSession->pendingReadyIdentities.front();
628 fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
633 std::vector<std::string> frames;
634 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
635 if (errno == EAGAIN || errno == EWOULDBLOCK) {
636 if (std::chrono::steady_clock::now() > readyDeadline) {
638 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
639 zmq_close(fIpcSession->router);
640 zmq_ctx_term(fIpcSession->ctx);
641 ::unlink(fIpcSession->endpointPath.c_str());
643 throw std::runtime_error(
"Timeout while waiting for IPC workers to become ready.");
645 zmq_close(fIpcSession->router);
646 zmq_ctx_term(fIpcSession->ctx);
648 throw std::runtime_error(
"Timeout: no TCP workers connected.");
652 if (!isTcp) NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
653 zmq_close(fIpcSession->router);
654 zmq_ctx_term(fIpcSession->ctx);
655 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
657 throw std::runtime_error(
"Failed to receive READY message from worker.");
659 if (frames.size() < 2)
continue;
660 const std::string & identity = frames[0];
661 const std::string & cmd = frames[1];
662 if (isTcp && cmd ==
"BOOTSTRAP") {
666 if (cmd !=
"READY")
continue;
// Only pre-registered identities count toward the ready target (dedup'd).
672 if (fIpcSession->identityToWorker.count(identity)) {
673 if (std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity)
674 == fIpcSession->workerIdentityVec.end()) {
675 fIpcSession->workerIdentityVec.push_back(identity);
676 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: worker '%s' ready (%zu / %zu)", identity.c_str(),
677 fIpcSession->workerIdentityVec.size(), processesToUse);
// Install SIGINT handling: with child PIDs in local mode, without in TCP.
684 InstallIpcSigIntHandler(fIpcSession->childPids, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
688 InstallIpcSigIntHandler({}, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
// ExecuteCurrentBoundsProcessIpc (fragment): dispatches every coordinate
// of the current bounds as TASK/TASKB messages to the registered IPC
// workers and collects ACK/ACKB replies, returning (presumably) the
// number of acknowledged tasks. Optionally broadcasts SETDEF/SETIDS
// first; supports batched dispatch (NDMSPC_IPC_BATCH_SIZE), a stall
// watchdog (NDMSPC_IPC_STALL_TIMEOUT), late TCP worker catch-up, child
// death detection, user interruption (SIGINT / gROOT->IsInterrupted),
// and an optional progress callback. The first error encountered aborts
// the loop and is rethrown as std::runtime_error.
// NOTE(review): this listing is missing many intermediate lines (brace
// closures, counter increments such as ++acked/++outstanding, the
// coordinate Increment() call); verify against the original source file.
692 size_t NDimensionalExecutor::ExecuteCurrentBoundsProcessIpc(
const std::string & definitionName,
693 const std::vector<Long64_t> * definitionIds,
694 const std::function<
void(
size_t,
size_t)> & progressCallback)
697 throw std::runtime_error(
"IPC session is not active.");
// Remember the definition so late-joining workers can be caught up.
704 fIpcSession->currentDefName = definitionName;
705 fIpcSession->currentDefIds = definitionIds ? *definitionIds : std::vector<Long64_t>{};
706 fIpcSession->hasCurrentDefIds = (definitionIds !=
nullptr);
708 if (!definitionName.empty()) {
709 for (
const auto & identity : fIpcSession->workerIdentityVec) {
710 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"SETDEF", definitionName})) {
711 throw std::runtime_error(
"Failed to send IPC SETDEF message to worker '" + identity +
"'.");
713 if (definitionIds !=
nullptr) {
714 if (!NDimensionalIpcRunner::SendFrames(
715 fIpcSession->router, {identity,
"SETIDS", NDimensionalIpcRunner::SerializeIds(*definitionIds)})) {
716 throw std::runtime_error(
"Failed to send IPC SETIDS message to worker '" + identity +
"'.");
// Tasks-per-message batch size, environment-overridable (minimum 1).
722 size_t ipcBatchSize = 1;
723 if (
const char * envBatchSize = gSystem->Getenv(
"NDMSPC_IPC_BATCH_SIZE")) {
725 ipcBatchSize = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envBatchSize)));
728 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_IPC_BATCH_SIZE='%s', using default=%zu", envBatchSize,
// Scheduler bookkeeping: task ids, in-flight counts, pending-set, errors.
733 size_t nextTaskId = 0;
734 size_t dispatchMessageId = 0;
735 size_t outstanding = 0;
736 size_t outstandingMessages = 0;
738 size_t nextSchedulerLogAck = 200;
739 std::string firstError;
740 std::set<size_t> pendingTasks;
// Stall watchdog: abort when no ACK progress for this long (min 5 s).
742 int stallTimeoutSec = 120;
743 if (
const char * envStallTimeout = gSystem->Getenv(
"NDMSPC_IPC_STALL_TIMEOUT")) {
745 stallTimeoutSec = std::max(5, std::stoi(envStallTimeout));
748 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_IPC_STALL_TIMEOUT='%s', using default=%d", envStallTimeout,
752 auto lastProgress = std::chrono::steady_clock::now();
758 size_t totalTasks = 1;
// Interruption check: IPC SIGINT flag or ROOT's global interrupt state.
762 auto isUserInterrupted = []() {
763 if (gIpcSigIntRequested != 0)
return true;
764 return (gROOT && gROOT->IsInterrupted());
// Replays the current SETDEF/SETIDS to a worker that (re)connected late.
767 auto sendCatchup = [&](
const std::string & identity) {
768 if (!fIpcSession->currentDefName.empty()) {
769 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"SETDEF", fIpcSession->currentDefName});
770 if (fIpcSession->hasCurrentDefIds) {
771 NDimensionalIpcRunner::SendFrames(
772 fIpcSession->router, {identity,
"SETIDS", NDimensionalIpcRunner::SerializeIds(fIpcSession->currentDefIds)});
// Main scheduler loop: keep going while tasks remain or are in flight.
777 while ((hasMore || outstanding > 0) && firstError.empty()) {
778 if (isUserInterrupted()) {
779 firstError =
"Interrupted by user";
// In-flight message cap scales with the current worker count.
783 const size_t currentWorkers = fIpcSession->workerIdentityVec.size();
784 const size_t maxInFlightMessages = std::max<size_t>(currentWorkers * 8, 8);
786 while (hasMore && outstandingMessages < maxInFlightMessages && firstError.empty()) {
787 if (fIpcSession->workerIdentityVec.empty())
break;
// Round-robin worker selection by message counter.
788 const size_t workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
789 const std::string identity = fIpcSession->workerIdentityVec[workerSlot];
790 std::vector<std::pair<size_t, std::string>> batchTasks;
791 const size_t nw = fIpcSession->workerIdentityVec.size();
792 const size_t remainingTasks = (nextTaskId < totalTasks) ? (totalTasks - nextTaskId) : 0;
// Shrink the batch near the end so remaining work spreads over workers.
793 const size_t adaptiveBatchSize = std::max<size_t>(
794 1, std::min(ipcBatchSize, std::max<size_t>(1, (remainingTasks + nw - 1) / nw)));
795 batchTasks.reserve(adaptiveBatchSize);
797 while (hasMore && outstanding < maxInFlightMessages * ipcBatchSize && batchTasks.size() < adaptiveBatchSize) {
798 batchTasks.emplace_back(nextTaskId, NDimensionalIpcRunner::SerializeCoords(
fCurrentCoords));
799 pendingTasks.insert(nextTaskId);
807 if (isUserInterrupted()) {
808 firstError =
"Interrupted by user";
813 if (!firstError.empty()) {
817 if (batchTasks.empty()) {
// Single task: TASK frame; multiple tasks: TASKB with "id:coords;..." payload.
821 if (batchTasks.size() == 1) {
822 const std::string taskId = std::to_string(batchTasks[0].first);
823 const std::string coords = batchTasks[0].second;
824 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"TASK", taskId, coords})) {
825 firstError =
"Failed to send IPC TASK message to worker '" + identity +
"'.";
830 std::ostringstream payload;
831 for (
size_t i = 0; i < batchTasks.size(); ++i) {
832 if (i != 0) payload <<
';';
833 payload << batchTasks[i].first <<
':' << batchTasks[i].second;
835 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"TASKB", payload.str()})) {
836 firstError =
"Failed to send IPC TASKB message to worker '" + identity +
"'.";
841 ++outstandingMessages;
844 if (outstanding == 0 && fIpcSession->workerIdentityVec.empty())
continue;
845 if (outstanding == 0 && !hasMore)
continue;
847 std::vector<std::string> frames;
848 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
849 if (errno == EINTR || isUserInterrupted()) {
850 firstError =
"Interrupted by user";
853 if (errno != EAGAIN && errno != EWOULDBLOCK) {
854 firstError =
"Failed to receive IPC ACK/ERR from worker.";
// Receive timeout path: check children are still alive (local mode).
858 for (
size_t i = 0; i < fIpcSession->childPids.size(); ++i) {
860 pid_t rc = waitpid(fIpcSession->childPids[i], &status, WNOHANG);
861 if (rc == fIpcSession->childPids[i]) {
862 firstError =
"Worker process " + std::to_string(i) +
" exited unexpectedly.";
// Stall watchdog: no ACK progress while tasks are pending.
866 if (firstError.empty() && outstanding > 0 && stallTimeoutSec > 0) {
867 const auto now = std::chrono::steady_clock::now();
868 const auto stallSecs = std::chrono::duration_cast<std::chrono::seconds>(now - lastProgress).count();
869 if (stallSecs >= stallTimeoutSec) {
870 firstError =
"No IPC/TCP ACK progress for " + std::to_string(stallSecs) +
"s with " +
871 std::to_string(outstanding) +
" pending tasks.";
877 if (frames.size() < 2)
continue;
// A late TCP worker re-announcing itself: catch it up on the definition.
880 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] ==
"READY") {
882 sendCatchup(frames[0]);
883 lastProgress = std::chrono::steady_clock::now();
889 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] ==
"BOOTSTRAP") {
891 lastProgress = std::chrono::steady_clock::now();
895 if (frames.size() < 3) {
896 firstError =
"Malformed IPC message received from worker.";
900 const std::string & cmd = frames[1];
// Single-task ACK handling: validate id, update pending/outstanding state.
904 taskId =
static_cast<size_t>(std::stoull(frames[2]));
907 firstError =
"Malformed IPC task id received from worker.";
911 if (pendingTasks.find(taskId) == pendingTasks.end()) {
912 firstError =
"Received duplicate or unknown IPC task id " + std::to_string(taskId) +
".";
916 pendingTasks.erase(taskId);
917 if (outstanding == 0) {
918 firstError =
"IPC outstanding counter underflow while processing ACK.";
921 if (outstandingMessages == 0) {
922 firstError =
"IPC message outstanding counter underflow while processing ACK.";
926 --outstandingMessages;
928 lastProgress = std::chrono::steady_clock::now();
929 const size_t activeWorkersNow = std::min(fIpcSession->workerIdentityVec.size(), outstandingMessages);
930 if (progressCallback) {
931 progressCallback(acked, activeWorkersNow);
// Periodic scheduler progress log every 200 ACKs.
933 if (acked >= nextSchedulerLogAck) {
934 NLogDebug(
"NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pendingTasks=%zu",
935 acked, totalTasks, activeWorkersNow, outstandingMessages, pendingTasks.size());
936 nextSchedulerLogAck += 200;
// Batched ACKB handling: comma-separated task ids in frames[2].
942 if (frames.size() < 3 || frames[2].empty()) {
943 firstError =
"Malformed IPC ACKB payload received from worker.";
947 if (outstandingMessages == 0) {
948 firstError =
"IPC message outstanding counter underflow while processing ACKB.";
951 --outstandingMessages;
953 std::stringstream ackStream(frames[2]);
954 std::string ackToken;
955 while (std::getline(ackStream, ackToken,
',')) {
956 if (ackToken.empty())
continue;
957 size_t ackTaskId = 0;
959 ackTaskId =
static_cast<size_t>(std::stoull(ackToken));
962 firstError =
"Malformed IPC ACKB task id received from worker.";
966 if (pendingTasks.find(ackTaskId) == pendingTasks.end()) {
967 firstError =
"Received duplicate or unknown IPC task id " + std::to_string(ackTaskId) +
".";
971 pendingTasks.erase(ackTaskId);
972 if (outstanding == 0) {
973 firstError =
"IPC outstanding counter underflow while processing ACKB.";
978 lastProgress = std::chrono::steady_clock::now();
979 const size_t activeWorkersNow = std::min(fIpcSession->workerIdentityVec.size(), outstandingMessages);
980 if (progressCallback) {
981 progressCallback(acked, activeWorkersNow);
983 if (acked >= nextSchedulerLogAck) {
985 "NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pendingTasks=%zu",
986 acked, totalTasks, activeWorkersNow, outstandingMessages, pendingTasks.size());
987 nextSchedulerLogAck += 200;
991 if (!firstError.empty()) {
// ERR handling: extract the failing task id and the worker's message.
1000 taskId =
static_cast<size_t>(std::stoull(frames[2]));
1003 firstError =
"Malformed IPC task id received from worker.";
1006 std::string errMsg = (frames.size() >= 4) ? frames[3] :
"worker error";
1007 firstError =
"Worker reported error for task " + std::to_string(taskId) +
": " + errMsg;
1011 firstError =
"Unknown IPC command from worker: " + cmd;
1015 if (!firstError.empty()) {
1016 throw std::runtime_error(firstError);
// All dispatched tasks must have been acknowledged by now.
1019 if (!pendingTasks.empty()) {
1020 throw std::runtime_error(
"IPC execution finished with pending tasks still unacknowledged.");
// FinishProcessIpc (fragment): shuts the IPC session down. Broadcasts
// STOP ("abort" or "ok") to every registered worker; in local mode waits
// for child processes (1.5 s grace, then kills them); in non-abort TCP
// mode waits up to 60 s for each worker's DONE (answering late READY /
// BOOTSTRAP arrivals along the way). Finally closes the ROUTER socket
// and ZeroMQ context (with zero linger on TCP abort so close does not
// block), unlinks the ipc:// socket file, restores the previous SIGINT
// handler, and destroys the session object.
// NOTE(review): this listing is missing intermediate lines (e.g. the
// lingerMs declaration used at original line 1085 and several brace
// closures); verify against the original source file.
1026 void NDimensionalExecutor::FinishProcessIpc(
bool abort)
1032 const std::string stopReason = abort ?
"abort" :
"ok";
1033 for (
const auto & it : fIpcSession->identityToWorker) {
1034 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {it.first,
"STOP", stopReason});
1037 if (!fIpcSession->isTcp) {
// Local mode: give children 1.5 s to exit cleanly before force-cleanup.
1038 const bool exitedCleanly = NDimensionalIpcRunner::WaitForChildProcesses(fIpcSession->childPids, 1500);
1039 if (!exitedCleanly) {
1040 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
1042 }
else if (!abort) {
// TCP graceful shutdown: wait (bounded) for DONE from every worker.
1044 const size_t nWorkers = fIpcSession->identityToWorker.size();
1045 std::set<std::string> doneWorkers;
1046 const auto doneDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
1047 while (doneWorkers.size() < nWorkers) {
1048 const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(doneDeadline - std::chrono::steady_clock::now()).count();
1049 if (remaining <= 0) {
1050 NLogWarning(
"NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1051 doneWorkers.size(), nWorkers);
// Poll so the wait can honor the remaining deadline precisely.
1054 zmq_pollitem_t item = {fIpcSession->router, 0, ZMQ_POLLIN, 0};
1055 const int rc = zmq_poll(&item, 1,
static_cast<long>(remaining));
1057 NLogWarning(
"NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1058 doneWorkers.size(), nWorkers);
1061 std::vector<std::string> frames;
1062 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames) || frames.size() < 2)
continue;
1063 if (frames[1] ==
"DONE") {
1064 doneWorkers.insert(frames[0]);
1065 NLogDebug(
"NDimensionalExecutor::FinishProcessIpc: Worker '%s' sent DONE (%zu/%zu)", frames[0].c_str(),
1066 doneWorkers.size(), nWorkers);
1067 }
// A worker that connected only now is immediately told to stop.
else if (frames[1] ==
"READY") {
1069 NLogInfo(
"NDimensionalExecutor::FinishProcessIpc: Late worker '%s' arrived, sending STOP", frames[0].c_str());
1070 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {frames[0],
"STOP",
"ok"});
1071 }
else if (frames[1] ==
"BOOTSTRAP") {
1079 if (fIpcSession->router) {
// On TCP abort, drop linger so zmq_close does not block on unsent STOPs.
1081 if (fIpcSession->isTcp && abort) {
1085 zmq_setsockopt(fIpcSession->router, ZMQ_LINGER, &lingerMs,
sizeof(lingerMs));
1088 if (fIpcSession->router) {
1089 zmq_close(fIpcSession->router);
1091 if (fIpcSession->ctx) {
1092 zmq_ctx_term(fIpcSession->ctx);
// Remove the UNIX socket file created for local (non-TCP) sessions.
1094 if (!fIpcSession->isTcp && !fIpcSession->endpointPath.empty()) {
1095 ::unlink(fIpcSession->endpointPath.c_str());
1100 for (
const auto & kv : fIpcSession->identityToWorker) {
1104 RestoreIpcSigIntHandler(fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
1105 fIpcSession.reset();
// Explicit instantiation of the ExecuteParallel template for
// NGnThreadData, so the template definition can live in this .cxx file
// while callers link against this single instantiation.
1108 template void NDimensionalExecutor::ExecuteParallel<NGnThreadData>(
1109 const std::function<
void(
const std::vector<int> & coords, NGnThreadData & thread_object)> & func,
1110 std::vector<NGnThreadData> & thread_objects);
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / workerIdentityVec.
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFIG message.
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Thread-local data object for NDMSPC processing.
size_t GetAssignedIndex() const
Get the assigned index for the thread.