11 #include "TApplication.h"
16 #include "NDimensionalIpcRunner.h"
19 static std::string app_description()
22 auto buf = std::make_unique<char[]>(size);
23 size = std::snprintf(buf.get(), size,
"%s v%s-%s (worker)", NDMSPC_NAME, NDMSPC_VERSION, NDMSPC_VERSION_RELEASE);
24 return std::string(buf.get(), size);
// Set to 1 by handle_signal(); polled by wait_for_workers() to stop the spawned workers.
volatile sig_atomic_t gTerminateRequested = 0;

/// @brief Handler installed for SIGTERM and SIGINT in main().
/// It only stores to a volatile sig_atomic_t, which is the one kind of write a signal handler may
/// portably perform.
/// @param signum Signal number (unused: both signals are treated the same way).
void handle_signal(int signum)
{
  static_cast<void>(signum);
  gTerminateRequested = 1;
}
// Launch one local worker process running `workerBin` with flags mirroring this process's options.
// @param workerBin  program handed to execvp(); looked up on PATH when it contains no '/'.
// @param endpoint   forwarded as `--endpoint`.
// @param mode       forwarded as `--mode` (NOTE(review): the `--mode` pair may sit inside a
//                   condition such as `!mode.empty()` that is not shown here — confirm).
// @param macroList  forwarded as `--macro` only when non-empty.
// @param verbose    appends `--verbose` when true.
// @return -1 when `pid < 0`; the other return path is not shown here (main() stores the result in
//         workerPids, so presumably the child PID is returned to the parent — TODO confirm).
36 pid_t spawn_worker_process(
const std::string & workerBin,
const std::string & endpoint,
const std::string & mode,
37 const std::string & macroList,
bool verbose)
// `pid` is presumably the result of fork() — NOTE(review): confirm; a negative value is reported as -1.
40 if (pid < 0)
return -1;
// Build the worker's argv; `args` owns the strings for the duration of the exec call.
// NOTE(review): this appears to run after the fork check; if the parent has other threads,
// allocating memory in the child is not async-signal-safe — consider building argv before forking.
43 std::vector<std::string> args;
44 args.emplace_back(workerBin);
45 args.emplace_back(
"--endpoint");
46 args.emplace_back(endpoint);
48 args.emplace_back(
"--mode");
49 args.emplace_back(mode);
51 if (!macroList.empty()) {
52 args.emplace_back(
"--macro");
53 args.emplace_back(macroList);
55 if (verbose) args.emplace_back(
"--verbose");
// execvp() takes a NULL-terminated `char *` array. These pointers borrow from `args`; the const_cast
// only satisfies the C signature — execvp does not modify the argument strings.
57 std::vector<char *> cargs;
58 cargs.reserve(args.size() + 1);
59 for (
auto & arg : args) {
60 cargs.push_back(
const_cast<char *
>(arg.c_str()));
62 cargs.push_back(
nullptr);
// On success execvp() never returns (the process image is replaced); its failure handling follows
// in code not shown here.
64 execvp(workerBin.c_str(), cargs.data());
/// @brief Best-effort delivery of @p signalToSend to every tracked worker.
/// @param workerPids PIDs of spawned workers. Entries <= 0 are skipped, because kill() with 0 or a
///                   negative pid would target a whole process group instead of one child.
/// @param signalToSend Signal number passed to kill(); delivery failures are ignored.
void terminate_workers(const std::vector<pid_t> & workerPids, int signalToSend)
{
  for (size_t idx = 0; idx < workerPids.size(); ++idx) {
    const pid_t target = workerPids[idx];
    if (target <= 0) continue;
    kill(target, signalToSend);
  }
}
// Poll the spawned workers until every tracked PID is accounted for, then return an aggregate exit code.
// @param workerPids PIDs from spawn_worker_process(), taken by non-const reference. Entries <= 0 are
//                   skipped — presumably reaped entries are overwritten in code not shown; confirm.
// @return 128 + SIGTERM when gTerminateRequested is set at the final check (the shell convention for
//         "terminated by SIGTERM"; SIGINT sets the same flag, so it also yields this value).
//         Otherwise, presumably `exitCode`, which becomes 1 when any worker exits with a non-zero
//         status — the final return is not shown here; confirm.
78 int wait_for_workers(std::vector<pid_t> & workerPids)
// The loop ends when aliveCount reaches 0; its decrement happens in code not shown here.
80 size_t aliveCount = workerPids.size();
83 while (aliveCount > 0) {
// Forward a pending SIGTERM/SIGINT to all children as SIGTERM.
// NOTE(review): nothing visible clears gTerminateRequested, so SIGTERM may be re-sent on every
// polling pass until the children exit — confirm that is intended.
84 if (gTerminateRequested) {
85 terminate_workers(workerPids, SIGTERM);
88 bool progressed =
false;
89 for (pid_t & pid : workerPids) {
90 if (pid <= 0)
continue;
// Non-blocking reap: rc == 0 means this child is still running. `status` is declared outside this view.
92 pid_t rc = waitpid(pid, &status, WNOHANG);
93 if (rc == 0)
continue;
// A worker that exited normally with a non-zero status makes the overall result 1. Handling of
// rc < 0 and of signal-terminated children is in code not shown here.
105 if (WIFEXITED(status)) {
106 if (WEXITSTATUS(status) != 0) exitCode = 1;
// Sleep 50 ms between polling passes instead of busy-looping on waitpid(WNOHANG).
113 std::this_thread::sleep_for(std::chrono::milliseconds(50));
117 if (gTerminateRequested) {
118 return 128 + SIGTERM;
// Entry point of ndmspc-worker.
// With --spawn-workers N (N > 0) the process acts as a spawner: it forks N child workers and returns
// the result of wait_for_workers(). Otherwise it runs as a single worker. If --macro or --index is
// missing, it bootstraps them from the supervisor over ZeroMQ, then executes the macro list.
125 int main(
int argc,
char ** argv)
// SIGTERM and SIGINT share one handler, which only sets gTerminateRequested.
// NOTE(review): signal() semantics differ between platforms; sigaction() gives portable behaviour.
127 signal(SIGTERM, handle_signal);
128 signal(SIGINT, handle_signal);
// ROOT application object built with an empty argument list, so none of this program's flags reach ROOT.
130 TApplication rootApp(
"ndmspc-worker", 0,
nullptr);
132 CLI::App app{app_description()};
// numeric_limits<size_t>::max() is the "--index not given" sentinel. Spawner mode checks it, and
// so does the bootstrap decision further down.
134 std::string endpoint;
135 size_t workerIndex = std::numeric_limits<size_t>::max();
136 std::string macroList;
138 std::string workerBin;
139 size_t spawnWorkers = 0;
140 bool verbose =
false;
// Command-line interface. `mode` (bound to --mode) is declared on a line not shown here.
142 app.add_option(
"-e,--endpoint", endpoint,
"Master endpoint (e.g. tcp://host:5555)")->required();
143 app.add_option(
"-i,--index", workerIndex,
"Worker index (default: auto-assigned by supervisor)");
144 app.add_option(
"-m,--macro", macroList,
"Comma-separated list of macro file(s) or URLs to execute");
145 app.add_option(
"--mode", mode,
146 "Execution mode override: ipc/process (local), tcp (remote), thread")
147 ->check(CLI::IsMember({
"ipc",
"process",
"tcp",
"thread"}));
148 app.add_option(
"--spawn-workers", spawnWorkers,
149 "Spawn N local ndmspc-worker processes (spawner mode)");
150 app.add_option(
"--worker-bin", workerBin,
151 "Worker executable for spawner mode (default: current executable)");
152 app.add_flag(
"-v,--verbose", verbose,
"Enable verbose logging");
// CLI11_PARSE returns from main() on a parse error, or after printing --help.
154 CLI11_PARSE(app, argc, argv);
// Console logging defaults to "0" unless the environment already sets NDMSPC_LOG_CONSOLE.
157 if (!gSystem->Getenv(
"NDMSPC_LOG_CONSOLE")) {
158 gSystem->Setenv(
"NDMSPC_LOG_CONSOLE",
"0");
// "process" is an alias of "ipc". The mode is exported only when NDMSPC_EXECUTION_MODE is not
// already set, so an existing environment value takes precedence over --mode.
168 if (mode ==
"process") mode =
"ipc";
169 if (!mode.empty() && !gSystem->Getenv(
"NDMSPC_EXECUTION_MODE")) {
170 gSystem->Setenv(
"NDMSPC_EXECUTION_MODE", mode.c_str());
// Spawner mode: start --spawn-workers local worker processes that share this process's endpoint,
// mode, macro list and verbosity, then supervise them until they finish.
173 if (spawnWorkers > 0) {
// --index is rejected in spawner mode (spawn_worker_process never passes --index to children).
174 if (workerIndex != std::numeric_limits<size_t>::max()) {
175 NLogError(
"ndmspc-worker: --index cannot be combined with --spawn-workers");
// The worker binary defaults to argv[0], falling back to "ndmspc-worker".
// NOTE(review): argv[0] is the name this process was invoked with, not necessarily the path of the
// running executable; execvp() searches PATH for it only when it contains no '/'.
178 if (workerBin.empty()) {
179 workerBin = (argc > 0 && argv[0] && argv[0][0] !=
'\0') ? argv[0] :
"ndmspc-worker";
182 NLogInfo(
"ndmspc-worker: spawning %zu worker(s) using '%s' -> %s", spawnWorkers, workerBin.c_str(),
184 std::vector<pid_t> workerPids;
185 workerPids.reserve(spawnWorkers);
// Children receive `mode` after the "process" -> "ipc" normalisation done earlier in main().
// When a spawn fails, the workers started so far are sent SIGTERM before giving up.
// NOTE(review): confirm the already-started children are also reaped on that error path.
186 for (
size_t i = 0; i < spawnWorkers; ++i) {
187 const pid_t pid = spawn_worker_process(workerBin, endpoint, mode, macroList, verbose);
189 NLogError(
"ndmspc-worker: failed to spawn worker %zu", i);
190 terminate_workers(workerPids, SIGTERM);
193 workerPids.push_back(pid);
// The spawner's exit status is the aggregate reported by wait_for_workers().
195 return wait_for_workers(workerPids);
// Single-worker mode. If either the macro list or the worker index is still unknown, request them
// from the supervisor at `endpoint` with a BOOTSTRAP message.
199 const bool needsBootstrap = macroList.empty() || workerIndex == std::numeric_limits<size_t>::max();
200 if (needsBootstrap) {
201 NLogPrint(
"ndmspc-worker: bootstrapping config from supervisor at %s ...", endpoint.c_str());
// NOTE(review): the return values of zmq_ctx_new/zmq_socket/zmq_connect are not checked here, and
// socket/context cleanup is not visible in this section — confirm both happen on every path.
202 void * ctx = zmq_ctx_new();
203 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
// Per-attempt identity "boot_<host>_<pid>_<clock ticks>", presumably so the supervisor can address
// its reply to this socket. The identity must be set before zmq_connect() to take effect.
205 std::ostringstream bootIdBuilder;
206 const char * host = gSystem->HostName();
207 bootIdBuilder <<
"boot_" << (host ? host :
"unknown") <<
'_' << gSystem->GetPid() <<
'_'
208 << std::chrono::high_resolution_clock::now().time_since_epoch().count();
209 const std::string bootId = bootIdBuilder.str();
210 zmq_setsockopt(dealer, ZMQ_IDENTITY, bootId.data(), bootId.size());
// 1000 ms receive timeout: receives give up periodically instead of blocking forever, so the
// retry logic below can enforce the overall 300 s deadline.
211 int timeoutMs = 1000;
212 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
213 zmq_connect(dealer, endpoint.c_str());
214 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"BOOTSTRAP"});
216 const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(300);
217 bool configOk =
false;
219 std::vector<std::string> frames;
// Timeouts (EAGAIN/EWOULDBLOCK) are retried until the deadline. This relies on ReceiveFrames()
// leaving errno as set by the failing ZeroMQ call — TODO confirm.
220 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
221 if (errno == EAGAIN || errno == EWOULDBLOCK) {
222 if (std::chrono::steady_clock::now() > deadline)
break;
// Expected reply: [0]="CONFIG", [1]=worker index, [2]=macro list, [3]=NDMSPC_TMP_DIR,
// [4]=NDMSPC_TMP_RESULTS_DIR. Index and macro given on the command line take precedence; the two
// directories, when non-empty, are exported without checking for an existing environment value.
// NOTE(review): std::stoul throws std::invalid_argument / std::out_of_range on a malformed index
// frame, and no handler is visible here — consider validating frames[1].
228 if (frames.size() >= 5 && frames[0] ==
"CONFIG") {
229 if (workerIndex == std::numeric_limits<size_t>::max())
230 workerIndex =
static_cast<size_t>(std::stoul(frames[1]));
231 if (macroList.empty())
232 macroList = frames[2];
233 if (!frames[3].empty())
234 gSystem->Setenv(
"NDMSPC_TMP_DIR", frames[3].c_str());
235 if (!frames[4].empty())
236 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", frames[4].c_str());
244 NLogError(
"ndmspc-worker: failed to receive CONFIG from supervisor at %s, exiting", endpoint.c_str());
// If NDMSPC_TMP_RESULTS_DIR is unset or empty, fall back to NDMSPC_TMP_DIR (when that is non-empty).
250 if (!gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR")[0] ==
'\0') {
251 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
252 if (tmpDirEnv && tmpDirEnv[0] !=
'\0')
253 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
// A macro list is mandatory from here on (from --macro or from the CONFIG reply).
256 if (macroList.empty()) {
257 NLogError(
"ndmspc-worker: no macro specified (use --macro or let supervisor send via bootstrap)");
// Defensive default when no index was supplied by the command line or by the supervisor.
260 if (workerIndex == std::numeric_limits<size_t>::max()) workerIndex = 0;
// Publish the worker's endpoint and index through the environment — presumably read by the executed
// macros or the runner they use; TODO confirm the consumers.
263 gSystem->Setenv(
"NDMSPC_WORKER_ENDPOINT", endpoint.c_str());
264 gSystem->Setenv(
"NDMSPC_WORKER_INDEX", std::to_string(workerIndex).c_str());
// Startup banner, written with fprintf to stdout directly rather than through the NLog* logger.
266 fprintf(stdout,
"ndmspc-worker: starting — index=%zu endpoint=%s\n", workerIndex, endpoint.c_str());
267 fprintf(stdout,
" macro = %s\n", macroList.c_str());
268 fprintf(stdout,
" NDMSPC_TMP_DIR = %s\n", gSystem->Getenv(
"NDMSPC_TMP_DIR") ? gSystem->Getenv(
"NDMSPC_TMP_DIR") :
"(not set)");
269 fprintf(stdout,
" NDMSPC_TMP_RESULTS_DIR = %s\n", gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") ? gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") :
"(not set)");
270 fprintf(stdout,
" NDMSPC_EXECUTION_MODE = %s\n", gSystem->Getenv(
"NDMSPC_EXECUTION_MODE") ? gSystem->Getenv(
"NDMSPC_EXECUTION_MODE") :
"(not set)");
// Execute each macro in turn. `macros` is built on a line not shown here — presumably by splitting
// `macroList` on ',' (the --macro help text says "Comma-separated"); confirm. A macro that cannot
// be opened is logged as an error ("exiting").
274 for (
const auto & macro : macros) {
275 NLogInfo(
"ndmspc-worker: executing macro '%s'", macro.c_str());
278 NLogError(
"ndmspc-worker: failed to open macro '%s', exiting", macro.c_str());
285 NLogInfo(
"ndmspc-worker: done");
static void SetConsoleOutput(bool enable)
Enables or disables logging output to the console.
static TMacro * OpenMacro(std::string filename)
Open a macro file.
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.