ndmspc v1.2.0-0.1.rc5
Loading...
Searching...
No Matches
NGnTree.cxx
1#include <chrono>
2#include <cstddef>
3#include <ctime>
4#include <numbers>
5#include <set>
6#include <string>
7#include <vector>
8#include "TAxis.h"
9#include <TDirectory.h>
10#include <TObject.h>
11#include <TList.h>
12#include <TROOT.h>
13#include <THnSparse.h>
14#include <TH1.h>
15#include <TCanvas.h>
16#include <TSystem.h>
17#include <TMap.h>
18#include <TObjString.h>
19#include <TTree.h>
20#include <TBufferJSON.h>
21#include <sys/poll.h>
22#include <zmq.h>
23#include "NParameters.h"
24#include "NStorageTree.h"
25#include "NBinning.h"
26#include "NBinningDef.h"
27#include "NDimensionalExecutor.h"
28#include "NDimensionalIpcRunner.h"
29#include "NGnThreadData.h"
30#include "NLogger.h"
31#include "NTreeBranch.h"
32#include "NUtils.h"
33#include "NStorageTree.h"
34#include "NGnNavigator.h"
35#include "NGnTree.h"
36
38ClassImp(Ndmspc::NGnTree);
40
41namespace Ndmspc {
42
43std::string NGnTree::BuildObjectPath(const json & cfg, const json & objCfg, const NBinningPoint * point)
44{
45 std::string objPath = "";
46 if (objCfg.contains("prefix") && objCfg["prefix"].is_string()) {
47 objPath = objCfg["prefix"].get<std::string>();
48 }
49
50 std::string axisObjectDefaultFormat =
51 cfg["axisObjectDefaultFormat"].is_string() ? cfg["axisObjectDefaultFormat"].get<std::string>() : "%.2f_%.2f";
52 std::string axisDefaultSeparator =
53 cfg["axisDefaultSeparator"].is_string() ? cfg["axisDefaultSeparator"].get<std::string>() : "/";
54
55 std::string lastSep;
56 for (auto & axisEntry : cfg["axes"]) {
57 std::string axisName;
58 std::string mode;
59 std::string format;
60
61 if (axisEntry.is_string()) {
62 axisName = axisEntry.get<std::string>();
63 if (axisObjectDefaultFormat.empty()) {
64 mode = "bin";
65 }
66 else {
67 mode = "minmax";
68 format = axisObjectDefaultFormat;
69 }
70 }
71 else if (axisEntry.is_object()) {
72 if (axisEntry.contains("name") && axisEntry["name"].is_string()) {
73 axisName = axisEntry["name"].get<std::string>();
74 }
75 else {
76 continue;
77 }
78 if (axisEntry.contains("mode") && axisEntry["mode"].is_string()) {
79 mode = axisEntry["mode"].get<std::string>();
80 }
81 if (axisEntry.contains("format") && axisEntry["format"].is_string()) {
82 format = axisEntry["format"].get<std::string>();
83 }
84 }
85 else {
86 continue;
87 }
88
89 if (mode.empty()) {
90 if (axisObjectDefaultFormat.empty())
91 mode = "bin";
92 else
93 mode = "minmax";
94 }
95 if (format.empty()) {
96 if (mode == "minmax")
97 format = axisObjectDefaultFormat.empty() ? "%.2f_%.2f" : axisObjectDefaultFormat;
98 else if (mode == "bin")
99 format = "%d";
100 else
101 format = "%.2f";
102 }
103
104 if (mode == "minmax") {
105 double min = point->GetBinMin(axisName);
106 double max = point->GetBinMax(axisName);
107 objPath += TString::Format(format.c_str(), min, max).Data();
108 }
109 else if (mode == "min") {
110 double min = point->GetBinMin(axisName);
111 objPath += TString::Format(format.c_str(), min).Data();
112 }
113 else if (mode == "max") {
114 double max = point->GetBinMax(axisName);
115 objPath += TString::Format(format.c_str(), max).Data();
116 }
117 else if (mode == "center") {
118 double c = point->GetBinCenter(axisName);
119 objPath += TString::Format(format.c_str(), c).Data();
120 }
121 else if (mode == "label") {
122 std::string lbl = point->GetBinLabel(axisName);
123 objPath += lbl;
124 }
125 else if (mode == "bin") {
126 objPath += std::to_string(point->GetBin(axisName));
127 }
128 else {
129 objPath += std::to_string(point->GetBin(axisName));
130 }
131
132 std::string sep = axisDefaultSeparator;
133 if (axisEntry.is_object() && axisEntry.contains("sufix") && axisEntry["sufix"].is_string()) {
134 sep = axisEntry["sufix"].get<std::string>();
135 }
136 objPath += sep;
137 lastSep = sep;
138 }
139
140 if (!lastSep.empty() && objPath.size() >= lastSep.size()) {
141 objPath = objPath.substr(0, objPath.size() - lastSep.size());
142 }
143 if (objCfg.contains("sufix") && objCfg["sufix"].is_string()) {
144 objPath += objCfg["sufix"].get<std::string>();
145 }
146
147 return objPath;
148}
149
156
157NGnTree::NGnTree() : TObject() {}
158
/// Constructs an NGnTree from a list of axes.
///
/// \param axes     Axes handed to the NBinning constructor; an empty vector marks the object as zombie.
/// \param filename Passed to the storage tree's InitTree().
/// \param treename Passed to the storage tree's InitTree().
159NGnTree::NGnTree(std::vector<TAxis *> axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
160{
// Without axes no binning can be built: log, mark as zombie and stop.
164 if (axes.empty()) {
165 NLogError("NGnTree::NGnTree: No axes provided, binning is nullptr.");
166 // fBinning = new NBinning();
167 MakeZombie();
168 return;
169 }
170 fBinning = new NBinning(axes);
// NOTE(review): this listing shows no assignment to fTreeStorage between the binning creation and
// the InitTree() call below (original line 171 is absent) — confirm the storage tree is allocated here.
172 fTreeStorage->InitTree(filename, treename);
// The navigator keeps a back-pointer to this tree.
173 fNavigator = new NGnNavigator();
174 fNavigator->SetGnTree(this);
175}
176NGnTree::NGnTree(TObjArray * axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
177{
181
182 if (axes == nullptr) {
183 NLogError("NGnTree::NGnTree: Axes TObjArray is nullptr.");
184 MakeZombie();
185 return;
186 }
187
188 if (axes == nullptr && axes->GetEntries() == 0) {
189 NLogError("NGnTree::NGnTree: No axes provided, binning is nullptr.");
190 MakeZombie();
191 return;
192 }
193 fBinning = new NBinning(axes);
195 fTreeStorage->InitTree(filename, treename);
196 fNavigator = new NGnNavigator();
197 fNavigator->SetGnTree(this);
198}
199
/// Constructs an NGnTree whose binning is a clone of another tree's binning.
///
/// \param ngnt     Source tree; nullptr, or a source without binning, marks the object as zombie.
/// \param filename Passed to the storage tree's InitTree().
/// \param treename Passed to the storage tree's InitTree().
200NGnTree::NGnTree(NGnTree * ngnt, std::string filename, std::string treename) : TObject(), fInput(nullptr)
201{
// Reject a missing source tree.
205 if (ngnt == nullptr) {
206 NLogError("NGnTree::NGnTree: NGnTree is nullptr.");
207 MakeZombie();
208 return;
209 }
210
// Reject a source tree that has no binning to clone.
211 if (ngnt->GetBinning() == nullptr) {
212 NLogError("NGnTree::NGnTree: Binning in NGnTree is nullptr.");
213 MakeZombie();
214 return;
215 }
216
217 // TODO: Import binning from user
// Work on a private clone of the source binning.
218 fBinning = (NBinning *)ngnt->GetBinning()->Clone();
// NOTE(review): this listing shows no assignment to fTreeStorage before the InitTree() call below
// (original line 219 is absent) — confirm the storage tree is allocated here.
220 fTreeStorage->InitTree(filename, treename);
// The navigator keeps a back-pointer to this tree.
221 fNavigator = new NGnNavigator();
222 fNavigator->SetGnTree(this);
223}
224
226 : TObject(), fBinning(b), fTreeStorage(s), fInput(nullptr), fOwnsBinning(false), fOwnsTreeStorage(false)
227{
// Adopts the externally supplied binning `b` and storage `s`; both ownership flags start as false.
// NOTE(review): the signature line of this constructor is not part of this listing.
//
// NOTE(review): a null binning only logs and marks the object as zombie — there is no return,
// so execution continues to the fBinning dereference further down. Confirm whether that is intended.
231 if (fBinning == nullptr) {
232 NLogError("NGnTree::NGnTree: Binning is nullptr.");
233 MakeZombie();
234 }
// No storage supplied: initialize a tree named "ngnt" with an empty filename and take ownership.
// NOTE(review): this listing shows no allocation of fTreeStorage before InitTree() in this branch
// (original line 236 is absent) — confirm.
235 if (s == nullptr) {
237 fTreeStorage->InitTree("", "ngnt");
238 fOwnsTreeStorage = true;
239 }
240
// NOTE(review): as above, this check marks the object as zombie but does not return before the
// fTreeStorage dereference below.
241 if (fTreeStorage == nullptr) {
242 NLogError("NGnTree::NGnTree: Storage tree is nullptr.");
243 MakeZombie();
244 }
245
246 // fBinning->Initialize();
247 //
248 // TODO: Check if this is needed
// Cross-link storage and binning: the storage gets the binning, the binning's point gets the storage.
249 fTreeStorage->SetBinning(fBinning);
250 fBinning->GetPoint()->SetTreeStorage(fTreeStorage);
// The navigator keeps a back-pointer to this tree.
251 fNavigator = new NGnNavigator();
252 fNavigator->SetGnTree(this);
253}
254NGnTree::NGnTree(THnSparse * hns, std::string parameterAxis, const std::string & outFileName, json cfg)
255 : TObject(), fInput(nullptr)
256{
260
261 std::map<std::string, std::vector<std::vector<int>>> b;
262 TObjArray * axes = new TObjArray();
263 int parameterAxisIdx = -1;
264 std::vector<std::string> labels;
265 for (int i = 0; i < hns->GetNdimensions(); i++) {
266 TAxis * axisIn = (TAxis *)hns->GetAxis(i);
267 TAxis * axis = (TAxis *)axisIn->Clone();
268
269 // check if parameterAxis matches axis name
270 if (parameterAxis.compare(axis->GetName()) == 0) {
271 parameterAxisIdx = i;
272 TAxis * axis = hns->GetAxis(parameterAxisIdx);
273 for (int bin = 1; bin <= axis->GetNbins(); bin++) {
274 // NLogInfo("Axis bin %d label: %s", bin, axis->GetBinLabel(bin));
275 labels.push_back(axis->GetBinLabel(bin));
276 }
277 continue;
278 }
279
280 // set label
281 if (axisIn->IsAlphanumeric()) {
282 NLogTrace("Setting axis '%s' labels from input THnSparse", axis->GetName());
283 for (int bin = 1; bin <= axisIn->GetNbins(); bin++) {
284 std::string label = axisIn->GetBinLabel(bin);
285 if (!labels.empty()) axis->SetBinLabel(bin, axisIn->GetBinLabel(bin));
286 }
287 }
288
289 axes->Add(axis);
290 b[axis->GetName()] = {{1}};
291 }
292
293 // json cfg;
294 cfg["_parameterAxis"] = parameterAxisIdx;
295 cfg["_labels"] = labels;
296
297 // return nullptr;
298 NLogDebug("Importing THnSparse as NGnTree with parameter axis '%s' (index %d) ...", parameterAxis.c_str(),
299 parameterAxisIdx);
300 NGnTree * ngnt = new NGnTree(axes, outFileName);
301 NLogDebug("Created NGnTree for THnSparse import ...");
302 if (ngnt->IsZombie()) {
303 NLogError("NGnTree::Import: Failed to create NGnTree !!!");
304 MakeZombie();
305 return;
306 }
307 // ngnt->GetStorageTree()->GetTree()->GetUserInfo()->Add(hns->Clone());
308 // Get env variable for tmp dir $NDMSPC_TMP_DIR
309 const char * tmpDirStr = gSystem->Getenv("NDMSPC_TMP_DIR");
310
311 std::string tmpDir;
312
313 if (!tmpDirStr || tmpDir.empty()) {
314 tmpDir = "/tmp";
315 }
316 std::string tmpFilename = tmpDir + "/ngnt_imported_input" + std::to_string(gSystem->GetPid()) + ".root";
317 NGnTree * ngntIn = new NGnTree(axes, tmpFilename);
318 if (ngntIn->IsZombie()) {
319 NLogError("NGnTree::Import: Failed to create NGnTree for input !!!");
320 SafeDelete(ngnt);
321 MakeZombie();
322 return;
323 }
324 // ngntIn->GetStorageTree()->GetTree()->GetUserInfo()->Add(hns->Clone());
325 ngntIn->GetOutput("default")->Add(hns->Clone("test"));
326 ngntIn->Close(true);
327
328 // return;
329 // delete ngntIn;
330
331 ngnt->SetInput(NGnTree::Open(tmpFilename)); // Set input to self
332
333 ngnt->GetInput()->Print();
334
335 ngnt->GetBinning()->AddBinningDefinition("default", b);
336 ngnt->InitParameters(cfg["_labels"].get<std::vector<std::string>>());
337
338 Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * /*output*/, TList * outputPoint,
339 int /*threadId*/) {
340 // NLogInfo("Thread ID: %d", threadId);
341 TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
342 // point->Print();
343 json cfg = point->GetCfg();
344
345 NGnTree * ngntIn = point->GetInput();
346 if (!ngntIn) {
347 NLogError("NGnTree::Import: Input NGnTree is nullptr !!!");
348 return;
349 }
350 // ngntIn->Print();
351
352 THnSparse * hns = (THnSparse *)ngntIn->GetOutput("default")->At(0);
353 if (hns == nullptr) {
354 NLogError("NGnTree::Import: THnSparse 'hns' not found in storage tree !!!");
355 return;
356 }
357
358 int axisIdx = cfg["_parameterAxis"].get<int>();
359 std::vector<std::vector<int>> ranges;
360 // set ranges from point storage coords
361 int iAxis = 0;
362 for (int i = 0; i < hns->GetNdimensions(); i++) {
363 if (i == axisIdx) continue; // skip parameter axis
364 int coord = point->GetStorageCoords()[iAxis++];
365 ranges.push_back({i, coord, coord});
366 // NLogInfo("Setting axis %d range to [%d, %d]", i, coord, coord);
367 }
368
369 NUtils::SetAxisRanges(hns, ranges);
370 TH1 * h = hns->Projection(axisIdx, "O");
371 if (!h) {
372 NLogError("NGnTree::Import: Projection of THnSparse failed !!!");
373 return;
374 }
375 if (h->GetEntries() > 0) {
376 NParameters * params = point->GetParameters();
377 if (params) {
378 for (int bin = 1; bin <= h->GetNbinsX(); bin++) {
379 params->SetParameter(bin, h->GetBinContent(bin), h->GetBinError(bin));
380 }
381 }
382 // outputPoint->Add(hParams);
383 outputPoint->Add(h);
384
385 std::string filename = cfg["filename"].get<std::string>();
386 TFile * f = (TFile *)point->GetTempObject("file");
387 if (!f || filename.compare(f->GetName()) != 0) {
388 if (f) {
389 NLogDebug("NGnTree::Import: Closing previously opened file '%s' ...", f->GetName());
390 f->Close();
391 }
392 NLogDebug("NGnTree::Import: Opening file '%s' ...", filename.c_str());
393 f = TFile::Open(filename.c_str());
394 if (!f || f->IsZombie()) {
395 NLogError("NGnTree::Import: Cannot open file '%s' !!!", filename.c_str());
396 return;
397 }
398 point->SetTempObject("file", f);
399 }
400
401 std::string axisObjectDefaultFormat =
402 cfg["axisObjectDefaultFormat"].is_string() ? cfg["axisObjectDefaultFormat"].get<std::string>() : "%.2f_%.2f";
403 std::string axisDefaultSeparator =
404 cfg["axisDefaultSeparator"].is_string() ? cfg["axisDefaultSeparator"].get<std::string>() : "/";
405 bool dryrun = false;
406 if (cfg.contains("dryrun") && cfg["dryrun"].is_boolean()) {
407 dryrun = cfg["dryrun"].get<bool>();
408 }
409
410 if (dryrun) {
411 NLogInfo("NGnTree::Import (dryrun): '%s' ...", point->GetString().c_str());
412 }
413
414 // loop over all object in cfg["objects"]
415 for (auto & [objName, objCfg] : cfg["objects"].items()) {
416 std::string objPath = NGnTree::BuildObjectPath(cfg, objCfg, point);
417
418 if (dryrun) {
419 NLogInfo("NGnTree::Import (dryrun): would retrieve object '%s'", objPath.c_str());
420 continue;
421 }
422
423 if (point->GetEntryNumber() == 0) {
424 NLogInfo("NGnTree::Import: Retrieving object '%s' from file '%s' ...", objPath.c_str(),
425 cfg["filename"].get<std::string>().c_str());
426 }
427 TObject * obj = f->Get(objPath.c_str());
428 if (!obj) {
429 if (point->GetEntryNumber() == 0) {
430 NLogWarning("NGnTree::Import: Cannot get object '%s' from file '%s' !!!", objPath.c_str(),
431 cfg["filename"].get<std::string>().c_str());
432 }
433 continue;
434 }
435
436 if (obj->InheritsFrom(TCanvas::Class())) {
437 TCanvas * cObj = (TCanvas *)obj;
438 cObj->SetName(objName.c_str());
439 }
440 outputPoint->Add(obj->Clone(objName.c_str()));
441 }
442 // f->Close();
443 }
444 };
445
446 // NUtils::SetAxisRanges(, std::vector<std::vector<int>> ranges)
447 ngnt->Process(processFunc, cfg);
448 ngnt->Close(true);
449 // Remove tmp file
450 gSystem->Exec(TString::Format("rm -f %s", tmpFilename.c_str()));
451}
452
454{
// NOTE(review): the signature line of this function is not part of this listing; the body
// releases member objects, which matches a destructor.
458
// Binning and storage tree are deleted only when the matching ownership flag is set.
459 if (fOwnsBinning) {
460 SafeDelete(fBinning);
461 }
462 if (fOwnsTreeStorage) {
463 SafeDelete(fTreeStorage);
464 }
// Navigator and parameters are deleted unconditionally.
465 SafeDelete(fNavigator);
466 SafeDelete(fParameters);
467}
468void NGnTree::Print(Option_t * option) const
469{
473
474 TString opt = option;
475
476 // Print list of axes
477 NLogInfo("NGnTree::Print: Printing NGnTree object [ALL] ...");
478 if (fBinning) {
479 fBinning->Print(option);
480 }
481 else {
482 NLogError("Binning is not initialized in NGnTree !!!");
483 }
484 if (fTreeStorage) {
485 fTreeStorage->Print(option);
486 }
487 else {
488 NLogError("Storage tree is not initialized in NGnTree !!!");
489 }
490}
491
492void NGnTree::Draw(Option_t * /*option*/)
493{
497
498 NLogInfo("NGnTree::Draw: Drawing NGnTree object [not implemented yet]...");
499}
500
501bool NGnTree::Process(NGnProcessFuncPtr func, const json & cfg, std::string binningName, NGnBeginFuncPtr beginFunc,
502 NGnEndFuncPtr endFunc)
503{
507
508 if (!fBinning) {
509 NLogError("Binning is not initialized in NGnTree !!!");
510 return false;
511 }
512
513 NBinning * binningIn = (NBinning *)fBinning->Clone();
514
515 std::vector<std::string> defNames = fBinning->GetDefinitionNames();
516 if (!binningName.empty()) {
517 // Check if binning definitions exist
518 if (std::find(defNames.begin(), defNames.end(), binningName) == defNames.end()) {
519 NLogError("Binning definition '%s' not found in NGnTree !!!", binningName.c_str());
520 return false;
521 }
522 defNames.clear();
523 defNames.push_back(binningName);
524 }
525
526 fBinning->Reset();
527 fBinning->SetCfg(cfg); // Set configuration to binning point
528 bool rc = Process(func, defNames, cfg, binningIn, beginFunc, endFunc);
529 if (!rc) {
530 NLogError("NGnTree::Process: Processing failed !!!");
531 return false;
532 }
533 // bool rc = false;
534 return true;
535}
536
537bool NGnTree::Process(NGnProcessFuncPtr func, const std::vector<std::string> & defNames, const json & cfg,
538 NBinning * binningIn, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc)
539{
543
544 NLogInfo("NGnTree::Process: Starting processing with %zu definitions ...", defNames.size());
545 bool batch = gROOT->IsBatch();
546 gROOT->SetBatch(kTRUE);
547 TH1::AddDirectory(kFALSE);
548
549 std::string storagePostfix = fTreeStorage ? fTreeStorage->GetPostfix() : "";
550 if (storagePostfix.empty() && fTreeStorage) {
551 const std::string storageFileName = fTreeStorage->GetFileName();
552 if (!storageFileName.empty()) {
553 storagePostfix = gSystem->BaseName(storageFileName.c_str());
554 }
555 }
556 if (storagePostfix.empty()) {
557 storagePostfix = "ndmspc.root";
558 }
559
560 // --- Worker mode: run as a remote TCP worker ---
561 if (const char * workerEndpoint = gSystem->Getenv("NDMSPC_WORKER_ENDPOINT")) {
562 size_t workerIndex = 0;
563 if (const char * envIdx = gSystem->Getenv("NDMSPC_WORKER_INDEX")) {
564 try { workerIndex = static_cast<size_t>(std::stoul(envIdx)); } catch (...) {}
565 }
566 NLogInfo("NGnTree::Process: Worker mode — connecting to %s as worker %zu", workerEndpoint, workerIndex);
567
568 // const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
569 // jobDir and treeName will be received from master via INIT
570 // For now init NGnThreadData with a placeholder filename; Init() will be called with real paths
571 Ndmspc::NGnThreadData workerData;
572
573 void * ctx = zmq_ctx_new();
574 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
575 const std::string identity = Ndmspc::NDimensionalIpcRunner::BuildWorkerIdentity(workerIndex);
576 zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size());
577 int timeoutMs = 1000;
578 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
579 zmq_connect(dealer, workerEndpoint);
580 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"READY"});
581
582 // Wait for INIT
583 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(30);
584 bool initOk = false;
585 while (!initOk) {
586 std::vector<std::string> frames;
587 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
588 if (errno == EAGAIN || errno == EWOULDBLOCK) {
589 if (std::chrono::steady_clock::now() > initDeadline) break;
590 continue;
591 }
592 break;
593 }
594 // INIT frames: "INIT", workerIdx, sessionId, resultsDir, treeName[, tmpDir, tmpResultsDir]
595 if (frames.size() >= 1 && frames[0] == "STOP") {
596 NLogPrint("NGnTree::Process: Worker received STOP before INIT — session already finished, exiting.");
597 zmq_close(dealer);
598 zmq_ctx_term(ctx);
599 gROOT->SetBatch(batch);
600 return true;
601 }
602 if (frames.size() >= 5 && frames[0] == "INIT") {
603 workerIndex = static_cast<size_t>(std::stoul(frames[1]));
604 const std::string & sessionId = frames[2];
605 const std::string & initResultsDir = frames[3];
606 const std::string & initTreeName = frames[4];
607
608 // Apply env vars sent by supervisor — these override the worker's inherited environment
609 if (frames.size() >= 7) {
610 if (!frames[5].empty())
611 gSystem->Setenv("NDMSPC_TMP_DIR", frames[5].c_str());
612 if (!frames[6].empty())
613 gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", frames[6].c_str());
614 }
615 // Fallback: if NDMSPC_TMP_RESULTS_DIR is still unset/empty, use NDMSPC_TMP_DIR
616 if (!gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR")[0] == '\0') {
617 const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
618 if (tmpDirEnv && tmpDirEnv[0] != '\0')
619 gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
620 }
621
622 // Local work file — always on this machine's NDMSPC_TMP_DIR
623 const char * localTmpEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
624 const std::string localBase = localTmpEnv ? localTmpEnv : "/tmp";
625 const std::string localFile = localBase + "/.ndmspc/tmp/" + sessionId + "/" +
626 std::to_string(workerIndex) + "/" + storagePostfix;
627
628 // Results file — on shared FS; supervisor reads from here to merge
629 const std::string resultsFile = initResultsDir + "/" + std::to_string(workerIndex) + "/" +
630 storagePostfix;
631
632 bool rc = workerData.Init(workerIndex, func, beginFunc, endFunc, this, binningIn, fInput, localFile, initTreeName);
633 if (!rc) {
634 NLogError("NGnTree::Process: Worker failed to initialize NGnThreadData");
635 zmq_close(dealer);
636 zmq_ctx_term(ctx);
637 return false;
638 }
639 // If results dir differs from local dir, tell TaskLoop to copy after Close(true)
640 if (resultsFile != localFile) {
641 workerData.SetResultsFilename(resultsFile);
642 }
643 workerData.SetCfg(cfg);
644 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"ACK"});
645 initOk = true;
646 }
647 }
648 if (!initOk) {
649 NLogError("NGnTree::Process: Worker did not receive INIT from supervisor");
650 zmq_close(dealer);
651 zmq_ctx_term(ctx);
652 return false;
653 }
654
655 Ndmspc::NDimensionalIpcRunner::TaskLoop(dealer, workerIndex, &workerData);
656 zmq_close(dealer);
657 zmq_ctx_term(ctx);
658 gROOT->SetBatch(batch);
659 return true;
660 }
661 // --- End worker mode ---
662
664
665 int nThreads = ROOT::GetThreadPoolSize(); // Get the number of threads to use
666 if (nThreads < 1) nThreads = 1;
667
668 std::string executionMode = "thread";
669 const char * envMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE");
670 const bool modeExplicit = (envMode && envMode[0] != '\0');
671 if (modeExplicit) {
672 executionMode = envMode;
673 }
674
675 std::string normalizedMode = executionMode;
676 std::transform(normalizedMode.begin(), normalizedMode.end(), normalizedMode.begin(),
677 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
678 if (normalizedMode == "process") normalizedMode = "ipc";
679
680 bool useProcessIpc = (normalizedMode == "ipc" || normalizedMode == "tcp");
681 bool useTcp = (normalizedMode == "tcp");
682 size_t nProcesses = static_cast<size_t>(nThreads);
683 bool ndmspcNProcExplicit = false;
684
685 if (const char * envNdmspcNProc = gSystem->Getenv("NDMSPC_MAX_PROCESSES")) {
686 ndmspcNProcExplicit = true;
687 try {
688 nProcesses = std::max<size_t>(1, static_cast<size_t>(std::stoll(envNdmspcNProc)));
689 }
690 catch (...) {
691 NLogWarning("NGnTree::Process: Invalid NDMSPC_MAX_PROCESSES='%s', using default=%zu", envNdmspcNProc,
692 nProcesses);
693 }
694 }
695 else if (const char * envNProc = gSystem->Getenv("ROOT_MAX_THREADS")) {
696 // Backward-compatible fallback when NDMSPC_MAX_PROCESSES is not set.
697 try {
698 nProcesses = std::max<size_t>(1, static_cast<size_t>(std::stoll(envNProc)));
699 }
700 catch (...) {
701 NLogWarning("NGnTree::Process: Invalid ROOT_MAX_THREADS='%s', using default=%zu", envNProc, nProcesses);
702 }
703 }
704
705 // Keep explicit NDMSPC_EXECUTION_MODE settings authoritative.
706 // If mode is not explicitly set, default to local IPC for multi-process runs.
707 if (modeExplicit) {
708 if (normalizedMode == "thread") {
709 useProcessIpc = false;
710 useTcp = false;
711 }
712 else if (normalizedMode == "tcp") {
713 useProcessIpc = true;
714 useTcp = true;
715 }
716 else if (normalizedMode == "ipc") {
717 useProcessIpc = true;
718 useTcp = false;
719 }
720 else {
721 NLogWarning("NGnTree::Process: Unknown NDMSPC_EXECUTION_MODE='%s', falling back to auto mode selection.",
722 executionMode.c_str());
723 useProcessIpc = (nProcesses > 1);
724 useTcp = false;
725 }
726 }
727 else if (nProcesses > 1) {
728 useProcessIpc = true;
729 useTcp = false;
730 executionMode = "ipc";
731 normalizedMode = "ipc";
732 }
733
734 if (ndmspcNProcExplicit && normalizedMode == "thread" && nProcesses > 1) {
735 NLogWarning("NGnTree::Process: NDMSPC_MAX_PROCESSES=%zu is set, but NDMSPC_EXECUTION_MODE=thread disables IPC.",
736 nProcesses);
737 }
738
739 const size_t workerObjectCount = useProcessIpc ? std::max(static_cast<size_t>(nThreads), nProcesses)
740 : static_cast<size_t>(nThreads);
741 std::vector<Ndmspc::NGnThreadData> threadDataVector(workerObjectCount);
742
743 NLogInfo("NGnTree::Process: executionMode='%s', useProcessIpc=%d, ROOT threads=%d, ipcProcesses=%zu, workerObjects=%zu",
744 executionMode.c_str(), useProcessIpc ? 1 : 0, nThreads, nProcesses, workerObjectCount);
745
746 const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
747 std::string tmpDir;
748 if (tmpDirEnv && tmpDirEnv[0] != '\0') {
749 tmpDir = tmpDirEnv;
750 } else {
751 TString tmpDirPrefix = fTreeStorage->GetPrefix();
752 // Use storage prefix only if it is a local path (not a remote URL)
753 if (!(tmpDirPrefix.BeginsWith("root://") || tmpDirPrefix.BeginsWith("http://") ||
754 tmpDirPrefix.BeginsWith("https://"))) {
755 tmpDir = tmpDirPrefix.Data();
756 }
757 if (tmpDir.empty()) tmpDir = "/tmp";
758 }
759
760 std::string jobDir = tmpDir + "/.ndmspc/tmp/" + std::to_string(gSystem->GetPid());
761
762 // Results dir: when NDMSPC_TMP_RESULTS_DIR equals NDMSPC_TMP_DIR (or is unset),
763 // reuse jobDir so that localTmpFile == resultsFilename — no copy or delete needed.
764 const char * resultsDirEnv = gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR");
765 const bool sameDir = !resultsDirEnv || std::string(resultsDirEnv) == tmpDir;
766 std::string resultsDir = sameDir ? jobDir
767 : (std::string(resultsDirEnv) + "/" +
768 std::to_string(gSystem->GetPid()));
769
770 std::string filePrefix = jobDir;
771 for (size_t i = 0; i < threadDataVector.size(); ++i) {
772 std::string filename = filePrefix + "/" + std::to_string(i) + "/" + storagePostfix;
773 bool rc = threadDataVector[i].Init(i, func, beginFunc, endFunc, this, binningIn, fInput, filename,
774 fTreeStorage->GetTree()->GetName());
775 if (!rc) {
776 NLogError("Failed to initialize thread data %zu, exiting ...", i);
777 return false;
778 }
779 threadDataVector[i].SetCfg(cfg); // Set configuration to binning point
780 if (useTcp) {
781 // Tell the merge step where workers will deposit their finished files.
782 // When resultsDir == jobDir (NDMSPC_TMP_RESULTS_DIR unset) the paths are
783 // identical so no copy or delete is needed — handled in TaskLoop.
784 std::string resultsFile = resultsDir + "/" + std::to_string(i) + "/" + storagePostfix;
785 threadDataVector[i].SetResultsFilename(resultsFile);
786 }
787 }
788 size_t processedEntries = 0;
789 size_t totalEntries = 0;
790 auto start_par = std::chrono::high_resolution_clock::now();
791 auto start_par_job = std::chrono::high_resolution_clock::now();
792 auto task = [&](const std::vector<int> & coords, Ndmspc::NGnThreadData & thread_obj) {
793 // NLogWarning("Processing coordinates %s in thread %zu", NUtils::GetCoordsString(coords).c_str(),
794 // thread_obj.GetAssignedIndex());
795 // thread_obj.Print();
796 thread_obj.Process(coords);
797 processedEntries++;
799 size_t nRunning = (totalEntries - processedEntries >= threadDataVector.size()) ? threadDataVector.size()
800 : totalEntries - processedEntries;
801 NUtils::ProgressBar(processedEntries, totalEntries, start_par, TString::Format("R%4zu", nRunning).Data());
802 }
803 };
804
805 size_t iDef = 0;
806 int sumIds = 0;
807
808 std::vector<Ndmspc::NThreadData *> processWorkers;
809 std::unique_ptr<Ndmspc::NDimensionalExecutor> ipcExecutor;
810 if (useProcessIpc) {
811 processWorkers.reserve(threadDataVector.size());
812 for (size_t i = 0; i < threadDataVector.size(); ++i) {
813 processWorkers.push_back(&threadDataVector[i]);
814 }
815 ipcExecutor = std::make_unique<Ndmspc::NDimensionalExecutor>(std::vector<int>{0}, std::vector<int>{0});
816 if (useTcp) {
817 const char * tcpPort = gSystem->Getenv("NDMSPC_TCP_PORT");
818 std::string tcpEndpoint = std::string("tcp://0.0.0.0:") + (tcpPort ? tcpPort : "5555");
819 const char * resultsDirBase = gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR");
820 const char * macroParams = gSystem->Getenv("NDMSPC_MACRO_PARAMS");
821 // Auto-detect the macro to send to workers: explicit SetWorkerMacro() takes
822 // priority; otherwise fall back to NDMSPC_MACRO set by ndmspc-run.
823 std::string workerMacro = fWorkerMacroList;
824 if (workerMacro.empty()) {
825 if (const char * envMacro = gSystem->Getenv("NDMSPC_MACRO")) workerMacro = envMacro;
826 }
827 ipcExecutor->StartProcessIpc(processWorkers, nProcesses, tcpEndpoint, resultsDir,
828 fTreeStorage->GetTree()->GetName(), workerMacro, tmpDir,
829 resultsDirBase ? resultsDirBase : "",
830 macroParams ? macroParams : "");
831 } else {
832 ipcExecutor->StartProcessIpc(processWorkers, nProcesses);
833 }
834 }
835
836 std::map<std::string, std::vector<Long64_t>>
837 defIdMapProcessedRemoved; // Map to track which ids belong to which definition
838 // for (auto & name : defNames) {
839 // auto binningDef = binningIn->GetDefinition(name);
840 // if (!binningDef) {
841 // NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
842 // return false;
843 // }
844 // for (auto & id : binningDef->GetIds()) {
845 // defIdMap[name].push_back(id);
846 // }
847 // }
848
849 try {
850 for (auto & name : defNames) {
851 auto binningDef = binningIn->GetDefinition(name);
852 if (!binningDef) {
853 NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
854 return false;
855 }
856
857 if (binningDef->GetIds().size() == 0) {
858 NLogWarning("NGnTree::Process: Binning definition '%s' has no entries, skipping ...", name.c_str());
859 continue;
860 }
861
862 const std::vector<Long64_t> originalDefinitionIds = binningDef->GetIds();
863
864 std::vector<int> mins, maxs;
865 mins.push_back(0);
866 maxs.push_back(binningDef->GetIds().size() - 1);
867 NLogDebug("NGnTree::Process: Processing with binning definition '%s' with %zu entries", name.c_str(),
868 binningDef->GetIds().size());
869
871 NLogInfo("NGnTree::Process: Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
872 }
873 else {
874 Printf("Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
875 }
876 start_par = std::chrono::high_resolution_clock::now();
877 processedEntries = 0;
878 totalEntries = maxs[0] + 1;
879 const size_t activeWorkers = useProcessIpc ? std::max<size_t>(1, std::min(nProcesses, processWorkers.size()))
880 : threadDataVector.size();
882 NUtils::ProgressBar(processedEntries, totalEntries, start_par,
883 TString::Format("R%4zu", activeWorkers).Data());
884
885 // binningIn->SetCurrentDefinitionName(name);
886 for (size_t i = 0; i < threadDataVector.size(); ++i) {
887 // threadDataVector[i].SetCurrentDefinitionName(name);
888 threadDataVector[i].GetHnSparseBase()->GetBinning()->SetCurrentDefinitionName(name);
889 }
890
891 Ndmspc::NDimensionalExecutor executorMT(mins, maxs);
892
893 if (!useProcessIpc) {
894 // Disable ROOT's RecursiveRemove during the parallel phase.
895 // Without this, concurrent threads' object deletions trigger RecursiveRemove
896 // which iterates pad->fPrimitives without per-object locks → TObjLink corruption.
897 Bool_t prevMustClean = gROOT->MustClean();
898 gROOT->SetMustClean(kFALSE);
899
900 // Enable batch mode during the parallel phase so that ROOT drawing calls
901 // (e.g. TH1::Fit drawing the fit function via f1->Draw) are no-ops.
902 // Without this, concurrent threads each trigger ROOT canvas creation ("c1"),
903 // the second canvas creation deletes the first → double-free / heap corruption,
904 // which then causes TPad::RecursiveRemove to crash when closing the per-thread TTree.
905 Bool_t prevBatch = gROOT->IsBatch();
906 gROOT->SetBatch(kTRUE);
907
908 executorMT.ExecuteParallel<Ndmspc::NGnThreadData>(task, threadDataVector);
909
910 // Restore both flags before flushing deferred deletes, so each object's destructor
911 // properly calls gROOT->RecursiveRemove and removes itself from ROOT's global lists.
912 // This prevents dangling pointers that would crash the RecursiveRemove cascade
913 // triggered by TTree::~TTree when the per-thread storage files are closed below.
914 // It is safe here because all worker threads have already finished.
915 gROOT->SetMustClean(prevMustClean);
916 gROOT->SetBatch(prevBatch);
917
918 // Flush deferred deletes single-threaded with MustClean and batch mode restored.
919 for (size_t i = 0; i < threadDataVector.size(); ++i) {
920 threadDataVector[i].FlushDeferredDeletes();
921 }
922
923 for (size_t i = 0; i < threadDataVector.size(); ++i) {
924 threadDataVector[i].ExecuteEndFunction();
925 }
926 }
927 else {
928 ipcExecutor->SetBounds(mins, maxs);
929 // Capture final active worker count reported by the IPC executor so
930 // we can deterministically rebuild per-worker counters for only the
931 // workers that actually connected.
932 size_t finalActiveWorkers = 0;
933 size_t acked = ipcExecutor->ExecuteCurrentBoundsProcessIpc(
934 name, &originalDefinitionIds,
935 [&, activeWorkers](const ExecutionProgress& progress) {
936 processedEntries = progress.tasksAcked;
937 finalActiveWorkers = progress.activeWorkers;
939 size_t nRunning = std::min(progress.activeWorkers, activeWorkers);
940 NUtils::ProgressBar(processedEntries, totalEntries, start_par,
941 TString::Format("R%4zu", nRunning).Data());
942 }
943 });
944 processedEntries = acked;
945
946 // Child processes update their own worker-object copies. Rebuild parent-side
947 // per-worker counters and processed-id vectors deterministically from task assignment.
948 // Use the number of workers that actually connected (finalActiveWorkers) if available;
949 // otherwise fall back to the configured process count / processWorkers size.
950 size_t connected = finalActiveWorkers > 0 ? finalActiveWorkers : std::min(nProcesses, processWorkers.size());
951 const size_t processesToUse = std::max<size_t>(1, std::min(nProcesses, connected));
952 for (size_t i = 0; i < threadDataVector.size(); ++i) {
953 auto * workerDef = threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name);
954 if (workerDef) {
955 workerDef->GetIds().clear();
956 }
957 }
958
959 for (size_t taskIndex = 0; taskIndex < originalDefinitionIds.size(); ++taskIndex) {
960 const size_t workerIndex = taskIndex % processesToUse;
961 threadDataVector[workerIndex].SetNProcessed(threadDataVector[workerIndex].GetNProcessed() + 1);
962
963 auto * workerDef = threadDataVector[workerIndex].GetHnSparseBase()->GetBinning()->GetDefinition(name);
964 if (workerDef) {
965 workerDef->GetIds().push_back(originalDefinitionIds[taskIndex]);
966 }
967 }
968
969 if (!NLogger::GetConsoleOutput() && processedEntries < totalEntries) {
970 NUtils::ProgressBar(processedEntries, totalEntries, start_par, "R 0");
971 }
972 }
973
975 Printf("Finished processing binning definition '%s'. Post-processing results ...", name.c_str());
976 // Update hnsbBinningIn with the processed ids
977 NLogDebug("NGnTree::Process: [BEGIN] ------------------------------------------------");
978 sumIds += binningIn->GetDefinition(name)->GetIds().size();
979 binningIn->GetDefinition(name)->GetIds().clear();
980 for (size_t i = 0; i < threadDataVector.size(); ++i) {
981 NLogDebug("NGnTree::Process: -> Thread %zu processed %lld entries", i, threadDataVector[i].GetNProcessed());
982 // threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->Print();
983 binningIn->GetDefinition(name)->GetIds().insert(
984 binningIn->GetDefinition(name)->GetIds().end(),
985 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().begin(),
986 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().end());
987 sort(binningIn->GetDefinition(name)->GetIds().begin(), binningIn->GetDefinition(name)->GetIds().end());
988 }
989 // hnsbBinningIn->GetDefinition(name)->Print();
990 // remove entries present in hnsbBinningIn from other definitions
991 for (size_t i = 0; i < defNames.size(); i++) {
992
993 std::string other_name = defNames[i];
994 auto otherDef = binningIn->GetDefinition(other_name);
995 if (i <= iDef) {
996 continue;
997 }
998 if (!otherDef) {
999 NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", other_name.c_str());
1000 return false;
1001 }
1002 // remove entries that has value less then sumIds
1003 for (auto it = otherDef->GetIds().begin(); it != otherDef->GetIds().end();) {
1004 NLogTrace("NGnTree::Process: Checking entry %lld from definition '%s' against sumIds=%d", *it,
1005 other_name.c_str(), sumIds);
1006 if (*it < sumIds) {
1007 NLogTrace("NGnTree::Process: Removing entry %lld from definition '%s'", *it, other_name.c_str());
1008 defIdMapProcessedRemoved[other_name].push_back(*it);
1009 it = otherDef->GetIds().erase(it);
1010 }
1011 else {
1012 ++it;
1013 }
1014 }
1015
1016 binningIn->GetDefinition(other_name)->Print();
1017 }
1018 // hnsbBinningIn->GetDefinition(name)->Print();
1019 iDef++;
1020
1021 NLogDebug("NGnTree::Process: [END] ------------------------------------------------");
1022 }
1023 }
1024 catch (const std::exception & ex) {
1025 if (ipcExecutor) {
1026 ipcExecutor->FinishProcessIpc(/*abort=*/true);
1027 ipcExecutor.reset();
1028 }
1029
1030 TString what(ex.what());
1031 if (what.Contains("Interrupted by user")) {
1032 if (gROOT) {
1033 gROOT->SetInterrupt(kFALSE);
1034 }
1035 NLogWarning("NGnTree::Process: Interrupted by user, stopping processing.");
1036 }
1037 else {
1038 NLogError("NGnTree::Process: Processing failed: %s", ex.what());
1039 }
1040 return false;
1041 }
1042
1043 // return true; // For testing, skip merging and post-processing
1044
1045 auto end_par = std::chrono::high_resolution_clock::now();
1046 std::chrono::duration<double, std::milli> par_duration = end_par - start_par_job;
1047
1048 if (ipcExecutor) {
1049 ipcExecutor->FinishProcessIpc();
1050 }
1051
1052 // For TCP mode, only merge results from workers that actually connected.
1053 // For IPC/fork and thread modes, all indices are valid.
1054 const std::set<size_t> registeredWorkers =
1055 (ipcExecutor && useTcp) ? ipcExecutor->GetRegisteredWorkerIndices() : std::set<size_t>{};
1056 const bool filterByRegistered = !registeredWorkers.empty();
1057
1059 Printf("NGnTree::Process: Execution completed and it took %s .",
1060 NUtils::FormatTime(par_duration.count() / 1000).c_str());
1061 }
1062 else {
1063 NLogInfo("NGnTree::Process: Execution completed and it took %s .",
1064 NUtils::FormatTime(par_duration.count() / 1000).c_str());
1065 }
1066
1067 NLogInfo("NGnTree::Process: Post processing %zu results ...", threadDataVector.size());
1068 for (auto & data : threadDataVector) {
1069 if (useProcessIpc) {
1070 NLogTrace("NGnTree::Process: Releasing parent handle for worker %zu file without writing",
1071 data.GetAssignedIndex());
1072 // data.GetHnSparseBase()->GetStorageTree()->Close(false);
1073 }
1074 else {
1075 NLogTrace("NGnTree::Process: Closing file from thread %zu with write", data.GetAssignedIndex());
1076 data.GetHnSparseBase()->GetStorageTree()->Close(true);
1077 }
1078 }
1079
1080 NLogDebug("NGnTree::Process: Merging %zu results ...", threadDataVector.size());
1082 Printf("NGnTree::Process: [phase] merge start (%zu workers)", threadDataVector.size());
1083 }
1084 const auto mergeStart = std::chrono::high_resolution_clock::now();
1085 TList * mergeList = new TList();
1086 Ndmspc::NGnThreadData * outputData = new Ndmspc::NGnThreadData();
1087 outputData->Init(0, func, nullptr, nullptr, this, binningIn);
1088 outputData->SetCfg(cfg);
1089 // outputData->Init(0, func, this);
1090
1091 for (auto & data : threadDataVector) {
1092 if (filterByRegistered && registeredWorkers.find(data.GetAssignedIndex()) == registeredWorkers.end()) {
1093 NLogInfo("NGnTree::Process: Skipping worker %zu — never connected", data.GetAssignedIndex());
1094 continue;
1095 }
1096 NLogTrace("NGnTree::Process: Adding thread data %zu to merge list ...", data.GetAssignedIndex());
1097 mergeList->Add(&data);
1098 }
1099
1100 Long64_t nmerged = outputData->Merge(mergeList);
1101 const auto mergeEnd = std::chrono::high_resolution_clock::now();
1103 const auto mergeSec = std::chrono::duration_cast<std::chrono::duration<double>>(mergeEnd - mergeStart).count();
1104 Printf("NGnTree::Process: [phase] merge done (%lld outputs, %.2f s)", nmerged, mergeSec);
1105 }
1106 if (nmerged <= 0) {
1107 NLogError("NGnTree::Process: Failed to merge thread data, exiting ...");
1108 delete mergeList;
1109 return false;
1110 }
1111 NLogInfo("NGnTree::Process: Merged %lld outputs successfully", nmerged);
1112 // delete all temporary files
1113 // for (auto & data : threadDataVector) {
1114 // std::string filename = data.GetHnSparseBase()->GetStorageTree()->GetFileName();
1115 // NLogTrace("NGnTree::Process: Deleting temporary file '%s' ...", filename.c_str());
1116 // gSystem->Exec(TString::Format("rm -f %s", filename.c_str()));
1117 // }
1118 //
1119
1120 // binningIn= outputData->GetHnSparseBase()->GetBinning();
1121
1122 auto * mergedBinning = outputData->GetHnSparseBase()->GetBinning();
1123 std::set<Long64_t> mergedContentIds;
1124 std::vector<std::pair<Long64_t, std::vector<int>>> mergedContentCoords;
1125
1126 // add missing entries to definitions based on defIdMapProcessedRemoved
1127 for (size_t i = 0; i < defNames.size(); i++) {
1128 std::string name = defNames[i];
1129 // auto def = binningIn->GetDefinition(name);
1130 auto def = mergedBinning->GetDefinition(name);
1131 if (!def) {
1132 NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1133 return false;
1134 }
1135 for (auto & [other_name, removedIds] : defIdMapProcessedRemoved) {
1136 if (other_name.compare(name) != 0) {
1137 continue;
1138 }
1139 for (auto & id : removedIds) {
1140 if (std::find(def->GetIds().begin(), def->GetIds().end(), id) == def->GetIds().end()) {
1141 NLogTrace("NGnTree::Process: Adding missing entry %lld to definition '%s'", id, name.c_str());
1142 def->GetIds().push_back(id);
1143 }
1144 }
1145 }
1146 sort(def->GetIds().begin(), def->GetIds().end());
1147 // outputData->GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds() = def->GetIds();
1148
1149 // Modify content in binning definitions based on def->GetIds()
1150 def->GetContent()->Reset();
1151 for (auto id : def->GetIds()) {
1152 NBinningPoint point(def->GetBinning());
1153 def->GetBinning()->GetContent()->GetBinContent(id, point.GetCoords());
1155 Long64_t bin = def->GetContent()->GetBin(point.GetStorageCoords());
1156 NLogTrace("NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld", name.c_str(), id, bin);
1157 def->GetContent()->SetBinContent(bin, id);
1158
1159 if (mergedContentIds.insert(id).second) {
1160 mergedContentCoords.emplace_back(
1162 }
1163 }
1164 }
1165
1166 // Rebuild the merged top-level content from final definition ids. In IPC/TCP mode
1167 // the merge setup may still carry sparse source-bin content; resetting here keeps
1168 // only the bins that correspond to actual merged tree entries.
1169 mergedBinning->GetContent()->Reset();
1170 for (const auto & entry : mergedContentCoords) {
1171 Long64_t bin = mergedBinning->GetContent()->GetBin(entry.second.data());
1172 mergedBinning->GetContent()->SetBinContent(bin, entry.first);
1173 }
1174
1175 // print final binning definitions
1176 NLogDebug("NGnTree::Process: Final binning definitions after processing:");
1177 for (auto & name : defNames) {
1178 // auto binningDef = binningIn->GetDefinition(name);
1179 auto binningDef = outputData->GetHnSparseBase()->GetBinning()->GetDefinition(name);
1180 if (!binningDef) {
1181 NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1182 return false;
1183 }
1184 binningDef->Print();
1185 }
1186
1187 fTreeStorage = outputData->GetHnSparseBase()->GetStorageTree();
1188 fOutputs = outputData->GetHnSparseBase()->GetOutputs();
1189 fBinning = outputData->GetHnSparseBase()->GetBinning(); // Update binning to the merged one
1190 fParameters = outputData->GetHnSparseBase()->GetParameters();
1191
1193 NLogInfo("NGnTree::Process: Processing completed successfully. Output was stored in '%s'.",
1194 fTreeStorage->GetFileName().c_str());
1195 }
1196 else {
1197 Printf("Processing completed successfully. Output was stored in '%s'.", fTreeStorage->GetFileName().c_str());
1198 }
1199
1200 // Close the final output file
1202 Printf("NGnTree::Process: [phase] final close start (%s)",
1203 outputData->GetHnSparseBase()->GetStorageTree()->GetFileName().c_str());
1204 }
1205 const auto closeStart = std::chrono::high_resolution_clock::now();
1206 outputData->GetHnSparseBase()->Close(true);
1207 const auto closeEnd = std::chrono::high_resolution_clock::now();
1209 const auto closeSec = std::chrono::duration_cast<std::chrono::duration<double>>(closeEnd - closeStart).count();
1210 Printf("NGnTree::Process: [phase] final close done (%.2f s)", closeSec);
1211 }
1212
1214 Printf("NGnTree::Process: [phase] cleanup start (%s)", jobDir.c_str());
1215 }
1216 const auto cleanupStart = std::chrono::high_resolution_clock::now();
1217 gSystem->Exec(TString::Format("rm -fr %s", jobDir.c_str()));
1218 const auto cleanupEnd = std::chrono::high_resolution_clock::now();
1220 const auto cleanupSec = std::chrono::duration_cast<std::chrono::duration<double>>(cleanupEnd - cleanupStart).count();
1221 Printf("NGnTree::Process: [phase] cleanup done (%.2f s)", cleanupSec);
1222 }
1223 gROOT->SetBatch(batch); // Restore ROOT batch mode
1224 return true;
1225}
1226
1227TList * NGnTree::GetOutput(std::string name)
1228{
1232
1233 if (name.empty()) {
1234 name = fBinning->GetCurrentDefinitionName();
1235 }
1236 if (fOutputs.find(name) == fOutputs.end()) {
1237 fOutputs[name] = new TList();
1238 fOutputs[name]->SetName(name.c_str());
1239 }
1240 return fOutputs[name];
1241}
1242
1243NGnTree * NGnTree::Open(const std::string & filename, const std::string & branches, const std::string & treename)
1244{
1248
1249 NLogDebug("Opening '%s' with branches='%s' and treename='%s' ...", filename.c_str(), branches.c_str(),
1250 treename.c_str());
1251
1252 TFile * file = TFile::Open(filename.c_str());
1253 if (!file) {
1254 NLogError("NGnTree::Open: Cannot open file '%s'", filename.c_str());
1255 return nullptr;
1256 }
1257
1258 TTree * tree = (TTree *)file->Get(treename.c_str());
1259 if (!tree) {
1260 NLogError("NGnTree::Open: Cannot get tree '%s' from file '%s'", treename.c_str(), filename.c_str());
1261 return nullptr;
1262 }
1263
1264 return Open(tree, branches, file);
1265}
1266
1267NGnTree * NGnTree::Open(TTree * tree, const std::string & branches, TFile * file)
1268{
1272
1273 NBinning * hnstBinning = (NBinning *)tree->GetUserInfo()->At(0);
1274 if (!hnstBinning) {
1275 NLogError("NGnTree::Open: Cannot get binning from tree '%s'", tree->GetName());
1276 return nullptr;
1277 }
1278 NStorageTree * hnstStorageTree = (NStorageTree *)tree->GetUserInfo()->At(1);
1279 if (!hnstStorageTree) {
1280 NLogError("NGnTree::Open: Cannot get tree storage info from tree '%s'", tree->GetName());
1281 return nullptr;
1282 }
1283
1284 std::map<std::string, TList *> outputs;
1285 TDirectory * dir = nullptr;
1286 if (file) {
1287 dir = (TDirectory *)file->Get("outputs");
1288 auto l = dir->GetListOfKeys();
1289 for (auto kv : *l) {
1290 TObject * obj = dir->Get(kv->GetName());
1291 if (!obj) continue;
1292 TList * l = dynamic_cast<TList *>(obj);
1293 if (!l) continue;
1294 outputs[l->GetName()] = l;
1295 NLogDebug("Imported output list for binning '%s' with %d object(s) from file '%s'", l->GetName(), l->GetEntries(),
1296 file->GetName());
1297 }
1298 }
1299 // TDirectory * dir = (TDirectory *)tree->GetUserInfo()->FindObject("outputs");
1300 // if (dir) {
1301 // dir->Print();
1302 // }
1303
1304 NGnTree * ngnt = new NGnTree(hnstBinning, hnstStorageTree);
1305
1306 if (!hnstStorageTree->SetFileTree(file, tree)) return nullptr;
1307 // if (!ngnt->InitBinnings({})) return nullptr;
1308 // ngnt->Print();
1309 // Get list of branches
1310 std::vector<std::string> enabledBranches;
1311 if (!branches.empty()) {
1312 enabledBranches = Ndmspc::NUtils::Tokenize(branches, ',');
1313 NLogTrace("NGnTree::Open: Enabled branches: %s", NUtils::GetCoordsString(enabledBranches, -1).c_str());
1314 hnstStorageTree->SetEnabledBranches(enabledBranches);
1315 }
1316 else {
1317 // loop over all branches and set address
1318 for (auto & kv : hnstStorageTree->GetBranchesMap()) {
1319 NLogTrace("NGnTree::Open: Enabled branches: %s", kv.first.c_str());
1320 }
1321 }
1322 // Set all branches to be read
1323 hnstStorageTree->SetBranchAddresses();
1324 ngnt->SetOutputs(outputs);
1325
1326 NGnNavigator * nav = new NGnNavigator();
1327 nav->SetGnTree(ngnt);
1328 ngnt->SetNavigator(nav);
1329
1330 return ngnt;
1331}
1332
1334{
1338
1339 if (fNavigator) {
1340 NLogTrace("NGnTree::SetNavigator: Replacing existing navigator ...");
1341 SafeDelete(fNavigator);
1342 }
1343
1344 fNavigator = navigator;
1345}
1346
1347bool NGnTree::Close(bool write)
1348{
1352
1353 if (!fTreeStorage) {
1354 NLogError("NGnTree::Close: Storage tree is not initialized in NGnTree !!!");
1355 return false;
1356 }
1357
1358 return fTreeStorage->Close(write, fOutputs);
1359}
1360
1361Int_t NGnTree::GetEntry(Long64_t entry, bool checkBinningDef)
1362{
1366 if (!fTreeStorage) {
1367 NLogError("NGnTree::GetEntry: Storage tree is not initialized in NGnTree !!!");
1368 return -1;
1369 }
1370
1371 int bytes =
1372 fTreeStorage->GetEntry(entry, fBinning->GetPoint(0, fBinning->GetCurrentDefinitionName()), checkBinningDef);
1373 if (fTreeStorage->GetBranch("_params")) fParameters = (NParameters *)fTreeStorage->GetBranch("_params")->GetObject();
1374 return bytes;
1375}
1376Int_t NGnTree::GetEntry(std::vector<std::vector<int>> /*range*/, bool checkBinningDef)
1377{
1381
1382 return GetEntry(0, checkBinningDef);
1383}
1384
1385void NGnTree::Play(int timeout, std::string binning, std::vector<int> outputPointIds,
1386 std::vector<std::vector<int>> ranges, Option_t * option)
1387{
1391 TString opt = option;
1392 opt.ToUpper();
1393
1394 std::string annimationTempDir =
1395 TString::Format("%s/.ndmspc/animation/%d", gSystem->Getenv("HOME"), gSystem->GetPid()).Data();
1396 gSystem->Exec(TString::Format("mkdir -p %s", annimationTempDir.c_str()));
1397
1398 if (binning.empty()) {
1399 binning = fBinning->GetCurrentDefinitionName();
1400 }
1401
1402 NBinningDef * binningDef = fBinning->GetDefinition(binning);
1403 if (!binningDef) {
1404 NLogError("NGnTree::Play: Binning definition '%s' not found in NGnTree !!!", binning.c_str());
1405 NLogError("Available binning definitions:");
1406 for (auto & name : fBinning->GetDefinitionNames()) {
1407 if (name == fBinning->GetCurrentDefinitionName())
1408 NLogError(" [*] %s", name.c_str());
1409 else
1410 NLogError(" [ ] %s", name.c_str());
1411 }
1412 return;
1413 }
1414
1415 THnSparse * bdContent = (THnSparse *)binningDef->GetContent()->Clone();
1416
1417 std::string bdContentName = TString::Format("bdContent_%s", binning.c_str()).Data();
1418 // Set axis ranges if provided
1419 if (!ranges.empty()) NUtils::SetAxisRanges(bdContent, ranges, false, true);
1420
1421 Long64_t linBin = 0;
1422 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{bdContent->CreateIter(true /*use axis range*/)};
1423 std::vector<Long64_t> ids;
1424 // std::vector<Long64_t> ids = binningDef->GetIds();
1425 while ((linBin = iter->Next()) >= 0) {
1426 // NLogDebug("Original content bin %lld: %f", linBin, bdContentOrig->GetBinContent(linBin));
1427 ids.push_back(linBin);
1428 }
1429 if (ids.empty()) {
1430 NLogWarning("NGnTree::Play: No entries found in binning definition '%s' !!!", binning.c_str());
1431 return;
1432 }
1433 // return;
1434
1435 TCanvas * c1 = nullptr;
1436
1437 c1 = (TCanvas *)gROOT->GetListOfCanvases()->FindObject("c1");
1438 if (c1 == nullptr) c1 = new TCanvas("c1", "NGnTree::Play", 800, 600);
1439 c1->Clear();
1440 c1->cd();
1441 c1->DivideSquare(outputPointIds.size() > 0 ? outputPointIds.size() + 1 : 1);
1442 gSystem->ProcessEvents();
1443
1444 binningDef->Print();
1445 bdContent->Reset();
1446
1447 // loop over all ids and print them
1448 for (auto id : ids) {
1449 // for (int id = 0; id < GetEntries(); id++) {
1450 GetEntry(id);
1451 fBinning->GetPoint()->Print();
1452 TList * l = (TList *)fTreeStorage->GetBranch("_outputPoint")->GetObject();
1453 if (!l || l->IsEmpty()) {
1454 NLogWarning("NGnTree::Play: No 'outputPoint' for entry %lld !!!", id);
1455 continue;
1456 }
1457 else {
1458 // NLogInfo("Output for entry %lld:", id);
1459 // l->Print(opt.Data());
1460
1461 if (outputPointIds.empty()) {
1462 outputPointIds.resize(l->GetEntries());
1463 for (int i = 0; i < l->GetEntries(); i++) {
1464 outputPointIds[i] = i;
1465 }
1466 }
1467 int n = outputPointIds.size();
1468
1469 Double_t v = 1.0;
1470 for (int i = 0; i < n; i++) {
1471 // NLogDebug("Drawing output object id %d (list index %d) on pad %d", outputPointIds[i], i, i + 1);
1472
1473 c1->cd(i + 2);
1474 TObject * obj = l->At(outputPointIds[i]);
1475 if (obj) {
1476 if (obj->InheritsFrom(TH1::Class())) {
1477 TH1 * h = (TH1 *)obj;
1478 h->SetDirectory(nullptr);
1479 // Draw a clone to avoid transferring ownership or modifying
1480 // the original object stored in the TList (can cause
1481 // TPad/TList removal during drawing and lead to crashes).
1482 TH1 * hclone = (TH1 *)h->Clone();
1483 if (hclone) {
1484 hclone->SetDirectory(nullptr);
1485 hclone->Draw();
1486 }
1487 }
1488 // obj->Print();
1489 }
1490 if (obj->InheritsFrom(TH1::Class()) && i == 0) {
1491 TH1 * h = (TH1 *)obj;
1492 v = h->GetMean();
1493 NLogDebug("Mean value from histogram [%s]: %f", h->GetName(), v);
1494 }
1495 }
1496 bdContent->SetBinContent(fBinning->GetPoint()->GetStorageCoords(), 1);
1497 c1->cd(1);
1498 TH1 * bdProj = (TH1 *)gROOT->FindObjectAny("bdProj");
1499 if (bdProj) {
1500 delete bdProj;
1501 bdProj = nullptr;
1502 }
1503 if (bdContent->GetNdimensions() == 1) {
1504 bdProj = bdContent->Projection(0, "O");
1505 }
1506 else if (bdContent->GetNdimensions() == 2) {
1507 bdProj = bdContent->Projection(0, 1, "O");
1508 }
1509 else if (bdContent->GetNdimensions() == 3) {
1510 bdProj = bdContent->Projection(0, 1, 2, "O");
1511 }
1512 else {
1513 NLogError("NGnTree::Play: Cannot project THnSparse with %d dimensions", bdContent->GetNdimensions());
1514 }
1515 if (bdProj) {
1516 bdProj->SetName("bdProj");
1517 bdProj->SetTitle(TString::Format("Binning '%s' content projection", binning.c_str()).Data());
1518 bdProj->SetMinimum(0);
1519 // bdProj->SetDirectory(nullptr);
1520 bdProj->Draw("colz");
1521 // c1->ModifiedUpdate();
1522 }
1523 }
1524 if (c1) {
1525 c1->ModifiedUpdate();
1526 c1->SaveAs(TString::Format("%s/ndmspc_play_%06lld.png", annimationTempDir.c_str(), bdContent->GetNbins()).Data());
1527 }
1528 gSystem->ProcessEvents();
1529 if (timeout > 0) gSystem->Sleep(timeout);
1530 NLogInfo("%d", id);
1531 }
1532
1533 NLogInfo("Creating animation gif from %s/ndmspc_play_*.png ...", annimationTempDir.c_str());
1534 gSystem->Exec(
1535 TString::Format("magick -delay 20 -loop 0 %s/ndmspc_play_*.png ndmspc_play.gif", annimationTempDir.c_str()));
1536 gSystem->Exec(TString::Format("rm -fr %s", annimationTempDir.c_str()));
1537 NLogInfo("Animation saved to ndmspc_play.gif");
1538
1539 delete bdContent;
1540}
1541
// Project THnSparse objects stored in the tree onto selected axes and collect
// the summed projections in the output list of the current binning definition.
//
// cfg["objects"] is iterated as name -> array of projections, where each
// projection is an array of axis names; cfg["ranges"] is read as a vector of
// integer vectors and passed to NUtils::SetAxisRanges on the definition content.
//
// Returns the list from GetOutput() for the current binning definition, or
// nullptr when no bin is selected by the ranges.
//
// NOTE(review): `binningDef` is dereferenced without a null check and the result
// of Process() is ignored — confirm both are intended.
1542TList * NGnTree::Projection(const json & cfg, std::string binningName)
1543{
1547
1548 // SetInput(); // Set input to selfp
// Per-point worker: loads the point's entry from the input tree and adds the
// requested projections to `output`.
1550 Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * output, TList * /*outputPoint*/,
1551 int /*threadId*/) {
1552 // NLogInfo("Thread ID: %d", threadId);
1553 TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
1554 point->Print();
1555 json cfg = point->GetCfg();
1556
1557 Printf("Processing THnSparse projection with configuration: %s", cfg.dump().c_str());
1558
1559 Ndmspc::NGnTree * ngntIn = point->GetInput();
1560 // ngntIn->Print();
1561 // ngntIn->GetEntry(0);
1562 ngntIn->GetEntry(point->GetEntryNumber());
1563
1564 // loop over all cfg["objects"]
1565 for (auto & [objName, objCfg] : cfg["objects"].items()) {
1566 NLogInfo("Processing object '%s' ...", objName.c_str());
1567
// The branch object named like the configuration key is used as the projection source.
1568 THnSparse * hns = (THnSparse *)(ngntIn->GetStorageTree()->GetBranchObject(objName));
1569 if (hns == nullptr) {
1570 NLogError("NGnTree::Projection: THnSparse 'hns' not found in storage tree !!!");
1571 return;
1572 }
1573 // hns->Print("all");
1574 // loop over cfg["objects"][objName] array of projection dimension names
1575 for (size_t i = 0; i < objCfg.size(); i++) {
1576
1577 NLogInfo("Processing projection %zu for object '%s' ...", i, objName.c_str());
1578 std::vector<int> dims;
1579 std::vector<std::string> dimNames = cfg["objects"][objName][i].get<std::vector<std::string>>();
// Translate every axis name into its axis index; unknown names are reported and skipped.
1580 for (const auto & dimName : dimNames) {
1581 NLogDebug("Looking for dimension name '%s' in THnSparse ...", dimName.c_str());
1582 int dim = -1;
// NOTE(review): this `i` shadows the projection index of the enclosing loop.
1583 for (int i = 0; i < hns->GetNdimensions(); i++) {
1584 if (dimName == hns->GetAxis(i)->GetName()) {
1585 dim = i;
1586 break;
1587 }
1588 }
1589 if (dim >= 0)
1590 dims.push_back(dim);
1591 else {
1592 NLogError("NGnTree::Projection: Dimension name '%s' not found in THnSparse !!!", dimName.c_str());
1593 }
1594 }
1595 // Print dims
1596 NLogInfo("Projecting THnSparse on dimensions: %s", NUtils::GetCoordsString(dims, -1).c_str());
// An existing histogram at list position i accumulates the new projection;
// otherwise the projection is appended to the list.
// NOTE(review): i restarts at 0 for every object, so projections of different
// objects share list positions; hProj is used without a null check — confirm.
1597 TH1 * hPrev = (TH1 *)output->At(i);
1598 TH1 * hProj = NUtils::ProjectTHnSparse(hns, dims, "O");
1599 hProj->SetName(TString::Format("%s_proj_%s", objName.c_str(), NUtils::Join(dims, '_').c_str()).Data());
1600 if (hPrev) {
1601 hPrev->Add(hProj);
1602 }
1603 else {
1604 output->Add(hProj);
1605 }
1606 }
1607 }
1608 output->Print();
1609 };
1610
1611 // NBinningDef * binningDef = fInput->GetBinning()->GetDefinition(binningName);
1612 NBinningDef * binningDef = GetBinning()->GetDefinition(binningName);
1613 THnSparse * hnsIn = binningDef->GetContent();
1614 // std::vector<std::vector<int>> ranges{{0, 2, 2}, {2, 1, 1}};
1615 std::vector<std::vector<int>> ranges = cfg["ranges"].get<std::vector<std::vector<int>>>();
1616 NUtils::SetAxisRanges(hnsIn, ranges); // Set the ranges for the axes
// Collect the linear indices of the filled bins inside the configured ranges.
1617 Long64_t linBin = 0;
1618 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{hnsIn->CreateIter(true /*use axis range*/)};
1619 std::vector<Long64_t> ids;
1620 // std::vector<Long64_t> ids = binningDef->GetIds();
1621 while ((linBin = iter->Next()) >= 0) {
1622 ids.push_back(linBin);
1623 }
1624 if (ids.empty()) {
1625 NLogWarning("NGnTree::Projection: No entries found in binning definition '%s' !!!", binningDef->GetName());
1626 binningDef->RefreshIdsFromContent();
1627 return nullptr;
1628 }
1629
// Temporarily restrict the definition to the selected bins for the Process() call.
1630 binningDef->GetIds() = ids;
1631
1632 // NUtils::SetAxisRanges(, std::vector<std::vector<int>> ranges)
1633 Process(processFunc, cfg);
1634
1635 // Refresh binning definition ids from content after processing
1636 binningDef->RefreshIdsFromContent();
1637
1638 // GetInput()->Close(false);
1639 return GetOutput(fBinning->GetCurrentDefinitionName());
1640
1641 // Close(false);
1642}
1643
1644NGnNavigator * NGnTree::Reshape(std::string binningName, std::vector<std::vector<int>> levels, int level,
1645 std::map<int, std::vector<int>> ranges, std::map<int, std::vector<int>> rangesBase)
1646{
1650
1651 NGnNavigator navigator;
1652 navigator.SetGnTree(this);
1653
1654 return navigator.Reshape(binningName, levels, level, ranges, rangesBase);
1655}
1656
1657NGnNavigator * NGnTree::GetResourceStatisticsNavigator(std::string binningName, std::vector<std::vector<int>> levels,
1658 int level, std::map<int, std::vector<int>> ranges,
1659 std::map<int, std::vector<int>> rangesBase)
1660{
1664
1665 if (binningName.empty()) {
1666 binningName = fBinning->GetCurrentDefinitionName();
1667 }
1668
1669 THnSparse * hns = (THnSparse *)fOutputs[binningName]->FindObject("resource_monitor");
1670 if (!hns) {
1671 NLogError("NGnTree::Draw: Resource monitor THnSparse not found in outputs !!!");
1672 return nullptr;
1673 }
1674 hns->Print("all");
1675 // return nullptr;
1676
1677 auto ngnt = new NGnTree(hns, "stat", "/tmp/hnst_imported_for_drawing.root");
1678 if (ngnt->IsZombie()) {
1679 NLogError("NGnTree::GetResourceStatisticsNavigator: Failed to import resource monitor THnSparse !!!");
1680 return nullptr;
1681 }
1682 ngnt->Print();
1683 ngnt->Close();
1684
1685 // return nullptr;
1686 auto ngnt2 = NGnTree::Open("/tmp/hnst_imported_for_drawing.root");
1687 auto nav = ngnt2->Reshape("default", levels, level, ranges, rangesBase);
1688 // // nav->Export("/tmp/hnst_imported_for_drawing.json", {}, "ws://localhost:8080/ws/root.websocket");
1689 // // nav->Draw();
1690 return nav;
1691}
1692
1693bool NGnTree::InitParameters(const std::vector<std::string> & paramNames)
1694{
1698
1699 if (fParameters) {
1700 NLogTrace("NGnTree::InitParameters: Replacing existing parameters ...");
1701 delete fParameters;
1702 }
1703
1704 if (paramNames.empty()) {
1705 NLogTrace("NGnTree::InitParameters: No parameter names provided, skipping ...");
1706 return false;
1707 }
1708
1709 fParameters = new NParameters(paramNames, "results", "Results");
1710
1711 return true;
1712}
1713
1714NGnTree * NGnTree::Import(const std::string & findPath, const std::string & fileName,
1715 const std::vector<std::string> & headers, const std::string & outputFile, bool close)
1716{
1720
1721 // remove trailing slash from findPath if exists
1722 std::string findPathClean = findPath;
1723 if (!findPathClean.empty() && findPathClean.back() == '/') {
1724 findPathClean.pop_back();
1725 }
1726
1727 std::vector<std::string> paths = NUtils::Find(findPathClean, fileName);
1728 NLogInfo("NGnTree::Import: Found %zu files to import ...", paths.size());
1729
1730 TObjArray * ngntArray = NUtils::AxesFromDirectory(paths, findPathClean, fileName, headers);
1731 int nDirAxes = ngntArray->GetEntries();
1732
1733 NGnTree * ngntFirst = NGnTree::Open(paths[0]);
1734 // Add all axes from ngntFirst to ngntArray
1735 for (const auto & axis : ngntFirst->GetBinning()->GetAxes()) {
1736 ngntArray->Add(axis->Clone());
1737 }
1738 ngntFirst->Close(false);
1739
1740 std::map<std::string, std::vector<std::vector<int>>> b;
1741
1742 for (int i = 0; i < ngntArray->GetEntries(); i++) {
1743 TAxis * axis = (TAxis *)ngntArray->At(i);
1744 b[axis->GetName()].push_back({1});
1745 }
1746
1747 NGnTree * ngnt = new NGnTree(ngntArray, outputFile);
1748 ngnt->SetIsPureCopy(true);
1749
1750 // return nullptr;
1751 ngnt->GetBinning()->AddBinningDefinition("default", b);
1752
1753 json cfg;
1754 cfg["basedir"] = findPathClean;
1755 cfg["filename"] = fileName;
1756 cfg["nDirAxes"] = nDirAxes;
1757 cfg["headers"] = headers;
1758 // cfg["ndmspc"]["shared"]["currentFileName"] = "";
1759 Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * /*output*/, TList * outputPoint,
1760 int /*threadId*/) {
1761 // point->Print();
1762
1763 json cfg = point->GetCfg();
1764 std::string filename = cfg["basedir"].get<std::string>();
1765 filename += "/";
1766 for (auto & header : cfg["headers"]) {
1767 filename += point->GetBinLabel(header.get<std::string>());
1768 filename += "/";
1769 }
1770 // filename += "/";
1771 // filename += point->GetBinLabel("c");
1772 // filename += point->GetBinLabel("year");
1773 // filename += "/";
1774 filename += cfg["filename"].get<std::string>();
1775 NGnTree * ngnt = (NGnTree *)point->GetTempObject("file");
1776 if (!ngnt || filename.compare(ngnt->GetStorageTree()->GetFileName()) != 0) {
1777 NLogInfo("NGnTree::Import: Opening file '%s' ...", filename.c_str());
1778 if (ngnt) {
1779 NLogDebug("NGnTree::Import: Closing previously opened file '%s' ...",
1780 ngnt->GetStorageTree()->GetFileName().c_str());
1781 ngnt->Close(false);
1782 // delete ngnt;
1783 point->SetTempObject("file", nullptr);
1784 }
1785 ngnt = NGnTree::Open(filename.c_str());
1786 if (!ngnt || ngnt->IsZombie()) {
1787 NLogError("NGnTree::Import: Cannot open file '%s'", filename.c_str());
1788 return;
1789 }
1790 point->SetTempObject("file", ngnt);
1791 }
1792
1793 int nDirAxes = cfg["nDirAxes"].get<int>();
1794 Int_t * coords = point->GetCoords();
1795 std::string coordsStr = NUtils::GetCoordsString(NUtils::ArrayToVector(coords, point->GetNDimensionsContent()));
1796 NLogInfo("NGnTree::Import: Processing point with coords %s ...", coordsStr.c_str());
1797
1798 Long64_t entryNumber =
1799 ngnt->GetBinning()->GetContent()->GetBin(&coords[3 * nDirAxes], kFALSE); // skip first 3 dir axes
1800 NLogInfo("NGnTree::Import: Corresponding entry number in file '%s' is %lld", filename.c_str(), entryNumber);
1801
1802 ngnt->GetEntry(entryNumber);
1803
1804 // // add outputPoint content to outputPoint list
1805 // TList * inputOutputPoint = (TList *)ngnt->GetStorageTree()->GetBranch("_outputPoint")->GetObject();
1806 // for (int i = 0; i < inputOutputPoint->GetEntries(); i++) {
1807 // outputPoint->Add(inputOutputPoint->At(i));
1808 // }
1809
1810 // set all branches from ngnt to branch addresses in current object
1811 for (const auto & kv : ngnt->GetStorageTree()->GetBranchesMap()) {
1812 // check if branch exists in current storage tree
1813 if (point->GetStorageTree()->GetBranch(kv.first) == nullptr) {
1814 NLogTrace("NGnTree::Import: Adding branch '%s' to storage tree ...", kv.first.c_str());
1815 point->GetStorageTree()->AddBranch(kv.first, nullptr, kv.second.GetObjectClassName());
1816 }
1817 NLogTrace("NGnTree::Import: Setting branch address for branch '%s' ...", kv.first.c_str());
1818 point->GetTreeStorage()->GetBranch(kv.first)->SetAddress(kv.second.GetObject());
1819 }
1820 outputPoint->Add(new TNamed("source_file", filename));
1821
1822 // ngnt->Print();
1823
1824 // NLogInfo("NGnTree::Import: nDirAxes=%d ...", cfg["nDirAxes"].get<int>());
1825
1826 // json & tempCfg = point->GetTempCfg();
1827 // if (tempCfg["test"].is_null()) {
1828 // NLogInfo("Setting temp cfg test value to 42");
1829 // tempCfg["test"] = 42;
1830 // }
1831 // NLogInfo("Temp cfg test value: %d", tempCfg["test"].get<int>());
1832
1833 // f->ls();
1834 };
1835 Ndmspc::NGnBeginFuncPtr beginFunc = [](Ndmspc::NBinningPoint * /*point*/, int /*threadId*/) {
1836 TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
1837 };
1838
1839 Ndmspc::NGnEndFuncPtr endFunc = [](Ndmspc::NBinningPoint * point, int /*threadId*/) {
1840 NGnTree * ngnt = (NGnTree *)point->GetTempObject("file");
1841 if (ngnt) {
1842 NLogDebug("NGnTree::Import: Closing last file '%s' ...", ngnt->GetStorageTree()->GetFileName().c_str());
1843 // ngnt->Close(false);
1844 // delete ngnt;
1845 point->SetTempObject("file", nullptr);
1846 }
1847 };
1848
1849 ngnt->Process(processFunc, cfg, "", beginFunc, endFunc);
1850 if (close) {
1851 ngnt->Close(true);
1852 delete ngnt;
1853 ngnt = NGnTree::Open(outputFile.c_str());
1854 }
1855 return ngnt;
1856}
1857
1858} // namespace Ndmspc
Defines binning mapping and content for NDMSPC histograms.
Definition NBinningDef.h:26
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Definition NBinningDef.h:93
THnSparse * GetContent() const
Get the template content histogram.
virtual void Print(Option_t *option="") const
Print binning definition information.
void RefreshIdsFromContent()
Refresh bin IDs from content histogram.
Represents a single point in multi-dimensional binning.
Double_t GetBinMax(std::string axis) const
Get the maximum value for a specific axis.
std::string GetString(const std::string &prefix="", bool all=false) const
Returns a string representation of the binning point.
std::string GetBinLabel(std::string axis) const
Get the label for a specific axis.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
NStorageTree * GetTreeStorage() const
Get pointer to storage tree object.
Long64_t GetEntryNumber() const
Get entry number for the point.
Double_t GetBinCenter(std::string axis) const
Returns the center value of the bin along the specified axis.
NGnTree * GetInput() const
Get pointer to input NGnTree object.
NStorageTree * GetStorageTree() const
Returns a pointer to the associated storage tree.
void SetTempObject(const std::string &name, TObject *obj)
Set a temporary object with the given name.
virtual void Print(Option_t *option="") const
Print binning point information.
TObject * GetTempObject(const std::string &name) const
Retrieve a temporary object by name.
Int_t GetBin(std::string axis) const
Returns the bin index for the specified axis.
json & GetCfg()
Get reference to configuration JSON object.
Int_t * GetCoords() const
Get pointer to content coordinates array.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
Double_t GetBinMin(std::string axis) const
Get the minimum value for a specific axis.
NParameters * GetParameters() const
Get the parameters associated with this binning point.
NBinning object for managing multi-dimensional binning and axis definitions.
Definition NBinning.h:45
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
THnSparse * GetContent() const
Get the content histogram.
Definition NBinning.h:217
void AddBinningDefinition(std::string name, std::map< std::string, std::vector< std::vector< int > > > binning, bool forceDefault=false)
Add a binning definition.
std::vector< TAxis * > GetAxes() const
Get vector of axis pointers.
Definition NBinning.h:223
Executes a function over all points in an N-dimensional space, optionally in parallel.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
Navigator object for managing hierarchical data structures and projections.
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int > > levels, size_t level=0, std::map< int, std::vector< int > > ranges={}, std::map< int, std::vector< int > > rangesBase={})
Reshape navigator using binning name and levels.
void SetGnTree(NGnTree *tree)
Set NGnTree object pointer.
Thread-local data object for NDMSPC processing.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
void SetResultsFilename(const std::string &filename)
Set the results filename for TCP mode (shared filesystem path).
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
void SetCfg(const json &cfg)
Set configuration JSON object.
NDMSPC tree object for managing multi-dimensional data storage and processing.
Definition NGnTree.h:75
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
Definition NGnTree.h:173
NGnNavigator * GetResourceStatisticsNavigator(std::string binningName, std::vector< std::vector< int > > levels, int level=0, std::map< int, std::vector< int > > ranges={}, std::map< int, std::vector< int > > rangesBase={})
Returns a navigator for resource statistics based on binning and levels.
Definition NGnTree.cxx:1657
virtual void Draw(Option_t *option="") override
Draws the tree object.
Definition NGnTree.cxx:492
void SetIsPureCopy(bool val)
Sets the pure copy status of the tree.
Definition NGnTree.h:230
bool Close(bool write=false)
Close the tree, optionally writing data.
Definition NGnTree.cxx:1347
bool fOwnsTreeStorage
True when fTreeStorage is owned by this instance.
Definition NGnTree.h:393
virtual ~NGnTree()
Destructor.
Definition NGnTree.cxx:453
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
Definition NGnTree.cxx:1361
virtual void Print(Option_t *option="") const override
Print tree information.
Definition NGnTree.cxx:468
void SetInput(NGnTree *input)
Set input NGnTree pointer.
Definition NGnTree.h:204
NGnTree()
Default constructor.
Definition NGnTree.cxx:157
static NGnTree * Import(const std::string &findPath, const std::string &fileName, const std::vector< std::string > &headers, const std::string &outFileName="/tmp/ngnt_imported.root", bool close=true)
Imports an NGnTree from a specified file.
Definition NGnTree.cxx:1714
NBinning * fBinning
Binning object.
Definition NGnTree.h:386
TList * GetOutput(std::string name="")
Get output list by name.
Definition NGnTree.cxx:1227
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
Definition NGnTree.h:179
NGnTree * fInput
Input NGnTree for processing.
Definition NGnTree.h:389
bool fOwnsBinning
True when fBinning is owned by this instance.
Definition NGnTree.h:392
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int > > levels, int level=0, std::map< int, std::vector< int > > ranges={}, std::map< int, std::vector< int > > rangesBase={})
Reshape navigator using binning name and levels.
Definition NGnTree.cxx:1644
NParameters * GetParameters() const
Returns the parameters associated with this tree.
Definition NGnTree.h:329
std::string fWorkerMacroList
Comma-separated macro paths sent to TCP workers.
Definition NGnTree.h:395
void SetOutputs(std::map< std::string, TList * > outputs)
Set outputs map.
Definition NGnTree.h:192
bool InitParameters(const std::vector< std::string > &paramNames)
Initializes the parameters for the tree using the provided parameter names.
Definition NGnTree.cxx:1693
void Play(int timeout=0, std::string binning="", std::vector< int > outputPointIds={0}, std::vector< std::vector< int > > ranges={}, Option_t *option="")
Play tree data with optional binning and output point IDs.
Definition NGnTree.cxx:1385
std::map< std::string, TList * > fOutputs
Outputs.
Definition NGnTree.h:388
NGnNavigator * fNavigator
Navigator object (transient: the leading '!' is ROOT's marker for a member that is not written by ROOT I/O).
Definition NGnTree.h:390
NParameters * fParameters
Parameters object.
Definition NGnTree.h:391
TList * Projection(const json &cfg, std::string binningName="")
Project tree data using configuration and binning name.
Definition NGnTree.cxx:1542
NStorageTree * fTreeStorage
Tree storage.
Definition NGnTree.h:387
NGnTree * GetInput() const
Get pointer to input NGnTree.
Definition NGnTree.h:198
static std::string BuildObjectPath(const json &cfg, const json &objCfg, const NBinningPoint *point)
Helper: build object path string from configuration and a binning point.
Definition NGnTree.cxx:43
void SetNavigator(NGnNavigator *navigator)
Sets the navigator for this tree.
Definition NGnTree.cxx:1333
NBinning * GetBinning() const
Get pointer to binning object.
Definition NGnTree.h:161
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
Definition NGnTree.cxx:1243
bool Process(NGnProcessFuncPtr func, const json &cfg=json::object(), std::string binningName="", NGnBeginFuncPtr beginFunc=nullptr, NGnEndFuncPtr endFunc=nullptr)
Process tree data using a function pointer and configuration.
Definition NGnTree.cxx:501
static bool GetConsoleOutput()
Get console output flag.
Definition NLogger.h:548
NParameters object.
Definition NParameters.h:13
bool SetParameter(int bin, Double_t value, Double_t error=0.)
Set the value and error of a parameter by bin index.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
void SetBranchAddresses()
Set addresses for all branches.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
bool SetFileTree(TFile *file, TTree *tree)
Tree handling: set the TFile and TTree used by this storage object.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
TObject * GetBranchObject(const std::string &name)
Get pointer to branch object by name.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
static bool SetAxisRanges(THnSparse *sparse, std::vector< std::vector< int > > ranges={}, bool withOverflow=false, bool modifyTitle=false, bool reset=true)
Set axis ranges for THnSparse using vector of ranges.
Definition NUtils.cxx:1224
static TH1 * ProjectTHnSparse(THnSparse *hns, const std::vector< int > &axes, Option_t *option="")
Project a THnSparse histogram onto specified axes.
Definition NUtils.cxx:1171
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition NUtils.cxx:1077
static std::string FormatTime(long long seconds)
Format time in seconds to human-readable string.
Definition NUtils.cxx:1664
static bool EnableMT(Int_t numthreads=-1)
Enable multi-threading with specified number of threads.
Definition NUtils.cxx:46
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
Definition NUtils.cxx:1115
static void ProgressBar(int current, int total, std::string prefix="", std::string suffix="", int barWidth=50)
Display progress bar.
Definition NUtils.cxx:1677
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
Definition NUtils.cxx:1592
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.
Definition NUtils.cxx:1555
static TObjArray * AxesFromDirectory(const std::vector< std::string > paths, const std::string &findPath, const std::string &fileName, const std::vector< std::string > &axesNames)
Creates an array of axes objects from files in specified directories.
Definition NUtils.cxx:1400
static std::vector< std::string > Find(std::string path, std::string filename="")
Find files in a path matching filename.
Definition NUtils.cxx:967
Global callback function for libwebsockets client events.
void(*)(Ndmspc::NBinningPoint *, TList *, TList *, int) NGnProcessFuncPtr
Function pointer type for processing binning points and lists.
Definition NGnTree.h:40
void(*)(Ndmspc::NBinningPoint *, int) NGnEndFuncPtr
Function pointer type for termination functions used in NGnTree.
Definition NGnTree.h:62
void(*)(Ndmspc::NBinningPoint *, int) NGnBeginFuncPtr
Function pointer type for the beginning of a tree operation.
Definition NGnTree.h:52
Execution progress metrics for IPC-based distributed processing.
size_t activeWorkers
Number of workers currently active.
size_t tasksAcked
Tasks completed and ACKed by workers.