ndmspc v1.2.0-0.1.rc4
Loading...
Searching...
No Matches
NDimensionalIpcRunner.cxx
#include "NDimensionalIpcRunner.h"

#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>

#include <algorithm>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <iomanip>
#include <sstream>
#include <thread>

#include <zmq.h>

#include <TSystem.h>

#include "NGnThreadData.h"
#include "NThreadData.h"
#include "NUtils.h"
15
16namespace Ndmspc {
17
18namespace {
// Joins task ids into a single comma-separated string ("a,b,c"; empty input
// yields an empty string). Inverse of the supervisor-side batch-ack parsing.
std::string SerializeTaskIds(const std::vector<std::string> & taskIds)
{
   std::string joined;
   bool first = true;
   for (const auto & id : taskIds) {
      if (!first) joined += ',';
      joined += id;
      first = false;
   }
   return joined;
}
28
/// Parses a batched task payload of the form "id:c1,c2,...;id:c1,...".
///
/// @param payload semicolon-separated tasks; each task is "<taskId>:<coords>"
///                with coords a comma-separated list of integers.
/// @param tasks   output; cleared first, filled with (taskId, coords) pairs.
/// @return true when at least one well-formed task was parsed; false on an
///         empty payload, a missing/misplaced ':' separator, or a coordinate
///         list that contains no values (e.g. "t1:,").
/// @throws whatever std::stoi throws on a non-numeric coordinate token
///         (callers invoke this inside a try block and report ERR upstream).
bool ParseTaskBatchPayload(const std::string & payload, std::vector<std::pair<std::string, std::vector<int>>> & tasks)
{
   tasks.clear();
   std::stringstream batchStream(payload);
   std::string taskToken;
   while (std::getline(batchStream, taskToken, ';')) {
      if (taskToken.empty()) continue;
      size_t sep = taskToken.find(':');
      // Require a non-empty id and at least one character after ':'.
      if (sep == std::string::npos || sep == 0 || sep + 1 >= taskToken.size()) {
         return false;
      }
      const std::string taskId = taskToken.substr(0, sep);
      const std::string coords = taskToken.substr(sep + 1);
      std::vector<int> parsedCoords;
      std::stringstream coordStream(coords);
      std::string coordToken;
      while (std::getline(coordStream, coordToken, ',')) {
         if (coordToken.empty()) continue;
         parsedCoords.push_back(std::stoi(coordToken));
      }
      // A coordinate section made of separators only ("t1:,") used to slip
      // through as an empty coord vector — treat it as malformed instead.
      if (parsedCoords.empty()) {
         return false;
      }
      tasks.emplace_back(taskId, std::move(parsedCoords));
   }
   return !tasks.empty();
}
53} // namespace
54
55bool NDimensionalIpcRunner::SendFrames(void * socket, const std::vector<std::string> & frames)
56{
57 for (size_t i = 0; i < frames.size(); ++i) {
58 int flags = (i + 1 < frames.size()) ? ZMQ_SNDMORE : 0;
59 if (zmq_send(socket, frames[i].data(), frames[i].size(), flags) < 0) {
60 return false;
61 }
62 }
63 return true;
64}
65
66bool NDimensionalIpcRunner::ReceiveFrames(void * socket, std::vector<std::string> & outFrames)
67{
68 outFrames.clear();
69 while (true) {
70 zmq_msg_t msg;
71 zmq_msg_init(&msg);
72 int rc = zmq_msg_recv(&msg, socket, 0);
73 if (rc < 0) {
74 zmq_msg_close(&msg);
75 return false;
76 }
77 outFrames.emplace_back(static_cast<const char *>(zmq_msg_data(&msg)), static_cast<size_t>(rc));
78 int more = zmq_msg_more(&msg);
79 zmq_msg_close(&msg);
80 if (!more) break;
81 }
82 return true;
83}
84
85std::string NDimensionalIpcRunner::BuildWorkerIdentity(size_t workerIndex)
86{
87 std::ostringstream oss;
88 oss << "wk_" << std::setw(3) << std::setfill('0') << workerIndex;
89 return oss.str();
90}
91
92std::string NDimensionalIpcRunner::SerializeCoords(const std::vector<int> & coords)
93{
94 std::ostringstream oss;
95 for (size_t i = 0; i < coords.size(); ++i) {
96 if (i != 0) oss << ',';
97 oss << coords[i];
98 }
99 return oss.str();
100}
101
102std::string NDimensionalIpcRunner::SerializeIds(const std::vector<Long64_t> & ids)
103{
104 std::ostringstream oss;
105 for (size_t i = 0; i < ids.size(); ++i) {
106 if (i != 0) oss << ',';
107 oss << ids[i];
108 }
109 return oss.str();
110}
111
112int NDimensionalIpcRunner::WorkerLoop(const std::string & endpoint, size_t workerIndex, NThreadData * worker)
113{
114 std::ostringstream threadName;
115 threadName << "ipc_" << std::setw(3) << std::setfill('0') << workerIndex;
116 NLogger::SetThreadName(threadName.str());
117
118 void * ctx = zmq_ctx_new();
119 if (!ctx) return 1;
120
121 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
122 if (!dealer) {
123 zmq_ctx_term(ctx);
124 return 1;
125 }
126
127 const std::string identity = BuildWorkerIdentity(workerIndex);
128 if (zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size()) != 0) {
129 zmq_close(dealer);
130 zmq_ctx_term(ctx);
131 return 1;
132 }
133
134 int timeoutMs = 1000;
135 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
136
137 if (zmq_connect(dealer, endpoint.c_str()) != 0) {
138 zmq_close(dealer);
139 zmq_ctx_term(ctx);
140 return 1;
141 }
142
143 if (!SendFrames(dealer, {"READY"})) {
144 zmq_close(dealer);
145 zmq_ctx_term(ctx);
146 return 1;
147 }
148
149 int rc = TaskLoop(dealer, workerIndex, worker);
150
151 zmq_close(dealer);
152 zmq_ctx_term(ctx);
153 return rc;
154}
155
/// Main worker command loop over an already-connected DEALER socket.
///
/// Protocol (frame 0 = command):
///  - "STOP" ["abort"]      : leave the loop; "abort" marks a supervisor abort
///  - "SETDEF" <name>       : set current definition name (NGnThreadData only)
///  - "SETIDS" <ids>        : sync definition ids, comma-separated (NGnThreadData only)
///  - "TASK" <id> <coords>  : process one task, reply "ACK" <id>
///  - "TASKB" <batch>       : process "id:coords;id:coords;...", reply "ACKB" <ids>
/// On a processing exception the worker replies "ERR" <id|0> <message>.
/// After the loop (unless aborted) it sends "DONE".
///
/// @param dealer       connected ZMQ DEALER socket (owned by WorkerLoop).
/// @param workerIndex  index used only for log/progress messages.
/// @param worker       processing object; NGnThreadData gets extra post-processing.
/// @return 0 when the loop ended cleanly, 1 on protocol/processing failure.
int NDimensionalIpcRunner::TaskLoop(void * dealer, size_t workerIndex, NThreadData * worker)
{
   bool finishedOk = true;
   bool aborted = false;
   size_t tasksProcessed = 0;
   // Progress output is opt-in via NDMSPC_WORKER_PROGRESS; any of
   // "1"/"true"/"yes"/"on" (case-insensitive) enables it.
   const bool showWorkerProgress = []() {
      const char * env = gSystem->Getenv("NDMSPC_WORKER_PROGRESS");
      if (!env || env[0] == '\0') return false;
      std::string value(env);
      std::transform(value.begin(), value.end(), value.begin(),
                     [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
      return (value == "1" || value == "true" || value == "yes" || value == "on");
   }();

   // Non-blocking check: poll the socket and consume a STOP frame if present.
   // Used between sub-tasks in a batch to react to supervisor abort quickly.
   // NOTE(review): a non-STOP message arriving here has already been consumed
   // by ReceiveFrames and is dropped, NOT re-seen by the main loop — confirm
   // the supervisor never interleaves other commands while a batch is running.
   auto checkAbort = [&]() -> bool {
      zmq_pollitem_t item = {dealer, 0, ZMQ_POLLIN, 0};
      if (zmq_poll(&item, 1, 0) <= 0) return false;
      std::vector<std::string> peek;
      if (!ReceiveFrames(dealer, peek)) return false;
      if (!peek.empty() && peek[0] == "STOP") {
         aborted = (peek.size() >= 2 && peek[1] == "abort");
         if (aborted) NLogPrint("Worker %zu: received abort from supervisor, stopping ...", workerIndex);
         return true;
      }
      return false; // unexpected frame — consumed and dropped (see NOTE above)
   };

   while (true) {
      std::vector<std::string> frames;
      if (!ReceiveFrames(dealer, frames)) {
         // ZMQ_RCVTIMEO expiry surfaces as EAGAIN: keep polling for commands.
         if (errno == EAGAIN || errno == EWOULDBLOCK) continue;
         finishedOk = false;
         break;
      }
      if (frames.empty()) continue;

      const std::string & cmd = frames[0];
      if (cmd == "STOP") {
         // Terminate the in-place progress line before leaving the loop.
         if (showWorkerProgress && tasksProcessed > 0) { fprintf(stdout, "\n"); fflush(stdout); }
         aborted = (frames.size() >= 2 && frames[1] == "abort");
         if (aborted) {
            NLogPrint("Worker %zu: received abort from supervisor, stopping ...", workerIndex);
         }
         break;
      }
      if (cmd == "SETDEF") {
         if (frames.size() < 2) {
            finishedOk = false;
            break;
         }
         // Definition switching is only supported for NGnThreadData workers;
         // any other worker type treats SETDEF as a protocol error.
         if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
            gnWorker->SetCurrentDefinitionName(frames[1]);
            continue;
         }
         finishedOk = false;
         break;
      }
      if (cmd == "SETIDS") {
         if (frames.size() < 2) {
            finishedOk = false;
            break;
         }
         if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
            gnWorker->SyncCurrentDefinitionIds(ParseIds(frames[1]));
            continue;
         }
         finishedOk = false;
         break;
      }
      // Anything else at this point must be a task (single or batched).
      if (cmd != "TASK" && cmd != "TASKB") {
         finishedOk = false;
         break;
      }

      // Track the task id currently in flight so an exception can report it;
      // "0" is sent when the failure happened before any id was known.
      std::string errTaskId;
      try {
         if (cmd == "TASK") {
            if (frames.size() < 3) {
               finishedOk = false;
               break;
            }

            const std::string taskId = frames[1];
            errTaskId = taskId;
            std::vector<int> coords = ParseCoords(frames[2]);
            ++tasksProcessed;
            if (showWorkerProgress) {
               fprintf(stdout, "\rWorker %zu: processing tasks [done: %zu] ", workerIndex, tasksProcessed);
               fflush(stdout);
            }
            worker->Process(coords);
            // Per-task acknowledgement so the supervisor can track completion.
            if (!SendFrames(dealer, {"ACK", taskId})) {
               finishedOk = false;
               break;
            }
         }
         else {
            if (frames.size() < 2) {
               finishedOk = false;
               break;
            }

            std::vector<std::pair<std::string, std::vector<int>>> batchTasks;
            if (!ParseTaskBatchPayload(frames[1], batchTasks)) {
               finishedOk = false;
               break;
            }

            std::vector<std::string> ackedTaskIds;
            ackedTaskIds.reserve(batchTasks.size());
            // Progress counts the whole batch up-front, before processing it.
            tasksProcessed += batchTasks.size();
            if (showWorkerProgress) {
               fprintf(stdout, "\rWorker %zu: processing tasks [done: %zu] ", workerIndex, tasksProcessed);
               fflush(stdout);
            }
            for (const auto & task : batchTasks) {
               // Between sub-tasks, bail out quickly if the supervisor aborted.
               if (checkAbort()) { finishedOk = false; break; }
               errTaskId = task.first;
               worker->Process(task.second);
               ackedTaskIds.push_back(task.first);
            }
            if (aborted) break;

            // Acknowledge only the sub-tasks actually completed.
            if (!SendFrames(dealer, {"ACKB", SerializeTaskIds(ackedTaskIds)})) {
               finishedOk = false;
               break;
            }
         }
      }
      catch (const std::exception & ex) {
         SendFrames(dealer, {"ERR", errTaskId.empty() ? "0" : errTaskId, ex.what()});
         finishedOk = false;
         break;
      }
      catch (...) {
         SendFrames(dealer, {"ERR", errTaskId.empty() ? "0" : errTaskId, "unknown worker exception"});
         finishedOk = false;
         break;
      }
   }

   // Post-loop finalization for NGnThreadData workers: run the end function,
   // close/copy the result file, and clean up the local tmp file.
   if (auto * gnWorker = dynamic_cast<NGnThreadData *>(worker)) {
      // Capture local tmp file path before closing (storage object path is cleared after Close).
      std::string localTmpFile;
      if (gnWorker->GetHnSparseBase() && gnWorker->GetHnSparseBase()->GetStorageTree()) {
         localTmpFile = gnWorker->GetHnSparseBase()->GetStorageTree()->GetFileName();
      }

      if (aborted) {
         // Supervisor aborted — skip end function and copy, but close the file handle
         // so we can delete it cleanly.
         NLogPrint("Worker %zu: aborting, skipping post-processing.", workerIndex);
         if (gnWorker->GetHnSparseBase()) {
            gnWorker->GetHnSparseBase()->Close(false); // false = don't save
         }
      } else {
         NLogDebug("Worker %zu finished processing, executing end function and closing file if open ...", workerIndex);
         gnWorker->ExecuteEndFunction();
         if (gnWorker->GetHnSparseBase()) {
            gnWorker->GetHnSparseBase()->Close(true);
         }

         // TCP mode: copy local result file to shared results dir so supervisor can merge.
         const std::string & resultsFilename = gnWorker->GetResultsFilename();
         if (!resultsFilename.empty()) {
            if (!localTmpFile.empty() && localTmpFile != resultsFilename) {
               const std::string resultsDir = std::string(gSystem->GetDirName(resultsFilename.c_str()));
               NUtils::CreateDirectory(resultsDir);
               NLogPrint("Worker %zu copying '%s' -> '%s' ...", workerIndex, localTmpFile.c_str(), resultsFilename.c_str());
               if (!NUtils::Cp(localTmpFile, resultsFilename, kFALSE)) {
                  NLogError("Worker %zu: failed to copy '%s' to '%s'", workerIndex, localTmpFile.c_str(),
                            resultsFilename.c_str());
               }
            }
         }
      }

      // Delete the local tmp file only when a distinct results file was set.
      // An empty resultsFilename means localTmpFile IS the results file (same path),
      // so it must be kept for the supervisor to merge.
      const std::string & resultsFilenameForDelete = gnWorker->GetResultsFilename();
      const bool hasDistinctResultsFile = !resultsFilenameForDelete.empty() && resultsFilenameForDelete != localTmpFile;
      if (!localTmpFile.empty() && hasDistinctResultsFile) {
         NLogPrint("Worker %zu: removing local tmp file '%s'", workerIndex, localTmpFile.c_str());
         gSystem->Unlink(localTmpFile.c_str());
      }
   }

   if (!aborted) {
      // Signal master that this worker has finished writing its file.
      // For TCP mode, master waits for this DONE before starting to merge.
      // For IPC (fork) mode, master uses WaitForChildProcesses instead; the DONE
      // message stays unread in the ZMQ buffer which is harmless.
      SendFrames(dealer, {"DONE"});
   }

   // Drop any unsent/undelivered messages immediately so zmq_close/zmq_ctx_term
   // do not hang at shutdown.
   int linger = 0;
   zmq_setsockopt(dealer, ZMQ_LINGER, &linger, sizeof(linger));

   return finishedOk ? 0 : 1;
}
361
362bool NDimensionalIpcRunner::WaitForChildProcesses(const std::vector<pid_t> & pids, int timeoutMs)
363{
364 bool allExitedCleanly = true;
365 const bool useTimeout = (timeoutMs >= 0);
366
367 for (pid_t pid : pids) {
368 if (pid <= 0) continue;
369
370 int status = 0;
371 pid_t rc = -1;
372 if (!useTimeout) {
373 rc = waitpid(pid, &status, 0);
374 }
375 else {
376 const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
377 while (std::chrono::steady_clock::now() < deadline) {
378 rc = waitpid(pid, &status, WNOHANG);
379 if (rc == pid || rc < 0) {
380 break;
381 }
382 std::this_thread::sleep_for(std::chrono::milliseconds(10));
383 }
384 if (rc == 0) {
385 allExitedCleanly = false;
386 continue;
387 }
388 }
389
390 if (rc < 0) {
391 allExitedCleanly = false;
392 continue;
393 }
394
395 if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
396 allExitedCleanly = false;
397 }
398 }
399 return allExitedCleanly;
400}
401
/// Best-effort reaping/termination of child worker processes.
///
/// For each pid the escalation is: reap if already exited, otherwise
/// SIGTERM -> bounded wait (1.5 s) -> SIGKILL -> bounded wait (1.5 s).
/// Never blocks indefinitely, so process shutdown cannot stall on a
/// stuck child; a child surviving SIGKILL is only logged.
void NDimensionalIpcRunner::CleanupChildProcesses(const std::vector<pid_t> & pids)
{
   for (pid_t pid : pids) {
      if (pid <= 0) continue;
      int status = 0;
      // Already exited? Reap without blocking and move on.
      pid_t rc = waitpid(pid, &status, WNOHANG);
      if (rc == pid) {
         continue;
      }
      if (rc < 0) {
         // waitpid failed (e.g. already reaped or not our child) — nothing to do.
         continue;
      }

      // Child still running: request graceful termination first.
      kill(pid, SIGTERM);
      rc = waitpid(pid, &status, WNOHANG);
      if (rc == pid || rc < 0) {
         continue;
      }

      // Give the child up to 1.5 s to honor SIGTERM before escalating.
      if (!WaitForChildProcesses({pid}, 1500)) {
         kill(pid, SIGKILL);

         // Never block indefinitely here: if a child is stuck in uninterruptible I/O
         // state, waitpid(..., 0) can hang forever and stall process shutdown.
         const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(1500);
         while (std::chrono::steady_clock::now() < deadline) {
            rc = waitpid(pid, &status, WNOHANG);
            if (rc == pid || rc < 0) break;
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
         }
         if (rc == 0) {
            NLogWarning("NDimensionalIpcRunner::CleanupChildProcesses: child pid=%d did not exit after SIGKILL; continuing shutdown", pid);
         }
      }
   }
}
438
439std::vector<int> NDimensionalIpcRunner::ParseCoords(const std::string & coordsStr)
440{
441 std::vector<int> coords;
442 std::stringstream ss(coordsStr);
443 std::string token;
444 while (std::getline(ss, token, ',')) {
445 if (token.empty()) continue;
446 coords.push_back(std::stoi(token));
447 }
448 return coords;
449}
450
451std::vector<Long64_t> NDimensionalIpcRunner::ParseIds(const std::string & idsStr)
452{
453 std::vector<Long64_t> ids;
454 std::stringstream ss(idsStr);
455 std::string token;
456 while (std::getline(ss, token, ',')) {
457 if (token.empty()) continue;
458 ids.push_back(static_cast<Long64_t>(std::stoll(token)));
459 }
460 return ids;
461}
462
463} // namespace Ndmspc
static void SetThreadName(const std::string &name, std::thread::id thread_id=std::this_thread::get_id())
Sets the name of a thread.
Definition NLogger.cxx:132
Thread-local data object for NDMSPC processing.
Definition NThreadData.h:21
static bool CreateDirectory(const std::string &path)
Definition NUtils.cxx:688
static int Cp(std::string source, std::string destination, Bool_t progressbar=kTRUE)
Copy a file from source to destination.
Definition NUtils.cxx:155
Global callback function for libwebsockets client events.