ndmspc  v1.2.0-0.1.rc3
ndmspc-run.cxx
1 #include <CLI11.hpp>
2 #include <chrono>
3 #include <csignal>
4 #include <sys/wait.h>
5 #include <string>
6 #include <thread>
7 #include <vector>
8 #include <cstdio>
9 #include <unistd.h>
10 #include "TROOT.h"
11 #include "TApplication.h"
12 #include "TSystem.h"
13 #include "NLogger.h"
14 #include "NUtils.h"
15 #include "ndmspc.h"
16 
17 namespace {
18 
/// Start one worker executable as a child process.
///
/// The child is exec'ed as `<workerBin> --endpoint <endpoint>`, with
/// `--verbose` appended when requested. execlp() resolves `workerBin`
/// through PATH when it contains no slash.
///
/// @param workerBin Name or path of the worker executable.
/// @param endpoint  Value handed to the worker after `--endpoint`.
/// @param verbose   When true, `--verbose` is added to the worker arguments.
/// @return The child's PID in the parent, or -1 when fork() fails. If the
///         exec itself fails, the child terminates with exit status 127.
pid_t spawn_worker_process(const std::string & workerBin, const std::string & endpoint, bool verbose)
{
  const pid_t child = fork();
  if (child != 0) {
    // Parent side (child > 0) or fork failure (child < 0).
    return child < 0 ? -1 : child;
  }

  // Child side: replace the process image with the worker.
  const char * const program = workerBin.c_str();
  // execlp() stops reading its argument list at the first null pointer, so a
  // null in the optional slot simply ends the list one entry earlier.
  const char * const verboseFlag = verbose ? "--verbose" : nullptr;
  execlp(program, program, "--endpoint", endpoint.c_str(), verboseFlag, static_cast<const char *>(nullptr));

  // Only reached when the exec failed.
  _exit(127);
}
35 
/// Terminate and reap every worker in `workerPids`, then empty the list.
///
/// All workers with a positive PID are sent SIGTERM first. They are then
/// reaped one after another, polling every 25 ms. A single deadline of two
/// seconds (measured from after the SIGTERMs were sent) is shared by all
/// workers; any worker still running once it has passed is sent SIGKILL and
/// waited for with a blocking waitpid(). Entries <= 0 are ignored.
///
/// @param workerPids PIDs of the spawned workers; cleared on return.
void stop_worker_processes(std::vector<pid_t> & workerPids)
{
  using Clock = std::chrono::steady_clock;

  if (workerPids.empty()) return;

  // Phase 1: ask everybody to shut down.
  for (const pid_t child : workerPids) {
    if (child > 0) kill(child, SIGTERM);
  }

  // Phase 2: reap, escalating to SIGKILL after the shared grace period.
  const Clock::time_point graceEnd = Clock::now() + std::chrono::seconds(2);
  for (const pid_t child : workerPids) {
    if (child <= 0) continue;

    int exitStatus = 0;
    for (;;) {
      // Non-zero means the child was reaped or waitpid() reported an error.
      if (waitpid(child, &exitStatus, WNOHANG) != 0) break;

      if (Clock::now() < graceEnd) {
        std::this_thread::sleep_for(std::chrono::milliseconds(25));
        continue;
      }

      // Grace period is over: force termination and reap.
      kill(child, SIGKILL);
      waitpid(child, &exitStatus, 0);
      break;
    }
  }

  workerPids.clear();
}
61 
62 } // namespace
63 
64 static std::string app_description()
65 {
66  size_t size = 128;
67  auto buf = std::make_unique<char[]>(size);
68  size = std::snprintf(buf.get(), size, "%s v%s-%s (run)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
69  return std::string(buf.get(), size);
70 }
71 
// Entry point of ndmspc-run.
//
// Flow, in order:
//   1. create the ROOT application object and switch ROOT to batch mode;
//   2. parse the command line with CLI11;
//   3. export the parsed options as NDMSPC_* environment variables, but only
//      for variables that are not already set (the environment wins);
//   4. optionally spawn local worker processes (allowed only when the
//      effective execution mode is "tcp");
//   5. publish the macro list in NDMSPC_MACRO and execute each macro;
//   6. stop the spawned workers.
//
// Returns 0 on success and 1 when --spawn-workers is used outside tcp mode,
// when a worker cannot be forked, or when a macro cannot be opened.
// NOTE(review): CLI11_PARSE is a CLI11 macro that presumably returns from
// main on parse errors / --help -- confirm against the CLI11 documentation.
72 int main(int argc, char ** argv)
73 {
 // ROOT is given argc = 0 and a null argv, so it never sees the command line;
 // the arguments are handled by CLI11 below.
74  TApplication rootApp("ndmspc-run", 0, nullptr);
75  gROOT->SetBatch(kTRUE);
76 
77  CLI::App app{app_description()};
78 
 // Targets filled in by the option parser. Empty string / 0 means "not given".
79  std::string macroList;
80  std::string mode; // ipc | tcp | thread (maps to NDMSPC_EXECUTION_MODE)
81  size_t nProcesses = 0; // 0 = not set
82  std::string tcpPort;
83  std::string workerBin;
84  std::string workerEndpoint;
85  std::string tmpDir;
86  std::string tmpResultsDir;
87  size_t spawnWorkers = 0;
88  bool verbose = false;
89 
90  app.add_option("macro", macroList,
91  "Comma-separated list of macro file(s) or URLs to execute")
92  ->required();
93  app.add_option("--mode", mode,
94  "Execution mode: ipc/process (forked local processes), tcp (remote workers), thread")
95  ->check(CLI::IsMember({"ipc", "process", "tcp", "thread"}));
96  app.add_option("-n,--processes", nProcesses,
97  "Number of worker processes (NDMSPC_MAX_PROCESSES)");
98  app.add_option("--tcp-port", tcpPort,
99  "TCP port to bind for remote workers (NDMSPC_TCP_PORT, default: 5555)");
100  app.add_option("--spawn-workers", spawnWorkers,
101  "In TCP mode, spawn N local ndmspc-worker processes");
102  app.add_option("--worker-bin", workerBin,
103  "Worker executable to spawn (default: ndmspc-worker)");
104  app.add_option("--worker-endpoint", workerEndpoint,
105  "Worker endpoint for spawned workers (default: tcp://localhost:<tcp-port>)");
106  app.add_option("--tmp-dir", tmpDir,
107  "Local scratch directory for temporary files (NDMSPC_TMP_DIR)");
108  app.add_option("--results-dir", tmpResultsDir,
109  "Shared results directory where workers deposit output (NDMSPC_TMP_RESULTS_DIR)");
110  app.add_flag("-v,--verbose", verbose, "Enable verbose logging");
111 
112  CLI11_PARSE(app, argc, argv);
113 
114  // Default to file-only logging unless explicitly configured by environment.
115  if (!gSystem->Getenv("NDMSPC_LOG_CONSOLE")) {
116  gSystem->Setenv("NDMSPC_LOG_CONSOLE", "0");
117  if (!verbose) {
 // NOTE(review): original line 118 is absent from this listing, so this
 // branch appears empty. Presumably it disabled console logging (e.g. a
 // SetConsoleOutput(false) call) -- confirm against the real source.
119  }
120  }
121 
122  if (verbose) {
 // NOTE(review): original line 123 is absent from this listing, so this
 // branch appears empty. Presumably it enabled console logging (e.g. a
 // SetConsoleOutput(true) call) -- confirm against the real source.
124  }
125 
126  // Set env vars from CLI args (only if not already set in the environment)
127  auto setenvIfEmpty = [](const char * name, const std::string & value) {
128  if (!value.empty() && !gSystem->Getenv(name))
129  gSystem->Setenv(name, value.c_str());
130  };
131 
 // "process" is accepted on the command line as an alias of "ipc".
132  if (mode == "process") mode = "ipc";
133  if (!mode.empty() && !gSystem->Getenv("NDMSPC_EXECUTION_MODE"))
134  gSystem->Setenv("NDMSPC_EXECUTION_MODE", mode.c_str());
135 
136  // When auto-spawning TCP workers, default NDMSPC_MAX_PROCESSES to that count
137  // if the caller didn't set --processes / NDMSPC_MAX_PROCESSES explicitly.
138  if (spawnWorkers > 0 && nProcesses == 0 && !gSystem->Getenv("NDMSPC_MAX_PROCESSES")) {
139  nProcesses = spawnWorkers;
140  }
141 
142  if (nProcesses > 0 && !gSystem->Getenv("NDMSPC_MAX_PROCESSES"))
143  gSystem->Setenv("NDMSPC_MAX_PROCESSES", std::to_string(nProcesses).c_str());
144  setenvIfEmpty("NDMSPC_TCP_PORT", tcpPort);
145  setenvIfEmpty("NDMSPC_TMP_DIR", tmpDir);
146  setenvIfEmpty("NDMSPC_TMP_RESULTS_DIR", tmpResultsDir);
147 
148  if (workerBin.empty()) workerBin = "ndmspc-worker";
149 
 // PIDs of the workers started below. cleanupSpawnedWorkers must be called
 // explicitly on every exit path that follows a successful spawn; there is
 // no automatic (destructor-based) cleanup.
150  std::vector<pid_t> spawnedWorkers;
151  auto cleanupSpawnedWorkers = [&]() { stop_worker_processes(spawnedWorkers); };
152 
153  if (spawnWorkers > 0) {
 // The mode is read back from the environment, so a pre-existing
 // NDMSPC_EXECUTION_MODE takes precedence over --mode here as well.
154  const std::string effectiveMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "";
155  if (effectiveMode != "tcp") {
156  NLogError("ndmspc-run: --spawn-workers is supported only with --mode tcp (effective mode: '%s')",
157  effectiveMode.c_str());
158  return 1;
159  }
160 
 // Without an explicit --worker-endpoint, workers connect to localhost on
 // the effective NDMSPC_TCP_PORT, falling back to 5555.
161  if (workerEndpoint.empty()) {
162  const char * effectivePort = gSystem->Getenv("NDMSPC_TCP_PORT");
163  workerEndpoint = std::string("tcp://localhost:") + (effectivePort ? effectivePort : "5555");
164  }
165 
166  NLogInfo("ndmspc-run: spawning %zu worker(s) using '%s' at %s", spawnWorkers, workerBin.c_str(),
167  workerEndpoint.c_str());
168  spawnedWorkers.reserve(spawnWorkers);
169  for (size_t i = 0; i < spawnWorkers; ++i) {
170  const pid_t pid = spawn_worker_process(workerBin, workerEndpoint, verbose);
171  if (pid < 0) {
 // Stop the workers that were already started before giving up.
172  NLogError("ndmspc-run: failed to spawn worker %zu", i);
173  cleanupSpawnedWorkers();
174  return 1;
175  }
176  spawnedWorkers.push_back(pid);
177  }
178  }
179 
180  // Expose the macro list so NGnTree::Process can forward it to TCP workers
181  // automatically without requiring an explicit tree.SetWorkerMacro() call.
 // Unlike the other variables this one is set unconditionally, overriding any
 // existing NDMSPC_MACRO. It is set after the workers were forked, so the
 // spawned workers do not inherit it through their environment.
182  gSystem->Setenv("NDMSPC_MACRO", macroList.c_str());
183 
 // Run the macros one by one, in the order given; the first macro that cannot
 // be opened aborts the run. The result of Exec() is not inspected.
184  std::vector<std::string> macros = Ndmspc::NUtils::Tokenize(macroList, ',');
185  for (const auto & macro : macros) {
186  NLogInfo("ndmspc-run: executing macro '%s'", macro.c_str());
187  TMacro * m = Ndmspc::NUtils::OpenMacro(macro);
188  if (!m) {
189  NLogError("ndmspc-run: failed to open macro '%s', exiting", macro.c_str());
190  cleanupSpawnedWorkers();
191  return 1;
192  }
193  m->Exec();
 // The macro object is owned by this loop and released manually.
194  delete m;
195  }
196 
197  cleanupSpawnedWorkers();
198  NLogInfo("ndmspc-run: done");
199 
200  // // Workaround: ROOT teardown can intermittently hang at process exit in IPC
201  // // batch mode after all useful work has already completed.
202  // const std::string effectiveMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "";
203  // if (effectiveMode == "ipc") {
204  // std::fflush(nullptr);
205  // _exit(0);
206  // }
207 
208  return 0;
209 }
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
Definition: NLogger.h:515
static TMacro * OpenMacro(std::string filename)
Open a macro file.
Definition: NUtils.cxx:781
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition: NUtils.cxx:1073