9#include <unordered_map>
17#include "NDimensionalExecutor.h"
18#include "NDimensionalIpcRunner.h"
19#include "NGnThreadData.h"
// -----------------------------------------------------------------------------
// SIGINT handling for forked IPC worker processes.
//
// These globals are shared between normal code and an async signal handler,
// so they are limited to handler-safe types: volatile sig_atomic_t flags plus
// a fixed-size pid array that is only rewritten while installing the handler.
// -----------------------------------------------------------------------------

/// Set to 1 by IpcSigIntHandler when the user pressed Ctrl-C.
volatile sig_atomic_t gIpcSigIntRequested = 0;
/// Number of valid leading entries in gIpcChildPids.
volatile sig_atomic_t gIpcChildCount = 0;
/// PIDs of forked IPC worker processes; entries <= 0 are unused.
pid_t gIpcChildPids[1024] = {0};

/// SIGINT handler: records the interrupt request and forcefully terminates
/// every registered child worker. Only async-signal-safe calls (kill) are
/// made here.
void IpcSigIntHandler(int)
{
   gIpcSigIntRequested = 1;
   const sig_atomic_t count = gIpcChildCount;
   for (sig_atomic_t i = 0; i < count; ++i) {
      if (gIpcChildPids[i] > 0) {
         kill(gIpcChildPids[i], SIGKILL);
      }
   }
}

/// Installs IpcSigIntHandler for SIGINT and publishes the child PID list to
/// the handler-visible globals.
/// @param childPids    PIDs the handler should kill on Ctrl-C (silently
///                     truncated to the capacity of gIpcChildPids).
/// @param oldAction    receives the previously installed SIGINT action.
/// @param hasOldAction set to true only when oldAction was filled in.
void InstallIpcSigIntHandler(const std::vector<pid_t> &childPids, struct sigaction &oldAction, bool &hasOldAction)
{
   gIpcSigIntRequested = 0;

   const size_t maxChildren = sizeof(gIpcChildPids) / sizeof(gIpcChildPids[0]);
   const size_t count       = std::min(maxChildren, childPids.size());
   for (size_t i = 0; i < count; ++i) {
      gIpcChildPids[i] = childPids[i];
   }
   // Clear stale entries so the handler never kills a PID from a previous session.
   for (size_t i = count; i < maxChildren; ++i) {
      gIpcChildPids[i] = 0;
   }
   gIpcChildCount = static_cast<sig_atomic_t>(count);

   struct sigaction sa;
   memset(&sa, 0, sizeof(sa));
   sa.sa_handler = IpcSigIntHandler;
   sigemptyset(&sa.sa_mask);
   sa.sa_flags = 0;

   hasOldAction = false;
   if (sigaction(SIGINT, &sa, &oldAction) == 0) {
      hasOldAction = true;
   }
}

/// Restores the SIGINT disposition saved by InstallIpcSigIntHandler (if one
/// was saved) and clears the interrupt flag.
void RestoreIpcSigIntHandler(const struct sigaction &oldAction, bool hasOldAction)
{
   if (hasOldAction) {
      sigaction(SIGINT, &oldAction, nullptr);
   }
   gIpcSigIntRequested = 0;
}
// NOTE(review): fragment of the IpcSession helper struct — the enclosing
// "struct IpcSession {" header and several members (ctx, endpoint, jobDir,
// treeName, tmpDir, maxWorkers, isTcp, oldSigIntAction, ...) are not visible
// in this garbled listing; the leading digits are fused source line numbers.
// ZeroMQ ROUTER socket handle (void* per the zmq C API).
78 void * router{
nullptr};
// Filesystem path of the ipc:// socket file (empty in tcp:// mode).
80 std::string endpointPath;
// PIDs of forked local worker processes (assigned -1 until forked).
82 std::vector<pid_t> childPids;
// Maps a worker's ZeroMQ identity frame to its assigned worker index.
83 std::unordered_map<std::string, size_t> identityToWorker;
// Identities of workers that completed the READY handshake, in join order.
84 std::vector<std::string> workerIdentityVec;
// Borrowed pointer to the caller-owned per-worker data objects (not owned).
88 std::vector<NThreadData *> * workerObjects{
nullptr};
// Currently active definition name/ids, re-sent to late-joining TCP workers.
90 std::string currentDefName;
91 std::vector<Long64_t> currentDefIds;
92 bool hasCurrentDefIds{
false};
// True when oldSigIntAction holds the pre-install SIGINT disposition.
94 bool hasOldSigIntAction{
false};
// Configuration payload forwarded to workers in INIT/CONFIG replies.
96 std::string macroList;
97 std::string macroParams;
99 std::string tmpResultsDir;
// Next index handed to a BOOTSTRAP-ing worker (TCP mode only).
100 size_t bootstrapNextIdx{0};
// READY identities observed while busy INIT-ing another worker; drained later.
101 std::vector<std::string> pendingReadyIdentities;
// NOTE(review): fragment of the bounds-validating constructor — the signature
// and the `if` conditions guarding these throws are missing from this garbled
// listing; only the throw statements survive. The checks mirror SetBounds:
// same-size vectors, non-empty vectors, and min <= max per dimension.
125 throw std::invalid_argument(
"Min and max bounds vectors must have the same size.");
128 throw std::invalid_argument(
"Bounds vectors cannot be empty.");
// Per-dimension sanity check (error text built from the stored members).
135 throw std::invalid_argument(
"Min bound (" + std::to_string(
fMinBounds[i]) +
136 ") cannot be greater than max bound (" + std::to_string(
fMaxBounds[i]) +
137 ") for dimension " + std::to_string(i));
// Re-validates and applies new min/max bounds. Mirrors the constructor's
// checks: equal sizes, non-empty, and min <= max for every dimension.
// NOTE(review): the guarding `if` conditions and the member assignments
// (fMinBounds/fMaxBounds update presumably happens before the loop, since the
// throws read the members) are missing from this garbled listing — confirm
// against the original source.
145void NDimensionalExecutor::SetBounds(
const std::vector<int> & minBounds,
const std::vector<int> & maxBounds)
151 throw std::invalid_argument(
"Min and max bounds vectors must have the same size.");
154 throw std::invalid_argument(
"Bounds vectors cannot be empty.");
160 throw std::invalid_argument(
"Min bound (" + std::to_string(
fMinBounds[i]) +
161 ") cannot be greater than max bound (" + std::to_string(
fMaxBounds[i]) +
162 ") for dimension " + std::to_string(i));
// NOTE(review): fragment of the THnSparse-based constructor — derives the
// iteration bounds from the histogram's axes. Signature and several lines
// (including the matching fMinBounds.push_back) are missing from this
// garbled listing.
175 if (hist ==
nullptr) {
176 throw std::invalid_argument(
"THnSparse pointer cannot be null.");
// Reject histograms with no filled bins.
181 if (hist->GetNbins() <= 0) {
182 throw std::invalid_argument(
"THnSparse histogram is empty.");
// One [min, max] pair per histogram dimension; max = number of axis bins.
190 for (
int i = 0; i < hist->GetNdimensions(); ++i) {
192 fMaxBounds.push_back(hist->GetAxis(i)->GetNbins());
// Defaulted out-of-line destructor — presumably kept out-of-line so the
// unique_ptr<IpcSession> member is destroyed where IpcSession is a complete
// type (TODO confirm; the member declaration is not visible in this listing).
200NDimensionalExecutor::~NDimensionalExecutor() =
default;
// ExecuteParallel<TObject>: runs `func` over every coordinate of the
// N-dimensional space using a fixed pool of threads with one caller-supplied
// TObject per thread. NOTE(review): this listing is garbled — braces, the
// coordinate-iteration statements, and the join calls are missing; the
// comments below describe only the logic that is visible.
224template <
typename TObject>
226 const std::function<
void(
const std::vector<int> & coords, TObject & thread_object)> & func,
227 std::vector<TObject> & thread_objects)
// Pool size equals the number of thread-local objects; empty is an error.
232 size_t threads_to_use = thread_objects.size();
233 if (threads_to_use == 0) {
234 throw std::invalid_argument(
"Thread objects vector cannot be empty.");
// Shared task queue plus producer/consumer condition variables and flags.
237 std::vector<std::thread> workers;
238 std::queue<std::function<void(TObject &)>> tasks;
239 std::mutex queue_mutex;
240 std::condition_variable condition_producer;
241 std::condition_variable condition_consumer;
242 std::atomic<size_t> active_tasks = 0;
243 std::atomic<bool> stop_pool =
false;
// First exception thrown by any thread is captured and rethrown at the end.
245 std::exception_ptr first_exception =
nullptr;
246 std::mutex exception_mutex;
// Worker body: labels the thread "wk_NNN", then loops popping queued tasks.
249 auto worker_logic = [&](TObject & my_object) {
252 std::ostringstream oss;
253 oss <<
"wk_" << std::setw(3) << std::setfill(
'0') << md->
GetAssignedIndex();
257 std::function<void(TObject &)> task_payload;
258 bool task_acquired =
false;
// Block until work arrives or the pool is told to stop.
262 std::unique_lock<std::mutex> lock(queue_mutex);
263 condition_producer.wait(lock, [&] {
return stop_pool || !tasks.empty(); });
// Exit once stopping and no work remains.
266 if (stop_pool && tasks.empty()) {
272 if (!tasks.empty()) {
273 task_payload = std::move(tasks.front());
275 task_acquired =
true;
// Execute the task outside the lock; record only the first exception.
285 task_payload(my_object);
291 std::lock_guard<std::mutex> lock(exception_mutex);
292 if (!first_exception) {
293 first_exception = std::current_exception();
298 std::unique_lock<std::mutex> lock(queue_mutex);
301 condition_producer.notify_all();
// Wake the shutdown waiter when the last in-flight task completes.
306 if (--active_tasks == 0 && stop_pool) {
308 condition_consumer.notify_one();
320 if (--active_tasks == 0 && stop_pool) {
321 condition_consumer.notify_one();
// Spawn one worker thread per thread-local object.
328 workers.reserve(threads_to_use);
329 for (
size_t i = 0; i < threads_to_use; ++i) {
330 workers.emplace_back(worker_logic, std::ref(thread_objects[i]));
// Producer loop: enqueue one task per coordinate until done or stopped.
340 std::unique_lock<std::mutex> lock(queue_mutex);
341 if (stop_pool)
break;
346 std::unique_lock<std::mutex> lock(queue_mutex);
348 if (stop_pool)
break;
// Each task copies the current coordinates so workers see a stable value.
351 tasks.emplace([func, coords_copy](TObject & obj) { func(coords_copy, obj); });
353 condition_producer.notify_one();
// Producer-side failure: record the exception, then stop the pool.
359 std::unique_lock<std::mutex> lock(queue_mutex);
361 if (!first_exception) {
362 first_exception = std::current_exception();
365 condition_producer.notify_all();
// Shutdown: signal stop, wait for all tasks to drain, join the workers.
371 std::unique_lock<std::mutex> lock(queue_mutex);
374 condition_producer.notify_all();
378 std::unique_lock<std::mutex> lock(queue_mutex);
379 condition_consumer.wait(lock, [&] {
return stop_pool && active_tasks == 0; });
383 for (std::thread & worker : workers) {
384 if (worker.joinable()) {
// Propagate the first captured exception to the caller.
390 if (first_exception) {
391 std::rethrow_exception(first_exception);
// NOTE(review): fragment of ExecuteParallelProcessIpc — starts the IPC
// session, executes the current bounds, and (not visible in this garbled
// listing) finishes the session and returns the ACK count.
398 StartProcessIpc(workerObjects, processCount);
400 size_t acked = ExecuteCurrentBoundsProcessIpc();
// NOTE(review): fragment of InitTcpWorker(identity) — the signature and
// several control-flow lines are missing from this garbled listing.
// Derive the identity layout from a sample ("prefix" + zero-padded digits):
// count trailing digits to find where the numeric suffix begins.
414 const std::string sample = NDimensionalIpcRunner::BuildWorkerIdentity(0);
416 while (numLen < sample.size() && std::isdigit((
unsigned char)sample[sample.size() - 1 - numLen]))
418 const size_t prefixLen = sample.size() - numLen;
419 if (identity.size() <= prefixLen)
return false;
// Parse the worker index out of the identity's numeric suffix.
420 size_t workerIdx = 0;
422 workerIdx = std::stoul(identity.substr(prefixLen));
425 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: cannot parse index from identity '%s'", identity.c_str());
// Reject out-of-range indices and duplicate registrations.
428 if (workerIdx >= fIpcSession->maxWorkers) {
429 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: worker index %zu >= maxWorkers %zu, ignoring",
430 workerIdx, fIpcSession->maxWorkers);
433 if (fIpcSession->identityToWorker.count(identity)) {
434 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: worker '%s' already registered, ignoring duplicate READY",
// Send the INIT frame set: index, session id (our pid), and job/tmp paths.
439 const std::string sessionId = std::to_string(getpid());
440 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router,
441 {identity,
"INIT", std::to_string(workerIdx), sessionId,
442 fIpcSession->jobDir, fIpcSession->treeName,
443 fIpcSession->tmpDir, fIpcSession->tmpResultsDir})) {
444 NLogError(
"NDimensionalExecutor::InitTcpWorker: failed to send INIT to '%s'", identity.c_str());
// ACK wait timeout, overridable via NDMSPC_WORKER_TIMEOUT (clamped >= 1 s).
448 int initTimeoutSec = 30;
449 if (
const char * env = gSystem->Getenv(
"NDMSPC_WORKER_TIMEOUT")) {
451 initTimeoutSec = std::max(1, std::stoi(env));
454 NLogWarning(
"NDimensionalExecutor::InitTcpWorker: Invalid NDMSPC_WORKER_TIMEOUT='%s', using default=%d", env,
458 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(initTimeoutSec);
// Pump the ROUTER socket until THIS worker ACKs or the deadline passes.
// Unrelated READY frames seen meanwhile are queued (not dropped) so those
// workers can be initialized afterwards; BOOTSTRAPs are handled inline.
461 std::vector<std::string> ackFrames;
462 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, ackFrames)) {
463 if (errno == EAGAIN || errno == EWOULDBLOCK) {
464 if (std::chrono::steady_clock::now() > initDeadline)
break;
469 if (ackFrames.size() >= 2 && ackFrames[1] ==
"BOOTSTRAP") {
473 if (ackFrames.size() >= 2 && ackFrames[1] ==
"READY" && ackFrames[0] != identity) {
474 if (!fIpcSession->identityToWorker.count(ackFrames[0]) &&
475 std::find(fIpcSession->pendingReadyIdentities.begin(), fIpcSession->pendingReadyIdentities.end(),
476 ackFrames[0]) == fIpcSession->pendingReadyIdentities.end()) {
477 fIpcSession->pendingReadyIdentities.push_back(ackFrames[0]);
481 if (ackFrames.size() >= 2 && ackFrames[0] == identity && ackFrames[1] ==
"ACK") {
486 NLogError(
"NDimensionalExecutor::InitTcpWorker: worker '%s' did not ACK INIT", identity.c_str());
// Success: record the worker in both lookup structures.
490 fIpcSession->identityToWorker[identity] = workerIdx;
491 fIpcSession->workerIdentityVec.push_back(identity);
492 NLogInfo(
"NDimensionalExecutor::InitTcpWorker: worker '%s' (idx=%zu) joined [total: %zu]",
493 identity.c_str(), workerIdx, fIpcSession->workerIdentityVec.size());
// NOTE(review): fragment of HandleBootstrap(identity) — the signature line is
// missing from this garbled listing. Assigns the next sequential worker index
// to a BOOTSTRAP-ing TCP worker and replies with a CONFIG frame set carrying
// the macro and temp-directory payload.
499 if (!fIpcSession || !fIpcSession->isTcp)
return false;
// Post-increment hands out 0, 1, 2, ... across successive bootstraps.
500 const size_t assignedIdx = fIpcSession->bootstrapNextIdx++;
501 NLogDebug(
"NDimensionalExecutor::HandleBootstrap: assigning index %zu to worker '%s'", assignedIdx,
503 return NDimensionalIpcRunner::SendFrames(fIpcSession->router,
504 {identity,
"CONFIG", std::to_string(assignedIdx),
505 fIpcSession->macroList, fIpcSession->tmpDir,
506 fIpcSession->tmpResultsDir,
507 fIpcSession->macroParams});
// Starts the IPC worker session: binds a ZeroMQ ROUTER endpoint, forks local
// workers (ipc:// mode) or waits for remote ones to connect (tcp:// mode),
// performs the READY handshake, and installs the SIGINT handler.
// NOTE(review): garbled listing — the fork() call, several if/else branch
// lines, and braces are missing; comments describe only the visible logic.
510void NDimensionalExecutor::StartProcessIpc(std::vector<NThreadData *> & workerObjects,
size_t processCount,
511 const std::string & tcpBindEndpoint,
const std::string & jobDir,
512 const std::string & treeName,
const std::string & macroList,
513 const std::string & tmpDir,
const std::string & tmpResultsDir,
514 const std::string & macroParams)
516 if (workerObjects.empty()) {
517 throw std::invalid_argument(
"Worker objects vector cannot be empty.");
520 throw std::runtime_error(
"IPC session is already active.");
// Clamp the process count to [1, workerObjects.size()].
523 const size_t processesToUse = std::max<size_t>(1, std::min(processCount, workerObjects.size()));
524 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: requested=%zu, workerObjects=%zu, spawning=%zu", processCount,
525 workerObjects.size(), processesToUse);
// Unique ipc:// socket file name: pid + nanosecond timestamp.
526 const auto nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
527 std::chrono::high_resolution_clock::now().time_since_epoch())
529 fIpcSession = std::make_unique<IpcSession>();
// A non-empty bind endpoint selects TCP mode; otherwise use a local socket.
531 const bool isTcp = !tcpBindEndpoint.empty();
532 fIpcSession->isTcp = isTcp;
535 fIpcSession->endpoint = tcpBindEndpoint;
536 fIpcSession->endpointPath.clear();
538 fIpcSession->endpointPath =
"/tmp/ndmspc_ipc_" + std::to_string(getpid()) +
"_" + std::to_string(nowNs) +
".sock";
539 fIpcSession->endpoint =
"ipc://" + fIpcSession->endpointPath;
// Remove any stale socket file before binding.
540 ::unlink(fIpcSession->endpointPath.c_str());
// ZeroMQ context + ROUTER socket with a 1 s receive timeout.
543 fIpcSession->ctx = zmq_ctx_new();
544 if (!fIpcSession->ctx) {
546 throw std::runtime_error(
"Failed to create ZeroMQ context.");
549 fIpcSession->router = zmq_socket(fIpcSession->ctx, ZMQ_ROUTER);
550 if (!fIpcSession->router) {
551 zmq_ctx_term(fIpcSession->ctx);
553 throw std::runtime_error(
"Failed to create ZeroMQ ROUTER socket.");
556 int timeoutMs = 1000;
557 zmq_setsockopt(fIpcSession->router, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
// Bind failure: tear down socket/context (and socket file) before throwing.
559 if (zmq_bind(fIpcSession->router, fIpcSession->endpoint.c_str()) != 0) {
560 const std::string err = zmq_strerror(zmq_errno());
561 zmq_close(fIpcSession->router);
562 zmq_ctx_term(fIpcSession->ctx);
563 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
565 throw std::runtime_error(
"Failed to bind endpoint '" + fIpcSession->endpoint +
"': " + err);
// Pre-register the expected worker identities (used by the ipc:// path).
568 fIpcSession->identityToWorker.clear();
569 fIpcSession->identityToWorker.reserve(processesToUse);
570 fIpcSession->workerIdentityVec.clear();
571 fIpcSession->pendingReadyIdentities.clear();
575 for (
size_t i = 0; i < processesToUse; ++i) {
576 fIpcSession->identityToWorker[NDimensionalIpcRunner::BuildWorkerIdentity(i)] = i;
// Stash the session configuration for later INIT/CONFIG replies.
580 fIpcSession->jobDir = jobDir;
581 fIpcSession->treeName = treeName;
582 fIpcSession->workerObjects = &workerObjects;
583 fIpcSession->maxWorkers = processesToUse;
584 fIpcSession->macroList = macroList;
585 fIpcSession->tmpDir = tmpDir;
586 fIpcSession->tmpResultsDir = tmpResultsDir;
587 fIpcSession->macroParams = macroParams;
// Fork one child per worker slot (ipc:// mode); on fork failure, clean up
// everything created so far and bail out.
591 fIpcSession->childPids.assign(processesToUse, -1);
592 for (
size_t i = 0; i < processesToUse; ++i) {
595 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
596 zmq_close(fIpcSession->router);
597 zmq_ctx_term(fIpcSession->ctx);
598 ::unlink(fIpcSession->endpointPath.c_str());
600 throw std::runtime_error(
"Failed to fork worker process.");
// Child branch: drop the parent's socket/context and run the worker loop;
// _exit avoids running parent atexit handlers in the child.
603 zmq_close(fIpcSession->router);
604 zmq_ctx_term(fIpcSession->ctx);
605 const int rc = NDimensionalIpcRunner::WorkerLoop(fIpcSession->endpoint, i, workerObjects[i]);
606 _exit(rc == 0 ? 0 : 1);
608 fIpcSession->childPids[i] = pid;
// READY-wait deadline: 300 s for TCP, 30 s for local workers, overridable
// via NDMSPC_WORKER_TIMEOUT (invalid values silently ignored here).
614 int readyTimeoutSec = isTcp ? 300 : 30;
616 if (
const char * env = gSystem->Getenv(
"NDMSPC_WORKER_TIMEOUT")) {
617 try { readyTimeoutSec = std::stoi(env); }
catch (...) {}
619 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: waiting up to %d s for TCP workers (max %zu) ...",
620 readyTimeoutSec, processesToUse);
622 const auto readyDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(readyTimeoutSec);
627 const size_t readyTarget = processesToUse;
// Handshake loop: consume READY/BOOTSTRAP frames (plus queued pending READYs
// in TCP mode) until every expected worker registered or the deadline hits.
629 while (fIpcSession->workerIdentityVec.size() < readyTarget) {
630 if (isTcp && !fIpcSession->pendingReadyIdentities.empty()) {
631 const std::string identity = fIpcSession->pendingReadyIdentities.front();
632 fIpcSession->pendingReadyIdentities.erase(fIpcSession->pendingReadyIdentities.begin());
637 std::vector<std::string> frames;
638 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
639 if (errno == EAGAIN || errno == EWOULDBLOCK) {
640 if (std::chrono::steady_clock::now() > readyDeadline) {
642 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
643 zmq_close(fIpcSession->router);
644 zmq_ctx_term(fIpcSession->ctx);
645 ::unlink(fIpcSession->endpointPath.c_str());
647 throw std::runtime_error(
"Timeout while waiting for IPC workers to become ready.");
649 zmq_close(fIpcSession->router);
650 zmq_ctx_term(fIpcSession->ctx);
652 throw std::runtime_error(
"Timeout: no TCP workers connected.");
// Hard receive failure: full teardown (child cleanup only in ipc:// mode).
656 if (!isTcp) NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
657 zmq_close(fIpcSession->router);
658 zmq_ctx_term(fIpcSession->ctx);
659 if (!isTcp) ::unlink(fIpcSession->endpointPath.c_str());
661 throw std::runtime_error(
"Failed to receive READY message from worker.");
663 if (frames.size() < 2)
continue;
664 const std::string & identity = frames[0];
665 const std::string & cmd = frames[1];
666 if (isTcp && cmd ==
"BOOTSTRAP") {
670 if (cmd !=
"READY")
continue;
// Record the first READY per pre-registered identity (duplicates ignored).
676 if (fIpcSession->identityToWorker.count(identity)) {
677 if (std::find(fIpcSession->workerIdentityVec.begin(), fIpcSession->workerIdentityVec.end(), identity)
678 == fIpcSession->workerIdentityVec.end()) {
679 fIpcSession->workerIdentityVec.push_back(identity);
680 NLogInfo(
"NDimensionalExecutor::StartProcessIpc: worker '%s' ready (%zu / %zu)", identity.c_str(),
681 fIpcSession->workerIdentityVec.size(), processesToUse);
// Install the SIGINT handler; only the ipc:// path has child PIDs to kill.
688 InstallIpcSigIntHandler(fIpcSession->childPids, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
692 InstallIpcSigIntHandler({}, fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
// Dispatches every coordinate of the current bounds to the IPC/TCP workers
// as TASK/TASKB messages and counts ACK/ACKB replies; returns the ACK count.
// NOTE(review): garbled listing — braces, some loop headers, the coordinate
// increment, and a few counters' updates are missing; comments describe only
// the visible logic.
696size_t NDimensionalExecutor::ExecuteCurrentBoundsProcessIpc(
const std::string & definitionName,
697 const std::vector<Long64_t> * definitionIds,
698 const std::function<
void(
size_t,
size_t)> & progressCallback)
701 throw std::runtime_error(
"IPC session is not active.");
// Cache the definition so late-joining TCP workers can be caught up.
708 fIpcSession->currentDefName = definitionName;
709 fIpcSession->currentDefIds = definitionIds ? *definitionIds : std::vector<Long64_t>{};
710 fIpcSession->hasCurrentDefIds = (definitionIds !=
nullptr);
// Broadcast SETDEF (and optionally SETIDS) to all currently-known workers.
712 if (!definitionName.empty()) {
713 for (
const auto & identity : fIpcSession->workerIdentityVec) {
714 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"SETDEF", definitionName})) {
715 throw std::runtime_error(
"Failed to send IPC SETDEF message to worker '" + identity +
"'.");
717 if (definitionIds !=
nullptr) {
718 if (!NDimensionalIpcRunner::SendFrames(
719 fIpcSession->router, {identity,
"SETIDS", NDimensionalIpcRunner::SerializeIds(*definitionIds)})) {
720 throw std::runtime_error(
"Failed to send IPC SETIDS message to worker '" + identity +
"'.");
// Batch size for TASKB messages, overridable via NDMSPC_IPC_BATCH_SIZE.
726 size_t ipcBatchSize = 1;
727 if (
const char * envBatchSize = gSystem->Getenv(
"NDMSPC_IPC_BATCH_SIZE")) {
729 ipcBatchSize = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envBatchSize)));
732 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_IPC_BATCH_SIZE='%s', using default=%zu", envBatchSize,
// Scheduler state: task ids, in-flight counters, pending set, first error.
737 size_t nextTaskId = 0;
738 size_t dispatchMessageId = 0;
739 size_t outstanding = 0;
740 size_t outstandingMessages = 0;
742 size_t nextSchedulerLogAck = 200;
743 std::string firstError;
744 std::set<size_t> pendingTasks;
// Stall timeout (no ACK progress), overridable via NDMSPC_IPC_STALL_TIMEOUT.
746 int stallTimeoutSec = 120;
747 if (
const char * envStallTimeout = gSystem->Getenv(
"NDMSPC_IPC_STALL_TIMEOUT")) {
749 stallTimeoutSec = std::max(5, std::stoi(envStallTimeout));
752 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_IPC_STALL_TIMEOUT='%s', using default=%d", envStallTimeout,
756 auto lastProgress = std::chrono::steady_clock::now();
762 size_t totalTasks = 1;
// Interrupt predicate: Ctrl-C flag (signal handler) or ROOT interrupt.
766 auto isUserInterrupted = []() {
767 if (gIpcSigIntRequested != 0)
return true;
768 return (gROOT && gROOT->IsInterrupted());
// Catch-up helper: replays the cached SETDEF/SETIDS to a late worker.
771 auto sendCatchup = [&](
const std::string & identity) {
772 if (!fIpcSession->currentDefName.empty()) {
773 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"SETDEF", fIpcSession->currentDefName});
774 if (fIpcSession->hasCurrentDefIds) {
775 NDimensionalIpcRunner::SendFrames(
776 fIpcSession->router, {identity,
"SETIDS", NDimensionalIpcRunner::SerializeIds(fIpcSession->currentDefIds)});
// Main loop: dispatch while tasks remain, then drain ACKs; abort on error.
781 while ((hasMore || outstanding > 0) && firstError.empty()) {
782 if (isUserInterrupted()) {
783 firstError =
"Interrupted by user";
// In-flight cap scales with the current worker count (8 messages each).
787 const size_t currentWorkers = fIpcSession->workerIdentityVec.size();
788 const size_t maxInFlightMessages = std::max<size_t>(currentWorkers * 8, 8);
// Dispatch phase: round-robin workers, building adaptive-size batches.
790 while (hasMore && outstandingMessages < maxInFlightMessages && firstError.empty()) {
791 if (fIpcSession->workerIdentityVec.empty())
break;
792 const size_t workerSlot = dispatchMessageId % fIpcSession->workerIdentityVec.size();
793 const std::string identity = fIpcSession->workerIdentityVec[workerSlot];
794 std::vector<std::pair<size_t, std::string>> batchTasks;
795 const size_t nw = fIpcSession->workerIdentityVec.size();
796 const size_t remainingTasks = (nextTaskId < totalTasks) ? (totalTasks - nextTaskId) : 0;
// Shrink the batch near the end so remaining work spreads over all workers.
797 const size_t adaptiveBatchSize = std::max<size_t>(
798 1, std::min(ipcBatchSize, std::max<size_t>(1, (remainingTasks + nw - 1) / nw)));
799 batchTasks.reserve(adaptiveBatchSize);
801 while (hasMore && outstanding < maxInFlightMessages * ipcBatchSize && batchTasks.size() < adaptiveBatchSize) {
802 batchTasks.emplace_back(nextTaskId, NDimensionalIpcRunner::SerializeCoords(
fCurrentCoords));
803 pendingTasks.insert(nextTaskId);
811 if (isUserInterrupted()) {
812 firstError =
"Interrupted by user";
817 if (!firstError.empty()) {
821 if (batchTasks.empty()) {
// Single task -> TASK frame; multiple -> TASKB with "id:coords;..." payload.
825 if (batchTasks.size() == 1) {
826 const std::string taskId = std::to_string(batchTasks[0].first);
827 const std::string coords = batchTasks[0].second;
828 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"TASK", taskId, coords})) {
829 firstError =
"Failed to send IPC TASK message to worker '" + identity +
"'.";
834 std::ostringstream payload;
835 for (
size_t i = 0; i < batchTasks.size(); ++i) {
836 if (i != 0) payload <<
';';
837 payload << batchTasks[i].first <<
':' << batchTasks[i].second;
839 if (!NDimensionalIpcRunner::SendFrames(fIpcSession->router, {identity,
"TASKB", payload.str()})) {
840 firstError =
"Failed to send IPC TASKB message to worker '" + identity +
"'.";
845 ++outstandingMessages;
// Receive phase: nothing pending and no workers/no work -> loop again.
848 if (outstanding == 0 && fIpcSession->workerIdentityVec.empty())
continue;
849 if (outstanding == 0 && !hasMore)
continue;
851 std::vector<std::string> frames;
852 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames)) {
853 if (errno == EINTR || isUserInterrupted()) {
854 firstError =
"Interrupted by user";
857 if (errno != EAGAIN && errno != EWOULDBLOCK) {
858 firstError =
"Failed to receive IPC ACK/ERR from worker.";
// On receive timeout: check whether a local child died unexpectedly.
862 for (
size_t i = 0; i < fIpcSession->childPids.size(); ++i) {
864 pid_t rc = waitpid(fIpcSession->childPids[i], &status, WNOHANG);
865 if (rc == fIpcSession->childPids[i]) {
866 firstError =
"Worker process " + std::to_string(i) +
" exited unexpectedly.";
// Stall detection: no ACK progress for stallTimeoutSec with work pending.
870 if (firstError.empty() && outstanding > 0 && stallTimeoutSec > 0) {
871 const auto now = std::chrono::steady_clock::now();
872 const auto stallSecs = std::chrono::duration_cast<std::chrono::seconds>(now - lastProgress).count();
873 if (stallSecs >= stallTimeoutSec) {
874 firstError =
"No IPC/TCP ACK progress for " + std::to_string(stallSecs) +
"s with " +
875 std::to_string(outstanding) +
" pending tasks.";
881 if (frames.size() < 2)
continue;
// Late TCP worker joining mid-run: catch it up with the cached definition.
884 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] ==
"READY") {
886 sendCatchup(frames[0]);
887 lastProgress = std::chrono::steady_clock::now();
893 if (fIpcSession->isTcp && frames.size() >= 2 && frames[1] ==
"BOOTSTRAP") {
895 lastProgress = std::chrono::steady_clock::now();
899 if (frames.size() < 3) {
900 firstError =
"Malformed IPC message received from worker.";
904 const std::string & cmd = frames[1];
// ACK: single-task acknowledgement; validate id, decrement counters.
908 taskId =
static_cast<size_t>(std::stoull(frames[2]));
911 firstError =
"Malformed IPC task id received from worker.";
915 if (pendingTasks.find(taskId) == pendingTasks.end()) {
916 firstError =
"Received duplicate or unknown IPC task id " + std::to_string(taskId) +
".";
920 pendingTasks.erase(taskId);
921 if (outstanding == 0) {
922 firstError =
"IPC outstanding counter underflow while processing ACK.";
925 if (outstandingMessages == 0) {
926 firstError =
"IPC message outstanding counter underflow while processing ACK.";
930 --outstandingMessages;
932 lastProgress = std::chrono::steady_clock::now();
933 const size_t activeWorkersNow = std::min(fIpcSession->workerIdentityVec.size(), outstandingMessages);
934 if (progressCallback) {
935 progressCallback(acked, activeWorkersNow);
937 if (acked >= nextSchedulerLogAck) {
938 NLogDebug(
"NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pendingTasks=%zu",
939 acked, totalTasks, activeWorkersNow, outstandingMessages, pendingTasks.size());
940 nextSchedulerLogAck += 200;
// ACKB: batched acknowledgement, comma-separated task ids in frames[2].
946 if (frames.size() < 3 || frames[2].empty()) {
947 firstError =
"Malformed IPC ACKB payload received from worker.";
951 if (outstandingMessages == 0) {
952 firstError =
"IPC message outstanding counter underflow while processing ACKB.";
955 --outstandingMessages;
957 std::stringstream ackStream(frames[2]);
958 std::string ackToken;
959 while (std::getline(ackStream, ackToken,
',')) {
960 if (ackToken.empty())
continue;
961 size_t ackTaskId = 0;
963 ackTaskId =
static_cast<size_t>(std::stoull(ackToken));
966 firstError =
"Malformed IPC ACKB task id received from worker.";
970 if (pendingTasks.find(ackTaskId) == pendingTasks.end()) {
971 firstError =
"Received duplicate or unknown IPC task id " + std::to_string(ackTaskId) +
".";
975 pendingTasks.erase(ackTaskId);
976 if (outstanding == 0) {
977 firstError =
"IPC outstanding counter underflow while processing ACKB.";
982 lastProgress = std::chrono::steady_clock::now();
983 const size_t activeWorkersNow = std::min(fIpcSession->workerIdentityVec.size(), outstandingMessages);
984 if (progressCallback) {
985 progressCallback(acked, activeWorkersNow);
987 if (acked >= nextSchedulerLogAck) {
989 "NDimensionalExecutor::IPC: acked=%zu/%zu activeWorkers=%zu inFlightMessages=%zu pendingTasks=%zu",
990 acked, totalTasks, activeWorkersNow, outstandingMessages, pendingTasks.size());
991 nextSchedulerLogAck += 200;
995 if (!firstError.empty()) {
// ERR: worker reported a task failure; capture its message as firstError.
1004 taskId =
static_cast<size_t>(std::stoull(frames[2]));
1007 firstError =
"Malformed IPC task id received from worker.";
1010 std::string errMsg = (frames.size() >= 4) ? frames[3] :
"worker error";
1011 firstError =
"Worker reported error for task " + std::to_string(taskId) +
": " + errMsg;
1015 firstError =
"Unknown IPC command from worker: " + cmd;
// Final checks: propagate the first error; every task must be acknowledged.
1019 if (!firstError.empty()) {
1020 throw std::runtime_error(firstError);
1023 if (!pendingTasks.empty()) {
1024 throw std::runtime_error(
"IPC execution finished with pending tasks still unacknowledged.");
// Ends the IPC session: sends STOP to all registered workers, waits for local
// children to exit (or, for TCP and a clean run, waits up to 60 s for DONE
// messages), tears down the ZeroMQ socket/context, removes the socket file,
// and restores the SIGINT handler.
// NOTE(review): garbled listing — some branch/brace lines are missing.
1030void NDimensionalExecutor::FinishProcessIpc(
bool abort)
1036 const std::string stopReason = abort ?
"abort" :
"ok";
1037 for (
const auto & it : fIpcSession->identityToWorker) {
1038 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {it.first,
"STOP", stopReason});
// Local (ipc://) mode: give children 1.5 s to exit cleanly, then kill.
1041 if (!fIpcSession->isTcp) {
1042 const bool exitedCleanly = NDimensionalIpcRunner::WaitForChildProcesses(fIpcSession->childPids, 1500);
1043 if (!exitedCleanly) {
1044 NDimensionalIpcRunner::CleanupChildProcesses(fIpcSession->childPids);
1046 }
else if (!abort) {
// TCP mode, clean finish: collect DONE from every worker with a 60 s budget.
1048 const size_t nWorkers = fIpcSession->identityToWorker.size();
1049 std::set<std::string> doneWorkers;
1050 const auto doneDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
1051 while (doneWorkers.size() < nWorkers) {
1052 const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(doneDeadline - std::chrono::steady_clock::now()).count();
1053 if (remaining <= 0) {
1054 NLogWarning(
"NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1055 doneWorkers.size(), nWorkers);
// Poll the ROUTER socket for at most the remaining budget.
1058 zmq_pollitem_t item = {fIpcSession->router, 0, ZMQ_POLLIN, 0};
1059 const int rc = zmq_poll(&item, 1,
static_cast<long>(remaining));
1061 NLogWarning(
"NDimensionalExecutor::FinishProcessIpc: Timeout waiting for DONE from TCP workers (%zu/%zu received)",
1062 doneWorkers.size(), nWorkers);
1065 std::vector<std::string> frames;
1066 if (!NDimensionalIpcRunner::ReceiveFrames(fIpcSession->router, frames) || frames.size() < 2)
continue;
1067 if (frames[1] ==
"DONE") {
1068 doneWorkers.insert(frames[0]);
1069 NLogDebug(
"NDimensionalExecutor::FinishProcessIpc: Worker '%s' sent DONE (%zu/%zu)", frames[0].c_str(),
1070 doneWorkers.size(), nWorkers);
1071 }
// Workers arriving during shutdown are told to STOP immediately.
else if (frames[1] ==
"READY") {
1073 NLogInfo(
"NDimensionalExecutor::FinishProcessIpc: Late worker '%s' arrived, sending STOP", frames[0].c_str());
1074 NDimensionalIpcRunner::SendFrames(fIpcSession->router, {frames[0],
"STOP",
"ok"});
1075 }
else if (frames[1] ==
"BOOTSTRAP") {
// Teardown: on TCP abort, shorten LINGER so close does not block on sends.
1083 if (fIpcSession->router) {
1085 if (fIpcSession->isTcp && abort) {
1089 zmq_setsockopt(fIpcSession->router, ZMQ_LINGER, &lingerMs,
sizeof(lingerMs));
1092 if (fIpcSession->router) {
1093 zmq_close(fIpcSession->router);
1095 if (fIpcSession->ctx) {
1096 zmq_ctx_term(fIpcSession->ctx);
// Remove the ipc:// socket file left on disk.
1098 if (!fIpcSession->isTcp && !fIpcSession->endpointPath.empty()) {
1099 ::unlink(fIpcSession->endpointPath.c_str());
1104 for (
const auto & kv : fIpcSession->identityToWorker) {
// Restore the previous SIGINT disposition and drop the session state.
1108 RestoreIpcSigIntHandler(fIpcSession->oldSigIntAction, fIpcSession->hasOldSigIntAction);
1109 fIpcSession.reset();
// NOTE(review): fragment of the explicit instantiation of
// ExecuteParallel<NGnThreadData> — the "template void ..." header line is
// missing from this garbled listing. Keeping the template definition in this
// translation unit presumably requires this instantiation (TODO confirm).
1113 const std::function<
void(
const std::vector<int> & coords,
NGnThreadData & thread_object)> & func,
1114 std::vector<NGnThreadData> & thread_objects);
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / workerIdentityVec.
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFIG message.
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
Thread-local data object for NDMSPC processing.
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Thread-local data object for NDMSPC processing.
size_t GetAssignedIndex() const
Get the assigned index for the thread.
Global callback function for libwebsockets client events.