ndmspc  v1.2.0-0.1.rc3
NDimensionalIpcRunner.cxx
#include <algorithm>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <deque>
#include <iomanip>
#include <sstream>
#include <signal.h>
#include <sys/wait.h>
#include <thread>
#include <unistd.h>
#include <zmq.h>
#include "NUtils.h"
#include <TSystem.h>
#include "NDimensionalIpcRunner.h"
#include "NGnThreadData.h"
#include "NThreadData.h"
15 
16 namespace Ndmspc {
17 
namespace {
/// Join task ids with ',' (no leading/trailing separator); an empty list yields "".
std::string SerializeTaskIds(const std::vector<std::string> & taskIds)
{
  std::string joined;
  bool first = true;
  for (const std::string & id : taskIds) {
    if (!first) joined += ',';
    joined += id;
    first = false;
  }
  return joined;
}

/// Parse a TASKB payload of the form "id:c1,c2,...;id:c1,...;..." into (id, coords) pairs.
///
/// Empty ';'-separated entries and empty ','-separated coordinate fields are skipped.
/// Returns false as soon as an entry has no ':', an empty id, or nothing after the ':'
/// (entries parsed before that point remain in `tasks`). Returns false as well when no
/// entry was parsed at all. Non-numeric coordinates propagate std::stoi's exceptions.
bool ParseTaskBatchPayload(const std::string & payload, std::vector<std::pair<std::string, std::vector<int>>> & tasks)
{
  tasks.clear();
  std::istringstream entries(payload);
  for (std::string entry; std::getline(entries, entry, ';');) {
    if (entry.empty()) continue;

    const size_t colon = entry.find(':');
    const bool malformed = (colon == std::string::npos) || (colon == 0) || (colon + 1 >= entry.size());
    if (malformed) return false;

    std::vector<int> coords;
    std::istringstream coordInput(entry.substr(colon + 1));
    for (std::string field; std::getline(coordInput, field, ',');) {
      if (!field.empty()) coords.push_back(std::stoi(field));
    }
    tasks.emplace_back(entry.substr(0, colon), std::move(coords));
  }
  return !tasks.empty();
}
} // namespace
54 
55 bool NDimensionalIpcRunner::SendFrames(void * socket, const std::vector<std::string> & frames)
56 {
57  for (size_t i = 0; i < frames.size(); ++i) {
58  int flags = (i + 1 < frames.size()) ? ZMQ_SNDMORE : 0;
59  if (zmq_send(socket, frames[i].data(), frames[i].size(), flags) < 0) {
60  return false;
61  }
62  }
63  return true;
64 }
65 
66 bool NDimensionalIpcRunner::ReceiveFrames(void * socket, std::vector<std::string> & outFrames)
67 {
68  outFrames.clear();
69  while (true) {
70  zmq_msg_t msg;
71  zmq_msg_init(&msg);
72  int rc = zmq_msg_recv(&msg, socket, 0);
73  if (rc < 0) {
74  zmq_msg_close(&msg);
75  return false;
76  }
77  outFrames.emplace_back(static_cast<const char *>(zmq_msg_data(&msg)), static_cast<size_t>(rc));
78  int more = zmq_msg_more(&msg);
79  zmq_msg_close(&msg);
80  if (!more) break;
81  }
82  return true;
83 }
84 
85 std::string NDimensionalIpcRunner::BuildWorkerIdentity(size_t workerIndex)
86 {
87  std::ostringstream oss;
88  oss << "wk_" << std::setw(3) << std::setfill('0') << workerIndex;
89  return oss.str();
90 }
91 
92 std::string NDimensionalIpcRunner::SerializeCoords(const std::vector<int> & coords)
93 {
94  std::ostringstream oss;
95  for (size_t i = 0; i < coords.size(); ++i) {
96  if (i != 0) oss << ',';
97  oss << coords[i];
98  }
99  return oss.str();
100 }
101 
102 std::string NDimensionalIpcRunner::SerializeIds(const std::vector<Long64_t> & ids)
103 {
104  std::ostringstream oss;
105  for (size_t i = 0; i < ids.size(); ++i) {
106  if (i != 0) oss << ',';
107  oss << ids[i];
108  }
109  return oss.str();
110 }
111 
112 int NDimensionalIpcRunner::WorkerLoop(const std::string & endpoint, size_t workerIndex, NThreadData * worker)
113 {
114  std::ostringstream threadName;
115  threadName << "ipc_" << std::setw(3) << std::setfill('0') << workerIndex;
116  NLogger::SetThreadName(threadName.str());
117 
118  void * ctx = zmq_ctx_new();
119  if (!ctx) return 1;
120 
121  void * dealer = zmq_socket(ctx, ZMQ_DEALER);
122  if (!dealer) {
123  zmq_ctx_term(ctx);
124  return 1;
125  }
126 
127  const std::string identity = BuildWorkerIdentity(workerIndex);
128  if (zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size()) != 0) {
129  zmq_close(dealer);
130  zmq_ctx_term(ctx);
131  return 1;
132  }
133 
134  int timeoutMs = 1000;
135  zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
136 
137  if (zmq_connect(dealer, endpoint.c_str()) != 0) {
138  zmq_close(dealer);
139  zmq_ctx_term(ctx);
140  return 1;
141  }
142 
143  if (!SendFrames(dealer, {"READY"})) {
144  zmq_close(dealer);
145  zmq_ctx_term(ctx);
146  return 1;
147  }
148 
149  int rc = TaskLoop(dealer, workerIndex, worker);
150 
151  zmq_close(dealer);
152  zmq_ctx_term(ctx);
153  return rc;
154 }
155 
156 int NDimensionalIpcRunner::TaskLoop(void * dealer, size_t workerIndex, NThreadData * worker)
157 {
158  bool finishedOk = true;
159  bool aborted = false;
160  size_t tasksProcessed = 0;
161  const bool showWorkerProgress = []() {
162  const char * env = gSystem->Getenv("NDMSPC_WORKER_PROGRESS");
163  if (!env || env[0] == '\0') return false;
164  std::string value(env);
165  std::transform(value.begin(), value.end(), value.begin(),
166  [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
167  return (value == "1" || value == "true" || value == "yes" || value == "on");
168  }();
169 
170  // Non-blocking check: poll the socket and consume a STOP frame if present.
171  // Used between sub-tasks in a batch to react to supervisor abort quickly.
172  auto checkAbort = [&]() -> bool {
173  zmq_pollitem_t item = {dealer, 0, ZMQ_POLLIN, 0};
174  if (zmq_poll(&item, 1, 0) <= 0) return false;
175  std::vector<std::string> peek;
176  if (!ReceiveFrames(dealer, peek)) return false;
177  if (!peek.empty() && peek[0] == "STOP") {
178  aborted = (peek.size() >= 2 && peek[1] == "abort");
179  if (aborted) NLogPrint("Worker %zu: received abort from supervisor, stopping ...", workerIndex);
180  return true;
181  }
182  return false; // unexpected frame — ignore, will be handled in main loop
183  };
184 
185  while (true) {
186  std::vector<std::string> frames;
187  if (!ReceiveFrames(dealer, frames)) {
188  if (errno == EAGAIN || errno == EWOULDBLOCK) continue;
189  finishedOk = false;
190  break;
191  }
192  if (frames.empty()) continue;
193 
194  const std::string & cmd = frames[0];
195  if (cmd == "STOP") {
196  if (showWorkerProgress && tasksProcessed > 0) { fprintf(stdout, "\n"); fflush(stdout); }
197  aborted = (frames.size() >= 2 && frames[1] == "abort");
198  if (aborted) {
199  NLogPrint("Worker %zu: received abort from supervisor, stopping ...", workerIndex);
200  }
201  break;
202  }
203  if (cmd == "SETDEF") {
204  if (frames.size() < 2) {
205  finishedOk = false;
206  break;
207  }
208  if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
209  gnWorker->SetCurrentDefinitionName(frames[1]);
210  continue;
211  }
212  finishedOk = false;
213  break;
214  }
215  if (cmd == "SETIDS") {
216  if (frames.size() < 2) {
217  finishedOk = false;
218  break;
219  }
220  if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
221  gnWorker->SyncCurrentDefinitionIds(ParseIds(frames[1]));
222  continue;
223  }
224  finishedOk = false;
225  break;
226  }
227  if (cmd != "TASK" && cmd != "TASKB") {
228  finishedOk = false;
229  break;
230  }
231 
232  std::string errTaskId;
233  try {
234  if (cmd == "TASK") {
235  if (frames.size() < 3) {
236  finishedOk = false;
237  break;
238  }
239 
240  const std::string taskId = frames[1];
241  errTaskId = taskId;
242  std::vector<int> coords = ParseCoords(frames[2]);
243  ++tasksProcessed;
244  if (showWorkerProgress) {
245  fprintf(stdout, "\rWorker %zu: processing tasks [done: %zu] ", workerIndex, tasksProcessed);
246  fflush(stdout);
247  }
248  worker->Process(coords);
249  if (!SendFrames(dealer, {"ACK", taskId})) {
250  finishedOk = false;
251  break;
252  }
253  }
254  else {
255  if (frames.size() < 2) {
256  finishedOk = false;
257  break;
258  }
259 
260  std::vector<std::pair<std::string, std::vector<int>>> batchTasks;
261  if (!ParseTaskBatchPayload(frames[1], batchTasks)) {
262  finishedOk = false;
263  break;
264  }
265 
266  std::vector<std::string> ackedTaskIds;
267  ackedTaskIds.reserve(batchTasks.size());
268  tasksProcessed += batchTasks.size();
269  if (showWorkerProgress) {
270  fprintf(stdout, "\rWorker %zu: processing tasks [done: %zu] ", workerIndex, tasksProcessed);
271  fflush(stdout);
272  }
273  for (const auto & task : batchTasks) {
274  if (checkAbort()) { finishedOk = false; break; }
275  errTaskId = task.first;
276  worker->Process(task.second);
277  ackedTaskIds.push_back(task.first);
278  }
279  if (aborted) break;
280 
281  if (!SendFrames(dealer, {"ACKB", SerializeTaskIds(ackedTaskIds)})) {
282  finishedOk = false;
283  break;
284  }
285  }
286  }
287  catch (const std::exception & ex) {
288  SendFrames(dealer, {"ERR", errTaskId.empty() ? "0" : errTaskId, ex.what()});
289  finishedOk = false;
290  break;
291  }
292  catch (...) {
293  SendFrames(dealer, {"ERR", errTaskId.empty() ? "0" : errTaskId, "unknown worker exception"});
294  finishedOk = false;
295  break;
296  }
297  }
298 
299  if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
300  // Capture local tmp file path before closing (storage object path is cleared after Close).
301  std::string localTmpFile;
302  if (gnWorker->GetHnSparseBase() && gnWorker->GetHnSparseBase()->GetStorageTree()) {
303  localTmpFile = gnWorker->GetHnSparseBase()->GetStorageTree()->GetFileName();
304  }
305 
306  if (aborted) {
307  // Supervisor aborted — skip end function and copy, but close the file handle
308  // so we can delete it cleanly.
309  NLogPrint("Worker %zu: aborting, skipping post-processing.", workerIndex);
310  if (gnWorker->GetHnSparseBase()) {
311  gnWorker->GetHnSparseBase()->Close(false); // false = don't save
312  }
313  } else {
314  NLogDebug("Worker %zu finished processing, executing end function and closing file if open ...", workerIndex);
315  gnWorker->ExecuteEndFunction();
316  if (gnWorker->GetHnSparseBase()) {
317  gnWorker->GetHnSparseBase()->Close(true);
318  }
319 
320  // TCP mode: copy local result file to shared results dir so supervisor can merge.
321  const std::string & resultsFilename = gnWorker->GetResultsFilename();
322  if (!resultsFilename.empty()) {
323  if (!localTmpFile.empty() && localTmpFile != resultsFilename) {
324  const std::string resultsDir = std::string(gSystem->GetDirName(resultsFilename.c_str()));
325  NUtils::CreateDirectory(resultsDir);
326  NLogPrint("Worker %zu copying '%s' -> '%s' ...", workerIndex, localTmpFile.c_str(), resultsFilename.c_str());
327  if (!NUtils::Cp(localTmpFile, resultsFilename, kFALSE)) {
328  NLogError("Worker %zu: failed to copy '%s' to '%s'", workerIndex, localTmpFile.c_str(),
329  resultsFilename.c_str());
330  }
331  }
332  }
333  }
334 
335  // Delete the local tmp file only when a distinct results file was set.
336  // An empty resultsFilename means localTmpFile IS the results file (same path),
337  // so it must be kept for the supervisor to merge.
338  const std::string & resultsFilenameForDelete = gnWorker->GetResultsFilename();
339  const bool hasDistinctResultsFile = !resultsFilenameForDelete.empty() && resultsFilenameForDelete != localTmpFile;
340  if (!localTmpFile.empty() && hasDistinctResultsFile) {
341  NLogPrint("Worker %zu: removing local tmp file '%s'", workerIndex, localTmpFile.c_str());
342  gSystem->Unlink(localTmpFile.c_str());
343  }
344  }
345 
346  if (!aborted) {
347  // Signal master that this worker has finished writing its file.
348  // For TCP mode, master waits for this DONE before starting to merge.
349  // For IPC (fork) mode, master uses WaitForChildProcesses instead; the DONE
350  // message stays unread in the ZMQ buffer which is harmless.
351  SendFrames(dealer, {"DONE"});
352  }
353 
354  // Drop any unsent/undelivered messages immediately so zmq_close/zmq_ctx_term
355  // do not hang at shutdown.
356  int linger = 0;
357  zmq_setsockopt(dealer, ZMQ_LINGER, &linger, sizeof(linger));
358 
359  return finishedOk ? 0 : 1;
360 }
361 
362 bool NDimensionalIpcRunner::WaitForChildProcesses(const std::vector<pid_t> & pids, int timeoutMs)
363 {
364  bool allExitedCleanly = true;
365  const bool useTimeout = (timeoutMs >= 0);
366 
367  for (pid_t pid : pids) {
368  if (pid <= 0) continue;
369 
370  int status = 0;
371  pid_t rc = -1;
372  if (!useTimeout) {
373  rc = waitpid(pid, &status, 0);
374  }
375  else {
376  const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
377  while (std::chrono::steady_clock::now() < deadline) {
378  rc = waitpid(pid, &status, WNOHANG);
379  if (rc == pid || rc < 0) {
380  break;
381  }
382  std::this_thread::sleep_for(std::chrono::milliseconds(10));
383  }
384  if (rc == 0) {
385  allExitedCleanly = false;
386  continue;
387  }
388  }
389 
390  if (rc < 0) {
391  allExitedCleanly = false;
392  continue;
393  }
394 
395  if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
396  allExitedCleanly = false;
397  }
398  }
399  return allExitedCleanly;
400 }
401 
402 void NDimensionalIpcRunner::CleanupChildProcesses(const std::vector<pid_t> & pids)
403 {
404  for (pid_t pid : pids) {
405  if (pid <= 0) continue;
406  int status = 0;
407  pid_t rc = waitpid(pid, &status, WNOHANG);
408  if (rc == pid) {
409  continue;
410  }
411  if (rc < 0) {
412  continue;
413  }
414 
415  kill(pid, SIGTERM);
416  rc = waitpid(pid, &status, WNOHANG);
417  if (rc == pid || rc < 0) {
418  continue;
419  }
420 
421  if (!WaitForChildProcesses({pid}, 1500)) {
422  kill(pid, SIGKILL);
423 
424  // Never block indefinitely here: if a child is stuck in uninterruptible I/O
425  // state, waitpid(..., 0) can hang forever and stall process shutdown.
426  const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(1500);
427  while (std::chrono::steady_clock::now() < deadline) {
428  rc = waitpid(pid, &status, WNOHANG);
429  if (rc == pid || rc < 0) break;
430  std::this_thread::sleep_for(std::chrono::milliseconds(10));
431  }
432  if (rc == 0) {
433  NLogWarning("NDimensionalIpcRunner::CleanupChildProcesses: child pid=%d did not exit after SIGKILL; continuing shutdown", pid);
434  }
435  }
436  }
437 }
438 
439 std::vector<int> NDimensionalIpcRunner::ParseCoords(const std::string & coordsStr)
440 {
441  std::vector<int> coords;
442  std::stringstream ss(coordsStr);
443  std::string token;
444  while (std::getline(ss, token, ',')) {
445  if (token.empty()) continue;
446  coords.push_back(std::stoi(token));
447  }
448  return coords;
449 }
450 
451 std::vector<Long64_t> NDimensionalIpcRunner::ParseIds(const std::string & idsStr)
452 {
453  std::vector<Long64_t> ids;
454  std::stringstream ss(idsStr);
455  std::string token;
456  while (std::getline(ss, token, ',')) {
457  if (token.empty()) continue;
458  ids.push_back(static_cast<Long64_t>(std::stoll(token)));
459  }
460  return ids;
461 }
462 
463 } // namespace Ndmspc
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Definition: NLogger.cxx:132
static bool CreateDirectory(const std::string &path)
Definition: NUtils.cxx:688
static int Cp(std::string source, std::string destination, Bool_t progressbar=kTRUE)
Copy a file from source to destination.
Definition: NUtils.cxx:155