ndmspc  v1.2.0-0.1.rc3
ndmspc-worker.cxx
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstdio>
#include <exception>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <CLI11.hpp>
#include <zmq.h>

#include "TROOT.h"
#include "TApplication.h"
#include "TSystem.h"

#include "NLogger.h"
#include "NUtils.h"
#include "ndmspc.h"
#include "NDimensionalIpcRunner.h"
18 
19 static std::string app_description()
20 {
21  size_t size = 64;
22  auto buf = std::make_unique<char[]>(size);
23  size = std::snprintf(buf.get(), size, "%s v%s-%s (worker)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
24  return std::string(buf.get(), size);
25 }
26 
namespace {

/// Set to 1 by handle_signal(); polled by wait_for_workers() to forward termination to the children.
volatile sig_atomic_t gTerminateRequested = 0;

/// Signal handler: only records that termination was requested (assignment to a
/// volatile sig_atomic_t is the one thing that is always safe inside a handler).
void handle_signal(int)
{
  gTerminateRequested = 1;
}

/// Fork and exec one worker: `workerBin --endpoint E [--mode M] [--macro L] [--verbose]`.
///
/// @param workerBin  executable to run (resolved through PATH by execvp)
/// @param endpoint   value passed via --endpoint
/// @param mode       passed via --mode when non-empty
/// @param macroList  passed via --macro when non-empty
/// @param verbose    adds --verbose when true
/// @return the child's pid, or -1 if fork() failed. If exec fails, the child exits with status 127.
pid_t spawn_worker_process(const std::string & workerBin, const std::string & endpoint, const std::string & mode,
                           const std::string & macroList, bool verbose)
{
  // Build argv before fork(). In the child of a possibly multi-threaded parent, only async-signal-safe
  // calls are allowed until exec, which rules out allocating strings/vectors there.
  std::vector<std::string> args;
  args.emplace_back(workerBin);
  args.emplace_back("--endpoint");
  args.emplace_back(endpoint);
  if (!mode.empty()) {
    args.emplace_back("--mode");
    args.emplace_back(mode);
  }
  if (!macroList.empty()) {
    args.emplace_back("--macro");
    args.emplace_back(macroList);
  }
  if (verbose) args.emplace_back("--verbose");

  std::vector<char *> argvPtrs;
  argvPtrs.reserve(args.size() + 1);
  for (auto & arg : args) argvPtrs.push_back(const_cast<char *>(arg.c_str()));
  argvPtrs.push_back(nullptr);

  const pid_t pid = fork();
  if (pid == 0) {
    execvp(argvPtrs[0], argvPtrs.data());
    _exit(127); // exec failed; 127 mirrors the shell's "command not found"
  }
  return pid < 0 ? -1 : pid;
}

/// Send @p signalToSend to every tracked worker (entries <= 0 are placeholders and are skipped).
void terminate_workers(const std::vector<pid_t> & workerPids, int signalToSend)
{
  for (pid_t pid : workerPids) {
    if (pid > 0) kill(pid, signalToSend);
  }
}

/// Poll until every tracked worker has exited, reaping each one.
///
/// Reaped entries are overwritten with -1. If a termination signal was received, SIGTERM is forwarded
/// to the workers once.
///
/// @return 128 + SIGTERM if termination was requested. Otherwise 1 if any worker exited non-zero,
///         died from a signal, or could not be waited for, and 0 if all workers exited cleanly.
int wait_for_workers(std::vector<pid_t> & workerPids)
{
  // Count only real pids; placeholders (<= 0) would otherwise keep the loop alive forever.
  size_t aliveCount = 0;
  for (pid_t pid : workerPids) {
    if (pid > 0) ++aliveCount;
  }

  int exitCode = 0;
  bool termForwarded = false;

  while (aliveCount > 0) {
    if (gTerminateRequested && !termForwarded) {
      terminate_workers(workerPids, SIGTERM);
      termForwarded = true; // forward once instead of on every 50 ms poll
    }

    bool progressed = false;
    for (pid_t & pid : workerPids) {
      if (pid <= 0) continue;
      int status = 0;
      const pid_t rc = waitpid(pid, &status, WNOHANG);
      if (rc == 0) continue;                   // still running
      if (rc < 0 && errno == EINTR) continue;  // interrupted: retry on the next poll
      // Either reaped (rc == pid) or an unrecoverable error such as ECHILD: stop tracking it.
      const bool cleanExit = rc > 0 && WIFEXITED(status) && WEXITSTATUS(status) == 0;
      if (!cleanExit) exitCode = 1;
      pid = -1;
      --aliveCount;
      progressed = true;
    }

    if (!progressed) {
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
  }

  if (gTerminateRequested) {
    return 128 + SIGTERM;
  }
  return exitCode;
}

} // namespace
124 
125 int main(int argc, char ** argv)
126 {
127  signal(SIGTERM, handle_signal);
128  signal(SIGINT, handle_signal);
129 
130  TApplication rootApp("ndmspc-worker", 0, nullptr);
131 
132  CLI::App app{app_description()};
133 
134  std::string endpoint;
135  size_t workerIndex = std::numeric_limits<size_t>::max(); // sentinel: not set
136  std::string macroList;
137  std::string mode; // ipc | process | tcp | thread (optional override)
138  std::string workerBin;
139  size_t spawnWorkers = 0;
140  bool verbose = false;
141 
142  app.add_option("-e,--endpoint", endpoint, "Master endpoint (e.g. tcp://host:5555)")->required();
143  app.add_option("-i,--index", workerIndex, "Worker index (default: auto-assigned by supervisor)");
144  app.add_option("-m,--macro", macroList, "Comma-separated list of macro file(s) or URLs to execute");
145  app.add_option("--mode", mode,
146  "Execution mode override: ipc/process (local), tcp (remote), thread")
147  ->check(CLI::IsMember({"ipc", "process", "tcp", "thread"}));
148  app.add_option("--spawn-workers", spawnWorkers,
149  "Spawn N local ndmspc-worker processes (spawner mode)");
150  app.add_option("--worker-bin", workerBin,
151  "Worker executable for spawner mode (default: current executable)");
152  app.add_flag("-v,--verbose", verbose, "Enable verbose logging");
153 
154  CLI11_PARSE(app, argc, argv);
155 
156  // Default to file-only logging unless explicitly configured by environment.
157  if (!gSystem->Getenv("NDMSPC_LOG_CONSOLE")) {
158  gSystem->Setenv("NDMSPC_LOG_CONSOLE", "0");
159  if (!verbose) {
161  }
162  }
163 
164  if (verbose) {
166  }
167 
168  if (mode == "process") mode = "ipc";
169  if (!mode.empty() && !gSystem->Getenv("NDMSPC_EXECUTION_MODE")) {
170  gSystem->Setenv("NDMSPC_EXECUTION_MODE", mode.c_str());
171  }
172 
173  if (spawnWorkers > 0) {
174  if (workerIndex != std::numeric_limits<size_t>::max()) {
175  NLogError("ndmspc-worker: --index cannot be combined with --spawn-workers");
176  return 1;
177  }
178  if (workerBin.empty()) {
179  workerBin = (argc > 0 && argv[0] && argv[0][0] != '\0') ? argv[0] : "ndmspc-worker";
180  }
181 
182  NLogInfo("ndmspc-worker: spawning %zu worker(s) using '%s' -> %s", spawnWorkers, workerBin.c_str(),
183  endpoint.c_str());
184  std::vector<pid_t> workerPids;
185  workerPids.reserve(spawnWorkers);
186  for (size_t i = 0; i < spawnWorkers; ++i) {
187  const pid_t pid = spawn_worker_process(workerBin, endpoint, mode, macroList, verbose);
188  if (pid < 0) {
189  NLogError("ndmspc-worker: failed to spawn worker %zu", i);
190  terminate_workers(workerPids, SIGTERM);
191  return 1;
192  }
193  workerPids.push_back(pid);
194  }
195  return wait_for_workers(workerPids);
196  }
197 
198  // If macro or index not provided, bootstrap from supervisor
199  const bool needsBootstrap = macroList.empty() || workerIndex == std::numeric_limits<size_t>::max();
200  if (needsBootstrap) {
201  NLogPrint("ndmspc-worker: bootstrapping config from supervisor at %s ...", endpoint.c_str());
202  void * ctx = zmq_ctx_new();
203  void * dealer = zmq_socket(ctx, ZMQ_DEALER);
204  // Use a unique bootstrap identity so supervisor can route the CONFIG reply
205  std::ostringstream bootIdBuilder;
206  const char * host = gSystem->HostName();
207  bootIdBuilder << "boot_" << (host ? host : "unknown") << '_' << gSystem->GetPid() << '_'
208  << std::chrono::high_resolution_clock::now().time_since_epoch().count();
209  const std::string bootId = bootIdBuilder.str();
210  zmq_setsockopt(dealer, ZMQ_IDENTITY, bootId.data(), bootId.size());
211  int timeoutMs = 1000;
212  zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
213  zmq_connect(dealer, endpoint.c_str());
214  Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"BOOTSTRAP"});
215 
216  const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(300);
217  bool configOk = false;
218  while (!configOk) {
219  std::vector<std::string> frames;
220  if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
221  if (errno == EAGAIN || errno == EWOULDBLOCK) {
222  if (std::chrono::steady_clock::now() > deadline) break;
223  continue;
224  }
225  break;
226  }
227  // CONFIG frames: "CONFIG", workerIdx, macroList, NDMSPC_TMP_DIR, NDMSPC_TMP_RESULTS_DIR
228  if (frames.size() >= 5 && frames[0] == "CONFIG") {
229  if (workerIndex == std::numeric_limits<size_t>::max())
230  workerIndex = static_cast<size_t>(std::stoul(frames[1]));
231  if (macroList.empty())
232  macroList = frames[2];
233  if (!frames[3].empty())
234  gSystem->Setenv("NDMSPC_TMP_DIR", frames[3].c_str());
235  if (!frames[4].empty())
236  gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", frames[4].c_str());
237  configOk = true;
238  }
239  }
240  zmq_close(dealer);
241  zmq_ctx_term(ctx);
242 
243  if (!configOk) {
244  NLogError("ndmspc-worker: failed to receive CONFIG from supervisor at %s, exiting", endpoint.c_str());
245  return 1;
246  }
247  }
248 
249  // Fallback: if NDMSPC_TMP_RESULTS_DIR is unset or empty, use NDMSPC_TMP_DIR
250  if (!gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR")[0] == '\0') {
251  const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
252  if (tmpDirEnv && tmpDirEnv[0] != '\0')
253  gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
254  }
255 
256  if (macroList.empty()) {
257  NLogError("ndmspc-worker: no macro specified (use --macro or let supervisor send via bootstrap)");
258  return 1;
259  }
260  if (workerIndex == std::numeric_limits<size_t>::max()) workerIndex = 0;
261 
262  // Set env vars so NGnTree::Process enters worker mode
263  gSystem->Setenv("NDMSPC_WORKER_ENDPOINT", endpoint.c_str());
264  gSystem->Setenv("NDMSPC_WORKER_INDEX", std::to_string(workerIndex).c_str());
265 
266  fprintf(stdout, "ndmspc-worker: starting — index=%zu endpoint=%s\n", workerIndex, endpoint.c_str());
267  fprintf(stdout, " macro = %s\n", macroList.c_str());
268  fprintf(stdout, " NDMSPC_TMP_DIR = %s\n", gSystem->Getenv("NDMSPC_TMP_DIR") ? gSystem->Getenv("NDMSPC_TMP_DIR") : "(not set)");
269  fprintf(stdout, " NDMSPC_TMP_RESULTS_DIR = %s\n", gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") ? gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") : "(not set)");
270  fprintf(stdout, " NDMSPC_EXECUTION_MODE = %s\n", gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "(not set)");
271  fflush(stdout);
272 
273  std::vector<std::string> macros = Ndmspc::NUtils::Tokenize(macroList, ',');
274  for (const auto & macro : macros) {
275  NLogInfo("ndmspc-worker: executing macro '%s'", macro.c_str());
276  TMacro * m = Ndmspc::NUtils::OpenMacro(macro);
277  if (!m) {
278  NLogError("ndmspc-worker: failed to open macro '%s', exiting", macro.c_str());
279  return 1;
280  }
281  m->Exec();
282  delete m;
283  }
284 
285  NLogInfo("ndmspc-worker: done");
286  return 0;
287 }
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
Definition: NLogger.h:515
static TMacro * OpenMacro(std::string filename)
Open a macro file.
Definition: NUtils.cxx:781
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition: NUtils.cxx:1073