ndmspc  v1.2.0-0.1.rc3
NGnTree.cxx
1 #include <chrono>
2 #include <cstddef>
3 #include <ctime>
4 #include <numbers>
5 #include <set>
6 #include <string>
7 #include <vector>
8 #include "TAxis.h"
9 #include <TDirectory.h>
10 #include <TObject.h>
11 #include <TList.h>
12 #include <TROOT.h>
13 #include <THnSparse.h>
14 #include <TH1.h>
15 #include <TCanvas.h>
16 #include <TSystem.h>
17 #include <TMap.h>
18 #include <TObjString.h>
19 #include <TTree.h>
20 #include <TBufferJSON.h>
21 #include <sys/poll.h>
22 #include <zmq.h>
23 #include "NParameters.h"
24 #include "NStorageTree.h"
25 #include "NBinning.h"
26 #include "NBinningDef.h"
27 #include "NDimensionalExecutor.h"
28 #include "NDimensionalIpcRunner.h"
29 #include "NGnThreadData.h"
30 #include "NLogger.h"
31 #include "NTreeBranch.h"
32 #include "NUtils.h"
33 #include "NStorageTree.h"
34 #include "NGnNavigator.h"
35 #include "NGnTree.h"
36 
38 ClassImp(Ndmspc::NGnTree);
40 
41 namespace Ndmspc {
42 
43 std::string NGnTree::BuildObjectPath(const json & cfg, const json & objCfg, const NBinningPoint * point)
44 {
45  std::string objPath = "";
46  if (objCfg.contains("prefix") && objCfg["prefix"].is_string()) {
47  objPath = objCfg["prefix"].get<std::string>();
48  }
49 
50  std::string axisObjectDefaultFormat =
51  cfg["axisObjectDefaultFormat"].is_string() ? cfg["axisObjectDefaultFormat"].get<std::string>() : "%.2f_%.2f";
52  std::string axisDefaultSeparator =
53  cfg["axisDefaultSeparator"].is_string() ? cfg["axisDefaultSeparator"].get<std::string>() : "/";
54 
55  std::string lastSep;
56  for (auto & axisEntry : cfg["axes"]) {
57  std::string axisName;
58  std::string mode;
59  std::string format;
60 
61  if (axisEntry.is_string()) {
62  axisName = axisEntry.get<std::string>();
63  if (axisObjectDefaultFormat.empty()) {
64  mode = "bin";
65  }
66  else {
67  mode = "minmax";
68  format = axisObjectDefaultFormat;
69  }
70  }
71  else if (axisEntry.is_object()) {
72  if (axisEntry.contains("name") && axisEntry["name"].is_string()) {
73  axisName = axisEntry["name"].get<std::string>();
74  }
75  else {
76  continue;
77  }
78  if (axisEntry.contains("mode") && axisEntry["mode"].is_string()) {
79  mode = axisEntry["mode"].get<std::string>();
80  }
81  if (axisEntry.contains("format") && axisEntry["format"].is_string()) {
82  format = axisEntry["format"].get<std::string>();
83  }
84  }
85  else {
86  continue;
87  }
88 
89  if (mode.empty()) {
90  if (axisObjectDefaultFormat.empty())
91  mode = "bin";
92  else
93  mode = "minmax";
94  }
95  if (format.empty()) {
96  if (mode == "minmax")
97  format = axisObjectDefaultFormat.empty() ? "%.2f_%.2f" : axisObjectDefaultFormat;
98  else if (mode == "bin")
99  format = "%d";
100  else
101  format = "%.2f";
102  }
103 
104  if (mode == "minmax") {
105  double min = point->GetBinMin(axisName);
106  double max = point->GetBinMax(axisName);
107  objPath += TString::Format(format.c_str(), min, max).Data();
108  }
109  else if (mode == "min") {
110  double min = point->GetBinMin(axisName);
111  objPath += TString::Format(format.c_str(), min).Data();
112  }
113  else if (mode == "max") {
114  double max = point->GetBinMax(axisName);
115  objPath += TString::Format(format.c_str(), max).Data();
116  }
117  else if (mode == "center") {
118  double c = point->GetBinCenter(axisName);
119  objPath += TString::Format(format.c_str(), c).Data();
120  }
121  else if (mode == "label") {
122  std::string lbl = point->GetBinLabel(axisName);
123  objPath += lbl;
124  }
125  else if (mode == "bin") {
126  objPath += std::to_string(point->GetBin(axisName));
127  }
128  else {
129  objPath += std::to_string(point->GetBin(axisName));
130  }
131 
132  std::string sep = axisDefaultSeparator;
133  if (axisEntry.is_object() && axisEntry.contains("sufix") && axisEntry["sufix"].is_string()) {
134  sep = axisEntry["sufix"].get<std::string>();
135  }
136  objPath += sep;
137  lastSep = sep;
138  }
139 
140  if (!lastSep.empty() && objPath.size() >= lastSep.size()) {
141  objPath = objPath.substr(0, objPath.size() - lastSep.size());
142  }
143  if (objCfg.contains("sufix") && objCfg["sufix"].is_string()) {
144  objPath += objCfg["sufix"].get<std::string>();
145  }
146 
147  return objPath;
148 }
157 NGnTree::NGnTree() : TObject() {}
158 
/// Construct an NGnTree from a list of axes.
///
/// @param axes     Axes handed to the NBinning constructor; an empty list marks the object as zombie.
/// @param filename Forwarded to NStorageTree::InitTree().
/// @param treename Forwarded to NStorageTree::InitTree().
159 NGnTree::NGnTree(std::vector<TAxis *> axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
160 {
// Without axes no binning can be built: log, flag the object as zombie and stop.
164  if (axes.empty()) {
165  NLogError("NGnTree::NGnTree: No axes provided, binning is nullptr.");
166  // fBinning = new NBinning();
167  MakeZombie();
168  return;
169  }
170  fBinning = new NBinning(axes);
// NOTE(review): no assignment to fTreeStorage is visible in this constructor (listing line 171
// is absent) - confirm the storage tree is allocated before InitTree() is called on it.
172  fTreeStorage->InitTree(filename, treename);
// The navigator keeps a back-pointer to this tree.
173  fNavigator = new NGnNavigator();
174  fNavigator->SetGnTree(this);
175 }
176 NGnTree::NGnTree(TObjArray * axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
177 {
181 
182  if (axes == nullptr) {
183  NLogError("NGnTree::NGnTree: Axes TObjArray is nullptr.");
184  MakeZombie();
185  return;
186  }
187 
188  if (axes == nullptr && axes->GetEntries() == 0) {
189  NLogError("NGnTree::NGnTree: No axes provided, binning is nullptr.");
190  MakeZombie();
191  return;
192  }
193  fBinning = new NBinning(axes);
195  fTreeStorage->InitTree(filename, treename);
196  fNavigator = new NGnNavigator();
197  fNavigator->SetGnTree(this);
198 }
199 
/// Construct an NGnTree that reuses (a clone of) the binning of another NGnTree.
///
/// @param ngnt     Source tree; nullptr, or a source without binning, marks this object as zombie.
/// @param filename Forwarded to NStorageTree::InitTree().
/// @param treename Forwarded to NStorageTree::InitTree().
200 NGnTree::NGnTree(NGnTree * ngnt, std::string filename, std::string treename) : TObject(), fInput(nullptr)
201 {
205  if (ngnt == nullptr) {
206  NLogError("NGnTree::NGnTree: NGnTree is nullptr.");
207  MakeZombie();
208  return;
209  }
210 
211  if (ngnt->GetBinning() == nullptr) {
212  NLogError("NGnTree::NGnTree: Binning in NGnTree is nullptr.");
213  MakeZombie();
214  return;
215  }
216 
217  // TODO: Import binning from user
// The binning is cloned, so this object does not share the source tree's NBinning instance.
218  fBinning = (NBinning *)ngnt->GetBinning()->Clone();
// NOTE(review): no assignment to fTreeStorage is visible in this constructor (listing line 219
// is absent) - confirm the storage tree is allocated before InitTree() is called on it.
220  fTreeStorage->InitTree(filename, treename);
// The navigator keeps a back-pointer to this tree.
221  fNavigator = new NGnNavigator();
222  fNavigator->SetGnTree(this);
223 }
224 
// Member initialiser list and body of a constructor whose signature line is not part of this
// listing. It stores the `b` and `s` arguments as binning and storage tree and starts with both
// ownership flags cleared.
226  : TObject(), fBinning(b), fTreeStorage(s), fInput(nullptr), fOwnsBinning(false), fOwnsTreeStorage(false)
227 {
// A missing binning is logged and flags the object as zombie; there is no early return here,
// so the rest of the body still runs.
231  if (fBinning == nullptr) {
232  NLogError("NGnTree::NGnTree: Binning is nullptr.");
233  MakeZombie();
234  }
// No storage tree supplied: a tree named "ngnt" with an empty filename is initialised and this
// object takes ownership of the storage.
// NOTE(review): the statement that allocates fTreeStorage is not visible (listing line 236 is
// absent); as shown, fTreeStorage equals `s` (nullptr) at the InitTree() call - confirm.
235  if (s == nullptr) {
237  fTreeStorage->InitTree("", "ngnt");
238  fOwnsTreeStorage = true;
239  }
240 
241  if (fTreeStorage == nullptr) {
242  NLogError("NGnTree::NGnTree: Storage tree is nullptr.");
243  MakeZombie();
244  }
245 
246  // fBinning->Initialize();
247  //
248  // TODO: Check if this is needed
// The navigator keeps a back-pointer to this tree.
251  fNavigator = new NGnNavigator();
252  fNavigator->SetGnTree(this);
253 }
254 NGnTree::NGnTree(THnSparse * hns, std::string parameterAxis, const std::string & outFileName, json cfg)
255  : TObject(), fInput(nullptr)
256 {
260 
261  std::map<std::string, std::vector<std::vector<int>>> b;
262  TObjArray * axes = new TObjArray();
263  int parameterAxisIdx = -1;
264  std::vector<std::string> labels;
265  for (int i = 0; i < hns->GetNdimensions(); i++) {
266  TAxis * axisIn = (TAxis *)hns->GetAxis(i);
267  TAxis * axis = (TAxis *)axisIn->Clone();
268 
269  // check if parameterAxis matches axis name
270  if (parameterAxis.compare(axis->GetName()) == 0) {
271  parameterAxisIdx = i;
272  TAxis * axis = hns->GetAxis(parameterAxisIdx);
273  for (int bin = 1; bin <= axis->GetNbins(); bin++) {
274  // NLogInfo("Axis bin %d label: %s", bin, axis->GetBinLabel(bin));
275  labels.push_back(axis->GetBinLabel(bin));
276  }
277  continue;
278  }
279 
280  // set label
281  if (axisIn->IsAlphanumeric()) {
282  NLogTrace("Setting axis '%s' labels from input THnSparse", axis->GetName());
283  for (int bin = 1; bin <= axisIn->GetNbins(); bin++) {
284  std::string label = axisIn->GetBinLabel(bin);
285  if (!labels.empty()) axis->SetBinLabel(bin, axisIn->GetBinLabel(bin));
286  }
287  }
288 
289  axes->Add(axis);
290  b[axis->GetName()] = {{1}};
291  }
292 
293  // json cfg;
294  cfg["_parameterAxis"] = parameterAxisIdx;
295  cfg["_labels"] = labels;
296 
297  // return nullptr;
298  NLogDebug("Importing THnSparse as NGnTree with parameter axis '%s' (index %d) ...", parameterAxis.c_str(),
299  parameterAxisIdx);
300  NGnTree * ngnt = new NGnTree(axes, outFileName);
301  NLogDebug("Created NGnTree for THnSparse import ...");
302  if (ngnt->IsZombie()) {
303  NLogError("NGnTree::Import: Failed to create NGnTree !!!");
304  MakeZombie();
305  return;
306  }
307  // ngnt->GetStorageTree()->GetTree()->GetUserInfo()->Add(hns->Clone());
308  // Get env variable for tmp dir $NDMSPC_TMP_DIR
309  const char * tmpDirStr = gSystem->Getenv("NDMSPC_TMP_DIR");
310 
311  std::string tmpDir;
312 
313  if (!tmpDirStr || tmpDir.empty()) {
314  tmpDir = "/tmp";
315  }
316  std::string tmpFilename = tmpDir + "/ngnt_imported_input" + std::to_string(gSystem->GetPid()) + ".root";
317  NGnTree * ngntIn = new NGnTree(axes, tmpFilename);
318  if (ngntIn->IsZombie()) {
319  NLogError("NGnTree::Import: Failed to create NGnTree for input !!!");
320  SafeDelete(ngnt);
321  MakeZombie();
322  return;
323  }
324  // ngntIn->GetStorageTree()->GetTree()->GetUserInfo()->Add(hns->Clone());
325  ngntIn->GetOutput("default")->Add(hns->Clone("test"));
326  ngntIn->Close(true);
327 
328  // return;
329  // delete ngntIn;
330 
331  ngnt->SetInput(NGnTree::Open(tmpFilename)); // Set input to self
332 
333  ngnt->GetInput()->Print();
334 
335  ngnt->GetBinning()->AddBinningDefinition("default", b);
336  ngnt->InitParameters(cfg["_labels"].get<std::vector<std::string>>());
337 
338  Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * /*output*/, TList * outputPoint,
339  int /*threadId*/) {
340  // NLogInfo("Thread ID: %d", threadId);
341  TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
342  // point->Print();
343  json cfg = point->GetCfg();
344 
345  NGnTree * ngntIn = point->GetInput();
346  if (!ngntIn) {
347  NLogError("NGnTree::Import: Input NGnTree is nullptr !!!");
348  return;
349  }
350  // ngntIn->Print();
351 
352  THnSparse * hns = (THnSparse *)ngntIn->GetOutput("default")->At(0);
353  if (hns == nullptr) {
354  NLogError("NGnTree::Import: THnSparse 'hns' not found in storage tree !!!");
355  return;
356  }
357 
358  int axisIdx = cfg["_parameterAxis"].get<int>();
359  std::vector<std::vector<int>> ranges;
360  // set ranges from point storage coords
361  int iAxis = 0;
362  for (int i = 0; i < hns->GetNdimensions(); i++) {
363  if (i == axisIdx) continue; // skip parameter axis
364  int coord = point->GetStorageCoords()[iAxis++];
365  ranges.push_back({i, coord, coord});
366  // NLogInfo("Setting axis %d range to [%d, %d]", i, coord, coord);
367  }
368 
369  NUtils::SetAxisRanges(hns, ranges);
370  TH1 * h = hns->Projection(axisIdx, "O");
371  if (!h) {
372  NLogError("NGnTree::Import: Projection of THnSparse failed !!!");
373  return;
374  }
375  if (h->GetEntries() > 0) {
376  NParameters * params = point->GetParameters();
377  if (params) {
378  for (int bin = 1; bin <= h->GetNbinsX(); bin++) {
379  params->SetParameter(bin, h->GetBinContent(bin), h->GetBinError(bin));
380  }
381  }
382  // outputPoint->Add(hParams);
383  outputPoint->Add(h);
384 
385  std::string filename = cfg["filename"].get<std::string>();
386  TFile * f = (TFile *)point->GetTempObject("file");
387  if (!f || filename.compare(f->GetName()) != 0) {
388  if (f) {
389  NLogDebug("NGnTree::Import: Closing previously opened file '%s' ...", f->GetName());
390  f->Close();
391  }
392  NLogDebug("NGnTree::Import: Opening file '%s' ...", filename.c_str());
393  f = TFile::Open(filename.c_str());
394  if (!f || f->IsZombie()) {
395  NLogError("NGnTree::Import: Cannot open file '%s' !!!", filename.c_str());
396  return;
397  }
398  point->SetTempObject("file", f);
399  }
400 
401  std::string axisObjectDefaultFormat =
402  cfg["axisObjectDefaultFormat"].is_string() ? cfg["axisObjectDefaultFormat"].get<std::string>() : "%.2f_%.2f";
403  std::string axisDefaultSeparator =
404  cfg["axisDefaultSeparator"].is_string() ? cfg["axisDefaultSeparator"].get<std::string>() : "/";
405  bool dryrun = false;
406  if (cfg.contains("dryrun") && cfg["dryrun"].is_boolean()) {
407  dryrun = cfg["dryrun"].get<bool>();
408  }
409 
410  if (dryrun) {
411  NLogInfo("NGnTree::Import (dryrun): '%s' ...", point->GetString().c_str());
412  }
413 
414  // loop over all object in cfg["objects"]
415  for (auto & [objName, objCfg] : cfg["objects"].items()) {
416  std::string objPath = NGnTree::BuildObjectPath(cfg, objCfg, point);
417 
418  if (dryrun) {
419  NLogInfo("NGnTree::Import (dryrun): would retrieve object '%s'", objPath.c_str());
420  continue;
421  }
422 
423  if (point->GetEntryNumber() == 0) {
424  NLogInfo("NGnTree::Import: Retrieving object '%s' from file '%s' ...", objPath.c_str(),
425  cfg["filename"].get<std::string>().c_str());
426  }
427  TObject * obj = f->Get(objPath.c_str());
428  if (!obj) {
429  if (point->GetEntryNumber() == 0) {
430  NLogWarning("NGnTree::Import: Cannot get object '%s' from file '%s' !!!", objPath.c_str(),
431  cfg["filename"].get<std::string>().c_str());
432  }
433  continue;
434  }
435 
436  if (obj->InheritsFrom(TCanvas::Class())) {
437  TCanvas * cObj = (TCanvas *)obj;
438  cObj->SetName(objName.c_str());
439  }
440  outputPoint->Add(obj->Clone(objName.c_str()));
441  }
442  // f->Close();
443  }
444  };
445 
446  // NUtils::SetAxisRanges(, std::vector<std::vector<int>> ranges)
447  ngnt->Process(processFunc, cfg);
448  ngnt->Close(true);
449  // Remove tmp file
450  gSystem->Exec(TString::Format("rm -f %s", tmpFilename.c_str()));
451 }
452 
// Body of a function whose signature line is not part of this listing (presumably the
// destructor - confirm). It releases the members held by this object.
454 {
458 
// Binning and storage tree are only deleted when the corresponding ownership flag is set.
459  if (fOwnsBinning) {
460  SafeDelete(fBinning);
461  }
462  if (fOwnsTreeStorage) {
463  SafeDelete(fTreeStorage);
464  }
// Navigator and parameters are deleted unconditionally.
465  SafeDelete(fNavigator);
466  SafeDelete(fParameters);
467 }
468 void NGnTree::Print(Option_t * option) const
469 {
473 
474  TString opt = option;
475 
476  // Print list of axes
477  NLogInfo("NGnTree::Print: Printing NGnTree object [ALL] ...");
478  if (fBinning) {
479  fBinning->Print(option);
480  }
481  else {
482  NLogError("Binning is not initialized in NGnTree !!!");
483  }
484  if (fTreeStorage) {
485  fTreeStorage->Print(option);
486  }
487  else {
488  NLogError("Storage tree is not initialized in NGnTree !!!");
489  }
490 }
491 
492 void NGnTree::Draw(Option_t * /*option*/)
493 {
497 
498  NLogInfo("NGnTree::Draw: Drawing NGnTree object [not implemented yet]...");
499 }
500 
501 bool NGnTree::Process(NGnProcessFuncPtr func, const json & cfg, std::string binningName, NGnBeginFuncPtr beginFunc,
502  NGnEndFuncPtr endFunc)
503 {
507 
508  if (!fBinning) {
509  NLogError("Binning is not initialized in NGnTree !!!");
510  return false;
511  }
512 
513  NBinning * binningIn = (NBinning *)fBinning->Clone();
514 
515  std::vector<std::string> defNames = fBinning->GetDefinitionNames();
516  if (!binningName.empty()) {
517  // Check if binning definitions exist
518  if (std::find(defNames.begin(), defNames.end(), binningName) == defNames.end()) {
519  NLogError("Binning definition '%s' not found in NGnTree !!!", binningName.c_str());
520  return false;
521  }
522  defNames.clear();
523  defNames.push_back(binningName);
524  }
525 
526  fBinning->Reset();
527  fBinning->SetCfg(cfg); // Set configuration to binning point
528  bool rc = Process(func, defNames, cfg, binningIn, beginFunc, endFunc);
529  if (!rc) {
530  NLogError("NGnTree::Process: Processing failed !!!");
531  return false;
532  }
533  // bool rc = false;
534  return true;
535 }
536 
537 bool NGnTree::Process(NGnProcessFuncPtr func, const std::vector<std::string> & defNames, const json & cfg,
538  NBinning * binningIn, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc)
539 {
543 
544  NLogInfo("NGnTree::Process: Starting processing with %zu definitions ...", defNames.size());
545  bool batch = gROOT->IsBatch();
546  gROOT->SetBatch(kTRUE);
547  TH1::AddDirectory(kFALSE);
548 
549  // --- Worker mode: run as a remote TCP worker ---
550  if (const char * workerEndpoint = gSystem->Getenv("NDMSPC_WORKER_ENDPOINT")) {
551  size_t workerIndex = 0;
552  if (const char * envIdx = gSystem->Getenv("NDMSPC_WORKER_INDEX")) {
553  try { workerIndex = static_cast<size_t>(std::stoul(envIdx)); } catch (...) {}
554  }
555  NLogInfo("NGnTree::Process: Worker mode — connecting to %s as worker %zu", workerEndpoint, workerIndex);
556 
557  const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
558  std::string workerBase = tmpDirEnv ? tmpDirEnv : "/tmp";
559  // jobDir and treeName will be received from master via INIT
560  // For now init NGnThreadData with a placeholder filename; Init() will be called with real paths
561  Ndmspc::NGnThreadData workerData;
562 
563  void * ctx = zmq_ctx_new();
564  void * dealer = zmq_socket(ctx, ZMQ_DEALER);
565  const std::string identity = Ndmspc::NDimensionalIpcRunner::BuildWorkerIdentity(workerIndex);
566  zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size());
567  int timeoutMs = 1000;
568  zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
569  zmq_connect(dealer, workerEndpoint);
570  Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"READY"});
571 
572  // Wait for INIT
573  const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(30);
574  bool initOk = false;
575  while (!initOk) {
576  std::vector<std::string> frames;
577  if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
578  if (errno == EAGAIN || errno == EWOULDBLOCK) {
579  if (std::chrono::steady_clock::now() > initDeadline) break;
580  continue;
581  }
582  break;
583  }
584  // INIT frames: "INIT", workerIdx, sessionId, resultsDir, treeName[, tmpDir, tmpResultsDir]
585  if (frames.size() >= 1 && frames[0] == "STOP") {
586  NLogPrint("NGnTree::Process: Worker received STOP before INIT — session already finished, exiting.");
587  zmq_close(dealer);
588  zmq_ctx_term(ctx);
589  gROOT->SetBatch(batch);
590  return true;
591  }
592  if (frames.size() >= 5 && frames[0] == "INIT") {
593  workerIndex = static_cast<size_t>(std::stoul(frames[1]));
594  const std::string & sessionId = frames[2];
595  const std::string & initResultsDir = frames[3];
596  const std::string & initTreeName = frames[4];
597 
598  // Apply env vars sent by supervisor — these override the worker's inherited environment
599  if (frames.size() >= 7) {
600  if (!frames[5].empty())
601  gSystem->Setenv("NDMSPC_TMP_DIR", frames[5].c_str());
602  if (!frames[6].empty())
603  gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", frames[6].c_str());
604  }
605  // Fallback: if NDMSPC_TMP_RESULTS_DIR is still unset/empty, use NDMSPC_TMP_DIR
606  if (!gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR")[0] == '\0') {
607  const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
608  if (tmpDirEnv && tmpDirEnv[0] != '\0')
609  gSystem->Setenv("NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
610  }
611 
612  // Local work file — always on this machine's NDMSPC_TMP_DIR
613  const char * localTmpEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
614  const std::string localBase = localTmpEnv ? localTmpEnv : "/tmp";
615  const std::string localFile = localBase + "/.ndmspc/tmp/" + sessionId + "/" +
616  std::to_string(workerIndex) + "/" + fTreeStorage->GetPostfix();
617 
618  // Results file — on shared FS; supervisor reads from here to merge
619  const std::string resultsFile = initResultsDir + "/" + std::to_string(workerIndex) + "/" +
621 
622  bool rc = workerData.Init(workerIndex, func, beginFunc, endFunc, this, binningIn, fInput, localFile, initTreeName);
623  if (!rc) {
624  NLogError("NGnTree::Process: Worker failed to initialize NGnThreadData");
625  zmq_close(dealer);
626  zmq_ctx_term(ctx);
627  return false;
628  }
629  // If results dir differs from local dir, tell TaskLoop to copy after Close(true)
630  if (resultsFile != localFile) {
631  workerData.SetResultsFilename(resultsFile);
632  }
633  workerData.SetCfg(cfg);
634  Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {"ACK"});
635  initOk = true;
636  }
637  }
638  if (!initOk) {
639  NLogError("NGnTree::Process: Worker did not receive INIT from supervisor");
640  zmq_close(dealer);
641  zmq_ctx_term(ctx);
642  return false;
643  }
644 
645  Ndmspc::NDimensionalIpcRunner::TaskLoop(dealer, workerIndex, &workerData);
646  zmq_close(dealer);
647  zmq_ctx_term(ctx);
648  gROOT->SetBatch(batch);
649  return true;
650  }
651  // --- End worker mode ---
652 
654 
655  int nThreads = ROOT::GetThreadPoolSize(); // Get the number of threads to use
656  if (nThreads < 1) nThreads = 1;
657 
658  std::string executionMode = "thread";
659  const char * envMode = gSystem->Getenv("NDMSPC_EXECUTION_MODE");
660  const bool modeExplicit = (envMode && envMode[0] != '\0');
661  if (modeExplicit) {
662  executionMode = envMode;
663  }
664 
665  std::string normalizedMode = executionMode;
666  std::transform(normalizedMode.begin(), normalizedMode.end(), normalizedMode.begin(),
667  [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
668  if (normalizedMode == "process") normalizedMode = "ipc";
669 
670  bool useProcessIpc = (normalizedMode == "ipc" || normalizedMode == "tcp");
671  bool useTcp = (normalizedMode == "tcp");
672  size_t nProcesses = static_cast<size_t>(nThreads);
673  bool ndmspcNProcExplicit = false;
674 
675  if (const char * envNdmspcNProc = gSystem->Getenv("NDMSPC_MAX_PROCESSES")) {
676  ndmspcNProcExplicit = true;
677  try {
678  nProcesses = std::max<size_t>(1, static_cast<size_t>(std::stoll(envNdmspcNProc)));
679  }
680  catch (...) {
681  NLogWarning("NGnTree::Process: Invalid NDMSPC_MAX_PROCESSES='%s', using default=%zu", envNdmspcNProc,
682  nProcesses);
683  }
684  }
685  else if (const char * envNProc = gSystem->Getenv("ROOT_MAX_THREADS")) {
686  // Backward-compatible fallback when NDMSPC_MAX_PROCESSES is not set.
687  try {
688  nProcesses = std::max<size_t>(1, static_cast<size_t>(std::stoll(envNProc)));
689  }
690  catch (...) {
691  NLogWarning("NGnTree::Process: Invalid ROOT_MAX_THREADS='%s', using default=%zu", envNProc, nProcesses);
692  }
693  }
694 
695  // Keep explicit NDMSPC_EXECUTION_MODE settings authoritative.
696  // If mode is not explicitly set, default to local IPC for multi-process runs.
697  if (modeExplicit) {
698  if (normalizedMode == "thread") {
699  useProcessIpc = false;
700  useTcp = false;
701  }
702  else if (normalizedMode == "tcp") {
703  useProcessIpc = true;
704  useTcp = true;
705  }
706  else if (normalizedMode == "ipc") {
707  useProcessIpc = true;
708  useTcp = false;
709  }
710  else {
711  NLogWarning("NGnTree::Process: Unknown NDMSPC_EXECUTION_MODE='%s', falling back to auto mode selection.",
712  executionMode.c_str());
713  useProcessIpc = (nProcesses > 1);
714  useTcp = false;
715  }
716  }
717  else if (nProcesses > 1) {
718  useProcessIpc = true;
719  useTcp = false;
720  executionMode = "ipc";
721  normalizedMode = "ipc";
722  }
723 
724  if (ndmspcNProcExplicit && normalizedMode == "thread" && nProcesses > 1) {
725  NLogWarning("NGnTree::Process: NDMSPC_MAX_PROCESSES=%zu is set, but NDMSPC_EXECUTION_MODE=thread disables IPC.",
726  nProcesses);
727  }
728 
729  const size_t workerObjectCount = useProcessIpc ? std::max(static_cast<size_t>(nThreads), nProcesses)
730  : static_cast<size_t>(nThreads);
731  std::vector<Ndmspc::NGnThreadData> threadDataVector(workerObjectCount);
732 
733  NLogInfo("NGnTree::Process: executionMode='%s', useProcessIpc=%d, ROOT threads=%d, ipcProcesses=%zu, workerObjects=%zu",
734  executionMode.c_str(), useProcessIpc ? 1 : 0, nThreads, nProcesses, workerObjectCount);
735 
736  const char * tmpDirEnv = gSystem->Getenv("NDMSPC_TMP_DIR");
737  std::string tmpDir;
738  if (tmpDirEnv && tmpDirEnv[0] != '\0') {
739  tmpDir = tmpDirEnv;
740  } else {
741  TString tmpDirPrefix = fTreeStorage->GetPrefix();
742  // Use storage prefix only if it is a local path (not a remote URL)
743  if (!(tmpDirPrefix.BeginsWith("root://") || tmpDirPrefix.BeginsWith("http://") ||
744  tmpDirPrefix.BeginsWith("https://"))) {
745  tmpDir = tmpDirPrefix.Data();
746  }
747  if (tmpDir.empty()) tmpDir = "/tmp";
748  }
749 
750  std::string jobDir = tmpDir + "/.ndmspc/tmp/" + std::to_string(gSystem->GetPid());
751 
752  // Results dir: when NDMSPC_TMP_RESULTS_DIR equals NDMSPC_TMP_DIR (or is unset),
753  // reuse jobDir so that localTmpFile == resultsFilename — no copy or delete needed.
754  const char * resultsDirEnv = gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR");
755  const bool sameDir = !resultsDirEnv || std::string(resultsDirEnv) == tmpDir;
756  std::string resultsDir = sameDir ? jobDir
757  : (std::string(resultsDirEnv) + "/" +
758  std::to_string(gSystem->GetPid()));
759 
760  std::string filePrefix = jobDir;
761  for (size_t i = 0; i < threadDataVector.size(); ++i) {
762  std::string filename = filePrefix + "/" + std::to_string(i) + "/" + fTreeStorage->GetPostfix();
763  bool rc = threadDataVector[i].Init(i, func, beginFunc, endFunc, this, binningIn, fInput, filename,
764  fTreeStorage->GetTree()->GetName());
765  if (!rc) {
766  NLogError("Failed to initialize thread data %zu, exiting ...", i);
767  return false;
768  }
769  threadDataVector[i].SetCfg(cfg); // Set configuration to binning point
770  if (useTcp) {
771  // Tell the merge step where workers will deposit their finished files.
772  // When resultsDir == jobDir (NDMSPC_TMP_RESULTS_DIR unset) the paths are
773  // identical so no copy or delete is needed — handled in TaskLoop.
774  std::string resultsFile = resultsDir + "/" + std::to_string(i) + "/" + fTreeStorage->GetPostfix();
775  threadDataVector[i].SetResultsFilename(resultsFile);
776  }
777  }
778  size_t processedEntries = 0;
779  size_t totalEntries = 0;
780  auto start_par = std::chrono::high_resolution_clock::now();
781  auto start_par_job = std::chrono::high_resolution_clock::now();
782  auto task = [&](const std::vector<int> & coords, Ndmspc::NGnThreadData & thread_obj) {
783  // NLogWarning("Processing coordinates %s in thread %zu", NUtils::GetCoordsString(coords).c_str(),
784  // thread_obj.GetAssignedIndex());
785  // thread_obj.Print();
786  thread_obj.Process(coords);
787  processedEntries++;
788  if (!NLogger::GetConsoleOutput()) {
789  size_t nRunning = (totalEntries - processedEntries >= threadDataVector.size()) ? threadDataVector.size()
790  : totalEntries - processedEntries;
791  NUtils::ProgressBar(processedEntries, totalEntries, start_par, TString::Format("R%4zu", nRunning).Data());
792  }
793  };
794 
795  size_t iDef = 0;
796  int sumIds = 0;
797 
798  std::vector<Ndmspc::NThreadData *> processWorkers;
799  std::unique_ptr<Ndmspc::NDimensionalExecutor> ipcExecutor;
800  if (useProcessIpc) {
801  processWorkers.reserve(threadDataVector.size());
802  for (size_t i = 0; i < threadDataVector.size(); ++i) {
803  processWorkers.push_back(&threadDataVector[i]);
804  }
805  ipcExecutor = std::make_unique<Ndmspc::NDimensionalExecutor>(std::vector<int>{0}, std::vector<int>{0});
806  if (useTcp) {
807  const char * tcpPort = gSystem->Getenv("NDMSPC_TCP_PORT");
808  std::string tcpEndpoint = std::string("tcp://0.0.0.0:") + (tcpPort ? tcpPort : "5555");
809  const char * resultsDirBase = gSystem->Getenv("NDMSPC_TMP_RESULTS_DIR");
810  // Auto-detect the macro to send to workers: explicit SetWorkerMacro() takes
811  // priority; otherwise fall back to NDMSPC_MACRO set by ndmspc-run.
812  std::string workerMacro = fWorkerMacroList;
813  if (workerMacro.empty()) {
814  if (const char * envMacro = gSystem->Getenv("NDMSPC_MACRO")) workerMacro = envMacro;
815  }
816  ipcExecutor->StartProcessIpc(processWorkers, nProcesses, tcpEndpoint, resultsDir,
817  fTreeStorage->GetTree()->GetName(), workerMacro, tmpDir,
818  resultsDirBase ? resultsDirBase : "");
819  } else {
820  ipcExecutor->StartProcessIpc(processWorkers, nProcesses);
821  }
822  }
823 
824  std::map<std::string, std::vector<Long64_t>>
825  defIdMapProcessedRemoved; // Map to track which ids belong to which definition
826  // for (auto & name : defNames) {
827  // auto binningDef = binningIn->GetDefinition(name);
828  // if (!binningDef) {
829  // NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
830  // return false;
831  // }
832  // for (auto & id : binningDef->GetIds()) {
833  // defIdMap[name].push_back(id);
834  // }
835  // }
836 
837  try {
838  for (auto & name : defNames) {
839  auto binningDef = binningIn->GetDefinition(name);
840  if (!binningDef) {
841  NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
842  return false;
843  }
844 
845  if (binningDef->GetIds().size() == 0) {
846  NLogWarning("NGnTree::Process: Binning definition '%s' has no entries, skipping ...", name.c_str());
847  continue;
848  }
849 
850  const std::vector<Long64_t> originalDefinitionIds = binningDef->GetIds();
851 
852  std::vector<int> mins, maxs;
853  mins.push_back(0);
854  maxs.push_back(binningDef->GetIds().size() - 1);
855  NLogDebug("NGnTree::Process: Processing with binning definition '%s' with %zu entries", name.c_str(),
856  binningDef->GetIds().size());
857 
859  NLogInfo("NGnTree::Process: Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
860  }
861  else {
862  Printf("Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
863  }
864  start_par = std::chrono::high_resolution_clock::now();
865  processedEntries = 0;
866  totalEntries = maxs[0] + 1;
867  const size_t activeWorkers = useProcessIpc ? std::max<size_t>(1, std::min(nProcesses, processWorkers.size()))
868  : threadDataVector.size();
870  NUtils::ProgressBar(processedEntries, totalEntries, start_par,
871  TString::Format("R%4zu", activeWorkers).Data());
872 
873  // binningIn->SetCurrentDefinitionName(name);
874  for (size_t i = 0; i < threadDataVector.size(); ++i) {
875  // threadDataVector[i].SetCurrentDefinitionName(name);
876  threadDataVector[i].GetHnSparseBase()->GetBinning()->SetCurrentDefinitionName(name);
877  }
878 
879  Ndmspc::NDimensionalExecutor executorMT(mins, maxs);
880 
881  if (!useProcessIpc) {
882  // Disable ROOT's RecursiveRemove during the parallel phase.
883  // Without this, concurrent threads' object deletions trigger RecursiveRemove
884  // which iterates pad->fPrimitives without per-object locks → TObjLink corruption.
885  Bool_t prevMustClean = gROOT->MustClean();
886  gROOT->SetMustClean(kFALSE);
887 
888  // Enable batch mode during the parallel phase so that ROOT drawing calls
889  // (e.g. TH1::Fit drawing the fit function via f1->Draw) are no-ops.
890  // Without this, concurrent threads each trigger ROOT canvas creation ("c1"),
891  // the second canvas creation deletes the first → double-free / heap corruption,
892  // which then causes TPad::RecursiveRemove to crash when closing the per-thread TTree.
893  Bool_t prevBatch = gROOT->IsBatch();
894  gROOT->SetBatch(kTRUE);
895 
896  executorMT.ExecuteParallel<Ndmspc::NGnThreadData>(task, threadDataVector);
897 
898  // Restore both flags before flushing deferred deletes, so each object's destructor
899  // properly calls gROOT->RecursiveRemove and removes itself from ROOT's global lists.
900  // This prevents dangling pointers that would crash the RecursiveRemove cascade
901  // triggered by TTree::~TTree when the per-thread storage files are closed below.
902  // It is safe here because all worker threads have already finished.
903  gROOT->SetMustClean(prevMustClean);
904  gROOT->SetBatch(prevBatch);
905 
906  // Flush deferred deletes single-threaded with MustClean and batch mode restored.
907  for (size_t i = 0; i < threadDataVector.size(); ++i) {
908  threadDataVector[i].FlushDeferredDeletes();
909  }
910 
911  for (size_t i = 0; i < threadDataVector.size(); ++i) {
912  threadDataVector[i].ExecuteEndFunction();
913  }
914  }
915  else {
916  ipcExecutor->SetBounds(mins, maxs);
917  size_t acked = ipcExecutor->ExecuteCurrentBoundsProcessIpc(
918  name, &originalDefinitionIds, [&, activeWorkers](size_t ackCount, size_t activeWorkersNow) {
919  processedEntries = ackCount;
920  if (!NLogger::GetConsoleOutput()) {
921  size_t nRunning = std::min(activeWorkersNow, activeWorkers);
922  NUtils::ProgressBar(processedEntries, totalEntries, start_par,
923  TString::Format("R%4zu", nRunning).Data());
924  }
925  });
926  processedEntries = acked;
927 
928  // Child processes update their own worker-object copies. Rebuild parent-side
929  // per-worker counters and processed-id vectors deterministically from task assignment.
930  const size_t processesToUse = std::max<size_t>(1, std::min(nProcesses, processWorkers.size()));
931  for (size_t i = 0; i < threadDataVector.size(); ++i) {
932  auto * workerDef = threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name);
933  if (workerDef) {
934  workerDef->GetIds().clear();
935  }
936  }
937 
938  for (size_t taskIndex = 0; taskIndex < originalDefinitionIds.size(); ++taskIndex) {
939  const size_t workerIndex = taskIndex % processesToUse;
940  threadDataVector[workerIndex].SetNProcessed(threadDataVector[workerIndex].GetNProcessed() + 1);
941 
942  auto * workerDef = threadDataVector[workerIndex].GetHnSparseBase()->GetBinning()->GetDefinition(name);
943  if (workerDef) {
944  workerDef->GetIds().push_back(originalDefinitionIds[taskIndex]);
945  }
946  }
947 
948  if (!NLogger::GetConsoleOutput() && processedEntries < totalEntries) {
949  NUtils::ProgressBar(processedEntries, totalEntries, start_par, "R 0");
950  }
951  }
952 
954  Printf("Finished processing binning definition '%s'. Post-processing results ...", name.c_str());
955  // Update hnsbBinningIn with the processed ids
956  NLogDebug("NGnTree::Process: [BEGIN] ------------------------------------------------");
957  sumIds += binningIn->GetDefinition(name)->GetIds().size();
958  binningIn->GetDefinition(name)->GetIds().clear();
959  for (size_t i = 0; i < threadDataVector.size(); ++i) {
960  NLogDebug("NGnTree::Process: -> Thread %zu processed %lld entries", i, threadDataVector[i].GetNProcessed());
961  // threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->Print();
962  binningIn->GetDefinition(name)->GetIds().insert(
963  binningIn->GetDefinition(name)->GetIds().end(),
964  threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().begin(),
965  threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().end());
966  sort(binningIn->GetDefinition(name)->GetIds().begin(), binningIn->GetDefinition(name)->GetIds().end());
967  }
968  // hnsbBinningIn->GetDefinition(name)->Print();
969  // remove entries present in hnsbBinningIn from other definitions
970  for (size_t i = 0; i < defNames.size(); i++) {
971 
972  std::string other_name = defNames[i];
973  auto otherDef = binningIn->GetDefinition(other_name);
974  if (i <= iDef) {
975  continue;
976  }
977  if (!otherDef) {
978  NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", other_name.c_str());
979  return false;
980  }
981  // remove entries that has value less then sumIds
982  for (auto it = otherDef->GetIds().begin(); it != otherDef->GetIds().end();) {
983  NLogTrace("NGnTree::Process: Checking entry %lld from definition '%s' against sumIds=%d", *it,
984  other_name.c_str(), sumIds);
985  if (*it < sumIds) {
986  NLogTrace("NGnTree::Process: Removing entry %lld from definition '%s'", *it, other_name.c_str());
987  defIdMapProcessedRemoved[other_name].push_back(*it);
988  it = otherDef->GetIds().erase(it);
989  }
990  else {
991  ++it;
992  }
993  }
994 
995  binningIn->GetDefinition(other_name)->Print();
996  }
997  // hnsbBinningIn->GetDefinition(name)->Print();
998  iDef++;
999 
1000  NLogDebug("NGnTree::Process: [END] ------------------------------------------------");
1001  }
1002  }
1003  catch (const std::exception & ex) {
1004  if (ipcExecutor) {
1005  ipcExecutor->FinishProcessIpc(/*abort=*/true);
1006  ipcExecutor.reset();
1007  }
1008 
1009  TString what(ex.what());
1010  if (what.Contains("Interrupted by user")) {
1011  if (gROOT) {
1012  gROOT->SetInterrupt(kFALSE);
1013  }
1014  NLogWarning("NGnTree::Process: Interrupted by user, stopping processing.");
1015  }
1016  else {
1017  NLogError("NGnTree::Process: Processing failed: %s", ex.what());
1018  }
1019  return false;
1020  }
1021 
1022  // return true; // For testing, skip merging and post-processing
1023 
1024  auto end_par = std::chrono::high_resolution_clock::now();
1025  std::chrono::duration<double, std::milli> par_duration = end_par - start_par_job;
1026 
1027  if (ipcExecutor) {
1028  ipcExecutor->FinishProcessIpc();
1029  }
1030 
1031  // For TCP mode, only merge results from workers that actually connected.
1032  // For IPC/fork and thread modes, all indices are valid.
1033  const std::set<size_t> registeredWorkers =
1034  (ipcExecutor && useTcp) ? ipcExecutor->GetRegisteredWorkerIndices() : std::set<size_t>{};
1035  const bool filterByRegistered = !registeredWorkers.empty();
1036 
1037  if (!NLogger::GetConsoleOutput()) {
1038  Printf("NGnTree::Process: Execution completed and it took %s .",
1039  NUtils::FormatTime(par_duration.count() / 1000).c_str());
1040  }
1041  else {
1042  NLogInfo("NGnTree::Process: Execution completed and it took %s .",
1043  NUtils::FormatTime(par_duration.count() / 1000).c_str());
1044  }
1045 
1046  NLogInfo("NGnTree::Process: Post processing %zu results ...", threadDataVector.size());
1047  for (auto & data : threadDataVector) {
1048  if (useProcessIpc) {
1049  NLogTrace("NGnTree::Process: Releasing parent handle for worker %zu file without writing",
1050  data.GetAssignedIndex());
1051  // data.GetHnSparseBase()->GetStorageTree()->Close(false);
1052  }
1053  else {
1054  NLogTrace("NGnTree::Process: Closing file from thread %zu with write", data.GetAssignedIndex());
1055  data.GetHnSparseBase()->GetStorageTree()->Close(true);
1056  }
1057  }
1058 
1059  NLogDebug("NGnTree::Process: Merging %zu results ...", threadDataVector.size());
1060  if (!NLogger::GetConsoleOutput()) {
1061  Printf("NGnTree::Process: [phase] merge start (%zu workers)", threadDataVector.size());
1062  }
1063  const auto mergeStart = std::chrono::high_resolution_clock::now();
1064  TList * mergeList = new TList();
1065  Ndmspc::NGnThreadData * outputData = new Ndmspc::NGnThreadData();
1066  outputData->Init(0, func, nullptr, nullptr, this, binningIn);
1067  outputData->SetCfg(cfg);
1068  // outputData->Init(0, func, this);
1069 
1070  for (auto & data : threadDataVector) {
1071  if (filterByRegistered && registeredWorkers.find(data.GetAssignedIndex()) == registeredWorkers.end()) {
1072  NLogInfo("NGnTree::Process: Skipping worker %zu — never connected", data.GetAssignedIndex());
1073  continue;
1074  }
1075  NLogTrace("NGnTree::Process: Adding thread data %zu to merge list ...", data.GetAssignedIndex());
1076  mergeList->Add(&data);
1077  }
1078 
1079  Long64_t nmerged = outputData->Merge(mergeList);
1080  const auto mergeEnd = std::chrono::high_resolution_clock::now();
1081  if (!NLogger::GetConsoleOutput()) {
1082  const auto mergeSec = std::chrono::duration_cast<std::chrono::duration<double>>(mergeEnd - mergeStart).count();
1083  Printf("NGnTree::Process: [phase] merge done (%lld outputs, %.2f s)", nmerged, mergeSec);
1084  }
1085  if (nmerged <= 0) {
1086  NLogError("NGnTree::Process: Failed to merge thread data, exiting ...");
1087  delete mergeList;
1088  return false;
1089  }
1090  NLogInfo("NGnTree::Process: Merged %lld outputs successfully", nmerged);
1091  // delete all temporary files
1092  // for (auto & data : threadDataVector) {
1093  // std::string filename = data.GetHnSparseBase()->GetStorageTree()->GetFileName();
1094  // NLogTrace("NGnTree::Process: Deleting temporary file '%s' ...", filename.c_str());
1095  // gSystem->Exec(TString::Format("rm -f %s", filename.c_str()));
1096  // }
1097  //
1098 
1099  // binningIn= outputData->GetHnSparseBase()->GetBinning();
1100 
1101  auto * mergedBinning = outputData->GetHnSparseBase()->GetBinning();
1102  std::set<Long64_t> mergedContentIds;
1103  std::vector<std::pair<Long64_t, std::vector<int>>> mergedContentCoords;
1104 
1105  // add missing entries to definitions based on defIdMapProcessedRemoved
1106  for (size_t i = 0; i < defNames.size(); i++) {
1107  std::string name = defNames[i];
1108  // auto def = binningIn->GetDefinition(name);
1109  auto def = mergedBinning->GetDefinition(name);
1110  if (!def) {
1111  NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1112  return false;
1113  }
1114  for (auto & [other_name, removedIds] : defIdMapProcessedRemoved) {
1115  if (other_name.compare(name) != 0) {
1116  continue;
1117  }
1118  for (auto & id : removedIds) {
1119  if (std::find(def->GetIds().begin(), def->GetIds().end(), id) == def->GetIds().end()) {
1120  NLogTrace("NGnTree::Process: Adding missing entry %lld to definition '%s'", id, name.c_str());
1121  def->GetIds().push_back(id);
1122  }
1123  }
1124  }
1125  sort(def->GetIds().begin(), def->GetIds().end());
1126  // outputData->GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds() = def->GetIds();
1127 
1128  // Modify content in binning definitions based on def->GetIds()
1129  def->GetContent()->Reset();
1130  for (auto id : def->GetIds()) {
1131  NBinningPoint point(def->GetBinning());
1132  def->GetBinning()->GetContent()->GetBinContent(id, point.GetCoords());
1133  point.RecalculateStorageCoords();
1134  Long64_t bin = def->GetContent()->GetBin(point.GetStorageCoords());
1135  NLogTrace("NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld", name.c_str(), id, bin);
1136  def->GetContent()->SetBinContent(bin, id);
1137 
1138  if (mergedContentIds.insert(id).second) {
1139  mergedContentCoords.emplace_back(
1140  id, NUtils::ArrayToVector(point.GetCoords(), point.GetNDimensionsContent()));
1141  }
1142  }
1143  }
1144 
1145  // Rebuild the merged top-level content from final definition ids. In IPC/TCP mode
1146  // the merge setup may still carry sparse source-bin content; resetting here keeps
1147  // only the bins that correspond to actual merged tree entries.
1148  mergedBinning->GetContent()->Reset();
1149  for (const auto & entry : mergedContentCoords) {
1150  Long64_t bin = mergedBinning->GetContent()->GetBin(entry.second.data());
1151  mergedBinning->GetContent()->SetBinContent(bin, entry.first);
1152  }
1153 
1154  // print final binning definitions
1155  NLogDebug("NGnTree::Process: Final binning definitions after processing:");
1156  for (auto & name : defNames) {
1157  // auto binningDef = binningIn->GetDefinition(name);
1158  auto binningDef = outputData->GetHnSparseBase()->GetBinning()->GetDefinition(name);
1159  if (!binningDef) {
1160  NLogError("NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1161  return false;
1162  }
1163  binningDef->Print();
1164  }
1165 
1166  fTreeStorage = outputData->GetHnSparseBase()->GetStorageTree();
1167  fOutputs = outputData->GetHnSparseBase()->GetOutputs();
1168  fBinning = outputData->GetHnSparseBase()->GetBinning(); // Update binning to the merged one
1169  fParameters = outputData->GetHnSparseBase()->GetParameters();
1170 
1171  if (NLogger::GetConsoleOutput()) {
1172  NLogInfo("NGnTree::Process: Processing completed successfully. Output was stored in '%s'.",
1173  fTreeStorage->GetFileName().c_str());
1174  }
1175  else {
1176  Printf("Processing completed successfully. Output was stored in '%s'.", fTreeStorage->GetFileName().c_str());
1177  }
1178 
1179  // Close the final output file
1180  if (!NLogger::GetConsoleOutput()) {
1181  Printf("NGnTree::Process: [phase] final close start (%s)",
1182  outputData->GetHnSparseBase()->GetStorageTree()->GetFileName().c_str());
1183  }
1184  const auto closeStart = std::chrono::high_resolution_clock::now();
1185  outputData->GetHnSparseBase()->Close(true);
1186  const auto closeEnd = std::chrono::high_resolution_clock::now();
1187  if (!NLogger::GetConsoleOutput()) {
1188  const auto closeSec = std::chrono::duration_cast<std::chrono::duration<double>>(closeEnd - closeStart).count();
1189  Printf("NGnTree::Process: [phase] final close done (%.2f s)", closeSec);
1190  }
1191 
1192  if (!NLogger::GetConsoleOutput()) {
1193  Printf("NGnTree::Process: [phase] cleanup start (%s)", jobDir.c_str());
1194  }
1195  const auto cleanupStart = std::chrono::high_resolution_clock::now();
1196  gSystem->Exec(TString::Format("rm -fr %s", jobDir.c_str()));
1197  const auto cleanupEnd = std::chrono::high_resolution_clock::now();
1198  if (!NLogger::GetConsoleOutput()) {
1199  const auto cleanupSec = std::chrono::duration_cast<std::chrono::duration<double>>(cleanupEnd - cleanupStart).count();
1200  Printf("NGnTree::Process: [phase] cleanup done (%.2f s)", cleanupSec);
1201  }
1202  gROOT->SetBatch(batch); // Restore ROOT batch mode
1203  return true;
1204 }
1205 
1206 TList * NGnTree::GetOutput(std::string name)
1207 {
 // Return the output list stored in fOutputs under `name`.
 // When fOutputs has no entry for `name`, a new TList is allocated, named
 // after `name`, stored in fOutputs and then returned.
1211 
1212  if (name.empty()) {
 // NOTE(review): this branch contains no statement in this listing (numbered
 // line 1213 is absent), so as shown an empty name is simply used as the map
 // key below — confirm against the original source.
1214  }
1215  if (fOutputs.find(name) == fOutputs.end()) {
 // First request for this name: create the list and register it.
1216  fOutputs[name] = new TList();
1217  fOutputs[name]->SetName(name.c_str());
1218  }
1219  return fOutputs[name];
1220 }
1221 
1222 NGnTree * NGnTree::Open(const std::string & filename, const std::string & branches, const std::string & treename)
1223 {
1227 
1228  NLogDebug("Opening '%s' with branches='%s' and treename='%s' ...", filename.c_str(), branches.c_str(),
1229  treename.c_str());
1230 
1231  TFile * file = TFile::Open(filename.c_str());
1232  if (!file) {
1233  NLogError("NGnTree::Open: Cannot open file '%s'", filename.c_str());
1234  return nullptr;
1235  }
1236 
1237  TTree * tree = (TTree *)file->Get(treename.c_str());
1238  if (!tree) {
1239  NLogError("NGnTree::Open: Cannot get tree '%s' from file '%s'", treename.c_str(), filename.c_str());
1240  return nullptr;
1241  }
1242 
1243  return Open(tree, branches, file);
1244 }
1245 
1246 NGnTree * NGnTree::Open(TTree * tree, const std::string & branches, TFile * file)
1247 {
1251 
1252  NBinning * hnstBinning = (NBinning *)tree->GetUserInfo()->At(0);
1253  if (!hnstBinning) {
1254  NLogError("NGnTree::Open: Cannot get binning from tree '%s'", tree->GetName());
1255  return nullptr;
1256  }
1257  NStorageTree * hnstStorageTree = (NStorageTree *)tree->GetUserInfo()->At(1);
1258  if (!hnstStorageTree) {
1259  NLogError("NGnTree::Open: Cannot get tree storage info from tree '%s'", tree->GetName());
1260  return nullptr;
1261  }
1262 
1263  std::map<std::string, TList *> outputs;
1264  TDirectory * dir = nullptr;
1265  if (file) {
1266  dir = (TDirectory *)file->Get("outputs");
1267  auto l = dir->GetListOfKeys();
1268  for (auto kv : *l) {
1269  TObject * obj = dir->Get(kv->GetName());
1270  if (!obj) continue;
1271  TList * l = dynamic_cast<TList *>(obj);
1272  if (!l) continue;
1273  outputs[l->GetName()] = l;
1274  NLogDebug("Imported output list for binning '%s' with %d object(s) from file '%s'", l->GetName(), l->GetEntries(),
1275  file->GetName());
1276  }
1277  }
1278  // TDirectory * dir = (TDirectory *)tree->GetUserInfo()->FindObject("outputs");
1279  // if (dir) {
1280  // dir->Print();
1281  // }
1282 
1283  NGnTree * ngnt = new NGnTree(hnstBinning, hnstStorageTree);
1284 
1285  if (!hnstStorageTree->SetFileTree(file, tree)) return nullptr;
1286  // if (!ngnt->InitBinnings({})) return nullptr;
1287  // ngnt->Print();
1288  // Get list of branches
1289  std::vector<std::string> enabledBranches;
1290  if (!branches.empty()) {
1291  enabledBranches = Ndmspc::NUtils::Tokenize(branches, ',');
1292  NLogTrace("NGnTree::Open: Enabled branches: %s", NUtils::GetCoordsString(enabledBranches, -1).c_str());
1293  hnstStorageTree->SetEnabledBranches(enabledBranches);
1294  }
1295  else {
1296  // loop over all branches and set address
1297  for (auto & kv : hnstStorageTree->GetBranchesMap()) {
1298  NLogTrace("NGnTree::Open: Enabled branches: %s", kv.first.c_str());
1299  }
1300  }
1301  // Set all branches to be read
1302  hnstStorageTree->SetBranchAddresses();
1303  ngnt->SetOutputs(outputs);
1304 
1305  NGnNavigator * nav = new NGnNavigator();
1306  nav->SetGnTree(ngnt);
1307  ngnt->SetNavigator(nav);
1308 
1309  return ngnt;
1310 }
1311 
1313 {
 // NOTE(review): the signature line of this definition (numbered line 1312) is
 // absent from this listing; judging by the log message below this is the body
 // of NGnTree::SetNavigator receiving a `navigator` pointer — confirm.
 //
 // Stores `navigator` in fNavigator, deleting a previously stored navigator first.
1317 
1318  if (fNavigator) {
1319  NLogTrace("NGnTree::SetNavigator: Replacing existing navigator ...");
 // NOTE(review): if `navigator` is the very object already held in fNavigator,
 // it is deleted here and then stored again below — confirm callers never do that.
1320  SafeDelete(fNavigator);
1321  }
1322 
1323  fNavigator = navigator;
1324 }
1325 
1326 bool NGnTree::Close(bool write)
1327 {
1331 
1332  if (!fTreeStorage) {
1333  NLogError("NGnTree::Close: Storage tree is not initialized in NGnTree !!!");
1334  return false;
1335  }
1336 
1337  return fTreeStorage->Close(write, fOutputs);
1338 }
1339 
1340 Int_t NGnTree::GetEntry(Long64_t entry, bool checkBinningDef)
1341 {
1345  if (!fTreeStorage) {
1346  NLogError("NGnTree::GetEntry: Storage tree is not initialized in NGnTree !!!");
1347  return -1;
1348  }
1349 
1350  int bytes =
1351  fTreeStorage->GetEntry(entry, fBinning->GetPoint(0, fBinning->GetCurrentDefinitionName()), checkBinningDef);
1352  if (fTreeStorage->GetBranch("_params")) fParameters = (NParameters *)fTreeStorage->GetBranch("_params")->GetObject();
1353  return bytes;
1354 }
1355 Int_t NGnTree::GetEntry(std::vector<std::vector<int>> /*range*/, bool checkBinningDef)
1356 {
1360 
1361  return GetEntry(0, checkBinningDef);
1362 }
1363 
1364 void NGnTree::Play(int timeout, std::string binning, std::vector<int> outputPointIds,
1365  std::vector<std::vector<int>> ranges, Option_t * option)
1366 {
1370  TString opt = option;
1371  opt.ToUpper();
1372 
1373  std::string annimationTempDir =
1374  TString::Format("%s/.ndmspc/animation/%d", gSystem->Getenv("HOME"), gSystem->GetPid()).Data();
1375  gSystem->Exec(TString::Format("mkdir -p %s", annimationTempDir.c_str()));
1376 
1377  if (binning.empty()) {
1378  binning = fBinning->GetCurrentDefinitionName();
1379  }
1380 
1381  NBinningDef * binningDef = fBinning->GetDefinition(binning);
1382  if (!binningDef) {
1383  NLogError("NGnTree::Play: Binning definition '%s' not found in NGnTree !!!", binning.c_str());
1384  NLogError("Available binning definitions:");
1385  for (auto & name : fBinning->GetDefinitionNames()) {
1386  if (name == fBinning->GetCurrentDefinitionName())
1387  NLogError(" [*] %s", name.c_str());
1388  else
1389  NLogError(" [ ] %s", name.c_str());
1390  }
1391  return;
1392  }
1393 
1394  THnSparse * bdContent = (THnSparse *)binningDef->GetContent()->Clone();
1395 
1396  std::string bdContentName = TString::Format("bdContent_%s", binning.c_str()).Data();
1397  // Set axis ranges if provided
1398  if (!ranges.empty()) NUtils::SetAxisRanges(bdContent, ranges, false, true);
1399 
1400  Long64_t linBin = 0;
1401  std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{bdContent->CreateIter(true /*use axis range*/)};
1402  std::vector<Long64_t> ids;
1403  // std::vector<Long64_t> ids = binningDef->GetIds();
1404  while ((linBin = iter->Next()) >= 0) {
1405  // NLogDebug("Original content bin %lld: %f", linBin, bdContentOrig->GetBinContent(linBin));
1406  ids.push_back(linBin);
1407  }
1408  if (ids.empty()) {
1409  NLogWarning("NGnTree::Play: No entries found in binning definition '%s' !!!", binning.c_str());
1410  return;
1411  }
1412  // return;
1413 
1414  TCanvas * c1 = nullptr;
1415 
1416  c1 = (TCanvas *)gROOT->GetListOfCanvases()->FindObject("c1");
1417  if (c1 == nullptr) c1 = new TCanvas("c1", "NGnTree::Play", 800, 600);
1418  c1->Clear();
1419  c1->cd();
1420  c1->DivideSquare(outputPointIds.size() > 0 ? outputPointIds.size() + 1 : 1);
1421  gSystem->ProcessEvents();
1422 
1423  binningDef->Print();
1424  bdContent->Reset();
1425 
1426  // loop over all ids and print them
1427  for (auto id : ids) {
1428  // for (int id = 0; id < GetEntries(); id++) {
1429  GetEntry(id);
1430  fBinning->GetPoint()->Print();
1431  TList * l = (TList *)fTreeStorage->GetBranch("_outputPoint")->GetObject();
1432  if (!l || l->IsEmpty()) {
1433  NLogWarning("NGnTree::Play: No 'outputPoint' for entry %lld !!!", id);
1434  continue;
1435  }
1436  else {
1437  // NLogInfo("Output for entry %lld:", id);
1438  // l->Print(opt.Data());
1439 
1440  if (outputPointIds.empty()) {
1441  outputPointIds.resize(l->GetEntries());
1442  for (int i = 0; i < l->GetEntries(); i++) {
1443  outputPointIds[i] = i;
1444  }
1445  }
1446  int n = outputPointIds.size();
1447 
1448  Double_t v = 1.0;
1449  for (int i = 0; i < n; i++) {
1450  // NLogDebug("Drawing output object id %d (list index %d) on pad %d", outputPointIds[i], i, i + 1);
1451 
1452  c1->cd(i + 2);
1453  TObject * obj = l->At(outputPointIds[i]);
1454  if (obj) {
1455  if (obj->InheritsFrom(TH1::Class())) {
1456  TH1 * h = (TH1 *)obj;
1457  h->SetDirectory(nullptr);
1458  // Draw a clone to avoid transferring ownership or modifying
1459  // the original object stored in the TList (can cause
1460  // TPad/TList removal during drawing and lead to crashes).
1461  TH1 * hclone = (TH1 *)h->Clone();
1462  if (hclone) {
1463  hclone->SetDirectory(nullptr);
1464  hclone->Draw();
1465  }
1466  }
1467  // obj->Print();
1468  }
1469  if (obj->InheritsFrom(TH1::Class()) && i == 0) {
1470  TH1 * h = (TH1 *)obj;
1471  v = h->GetMean();
1472  NLogDebug("Mean value from histogram [%s]: %f", h->GetName(), v);
1473  }
1474  }
1475  bdContent->SetBinContent(fBinning->GetPoint()->GetStorageCoords(), 1);
1476  c1->cd(1);
1477  TH1 * bdProj = (TH1 *)gROOT->FindObjectAny("bdProj");
1478  if (bdProj) {
1479  delete bdProj;
1480  bdProj = nullptr;
1481  }
1482  if (bdContent->GetNdimensions() == 1) {
1483  bdProj = bdContent->Projection(0, "O");
1484  }
1485  else if (bdContent->GetNdimensions() == 2) {
1486  bdProj = bdContent->Projection(0, 1, "O");
1487  }
1488  else if (bdContent->GetNdimensions() == 3) {
1489  bdProj = bdContent->Projection(0, 1, 2, "O");
1490  }
1491  else {
1492  NLogError("NGnTree::Play: Cannot project THnSparse with %d dimensions", bdContent->GetNdimensions());
1493  }
1494  if (bdProj) {
1495  bdProj->SetName("bdProj");
1496  bdProj->SetTitle(TString::Format("Binning '%s' content projection", binning.c_str()).Data());
1497  bdProj->SetMinimum(0);
1498  // bdProj->SetDirectory(nullptr);
1499  bdProj->Draw("colz");
1500  // c1->ModifiedUpdate();
1501  }
1502  }
1503  if (c1) {
1504  c1->ModifiedUpdate();
1505  c1->SaveAs(TString::Format("%s/ndmspc_play_%06lld.png", annimationTempDir.c_str(), bdContent->GetNbins()).Data());
1506  }
1507  gSystem->ProcessEvents();
1508  if (timeout > 0) gSystem->Sleep(timeout);
1509  NLogInfo("%d", id);
1510  }
1511 
1512  NLogInfo("Creating animation gif from %s/ndmspc_play_*.png ...", annimationTempDir.c_str());
1513  gSystem->Exec(
1514  TString::Format("magick -delay 20 -loop 0 %s/ndmspc_play_*.png ndmspc_play.gif", annimationTempDir.c_str()));
1515  gSystem->Exec(TString::Format("rm -fr %s", annimationTempDir.c_str()));
1516  NLogInfo("Animation saved to ndmspc_play.gif");
1517 
1518  delete bdContent;
1519 }
1520 
1521 TList * NGnTree::Projection(const json & cfg, std::string binningName)
1522 {
 // Project THnSparse objects stored in this tree onto the axes named in
 // cfg["objects"], for every bin of binning definition `binningName` that lies
 // inside cfg["ranges"]. The work is done by a per-point lambda handed to Process().
 //
 // NOTE(review): numbered lines 1528 and 1618 are absent from this listing and no
 // return statement is visible although the function returns TList * — the
 // missing lines presumably contain it; confirm against the original source.
1526 
1527  // SetInput(); // Set input to selfp
 // Per-point worker: reads the input entry for `point` and adds one projection
 // per configured dimension list to `output`.
1529  Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * output, TList * /*outputPoint*/,
1530  int /*threadId*/) {
1531  // NLogInfo("Thread ID: %d", threadId);
1532  TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
1533  point->Print();
1534  json cfg = point->GetCfg();
1535 
1536  Printf("Processing THnSparse projection with configuration: %s", cfg.dump().c_str());
1537 
1538  Ndmspc::NGnTree * ngntIn = point->GetInput();
1539  // ngntIn->Print();
1540  // ngntIn->GetEntry(0);
1541  ngntIn->GetEntry(point->GetEntryNumber());
1542 
1543  // loop over all cfg["objects"]
1544  for (auto & [objName, objCfg] : cfg["objects"].items()) {
1545  NLogInfo("Processing object '%s' ...", objName.c_str());
1546 
 // The branch object named after the configuration key is used as the THnSparse to project.
1547  THnSparse * hns = (THnSparse *)(ngntIn->GetStorageTree()->GetBranchObject(objName));
1548  if (hns == nullptr) {
1549  NLogError("NGnTree::Projection: THnSparse 'hns' not found in storage tree !!!");
1550  return;
1551  }
1552  // hns->Print("all");
1553  // loop over cfg["objects"][objName] array of projection dimension names
1554  for (size_t i = 0; i < objCfg.size(); i++) {
1555 
1556  NLogInfo("Processing projection %zu for object '%s' ...", i, objName.c_str());
1557  std::vector<int> dims;
1558  std::vector<std::string> dimNames = cfg["objects"][objName][i].get<std::vector<std::string>>();
 // Translate axis names into axis indices of `hns`; unknown names are only
 // logged and the projection continues with the dimensions that were found.
1559  for (const auto & dimName : dimNames) {
1560  NLogDebug("Looking for dimension name '%s' in THnSparse ...", dimName.c_str());
1561  int dim = -1;
 // NOTE(review): this `int i` shadows the outer `size_t i` projection index.
1562  for (int i = 0; i < hns->GetNdimensions(); i++) {
1563  if (dimName == hns->GetAxis(i)->GetName()) {
1564  dim = i;
1565  break;
1566  }
1567  }
1568  if (dim >= 0)
1569  dims.push_back(dim);
1570  else {
1571  NLogError("NGnTree::Projection: Dimension name '%s' not found in THnSparse !!!", dimName.c_str());
1572  }
1573  }
1574  // Print dims
1575  NLogInfo("Projecting THnSparse on dimensions: %s", NUtils::GetCoordsString(dims, -1).c_str());
 // Slot `i` of `output` accumulates the i-th projection across points.
 // NOTE(review): the slot index restarts for every object in cfg["objects"], so
 // several objects would share slots — verify this is intended.
1576  TH1 * hPrev = (TH1 *)output->At(i);
1577  TH1 * hProj = NUtils::ProjectTHnSparse(hns, dims, "O");
1578  hProj->SetName(TString::Format("%s_proj_%s", objName.c_str(), NUtils::Join(dims, '_').c_str()).Data());
1579  if (hPrev) {
 // NOTE(review): after being added into hPrev, hProj is neither stored nor
 // deleted in this code — looks like a leak; confirm.
1580  hPrev->Add(hProj);
1581  }
1582  else {
1583  output->Add(hProj);
1584  }
1585  }
1586  }
1587  output->Print();
1588  };
1589 
1590  // NBinningDef * binningDef = fInput->GetBinning()->GetDefinition(binningName);
 // NOTE(review): binningDef is used below without a null check.
1591  NBinningDef * binningDef = GetBinning()->GetDefinition(binningName);
1592  THnSparse * hnsIn = binningDef->GetContent();
1593  // std::vector<std::vector<int>> ranges{{0, 2, 2}, {2, 1, 1}};
1594  std::vector<std::vector<int>> ranges = cfg["ranges"].get<std::vector<std::vector<int>>>();
1595  NUtils::SetAxisRanges(hnsIn, ranges); // Set the ranges for the axes
 // Collect the linear bins of the definition content that fall inside the axis ranges.
1596  Long64_t linBin = 0;
1597  std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{hnsIn->CreateIter(true /*use axis range*/)};
1598  std::vector<Long64_t> ids;
1599  // std::vector<Long64_t> ids = binningDef->GetIds();
1600  while ((linBin = iter->Next()) >= 0) {
1601  ids.push_back(linBin);
1602  }
1603  if (ids.empty()) {
1604  NLogWarning("NGnTree::Projection: No entries found in binning definition '%s' !!!", binningDef->GetName());
1605  binningDef->RefreshIdsFromContent();
1606  return nullptr;
1607  }
1608 
 // Restrict the definition to the selected bins for the duration of Process().
1609  binningDef->GetIds() = ids;
1610 
1611  // NUtils::SetAxisRanges(, std::vector<std::vector<int>> ranges)
 // NOTE(review): the result of Process() is not checked here.
1612  Process(processFunc, cfg);
1613 
1614  // Refresh binning definition ids from content after processing
1615  binningDef->RefreshIdsFromContent();
1616 
1617  // GetInput()->Close(false);
1619 
1620  // Close(false);
1621 }
1622 
1623 NGnNavigator * NGnTree::Reshape(std::string binningName, std::vector<std::vector<int>> levels, int level,
1624  std::map<int, std::vector<int>> ranges, std::map<int, std::vector<int>> rangesBase)
1625 {
1629 
1630  NGnNavigator navigator;
1631  navigator.SetGnTree(this);
1632 
1633  return navigator.Reshape(binningName, levels, level, ranges, rangesBase);
1634 }
1635 
1636 NGnNavigator * NGnTree::GetResourceStatisticsNavigator(std::string binningName, std::vector<std::vector<int>> levels,
1637  int level, std::map<int, std::vector<int>> ranges,
1638  std::map<int, std::vector<int>> rangesBase)
1639 {
1643 
1644  if (binningName.empty()) {
1645  binningName = fBinning->GetCurrentDefinitionName();
1646  }
1647 
1648  THnSparse * hns = (THnSparse *)fOutputs[binningName]->FindObject("resource_monitor");
1649  if (!hns) {
1650  NLogError("NGnTree::Draw: Resource monitor THnSparse not found in outputs !!!");
1651  return nullptr;
1652  }
1653  hns->Print("all");
1654  // return nullptr;
1655 
1656  auto ngnt = new NGnTree(hns, "stat", "/tmp/hnst_imported_for_drawing.root");
1657  if (ngnt->IsZombie()) {
1658  NLogError("NGnTree::GetResourceStatisticsNavigator: Failed to import resource monitor THnSparse !!!");
1659  return nullptr;
1660  }
1661  ngnt->Print();
1662  ngnt->Close();
1663 
1664  // return nullptr;
1665  auto ngnt2 = NGnTree::Open("/tmp/hnst_imported_for_drawing.root");
1666  auto nav = ngnt2->Reshape("default", levels, level, ranges, rangesBase);
1667  // // nav->Export("/tmp/hnst_imported_for_drawing.json", {}, "ws://localhost:8080/ws/root.websocket");
1668  // // nav->Draw();
1669  return nav;
1670 }
1671 
1672 bool NGnTree::InitParameters(const std::vector<std::string> & paramNames)
1673 {
1677 
1678  if (fParameters) {
1679  NLogTrace("NGnTree::InitParameters: Replacing existing parameters ...");
1680  delete fParameters;
1681  }
1682 
1683  if (paramNames.empty()) {
1684  NLogTrace("NGnTree::InitParameters: No parameter names provided, skipping ...");
1685  return false;
1686  }
1687 
1688  fParameters = new NParameters(paramNames, "results", "Results");
1689 
1690  return true;
1691 }
1692 
1693 NGnTree * NGnTree::Import(const std::string & findPath, const std::string & fileName,
1694  const std::vector<std::string> & headers, const std::string & outputFile, bool close)
1695 {
1699 
1700  // remove trailing slash from findPath if exists
1701  std::string findPathClean = findPath;
1702  if (!findPathClean.empty() && findPathClean.back() == '/') {
1703  findPathClean.pop_back();
1704  }
1705 
1706  std::vector<std::string> paths = NUtils::Find(findPathClean, fileName);
1707  NLogInfo("NGnTree::Import: Found %zu files to import ...", paths.size());
1708 
1709  TObjArray * ngntArray = NUtils::AxesFromDirectory(paths, findPathClean, fileName, headers);
1710  int nDirAxes = ngntArray->GetEntries();
1711 
1712  NGnTree * ngntFirst = NGnTree::Open(paths[0]);
1713  // Add all axes from ngntFirst to ngntArray
1714  for (const auto & axis : ngntFirst->GetBinning()->GetAxes()) {
1715  ngntArray->Add(axis->Clone());
1716  }
1717  ngntFirst->Close(false);
1718 
1719  std::map<std::string, std::vector<std::vector<int>>> b;
1720 
1721  for (int i = 0; i < ngntArray->GetEntries(); i++) {
1722  TAxis * axis = (TAxis *)ngntArray->At(i);
1723  b[axis->GetName()].push_back({1});
1724  }
1725 
1726  NGnTree * ngnt = new NGnTree(ngntArray, outputFile);
1727  ngnt->SetIsPureCopy(true);
1728 
1729  // return nullptr;
1730  ngnt->GetBinning()->AddBinningDefinition("default", b);
1731 
1732  json cfg;
1733  cfg["basedir"] = findPathClean;
1734  cfg["filename"] = fileName;
1735  cfg["nDirAxes"] = nDirAxes;
1736  cfg["headers"] = headers;
1737  // cfg["ndmspc"]["shared"]["currentFileName"] = "";
1738  Ndmspc::NGnProcessFuncPtr processFunc = [](Ndmspc::NBinningPoint * point, TList * /*output*/, TList * outputPoint,
1739  int /*threadId*/) {
1740  // point->Print();
1741 
1742  json cfg = point->GetCfg();
1743  std::string filename = cfg["basedir"].get<std::string>();
1744  filename += "/";
1745  for (auto & header : cfg["headers"]) {
1746  filename += point->GetBinLabel(header.get<std::string>());
1747  filename += "/";
1748  }
1749  // filename += "/";
1750  // filename += point->GetBinLabel("c");
1751  // filename += point->GetBinLabel("year");
1752  // filename += "/";
1753  filename += cfg["filename"].get<std::string>();
1754  NGnTree * ngnt = (NGnTree *)point->GetTempObject("file");
1755  if (!ngnt || filename.compare(ngnt->GetStorageTree()->GetFileName()) != 0) {
1756  NLogInfo("NGnTree::Import: Opening file '%s' ...", filename.c_str());
1757  if (ngnt) {
1758  NLogDebug("NGnTree::Import: Closing previously opened file '%s' ...",
1759  ngnt->GetStorageTree()->GetFileName().c_str());
1760  ngnt->Close(false);
1761  // delete ngnt;
1762  point->SetTempObject("file", nullptr);
1763  }
1764  ngnt = NGnTree::Open(filename.c_str());
1765  if (!ngnt || ngnt->IsZombie()) {
1766  NLogError("NGnTree::Import: Cannot open file '%s'", filename.c_str());
1767  return;
1768  }
1769  point->SetTempObject("file", ngnt);
1770  }
1771 
1772  int nDirAxes = cfg["nDirAxes"].get<int>();
1773  Int_t * coords = point->GetCoords();
1774  std::string coordsStr = NUtils::GetCoordsString(NUtils::ArrayToVector(coords, point->GetNDimensionsContent()));
1775  NLogInfo("NGnTree::Import: Processing point with coords %s ...", coordsStr.c_str());
1776 
1777  Long64_t entryNumber =
1778  ngnt->GetBinning()->GetContent()->GetBin(&coords[3 * nDirAxes], kFALSE); // skip first 3 dir axes
1779  NLogInfo("NGnTree::Import: Corresponding entry number in file '%s' is %lld", filename.c_str(), entryNumber);
1780 
1781  ngnt->GetEntry(entryNumber);
1782 
1783  // // add outputPoint content to outputPoint list
1784  // TList * inputOutputPoint = (TList *)ngnt->GetStorageTree()->GetBranch("_outputPoint")->GetObject();
1785  // for (int i = 0; i < inputOutputPoint->GetEntries(); i++) {
1786  // outputPoint->Add(inputOutputPoint->At(i));
1787  // }
1788 
1789  // set all branches from ngnt to branch addresses in current object
1790  for (const auto & kv : ngnt->GetStorageTree()->GetBranchesMap()) {
1791  // check if branch exists in current storage tree
1792  if (point->GetStorageTree()->GetBranch(kv.first) == nullptr) {
1793  NLogTrace("NGnTree::Import: Adding branch '%s' to storage tree ...", kv.first.c_str());
1794  point->GetStorageTree()->AddBranch(kv.first, nullptr, kv.second.GetObjectClassName());
1795  }
1796  NLogTrace("NGnTree::Import: Setting branch address for branch '%s' ...", kv.first.c_str());
1797  point->GetTreeStorage()->GetBranch(kv.first)->SetAddress(kv.second.GetObject());
1798  }
1799  outputPoint->Add(new TNamed("source_file", filename));
1800 
1801  // ngnt->Print();
1802 
1803  // NLogInfo("NGnTree::Import: nDirAxes=%d ...", cfg["nDirAxes"].get<int>());
1804 
1805  // json & tempCfg = point->GetTempCfg();
1806  // if (tempCfg["test"].is_null()) {
1807  // NLogInfo("Setting temp cfg test value to 42");
1808  // tempCfg["test"] = 42;
1809  // }
1810  // NLogInfo("Temp cfg test value: %d", tempCfg["test"].get<int>());
1811 
1812  // f->ls();
1813  };
1814  Ndmspc::NGnBeginFuncPtr beginFunc = [](Ndmspc::NBinningPoint * /*point*/, int /*threadId*/) {
1815  TH1::AddDirectory(kFALSE); // Prevent histograms from being associated with the current directory
1816  };
1817 
1818  Ndmspc::NGnEndFuncPtr endFunc = [](Ndmspc::NBinningPoint * point, int /*threadId*/) {
1819  NGnTree * ngnt = (NGnTree *)point->GetTempObject("file");
1820  if (ngnt) {
1821  NLogDebug("NGnTree::Import: Closing last file '%s' ...", ngnt->GetStorageTree()->GetFileName().c_str());
1822  // ngnt->Close(false);
1823  // delete ngnt;
1824  point->SetTempObject("file", nullptr);
1825  }
1826  };
1827 
1828  ngnt->Process(processFunc, cfg, "", beginFunc, endFunc);
1829  if (close) {
1830  ngnt->Close(true);
1831  delete ngnt;
1832  ngnt = NGnTree::Open(outputFile.c_str());
1833  }
1834  return ngnt;
1835 }
1836 
1837 } // namespace Ndmspc
Defines binning mapping and content for NDMSPC histograms.
Definition: NBinningDef.h:26
THnSparse * GetContent() const
Get the template content histogram.
Definition: NBinningDef.h:118
virtual void Print(Option_t *option="") const
Print binning definition information.
void RefreshIdsFromContent()
Refresh bin IDs from content histogram.
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Definition: NBinningDef.h:93
Represents a single point in multi-dimensional binning.
Definition: NBinningPoint.h:21
Double_t GetBinMax(std::string axis) const
Get the maximum value for a specific axis.
std::string GetString(const std::string &prefix="", bool all=false) const
Returns a string representation of the binning point.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
std::string GetBinLabel(std::string axis) const
Get the label for a specific axis.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
Long64_t GetEntryNumber() const
Get entry number for the point.
NParameters * GetParameters() const
Get the parameters associated with this binning point.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
Definition: NBinningPoint.h:67
Double_t GetBinCenter(std::string axis) const
Returns the center value of the bin along the specified axis.
NStorageTree * GetTreeStorage() const
Get pointer to storage tree object.
void SetTempObject(const std::string &name, TObject *obj)
Set a temporary object with the given name.
NStorageTree * GetStorageTree() const
Returns a pointer to the associated storage tree.
Definition: NBinningPoint.h:90
virtual void Print(Option_t *option="") const
Print binning point information.
TObject * GetTempObject(const std::string &name) const
Retrieve a temporary object by name.
NGnTree * GetInput() const
Get pointer to input NGnTree object.
Int_t GetBin(std::string axis) const
Returns the bin index for the specified axis.
Int_t * GetCoords() const
Get pointer to content coordinates array.
Definition: NBinningPoint.h:55
json & GetCfg()
Get reference to configuration JSON object.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
Definition: NBinningPoint.h:49
Double_t GetBinMin(std::string axis) const
Get the minimum value for a specific axis.
NBinning object for managing multi-dimensional binning and axis definitions.
Definition: NBinning.h:45
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
Definition: NBinning.cxx:1024
std::vector< std::string > GetDefinitionNames() const
Get all definition names.
Definition: NBinning.h:270
std::string GetCurrentDefinitionName() const
Get current definition name.
Definition: NBinning.h:276
NBinningPoint * GetPoint()
Get the current binning point.
Definition: NBinning.cxx:1128
virtual void Print(Option_t *option="") const
Print binning information.
Definition: NBinning.cxx:245
std::vector< TAxis * > GetAxes() const
Get vector of axis pointers.
Definition: NBinning.h:223
bool SetCfg(const json &cfg)
Set configuration from JSON.
Definition: NBinning.cxx:1174
void AddBinningDefinition(std::string name, std::map< std::string, std::vector< std::vector< int >>> binning, bool forceDefault=false)
Add a binning definition.
Definition: NBinning.cxx:1053
void Reset()
Reset the binning object to initial state.
Definition: NBinning.cxx:78
Executes a function over all points in an N-dimensional space, optionally in parallel.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
Navigator object for managing hierarchical data structures and projections.
Definition: NGnNavigator.h:22
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int >> levels, size_t level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Reshape navigator using binning name and levels.
void SetGnTree(NGnTree *tree)
Set NGnTree object pointer.
Definition: NGnNavigator.h:184
Thread-local data object for NDMSPC processing.
Definition: NGnThreadData.h:19
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
void SetResultsFilename(const std::string &filename)
Set the results filename for TCP mode (shared filesystem path).
Definition: NGnThreadData.h:98
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
Definition: NGnThreadData.h:65
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
void SetCfg(const json &cfg)
Set configuration JSON object.
Definition: NGnThreadData.h:83
NDMSPC tree object for managing multi-dimensional data storage and processing.
Definition: NGnTree.h:75
NBinning * GetBinning() const
Get pointer to binning object.
Definition: NGnTree.h:161
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
Definition: NGnTree.h:179
virtual void Draw(Option_t *option="") override
Draws the tree object.
Definition: NGnTree.cxx:492
void SetIsPureCopy(bool val)
Sets the pure copy status of the tree.
Definition: NGnTree.h:230
bool Close(bool write=false)
Close the tree, optionally writing data.
Definition: NGnTree.cxx:1326
bool fOwnsTreeStorage
True when fTreeStorage is owned by this instance.
Definition: NGnTree.h:393
virtual ~NGnTree()
Destructor.
Definition: NGnTree.cxx:453
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
Definition: NGnTree.cxx:1340
virtual void Print(Option_t *option="") const override
Print tree information.
Definition: NGnTree.cxx:468
void SetInput(NGnTree *input)
Set input NGnTree pointer.
Definition: NGnTree.h:204
NGnTree()
Default constructor.
Definition: NGnTree.cxx:157
static NGnTree * Import(const std::string &findPath, const std::string &fileName, const std::vector< std::string > &headers, const std::string &outFileName="/tmp/ngnt_imported.root", bool close=true)
Imports an NGnTree from a specified file.
Definition: NGnTree.cxx:1693
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int >> levels, int level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Reshape navigator using binning name and levels.
Definition: NGnTree.cxx:1623
NBinning * fBinning
Binning object.
Definition: NGnTree.h:386
TList * GetOutput(std::string name="")
Get output list by name.
Definition: NGnTree.cxx:1206
NParameters * GetParameters() const
Returns the parameters associated with this tree.
Definition: NGnTree.h:329
NGnTree * fInput
Input NGnTree for processing.
Definition: NGnTree.h:389
bool fOwnsBinning
True when fBinning is owned by this instance.
Definition: NGnTree.h:392
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
Definition: NGnTree.h:173
NGnTree * GetInput() const
Get pointer to input NGnTree.
Definition: NGnTree.h:198
std::string fWorkerMacroList
Comma-separated macro paths sent to TCP workers.
Definition: NGnTree.h:395
void Play(int timeout=0, std::string binning="", std::vector< int > outputPointIds={0}, std::vector< std::vector< int >> ranges={}, Option_t *option="")
Play tree data with optional binning and output point IDs.
Definition: NGnTree.cxx:1364
void SetOutputs(std::map< std::string, TList * > outputs)
Set outputs map.
Definition: NGnTree.h:192
bool InitParameters(const std::vector< std::string > &paramNames)
Initializes the parameters for the tree using the provided parameter names.
Definition: NGnTree.cxx:1672
std::map< std::string, TList * > fOutputs
Outputs map (output lists keyed by name).
Definition: NGnTree.h:388
NGnNavigator * fNavigator
Navigator object (transient; marked with ROOT's `!` so it is not persisted).
Definition: NGnTree.h:390
NParameters * fParameters
Parameters object.
Definition: NGnTree.h:391
TList * Projection(const json &cfg, std::string binningName="")
Project tree data using configuration and binning name.
Definition: NGnTree.cxx:1521
NGnNavigator * GetResourceStatisticsNavigator(std::string binningName, std::vector< std::vector< int >> levels, int level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Returns a navigator for resource statistics based on binning and levels.
Definition: NGnTree.cxx:1636
NStorageTree * fTreeStorage
Tree storage.
Definition: NGnTree.h:387
static std::string BuildObjectPath(const json &cfg, const json &objCfg, const NBinningPoint *point)
Helper: build object path string from configuration and a binning point.
Definition: NGnTree.cxx:43
void SetNavigator(NGnNavigator *navigator)
Sets the navigator for this tree.
Definition: NGnTree.cxx:1312
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
Definition: NGnTree.cxx:1222
bool Process(NGnProcessFuncPtr func, const json &cfg=json::object(), std::string binningName="", NGnBeginFuncPtr beginFunc=nullptr, NGnEndFuncPtr endFunc=nullptr)
Process tree data using a function pointer and configuration.
Definition: NGnTree.cxx:501
static bool GetConsoleOutput()
Get console output flag.
Definition: NLogger.h:548
NParameters object.
Definition: NParameters.h:13
bool SetParameter(int bin, Double_t value, Double_t error=0.)
Set the value and error of a parameter by bin index.
Definition: NParameters.cxx:55
NDMSPC storage tree object for managing ROOT TTree-based data storage.
Definition: NStorageTree.h:22
void SetBranchAddresses()
Set addresses for all branches.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
bool SetFileTree(TFile *file, TTree *tree)
Set the file and tree for this storage object (tree handling).
std::string GetFileName() const
Get file name.
Definition: NStorageTree.h:194
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
Long64_t GetEntry(Long64_t entry, NBinningPoint *point=nullptr, bool checkBinningDef=false)
Get entry by index and fill NBinningPoint.
std::string GetPrefix() const
Get prefix path.
Definition: NStorageTree.h:200
TObject * GetBranchObject(const std::string &name)
Get pointer to branch object by name.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
std::string GetPostfix() const
Get postfix path.
Definition: NStorageTree.h:206
bool Close(bool write=false, std::map< std::string, TList * > outputs={})
Close the storage tree, optionally writing outputs.
TTree * GetTree() const
Get pointer to TTree object.
Definition: NStorageTree.h:188
virtual void Print(Option_t *option="") const
Print storage tree information.
void SetBinning(NBinning *binning)
Set binning object pointer.
Definition: NStorageTree.h:182
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
Definition: NStorageTree.h:129
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
Definition: NTreeBranch.cxx:59
TObject * GetObject() const
Get object pointer.
Definition: NTreeBranch.h:81
static bool SetAxisRanges(THnSparse *sparse, std::vector< std::vector< int >> ranges={}, bool withOverflow=false, bool modifyTitle=false, bool reset=true)
Set axis ranges for THnSparse using vector of ranges.
Definition: NUtils.cxx:1220
static TH1 * ProjectTHnSparse(THnSparse *hns, const std::vector< int > &axes, Option_t *option="")
Project a THnSparse histogram onto specified axes.
Definition: NUtils.cxx:1167
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
Definition: NUtils.cxx:1073
static std::string FormatTime(long long seconds)
Format time in seconds to human-readable string.
Definition: NUtils.cxx:1660
static bool EnableMT(Int_t numthreads=-1)
Enable multi-threading with specified number of threads.
Definition: NUtils.cxx:46
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
Definition: NUtils.cxx:1111
static void ProgressBar(int current, int total, std::string prefix="", std::string suffix="", int barWidth=50)
Display progress bar.
Definition: NUtils.cxx:1673
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
Definition: NUtils.cxx:1588
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.
Definition: NUtils.cxx:1551
static TObjArray * AxesFromDirectory(const std::vector< std::string > paths, const std::string &findPath, const std::string &fileName, const std::vector< std::string > &axesNames)
Creates an array of axes objects from files in specified directories.
Definition: NUtils.cxx:1396
static std::vector< std::string > Find(std::string path, std::string filename="")
Find files in a path matching filename.
Definition: NUtils.cxx:963