9 #include <TDirectory.h>
13 #include <THnSparse.h>
18 #include <TObjString.h>
20 #include <TBufferJSON.h>
23 #include "NParameters.h"
24 #include "NStorageTree.h"
26 #include "NBinningDef.h"
27 #include "NDimensionalExecutor.h"
28 #include "NDimensionalIpcRunner.h"
29 #include "NGnThreadData.h"
31 #include "NTreeBranch.h"
33 #include "NStorageTree.h"
34 #include "NGnNavigator.h"
45 std::string objPath =
"";
46 if (objCfg.contains(
"prefix") && objCfg[
"prefix"].is_string()) {
47 objPath = objCfg[
"prefix"].get<std::string>();
50 std::string axisObjectDefaultFormat =
51 cfg[
"axisObjectDefaultFormat"].is_string() ? cfg[
"axisObjectDefaultFormat"].get<std::string>() :
"%.2f_%.2f";
52 std::string axisDefaultSeparator =
53 cfg[
"axisDefaultSeparator"].is_string() ? cfg[
"axisDefaultSeparator"].get<std::string>() :
"/";
56 for (
auto & axisEntry : cfg[
"axes"]) {
61 if (axisEntry.is_string()) {
62 axisName = axisEntry.get<std::string>();
63 if (axisObjectDefaultFormat.empty()) {
68 format = axisObjectDefaultFormat;
71 else if (axisEntry.is_object()) {
72 if (axisEntry.contains(
"name") && axisEntry[
"name"].is_string()) {
73 axisName = axisEntry[
"name"].get<std::string>();
78 if (axisEntry.contains(
"mode") && axisEntry[
"mode"].is_string()) {
79 mode = axisEntry[
"mode"].get<std::string>();
81 if (axisEntry.contains(
"format") && axisEntry[
"format"].is_string()) {
82 format = axisEntry[
"format"].get<std::string>();
90 if (axisObjectDefaultFormat.empty())
97 format = axisObjectDefaultFormat.empty() ?
"%.2f_%.2f" : axisObjectDefaultFormat;
98 else if (mode ==
"bin")
104 if (mode ==
"minmax") {
107 objPath += TString::Format(format.c_str(), min, max).Data();
109 else if (mode ==
"min") {
111 objPath += TString::Format(format.c_str(), min).Data();
113 else if (mode ==
"max") {
115 objPath += TString::Format(format.c_str(), max).Data();
117 else if (mode ==
"center") {
119 objPath += TString::Format(format.c_str(), c).Data();
121 else if (mode ==
"label") {
125 else if (mode ==
"bin") {
126 objPath += std::to_string(point->
GetBin(axisName));
129 objPath += std::to_string(point->
GetBin(axisName));
132 std::string sep = axisDefaultSeparator;
133 if (axisEntry.is_object() && axisEntry.contains(
"sufix") && axisEntry[
"sufix"].is_string()) {
134 sep = axisEntry[
"sufix"].get<std::string>();
140 if (!lastSep.empty() && objPath.size() >= lastSep.size()) {
141 objPath = objPath.substr(0, objPath.size() - lastSep.size());
143 if (objCfg.contains(
"sufix") && objCfg[
"sufix"].is_string()) {
144 objPath += objCfg[
"sufix"].get<std::string>();
159 NGnTree::NGnTree(std::vector<TAxis *> axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
165 NLogError(
"NGnTree::NGnTree: No axes provided, binning is nullptr.");
176 NGnTree::NGnTree(TObjArray * axes, std::string filename, std::string treename) : TObject(), fInput(nullptr)
182 if (axes ==
nullptr) {
183 NLogError(
"NGnTree::NGnTree: Axes TObjArray is nullptr.");
// Reject a missing or empty axis list. The check must be a short-circuiting OR:
// with '&&' a null 'axes' was dereferenced and a valid-but-empty array was never caught.
188 if (axes == nullptr || axes->GetEntries() == 0) {
189 NLogError(
"NGnTree::NGnTree: No axes provided, binning is nullptr.");
205 if (ngnt ==
nullptr) {
206 NLogError(
"NGnTree::NGnTree: NGnTree is nullptr.");
212 NLogError(
"NGnTree::NGnTree: Binning in NGnTree is nullptr.");
226 : TObject(), fBinning(b), fTreeStorage(s), fInput(nullptr), fOwnsBinning(false), fOwnsTreeStorage(false)
232 NLogError(
"NGnTree::NGnTree: Binning is nullptr.");
242 NLogError(
"NGnTree::NGnTree: Storage tree is nullptr.");
254 NGnTree::NGnTree(THnSparse * hns, std::string parameterAxis,
const std::string & outFileName, json cfg)
255 : TObject(), fInput(nullptr)
261 std::map<std::string, std::vector<std::vector<int>>> b;
262 TObjArray * axes =
new TObjArray();
263 int parameterAxisIdx = -1;
264 std::vector<std::string> labels;
265 for (
int i = 0; i < hns->GetNdimensions(); i++) {
266 TAxis * axisIn = (TAxis *)hns->GetAxis(i);
267 TAxis * axis = (TAxis *)axisIn->Clone();
270 if (parameterAxis.compare(axis->GetName()) == 0) {
271 parameterAxisIdx = i;
272 TAxis * axis = hns->GetAxis(parameterAxisIdx);
273 for (
int bin = 1; bin <= axis->GetNbins(); bin++) {
275 labels.push_back(axis->GetBinLabel(bin));
281 if (axisIn->IsAlphanumeric()) {
282 NLogTrace(
"Setting axis '%s' labels from input THnSparse", axis->GetName());
283 for (
int bin = 1; bin <= axisIn->GetNbins(); bin++) {
284 std::string label = axisIn->GetBinLabel(bin);
// Copy the label only when this particular bin has one. The guard must test the
// per-bin 'label', not the 'labels' vector collected for the parameter axis.
285 if (!label.empty()) axis->SetBinLabel(bin, label.c_str());
290 b[axis->GetName()] = {{1}};
294 cfg[
"_parameterAxis"] = parameterAxisIdx;
295 cfg[
"_labels"] = labels;
298 NLogDebug(
"Importing THnSparse as NGnTree with parameter axis '%s' (index %d) ...", parameterAxis.c_str(),
301 NLogDebug(
"Created NGnTree for THnSparse import ...");
302 if (ngnt->IsZombie()) {
303 NLogError(
"NGnTree::Import: Failed to create NGnTree !!!");
309 const char * tmpDirStr = gSystem->Getenv(
"NDMSPC_TMP_DIR");
313 if (!tmpDirStr || tmpDir.empty()) {
316 std::string tmpFilename = tmpDir +
"/ngnt_imported_input" + std::to_string(gSystem->GetPid()) +
".root";
318 if (ngntIn->IsZombie()) {
319 NLogError(
"NGnTree::Import: Failed to create NGnTree for input !!!");
325 ngntIn->
GetOutput(
"default")->Add(hns->Clone(
"test"));
336 ngnt->
InitParameters(cfg[
"_labels"].get<std::vector<std::string>>());
338 Ndmspc::NGnProcessFuncPtr processFunc = [](
Ndmspc::NBinningPoint * point, TList * , TList * outputPoint,
341 TH1::AddDirectory(kFALSE);
343 json cfg = point->
GetCfg();
347 NLogError(
"NGnTree::Import: Input NGnTree is nullptr !!!");
352 THnSparse * hns = (THnSparse *)ngntIn->
GetOutput(
"default")->At(0);
353 if (hns ==
nullptr) {
354 NLogError(
"NGnTree::Import: THnSparse 'hns' not found in storage tree !!!");
358 int axisIdx = cfg[
"_parameterAxis"].get<
int>();
359 std::vector<std::vector<int>> ranges;
362 for (
int i = 0; i < hns->GetNdimensions(); i++) {
363 if (i == axisIdx)
continue;
365 ranges.push_back({i, coord, coord});
370 TH1 * h = hns->Projection(axisIdx,
"O");
372 NLogError(
"NGnTree::Import: Projection of THnSparse failed !!!");
375 if (h->GetEntries() > 0) {
378 for (
int bin = 1; bin <= h->GetNbinsX(); bin++) {
379 params->
SetParameter(bin, h->GetBinContent(bin), h->GetBinError(bin));
385 std::string filename = cfg[
"filename"].get<std::string>();
387 if (!f || filename.compare(f->GetName()) != 0) {
389 NLogDebug(
"NGnTree::Import: Closing previously opened file '%s' ...", f->GetName());
392 NLogDebug(
"NGnTree::Import: Opening file '%s' ...", filename.c_str());
393 f = TFile::Open(filename.c_str());
394 if (!f || f->IsZombie()) {
395 NLogError(
"NGnTree::Import: Cannot open file '%s' !!!", filename.c_str());
401 std::string axisObjectDefaultFormat =
402 cfg[
"axisObjectDefaultFormat"].is_string() ? cfg[
"axisObjectDefaultFormat"].get<std::string>() :
"%.2f_%.2f";
403 std::string axisDefaultSeparator =
404 cfg[
"axisDefaultSeparator"].is_string() ? cfg[
"axisDefaultSeparator"].get<std::string>() :
"/";
406 if (cfg.contains(
"dryrun") && cfg[
"dryrun"].is_boolean()) {
407 dryrun = cfg[
"dryrun"].get<
bool>();
411 NLogInfo(
"NGnTree::Import (dryrun): '%s' ...", point->
GetString().c_str());
415 for (
auto & [objName, objCfg] : cfg[
"objects"].items()) {
419 NLogInfo(
"NGnTree::Import (dryrun): would retrieve object '%s'", objPath.c_str());
424 NLogInfo(
"NGnTree::Import: Retrieving object '%s' from file '%s' ...", objPath.c_str(),
425 cfg[
"filename"].get<std::string>().c_str());
427 TObject * obj = f->Get(objPath.c_str());
430 NLogWarning(
"NGnTree::Import: Cannot get object '%s' from file '%s' !!!", objPath.c_str(),
431 cfg[
"filename"].get<std::string>().c_str());
436 if (obj->InheritsFrom(TCanvas::Class())) {
437 TCanvas * cObj = (TCanvas *)obj;
438 cObj->SetName(objName.c_str());
440 outputPoint->Add(obj->Clone(objName.c_str()));
447 ngnt->
Process(processFunc, cfg);
// Remove the temporary input file without going through the shell: an unquoted
// path in "rm -f %s" breaks on spaces and shell metacharacters.
450 gSystem->Unlink(tmpFilename.c_str());
474 TString opt = option;
477 NLogInfo(
"NGnTree::Print: Printing NGnTree object [ALL] ...");
482 NLogError(
"Binning is not initialized in NGnTree !!!");
488 NLogError(
"Storage tree is not initialized in NGnTree !!!");
498 NLogInfo(
"NGnTree::Draw: Drawing NGnTree object [not implemented yet]...");
501 bool NGnTree::Process(NGnProcessFuncPtr func,
const json & cfg, std::string binningName, NGnBeginFuncPtr beginFunc,
502 NGnEndFuncPtr endFunc)
509 NLogError(
"Binning is not initialized in NGnTree !!!");
516 if (!binningName.empty()) {
518 if (std::find(defNames.begin(), defNames.end(), binningName) == defNames.end()) {
519 NLogError(
"Binning definition '%s' not found in NGnTree !!!", binningName.c_str());
523 defNames.push_back(binningName);
528 bool rc =
Process(func, defNames, cfg, binningIn, beginFunc, endFunc);
530 NLogError(
"NGnTree::Process: Processing failed !!!");
537 bool NGnTree::Process(NGnProcessFuncPtr func,
const std::vector<std::string> & defNames,
const json & cfg,
538 NBinning * binningIn, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc)
544 NLogInfo(
"NGnTree::Process: Starting processing with %zu definitions ...", defNames.size());
545 bool batch = gROOT->IsBatch();
546 gROOT->SetBatch(kTRUE);
547 TH1::AddDirectory(kFALSE);
550 if (
const char * workerEndpoint = gSystem->Getenv(
"NDMSPC_WORKER_ENDPOINT")) {
551 size_t workerIndex = 0;
552 if (
const char * envIdx = gSystem->Getenv(
"NDMSPC_WORKER_INDEX")) {
553 try { workerIndex = static_cast<size_t>(std::stoul(envIdx)); }
// An unparsable NDMSPC_WORKER_INDEX keeps the current value of workerIndex; report it instead of failing silently.
catch (...) { NLogWarning("NGnTree::Process: Invalid NDMSPC_WORKER_INDEX='%s', using worker index %zu", envIdx, workerIndex); }
555 NLogInfo(
"NGnTree::Process: Worker mode — connecting to %s as worker %zu", workerEndpoint, workerIndex);
557 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
558 std::string workerBase = tmpDirEnv ? tmpDirEnv :
"/tmp";
// Worker-mode socket setup: create a ZeroMQ context and a DEALER socket for talking to the supervisor.
// NOTE(review): none of the zmq_* return values below are checked in the visible code.
563 void * ctx = zmq_ctx_new();
564 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
// Tag the socket with an identity derived from the worker index.
565 const std::string identity = Ndmspc::NDimensionalIpcRunner::BuildWorkerIdentity(workerIndex);
566 zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size());
// Bound each receive by a timeout so the caller can poll instead of blocking indefinitely.
567 int timeoutMs = 1000;
568 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
// Connect to the endpoint taken from NDMSPC_WORKER_ENDPOINT and announce this worker with a single "READY" frame.
569 zmq_connect(dealer, workerEndpoint);
570 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"READY"});
573 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(30);
576 std::vector<std::string> frames;
577 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
578 if (errno == EAGAIN || errno == EWOULDBLOCK) {
579 if (std::chrono::steady_clock::now() > initDeadline)
break;
585 if (frames.size() >= 1 && frames[0] ==
"STOP") {
586 NLogPrint(
"NGnTree::Process: Worker received STOP before INIT — session already finished, exiting.");
589 gROOT->SetBatch(batch);
592 if (frames.size() >= 5 && frames[0] ==
"INIT") {
593 workerIndex =
static_cast<size_t>(std::stoul(frames[1]));
594 const std::string & sessionId = frames[2];
595 const std::string & initResultsDir = frames[3];
596 const std::string & initTreeName = frames[4];
599 if (frames.size() >= 7) {
600 if (!frames[5].empty())
601 gSystem->Setenv(
"NDMSPC_TMP_DIR", frames[5].c_str());
602 if (!frames[6].empty())
603 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", frames[6].c_str());
606 if (!gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR")[0] ==
'\0') {
607 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
608 if (tmpDirEnv && tmpDirEnv[0] !=
'\0')
609 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
613 const char * localTmpEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
614 const std::string localBase = localTmpEnv ? localTmpEnv :
"/tmp";
615 const std::string localFile = localBase +
"/.ndmspc/tmp/" + sessionId +
"/" +
619 const std::string resultsFile = initResultsDir +
"/" + std::to_string(workerIndex) +
"/" +
622 bool rc = workerData.
Init(workerIndex, func, beginFunc, endFunc,
this, binningIn,
fInput, localFile, initTreeName);
624 NLogError(
"NGnTree::Process: Worker failed to initialize NGnThreadData");
630 if (resultsFile != localFile) {
634 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"ACK"});
639 NLogError(
"NGnTree::Process: Worker did not receive INIT from supervisor");
645 Ndmspc::NDimensionalIpcRunner::TaskLoop(dealer, workerIndex, &workerData);
648 gROOT->SetBatch(batch);
655 int nThreads = ROOT::GetThreadPoolSize();
656 if (nThreads < 1) nThreads = 1;
658 std::string executionMode =
"thread";
659 const char * envMode = gSystem->Getenv(
"NDMSPC_EXECUTION_MODE");
660 const bool modeExplicit = (envMode && envMode[0] !=
'\0');
662 executionMode = envMode;
665 std::string normalizedMode = executionMode;
666 std::transform(normalizedMode.begin(), normalizedMode.end(), normalizedMode.begin(),
667 [](
unsigned char c) { return static_cast<char>(std::tolower(c)); });
668 if (normalizedMode ==
"process") normalizedMode =
"ipc";
670 bool useProcessIpc = (normalizedMode ==
"ipc" || normalizedMode ==
"tcp");
671 bool useTcp = (normalizedMode ==
"tcp");
672 size_t nProcesses =
static_cast<size_t>(nThreads);
673 bool ndmspcNProcExplicit =
false;
675 if (
const char * envNdmspcNProc = gSystem->Getenv(
"NDMSPC_MAX_PROCESSES")) {
676 ndmspcNProcExplicit =
true;
// Clamp in the signed domain before converting: casting a negative value straight to
// size_t wraps around to an enormous process count that max(1, ...) would not catch.
678 nProcesses = static_cast<size_t>(std::max<long long>(1, std::stoll(envNdmspcNProc)));
681 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_MAX_PROCESSES='%s', using default=%zu", envNdmspcNProc,
685 else if (
const char * envNProc = gSystem->Getenv(
"ROOT_MAX_THREADS")) {
// Clamp in the signed domain before converting: casting a negative value straight to
// size_t wraps around to an enormous process count that max(1, ...) would not catch.
688 nProcesses = static_cast<size_t>(std::max<long long>(1, std::stoll(envNProc)));
691 NLogWarning(
"NGnTree::Process: Invalid ROOT_MAX_THREADS='%s', using default=%zu", envNProc, nProcesses);
698 if (normalizedMode ==
"thread") {
699 useProcessIpc =
false;
702 else if (normalizedMode ==
"tcp") {
703 useProcessIpc =
true;
706 else if (normalizedMode ==
"ipc") {
707 useProcessIpc =
true;
711 NLogWarning(
"NGnTree::Process: Unknown NDMSPC_EXECUTION_MODE='%s', falling back to auto mode selection.",
712 executionMode.c_str());
713 useProcessIpc = (nProcesses > 1);
717 else if (nProcesses > 1) {
718 useProcessIpc =
true;
720 executionMode =
"ipc";
721 normalizedMode =
"ipc";
724 if (ndmspcNProcExplicit && normalizedMode ==
"thread" && nProcesses > 1) {
725 NLogWarning(
"NGnTree::Process: NDMSPC_MAX_PROCESSES=%zu is set, but NDMSPC_EXECUTION_MODE=thread disables IPC.",
729 const size_t workerObjectCount = useProcessIpc ? std::max(
static_cast<size_t>(nThreads), nProcesses)
730 :
static_cast<size_t>(nThreads);
731 std::vector<Ndmspc::NGnThreadData> threadDataVector(workerObjectCount);
733 NLogInfo(
"NGnTree::Process: executionMode='%s', useProcessIpc=%d, ROOT threads=%d, ipcProcesses=%zu, workerObjects=%zu",
734 executionMode.c_str(), useProcessIpc ? 1 : 0, nThreads, nProcesses, workerObjectCount);
736 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
738 if (tmpDirEnv && tmpDirEnv[0] !=
'\0') {
743 if (!(tmpDirPrefix.BeginsWith(
"root://") || tmpDirPrefix.BeginsWith(
"http://") ||
744 tmpDirPrefix.BeginsWith(
"https://"))) {
745 tmpDir = tmpDirPrefix.Data();
747 if (tmpDir.empty()) tmpDir =
"/tmp";
750 std::string jobDir = tmpDir +
"/.ndmspc/tmp/" + std::to_string(gSystem->GetPid());
754 const char * resultsDirEnv = gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR");
755 const bool sameDir = !resultsDirEnv || std::string(resultsDirEnv) == tmpDir;
756 std::string resultsDir = sameDir ? jobDir
757 : (std::string(resultsDirEnv) +
"/" +
758 std::to_string(gSystem->GetPid()));
760 std::string filePrefix = jobDir;
761 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
763 bool rc = threadDataVector[i].Init(i, func, beginFunc, endFunc,
this, binningIn,
fInput, filename,
766 NLogError(
"Failed to initialize thread data %zu, exiting ...", i);
769 threadDataVector[i].SetCfg(cfg);
775 threadDataVector[i].SetResultsFilename(resultsFile);
778 size_t processedEntries = 0;
779 size_t totalEntries = 0;
780 auto start_par = std::chrono::high_resolution_clock::now();
781 auto start_par_job = std::chrono::high_resolution_clock::now();
786 thread_obj.Process(coords);
789 size_t nRunning = (totalEntries - processedEntries >= threadDataVector.size()) ? threadDataVector.size()
790 : totalEntries - processedEntries;
791 NUtils::ProgressBar(processedEntries, totalEntries, start_par, TString::Format(
"R%4zu", nRunning).Data());
798 std::vector<Ndmspc::NThreadData *> processWorkers;
799 std::unique_ptr<Ndmspc::NDimensionalExecutor> ipcExecutor;
801 processWorkers.reserve(threadDataVector.size());
802 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
803 processWorkers.push_back(&threadDataVector[i]);
805 ipcExecutor = std::make_unique<Ndmspc::NDimensionalExecutor>(std::vector<int>{0}, std::vector<int>{0});
807 const char * tcpPort = gSystem->Getenv(
"NDMSPC_TCP_PORT");
808 std::string tcpEndpoint = std::string(
"tcp://0.0.0.0:") + (tcpPort ? tcpPort :
"5555");
809 const char * resultsDirBase = gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR");
813 if (workerMacro.empty()) {
814 if (
const char * envMacro = gSystem->Getenv(
"NDMSPC_MACRO")) workerMacro = envMacro;
816 ipcExecutor->StartProcessIpc(processWorkers, nProcesses, tcpEndpoint, resultsDir,
818 resultsDirBase ? resultsDirBase :
"");
820 ipcExecutor->StartProcessIpc(processWorkers, nProcesses);
824 std::map<std::string, std::vector<Long64_t>>
825 defIdMapProcessedRemoved;
838 for (
auto & name : defNames) {
841 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
845 if (binningDef->GetIds().size() == 0) {
846 NLogWarning(
"NGnTree::Process: Binning definition '%s' has no entries, skipping ...", name.c_str());
850 const std::vector<Long64_t> originalDefinitionIds = binningDef->GetIds();
852 std::vector<int> mins, maxs;
854 maxs.push_back(binningDef->GetIds().size() - 1);
855 NLogDebug(
"NGnTree::Process: Processing with binning definition '%s' with %zu entries", name.c_str(),
856 binningDef->GetIds().size());
859 NLogInfo(
"NGnTree::Process: Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
862 Printf(
"Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
864 start_par = std::chrono::high_resolution_clock::now();
865 processedEntries = 0;
866 totalEntries = maxs[0] + 1;
867 const size_t activeWorkers = useProcessIpc ? std::max<size_t>(1, std::min(nProcesses, processWorkers.size()))
868 : threadDataVector.size();
871 TString::Format(
"R%4zu", activeWorkers).Data());
874 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
876 threadDataVector[i].GetHnSparseBase()->GetBinning()->SetCurrentDefinitionName(name);
881 if (!useProcessIpc) {
885 Bool_t prevMustClean = gROOT->MustClean();
886 gROOT->SetMustClean(kFALSE);
893 Bool_t prevBatch = gROOT->IsBatch();
894 gROOT->SetBatch(kTRUE);
903 gROOT->SetMustClean(prevMustClean);
904 gROOT->SetBatch(prevBatch);
907 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
911 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
912 threadDataVector[i].ExecuteEndFunction();
916 ipcExecutor->SetBounds(mins, maxs);
917 size_t acked = ipcExecutor->ExecuteCurrentBoundsProcessIpc(
918 name, &originalDefinitionIds, [&, activeWorkers](
size_t ackCount,
size_t activeWorkersNow) {
919 processedEntries = ackCount;
921 size_t nRunning = std::min(activeWorkersNow, activeWorkers);
923 TString::Format(
"R%4zu", nRunning).Data());
926 processedEntries = acked;
930 const size_t processesToUse = std::max<size_t>(1, std::min(nProcesses, processWorkers.size()));
931 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
932 auto * workerDef = threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name);
934 workerDef->GetIds().clear();
938 for (
size_t taskIndex = 0; taskIndex < originalDefinitionIds.size(); ++taskIndex) {
939 const size_t workerIndex = taskIndex % processesToUse;
940 threadDataVector[workerIndex].SetNProcessed(threadDataVector[workerIndex].GetNProcessed() + 1);
942 auto * workerDef = threadDataVector[workerIndex].GetHnSparseBase()->GetBinning()->GetDefinition(name);
944 workerDef->GetIds().push_back(originalDefinitionIds[taskIndex]);
954 Printf(
"Finished processing binning definition '%s'. Post-processing results ...", name.c_str());
956 NLogDebug(
"NGnTree::Process: [BEGIN] ------------------------------------------------");
959 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
960 NLogDebug(
"NGnTree::Process: -> Thread %zu processed %lld entries", i, threadDataVector[i].GetNProcessed());
964 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().begin(),
965 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().end());
970 for (
size_t i = 0; i < defNames.size(); i++) {
972 std::string other_name = defNames[i];
978 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", other_name.c_str());
982 for (
auto it = otherDef->GetIds().begin(); it != otherDef->GetIds().end();) {
983 NLogTrace(
"NGnTree::Process: Checking entry %lld from definition '%s' against sumIds=%d", *it,
984 other_name.c_str(), sumIds);
986 NLogTrace(
"NGnTree::Process: Removing entry %lld from definition '%s'", *it, other_name.c_str());
987 defIdMapProcessedRemoved[other_name].push_back(*it);
988 it = otherDef->GetIds().erase(it);
1000 NLogDebug(
"NGnTree::Process: [END] ------------------------------------------------");
1003 catch (
const std::exception & ex) {
1005 ipcExecutor->FinishProcessIpc(
true);
1006 ipcExecutor.reset();
1009 TString what(ex.what());
1010 if (what.Contains(
"Interrupted by user")) {
1012 gROOT->SetInterrupt(kFALSE);
1014 NLogWarning(
"NGnTree::Process: Interrupted by user, stopping processing.");
1017 NLogError(
"NGnTree::Process: Processing failed: %s", ex.what());
1024 auto end_par = std::chrono::high_resolution_clock::now();
1025 std::chrono::duration<double, std::milli> par_duration = end_par - start_par_job;
1028 ipcExecutor->FinishProcessIpc();
1033 const std::set<size_t> registeredWorkers =
1034 (ipcExecutor && useTcp) ? ipcExecutor->GetRegisteredWorkerIndices() : std::set<size_t>{};
1035 const bool filterByRegistered = !registeredWorkers.empty();
1038 Printf(
"NGnTree::Process: Execution completed and it took %s .",
1042 NLogInfo(
"NGnTree::Process: Execution completed and it took %s .",
1046 NLogInfo(
"NGnTree::Process: Post processing %zu results ...", threadDataVector.size());
1047 for (
auto & data : threadDataVector) {
1048 if (useProcessIpc) {
1049 NLogTrace(
"NGnTree::Process: Releasing parent handle for worker %zu file without writing",
1050 data.GetAssignedIndex());
1054 NLogTrace(
"NGnTree::Process: Closing file from thread %zu with write", data.GetAssignedIndex());
1055 data.GetHnSparseBase()->GetStorageTree()->Close(
true);
1059 NLogDebug(
"NGnTree::Process: Merging %zu results ...", threadDataVector.size());
1061 Printf(
"NGnTree::Process: [phase] merge start (%zu workers)", threadDataVector.size());
1063 const auto mergeStart = std::chrono::high_resolution_clock::now();
1064 TList * mergeList =
new TList();
1066 outputData->
Init(0, func,
nullptr,
nullptr,
this, binningIn);
1070 for (
auto & data : threadDataVector) {
1071 if (filterByRegistered && registeredWorkers.find(data.GetAssignedIndex()) == registeredWorkers.end()) {
1072 NLogInfo(
"NGnTree::Process: Skipping worker %zu — never connected", data.GetAssignedIndex());
1075 NLogTrace(
"NGnTree::Process: Adding thread data %zu to merge list ...", data.GetAssignedIndex());
1076 mergeList->Add(&data);
1079 Long64_t nmerged = outputData->
Merge(mergeList);
1080 const auto mergeEnd = std::chrono::high_resolution_clock::now();
1082 const auto mergeSec = std::chrono::duration_cast<std::chrono::duration<double>>(mergeEnd - mergeStart).count();
1083 Printf(
"NGnTree::Process: [phase] merge done (%lld outputs, %.2f s)", nmerged, mergeSec);
1086 NLogError(
"NGnTree::Process: Failed to merge thread data, exiting ...");
1090 NLogInfo(
"NGnTree::Process: Merged %lld outputs successfully", nmerged);
1102 std::set<Long64_t> mergedContentIds;
1103 std::vector<std::pair<Long64_t, std::vector<int>>> mergedContentCoords;
1106 for (
size_t i = 0; i < defNames.size(); i++) {
1107 std::string name = defNames[i];
1111 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1114 for (
auto & [other_name, removedIds] : defIdMapProcessedRemoved) {
1115 if (other_name.compare(name) != 0) {
1118 for (
auto &
id : removedIds) {
1119 if (std::find(def->GetIds().begin(), def->GetIds().end(),
id) == def->GetIds().end()) {
1120 NLogTrace(
"NGnTree::Process: Adding missing entry %lld to definition '%s'",
id, name.c_str());
1121 def->GetIds().push_back(
id);
1125 sort(def->GetIds().begin(), def->GetIds().end());
1129 def->GetContent()->Reset();
1130 for (
auto id : def->GetIds()) {
1132 def->GetBinning()->GetContent()->GetBinContent(
id, point.
GetCoords());
1135 NLogTrace(
"NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld", name.c_str(),
id, bin);
1136 def->GetContent()->SetBinContent(bin,
id);
1138 if (mergedContentIds.insert(
id).second) {
1139 mergedContentCoords.emplace_back(
1148 mergedBinning->GetContent()->Reset();
1149 for (
const auto & entry : mergedContentCoords) {
1150 Long64_t bin = mergedBinning->GetContent()->GetBin(entry.second.data());
1151 mergedBinning->GetContent()->SetBinContent(bin, entry.first);
1155 NLogDebug(
"NGnTree::Process: Final binning definitions after processing:");
1156 for (
auto & name : defNames) {
1160 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1163 binningDef->Print();
1172 NLogInfo(
"NGnTree::Process: Processing completed successfully. Output was stored in '%s'.",
1181 Printf(
"NGnTree::Process: [phase] final close start (%s)",
1184 const auto closeStart = std::chrono::high_resolution_clock::now();
1186 const auto closeEnd = std::chrono::high_resolution_clock::now();
1188 const auto closeSec = std::chrono::duration_cast<std::chrono::duration<double>>(closeEnd - closeStart).count();
1189 Printf(
"NGnTree::Process: [phase] final close done (%.2f s)", closeSec);
1193 Printf(
"NGnTree::Process: [phase] cleanup start (%s)", jobDir.c_str());
1195 const auto cleanupStart = std::chrono::high_resolution_clock::now();
1196 gSystem->Exec(TString::Format(
"rm -fr %s", jobDir.c_str()));
1197 const auto cleanupEnd = std::chrono::high_resolution_clock::now();
1199 const auto cleanupSec = std::chrono::duration_cast<std::chrono::duration<double>>(cleanupEnd - cleanupStart).count();
1200 Printf(
"NGnTree::Process: [phase] cleanup done (%.2f s)", cleanupSec);
1202 gROOT->SetBatch(batch);
1217 fOutputs[name]->SetName(name.c_str());
1228 NLogDebug(
"Opening '%s' with branches='%s' and treename='%s' ...", filename.c_str(), branches.c_str(),
1231 TFile * file = TFile::Open(filename.c_str());
1233 NLogError(
"NGnTree::Open: Cannot open file '%s'", filename.c_str());
1237 TTree * tree = (TTree *)file->Get(treename.c_str());
1239 NLogError(
"NGnTree::Open: Cannot get tree '%s' from file '%s'", treename.c_str(), filename.c_str());
1243 return Open(tree, branches, file);
1254 NLogError(
"NGnTree::Open: Cannot get binning from tree '%s'", tree->GetName());
1258 if (!hnstStorageTree) {
1259 NLogError(
"NGnTree::Open: Cannot get tree storage info from tree '%s'", tree->GetName());
1263 std::map<std::string, TList *> outputs;
1264 TDirectory * dir =
nullptr;
1266 dir = (TDirectory *)file->Get(
"outputs");
1267 auto l = dir->GetListOfKeys();
1268 for (
auto kv : *l) {
1269 TObject * obj = dir->Get(kv->GetName());
1271 TList * l =
dynamic_cast<TList *
>(obj);
1273 outputs[l->GetName()] = l;
1274 NLogDebug(
"Imported output list for binning '%s' with %d object(s) from file '%s'", l->GetName(), l->GetEntries(),
1285 if (!hnstStorageTree->
SetFileTree(file, tree))
return nullptr;
1289 std::vector<std::string> enabledBranches;
1290 if (!branches.empty()) {
1298 NLogTrace(
"NGnTree::Open: Enabled branches: %s", kv.first.c_str());
1319 NLogTrace(
"NGnTree::SetNavigator: Replacing existing navigator ...");
1333 NLogError(
"NGnTree::Close: Storage tree is not initialized in NGnTree !!!");
1346 NLogError(
"NGnTree::GetEntry: Storage tree is not initialized in NGnTree !!!");
1361 return GetEntry(0, checkBinningDef);
1364 void NGnTree::Play(
int timeout, std::string binning, std::vector<int> outputPointIds,
1365 std::vector<std::vector<int>> ranges, Option_t * option)
1370 TString opt = option;
// Per-process scratch directory for the animation frames.
// HOME may be unset or empty (batch jobs, containers); passing nullptr to a %s conversion is
// undefined behaviour, so fall back to "/tmp" as the rest of this file does for temp paths.
1373 const char * homeDir = gSystem->Getenv("HOME");
1374 std::string annimationTempDir = TString::Format("%s/.ndmspc/animation/%d", (homeDir && homeDir[0] != '\0') ? homeDir : "/tmp", gSystem->GetPid()).Data();
1375 gSystem->Exec(TString::Format(
"mkdir -p %s", annimationTempDir.c_str()));
1377 if (binning.empty()) {
1383 NLogError(
"NGnTree::Play: Binning definition '%s' not found in NGnTree !!!", binning.c_str());
1384 NLogError(
"Available binning definitions:");
1387 NLogError(
" [*] %s", name.c_str());
1389 NLogError(
" [ ] %s", name.c_str());
1394 THnSparse * bdContent = (THnSparse *)binningDef->
GetContent()->Clone();
1396 std::string bdContentName = TString::Format(
"bdContent_%s", binning.c_str()).Data();
1400 Long64_t linBin = 0;
1401 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{bdContent->CreateIter(
true )};
1402 std::vector<Long64_t> ids;
1404 while ((linBin = iter->Next()) >= 0) {
1406 ids.push_back(linBin);
1409 NLogWarning(
"NGnTree::Play: No entries found in binning definition '%s' !!!", binning.c_str());
1414 TCanvas * c1 =
nullptr;
1416 c1 = (TCanvas *)gROOT->GetListOfCanvases()->FindObject(
"c1");
1417 if (c1 ==
nullptr) c1 =
new TCanvas(
"c1",
"NGnTree::Play", 800, 600);
1420 c1->DivideSquare(outputPointIds.size() > 0 ? outputPointIds.size() + 1 : 1);
1421 gSystem->ProcessEvents();
1423 binningDef->
Print();
1427 for (
auto id : ids) {
1432 if (!l || l->IsEmpty()) {
1433 NLogWarning(
"NGnTree::Play: No 'outputPoint' for entry %lld !!!",
id);
1440 if (outputPointIds.empty()) {
1441 outputPointIds.resize(l->GetEntries());
1442 for (
int i = 0; i < l->GetEntries(); i++) {
1443 outputPointIds[i] = i;
1446 int n = outputPointIds.size();
1449 for (
int i = 0; i < n; i++) {
1453 TObject * obj = l->At(outputPointIds[i]);
1455 if (obj->InheritsFrom(TH1::Class())) {
1456 TH1 * h = (TH1 *)obj;
1457 h->SetDirectory(
nullptr);
1461 TH1 * hclone = (TH1 *)h->Clone();
1463 hclone->SetDirectory(
nullptr);
1469 if (obj->InheritsFrom(TH1::Class()) && i == 0) {
1470 TH1 * h = (TH1 *)obj;
1472 NLogDebug(
"Mean value from histogram [%s]: %f", h->GetName(), v);
1477 TH1 * bdProj = (TH1 *)gROOT->FindObjectAny(
"bdProj");
1482 if (bdContent->GetNdimensions() == 1) {
1483 bdProj = bdContent->Projection(0,
"O");
1485 else if (bdContent->GetNdimensions() == 2) {
1486 bdProj = bdContent->Projection(0, 1,
"O");
1488 else if (bdContent->GetNdimensions() == 3) {
1489 bdProj = bdContent->Projection(0, 1, 2,
"O");
1492 NLogError(
"NGnTree::Play: Cannot project THnSparse with %d dimensions", bdContent->GetNdimensions());
1495 bdProj->SetName(
"bdProj");
1496 bdProj->SetTitle(TString::Format(
"Binning '%s' content projection", binning.c_str()).Data());
1497 bdProj->SetMinimum(0);
1499 bdProj->Draw(
"colz");
1504 c1->ModifiedUpdate();
1505 c1->SaveAs(TString::Format(
"%s/ndmspc_play_%06lld.png", annimationTempDir.c_str(), bdContent->GetNbins()).Data());
1507 gSystem->ProcessEvents();
1508 if (timeout > 0) gSystem->Sleep(timeout);
1512 NLogInfo(
"Creating animation gif from %s/ndmspc_play_*.png ...", annimationTempDir.c_str());
1514 TString::Format(
"magick -delay 20 -loop 0 %s/ndmspc_play_*.png ndmspc_play.gif", annimationTempDir.c_str()));
1515 gSystem->Exec(TString::Format(
"rm -fr %s", annimationTempDir.c_str()));
1516 NLogInfo(
"Animation saved to ndmspc_play.gif");
1532 TH1::AddDirectory(kFALSE);
1534 json cfg = point->
GetCfg();
1536 Printf(
"Processing THnSparse projection with configuration: %s", cfg.dump().c_str());
1544 for (
auto & [objName, objCfg] : cfg[
"objects"].items()) {
1545 NLogInfo(
"Processing object '%s' ...", objName.c_str());
1548 if (hns ==
nullptr) {
1549 NLogError(
"NGnTree::Projection: THnSparse 'hns' not found in storage tree !!!");
1554 for (
size_t i = 0; i < objCfg.size(); i++) {
1556 NLogInfo(
"Processing projection %zu for object '%s' ...", i, objName.c_str());
1557 std::vector<int> dims;
1558 std::vector<std::string> dimNames = cfg[
"objects"][objName][i].get<std::vector<std::string>>();
1559 for (
const auto & dimName : dimNames) {
1560 NLogDebug(
"Looking for dimension name '%s' in THnSparse ...", dimName.c_str());
1562 for (
int i = 0; i < hns->GetNdimensions(); i++) {
1563 if (dimName == hns->GetAxis(i)->GetName()) {
1569 dims.push_back(dim);
1571 NLogError(
"NGnTree::Projection: Dimension name '%s' not found in THnSparse !!!", dimName.c_str());
1576 TH1 * hPrev = (TH1 *)output->At(i);
1578 hProj->SetName(TString::Format(
"%s_proj_%s", objName.c_str(),
NUtils::Join(dims,
'_').c_str()).Data());
1592 THnSparse * hnsIn = binningDef->
GetContent();
1594 std::vector<std::vector<int>> ranges = cfg[
"ranges"].get<std::vector<std::vector<int>>>();
1596 Long64_t linBin = 0;
1597 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{hnsIn->CreateIter(
true )};
1598 std::vector<Long64_t> ids;
1600 while ((linBin = iter->Next()) >= 0) {
1601 ids.push_back(linBin);
1604 NLogWarning(
"NGnTree::Projection: No entries found in binning definition '%s' !!!", binningDef->GetName());
1609 binningDef->
GetIds() = ids;
1624 std::map<
int, std::vector<int>> ranges, std::map<
int, std::vector<int>> rangesBase)
1633 return navigator.
Reshape(binningName, levels, level, ranges, rangesBase);
1637 int level, std::map<
int, std::vector<int>> ranges,
1638 std::map<
int, std::vector<int>> rangesBase)
1644 if (binningName.empty()) {
1648 THnSparse * hns = (THnSparse *)
fOutputs[binningName]->FindObject(
"resource_monitor");
1650 NLogError(
"NGnTree::Draw: Resource monitor THnSparse not found in outputs !!!");
1656 auto ngnt =
new NGnTree(hns,
"stat",
"/tmp/hnst_imported_for_drawing.root");
1657 if (ngnt->IsZombie()) {
1658 NLogError(
"NGnTree::GetResourceStatisticsNavigator: Failed to import resource monitor THnSparse !!!");
1665 auto ngnt2 =
NGnTree::Open(
"/tmp/hnst_imported_for_drawing.root");
1666 auto nav = ngnt2->Reshape(
"default", levels, level, ranges, rangesBase);
1679 NLogTrace(
"NGnTree::InitParameters: Replacing existing parameters ...");
1683 if (paramNames.empty()) {
1684 NLogTrace(
"NGnTree::InitParameters: No parameter names provided, skipping ...");
1694 const std::vector<std::string> & headers,
const std::string & outputFile,
bool close)
1701 std::string findPathClean = findPath;
1702 if (!findPathClean.empty() && findPathClean.back() ==
'/') {
1703 findPathClean.pop_back();
1706 std::vector<std::string> paths =
NUtils::Find(findPathClean, fileName);
1707 NLogInfo(
"NGnTree::Import: Found %zu files to import ...", paths.size());
1710 int nDirAxes = ngntArray->GetEntries();
1715 ngntArray->Add(axis->Clone());
1717 ngntFirst->
Close(
false);
1719 std::map<std::string, std::vector<std::vector<int>>> b;
1721 for (
int i = 0; i < ngntArray->GetEntries(); i++) {
1722 TAxis * axis = (TAxis *)ngntArray->At(i);
1723 b[axis->GetName()].push_back({1});
1733 cfg[
"basedir"] = findPathClean;
1734 cfg[
"filename"] = fileName;
1735 cfg[
"nDirAxes"] = nDirAxes;
1736 cfg[
"headers"] = headers;
1738 Ndmspc::NGnProcessFuncPtr processFunc = [](
Ndmspc::NBinningPoint * point, TList * , TList * outputPoint,
1742 json cfg = point->
GetCfg();
1743 std::string filename = cfg[
"basedir"].get<std::string>();
1745 for (
auto & header : cfg[
"headers"]) {
1746 filename += point->
GetBinLabel(header.get<std::string>());
1753 filename += cfg[
"filename"].get<std::string>();
1755 if (!ngnt || filename.compare(ngnt->GetStorageTree()->GetFileName()) != 0) {
1756 NLogInfo(
"NGnTree::Import: Opening file '%s' ...", filename.c_str());
1758 NLogDebug(
"NGnTree::Import: Closing previously opened file '%s' ...",
1759 ngnt->GetStorageTree()->GetFileName().c_str());
1765 if (!ngnt || ngnt->IsZombie()) {
1766 NLogError(
"NGnTree::Import: Cannot open file '%s'", filename.c_str());
1772 int nDirAxes = cfg[
"nDirAxes"].get<
int>();
1775 NLogInfo(
"NGnTree::Import: Processing point with coords %s ...", coordsStr.c_str());
1777 Long64_t entryNumber =
1778 ngnt->GetBinning()->GetContent()->GetBin(&coords[3 * nDirAxes], kFALSE);
1779 NLogInfo(
"NGnTree::Import: Corresponding entry number in file '%s' is %lld", filename.c_str(), entryNumber);
1781 ngnt->GetEntry(entryNumber);
1790 for (
const auto & kv : ngnt->GetStorageTree()->GetBranchesMap()) {
1793 NLogTrace(
"NGnTree::Import: Adding branch '%s' to storage tree ...", kv.first.c_str());
1796 NLogTrace(
"NGnTree::Import: Setting branch address for branch '%s' ...", kv.first.c_str());
1799 outputPoint->Add(
new TNamed(
"source_file", filename));
1815 TH1::AddDirectory(kFALSE);
1828 ngnt->
Process(processFunc, cfg,
"", beginFunc, endFunc);
Defines binning mapping and content for NDMSPC histograms.
THnSparse * GetContent() const
Get the template content histogram.
virtual void Print(Option_t *option="") const
Print binning definition information.
void RefreshIdsFromContent()
Refresh bin IDs from content histogram.
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Represents a single point in multi-dimensional binning.
Double_t GetBinMax(std::string axis) const
Get the maximum value for a specific axis.
std::string GetString(const std::string &prefix="", bool all=false) const
Returns a string representation of the binning point.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
std::string GetBinLabel(std::string axis) const
Get the label for a specific axis.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
Long64_t GetEntryNumber() const
Get entry number for the point.
NParameters * GetParameters() const
Get the parameters associated with this binning point.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
Double_t GetBinCenter(std::string axis) const
Returns the center value of the bin along the specified axis.
NStorageTree * GetTreeStorage() const
Get pointer to storage tree object.
void SetTempObject(const std::string &name, TObject *obj)
Set a temporary object with the given name.
NStorageTree * GetStorageTree() const
Returns a pointer to the associated storage tree.
virtual void Print(Option_t *option="") const
Print binning point information.
TObject * GetTempObject(const std::string &name) const
Retrieve a temporary object by name.
NGnTree * GetInput() const
Get pointer to input NGnTree object.
Int_t GetBin(std::string axis) const
Returns the bin index for the specified axis.
Int_t * GetCoords() const
Get pointer to content coordinates array.
json & GetCfg()
Get reference to configuration JSON object.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
Double_t GetBinMin(std::string axis) const
Get the minimum value for a specific axis.
NBinning object for managing multi-dimensional binning and axis definitions.
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
std::vector< std::string > GetDefinitionNames() const
Get all definition names.
std::string GetCurrentDefinitionName() const
Get current definition name.
NBinningPoint * GetPoint()
Get the current binning point.
virtual void Print(Option_t *option="") const
Print binning information.
std::vector< TAxis * > GetAxes() const
Get vector of axis pointers.
bool SetCfg(const json &cfg)
Set configuration from JSON.
void AddBinningDefinition(std::string name, std::map< std::string, std::vector< std::vector< int >>> binning, bool forceDefault=false)
Add a binning definition.
void Reset()
Reset the binning object to initial state.
Executes a function over all points in an N-dimensional space, optionally in parallel.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
Navigator object for managing hierarchical data structures and projections.
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int >> levels, size_t level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Reshape navigator using binning name and levels.
void SetGnTree(NGnTree *tree)
Set NGnTree object pointer.
Thread-local data object for NDMSPC processing.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
void SetResultsFilename(const std::string &filename)
Set the results filename for TCP mode (shared filesystem path).
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
void SetCfg(const json &cfg)
Set configuration JSON object.
NDMSPC tree object for managing multi-dimensional data storage and processing.
NBinning * GetBinning() const
Get pointer to binning object.
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
virtual void Draw(Option_t *option="") override
Draws the tree object.
void SetIsPureCopy(bool val)
Sets the pure copy status of the tree.
bool Close(bool write=false)
Close the tree, optionally writing data.
bool fOwnsTreeStorage
True when fTreeStorage is owned by this instance.
virtual ~NGnTree()
Destructor.
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
virtual void Print(Option_t *option="") const override
Print tree information.
void SetInput(NGnTree *input)
Set input NGnTree pointer.
NGnTree()
Default constructor.
static NGnTree * Import(const std::string &findPath, const std::string &fileName, const std::vector< std::string > &headers, const std::string &outFileName="/tmp/ngnt_imported.root", bool close=true)
Imports an NGnTree from the files named fileName that are found under findPath.
NGnNavigator * Reshape(std::string binningName, std::vector< std::vector< int >> levels, int level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Reshape navigator using binning name and levels.
NBinning * fBinning
Binning object.
TList * GetOutput(std::string name="")
Get output list by name.
NParameters * GetParameters() const
Returns the parameters associated with this tree.
NGnTree * fInput
Input NGnTree for processing.
bool fOwnsBinning
True when fBinning is owned by this instance.
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
NGnTree * GetInput() const
Get pointer to input NGnTree.
std::string fWorkerMacroList
Comma-separated macro paths sent to TCP workers.
void Play(int timeout=0, std::string binning="", std::vector< int > outputPointIds={0}, std::vector< std::vector< int >> ranges={}, Option_t *option="")
Play tree data with optional binning and output point IDs.
void SetOutputs(std::map< std::string, TList * > outputs)
Set outputs map.
bool InitParameters(const std::vector< std::string > &paramNames)
Initializes the parameters for the tree using the provided parameter names.
std::map< std::string, TList * > fOutputs
Outputs.
NGnNavigator * fNavigator
Navigator object (transient member).
NParameters * fParameters
Parameters object.
TList * Projection(const json &cfg, std::string binningName="")
Project tree data using configuration and binning name.
NGnNavigator * GetResourceStatisticsNavigator(std::string binningName, std::vector< std::vector< int >> levels, int level=0, std::map< int, std::vector< int >> ranges={}, std::map< int, std::vector< int >> rangesBase={})
Returns a navigator for resource statistics based on binning and levels.
NStorageTree * fTreeStorage
Tree storage.
static std::string BuildObjectPath(const json &cfg, const json &objCfg, const NBinningPoint *point)
Helper: build object path string from configuration and a binning point.
void SetNavigator(NGnNavigator *navigator)
Sets the navigator for this tree.
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
bool Process(NGnProcessFuncPtr func, const json &cfg=json::object(), std::string binningName="", NGnBeginFuncPtr beginFunc=nullptr, NGnEndFuncPtr endFunc=nullptr)
Process tree data using a function pointer and configuration.
static bool GetConsoleOutput()
Get console output flag.
bool SetParameter(int bin, Double_t value, Double_t error=0.)
Set the value and error of a parameter by bin index.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
void SetBranchAddresses()
Set addresses for all branches.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
bool SetFileTree(TFile *file, TTree *tree)
Tree handling.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
Long64_t GetEntry(Long64_t entry, NBinningPoint *point=nullptr, bool checkBinningDef=false)
Get entry by index and fill NBinningPoint.
std::string GetPrefix() const
Get prefix path.
TObject * GetBranchObject(const std::string &name)
Get pointer to branch object by name.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
std::string GetPostfix() const
Get postfix path.
bool Close(bool write=false, std::map< std::string, TList * > outputs={})
Close the storage tree, optionally writing outputs.
TTree * GetTree() const
Get pointer to TTree object.
virtual void Print(Option_t *option="") const
Print storage tree information.
void SetBinning(NBinning *binning)
Set binning object pointer.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
TObject * GetObject() const
Get object pointer.
static bool SetAxisRanges(THnSparse *sparse, std::vector< std::vector< int >> ranges={}, bool withOverflow=false, bool modifyTitle=false, bool reset=true)
Set axis ranges for THnSparse using vector of ranges.
static TH1 * ProjectTHnSparse(THnSparse *hns, const std::vector< int > &axes, Option_t *option="")
Project a THnSparse histogram onto specified axes.
static std::vector< std::string > Tokenize(std::string_view input, const char delim)
Tokenize a string by delimiter.
static std::string FormatTime(long long seconds)
Format time in seconds to human-readable string.
static bool EnableMT(Int_t numthreads=-1)
Enable multi-threading with specified number of threads.
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
static void ProgressBar(int current, int total, std::string prefix="", std::string suffix="", int barWidth=50)
Display progress bar.
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.
static TObjArray * AxesFromDirectory(const std::vector< std::string > paths, const std::string &findPath, const std::string &fileName, const std::vector< std::string > &axesNames)
Creates an array of axes objects from files in specified directories.
static std::vector< std::string > Find(std::string path, std::string filename="")
Find files in a path matching filename.