ndmspc v1.2.0-0.1.rc4
Loading...
Searching...
No Matches
NGnTree.cxx
1#include <chrono>
2#include <cstddef>
3#include <ctime>
4#include <numbers>
5#include <set>
6#include <string>
7#include <vector>
8#include "TAxis.h"
9#include <TDirectory.h>
10#include <TObject.h>
11#include <TList.h>
12#include <TROOT.h>
13#include <THnSparse.h>
14#include <TH1.h>
15#include <TCanvas.h>
16#include <TSystem.h>
17#include <TMap.h>
18#include <TObjString.h>
19#include <TTree.h>
20#include <TBufferJSON.h>
21#include <sys/poll.h>
22#include <zmq.h>
23#include "NParameters.h"
24#include "NStorageTree.h"
25#include "NBinning.h"
26#include "NBinningDef.h"
27#include "NDimensionalExecutor.h"
28#include "NDimensionalIpcRunner.h"
29#include "NGnThreadData.h"
30#include "NLogger.h"
31#include "NTreeBranch.h"
32#include "NUtils.h"
33#include "NStorageTree.h"
34#include "NGnNavigator.h"
35#include "NGnTree.h"
36
38ClassImp(Ndmspc::NGnTree);
40
41namespace Ndmspc {
42
43std::string NGnTree::BuildObjectPath(const json & cfg, const json & objCfg, const NBinningPoint * point)
44{
45 std::string objPath = "";
46 if (objCfg.contains("prefix") && objCfg["prefix"].is_string()) {
47 objPath = objCfg["prefix"].get<std::string>();
48 }
49
50 std::string axisObjectDefaultFormat =
51 cfg["axisObjectDefaultFormat"].is_string() ? cfg["axisObjectDefaultFormat"].get<std::string>() : "%.2f_%.2f";
52 std::string axisDefaultSeparator =
53 cfg["axisDefaultSeparator"].is_string() ? cfg["axisDefaultSeparator"].get<std::string>() : "/";
54
55 std::string lastSep;
56 for (auto & axisEntry : cfg["axes"]) {
57 std::string axisName;
58 std::string mode;
59 std::string format;
60
61 if (axisEntry.is_string()) {
62 axisName = axisEntry.get<std::string>();
63 if (axisObjectDefaultFormat.empty()) {
64 mode = "bin";
65 }
66 else {
67 mode = "minmax";
68 format = axisObjectDefaultFormat;
69 }
70 }
71 else if (axisEntry.is_object()) {
72 if (axisEntry.contains("name") && axisEntry["name"].is_string()) {
73 axisName = axisEntry["name"].get<std::string>();
74 }
75 else {
76 continue;
77 }
78 if (axisEntry.contains("mode") && axisEntry["mode"].is_string()) {
79 mode = axisEntry["mode"].get<std::string>();
80 }
81 if (axisEntry.contains("format") && axisEntry["format"].is_string()) {
82 format = axisEntry["format"].get<std::string>();
83 }
84 }
85 else {
86 continue;
87 }
88
89 if (mode.empty()) {
90 if (axisObjectDefaultFormat.empty())
91 mode = "bin";
92 else
93 mode = "minmax";
94 }
95 if (format.empty()) {
96 if (mode == "minmax")
97 format = axisObjectDefaultFormat.empty() ? "%.2f_%.2f" : axisObjectDefaultFormat;
98 else if (mode == "bin")
99 format = "%d";
100 else
101 format = "%.2f";
102 }
103
104 if (mode == "minmax") {
105 double min = point->GetBinMin(axisName);
106 double max = point->GetBinMax(axisName);
107 objPath += TString::Format(format.c_str(), min, max).Data();
108 }
109 else if (mode == "min") {
110 double min = point->GetBinMin(axisName);
111 objPath += TString::Format(format.c_str(), min).Data();
112 }
113 else if (mode == "max") {
114 double max = point->GetBinMax(axisName);
115 objPath += TString::Format(format.c_str(), max).Data();
116 }
117 else if (mode == "center") {
118 double c = point->GetBinCenter(axisName);
119 objPath += TString::Format(format.c_str(), c).Data();
120 }
121 else if (mode == "label") {
122 std::string lbl = point->GetBinLabel(axisName);
123 objPath += lbl;
124 }
125 else if (mode == "bin") {
126 objPath += std::to_string(point->GetBin(axisName));
127 }
128 else {
129 objPath += std::to_string(point->GetBin(axisName));
130 }
131
132 std::string sep = axisDefaultSeparator;
133 if (axisEntry.is_object() && axisEntry.contains("sufix") && axisEntry["sufix"].is_string()) {
134 sep = axisEntry["sufix"].get<std::string>();
135 }
136 objPath += sep;
137 lastSep = sep;
138 }
139
140 if (!lastSep.empty() && objPath.size() >= lastSep.size()) {
141 objPath = objPath.substr(0, objPath.size() - lastSep.size());
142 }
143 if (objCfg.contains("sufix") && objCfg["sufix"].is_string()) {
144 objPath += objCfg["sufix"].get<std::string>();
145 }
146
147 return objPath;
148}
149
156
157NGnTree::NGnTree() : TObject() {}
158
159NGnTree::NGnTree(std::vector<TAxis *> axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
160{
164 if (axes.empty()) {
165 NLogError("NGnTree::NGnTree: No axes provided, binning is nullptr.");
166 // fBinning = new NBinning();
167 MakeZombie();
168 return;
169 }
170 fBinning = new NBinning(axes);
172 fTreeStorage->InitTree(filename, treename);
173 fNavigator = new NGnNavigator();
174 fNavigator->SetGnTree(this);
175}
176NGnTree::NGnTree(TObjArray * axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
177{
181
182 if (axes == nullptr) {
183 NLogError("NGnTree::NGnTree: Axes TObjArray is nullptr.");
184 MakeZombie();
185 return;
186 }
187
188 if (axes == nullptr && axes->GetEntries() == 0) {
189 NLogError("NGnTree::NGnTree: No axes provided, binning is nullptr.");
190 MakeZombie();
191 return;
192 }
193 fBinning = new NBinning(axes);
195 fTreeStorage->InitTree(filename, treename);
196 fNavigator = new NGnNavigator();
197 fNavigator->SetGnTree(this);
198}
199
200NGnTree::NGnTree(NGnTree * ngnt, std::string filename, std::string treename) : TObject(), fInput(nullptr)
201{
205 if (ngnt == nullptr) {
206 NLogError("NGnTree::NGnTree: NGnTree is nullptr.");
207 MakeZombie();
208 return;
209 }
210
211 if (ngnt->GetBinning() == nullptr) {
212 NLogError("NGnTree::NGnTree: Binning in NGnTree is nullptr.");
213 MakeZombie();
214 return;
215 }
216
217 // TODO: Import binning from user
218 fBinning = (NBinning *)ngnt->GetBinning()->Clone();
220 fTreeStorage->InitTree(filename, treename);
221 fNavigator = new NGnNavigator();
222 fNavigator->SetGnTree(this);
223}
224
226 : TObject(), fBinning(b), fTreeStorage(s), fInput(nullptr), fOwnsBinning(false), fOwnsTreeStorage(false)
227{
231 if (fBinning == nullptr) {
232 NLogError("NGnTree::NGnTree: Binning is nullptr.");
233 MakeZombie();
234 }
235 if (s == nullptr) {
237 fTreeStorage->InitTree("", "ngnt");
238 fOwnsTreeStorage = true;
239 }
240
241 if (fTreeStorage == nullptr) {
242 NLogError("NGnTree::NGnTree: Storage tree is nullptr.");
243 MakeZombie();
244 }
245
246 // fBinning->Initialize();
247 //
248 // TODO: Check if this is needed
249 fTreeStorage->SetBinning(fBinning);
250 fBinning->GetPoint()->SetTreeStorage(fTreeStorage);
251 fNavigator = new NGnNavigator();
252 fNavigator->SetGnTree(this);
253}
254NGnTree::NGnTree(THnSparse * hns, std::string parameterAxis, const std::string & outFileName, json cfg)
255 : TObject(), fInput(nullptr)
256{
260
261 std::map<std::string, std::vector<std::vector<int>>> b;
262 TObjArray * axes = new TObjArray();
263 int parameterAxisIdx = -1;
264 std::vector<std::string> labels;
265 for (int i = 0; i < hns->GetNdimensions(); i++) {
266 TAxis * axisIn = (TAxis *)hns->GetAxis(i);
267 TAxis * axis = (TAxis *)axisIn->Clone();
268
269 // check if parameterAxis matches axis name
270 if (parameterAxis.compare(axis->GetName()) == 0) {
271 parameterAxisIdx = i;
272 TAxis * axis = hns->GetAxis(parameterAxisIdx);
273 for (int bin = 1; bin <= axis->GetNbins(); bin++) {
274 // NLogInfo("Axis bin %d label: %s", bin, axis->GetBinLabel(bin));
275 labels.push_back(axis->GetBinLabel(bin));
276 }
277 continue;
278 }
279
280 // set label
281 if (axisIn->IsAlphanumeric()) {
282 NLogTrace("Setting axis '%s' labels from input THnSparse", axis->GetName());
283 for (int bin = 1; bin <= axisIn->GetNbins(); bin++) {
284 std::string label = axisIn->GetBinLabel(bin);
285 if (!labels.empty()) axis->SetBinLabel(bin, axisIn->GetBinLabel(bin));
286 }
287 }
288
289 axes->Add(axis);
290 b[axis->GetName()] = {{1}};
291 }
292
293 // json cfg;
294 cfg["_parameterAxis"] = parameterAxisIdx;
295 cfg["_labels"] = labels;
296
297 // return nullptr;
298 NLogDebug("Importing THnSparse as NGnTree with parameter axis '%s' (index %d) ...", parameterAxis.c_str(),
299 parameterAxisIdx);
300 NGnTree * ngnt = new NGnTree(axes, outFileName);
301 NLogDebug("Created NGnTree for THnSparse import ...");
302 if (ngnt->IsZombie()) {
303 NLogError("NGnTree::Import: Failed to create NGnTree !!!");
304 MakeZombie();
305 return;
306 }
307 // ngnt->GetStorageTree()->GetTree()->GetUserInfo()->Add(hns->Clone());
308 // Get env variable for tmp dir $NDMSPC_TMP_DIR
309 const char * tmpDirStr = gSystem->Getenv("NDMSPC_TMP_DIR");
310
311 std::string tmpDir;
312
313 if (!tmpDirStr || tmpDir.empty()) {
314 tmpDir = "/tmp";
315 }
316 std::string tmpFilename = tmpDir + "/ngnt_imported_input" + std::to_string(gSystem->GetPid()) + ".root";
317 NGnTree * ngntIn = new NGnTree(axes, tmpFilename);
318 if (ngntIn->IsZombie()) {
319 NLogError("NGnTree::Import: Failed to create NGnTree for input !!!");
320 SafeDelete(ngnt);
321 MakeZombie();
322 return;
323 }
324 // ngntIn->GetStorageTree()->GetTree()->GetUserInfo()->Add(hns->Clone());
325 ngntIn->GetOutput("default")->Add(hns->Clone("test"));
326 ngntIn->Close(true);
327
328 // return;
329 // delete ngntIn;
330
331 ngnt->SetInput(NGnTree::Open(tmpFilename)); // Set input to self
332
333 ngnt->GetInput()->Print();
334
335 ngnt->GetBinning()->AddBinningDefinition("default", b);
336 ngnt->InitParameters(cfg["_labels"].get<std::vector<std::string>>());
337
338 Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * /*output*/, TList * outputPoint,
339 int /*threadId*/) {
340 // NLogInfo("Thread ID: %d", threadId);
341 TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
342 // point->Print();
343 json cfg = point->GetCfg();
344
345 NGnTree * ngntIn = point->GetInput();
346 if (!ngntIn) {
347 NLogError("NGnTree::Import: Input NGnTree is nullptr !!!");
348 return;
349 }
350 // ngntIn->Print();
351
352 THnSparse * hns = (THnSparse *)ngntIn->GetOutput("default")->At(0);
353 if (hns == nullptr) {
354 NLogError("NGnTree::Import: THnSparse 'hns' not found in storage tree !!!");
355 return;
356 }
357
358 int axisIdx = cfg["_parameterAxis"].get<int>();
359 std::vector<std::vector<int>> ranges;
360 // set ranges from point storage coords
361 int iAxis = 0;
362 for (int i = 0; i < hns->GetNdimensions(); i++) {
363 if (i == axisIdx) continue; // skip parameter axis
364 int coord = point->GetStorageCoords()[iAxis++];
365 ranges.push_back({i, coord, coord});
366 // NLogInfo("Setting axis %d range to [%d, %d]", i, coord, coord);
367 }
368
369 NUtils::SetAxisRanges(hns, ranges);
370 TH1 * h = hns->Projection(axisIdx, "O");
371 if (!h) {
372 NLogError("NGnTree::Import: Projection of THnSparse failed !!!");
373 return;
374 }
375 if (h->GetEntries() > 0) {
376 NParameters * params = point->GetParameters();
377 if (params) {
378 for (int bin = 1; bin <= h->GetNbinsX(); bin++) {
379 params->SetParameter(bin, h->GetBinContent(bin), h->GetBinError(bin));
380 }
381 }
382 // outputPoint->Add(hParams);
383 outputPoint->Add(h);
384
385 std::string filename = cfg["filename"].get<std::string>();
386 TFile * f = (TFile *)point->GetTempObject("file");
387 if (!f || filename.compare(f->GetName()) != 0) {
388 if (f) {
389 NLogDebug("NGnTree::Import: Closing previously opened file '%s' ...", f->GetName());
390 f->Close();
391 }
392 NLogDebug("NGnTree::Import: Opening file '%s' ...", filename.c_str());
393 f = TFile::Open(filename.c_str());
394 if (!f || f->IsZombie()) {
395 NLogError("NGnTree::Import: Cannot open file '%s' !!!", filename.c_str());
396 return;
397 }
398 point->SetTempObject("file", f);
399 }
400
401 std::string axisObjectDefaultFormat =
402 cfg["axisObjectDefaultFormat"].is_string() ? cfg["axisObjectDefaultFormat"].get<std::string>() : "%.2f_%.2f";
403 std::string axisDefaultSeparator =
404 cfg["axisDefaultSeparator"].is_string() ? cfg["axisDefaultSeparator"].get<std::string>() : "/";
405 bool dryrun = false;
406 if (cfg.contains("dryrun") && cfg["dryrun"].is_boolean()) {
407 dryrun = cfg["dryrun"].get<bool>();
408 }
409
410 if (dryrun) {
411 NLogInfo("NGnTree::Import (dryrun): '%s' ...", point->GetString().c_str());
412 }
413
414 // loop over all object in cfg["objects"]
415 for (auto & [objName, objCfg] : cfg["objects"].items()) {
416 std::string objPath = NGnTree::BuildObjectPath(cfg, objCfg, point);
417
418 if (dryrun) {
419 NLogInfo("NGnTree::Import (dryrun): would retrieve object '%s'", objPath.c_str());
420 continue;
421 }
422
423 if (point->GetEntryNumber() == 0) {
424 NLogInfo("NGnTree::Import: Retrieving object '%s' from file '%s' ...", objPath.c_str(),
425 cfg["filename"].get<std::string>().c_str());
426 }
427 TObject * obj = f->Get(objPath.c_str());
428 if (!obj) {
429 if (point->GetEntryNumber() == 0) {
430 NLogWarning("NGnTree::Import: Cannot get object '%s' from file '%s' !!!", objPath.c_str(),
431 cfg["filename"].get<std::string>().c_str());
432 }
433 continue;
434 }
435
436 if (obj->InheritsFrom(TCanvas::Class())) {
437 TCanvas * cObj = (TCanvas *)obj;
438 cObj->SetName(objName.c_str());
439 }
440 outputPoint->Add(obj->Clone(objName.c_str()));
441 }
442 // f->Close();
443 }
444 };
445
446 // NUtils::SetAxisRanges(, std::vector<std::vector<int>> ranges)
447 ngnt->Process(processFunc, cfg);
448 ngnt->Close(true);
449 // Remove tmp file
450 gSystem->Exec(TString::Format("rm -f %s", tmpFilename.c_str()));
451}
452
454{
458
459 if (fOwnsBinning) {
460 SafeDelete(fBinning);
461 }
462 if (fOwnsTreeStorage) {
463 SafeDelete(fTreeStorage);
464 }
465 SafeDelete(fNavigator);
466 SafeDelete(fParameters);
467}
468void NGnTree::Print(Option_t * option) const
469{
473
474 TString opt = option;
475
476 // Print list of axes
477 NLogInfo("NGnTree::Print: Printing NGnTree object [ALL] ...");
478 if (fBinning) {
479 fBinning->Print(option);
480 }
481 else {
482 NLogError("Binning is not initialized in NGnTree !!!");
483 }
484 if (fTreeStorage) {
485 fTreeStorage->Print(option);
486 }
487 else {
488 NLogError("Storage tree is not initialized in NGnTree !!!");
489 }
490}
491
492void NGnTree::Draw(Option_t * /*option*/)
493{
497
498 NLogInfo("NGnTree::Draw: Drawing NGnTree object [not implemented yet]...");
499}
500
501bool NGnTree::Process(NGnProcessFuncPtr func, const json & cfg, std::string binningName, NGnBeginFuncPtr beginFunc,
502 NGnEndFuncPtr endFunc)
503{
507
508 if (!fBinning) {
509 NLogError("Binning is not initialized in NGnTree !!!");
510 return false;
511 }
512
513 NBinning * binningIn = (NBinning *)fBinning->Clone();
514
515 std::vector<std::string> defNames = fBinning->GetDefinitionNames();
516 if (!binningName.empty()) {
517 // Check if binning definitions exist
518 if (std::find(defNames.begin(), defNames.end(), binningName) == defNames.end()) {
519 NLogError("Binning definition '%s' not found in NGnTree !!!", binningName.c_str());
520 return false;
521 }
522 defNames.clear();
523 defNames.push_back(binningName);
524 }
525
526 fBinning->Reset();
527 fBinning->SetCfg(cfg); // Set configuration to binning point
528 bool rc = Process(func, defNames, cfg, binningIn, beginFunc, endFunc);
529 if (!rc) {
530 NLogError("NGnTree::Process: Processing failed !!!");
531 return false;
532 }
533 // bool rc = false;
534 return true;
535}
536
537bool NGnTree::Process(NGnProcessFuncPtr func, const std::vector<std::string> & defNames, const json & cfg,
538 NBinning * binningIn, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc)
539{
543
544 NLogInfo("NGnTree::Process: Starting processing with %zu definitions ...", defNames.size());
545 bool batch = gROOT->IsBatch();
546 gROOT->SetBatch(kTRUE);
547 TH1::AddDirectory(kFALSE);
548
549 std::string storagePostfix = fTreeStorage ? fTreeStorage->GetPostfix() : "";
550 if (storagePostfix.empty() && fTreeStorage) {
551 const std::string storageFileName = fTreeStorage->GetFileName();
552 if (!storageFileName.empty()) {
553 storagePostfix = gSystem->BaseName(storageFileName.c_str());
554 }
555 }
556 if (storagePostfix.empty()) {
557 storagePostfix = "ndmspc.root";
558 }
559
560 // --- Worker mode: run as a remote TCP worker ---
561 if (const char * workerEndpoint = gSystem->Getenv("NDMSPC_WORKER_ENDPOINT")) {
562 size_t workerIndex = 0;
563 if (const char * envIdx = gSystem->Getenv("NDMSPC_WORKER_INDEX")) {
564 try { workerIndex = static_cast<size_t>(std::stoul(envIdx)); } catch (...) {}
565 }
566 NLogInfo("NGnTree::Process: Worker mode — connecting to %s as worker %zu", workerEndpoint, workerIndex);
567
568 // const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
569 // jobDir and treeName will be received from master via INIT
570 // For now init NGnThreadData with a placeholder filename; Init() will be called with real paths
571 Ndmspc::NGnThreadData workerData;
572
573 void * ctx = zmq_ctx_new();
574 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
575 const std::string identity = Ndmspc::NDimensionalIpcRunner::BuildWorkerIdentity(workerIndex);
576 zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size());
577 int timeoutMs = 1000;
578 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
579 zmq_connect(dealer, workerEndpoint);
580 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"READY"});
581
582 // Wait for INIT
583 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(30);
584 bool initOk = false;
585 while (!initOk) {
586 std::vector<std::string> frames;
587 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
588 if (errno == EAGAIN || errno == EWOULDBLOCK) {
589 if (std::chrono::steady_clock::now() > initDeadline) break;
590 continue;
591 }
592 break;
593 }
594 // INIT frames: "INIT", workerIdx, sessionId, resultsDir, treeName[, tmpDir, tmpResultsDir]
595 if (frames.size() >= 1 && frames[0] == "STOP") {
596 NLogPrint("NGnTree::Process: Worker received STOP before INIT — session already finished, exiting.");
597 zmq_close(dealer);
598 zmq_ctx_term(ctx);
599 gROOT->SetBatch(batch);
600 return true;
601 }
602 if (frames.size() >= 5 && frames[0] == "INIT") {
603 workerIndex = static_cast<size_t>(std::stoul(frames[1]));
604 const std::string & sessionId = frames[2];
605 const std::string & initResultsDir = frames[3];
606 const std::string & initTreeName = frames[4];
607
608 // Apply env vars sent by supervisor — these override the worker's inherited environment
609 if (frames.size() >= 7) {
610 if (!frames[5].empty())
611 gSystem->Setenv("NDMSPC_TMP_DIR", frames[5].c_str());
612 if (!frames[6].empty())
613 gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", frames[6].c_str());
614 }
615 // Fallback: if NDMSPC_TMP_RESULTS_DIR is still unset/empty, use NDMSPC_TMP_DIR
616 if (!gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR")[0] == '\0') {
617 const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
618 if (tmpDirEnv && tmpDirEnv[0] != '\0')
619 gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
620 }
621
622 // Local work file — always on this machine's NDMSPC_TMP_DIR
623 const char * localTmpEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
624 const std::string localBase = localTmpEnv ? localTmpEnv : "/tmp";
625 const std::string localFile = localBase + "/.ndmspc/tmp/" + sessionId + "/" +
626 std::to_string(workerIndex) + "/" + storagePostfix;
627
628 // Results file — on shared FS; supervisor reads from here to merge
629 const std::string resultsFile = initResultsDir + "/" + std::to_string(workerIndex) + "/" +
630 storagePostfix;
631
632 bool rc = workerData.Init(workerIndex, func, beginFunc, endFunc, this, binningIn, fInput, localFile, initTreeName);
633 if (!rc) {
634 NLogError("NGnTree::Process: Worker failed to initialize NGnThreadData");
635 zmq_close(dealer);
636 zmq_ctx_term(ctx);
637 return false;
638 }
639 // If results dir differs from local dir, tell TaskLoop to copy after Close(true)
640 if (resultsFile != localFile) {
641 workerData.SetResultsFilename(resultsFile);
642 }
643 workerData.SetCfg(cfg);
644 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"ACK"});
645 initOk = true;
646 }
647 }
648 if (!initOk) {
649 NLogError("NGnTree::Process: Worker did not receive INIT from supervisor");
650 zmq_close(dealer);
651 zmq_ctx_term(ctx);
652 return false;
653 }
654
655 Ndmspc::NDimensionalIpcRunner::TaskLoop(dealer, workerIndex, &workerData);
656 zmq_close(dealer);
657 zmq_ctx_term(ctx);
658 gROOT->SetBatch(batch);
659 return true;
660 }
661 // --- End worker mode ---
662
664
665 int nThreads = ROOT::GetThreadPoolSize(); // Get the number of threads to use
666 if (nThreads < 1) nThreads = 1;
667
668 std::string executionMode = "thread";
669 const char * envMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE");
670 const bool modeExplicit = (envMode && envMode[0] != '\0');
671 if (modeExplicit) {
672 executionMode = envMode;
673 }
674
675 std::string normalizedMode = executionMode;
676 std::transform(normalizedMode.begin(), normalizedMode.end(), normalizedMode.begin(),
677 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
678 if (normalizedMode == "process") normalizedMode = "ipc";
679
680 bool useProcessIpc = (normalizedMode == "ipc" || normalizedMode == "tcp");
681 bool useTcp = (normalizedMode == "tcp");
682 size_t nProcesses = static_cast<size_t>(nThreads);
683 bool ndmspcNProcExplicit = false;
684
685 if (const char * envNdmspcNProc = gSystem->Getenv("NDMSPC_MAX_PROCESSES")) {
686 ndmspcNProcExplicit = true;
687 try {
688 nProcesses = std::max<size_t>(1, static_cast<size_t>(std::stoll(envNdmspcNProc)));
689 }
690 catch (...) {
691 NLogWarning("NGnTree::Process: Invalid NDMSPC_MAX_PROCESSES='%s', using default=%zu", envNdmspcNProc,
692 nProcesses);
693 }
694 }
695 else if (const char * envNProc = gSystem->Getenv("ROOT_MAX_THREADS")) {
696 // Backward-compatible fallback when NDMSPC_MAX_PROCESSES is not set.
697 try {
698 nProcesses = std::max<size_t>(1, static_cast<size_t>(std::stoll(envNProc)));
699 }
700 catch (...) {
701 NLogWarning("NGnTree::Process: Invalid ROOT_MAX_THREADS='%s', using default=%zu", envNProc, nProcesses);
702 }
703 }
704
705 // Keep explicit NDMSPC_EXECUTION_MODE settings authoritative.
706 // If mode is not explicitly set, default to local IPC for multi-process runs.
707 if (modeExplicit) {
708 if (normalizedMode == "thread") {
709 useProcessIpc = false;
710 useTcp = false;
711 }
712 else if (normalizedMode == "tcp") {
713 useProcessIpc = true;
714 useTcp = true;
715 }
716 else if (normalizedMode == "ipc") {
717 useProcessIpc = true;
718 useTcp = false;
719 }
720 else {
721 NLogWarning("NGnTree::Process: Unknown NDMSPC_EXECUTION_MODE='%s', falling back to auto mode selection.",
722 executionMode.c_str());
723 useProcessIpc = (nProcesses > 1);
724 useTcp = false;
725 }
726 }
727 else if (nProcesses > 1) {
728 useProcessIpc = true;
729 useTcp = false;
730 executionMode = "ipc";
731 normalizedMode = "ipc";
732 }
733
734 if (ndmspcNProcExplicit && normalizedMode == "thread" && nProcesses > 1) {
735 NLogWarning("NGnTree::Process: NDMSPC_MAX_PROCESSES=%zu is set, but NDMSPC_EXECUTION_MODE=thread disables IPC.",
736 nProcesses);
737 }
738
739 const size_t workerObjectCount = useProcessIpc ? std::max(static_cast<size_t>(nThreads), nProcesses)
740 : static_cast<size_t>(nThreads);
741 std::vector<Ndmspc::NGnThreadData> threadDataVector(workerObjectCount);
742
743 NLogInfo("NGnTree::Process: executionMode='%s', useProcessIpc=%d, ROOT threads=%d, ipcProcesses=%zu, workerObjects=%zu",
744 executionMode.c_str(), useProcessIpc ? 1 : 0, nThreads, nProcesses, workerObjectCount);
745
746 const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
747 std::string tmpDir;
748 if (tmpDirEnv && tmpDirEnv[0] != '\0') {
749 tmpDir = tmpDirEnv;
750 } else {
751 TString tmpDirPrefix = fTreeStorage->GetPrefix();
752 // Use storage prefix only if it is a local path (not a remote URL)
753 if (!(tmpDirPrefix.BeginsWith("root://") || tmpDirPrefix.BeginsWith("http://") ||
754 tmpDirPrefix.BeginsWith("https://"))) {
755 tmpDir = tmpDirPrefix.Data();
756 }
757 if (tmpDir.empty()) tmpDir = "/tmp";
758 }
759
760 std::string jobDir = tmpDir + "/.ndmspc/tmp/" + std::to_string(gSystem->GetPid());
761
762 // Results dir: when NDMSPC_TMP_RESULTS_DIR equals NDMSPC_TMP_DIR (or is unset),
763 // reuse jobDir so that localTmpFile == resultsFilename — no copy or delete needed.
764 const char * resultsDirEnv = gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR");
765 const bool sameDir = !resultsDirEnv || std::string(resultsDirEnv) == tmpDir;
766 std::string resultsDir = sameDir ? jobDir
767 : (std::string(resultsDirEnv) + "/" +
768 std::to_string(gSystem->GetPid()));
769
770 std::string filePrefix = jobDir;
771 for (size_t i = 0; i < threadDataVector.size(); ++i) {
772 std::string filename = filePrefix + "/" + std::to_string(i) + "/" + storagePostfix;
773 bool rc = threadDataVector[i].Init(i, func, beginFunc, endFunc, this, binningIn, fInput, filename,
774 fTreeStorage->GetTree()->GetName());
775 if (!rc) {
776 NLogError("Failed to initialize thread data %zu, exiting ...", i);
777 return false;
778 }
779 threadDataVector[i].SetCfg(cfg); // Set configuration to binning point
780 if (useTcp) {
781 // Tell the merge step where workers will deposit their finished files.
782 // When resultsDir == jobDir (NDMSPC_TMP_RESULTS_DIR unset) the paths are
783 // identical so no copy or delete is needed — handled in TaskLoop.
784 std::string resultsFile = resultsDir + "/" + std::to_string(i) + "/" + storagePostfix;
785 threadDataVector[i].SetResultsFilename(resultsFile);
786 }
787 }
788 size_t processedEntries = 0;
789 size_t totalEntries = 0;
790 auto start_par = std::chrono::high_resolution_clock::now();
791 auto start_par_job = std::chrono::high_resolution_clock::now();
792 auto task = [&](const std::vector<int> & coords, Ndmspc::NGnThreadData & thread_obj) {
793 // NLogWarning("Processing coordinates %s in thread %zu", NUtils::GetCoordsString(coords).c_str(),
794 // thread_obj.GetAssignedIndex());
795 // thread_obj.Print();
796 thread_obj.Process(coords);
797 processedEntries++;
799 size_t nRunning = (totalEntries - processedEntries >= threadDataVector.size()) ? threadDataVector.size()
800 : totalEntries - processedEntries;
801 NUtils::ProgressBar(processedEntries, totalEntries, start_par, TString::Format("R%4zu", nRunning).Data());
802 }
803 };
804
805 size_t iDef = 0;
806 int sumIds = 0;
807
808 std::vector<Ndmspc::NThreadData *> processWorkers;
809 std::unique_ptr<Ndmspc::NDimensionalExecutor> ipcExecutor;
810 if (useProcessIpc) {
811 processWorkers.reserve(threadDataVector.size());
812 for (size_t i = 0; i < threadDataVector.size(); ++i) {
813 processWorkers.push_back(&threadDataVector[i]);
814 }
815 ipcExecutor = std::make_unique<Ndmspc::NDimensionalExecutor>(std::vector<int>{0}, std::vector<int>{0});
816 if (useTcp) {
817 const char * tcpPort = gSystem->Getenv("NDMSPC_TCP_PORT");
818 std::string tcpEndpoint = std::string("tcp://0.0.0.0:") + (tcpPort ? tcpPort : "5555");
819 const char * resultsDirBase = gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR");
820 const char * macroParams = gSystem->Getenv("NDMSPC_MACRO_PARAMS");
821 // Auto-detect the macro to send to workers: explicit SetWorkerMacro() takes
822 // priority; otherwise fall back to NDMSPC_MACRO set by ndmspc-run.
823 std::string workerMacro = fWorkerMacroList;
824 if (workerMacro.empty()) {
825 if (const char * envMacro = gSystem->Getenv("NDMSPC_MACRO")) workerMacro = envMacro;
826 }
827 ipcExecutor->StartProcessIpc(processWorkers, nProcesses, tcpEndpoint, resultsDir,
828 fTreeStorage->GetTree()->GetName(), workerMacro, tmpDir,
829 resultsDirBase ? resultsDirBase : "",
830 macroParams ? macroParams : "");
831 } else {
832 ipcExecutor->StartProcessIpc(processWorkers, nProcesses);
833 }
834 }
835
836 std::map<std::string, std::vector<Long64_t>>
837 defIdMapProcessedRemoved; // Map to track which ids belong to which definition
838 // for (auto & name : defNames) {
839 // auto binningDef = binningIn->GetDefinition(name);
840 // if (!binningDef) {
841 // NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
842 // return false;
843 // }
844 // for (auto & id : binningDef->GetIds()) {
845 // defIdMap[name].push_back(id);
846 // }
847 // }
848
849 try {
850 for (auto & name : defNames) {
851 auto binningDef = binningIn->GetDefinition(name);
852 if (!binningDef) {
853 NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
854 return false;
855 }
856
857 if (binningDef->GetIds().size() == 0) {
858 NLogWarning("NGnTree::Process: Binning definition '%s' has no entries, skipping ...", name.c_str());
859 continue;
860 }
861
862 const std::vector<Long64_t> originalDefinitionIds = binningDef->GetIds();
863
864 std::vector<int> mins, maxs;
865 mins.push_back(0);
866 maxs.push_back(binningDef->GetIds().size() - 1);
867 NLogDebug("NGnTree::Process: Processing with binning definition '%s' with %zu entries", name.c_str(),
868 binningDef->GetIds().size());
869
871 NLogInfo("NGnTree::Process: Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
872 }
873 else {
874 Printf("Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
875 }
876 start_par = std::chrono::high_resolution_clock::now();
877 processedEntries = 0;
878 totalEntries = maxs[0] + 1;
879 const size_t activeWorkers = useProcessIpc ? std::max<size_t>(1, std::min(nProcesses, processWorkers.size()))
880 : threadDataVector.size();
882 NUtils::ProgressBar(processedEntries, totalEntries, start_par,
883 TString::Format("R%4zu", activeWorkers).Data());
884
885 // binningIn->SetCurrentDefinitionName(name);
886 for (size_t i = 0; i < threadDataVector.size(); ++i) {
887 // threadDataVector[i].SetCurrentDefinitionName(name);
888 threadDataVector[i].GetHnSparseBase()->GetBinning()->SetCurrentDefinitionName(name);
889 }
890
891 Ndmspc::NDimensionalExecutor executorMT(mins, maxs);
892
893 if (!useProcessIpc) {
894 // Disable ROOT's RecursiveRemove during the parallel phase.
895 // Without this, concurrent threads' object deletions trigger RecursiveRemove
896 // which iterates pad->fPrimitives without per-object locks → TObjLink corruption.
897 Bool_t prevMustClean = gROOT->MustClean();
898 gROOT->SetMustClean(kFALSE);
899
900 // Enable batch mode during the parallel phase so that ROOT drawing calls
901 // (e.g. TH1::Fit drawing the fit function via f1->Draw) are no-ops.
902 // Without this, concurrent threads each trigger ROOT canvas creation ("c1"),
903 // the second canvas creation deletes the first → double-free / heap corruption,
904 // which then causes TPad::RecursiveRemove to crash when closing the per-thread TTree.
905 Bool_t prevBatch = gROOT->IsBatch();
906 gROOT->SetBatch(kTRUE);
907
908 executorMT.ExecuteParallel<Ndmspc::NGnThreadData>(task, threadDataVector);
909
910 // Restore both flags before flushing deferred deletes, so each object's destructor
911 // properly calls gROOT->RecursiveRemove and removes itself from ROOT's global lists.
912 // This prevents dangling pointers that would crash the RecursiveRemove cascade
913 // triggered by TTree::~TTree when the per-thread storage files are closed below.
914 // It is safe here because all worker threads have already finished.
915 gROOT->SetMustClean(prevMustClean);
916 gROOT->SetBatch(prevBatch);
917
918 // Flush deferred deletes single-threaded with MustClean and batch mode restored.
919 for (size_t i = 0; i < threadDataVector.size(); ++i) {
920 threadDataVector[i].FlushDeferredDeletes();
921 }
922
923 for (size_t i = 0; i < threadDataVector.size(); ++i) {
924 threadDataVector[i].ExecuteEndFunction();
925 }
926 }
927 else {
928 ipcExecutor->SetBounds(mins, maxs);
929 size_t acked = ipcExecutor->ExecuteCurrentBoundsProcessIpc(
930 name, &originalDefinitionIds, [&, activeWorkers](size_t ackCount, size_t activeWorkersNow) {
931 processedEntries = ackCount;
933 size_t nRunning = std::min(activeWorkersNow, activeWorkers);
934 NUtils::ProgressBar(processedEntries, totalEntries, start_par,
935 TString::Format("R%4zu", nRunning).Data());
936 }
937 });
938 processedEntries = acked;
939
940 // Child processes update their own worker-object copies. Rebuild parent-side
941 // per-worker counters and processed-id vectors deterministically from task assignment.
942 const size_t processesToUse = std::max<size_t>(1, std::min(nProcesses, processWorkers.size()));
943 for (size_t i = 0; i < threadDataVector.size(); ++i) {
944 auto * workerDef = threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name);
945 if (workerDef) {
946 workerDef->GetIds().clear();
947 }
948 }
949
950 for (size_t taskIndex = 0; taskIndex < originalDefinitionIds.size(); ++taskIndex) {
951 const size_t workerIndex = taskIndex % processesToUse;
952 threadDataVector[workerIndex].SetNProcessed(threadDataVector[workerIndex].GetNProcessed() + 1);
953
954 auto * workerDef = threadDataVector[workerIndex].GetHnSparseBase()->GetBinning()->GetDefinition(name);
955 if (workerDef) {
956 workerDef->GetIds().push_back(originalDefinitionIds[taskIndex]);
957 }
958 }
959
960 if (!NLogger::GetConsoleOutput() && processedEntries < totalEntries) {
961 NUtils::ProgressBar(processedEntries, totalEntries, start_par, "R 0");
962 }
963 }
964
966 Printf("Finished processing binning definition '%s'. Post-processing results ...", name.c_str());
967 // Update hnsbBinningIn with the processed ids
968 NLogDebug("NGnTree::Process: [BEGIN] ------------------------------------------------");
969 sumIds += binningIn->GetDefinition(name)->GetIds().size();
970 binningIn->GetDefinition(name)->GetIds().clear();
971 for (size_t i = 0; i < threadDataVector.size(); ++i) {
972 NLogDebug("NGnTree::Process: -> Thread %zu processed %lld entries", i, threadDataVector[i].GetNProcessed());
973 // threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->Print();
974 binningIn->GetDefinition(name)->GetIds().insert(
975 binningIn->GetDefinition(name)->GetIds().end(),
976 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().begin(),
977 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().end());
978 sort(binningIn->GetDefinition(name)->GetIds().begin(), binningIn->GetDefinition(name)->GetIds().end());
979 }
980 // hnsbBinningIn->GetDefinition(name)->Print();
981 // remove entries present in hnsbBinningIn from other definitions
982 for (size_t i = 0; i < defNames.size(); i++) {
983
984 std::string other_name = defNames[i];
985 auto otherDef = binningIn->GetDefinition(other_name);
986 if (i <= iDef) {
987 continue;
988 }
989 if (!otherDef) {
990 NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", other_name.c_str());
991 return false;
992 }
993 // remove entries that has value less then sumIds
994 for (auto it = otherDef->GetIds().begin(); it != otherDef->GetIds().end();) {
995 NLogTrace("NGnTree::Process: Checking entry %lld from definition '%s' against sumIds=%d", *it,
996 other_name.c_str(), sumIds);
997 if (*it < sumIds) {
998 NLogTrace("NGnTree::Process: Removing entry %lld from definition '%s'", *it, other_name.c_str());
999 defIdMapProcessedRemoved[other_name].push_back(*it);
1000 it = otherDef->GetIds().erase(it);
1001 }
1002 else {
1003 ++it;
1004 }
1005 }
1006
1007 binningIn->GetDefinition(other_name)->Print();
1008 }
1009 // hnsbBinningIn->GetDefinition(name)->Print();
1010 iDef++;
1011
1012 NLogDebug("NGnTree::Process: [END] ------------------------------------------------");
1013 }
1014 }
1015 catch (const std::exception & ex) {
1016 if (ipcExecutor) {
1017 ipcExecutor->FinishProcessIpc(/*abort=*/true);
1018 ipcExecutor.reset();
1019 }
1020
1021 TString what(ex.what());
1022 if (what.Contains("Interrupted by user")) {
1023 if (gROOT) {
1024 gROOT->SetInterrupt(kFALSE);
1025 }
1026 NLogWarning("NGnTree::Process: Interrupted by user, stopping processing.");
1027 }
1028 else {
1029 NLogError("NGnTree::Process: Processing failed: %s", ex.what());
1030 }
1031 return false;
1032 }
1033
1034 // return true; // For testing, skip merging and post-processing
1035
1036 auto end_par = std::chrono::high_resolution_clock::now();
1037 std::chrono::duration<double, std::milli> par_duration = end_par - start_par_job;
1038
1039 if (ipcExecutor) {
1040 ipcExecutor->FinishProcessIpc();
1041 }
1042
1043 // For TCP mode, only merge results from workers that actually connected.
1044 // For IPC/fork and thread modes, all indices are valid.
1045 const std::set<size_t> registeredWorkers =
1046 (ipcExecutor && useTcp) ? ipcExecutor->GetRegisteredWorkerIndices() : std::set<size_t>{};
1047 const bool filterByRegistered = !registeredWorkers.empty();
1048
1050 Printf("NGnTree::Process: Execution completed and it took %s .",
1051 NUtils::FormatTime(par_duration.count() / 1000).c_str());
1052 }
1053 else {
1054 NLogInfo("NGnTree::Process: Execution completed and it took %s .",
1055 NUtils::FormatTime(par_duration.count() / 1000).c_str());
1056 }
1057
1058 NLogInfo("NGnTree::Process: Post processing %zu results ...", threadDataVector.size());
1059 for (auto & data : threadDataVector) {
1060 if (useProcessIpc) {
1061 NLogTrace("NGnTree::Process: Releasing parent handle for worker %zu file without writing",
1062 data.GetAssignedIndex());
1063 // data.GetHnSparseBase()->GetStorageTree()->Close(false);
1064 }
1065 else {
1066 NLogTrace("NGnTree::Process: Closing file from thread %zu with write", data.GetAssignedIndex());
1067 data.GetHnSparseBase()->GetStorageTree()->Close(true);
1068 }
1069 }
1070
1071 NLogDebug("NGnTree::Process: Merging %zu results ...", threadDataVector.size());
1073 Printf("NGnTree::Process: [phase] merge start (%zu workers)", threadDataVector.size());
1074 }
1075 const auto mergeStart = std::chrono::high_resolution_clock::now();
1076 TList * mergeList = new TList();
1077 Ndmspc::NGnThreadData * outputData = new Ndmspc::NGnThreadData();
1078 outputData->Init(0, func, nullptr, nullptr, this, binningIn);
1079 outputData->SetCfg(cfg);
1080 // outputData->Init(0, func, this);
1081
1082 for (auto & data : threadDataVector) {
1083 if (filterByRegistered && registeredWorkers.find(data.GetAssignedIndex()) == registeredWorkers.end()) {
1084 NLogInfo("NGnTree::Process: Skipping worker %zu — never connected", data.GetAssignedIndex());
1085 continue;
1086 }
1087 NLogTrace("NGnTree::Process: Adding thread data %zu to merge list ...", data.GetAssignedIndex());
1088 mergeList->Add(&data);
1089 }
1090
1091 Long64_t nmerged = outputData->Merge(mergeList);
1092 const auto mergeEnd = std::chrono::high_resolution_clock::now();
1094 const auto mergeSec = std::chrono::duration_cast<std::chrono::duration<double>>(mergeEnd - mergeStart).count();
1095 Printf("NGnTree::Process: [phase] merge done (%lld outputs, %.2f s)", nmerged, mergeSec);
1096 }
1097 if (nmerged <= 0) {
1098 NLogError("NGnTree::Process: Failed to merge thread data, exiting ...");
1099 delete mergeList;
1100 return false;
1101 }
1102 NLogInfo("NGnTree::Process: Merged %lld outputs successfully", nmerged);
1103 // delete all temporary files
1104 // for (auto & data : threadDataVector) {
1105 // std::string filename = data.GetHnSparseBase()->GetStorageTree()->GetFileName();
1106 // NLogTrace("NGnTree::Process: Deleting temporary file '%s' ...", filename.c_str());
1107 // gSystem->Exec(TString::Format("rm -f %s", filename.c_str()));
1108 // }
1109 //
1110
1111 // binningIn= outputData->GetHnSparseBase()->GetBinning();
1112
1113 auto * mergedBinning = outputData->GetHnSparseBase()->GetBinning();
1114 std::set<Long64_t> mergedContentIds;
1115 std::vector<std::pair<Long64_t, std::vector<int>>> mergedContentCoords;
1116
1117 // add missing entries to definitions based on defIdMapProcessedRemoved
1118 for (size_t i = 0; i < defNames.size(); i++) {
1119 std::string name = defNames[i];
1120 // auto def = binningIn->GetDefinition(name);
1121 auto def = mergedBinning->GetDefinition(name);
1122 if (!def) {
1123 NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1124 return false;
1125 }
1126 for (auto & [other_name, removedIds] : defIdMapProcessedRemoved) {
1127 if (other_name.compare(name) != 0) {
1128 continue;
1129 }
1130 for (auto & id : removedIds) {
1131 if (std::find(def->GetIds().begin(), def->GetIds().end(), id) == def->GetIds().end()) {
1132 NLogTrace("NGnTree::Process: Adding missing entry %lld to definition '%s'", id, name.c_str());
1133 def->GetIds().push_back(id);
1134 }
1135 }
1136 }
1137 sort(def->GetIds().begin(), def->GetIds().end());
1138 // outputData->GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds() = def->GetIds();
1139
1140 // Modify content in binning definitions based on def->GetIds()
1141 def->GetContent()->Reset();
1142 for (auto id : def->GetIds()) {
1143 NBinningPoint point(def->GetBinning());
1144 def->GetBinning()->GetContent()->GetBinContent(id, point.GetCoords());
1146 Long64_t bin = def->GetContent()->GetBin(point.GetStorageCoords());
1147 NLogTrace("NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld", name.c_str(), id, bin);
1148 def->GetContent()->SetBinContent(bin, id);
1149
1150 if (mergedContentIds.insert(id).second) {
1151 mergedContentCoords.emplace_back(
1153 }
1154 }
1155 }
1156
1157 // Rebuild the merged top-level content from final definition ids. In IPC/TCP mode
1158 // the merge setup may still carry sparse source-bin content; resetting here keeps
1159 // only the bins that correspond to actual merged tree entries.
1160 mergedBinning->GetContent()->Reset();
1161 for (const auto & entry : mergedContentCoords) {
1162 Long64_t bin = mergedBinning->GetContent()->GetBin(entry.second.data());
1163 mergedBinning->GetContent()->SetBinContent(bin, entry.first);
1164 }
1165
1166 // print final binning definitions
1167 NLogDebug("NGnTree::Process: Final binning definitions after processing:");
1168 for (auto & name : defNames) {
1169 // auto binningDef = binningIn->GetDefinition(name);
1170 auto binningDef = outputData->GetHnSparseBase()->GetBinning()->GetDefinition(name);
1171 if (!binningDef) {
1172 NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1173 return false;
1174 }
1175 binningDef->Print();
1176 }
1177
1178 fTreeStorage = outputData->GetHnSparseBase()->GetStorageTree();
1179 fOutputs = outputData->GetHnSparseBase()->GetOutputs();
1180 fBinning = outputData->GetHnSparseBase()->GetBinning(); // Update binning to the merged one
1181 fParameters = outputData->GetHnSparseBase()->GetParameters();
1182
1184 NLogInfo("NGnTree::Process: Processing completed successfully. Output was stored in '%s'.",
1185 fTreeStorage->GetFileName().c_str());
1186 }
1187 else {
1188 Printf("Processing completed successfully. Output was stored in '%s'.", fTreeStorage->GetFileName().c_str());
1189 }
1190
1191 // Close the final output file
1193 Printf("NGnTree::Process: [phase] final close start (%s)",
1194 outputData->GetHnSparseBase()->GetStorageTree()->GetFileName().c_str());
1195 }
1196 const auto closeStart = std::chrono::high_resolution_clock::now();
1197 outputData->GetHnSparseBase()->Close(true);
1198 const auto closeEnd = std::chrono::high_resolution_clock::now();
1200 const auto closeSec = std::chrono::duration_cast<std::chrono::duration<double>>(closeEnd - closeStart).count();
1201 Printf("NGnTree::Process: [phase] final close done (%.2f s)", closeSec);
1202 }
1203
1205 Printf("NGnTree::Process: [phase] cleanup start (%s)", jobDir.c_str());
1206 }
1207 const auto cleanupStart = std::chrono::high_resolution_clock::now();
1208 gSystem->Exec(TString::Format("rm -fr %s", jobDir.c_str()));
1209 const auto cleanupEnd = std::chrono::high_resolution_clock::now();
1211 const auto cleanupSec = std::chrono::duration_cast<std::chrono::duration<double>>(cleanupEnd - cleanupStart).count();
1212 Printf("NGnTree::Process: [phase] cleanup done (%.2f s)", cleanupSec);
1213 }
1214 gROOT->SetBatch(batch); // Restore ROOT batch mode
1215 return true;
1216}
1217
1218TList * NGnTree::GetOutput(std::string name)
1219{
1223
1224 if (name.empty()) {
1225 name = fBinning->GetCurrentDefinitionName();
1226 }
1227 if (fOutputs.find(name) == fOutputs.end()) {
1228 fOutputs[name] = new TList();
1229 fOutputs[name]->SetName(name.c_str());
1230 }
1231 return fOutputs[name];
1232}
1233
1234NGnTree * NGnTree::Open(const std::string & filename, const std::string & branches, const std::string & treename)
1235{
1239
1240 NLogDebug("Opening '%s' with branches='%s' and treename='%s' ...", filename.c_str(), branches.c_str(),
1241 treename.c_str());
1242
1243 TFile * file = TFile::Open(filename.c_str());
1244 if (!file) {
1245 NLogError("NGnTree::Open: Cannot open file '%s'", filename.c_str());
1246 return nullptr;
1247 }
1248
1249 TTree * tree = (TTree *)file->Get(treename.c_str());
1250 if (!tree) {
1251 NLogError("NGnTree::Open: Cannot get tree '%s' from file '%s'", treename.c_str(), filename.c_str());
1252 return nullptr;
1253 }
1254
1255 return Open(tree, branches, file);
1256}
1257
1258NGnTree * NGnTree::Open(TTree * tree, const std::string & branches, TFile * file)
1259{
1263
1264 NBinning * hnstBinning = (NBinning *)tree->GetUserInfo()->At(0);
1265 if (!hnstBinning) {
1266 NLogError("NGnTree::Open: Cannot get binning from tree '%s'", tree->GetName());
1267 return nullptr;
1268 }
1269 NStorageTree * hnstStorageTree = (NStorageTree *)tree->GetUserInfo()->At(1);
1270 if (!hnstStorageTree) {
1271 NLogError("NGnTree::Open: Cannot get tree storage info from tree '%s'", tree->GetName());
1272 return nullptr;
1273 }
1274
1275 std::map<std::string, TList *> outputs;
1276 TDirectory * dir = nullptr;
1277 if (file) {
1278 dir = (TDirectory *)file->Get("outputs");
1279 auto l = dir->GetListOfKeys();
1280 for (auto kv : *l) {
1281 TObject * obj = dir->Get(kv->GetName());
1282 if (!obj) continue;
1283 TList * l = dynamic_cast<TList *>(obj);
1284 if (!l) continue;
1285 outputs[l->GetName()] = l;
1286 NLogDebug("Imported output list for binning '%s' with %d object(s) from file '%s'", l->GetName(), l->GetEntries(),
1287 file->GetName());
1288 }
1289 }
1290 // TDirectory * dir = (TDirectory *)tree->GetUserInfo()->FindObject("outputs");
1291 // if (dir) {
1292 // dir->Print();
1293 // }
1294
1295 NGnTree * ngnt = new NGnTree(hnstBinning, hnstStorageTree);
1296
1297 if (!hnstStorageTree->SetFileTree(file, tree)) return nullptr;
1298 // if (!ngnt->InitBinnings({})) return nullptr;
1299 // ngnt->Print();
1300 // Get list of branches
1301 std::vector<std::string> enabledBranches;
1302 if (!branches.empty()) {
1303 enabledBranches = Ndmspc::NUtils::Tokenize(branches, ',');
1304 NLogTrace("NGnTree::Open: Enabled branches: %s", NUtils::GetCoordsString(enabledBranches, -1).c_str());
1305 hnstStorageTree->SetEnabledBranches(enabledBranches);
1306 }
1307 else {
1308 // loop over all branches and set address
1309 for (auto & kv : hnstStorageTree->GetBranchesMap()) {
1310 NLogTrace("NGnTree::Open: Enabled branches: %s", kv.first.c_str());
1311 }
1312 }
1313 // Set all branches to be read
1314 hnstStorageTree->SetBranchAddresses();
1315 ngnt->SetOutputs(outputs);
1316
1317 NGnNavigator * nav = new NGnNavigator();
1318 nav->SetGnTree(ngnt);
1319 ngnt->SetNavigator(nav);
1320
1321 return ngnt;
1322}
1323
1325{
1329
1330 if (fNavigator) {
1331 NLogTrace("NGnTree::SetNavigator: Replacing existing navigator ...");
1332 SafeDelete(fNavigator);
1333 }
1334
1335 fNavigator = navigator;
1336}
1337
1338bool NGnTree::Close(bool write)
1339{
1343
1344 if (!fTreeStorage) {
1345 NLogError("NGnTree::Close: Storage tree is not initialized in NGnTree !!!");
1346 return false;
1347 }
1348
1349 return fTreeStorage->Close(write, fOutputs);
1350}
1351
1352Int_t NGnTree::GetEntry(Long64_t entry, bool checkBinningDef)
1353{
1357 if (!fTreeStorage) {
1358 NLogError("NGnTree::GetEntry: Storage tree is not initialized in NGnTree !!!");
1359 return -1;
1360 }
1361
1362 int bytes =
1363 fTreeStorage->GetEntry(entry, fBinning->GetPoint(0, fBinning->GetCurrentDefinitionName()), checkBinningDef);
1364 if (fTreeStorage->GetBranch("_params")) fParameters = (NParameters *)fTreeStorage->GetBranch("_params")->GetObject();
1365 return bytes;
1366}
1367Int_t NGnTree::GetEntry(std::vector<std::vector<int>> /*range*/, bool checkBinningDef)
1368{
1372
1373 return GetEntry(0, checkBinningDef);
1374}
1375
1376void NGnTree::Play(int timeout, std::string binning, std::vector<int> outputPointIds,
1377 std::vector<std::vector<int>> ranges, Option_t * option)
1378{
1382 TString opt = option;
1383 opt.ToUpper();
1384
1385 std::string annimationTempDir =
1386 TString::Format("%s/.ndmspc/animation/%d", gSystem->Getenv("HOME"), gSystem->GetPid()).Data();
1387 gSystem->Exec(TString::Format("mkdir -p %s", annimationTempDir.c_str()));
1388
1389 if (binning.empty()) {
1390 binning = fBinning->GetCurrentDefinitionName();
1391 }
1392
1393 NBinningDef * binningDef = fBinning->GetDefinition(binning);
1394 if (!binningDef) {
1395 NLogError("NGnTree::Play: Binning definition '%s' not found in NGnTree !!!", binning.c_str());
1396 NLogError("Available binning definitions:");
1397 for (auto & name : fBinning->GetDefinitionNames()) {
1398 if (name == fBinning->GetCurrentDefinitionName())
1399 NLogError(" [*] %s", name.c_str());
1400 else
1401 NLogError(" [ ] %s", name.c_str());
1402 }
1403 return;
1404 }
1405
1406 THnSparse * bdContent = (THnSparse *)binningDef->GetContent()->Clone();
1407
1408 std::string bdContentName = TString::Format("bdContent_%s", binning.c_str()).Data();
1409 // Set axis ranges if provided
1410 if (!ranges.empty()) NUtils::SetAxisRanges(bdContent, ranges, false, true);
1411
1412 Long64_t linBin = 0;
1413 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{bdContent->CreateIter(true /*use axis range*/)};
1414 std::vector<Long64_t> ids;
1415 // std::vector<Long64_t> ids = binningDef->GetIds();
1416 while ((linBin = iter->Next()) >= 0) {
1417 // NLogDebug("Original content bin %lld: %f", linBin, bdContentOrig->GetBinContent(linBin));
1418 ids.push_back(linBin);
1419 }
1420 if (ids.empty()) {
1421 NLogWarning("NGnTree::Play: No entries found in binning definition '%s' !!!", binning.c_str());
1422 return;
1423 }
1424 // return;
1425
1426 TCanvas * c1 = nullptr;
1427
1428 c1 = (TCanvas *)gROOT->GetListOfCanvases()->FindObject("c1");
1429 if (c1 == nullptr) c1 = new TCanvas("c1", "NGnTree::Play", 800, 600);
1430 c1->Clear();
1431 c1->cd();
1432 c1->DivideSquare(outputPointIds.size() > 0 ? outputPointIds.size() + 1 : 1);
1433 gSystem->ProcessEvents();
1434
1435 binningDef->Print();
1436 bdContent->Reset();
1437
1438 // loop over all ids and print them
1439 for (auto id : ids) {
1440 // for (int id = 0; id < GetEntries(); id++) {
1441 GetEntry(id);
1442 fBinning->GetPoint()->Print();
1443 TList * l = (TList *)fTreeStorage->GetBranch("_outputPoint")->GetObject();
1444 if (!l || l->IsEmpty()) {
1445 NLogWarning("NGnTree::Play: No 'outputPoint' for entry %lld !!!", id);
1446 continue;
1447 }
1448 else {
1449 // NLogInfo("Output for entry %lld:", id);
1450 // l->Print(opt.Data());
1451
1452 if (outputPointIds.empty()) {
1453 outputPointIds.resize(l->GetEntries());
1454 for (int i = 0; i < l->GetEntries(); i++) {
1455 outputPointIds[i] = i;
1456 }
1457 }
1458 int n = outputPointIds.size();
1459
1460 Double_t v = 1.0;
1461 for (int i = 0; i < n; i++) {
1462 // NLogDebug("Drawing output object id %d (list index %d) on pad %d", outputPointIds[i], i, i + 1);
1463
1464 c1->cd(i + 2);
1465 TObject * obj = l->At(outputPointIds[i]);
1466 if (obj) {
1467 if (obj->InheritsFrom(TH1::Class())) {
1468 TH1 * h = (TH1 *)obj;
1469 h->SetDirectory(nullptr);
1470 // Draw a clone to avoid transferring ownership or modifying
1471 // the original object stored in the TList (can cause
1472 // TPad/TList removal during drawing and lead to crashes).
1473 TH1 * hclone = (TH1 *)h->Clone();
1474 if (hclone) {
1475 hclone->SetDirectory(nullptr);
1476 hclone->Draw();
1477 }
1478 }
1479 // obj->Print();
1480 }
1481 if (obj->InheritsFrom(TH1::Class()) && i == 0) {
1482 TH1 * h = (TH1 *)obj;
1483 v = h->GetMean();
1484 NLogDebug("Mean value from histogram [%s]: %f", h->GetName(), v);
1485 }
1486 }
1487 bdContent->SetBinContent(fBinning->GetPoint()->GetStorageCoords(), 1);
1488 c1->cd(1);
1489 TH1 * bdProj = (TH1 *)gROOT->FindObjectAny("bdProj");
1490 if (bdProj) {
1491 delete bdProj;
1492 bdProj = nullptr;
1493 }
1494 if (bdContent->GetNdimensions() == 1) {
1495 bdProj = bdContent->Projection(0, "O");
1496 }
1497 else if (bdContent->GetNdimensions() == 2) {
1498 bdProj = bdContent->Projection(0, 1, "O");
1499 }
1500 else if (bdContent->GetNdimensions() == 3) {
1501 bdProj = bdContent->Projection(0, 1, 2, "O");
1502 }
1503 else {
1504 NLogError("NGnTree::Play: Cannot project THnSparse with %d dimensions", bdContent->GetNdimensions());
1505 }
1506 if (bdProj) {
1507 bdProj->SetName("bdProj");
1508 bdProj->SetTitle(TString::Format("Binning '%s' content projection", binning.c_str()).Data());
1509 bdProj->SetMinimum(0);
1510 // bdProj->SetDirectory(nullptr);
1511 bdProj->Draw("colz");
1512 // c1->ModifiedUpdate();
1513 }
1514 }
1515 if (c1) {
1516 c1->ModifiedUpdate();
1517 c1->SaveAs(TString::Format("%s/ndmspc_play_%06lld.png", annimationTempDir.c_str(), bdContent->GetNbins()).Data());
1518 }
1519 gSystem->ProcessEvents();
1520 if (timeout > 0) gSystem->Sleep(timeout);
1521 NLogInfo("%d", id);
1522 }
1523
1524 NLogInfo("Creating animation gif from %s/ndmspc_play_*.png ...", annimationTempDir.c_str());
1525 gSystem->Exec(
1526 TString::Format("magick -delay 20 -loop 0 %s/ndmspc_play_*.png ndmspc_play.gif", annimationTempDir.c_str()));
1527 gSystem->Exec(TString::Format("rm -fr %s", annimationTempDir.c_str()));
1528 NLogInfo("Animation saved to ndmspc_play.gif");
1529
1530 delete bdContent;
1531}
1532
1533TList * NGnTree::Projection(const json & cfg, std::string binningName)
1534{
1538
1539 // SetInput(); // Set input to selfp
1541 Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * output, TList * /*outputPoint*/,
1542 int /*threadId*/) {
1543 // NLogInfo("Thread ID: %d", threadId);
1544 TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
1545 point->Print();
1546 json cfg = point->GetCfg();
1547
1548 Printf("Processing THnSparse projection with configuration: %s", cfg.dump().c_str());
1549
1550 Ndmspc::NGnTree * ngntIn = point->GetInput();
1551 // ngntIn->Print();
1552 // ngntIn->GetEntry(0);
1553 ngntIn->GetEntry(point->GetEntryNumber());
1554
1555 // loop over all cfg["objects"]
1556 for (auto & [objName, objCfg] : cfg["objects"].items()) {
1557 NLogInfo("Processing object '%s' ...", objName.c_str());
1558
1559 THnSparse * hns = (THnSparse *)(ngntIn->GetStorageTree()->GetBranchObject(objName));
1560 if (hns == nullptr) {
1561 NLogError("NGnTree::Projection: THnSparse 'hns' not found in storage tree !!!");
1562 return;
1563 }
1564 // hns->Print("all");
1565 // loop over cfg["objects"][objName] array of projection dimension names
1566 for (size_t i = 0; i < objCfg.size(); i++) {
1567
1568 NLogInfo("Processing projection %zu for object '%s' ...", i, objName.c_str());
1569 std::vector<int> dims;
1570 std::vector<std::string> dimNames = cfg["objects"][objName][i].get<std::vector<std::string>>();
1571 for (const auto & dimName : dimNames) {
1572 NLogDebug("Looking for dimension name '%s' in THnSparse ...", dimName.c_str());
1573 int dim = -1;
1574 for (int i = 0; i < hns->GetNdimensions(); i++) {
1575 if (dimName == hns->GetAxis(i)->GetName()) {
1576 dim = i;
1577 break;
1578 }
1579 }
1580 if (dim >= 0)
1581 dims.push_back(dim);
1582 else {
1583 NLogError("NGnTree::Projection: Dimension name '%s' not found in THnSparse !!!", dimName.c_str());
1584 }
1585 }
1586 // Print dims
1587 NLogInfo("Projecting THnSparse on dimensions: %s", NUtils::GetCoordsString(dims, -1).c_str());
1588 TH1 * hPrev = (TH1 *)output->At(i);
1589 TH1 * hProj = NUtils::ProjectTHnSparse(hns, dims, "O");
1590 hProj->SetName(TString::Format("%s_proj_%s", objName.c_str(), NUtils::Join(dims, '_').c_str()).Data());
1591 if (hPrev) {
1592 hPrev->Add(hProj);
1593 }
1594 else {
1595 output->Add(hProj);
1596 }
1597 }
1598 }
1599 output->Print();
1600 };
1601
1602 // NBinningDef * binningDef = fInput->GetBinning()->GetDefinition(binningName);
1603 NBinningDef * binningDef = GetBinning()->GetDefinition(binningName);
1604 THnSparse * hnsIn = binningDef->GetContent();
1605 // std::vector<std::vector<int>> ranges{{0, 2, 2}, {2, 1, 1}};
1606 std::vector<std::vector<int>> ranges = cfg["ranges"].get<std::vector<std::vector<int>>>();
1607 NUtils::SetAxisRanges(hnsIn, ranges); // Set the ranges for the axes
1608 Long64_t linBin = 0;
1609 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{hnsIn->CreateIter(true /*use axis range*/)};
1610 std::vector<Long64_t> ids;
1611 // std::vector<Long64_t> ids = binningDef->GetIds();
1612 while ((linBin = iter->Next()) >= 0) {
1613 ids.push_back(linBin);
1614 }
1615 if (ids.empty()) {
1616 NLogWarning("NGnTree::Projection: No entries found in binning definition '%s' !!!", binningDef->GetName());
1617 binningDef->RefreshIdsFromContent();
1618 return nullptr;
1619 }
1620
1621 binningDef->GetIds() = ids;
1622
1623 // NUtils::SetAxisRanges(, std::vector<std::vector<int>> ranges)
1624 Process(processFunc, cfg);
1625
1626 // Refresh binning definition ids from content after processing
1627 binningDef->RefreshIdsFromContent();
1628
1629 // GetInput()->Close(false);
1630 return GetOutput(fBinning->GetCurrentDefinitionName());
1631
1632 // Close(false);
1633}
1634
1635NGnNavigator * NGnTree::Reshape(std::string binningName, std::vector<std::vector<int>> levels, int level,
1636 std::map<int, std::vector<int>> ranges, std::map<int, std::vector<int>> rangesBase)
1637{
1641
1642 NGnNavigator navigator;
1643 navigator.SetGnTree(this);
1644
1645 return navigator.Reshape(binningName, levels, level, ranges, rangesBase);
1646}
1647
1648NGnNavigator * NGnTree::GetResourceStatisticsNavigator(std::string binningName, std::vector<std::vector<int>> levels,
1649 int level, std::map<int, std::vector<int>> ranges,
1650 std::map<int, std::vector<int>> rangesBase)
1651{
1655
1656 if (binningName.empty()) {
1657 binningName = fBinning->GetCurrentDefinitionName();
1658 }
1659
1660 THnSparse * hns = (THnSparse *)fOutputs[binningName]->FindObject("resource_monitor");
1661 if (!hns) {
1662 NLogError("NGnTree::Draw: Resource monitor THnSparse not found in outputs !!!");
1663 return nullptr;
1664 }
1665 hns->Print("all");
1666 // return nullptr;
1667
1668 auto ngnt = new NGnTree(hns, "stat", "/tmp/hnst_imported_for_drawing.root");
1669 if (ngnt->IsZombie()) {
1670 NLogError("NGnTree::GetResourceStatisticsNavigator: Failed to import resource monitor THnSparse !!!");
1671 return nullptr;
1672 }
1673 ngnt->Print();
1674 ngnt->Close();
1675
1676 // return nullptr;
1677 auto ngnt2 = NGnTree::Open("/tmp/hnst_imported_for_drawing.root");
1678 auto nav = ngnt2->Reshape("default", levels, level, ranges, rangesBase);
1679 // // nav->Export("/tmp/hnst_imported_for_drawing.json", {}, "ws://localhost:8080/ws/root.websocket");
1680 // // nav->Draw();
1681 return nav;
1682}
1683
1684bool NGnTree::InitParameters(const std::vector<std::string> & paramNames)
1685{
1689
1690 if (fParameters) {
1691 NLogTrace("NGnTree::InitParameters: Replacing existing parameters ...");
1692 delete fParameters;
1693 }
1694
1695 if (paramNames.empty()) {
1696 NLogTrace("NGnTree::InitParameters: No parameter names provided, skipping ...");
1697 return false;
1698 }
1699
1700 fParameters = new NParameters(paramNames, "results", "Results");
1701
1702 return true;
1703}
1704
1705NGnTree * NGnTree::Import(const std::string & findPath, const std::string & fileName,
1706 const std::vector<std::string> & headers, const std::string & outputFile, bool close)
1707{
1711
1712 // remove trailing slash from findPath if exists
1713 std::string findPathClean = findPath;
1714 if (!findPathClean.empty() && findPathClean.back() == '/') {
1715 findPathClean.pop_back();
1716 }
1717
1718 std::vector<std::string> paths = NUtils::Find(findPathClean, fileName);
1719 NLogInfo("NGnTree::Import: Found %zu files to import ...", paths.size());
1720
1721 TObjArray * ngntArray = NUtils::AxesFromDirectory(paths, findPathClean, fileName, headers);
1722 int nDirAxes = ngntArray->GetEntries();
1723
1724 NGnTree * ngntFirst = NGnTree::Open(paths[0]);
1725 // Add all axes from ngntFirst to ngntArray
1726 for (const auto & axis : ngntFirst->GetBinning()->GetAxes()) {
1727 ngntArray->Add(axis->Clone());
1728 }
1729 ngntFirst->Close(false);
1730
1731 std::map<std::string, std::vector<std::vector<int>>> b;
1732
1733 for (int i = 0; i < ngntArray->GetEntries(); i++) {
1734 TAxis * axis = (TAxis *)ngntArray->At(i);
1735 b[axis->GetName()].push_back({1});
1736 }
1737
1738 NGnTree * ngnt = new NGnTree(ngntArray, outputFile);
1739 ngnt->SetIsPureCopy(true);
1740
1741 // return nullptr;
1742 ngnt->GetBinning()->AddBinningDefinition("default", b);
1743
1744 json cfg;
1745 cfg["basedir"] = findPathClean;
1746 cfg["filename"] = fileName;
1747 cfg["nDirAxes"] = nDirAxes;
1748 cfg["headers"] = headers;
1749 // cfg["ndmspc"]["shared"]["currentFileName"] = "";
1750 Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * /*output*/, TList * outputPoint,
1751 int /*threadId*/) {
1752 // point->Print();
1753
1754 json cfg = point->GetCfg();
1755 std::string filename = cfg["basedir"].get<std::string>();
1756 filename += "/";
1757 for (auto & header : cfg["headers"]) {
1758 filename += point->GetBinLabel(header.get<std::string>());
1759 filename += "/";
1760 }
1761 // filename += "/";
1762 // filename += point->GetBinLabel("c");
1763 // filename += point->GetBinLabel("year");
1764 // filename += "/";
1765 filename += cfg["filename"].get<std::string>();
1766 NGnTree * ngnt = (NGnTree *)point->GetTempObject("file");
1767 if (!ngnt || filename.compare(ngnt->GetStorageTree()->GetFileName()) != 0) {
1768 NLogInfo("NGnTree::Import: Opening file '%s' ...", filename.c_str());
1769 if (ngnt) {
1770 NLogDebug("NGnTree::Import: Closing previously opened file '%s' ...",
1771 ngnt->GetStorageTree()->GetFileName().c_str());
1772 ngnt->Close(false);
1773 // delete ngnt;
1774 point->SetTempObject("file", nullptr);
1775 }
1776 ngnt = NGnTree::Open(filename.c_str());
1777 if (!ngnt || ngnt->IsZombie()) {
1778 NLogError("NGnTree::Import: Cannot open file '%s'", filename.c_str());
1779 return;
1780 }
1781 point->SetTempObject("file", ngnt);
1782 }
1783
1784 int nDirAxes = cfg["nDirAxes"].get<int>();
1785 Int_t * coords = point->GetCoords();
1786 std::string coordsStr = NUtils::GetCoordsString(NUtils::ArrayToVector(coords, point->GetNDimensionsContent()));
1787 NLogInfo("NGnTree::Import: Processing point with coords %s ...", coordsStr.c_str());
1788
1789 Long64_t entryNumber =
1790 ngnt->GetBinning()->GetContent()->GetBin(&coords[3 * nDirAxes], kFALSE); // skip first 3 dir axes
1791 NLogInfo("NGnTree::Import: Corresponding entry number in file '%s' is %lld", filename.c_str(), entryNumber);
1792
1793 ngnt->GetEntry(entryNumber);
1794
1795 // // add outputPoint content to outputPoint list
1796 // TList * inputOutputPoint = (TList *)ngnt->GetStorageTree()->GetBranch("_outputPoint")->GetObject();
1797 // for (int i = 0; i < inputOutputPoint->GetEntries(); i++) {
1798 // outputPoint->Add(inputOutputPoint->At(i));
1799 // }
1800
1801 // set all branches from ngnt to branch addresses in current object
1802 for (const auto & kv : ngnt->GetStorageTree()->GetBranchesMap()) {
1803 // check if branch exists in current storage tree
1804 if (point->GetStorageTree()->GetBranch(kv.first) == nullptr) {
1805 NLogTrace("NGnTree::Import: Adding branch '%s' to storage tree ...", kv.first.c_str());
1806 point->GetStorageTree()->AddBranch(kv.first, nullptr, kv.second.GetObjectClassName());
1807 }
1808 NLogTrace("NGnTree::Import: Setting branch address for branch '%s' ...", kv.first.c_str());
1809 point->GetTreeStorage()->GetBranch(kv.first)->SetAddress(kv.second.GetObject());
1810 }
1811 outputPoint->Add(new TNamed("source_file", filename));
1812
1813 // ngnt->Print();
1814
1815 // NLogInfo("NGnTree::Import: nDirAxes=%d ...", cfg["nDirAxes"].get<int>());
1816
1817 // json & tempCfg = point->GetTempCfg();
1818 // if (tempCfg["test"].is_null()) {
1819 // NLogInfo("Setting temp cfg test value to 42");
1820 // tempCfg["test"] = 42;
1821 // }
1822 // NLogInfo("Temp cfg test value: %d", tempCfg["test"].get<int>());
1823
1824 // f->ls();
1825 };
1826 Ndmspc::NGnBeginFuncPtr beginFunc = [](Ndmspc::NBinningPoint * /*point*/, int /*threadId*/) {
1827 TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
1828 };
1829
1830 Ndmspc::NGnEndFuncPtr endFunc = [](Ndmspc::NBinningPoint * point, int /*threadId*/) {
1831 NGnTree * ngnt = (NGnTree *)point->GetTempObject("file");
1832 if (ngnt) {
1833 NLogDebug("NGnTree::Import: Closing last file '%s' ...", ngnt->GetStorageTree()->GetFileName().c_str());
1834 // ngnt->Close(false);
1835 // delete ngnt;
1836 point->SetTempObject("file", nullptr);
1837 }
1838 };
1839
1840 ngnt->Process(processFunc, cfg, "", beginFunc, endFunc);
1841 if (close) {
1842 ngnt->Close(true);
1843 delete ngnt;
1844 ngnt = NGnTree::Open(outputFile.c_str());
1845 }
1846 return ngnt;
1847}
1848
1849} // namespace Ndmspc
Defines binning mapping and content for NDMSPC histograms.
Definition NBinningDef.h:26
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Definition NBinningDef.h:93
THnSparse * GetContent() const
Get the template content histogram.
virtual void Print(Option_t *option="") const
Print binning definition information.
void RefreshIdsFromContent()
Refresh bin IDs from content histogram.
Represents a single point in multi-dimensional binning.
Double_t GetBinMax(std::string axis) const
Get the maximum value for a specific axis.
std::string GetString(const std::string &prefix="", bool all=false) const
Returns a string representation of the binning point.
std::string GetBinLabel(std::string axis) const
Get the label for a specific axis.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
NStorageTree * GetTreeStorage() const
Get pointer to storage tree object.
Long64_t GetEntryNumber() const
Get entry number for the point.
Double_t GetBinCenter(std::string axis) const
Returns the center value of the bin along the specified axis.
NGnTree * GetInput() const
Get pointer to input NGnTree object.
NStorageTree * GetStorageTree() const
Returns a pointer to the associated storage tree.
void SetTempObject(const std::string &name, TObject *obj)
Set a temporary object with the given name.
virtual void Print(Option_t *option="") const
Print binning point information.
TObject * GetTempObject(const std::string &name) const
Retrieve a temporary object by name.
Int_t GetBin(std::string axis) const
Returns the bin index for the specified axis.
json & GetCfg()
Get reference to configuration JSON object.
Int_t * GetCoords() const
Get pointer to content coordinates array.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
Double_t GetBinMin(std::string axis) const
Get the minimum value for a specific axis.
NParameters * GetParameters() const
Get the parameters associated with this binning point.
NBinning object for managing multi-dimensional binning and axis definitions.
Definition NBinning.h:45
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
THnSparse * GetContent() const
Get the content histogram.
Definition NBinning.h:217
void AddBinningDefinition(std::string name, std::map< std::string, std::vector< std::vector< int > > > binning, bool forceDefault=false)
Add a binning definition.
std::vector< TAxis * > GetAxes() const
Get vector of axis pointers.
Definition NBinning.h:223
Executes a function over all points in an N-dimensional space, optionally in parallel.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
Navigator object for managing hierarchical data structures and projections.
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int > > levels, size_t level=0, std::map< int, std::vector< int > > ranges={}, std::map< int, std::vector< int > > rangesBase={})
Reshape navigator using binning name and levels.
void SetGnTree(NGnTree *tree)
Set NGnTree object pointer.
Thread-local data object for NDMSPC processing.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
void SetResultsFilename(const std::string &filename)
Set the results filename for TCP mode (shared filesystem path).
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
void SetCfg(const json &cfg)
Set configuration JSON object.
NDMSPC tree object for managing multi-dimensional data storage and processing.
Definition NGnTree.h:75
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
Definition NGnTree.h:173
NGnNavigator * GetResourceStatisticsNavigator(std::string binningName, std::vector< std::vector< int > > levels, int level=0, std::map< int, std::vector< int > > ranges={}, std::map< int, std::vector< int > > rangesBase={})
Returns a navigator for resource statistics based on binning and levels.
Definition NGnTree.cxx:1648
virtual void Draw(Option_t *option="") override
Draws the tree object.
Definition NGnTree.cxx:492
void SetIsPureCopy(bool val)
Sets the pure copy status of the tree.
Definition NGnTree.h:230
bool Close(bool write=false)
Close the tree, optionally writing data.
Definition NGnTree.cxx:1338
bool fOwnsTreeStorage
True when fTreeStorage is owned by this instance.
Definition NGnTree.h:393
virtual ~NGnTree()
Destructor.
Definition NGnTree.cxx:453
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
Definition NGnTree.cxx:1352
virtual void Print(Option_t *option="") const override
Print tree information.
Definition NGnTree.cxx:468
void SetInput(NGnTree *input)
Set input NGnTree pointer.
Definition NGnTree.h:204
NGnTree()
Default constructor.
Definition NGnTree.cxx:157
static NGnTree * Import(const std::string &findPath, const std::string &fileName, const std::vector< std::string > &headers, const std::string &outFileName="/tmp/ngnt_imported.root", bool close=true)
Imports an NGnTree from a specified file.
Definition NGnTree.cxx:1705
NBinning * fBinning
Binning object.
Definition NGnTree.h:386
TList * GetOutput(std::string name="")
Get output list by name.
Definition NGnTree.cxx:1218
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
Definition NGnTree.h:179
NGnTree * fInput
Input NGnTree for processing.
Definition NGnTree.h:389
bool fOwnsBinning
True when fBinning is owned by this instance.
Definition NGnTree.h:392
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int > > levels, int level=0, std::map< int, std::vector< int > > ranges={}, std::map< int, std::vector< int > > rangesBase={})
Reshape navigator using binning name and levels.
Definition NGnTree.cxx:1635
NParameters * GetParameters() const
Returns the parameters associated with this tree.
Definition NGnTree.h:329
std::string fWorkerMacroList
Comma-separated macro paths sent to TCP workers.
Definition NGnTree.h:395
void SetOutputs(std::map< std::string, TList * > outputs)
Set outputs map.
Definition NGnTree.h:192
bool InitParameters(const std::vector< std::string > &paramNames)
Initializes the parameters for the tree using the provided parameter names.
Definition NGnTree.cxx:1684
void Play(int timeout=0, std::string binning="", std::vector< int > outputPointIds={0}, std::vector< std::vector< int > > ranges={}, Option_t *option="")
Play tree data with optional binning and output point IDs.
Definition NGnTree.cxx:1376
std::map< std::string, TList * > fOutputs
Outputs.
Definition NGnTree.h:388
NGnNavigator * fNavigator
! Navigator object
Definition NGnTree.h:390
NParameters * fParameters
Parameters object.
Definition NGnTree.h:391
TList * Projection(const json &cfg, std::string binningName="")
Project tree data using configuration and binning name.
Definition NGnTree.cxx:1533
NStorageTree * fTreeStorage
Tree storage.
Definition NGnTree.h:387
NGnTree * GetInput() const
Get pointer to input NGnTree.
Definition NGnTree.h:198
static std::string BuildObjectPath(const json &cfg, const json &objCfg, const NBinningPoint *point)
Helper: build object path string from configuration and a binning point.
Definition NGnTree.cxx:43
void SetNavigator(NGnNavigator *navigator)
Sets the navigator for this tree.
Definition NGnTree.cxx:1324
NBinning * GetBinning() const
Get pointer to binning object.
Definition NGnTree.h:161
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
Definition NGnTree.cxx:1234
bool Process(NGnProcessFuncPtr func, const json &cfg=json::object(), std::string binningName="", NGnBeginFuncPtr beginFunc=nullptr, NGnEndFuncPtr endFunc=nullptr)
Process tree data using a function pointer and configuration.
Definition NGnTree.cxx:501
static bool GetConsoleOutput()
Get console output flag.
Definition NLogger.h:548
NParameters object.
Definition NParameters.h:13
bool SetParameter(int bin, Double_t value, Double_t error=0.)
Set the value and error of a parameter by bin index.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
void SetBranchAddresses()
Set addresses for all branches.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
bool SetFileTree(TFile *file, TTree *tree)
Tree handling.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
TObject * GetBranchObject(const std::string &name)
Get pointer to branch object by name.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
static bool SetAxisRanges(THnSparse *sparse, std::vector< std::vector< int > > ranges={}, bool withOverflow=false, bool modifyTitle=false, bool reset=true)
Set axis ranges for THnSparse using vector of ranges.
Definition NUtils.cxx:1220
static TH1 * ProjectTHnSparse(THnSparse *hns, const std::vector< int > &axes, Option_t *option="")
Project a THnSparse histogram onto specified axes.
Definition NUtils.cxx:1167
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition NUtils.cxx:1073
static std::string FormatTime(long long seconds)
Format time in seconds to human-readable string.
Definition NUtils.cxx:1660
static bool EnableMT(Int_t numthreads=-1)
Enable multi-threading with specified number of threads.
Definition NUtils.cxx:46
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
Definition NUtils.cxx:1111
static void ProgressBar(int current, int total, std::string prefix="", std::string suffix="", int barWidth=50)
Display progress bar.
Definition NUtils.cxx:1673
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
Definition NUtils.cxx:1588
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.
Definition NUtils.cxx:1551
static TObjArray * AxesFromDirectory(const std::vector< std::string > paths, const std::string &findPath, const std::string &fileName, const std::vector< std::string > &axesNames)
Creates an array of axes objects from files in specified directories.
Definition NUtils.cxx:1396
static std::vector< std::string > Find(std::string path, std::string filename="")
Find files in a path matching filename.
Definition NUtils.cxx:963
Global callback function for libwebsockets client events.
void(*)(Ndmspc::NBinningPoint *, TList *, TList *, int) NGnProcessFuncPtr
Function pointer type for processing binning points and lists.
Definition NGnTree.h:40
void(*)(Ndmspc::NBinningPoint *, int) NGnEndFuncPtr
Function pointer type for termination functions used in NGnTree.
Definition NGnTree.h:62
void(*)(Ndmspc::NBinningPoint *, int) NGnBeginFuncPtr
Function pointer type for the beginning of a tree operation.
Definition NGnTree.h:52