11#include "TApplication.h"
// Starts one worker executable and reports the outcome as a pid_t.
//
// NOTE(review): this listing is incomplete. The leading numbers are line numbers of the
// original file and several of them are missing, so the statement that assigns `pid`, the
// condition choosing between the two execlp() calls, and the success return are not visible.
//
// workerBin  executable handed to execlp() both as the file to run and as argv[0].
// endpoint   value passed to the worker right after the "--endpoint" argument.
// verbose    presumably selects the execlp() variant that also passes "--verbose" — TODO confirm.
//
// Returns -1 when `pid` is negative; the value returned otherwise is not visible here
// (presumably the child's pid — confirm against the full file).
19pid_t spawn_worker_process(
const std::string & workerBin,
const std::string & endpoint,
bool verbose)
// A negative pid is reported to the caller as failure.
22 if (pid < 0)
return -1;
// Variant that forwards "--verbose" to the worker; the argument list is nullptr-terminated.
26 execlp(workerBin.c_str(), workerBin.c_str(),
"--endpoint", endpoint.c_str(),
"--verbose",
nullptr);
// Variant without "--verbose".
28 execlp(workerBin.c_str(), workerBin.c_str(),
"--endpoint", endpoint.c_str(),
nullptr);
// Asks every worker in workerPids to terminate and then waits for each one.
//
// NOTE(review): this listing is incomplete (original lines 37, 39, 42-43, 47-48, 51 and
// 53-54 are missing), so the declaration of `status` and whatever happens right before the
// blocking waitpid() are not visible.
//
// workerPids  process ids to stop; entries <= 0 are ignored by both loops. It is taken by
//             non-const reference, but no modification of the vector is visible here.
36void stop_worker_processes(std::vector<pid_t> & workerPids)
// Nothing to do without workers.
38 if (workerPids.empty())
return;
// First pass: send SIGTERM to every valid pid before waiting on any of them.
40 for (pid_t pid : workerPids) {
41 if (pid > 0) kill(pid, SIGTERM);
// The deadline is computed once, before the wait loop, so the 2 seconds are shared by all
// workers rather than granted to each worker separately.
44 const auto waitDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(2);
// Second pass: wait for each valid pid.
45 for (pid_t pid : workerPids) {
46 if (pid <= 0)
continue;
// Poll without blocking; the loop continues only while waitpid() returns 0 and ends on any
// other return value.
49 while (waitpid(pid, &status, WNOHANG) == 0) {
50 if (std::chrono::steady_clock::now() >= waitDeadline) {
// Deadline reached: switch to a blocking wait.
// NOTE(review): original line 51 is missing; presumably a stronger signal is sent first — confirm.
52 waitpid(pid, &status, 0);
// Pause 25 ms between polls.
55 std::this_thread::sleep_for(std::chrono::milliseconds(25));
// Builds the description string "<NDMSPC_NAME> v<NDMSPC_VERSION>-<NDMSPC_VERSION_RELEASE> (run)".
//
// NOTE(review): the line that declares and initialises `size` (original lines 65-66) is not
// part of this listing.
64static std::string app_description()
// Scratch buffer of `size` chars for the formatted text.
67 auto buf = std::make_unique<char[]>(size);
// `size` is overwritten with snprintf's return value. Per the C standard that is the length
// the complete text would have (excluding the terminator), or a negative value on error —
// not the number of characters actually stored.
68 size = std::snprintf(buf.get(), size,
"%s v%s-%s (run)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
// NOTE(review): if the initial `size` can be smaller than the formatted text, or snprintf
// fails, this length would exceed the buffer — confirm the initial value.
69 return std::string(buf.get(), size);
// Builds the version string "<NDMSPC_NAME> v<NDMSPC_VERSION>-<NDMSPC_VERSION_RELEASE>".
//
// NOTE(review): the line that declares and initialises `size` (original lines 73-74) is not
// part of this listing.
72static std::string app_version()
// Scratch buffer of `size` chars for the formatted text.
75 auto buf = std::make_unique<char[]>(size);
// `size` is overwritten with snprintf's return value. Per the C standard that is the length
// the complete text would have (excluding the terminator), or a negative value on error —
// not the number of characters actually stored.
76 size = std::snprintf(buf.get(), size,
"%s v%s-%s", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
// NOTE(review): if the initial `size` can be smaller than the formatted text, or snprintf
// fails, this length would exceed the buffer — confirm the initial value.
77 return std::string(buf.get(), size);
// Entry point of the "ndmspc-run" launcher: parses command-line options, exports them as
// NDMSPC_* environment variables, optionally spawns local worker processes, and executes
// the requested macro(s).
//
// NOTE(review): this listing is incomplete. The leading numbers are line numbers of the
// original file and many are missing, so the declarations of `mode`, `tcpPort`, `tmpDir`,
// `verbose`, `macros` and `m`, several closing braces, and the return statements are not
// visible here.
80int main(
int argc,
char ** argv)
// ROOT application object named "ndmspc-run", constructed with an argument count of 0 and a
// null argv; ROOT is then switched to batch mode.
82 TApplication rootApp(
"ndmspc-run", 0,
nullptr);
83 gROOT->SetBatch(kTRUE);
// CLI11 application using the generated description; "--version" prints app_version().
85 CLI::App app{app_description()};
86 app.set_version_flag(
"--version", app_version(),
"Print version information and exit");
// Targets of the command-line options registered below.
88 std::string macroList;
89 std::string macroParams;
// 0 means "not given on the command line" (see the nProcesses checks further down).
91 size_t nProcesses = 0;
93 std::string workerBin;
94 std::string workerEndpoint;
96 std::string tmpResultsDir;
// 0 disables spawning of local workers.
97 size_t spawnWorkers = 0;
// Positional argument: the macro list.
// NOTE(review): the statement continues on original line 102, which is not visible.
100 app.add_option(
"macro", macroList,
101 "Comma-separated list of macro file(s) or URLs to execute")
103 app.add_option(
"--macro-params", macroParams,
104 "Parameter list forwarded to TMacro::Exec(params), e.g. '42,\"sample\"'");
// --mode is restricted to the four listed values by the IsMember check.
105 app.add_option(
"--mode", mode,
106 "Execution mode: ipc/process (forked local processes), tcp (remote workers), thread")
107 ->check(CLI::IsMember({
"ipc",
"process",
"tcp",
"thread"}));
108 app.add_option(
"-n,--processes", nProcesses,
109 "Number of worker processes (NDMSPC_MAX_PROCESSES)");
110 app.add_option(
"--tcp-port", tcpPort,
111 "TCP port to bind for remote workers (NDMSPC_TCP_PORT, default: 5555)");
112 app.add_option(
"--spawn-workers", spawnWorkers,
113 "In TCP mode, spawn N local ndmspc-worker processes");
114 app.add_option(
"--worker-bin", workerBin,
115 "Worker executable to spawn (default: ndmspc-worker)");
116 app.add_option(
"--worker-endpoint", workerEndpoint,
117 "Worker endpoint for spawned workers (default: tcp://localhost:<tcp-port>)");
118 app.add_option(
"--tmp-dir", tmpDir,
119 "Local scratch directory for temporary files (NDMSPC_TMP_DIR)");
120 app.add_option(
"--results-dir", tmpResultsDir,
121 "Shared results directory where workers deposit output (NDMSPC_TMP_RESULTS_DIR)");
122 app.add_flag(
"-v,--verbose", verbose,
"Enable verbose logging");
// Parse argv with the CLI11 helper macro (per the CLI11 documentation it returns from main
// on a parse error or on --help/--version).
124 CLI11_PARSE(app, argc, argv);
// Default NDMSPC_LOG_CONSOLE to "0" unless the environment already defines it.
127 if (!gSystem->Getenv(
"NDMSPC_LOG_CONSOLE")) {
128 gSystem->Setenv(
"NDMSPC_LOG_CONSOLE",
"0");
// Helper: export `value` as environment variable `name` only when the value is non-empty
// and the variable is not already set, so a pre-existing environment value takes
// precedence over the command-line option.
139 auto setenvIfEmpty = [](
const char * name,
const std::string & value) {
140 if (!value.empty() && !gSystem->Getenv(name))
141 gSystem->Setenv(name, value.c_str());
// "process" is treated as an alias of "ipc".
144 if (mode ==
"process") mode =
"ipc";
// Export the mode with the same "environment wins" rule as setenvIfEmpty.
145 if (!mode.empty() && !gSystem->Getenv(
"NDMSPC_EXECUTION_MODE"))
146 gSystem->Setenv(
"NDMSPC_EXECUTION_MODE", mode.c_str());
// When workers are spawned and no process count was given (neither -n nor the environment
// variable), use the number of spawned workers as the process count.
150 if (spawnWorkers > 0 && nProcesses == 0 && !gSystem->Getenv(
"NDMSPC_MAX_PROCESSES")) {
151 nProcesses = spawnWorkers;
154 if (nProcesses > 0 && !gSystem->Getenv(
"NDMSPC_MAX_PROCESSES"))
155 gSystem->Setenv(
"NDMSPC_MAX_PROCESSES", std::to_string(nProcesses).c_str());
// Export the remaining string options unless the environment already defines them.
156 setenvIfEmpty(
"NDMSPC_MACRO_PARAMS", macroParams);
157 setenvIfEmpty(
"NDMSPC_TCP_PORT", tcpPort);
158 setenvIfEmpty(
"NDMSPC_TMP_DIR", tmpDir);
159 setenvIfEmpty(
"NDMSPC_TMP_RESULTS_DIR", tmpResultsDir);
// Read the parameters back from the environment so that a value that was already set there
// is the one used; empty string when the variable is unset.
161 const std::string effectiveMacroParams = gSystem->Getenv(
"NDMSPC_MACRO_PARAMS") ? gSystem->Getenv(
"NDMSPC_MACRO_PARAMS") :
"";
// Default worker executable name.
163 if (workerBin.empty()) workerBin =
"ndmspc-worker";
// Pids of the workers started by this process; the lambda stops all of them.
165 std::vector<pid_t> spawnedWorkers;
166 auto cleanupSpawnedWorkers = [&]() { stop_worker_processes(spawnedWorkers); };
168 if (spawnWorkers > 0) {
// Spawning is only accepted when the effective mode, as read from the environment, is "tcp".
169 const std::string effectiveMode = gSystem->Getenv(
"NDMSPC_EXECUTION_MODE") ? gSystem->Getenv(
"NDMSPC_EXECUTION_MODE") :
"";
170 if (effectiveMode !=
"tcp") {
// NOTE(review): what follows this error message (original lines 173-175) is not visible;
// presumably main returns a failure code — confirm.
171 NLogError(
"ndmspc-run: --spawn-workers is supported only with --mode tcp (effective mode: '%s')",
172 effectiveMode.c_str());
// Without an explicit endpoint, build "tcp://localhost:<port>" from NDMSPC_TCP_PORT,
// falling back to port 5555 when the variable is unset.
176 if (workerEndpoint.empty()) {
177 const char * effectivePort = gSystem->Getenv(
"NDMSPC_TCP_PORT");
178 workerEndpoint = std::string(
"tcp://localhost:") + (effectivePort ? effectivePort :
"5555");
181 NLogInfo(
"ndmspc-run: spawning %zu worker(s) using '%s' at %s", spawnWorkers, workerBin.c_str(),
182 workerEndpoint.c_str());
// Start the workers one at a time, recording each pid.
183 spawnedWorkers.reserve(spawnWorkers);
184 for (
size_t i = 0; i < spawnWorkers; ++i) {
185 const pid_t pid = spawn_worker_process(workerBin, workerEndpoint, verbose);
// NOTE(review): the failure condition (original line 186) and what follows the cleanup
// (lines 189-190) are not visible. On failure the workers spawned so far are stopped.
187 NLogError(
"ndmspc-run: failed to spawn worker %zu", i);
188 cleanupSpawnedWorkers();
191 spawnedWorkers.push_back(pid);
// Export the raw macro list as NDMSPC_MACRO.
197 gSystem->Setenv(
"NDMSPC_MACRO", macroList.c_str());
// Execute each macro in turn.
// NOTE(review): the construction of `macros` (original lines 198-199) is not visible;
// presumably macroList split on ',' — confirm.
200 for (
const auto & macro : macros) {
// Log the macro name, including the parameters when there are any.
201 if (effectiveMacroParams.empty()) {
202 NLogInfo(
"ndmspc-run: executing macro '%s'", macro.c_str());
204 NLogInfo(
"ndmspc-run: executing macro '%s' with params '%s'", macro.c_str(), effectiveMacroParams.c_str());
// NOTE(review): the statements that obtain `m` and test it (original lines 206-207) are not
// visible. When the macro cannot be opened, the spawned workers are stopped; the exit path
// after that is not visible either.
208 NLogError(
"ndmspc-run: failed to open macro '%s', exiting", macro.c_str());
209 cleanupSpawnedWorkers();
// Run the macro, passing nullptr instead of an empty parameter string.
212 m->Exec(effectiveMacroParams.empty() ?
nullptr : effectiveMacroParams.c_str());
// Normal completion: stop any spawned workers and log the end of the run.
216 cleanupSpawnedWorkers();
217 NLogInfo(
"ndmspc-run: done");
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
static TMacro * OpenMacro(std::string filename)
Open a macro file.
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.