ndmspc v1.2.0-0.1.rc5
Loading...
Searching...
No Matches
ndmspc-worker.cxx
1#include <CLI11.hpp>
2#include <csignal>
3#include <chrono>
4#include <sstream>
5#include <sys/wait.h>
6#include <string>
7#include <thread>
8#include <vector>
9#include <unistd.h>
10#include "TROOT.h"
11#include "TApplication.h"
12#include "TSystem.h"
13#include "NLogger.h"
14#include "NUtils.h"
15#include "ndmspc.h"
16#include "NDimensionalIpcRunner.h"
17#include <zmq.h>
18
19static std::string app_description()
20{
21 size_t size = 64;
22 auto buf = std::make_unique<char[]>(size);
23 size = std::snprintf(buf.get(), size, "%s v%s-%s (worker)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
24 return std::string(buf.get(), size);
25}
26
27static std::string app_version()
28{
29 size_t size = 128;
30 auto buf = std::make_unique<char[]>(size);
31 size = std::snprintf(buf.get(), size, "%s v%s-%s", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
32 return std::string(buf.get(), size);
33}
34
35namespace {
36
/// Termination flag: zero until a handled signal arrives, non-zero afterwards.
/// Declared volatile sig_atomic_t so it can be written from a signal handler.
volatile sig_atomic_t gTerminateRequested = 0;

/// Signal handler that only records that termination was requested.
/// The signal number is intentionally ignored; every handled signal has the same effect.
void handle_signal(int /*signalNumber*/)
{
  gTerminateRequested = 1;
}
43
/// Starts one worker as a child process via fork() + execvp().
///
/// The child is invoked as:
///   workerBin --endpoint <endpoint> [--mode <mode>] [--macro <macroList>]
///             [--macro-params <macroParams>] [--verbose]
/// Optional arguments are passed only when the corresponding string is non-empty
/// (or, for --verbose, when the flag is true).
///
/// @param workerBin   Executable to run; resolved by execvp (PATH lookup applies).
/// @param endpoint    Value passed after --endpoint.
/// @param mode        Value passed after --mode, skipped when empty.
/// @param macroList   Value passed after --macro, skipped when empty.
/// @param macroParams Value passed after --macro-params, skipped when empty.
/// @param verbose     Whether to append --verbose.
/// @return The child's pid on success, -1 if fork() fails. If execvp() fails, the
///         child exits with status 127, which the caller observes through waitpid().
pid_t spawn_worker_process(const std::string & workerBin, const std::string & endpoint, const std::string & mode,
                           const std::string & macroList, const std::string & macroParams, bool verbose)
{
  // Build the whole argument vector BEFORE forking. After fork() in a process that has
  // other threads, the child may only call async-signal-safe functions until it execs;
  // allocating strings/vectors there can deadlock on a lock held by a thread that does
  // not exist in the child.
  std::vector<std::string> args;
  args.reserve(10);
  args.emplace_back(workerBin);
  args.emplace_back("--endpoint");
  args.emplace_back(endpoint);

  const auto addOptional = [&args](const char * flag, const std::string & value) {
    if (value.empty()) return;
    args.emplace_back(flag);
    args.emplace_back(value);
  };
  addOptional("--mode", mode);
  addOptional("--macro", macroList);
  addOptional("--macro-params", macroParams);
  if (verbose) args.emplace_back("--verbose");

  // execvp wants a NULL-terminated array of mutable char pointers; the strings are
  // owned by `args`, which outlives the call.
  std::vector<char *> cargs;
  cargs.reserve(args.size() + 1);
  for (auto & arg : args) {
    cargs.push_back(const_cast<char *>(arg.c_str()));
  }
  cargs.push_back(nullptr);

  const pid_t pid = fork();
  if (pid < 0) return -1;

  if (pid == 0) {
    // Child: replace the process image; execvp only returns on failure.
    execvp(workerBin.c_str(), cargs.data());
    _exit(127);
  }

  return pid;
}
82
/// Sends `signalToSend` to every live worker in `workerPids`.
///
/// Entries that are zero or negative are skipped, so slots already marked as
/// finished are never signalled. The result of kill() is not inspected.
///
/// @param workerPids   Process ids of the workers; non-positive entries are ignored.
/// @param signalToSend Signal number handed to kill().
void terminate_workers(const std::vector<pid_t> & workerPids, int signalToSend)
{
  for (auto it = workerPids.begin(); it != workerPids.end(); ++it) {
    const pid_t target = *it;
    if (target <= 0) continue;
    kill(target, signalToSend);
  }
}
89
90int wait_for_workers(std::vector<pid_t> & workerPids)
91{
92 size_t aliveCount = workerPids.size();
93 int exitCode = 0;
94
95 while (aliveCount > 0) {
96 if (gTerminateRequested) {
97 terminate_workers(workerPids, SIGTERM);
98 }
99
100 bool progressed = false;
101 for (pid_t & pid : workerPids) {
102 if (pid <= 0) continue;
103 int status = 0;
104 pid_t rc = waitpid(pid, &status, WNOHANG);
105 if (rc == 0) continue;
106 if (rc < 0) {
107 pid = -1;
108 --aliveCount;
109 progressed = true;
110 exitCode = 1;
111 continue;
112 }
113
114 pid = -1;
115 --aliveCount;
116 progressed = true;
117 if (WIFEXITED(status)) {
118 if (WEXITSTATUS(status) != 0) exitCode = 1;
119 } else {
120 exitCode = 1;
121 }
122 }
123
124 if (!progressed) {
125 std::this_thread::sleep_for(std::chrono::milliseconds(50));
126 }
127 }
128
129 if (gTerminateRequested) {
130 return 128 + SIGTERM;
131 }
132 return exitCode;
133}
134
135} // namespace
136
137int main(int argc, char ** argv)
138{
139 signal(SIGTERM, handle_signal);
140 signal(SIGINT, handle_signal);
141
142 TApplication rootApp("ndmspc-worker", 0, nullptr);
143
144 CLI::App app{app_description()};
145 app.set_version_flag("--version", app_version(), "Print version information and exit");
146
147 std::string endpoint;
148 size_t workerIndex = std::numeric_limits<size_t>::max(); // sentinel: not set
149 std::string macroList;
150 std::string macroParams;
151 std::string mode; // ipc | process | tcp | thread (optional override)
152 std::string workerBin;
153 size_t spawnWorkers = 0;
154 bool verbose = false;
155
156 app.add_option("-e,--endpoint", endpoint, "Master endpoint (e.g. tcp://host:5555)")->required();
157 app.add_option("-i,--index", workerIndex, "Worker index (default: auto-assigned by supervisor)");
158 app.add_option("-m,--macro", macroList, "Comma-separated list of macro file(s) or URLs to execute");
159 app.add_option("--macro-params", macroParams,
160 "Parameter list forwarded to TMacro::Exec(params), e.g. '42,\"sample\"'");
161 app.add_option("--mode", mode,
162 "Execution mode override: ipc/process (local), tcp (remote), thread")
163 ->check(CLI::IsMember({"ipc", "process", "tcp", "thread"}));
164 app.add_option("--spawn-workers", spawnWorkers,
165 "Spawn N local ndmspc-worker processes (spawner mode)");
166 app.add_option("--worker-bin", workerBin,
167 "Worker executable for spawner mode (default: current executable)");
168 app.add_flag("-v,--verbose", verbose, "Enable verbose logging");
169
170 CLI11_PARSE(app, argc, argv);
171
172 // Default to file-only logging unless explicitly configured by environment.
173 if (!gSystem->Getenv("NDMSPC_LOG_CONSOLE")) {
174 gSystem->Setenv("NDMSPC_LOG_CONSOLE", "0");
175 if (!verbose) {
177 }
178 }
179
180 if (verbose) {
182 }
183
184 if (mode == "process") mode = "ipc";
185 if (!mode.empty() && !gSystem->Getenv("NDMSPC_EXECUTION_MODE")) {
186 gSystem->Setenv("NDMSPC_EXECUTION_MODE", mode.c_str());
187 }
188 if (!macroParams.empty() && !gSystem->Getenv("NDMSPC_MACRO_PARAMS")) {
189 gSystem->Setenv("NDMSPC_MACRO_PARAMS", macroParams.c_str());
190 }
191
192 if (spawnWorkers > 0) {
193 if (workerIndex != std::numeric_limits<size_t>::max()) {
194 NLogError("ndmspc-worker: --index cannot be combined with --spawn-workers");
195 return 1;
196 }
197 if (workerBin.empty()) {
198 workerBin = (argc > 0 && argv[0] && argv[0][0] != '\0') ? argv[0] : "ndmspc-worker";
199 }
200
201 NLogInfo("ndmspc-worker: spawning %zu worker(s) using '%s' -> %s", spawnWorkers, workerBin.c_str(),
202 endpoint.c_str());
203 std::vector<pid_t> workerPids;
204 workerPids.reserve(spawnWorkers);
205 for (size_t i = 0; i < spawnWorkers; ++i) {
206 const pid_t pid = spawn_worker_process(workerBin, endpoint, mode, macroList, macroParams, verbose);
207 if (pid < 0) {
208 NLogError("ndmspc-worker: failed to spawn worker %zu", i);
209 terminate_workers(workerPids, SIGTERM);
210 return 1;
211 }
212 workerPids.push_back(pid);
213 }
214 return wait_for_workers(workerPids);
215 }
216
217 // If macro or index not provided, bootstrap from supervisor
218 const bool needsBootstrap = macroList.empty() || workerIndex == std::numeric_limits<size_t>::max();
219 if (needsBootstrap) {
220 NLogPrint("ndmspc-worker: bootstrapping config from supervisor at %s ...", endpoint.c_str());
221 void * ctx = zmq_ctx_new();
222 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
223 // Use a unique bootstrap identity so supervisor can route the CONFIG reply
224 std::ostringstream bootIdBuilder;
225 const char * host = gSystem->HostName();
226 bootIdBuilder << "boot_" << (host ? host : "unknown") << '_' << gSystem->GetPid() << '_'
227 << std::chrono::high_resolution_clock::now().time_since_epoch().count();
228 const std::string bootId = bootIdBuilder.str();
229 zmq_setsockopt(dealer, ZMQ_IDENTITY, bootId.data(), bootId.size());
230 int timeoutMs = 1000;
231 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
232 zmq_connect(dealer, endpoint.c_str());
233 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"BOOTSTRAP"});
234
235 const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(300);
236 bool configOk = false;
237 bool rejectedBySupervisor = false;
238 while (!configOk) {
239 std::vector<std::string> frames;
240 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
241 if (errno == EAGAIN || errno == EWOULDBLOCK) {
242 if (std::chrono::steady_clock::now() > deadline) break;
243 continue;
244 }
245 break;
246 }
247 // CONFIG frames: "CONFIG", workerIdx, macroList, NDMSPC_TMP_DIR,
248 // NDMSPC_TMP_RESULTS_DIR, [optional] NDMSPC_MACRO_PARAMS
249 if (frames.size() >= 5 && frames[0] == "CONFIG") {
250 if (workerIndex == std::numeric_limits<size_t>::max())
251 workerIndex = static_cast<size_t>(std::stoul(frames[1]));
252 if (macroList.empty())
253 macroList = frames[2];
254 if (!frames[3].empty())
255 gSystem->Setenv("NDMSPC_TMP_DIR", frames[3].c_str());
256 if (!frames[4].empty())
257 gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", frames[4].c_str());
258 if (frames.size() >= 6 && !frames[5].empty() && !gSystem->Getenv("NDMSPC_MACRO_PARAMS"))
259 gSystem->Setenv("NDMSPC_MACRO_PARAMS", frames[5].c_str());
260 configOk = true;
261 } else if (frames.size() >= 1 && frames[0] == "REJECT") {
262 const std::string reason = (frames.size() >= 2) ? frames[1] : "unspecified";
263 NLogWarning("ndmspc-worker: bootstrap rejected by supervisor (%s), exiting", reason.c_str());
264 rejectedBySupervisor = true;
265 break;
266 }
267 }
268 zmq_close(dealer);
269 zmq_ctx_term(ctx);
270
271 if (rejectedBySupervisor) {
272 return 0;
273 }
274
275 if (!configOk) {
276 NLogError("ndmspc-worker: failed to receive CONFIG from supervisor at %s, exiting", endpoint.c_str());
277 return 1;
278 }
279 }
280
281 // Fallback: if NDMSPC_TMP_RESULTS_DIR is unset or empty, use NDMSPC_TMP_DIR
282 if (!gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR")[0] == '\0') {
283 const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
284 if (tmpDirEnv && tmpDirEnv[0] != '\0')
285 gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
286 }
287
288 if (macroList.empty()) {
289 NLogError("ndmspc-worker: no macro specified (use --macro or let supervisor send via bootstrap)");
290 return 1;
291 }
292 if (workerIndex == std::numeric_limits<size_t>::max()) workerIndex = 0;
293
294 // Set env vars so NGnTree::Process enters worker mode
295 gSystem->Setenv("NDMSPC_WORKER_ENDPOINT", endpoint.c_str());
296 gSystem->Setenv("NDMSPC_WORKER_INDEX", std::to_string(workerIndex).c_str());
297
298 NLogPrint("ndmspc-worker: starting — index=%zu endpoint=%s", workerIndex, endpoint.c_str());
299 NLogPrint(" macro = %s", macroList.c_str());
300 NLogPrint(" NDMSPC_TMP_DIR = %s", gSystem->Getenv("NDMSPC_TMP_DIR") ? gSystem->Getenv("NDMSPC_TMP_DIR") : "(not set)");
301 NLogPrint(" NDMSPC_TMP_RESULTS_DIR = %s", gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") ? gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") : "(not set)");
302 NLogPrint(" NDMSPC_EXECUTION_MODE = %s", gSystem->Getenv("NDMSPC_EXECUTION_MODE") ? gSystem->Getenv("NDMSPC_EXECUTION_MODE") : "(not set)");
303 NLogPrint(" NDMSPC_MACRO_PARAMS = %s", gSystem->Getenv("NDMSPC_MACRO_PARAMS") ? gSystem->Getenv("NDMSPC_MACRO_PARAMS") : "(not set)");
304
305 const std::string effectiveMacroParams = gSystem->Getenv("NDMSPC_MACRO_PARAMS") ? gSystem->Getenv("NDMSPC_MACRO_PARAMS") : "";
306 std::vector<std::string> macros = Ndmspc::NUtils::Tokenize(macroList, ',');
307 for (const auto & macro : macros) {
308 if (effectiveMacroParams.empty()) {
309 NLogInfo("ndmspc-worker: executing macro '%s'", macro.c_str());
310 } else {
311 NLogInfo("ndmspc-worker: executing macro '%s' with params '%s'", macro.c_str(), effectiveMacroParams.c_str());
312 }
313 TMacro * m = Ndmspc::NUtils::OpenMacro(macro);
314 if (!m) {
315 NLogError("ndmspc-worker: failed to open macro '%s', exiting", macro.c_str());
316 return 1;
317 }
318 m->Exec(effectiveMacroParams.empty() ? nullptr : effectiveMacroParams.c_str());
319 delete m;
320 }
321
322 NLogInfo("ndmspc-worker: done");
323 return 0;
324}
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
Definition NLogger.h:515
static TMacro * OpenMacro(std::string filename)
Open a macro file.
Definition NUtils.cxx:781
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition NUtils.cxx:1077