11#include "TApplication.h"
16#include "NDimensionalIpcRunner.h"
// Builds the CLI11 application description used in main():
//   "<NDMSPC_NAME> v<NDMSPC_VERSION>-<NDMSPC_VERSION_RELEASE> (worker)".
// NOTE(review): `size` is declared/initialised on a line missing from this listing;
// presumably it is snprintf(nullptr, 0, <same format>) + 1 — TODO confirm.
// NOTE(review): `size` is then overwritten with snprintf's return value (the length
// the full text would have had) and used as the std::string length. That is only
// safe if the buffer was sized by an identical format call; otherwise
// std::string(buf, size) could read past the written bytes.
19static std::string app_description()
22 auto buf = std::make_unique<char[]>(size);
23 size = std::snprintf(buf.get(), size,
"%s v%s-%s (worker)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
24 return std::string(buf.get(), size);
// Builds the string printed by `--version` (registered via app.set_version_flag in
// main()): "<NDMSPC_NAME> v<NDMSPC_VERSION>-<NDMSPC_VERSION_RELEASE>".
// NOTE(review): as in app_description(), `size` is initialised on a line missing from
// this listing and is then reused as the snprintf result; confirm the buffer is sized
// by an identical format call so the returned length never exceeds the written data.
27static std::string app_version()
30 auto buf = std::make_unique<char[]>(size);
31 size = std::snprintf(buf.get(), size,
"%s v%s-%s", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
32 return std::string(buf.get(), size);
// Termination flag written by handle_signal() and read by wait_for_workers().
// volatile sig_atomic_t is the object type a signal handler may portably write.
37volatile sig_atomic_t gTerminateRequested = 0;
// Handler installed in main() for SIGTERM and SIGINT. It only sets the flag above.
// The received signal number is ignored, so later code cannot tell SIGINT from SIGTERM.
39void handle_signal(
int)
41 gTerminateRequested = 1;
// Starts one local worker process and returns its PID, or -1 when the process could
// not be created (pid < 0).
// The child exec()s `workerBin` with: --endpoint <endpoint> --mode <mode>
// [--macro <macroList>] [--macro-params <macroParams>] [--verbose].
// No --index is passed, so the child has to obtain its index via bootstrap.
// NOTE(review): `pid` is declared on a line missing from this listing — presumably
// `pid_t pid = fork();` followed by a parent-side `if (pid > 0) return pid;`. Confirm,
// because everything below must run only in the child.
44pid_t spawn_worker_process(
const std::string & workerBin,
const std::string & endpoint,
const std::string & mode,
45 const std::string & macroList,
const std::string & macroParams,
bool verbose)
48 if (pid < 0)
return -1;
51 std::vector<std::string> args;
52 args.emplace_back(workerBin);
53 args.emplace_back(
"--endpoint");
54 args.emplace_back(endpoint);
// NOTE(review): no guard is visible around --mode. The child validates --mode with
// CLI::IsMember({"ipc","process","tcp","thread"}), so an empty `mode` would likely be
// rejected there. Confirm `mode` is never empty here, or forward it only when it is set.
56 args.emplace_back(
"--mode");
57 args.emplace_back(mode);
59 if (!macroList.empty()) {
60 args.emplace_back(
"--macro");
61 args.emplace_back(macroList);
63 if (!macroParams.empty()) {
64 args.emplace_back(
"--macro-params");
65 args.emplace_back(macroParams);
67 if (verbose) args.emplace_back(
"--verbose");
// Null-terminated argv for execvp. execvp is declared with `char *const argv[]` for
// historical C reasons but does not modify the strings, so the const_cast is safe.
// `args` owns the storage and must stay alive until execvp is called.
69 std::vector<char *> cargs;
70 cargs.reserve(args.size() + 1);
71 for (
auto & arg : args) {
72 cargs.push_back(
const_cast<char *
>(arg.c_str()));
74 cargs.push_back(
nullptr);
// execvp searches PATH only when workerBin contains no '/', and it returns only on
// failure. The failure handling that follows is not part of this listing. It should
// end the child with _exit() so the child never continues into the parent's code.
76 execvp(workerBin.c_str(), cargs.data());
/**
 * @brief Deliver a signal to every spawned worker that has a usable PID.
 *
 * @param workerPids   PIDs of spawned workers. Entries <= 0 are skipped: kill()
 *                     with 0 or a negative PID addresses a process group (or every
 *                     permitted process) rather than a single worker.
 * @param signalToSend Signal number forwarded to kill(), e.g. SIGTERM.
 *
 * Best-effort: kill()'s return value is not checked.
 */
void terminate_workers(const std::vector<pid_t> & workerPids, int signalToSend)
{
  for (const pid_t target : workerPids) {
    if (target <= 0) {
      continue; // never signal a whole process group by accident
    }
    kill(target, signalToSend);
  }
}
// Supervises spawned workers until none is left alive.
// Each round polls every positive PID with waitpid(pid, &status, WNOHANG) and then
// sleeps 50 ms (presumably only when no worker was reaped in that round — the
// condition is on a line not shown).
// A worker that exited normally with a non-zero status sets exitCode = 1.
// If termination was requested by a signal, the function returns 128 + SIGTERM.
// Otherwise it presumably returns exitCode; the final return is not shown.
// NOTE(review): `status` and `exitCode` are declared, and reaped PIDs are marked and
// `aliveCount` decremented, on lines missing from this listing. `aliveCount` starts at
// workerPids.size(), so every entry is assumed to be a live, positive PID.
90int wait_for_workers(std::vector<pid_t> & workerPids)
92 size_t aliveCount = workerPids.size();
95 while (aliveCount > 0) {
// NOTE(review): as shown, SIGTERM is forwarded on every loop iteration (about every
// 50 ms) while gTerminateRequested stays set, unless a missing line guards it.
// Confirm the repeated re-send is intended.
96 if (gTerminateRequested) {
97 terminate_workers(workerPids, SIGTERM);
100 bool progressed =
false;
101 for (pid_t & pid : workerPids) {
102 if (pid <= 0)
continue;
// WNOHANG: rc == 0 means this worker is still running.
104 pid_t rc = waitpid(pid, &status, WNOHANG);
105 if (rc == 0)
continue;
117 if (WIFEXITED(status)) {
118 if (WEXITSTATUS(status) != 0) exitCode = 1;
125 std::this_thread::sleep_for(std::chrono::milliseconds(50));
// NOTE(review): reports 128 + SIGTERM even when the request came from SIGINT,
// because handle_signal() discards the signal number.
129 if (gTerminateRequested) {
130 return 128 + SIGTERM;
// Entry point of ndmspc-worker. It runs in one of two modes.
//  * Spawner mode (--spawn-workers N, N > 0): starts N local worker processes,
//    forwarding endpoint/mode/macro options, then waits for them to finish.
//  * Worker mode: bootstraps missing settings (worker index, macro list, temp dirs,
//    macro params) from the supervisor over a ZeroMQ DEALER socket, exports them as
//    NDMSPC_* environment variables, and then executes each macro.
137int main(
int argc,
char ** argv)
// These handlers only set gTerminateRequested. No check of that flag is visible on
// the worker-mode path below.
// NOTE(review): ROOT's TApplication/TSystem (constructed next) may install its own
// SIGINT handling. Confirm these handlers are still in effect afterwards.
139 signal(SIGTERM, handle_signal);
140 signal(SIGINT, handle_signal);
// No argc/argv are handed to ROOT, so ROOT sees none of the worker's options
// (presumably so CLI11 remains the only command-line parser).
142 TApplication rootApp(
"ndmspc-worker", 0,
nullptr);
144 CLI::App app{app_description()};
145 app.set_version_flag(
"--version", app_version(),
"Print version information and exit");
// numeric_limits<size_t>::max() is the "no --index given" sentinel used below.
// `mode` is declared on a line missing from this listing.
147 std::string endpoint;
148 size_t workerIndex = std::numeric_limits<size_t>::max();
149 std::string macroList;
150 std::string macroParams;
152 std::string workerBin;
153 size_t spawnWorkers = 0;
154 bool verbose =
false;
156 app.add_option(
"-e,--endpoint", endpoint,
"Master endpoint (e.g. tcp://host:5555)")->required();
157 app.add_option(
"-i,--index", workerIndex,
"Worker index (default: auto-assigned by supervisor)");
158 app.add_option(
"-m,--macro", macroList,
"Comma-separated list of macro file(s) or URLs to execute");
159 app.add_option(
"--macro-params", macroParams,
160 "Parameter list forwarded to TMacro::Exec(params), e.g. '42,\"sample\"'");
161 app.add_option(
"--mode", mode,
162 "Execution mode override: ipc/process (local), tcp (remote), thread")
163 ->check(CLI::IsMember({
"ipc",
"process",
"tcp",
"thread"}));
164 app.add_option(
"--spawn-workers", spawnWorkers,
165 "Spawn N local ndmspc-worker processes (spawner mode)");
166 app.add_option(
"--worker-bin", workerBin,
167 "Worker executable for spawner mode (default: current executable)");
168 app.add_flag(
"-v,--verbose", verbose,
"Enable verbose logging");
170 CLI11_PARSE(app, argc, argv);
// Default NDMSPC_LOG_CONSOLE to "0" when it is unset (presumably this disables console
// logging; the code that reads it is not shown).
173 if (!gSystem->Getenv(
"NDMSPC_LOG_CONSOLE")) {
174 gSystem->Setenv(
"NDMSPC_LOG_CONSOLE",
"0");
// "process" is accepted as an alias of "ipc".
// NOTE(review): Setenv is only called when the variable is unset, so a value already
// in the environment wins over --mode / --macro-params on the command line. If the CLI
// is meant to override the environment, this precedence is inverted.
184 if (mode ==
"process") mode =
"ipc";
185 if (!mode.empty() && !gSystem->Getenv(
"NDMSPC_EXECUTION_MODE")) {
186 gSystem->Setenv(
"NDMSPC_EXECUTION_MODE", mode.c_str());
188 if (!macroParams.empty() && !gSystem->Getenv(
"NDMSPC_MACRO_PARAMS")) {
189 gSystem->Setenv(
"NDMSPC_MACRO_PARAMS", macroParams.c_str());
// Spawner mode. Spawned children get no --index, so each of them bootstraps its own
// index from the supervisor.
192 if (spawnWorkers > 0) {
193 if (workerIndex != std::numeric_limits<size_t>::max()) {
194 NLogError(
"ndmspc-worker: --index cannot be combined with --spawn-workers");
// Default worker binary is argv[0]. execvp resolves it through PATH when it has no
// '/', which may find a different ndmspc-worker than the running one.
197 if (workerBin.empty()) {
198 workerBin = (argc > 0 && argv[0] && argv[0][0] !=
'\0') ? argv[0] :
"ndmspc-worker";
201 NLogInfo(
"ndmspc-worker: spawning %zu worker(s) using '%s' -> %s", spawnWorkers, workerBin.c_str(),
203 std::vector<pid_t> workerPids;
204 workerPids.reserve(spawnWorkers);
205 for (
size_t i = 0; i < spawnWorkers; ++i) {
// On a spawn failure, workers already started are sent SIGTERM. No waitpid is visible
// on that path.
206 const pid_t pid = spawn_worker_process(workerBin, endpoint, mode, macroList, macroParams, verbose);
208 NLogError(
"ndmspc-worker: failed to spawn worker %zu", i);
209 terminate_workers(workerPids, SIGTERM);
212 workerPids.push_back(pid);
214 return wait_for_workers(workerPids);
// Worker mode. Bootstrap runs when either the macro list or the worker index was not
// given on the command line.
218 const bool needsBootstrap = macroList.empty() || workerIndex == std::numeric_limits<size_t>::max();
219 if (needsBootstrap) {
220 NLogPrint(
"ndmspc-worker: bootstrapping config from supervisor at %s ...", endpoint.c_str());
// NOTE(review): return values of zmq_ctx_new / zmq_socket / zmq_connect are not checked
// on the visible lines, and zmq_close / zmq_ctx_term are not visible either. Confirm
// the cleanup exists on the missing lines.
221 void * ctx = zmq_ctx_new();
222 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
// Socket identity "boot_<host>_<pid>_<clock ticks>", presumably unique so the
// supervisor's ROUTER side can route the reply to this worker.
// ZMQ_IDENTITY is the legacy name of ZMQ_ROUTING_ID in newer libzmq.
224 std::ostringstream bootIdBuilder;
225 const char * host = gSystem->HostName();
226 bootIdBuilder <<
"boot_" << (host ? host :
"unknown") <<
'_' << gSystem->GetPid() <<
'_'
227 << std::chrono::high_resolution_clock::now().time_since_epoch().count();
228 const std::string bootId = bootIdBuilder.str();
229 zmq_setsockopt(dealer, ZMQ_IDENTITY, bootId.data(), bootId.size());
// The 1 s receive timeout lets the receive loop re-check the 300 s overall deadline.
230 int timeoutMs = 1000;
231 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
232 zmq_connect(dealer, endpoint.c_str());
233 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"BOOTSTRAP"});
235 const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(300);
236 bool configOk =
false;
237 bool rejectedBySupervisor =
false;
// Receive loop (its header is on a line not shown). A receive that timed out
// (EAGAIN / EWOULDBLOCK) only re-checks the deadline.
// NOTE(review): this relies on ReceiveFrames leaving zmq's errno intact on failure.
239 std::vector<std::string> frames;
240 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
241 if (errno == EAGAIN || errno == EWOULDBLOCK) {
242 if (std::chrono::steady_clock::now() > deadline)
break;
// CONFIG frames:
//   [1] worker index, [2] macro list, [3] NDMSPC_TMP_DIR,
//   [4] NDMSPC_TMP_RESULTS_DIR, optional [5] macro params.
// Index and macro list only fill values not given on the command line. Non-empty temp
// dirs overwrite existing environment values. Macro params only fill an unset variable.
// NOTE(review): std::stoul throws std::invalid_argument / std::out_of_range on a
// malformed index frame. No handler is visible, so a bad CONFIG would end the worker
// via std::terminate.
249 if (frames.size() >= 5 && frames[0] ==
"CONFIG") {
250 if (workerIndex == std::numeric_limits<size_t>::max())
251 workerIndex =
static_cast<size_t>(std::stoul(frames[1]));
252 if (macroList.empty())
253 macroList = frames[2];
254 if (!frames[3].empty())
255 gSystem->Setenv(
"NDMSPC_TMP_DIR", frames[3].c_str());
256 if (!frames[4].empty())
257 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", frames[4].c_str());
258 if (frames.size() >= 6 && !frames[5].empty() && !gSystem->Getenv(
"NDMSPC_MACRO_PARAMS"))
259 gSystem->Setenv(
"NDMSPC_MACRO_PARAMS", frames[5].c_str());
261 }
else if (frames.size() >= 1 && frames[0] ==
"REJECT") {
262 const std::string reason = (frames.size() >= 2) ? frames[1] :
"unspecified";
263 NLogWarning(
"ndmspc-worker: bootstrap rejected by supervisor (%s), exiting", reason.c_str());
264 rejectedBySupervisor =
true;
271 if (rejectedBySupervisor) {
276 NLogError(
"ndmspc-worker: failed to receive CONFIG from supervisor at %s, exiting", endpoint.c_str());
// Use NDMSPC_TMP_DIR for results when NDMSPC_TMP_RESULTS_DIR is unset or empty.
282 if (!gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR")[0] ==
'\0') {
283 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
284 if (tmpDirEnv && tmpDirEnv[0] !=
'\0')
285 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
288 if (macroList.empty()) {
289 NLogError(
"ndmspc-worker: no macro specified (use --macro or let supervisor send via bootstrap)");
// Defensive default. After a successful CONFIG the index has normally been set already.
292 if (workerIndex == std::numeric_limits<size_t>::max()) workerIndex = 0;
// Export endpoint and index through the environment (presumably for code run inside
// the macros).
295 gSystem->Setenv(
"NDMSPC_WORKER_ENDPOINT", endpoint.c_str());
296 gSystem->Setenv(
"NDMSPC_WORKER_INDEX", std::to_string(workerIndex).c_str());
298 NLogPrint(
"ndmspc-worker: starting — index=%zu endpoint=%s", workerIndex, endpoint.c_str());
299 NLogPrint(
" macro = %s", macroList.c_str());
300 NLogPrint(
" NDMSPC_TMP_DIR = %s", gSystem->Getenv(
"NDMSPC_TMP_DIR") ? gSystem->Getenv(
"NDMSPC_TMP_DIR") :
"(not set)");
301 NLogPrint(
" NDMSPC_TMP_RESULTS_DIR = %s", gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") ? gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") :
"(not set)");
302 NLogPrint(
" NDMSPC_EXECUTION_MODE = %s", gSystem->Getenv(
"NDMSPC_EXECUTION_MODE") ? gSystem->Getenv(
"NDMSPC_EXECUTION_MODE") :
"(not set)");
303 NLogPrint(
" NDMSPC_MACRO_PARAMS = %s", gSystem->Getenv(
"NDMSPC_MACRO_PARAMS") ? gSystem->Getenv(
"NDMSPC_MACRO_PARAMS") :
"(not set)");
// Macro params are read back from the environment. Whichever of a pre-existing
// NDMSPC_MACRO_PARAMS, --macro-params, or the supervisor's CONFIG filled it is what
// reaches Exec().
305 const std::string effectiveMacroParams = gSystem->Getenv(
"NDMSPC_MACRO_PARAMS") ? gSystem->Getenv(
"NDMSPC_MACRO_PARAMS") :
"";
// `macros` is declared on a line missing from this listing — presumably the
// comma-split macroList (per the --macro help text). Confirm.
// NOTE(review): ownership of the TMacro* `m` (created on a line not shown) is not
// visible. Confirm it is deleted after Exec().
307 for (
const auto & macro : macros) {
308 if (effectiveMacroParams.empty()) {
309 NLogInfo(
"ndmspc-worker: executing macro '%s'", macro.c_str());
311 NLogInfo(
"ndmspc-worker: executing macro '%s' with params '%s'", macro.c_str(), effectiveMacroParams.c_str());
315 NLogError(
"ndmspc-worker: failed to open macro '%s', exiting", macro.c_str());
// An empty parameter string is passed to Exec() as nullptr.
318 m->Exec(effectiveMacroParams.empty() ?
nullptr : effectiveMacroParams.c_str());
322 NLogInfo(
"ndmspc-worker: done");
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
static TMacro * OpenMacro(std::string filename)
Open a macro file.
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.