// ndmspc v1.2.0-0.1.rc7
// ndmspc-run.cxx
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstdio>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <CLI11.hpp>

#include "TApplication.h"
#include "TROOT.h"
#include "TSystem.h"

#include "NLogger.h"
#include "NUtils.h"
#include "ndmspc.h"
16
namespace {

/// @brief Fork a child process and exec one worker in it.
/// @param workerBin Executable name or path (looked up via PATH by execlp()).
/// @param endpoint  Passed to the worker as `--endpoint <endpoint>`.
/// @param verbose   When true, `--verbose` is also passed to the worker.
/// @return The child's PID, or -1 if fork() failed. A failed exec is not
///         reported here; the child terminates with exit status 127 instead.
pid_t spawn_worker_process(const std::string & workerBin, const std::string & endpoint, bool verbose)
{
  const pid_t pid = fork();
  if (pid != 0) return pid < 0 ? -1 : pid; // parent (or fork failure)

  // Child: replace the process image with the worker.
  const char * bin = workerBin.c_str();
  if (verbose) {
    execlp(bin, bin, "--endpoint", endpoint.c_str(), "--verbose", nullptr);
  } else {
    execlp(bin, bin, "--endpoint", endpoint.c_str(), nullptr);
  }
  // Only reached if exec failed. _exit() (not exit()) so the child does not run
  // the parent's atexit handlers or flush stdio buffers inherited from it.
  _exit(127);
}

/// @brief Terminate and reap every worker in @p workerPids, then clear the list.
///
/// All workers are sent SIGTERM first. They then share a single 2 second grace
/// period (one deadline computed up front). A worker still running when the
/// deadline passes is sent SIGKILL and reaped with a blocking waitpid().
/// Non-positive entries are ignored. Calling this on an empty list is a no-op.
void stop_worker_processes(std::vector<pid_t> & workerPids)
{
  if (workerPids.empty()) return;

  for (const pid_t pid : workerPids) {
    if (pid > 0) kill(pid, SIGTERM);
  }

  // waitpid() that retries when interrupted by a signal. Without this, an
  // EINTR (-1) would be mistaken for "child finished" and leave a zombie.
  const auto waitpidRetry = [](pid_t pid, int * status, int options) {
    pid_t rc = 0;
    do {
      rc = waitpid(pid, status, options);
    } while (rc < 0 && errno == EINTR);
    return rc;
  };

  const auto waitDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(2);
  for (const pid_t pid : workerPids) {
    if (pid <= 0) continue;

    int status = 0;
    // 0 = still running; >0 = reaped; <0 = no such child (e.g. ECHILD).
    while (waitpidRetry(pid, &status, WNOHANG) == 0) {
      if (std::chrono::steady_clock::now() >= waitDeadline) {
        kill(pid, SIGKILL);
        waitpidRetry(pid, &status, 0);
        break;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(25));
    }
  }

  workerPids.clear();
}

} // namespace
63
64static std::string app_description()
65{
66 size_t size = 128;
67 auto buf = std::make_unique<char[]>(size);
68 size = std::snprintf(buf.get(), size, "%s v%s-%s (run)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
69 return std::string(buf.get(), size);
70}
71
72static std::string app_version()
73{
74 size_t size = 128;
75 auto buf = std::make_unique<char[]>(size);
76 size = std::snprintf(buf.get(), size, "%s v%s-%s", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
77 return std::string(buf.get(), size);
78}
79
80int main(int argc, char ** argv)
81{
82 TApplication rootApp("ndmspc-run", 0, nullptr);
83 gROOT->SetBatch(kTRUE);
84
85 CLI::App app{app_description()};
86 app.set_version_flag("--version", app_version(), "Print version information and exit");
87
88 std::string macroList;
89 std::string macroParams;
90 std::string mode; // ipc | tcp | thread (maps to NDMSPC_EXECUTION_MODE)
91 size_t nProcesses = 0; // 0 = not set
92 std::string tcpPort;
93 std::string workerBin;
94 std::string workerEndpoint;
95 std::string tmpDir;
96 std::string tmpResultsDir;
97 size_t spawnWorkers = 0;
98 bool verbose = false;
99
100 app.add_option("macro", macroList,
101 "Comma-separated list of macro file(s) or URLs to execute")
102 ->required();
103 app.add_option("--macro-params", macroParams,
104 "Parameter list forwarded to TMacro::Exec(params), e.g. '42,\"sample\"'");
105 app.add_option("--mode", mode,
106 "Execution mode: ipc/process (forked local processes), tcp (remote workers), thread")
107 ->check(CLI::IsMember({"ipc", "process", "tcp", "thread"}));
108 app.add_option("-n,--processes", nProcesses,
109 "Number of worker processes (NDMSPC_MAX_PROCESSES)");
110 app.add_option("--tcp-port", tcpPort,
111 "TCP port to bind for remote workers (NDMSPC_TCP_PORT, default: 5555)");
112 app.add_option("--spawn-workers", spawnWorkers,
113 "In TCP mode, spawn N local ndmspc-worker processes");
114 app.add_option("--worker-bin", workerBin,
115 "Worker executable to spawn (default: ndmspc-worker)");
116 app.add_option("--worker-endpoint", workerEndpoint,
117 "Worker endpoint for spawned workers (default: tcp://localhost:<tcp-port>)");
118 app.add_option("--tmp-dir", tmpDir,
119 "Local scratch directory for temporary files (NDMSPC_TMP_DIR)");
120 app.add_option("--results-dir", tmpResultsDir,
121 "Shared results directory where workers deposit output (NDMSPC_TMP_RESULTS_DIR)");
122 app.add_flag("-v,--verbose", verbose, "Enable verbose logging");
123
124 CLI11_PARSE(app, argc, argv);
125
126 // Default to file-only logging unless explicitly configured by environment.
127 if (!gSystem->Getenv("NDMSPC_LOG_CONSOLE")) {
128 gSystem->Setenv("NDMSPC_LOG_CONSOLE", "0");
129 if (!verbose) {
131 }
132 }
133
134 if (verbose) {
136 }
137
138 // Set env vars from CLI args (only if not already set in the environment)
139 auto setenvIfEmpty = [](const char * name, const std::string & value) {
140 if (!value.empty() && !gSystem->Getenv(name))
141 gSystem->Setenv(name, value.c_str());
142 };
143
144 if (mode == "process") mode = "ipc";
145 if (!mode.empty() && !gSystem->Getenv("NDMSPC_EXECUTION_MODE"))
146 gSystem->Setenv("NDMSPC_EXECUTION_MODE", mode.c_str());
147
148 // When auto-spawning TCP workers, default NDMSPC_MAX_PROCESSES to that count
149 // if the caller didn't set --processes / NDMSPC_MAX_PROCESSES explicitly.
150 if (spawnWorkers > 0 && nProcesses == 0 && !gSystem->Getenv("NDMSPC_MAX_PROCESSES")) {
151 nProcesses = spawnWorkers;
152 }
153
154 if (nProcesses > 0 && !gSystem->Getenv("NDMSPC_MAX_PROCESSES"))
155 gSystem->Setenv("NDMSPC_MAX_PROCESSES", std::to_string(nProcesses).c_str());
156 setenvIfEmpty("NDMSPC_MACRO_PARAMS", macroParams);
157 setenvIfEmpty("NDMSPC_TCP_PORT", tcpPort);
158 setenvIfEmpty("NDMSPC_TMP_DIR", tmpDir);
159 setenvIfEmpty("NDMSPC_TMP_RESULTS_DIR", tmpResultsDir);
160
161 const std::string effectiveMacroParams = gSystem->Getenv("NDMSPC_MACRO_PARAMS") ? gSystem->Getenv("NDMSPC_MACRO_PARAMS") : "";
162
163 if (workerBin.empty()) workerBin = "ndmspc-worker";
164
165 std::vector<pid_t> spawnedWorkers;
166 auto cleanupSpawnedWorkers = [&]() { stop_worker_processes(spawnedWorkers); };
167
168 if (spawnWorkers > 0) {
169 const std::string effectiveMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "";
170 if (effectiveMode != "tcp") {
171 NLogError("ndmspc-run: --spawn-workers is supported only with --mode tcp (effective mode: '%s')",
172 effectiveMode.c_str());
173 return 1;
174 }
175
176 if (workerEndpoint.empty()) {
177 const char * effectivePort = gSystem->Getenv("NDMSPC_TCP_PORT");
178 workerEndpoint = std::string("tcp://localhost:") + (effectivePort ? effectivePort : "5555");
179 }
180
181 NLogInfo("ndmspc-run: spawning %zu worker(s) using '%s' at %s", spawnWorkers, workerBin.c_str(),
182 workerEndpoint.c_str());
183 spawnedWorkers.reserve(spawnWorkers);
184 for (size_t i = 0; i < spawnWorkers; ++i) {
185 const pid_t pid = spawn_worker_process(workerBin, workerEndpoint, verbose);
186 if (pid < 0) {
187 NLogError("ndmspc-run: failed to spawn worker %zu", i);
188 cleanupSpawnedWorkers();
189 return 1;
190 }
191 spawnedWorkers.push_back(pid);
192 }
193 }
194
195 // Expose the macro list so NGnTree::Process can forward it to TCP workers
196 // automatically without requiring an explicit tree.SetWorkerMacro() call.
197 gSystem->Setenv("NDMSPC_MACRO", macroList.c_str());
198
199 std::vector<std::string> macros = Ndmspc::NUtils::Tokenize(macroList, ',');
200 for (const auto & macro : macros) {
201 if (effectiveMacroParams.empty()) {
202 NLogInfo("ndmspc-run: executing macro '%s'", macro.c_str());
203 } else {
204 NLogInfo("ndmspc-run: executing macro '%s' with params '%s'", macro.c_str(), effectiveMacroParams.c_str());
205 }
206 TMacro * m = Ndmspc::NUtils::OpenMacro(macro);
207 if (!m) {
208 NLogError("ndmspc-run: failed to open macro '%s', exiting", macro.c_str());
209 cleanupSpawnedWorkers();
210 return 1;
211 }
212 m->Exec(effectiveMacroParams.empty() ? nullptr : effectiveMacroParams.c_str());
213 delete m;
214 }
215
216 cleanupSpawnedWorkers();
217 NLogInfo("ndmspc-run: done");
218
219 // // Workaround: ROOT teardown can intermittently hang at process exit in IPC
220 // // batch mode after all useful work has already completed.
221 // const std::string effectiveMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "";
222 // if (effectiveMode == "ipc") {
223 // std::fflush(nullptr);
224 // _exit(0);
225 // }
226
227 return 0;
228}
// Helpers referenced above (from the generated documentation):
//   static void NLogger::SetConsoleOutput(bool enable)
//     Enables or disables logging output to the console. (Definition: NLogger.h:515)
//   static TMacro * Ndmspc::NUtils::OpenMacro(std::string filename)
//     Open a macro file. (Definition: NUtils.cxx:781)
//   static std::vector<std::string> Ndmspc::NUtils::Tokenize(std::string_view input, const char delim)
//     Tokenize a string by delimiter. (Definition: NUtils.cxx:1077)