11 #include "TApplication.h"
/// Launches one worker executable through execlp() and reports a process id.
///
/// @param workerBin  Program handed to execlp() both as the file to run and as argv[0].
/// @param endpoint   Value forwarded to the worker right after the "--endpoint" argument.
/// @param verbose    NOTE(review): presumably selects the execlp() call that also passes
///                   "--verbose" — the branch itself is not visible in this listing; confirm.
/// @return -1 when `pid` is negative. The success-path return is not visible in this listing.
19 pid_t spawn_worker_process(
const std::string & workerBin,
const std::string & endpoint,
bool verbose)
// NOTE(review): the statement that assigns `pid` is not visible here (presumably fork() — confirm).
22 if (pid < 0)
return -1;
// Worker invocation that adds "--verbose" after the endpoint arguments.
26 execlp(workerBin.c_str(), workerBin.c_str(),
"--endpoint", endpoint.c_str(),
"--verbose",
nullptr);
// Worker invocation with the endpoint arguments only.
28 execlp(workerBin.c_str(), workerBin.c_str(),
"--endpoint", endpoint.c_str(),
nullptr);
/// Asks every recorded worker to terminate and then waits for each of them.
///
/// @param workerPids  Process ids of the workers; entries that are <= 0 are ignored.
36 void stop_worker_processes(std::vector<pid_t> & workerPids)
// Nothing to signal or wait for.
38 if (workerPids.empty())
return;
// First pass: send SIGTERM to every valid pid.
40 for (pid_t pid : workerPids) {
41 if (pid > 0) kill(pid, SIGTERM);
// A single deadline, 2 seconds from now, is computed once and shared by all workers
// (it is not restarted per worker).
44 const auto waitDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(2);
// Second pass: wait for each valid pid.
45 for (pid_t pid : workerPids) {
46 if (pid <= 0)
continue;
// Poll without blocking; waitpid() returning 0 with WNOHANG means the child has not changed state yet.
49 while (waitpid(pid, &status, WNOHANG) == 0) {
50 if (std::chrono::steady_clock::now() >= waitDeadline) {
// Deadline passed: switch to a blocking wait for this worker.
// NOTE(review): original line 51 is not visible in this listing (presumably a stronger
// signal is sent before blocking — confirm).
52 waitpid(pid, &status, 0);
// Pause 25 ms between polls.
55 std::this_thread::sleep_for(std::chrono::milliseconds(25));
/// Builds the description string given to the CLI application object.
///
/// @return Text formatted as "<NDMSPC_NAME> v<NDMSPC_VERSION>-<NDMSPC_VERSION_RELEASE> (run)".
64 static std::string app_description()
// Heap buffer of `size` characters; the initial value of `size` is set on lines not visible in this listing.
67 auto buf = std::make_unique<char[]>(size);
// `size` is overwritten with snprintf()'s return value.
// NOTE(review): snprintf() returns the length the full output would need, which can exceed the
// buffer size if the output was truncated — confirm the initial `size` is always large enough.
68 size = std::snprintf(buf.get(), size,
"%s v%s-%s (run)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
// Construct the result from exactly `size` characters of the buffer.
69 return std::string(buf.get(), size);
/// Entry point of the launcher: parses the command line, exports the chosen settings as
/// NDMSPC_* environment variables, optionally spawns local worker processes, and then goes
/// through the requested macros.
///
/// NOTE(review): this listing omits several original lines (declarations of `mode`, `tcpPort`,
/// `tmpDir`, `verbose` and `macros`, early-exit statements, closing braces); the comments below
/// describe only what is visible.
72 int main(
int argc,
char ** argv)
// The ROOT application object is created with an argument count of 0 and a null argv;
// the real command line is parsed by CLI11 further down.
74 TApplication rootApp(
"ndmspc-run", 0,
nullptr);
// Switch ROOT to batch mode.
75 gROOT->SetBatch(kTRUE);
77 CLI::App app{app_description()};
// Targets that receive the parsed option values.
79 std::string macroList;
81 size_t nProcesses = 0;
83 std::string workerBin;
84 std::string workerEndpoint;
86 std::string tmpResultsDir;
87 size_t spawnWorkers = 0;
// Positional argument: the macros to run.
90 app.add_option(
"macro", macroList,
91 "Comma-separated list of macro file(s) or URLs to execute")
// Execution mode, restricted to the four listed values.
93 app.add_option(
"--mode", mode,
94 "Execution mode: ipc/process (forked local processes), tcp (remote workers), thread")
95 ->check(CLI::IsMember({
"ipc",
"process",
"tcp",
"thread"}));
96 app.add_option(
"-n,--processes", nProcesses,
97 "Number of worker processes (NDMSPC_MAX_PROCESSES)");
98 app.add_option(
"--tcp-port", tcpPort,
99 "TCP port to bind for remote workers (NDMSPC_TCP_PORT, default: 5555)");
100 app.add_option(
"--spawn-workers", spawnWorkers,
101 "In TCP mode, spawn N local ndmspc-worker processes");
102 app.add_option(
"--worker-bin", workerBin,
103 "Worker executable to spawn (default: ndmspc-worker)");
104 app.add_option(
"--worker-endpoint", workerEndpoint,
105 "Worker endpoint for spawned workers (default: tcp://localhost:<tcp-port>)");
106 app.add_option(
"--tmp-dir", tmpDir,
107 "Local scratch directory for temporary files (NDMSPC_TMP_DIR)");
108 app.add_option(
"--results-dir", tmpResultsDir,
109 "Shared results directory where workers deposit output (NDMSPC_TMP_RESULTS_DIR)");
110 app.add_flag(
"-v,--verbose", verbose,
"Enable verbose logging");
112 CLI11_PARSE(app, argc, argv);
// Default NDMSPC_LOG_CONSOLE to "0" when the caller has not set it.
115 if (!gSystem->Getenv(
"NDMSPC_LOG_CONSOLE")) {
116 gSystem->Setenv(
"NDMSPC_LOG_CONSOLE",
"0");
// Helper: export `value` under `name` only when the value is non-empty and the variable is
// not already present, so a pre-existing environment variable wins over the command-line option.
127 auto setenvIfEmpty = [](
const char * name,
const std::string & value) {
128 if (!value.empty() && !gSystem->Getenv(name))
129 gSystem->Setenv(name, value.c_str());
// "process" is accepted as an alias and normalized to "ipc".
132 if (mode ==
"process") mode =
"ipc";
// Export the mode unless NDMSPC_EXECUTION_MODE is already set in the environment.
133 if (!mode.empty() && !gSystem->Getenv(
"NDMSPC_EXECUTION_MODE"))
134 gSystem->Setenv(
"NDMSPC_EXECUTION_MODE", mode.c_str());
// When workers are spawned and no process count was given (neither option nor environment),
// use the number of spawned workers as the process count.
138 if (spawnWorkers > 0 && nProcesses == 0 && !gSystem->Getenv(
"NDMSPC_MAX_PROCESSES")) {
139 nProcesses = spawnWorkers;
// Export the process count unless NDMSPC_MAX_PROCESSES is already set.
142 if (nProcesses > 0 && !gSystem->Getenv(
"NDMSPC_MAX_PROCESSES"))
143 gSystem->Setenv(
"NDMSPC_MAX_PROCESSES", std::to_string(nProcesses).c_str());
144 setenvIfEmpty(
"NDMSPC_TCP_PORT", tcpPort);
145 setenvIfEmpty(
"NDMSPC_TMP_DIR", tmpDir);
146 setenvIfEmpty(
"NDMSPC_TMP_RESULTS_DIR", tmpResultsDir);
// Fall back to the default worker executable name.
148 if (workerBin.empty()) workerBin =
"ndmspc-worker";
// Pids of workers started by this process, plus a helper that stops all of them.
150 std::vector<pid_t> spawnedWorkers;
151 auto cleanupSpawnedWorkers = [&]() { stop_worker_processes(spawnedWorkers); };
153 if (spawnWorkers > 0) {
// The mode is read back from the environment rather than from `mode`, so a value that was
// already set in NDMSPC_EXECUTION_MODE is the one that is checked here.
154 const std::string effectiveMode = gSystem->Getenv(
"NDMSPC_EXECUTION_MODE") ? gSystem->Getenv(
"NDMSPC_EXECUTION_MODE") :
"";
// Spawning is rejected for any mode other than "tcp".
// NOTE(review): the statement that follows the error log is not visible in this listing.
155 if (effectiveMode !=
"tcp") {
156 NLogError(
"ndmspc-run: --spawn-workers is supported only with --mode tcp (effective mode: '%s')",
157 effectiveMode.c_str());
// Without an explicit endpoint, build one from NDMSPC_TCP_PORT, falling back to port 5555.
161 if (workerEndpoint.empty()) {
162 const char * effectivePort = gSystem->Getenv(
"NDMSPC_TCP_PORT");
163 workerEndpoint = std::string(
"tcp://localhost:") + (effectivePort ? effectivePort :
"5555");
166 NLogInfo(
"ndmspc-run: spawning %zu worker(s) using '%s' at %s", spawnWorkers, workerBin.c_str(),
167 workerEndpoint.c_str());
168 spawnedWorkers.reserve(spawnWorkers);
// Start the workers one by one, remembering each pid.
169 for (
size_t i = 0; i < spawnWorkers; ++i) {
170 const pid_t pid = spawn_worker_process(workerBin, workerEndpoint, verbose);
// Failure path: log and stop the workers started so far.
// NOTE(review): the condition guarding this path and the statement after the cleanup are
// not visible in this listing.
172 NLogError(
"ndmspc-run: failed to spawn worker %zu", i);
173 cleanupSpawnedWorkers();
176 spawnedWorkers.push_back(pid);
// Export the raw macro list under NDMSPC_MACRO.
182 gSystem->Setenv(
"NDMSPC_MACRO", macroList.c_str());
// Go through the macros in order.
// NOTE(review): `macros` is declared on a line not visible here (presumably the tokenized
// `macroList` — confirm).
185 for (
const auto & macro : macros) {
186 NLogInfo(
"ndmspc-run: executing macro '%s'", macro.c_str());
// Failure path for a macro that could not be opened: log and stop any spawned workers.
189 NLogError(
"ndmspc-run: failed to open macro '%s', exiting", macro.c_str());
190 cleanupSpawnedWorkers();
// Normal completion: stop any spawned workers and report that the run is done.
197 cleanupSpawnedWorkers();
198 NLogInfo(
"ndmspc-run: done");
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
static TMacro * OpenMacro(std::string filename)
Open a macro file.
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.