254NGnTree::NGnTree(THnSparse * hns, std::string parameterAxis,
const std::string & outFileName, json cfg)
255 : TObject(),
fInput(nullptr)
261 std::map<std::string, std::vector<std::vector<int>>> b;
262 TObjArray * axes =
new TObjArray();
263 int parameterAxisIdx = -1;
264 std::vector<std::string> labels;
265 for (
int i = 0; i < hns->GetNdimensions(); i++) {
266 TAxis * axisIn = (TAxis *)hns->GetAxis(i);
267 TAxis * axis = (TAxis *)axisIn->Clone();
270 if (parameterAxis.compare(axis->GetName()) == 0) {
271 parameterAxisIdx = i;
272 TAxis * axis = hns->GetAxis(parameterAxisIdx);
273 for (
int bin = 1; bin <= axis->GetNbins(); bin++) {
275 labels.push_back(axis->GetBinLabel(bin));
281 if (axisIn->IsAlphanumeric()) {
282 NLogTrace(
"Setting axis '%s' labels from input THnSparse", axis->GetName());
283 for (
int bin = 1; bin <= axisIn->GetNbins(); bin++) {
284 std::string label = axisIn->GetBinLabel(bin);
285 if (!labels.empty()) axis->SetBinLabel(bin, axisIn->GetBinLabel(bin));
290 b[axis->GetName()] = {{1}};
294 cfg[
"_parameterAxis"] = parameterAxisIdx;
295 cfg[
"_labels"] = labels;
298 NLogDebug(
"Importing THnSparse as NGnTree with parameter axis '%s' (index %d) ...", parameterAxis.c_str(),
301 NLogDebug(
"Created NGnTree for THnSparse import ...");
302 if (ngnt->IsZombie()) {
303 NLogError(
"NGnTree::Import: Failed to create NGnTree !!!");
309 const char * tmpDirStr = gSystem->Getenv(
"NDMSPC_TMP_DIR");
313 if (!tmpDirStr || tmpDir.empty()) {
316 std::string tmpFilename = tmpDir +
"/ngnt_imported_input" + std::to_string(gSystem->GetPid()) +
".root";
318 if (ngntIn->IsZombie()) {
319 NLogError(
"NGnTree::Import: Failed to create NGnTree for input !!!");
325 ngntIn->
GetOutput(
"default")->Add(hns->Clone(
"test"));
336 ngnt->
InitParameters(cfg[
"_labels"].get<std::vector<std::string>>());
341 TH1::AddDirectory(kFALSE);
343 json cfg = point->
GetCfg();
347 NLogError(
"NGnTree::Import: Input NGnTree is nullptr !!!");
352 THnSparse * hns = (THnSparse *)ngntIn->
GetOutput(
"default")->At(0);
353 if (hns ==
nullptr) {
354 NLogError(
"NGnTree::Import: THnSparse 'hns' not found in storage tree !!!");
358 int axisIdx = cfg[
"_parameterAxis"].get<
int>();
359 std::vector<std::vector<int>> ranges;
362 for (
int i = 0; i < hns->GetNdimensions(); i++) {
363 if (i == axisIdx)
continue;
365 ranges.push_back({i, coord, coord});
370 TH1 * h = hns->Projection(axisIdx,
"O");
372 NLogError(
"NGnTree::Import: Projection of THnSparse failed !!!");
375 if (h->GetEntries() > 0) {
378 for (
int bin = 1; bin <= h->GetNbinsX(); bin++) {
379 params->
SetParameter(bin, h->GetBinContent(bin), h->GetBinError(bin));
385 std::string filename = cfg[
"filename"].get<std::string>();
387 if (!f || filename.compare(f->GetName()) != 0) {
389 NLogDebug(
"NGnTree::Import: Closing previously opened file '%s' ...", f->GetName());
392 NLogDebug(
"NGnTree::Import: Opening file '%s' ...", filename.c_str());
393 f = TFile::Open(filename.c_str());
394 if (!f || f->IsZombie()) {
395 NLogError(
"NGnTree::Import: Cannot open file '%s' !!!", filename.c_str());
401 std::string axisObjectDefaultFormat =
402 cfg[
"axisObjectDefaultFormat"].is_string() ? cfg[
"axisObjectDefaultFormat"].get<std::string>() :
"%.2f_%.2f";
403 std::string axisDefaultSeparator =
404 cfg[
"axisDefaultSeparator"].is_string() ? cfg[
"axisDefaultSeparator"].get<std::string>() :
"/";
406 if (cfg.contains(
"dryrun") && cfg[
"dryrun"].is_boolean()) {
407 dryrun = cfg[
"dryrun"].get<
bool>();
411 NLogInfo(
"NGnTree::Import (dryrun): '%s' ...", point->
GetString().c_str());
415 for (
auto & [objName, objCfg] : cfg[
"objects"].items()) {
419 NLogInfo(
"NGnTree::Import (dryrun): would retrieve object '%s'", objPath.c_str());
424 NLogInfo(
"NGnTree::Import: Retrieving object '%s' from file '%s' ...", objPath.c_str(),
425 cfg[
"filename"].get<std::string>().c_str());
427 TObject * obj = f->Get(objPath.c_str());
430 NLogWarning(
"NGnTree::Import: Cannot get object '%s' from file '%s' !!!", objPath.c_str(),
431 cfg[
"filename"].get<std::string>().c_str());
436 if (obj->InheritsFrom(TCanvas::Class())) {
437 TCanvas * cObj = (TCanvas *)obj;
438 cObj->SetName(objName.c_str());
440 outputPoint->Add(obj->Clone(objName.c_str()));
447 ngnt->
Process(processFunc, cfg);
450 gSystem->Exec(TString::Format(
"rm -f %s", tmpFilename.c_str()));
544 NLogInfo(
"NGnTree::Process: Starting processing with %zu definitions ...", defNames.size());
545 bool batch = gROOT->IsBatch();
546 gROOT->SetBatch(kTRUE);
547 TH1::AddDirectory(kFALSE);
551 const std::string storageFileName =
fTreeStorage->GetFileName();
552 if (!storageFileName.empty()) {
553 storagePostfix = gSystem->BaseName(storageFileName.c_str());
556 if (storagePostfix.empty()) {
557 storagePostfix =
"ndmspc.root";
561 if (
const char * workerEndpoint = gSystem->Getenv(
"NDMSPC_WORKER_ENDPOINT")) {
562 size_t workerIndex = 0;
563 if (
const char * envIdx = gSystem->Getenv(
"NDMSPC_WORKER_INDEX")) {
564 try { workerIndex =
static_cast<size_t>(std::stoul(envIdx)); }
catch (...) {}
566 NLogInfo(
"NGnTree::Process: Worker mode — connecting to %s as worker %zu", workerEndpoint, workerIndex);
573 void * ctx = zmq_ctx_new();
574 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
575 const std::string identity = Ndmspc::NDimensionalIpcRunner::BuildWorkerIdentity(workerIndex);
576 zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size());
577 int timeoutMs = 1000;
578 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
579 zmq_connect(dealer, workerEndpoint);
580 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"READY"});
583 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(30);
586 std::vector<std::string> frames;
587 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
588 if (errno == EAGAIN || errno == EWOULDBLOCK) {
589 if (std::chrono::steady_clock::now() > initDeadline)
break;
595 if (frames.size() >= 1 && frames[0] ==
"STOP") {
596 NLogPrint(
"NGnTree::Process: Worker received STOP before INIT — session already finished, exiting.");
599 gROOT->SetBatch(batch);
602 if (frames.size() >= 5 && frames[0] ==
"INIT") {
603 workerIndex =
static_cast<size_t>(std::stoul(frames[1]));
604 const std::string & sessionId = frames[2];
605 const std::string & initResultsDir = frames[3];
606 const std::string & initTreeName = frames[4];
609 if (frames.size() >= 7) {
610 if (!frames[5].empty())
611 gSystem->Setenv(
"NDMSPC_TMP_DIR", frames[5].c_str());
612 if (!frames[6].empty())
613 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", frames[6].c_str());
616 if (!gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR")[0] ==
'\0') {
617 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
618 if (tmpDirEnv && tmpDirEnv[0] !=
'\0')
619 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
623 const char * localTmpEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
624 const std::string localBase = localTmpEnv ? localTmpEnv :
"/tmp";
625 const std::string localFile = localBase +
"/.ndmspc/tmp/" + sessionId +
"/" +
626 std::to_string(workerIndex) +
"/" + storagePostfix;
629 const std::string resultsFile = initResultsDir +
"/" + std::to_string(workerIndex) +
"/" +
632 bool rc = workerData.
Init(workerIndex, func, beginFunc, endFunc,
this, binningIn,
fInput, localFile, initTreeName);
634 NLogError(
"NGnTree::Process: Worker failed to initialize NGnThreadData");
640 if (resultsFile != localFile) {
644 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"ACK"});
649 NLogError(
"NGnTree::Process: Worker did not receive INIT from supervisor");
655 Ndmspc::NDimensionalIpcRunner::TaskLoop(dealer, workerIndex, &workerData);
658 gROOT->SetBatch(batch);
665 int nThreads = ROOT::GetThreadPoolSize();
666 if (nThreads < 1) nThreads = 1;
668 std::string executionMode =
"thread";
669 const char * envMode = gSystem->Getenv(
"NDMSPC_EXECUTION_MODE");
670 const bool modeExplicit = (envMode && envMode[0] !=
'\0');
672 executionMode = envMode;
675 std::string normalizedMode = executionMode;
676 std::transform(normalizedMode.begin(), normalizedMode.end(), normalizedMode.begin(),
677 [](
unsigned char c) { return static_cast<char>(std::tolower(c)); });
678 if (normalizedMode ==
"process") normalizedMode =
"ipc";
680 bool useProcessIpc = (normalizedMode ==
"ipc" || normalizedMode ==
"tcp");
681 bool useTcp = (normalizedMode ==
"tcp");
682 size_t nProcesses =
static_cast<size_t>(nThreads);
683 bool ndmspcNProcExplicit =
false;
685 if (
const char * envNdmspcNProc = gSystem->Getenv(
"NDMSPC_MAX_PROCESSES")) {
686 ndmspcNProcExplicit =
true;
688 nProcesses = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envNdmspcNProc)));
691 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_MAX_PROCESSES='%s', using default=%zu", envNdmspcNProc,
695 else if (
const char * envNProc = gSystem->Getenv(
"ROOT_MAX_THREADS")) {
698 nProcesses = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envNProc)));
701 NLogWarning(
"NGnTree::Process: Invalid ROOT_MAX_THREADS='%s', using default=%zu", envNProc, nProcesses);
708 if (normalizedMode ==
"thread") {
709 useProcessIpc =
false;
712 else if (normalizedMode ==
"tcp") {
713 useProcessIpc =
true;
716 else if (normalizedMode ==
"ipc") {
717 useProcessIpc =
true;
721 NLogWarning(
"NGnTree::Process: Unknown NDMSPC_EXECUTION_MODE='%s', falling back to auto mode selection.",
722 executionMode.c_str());
723 useProcessIpc = (nProcesses > 1);
727 else if (nProcesses > 1) {
728 useProcessIpc =
true;
730 executionMode =
"ipc";
731 normalizedMode =
"ipc";
734 if (ndmspcNProcExplicit && normalizedMode ==
"thread" && nProcesses > 1) {
735 NLogWarning(
"NGnTree::Process: NDMSPC_MAX_PROCESSES=%zu is set, but NDMSPC_EXECUTION_MODE=thread disables IPC.",
739 const size_t workerObjectCount = useProcessIpc ? std::max(
static_cast<size_t>(nThreads), nProcesses)
740 :
static_cast<size_t>(nThreads);
741 std::vector<Ndmspc::NGnThreadData> threadDataVector(workerObjectCount);
743 NLogInfo(
"NGnTree::Process: executionMode='%s', useProcessIpc=%d, ROOT threads=%d, ipcProcesses=%zu, workerObjects=%zu",
744 executionMode.c_str(), useProcessIpc ? 1 : 0, nThreads, nProcesses, workerObjectCount);
746 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
748 if (tmpDirEnv && tmpDirEnv[0] !=
'\0') {
753 if (!(tmpDirPrefix.BeginsWith(
"root://") || tmpDirPrefix.BeginsWith(
"http://") ||
754 tmpDirPrefix.BeginsWith(
"https://"))) {
755 tmpDir = tmpDirPrefix.Data();
757 if (tmpDir.empty()) tmpDir =
"/tmp";
760 std::string jobDir = tmpDir +
"/.ndmspc/tmp/" + std::to_string(gSystem->GetPid());
764 const char * resultsDirEnv = gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR");
765 const bool sameDir = !resultsDirEnv || std::string(resultsDirEnv) == tmpDir;
766 std::string resultsDir = sameDir ? jobDir
767 : (std::string(resultsDirEnv) +
"/" +
768 std::to_string(gSystem->GetPid()));
770 std::string filePrefix = jobDir;
771 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
772 std::string filename = filePrefix +
"/" + std::to_string(i) +
"/" + storagePostfix;
773 bool rc = threadDataVector[i].Init(i, func, beginFunc, endFunc,
this, binningIn,
fInput, filename,
776 NLogError(
"Failed to initialize thread data %zu, exiting ...", i);
779 threadDataVector[i].SetCfg(cfg);
784 std::string resultsFile = resultsDir +
"/" + std::to_string(i) +
"/" + storagePostfix;
785 threadDataVector[i].SetResultsFilename(resultsFile);
788 size_t processedEntries = 0;
789 size_t totalEntries = 0;
790 auto start_par = std::chrono::high_resolution_clock::now();
791 auto start_par_job = std::chrono::high_resolution_clock::now();
796 thread_obj.Process(coords);
799 size_t nRunning = (totalEntries - processedEntries >= threadDataVector.size()) ? threadDataVector.size()
800 : totalEntries - processedEntries;
801 NUtils::ProgressBar(processedEntries, totalEntries, start_par, TString::Format(
"R%4zu", nRunning).Data());
808 std::vector<Ndmspc::NThreadData *> processWorkers;
809 std::unique_ptr<Ndmspc::NDimensionalExecutor> ipcExecutor;
811 processWorkers.reserve(threadDataVector.size());
812 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
813 processWorkers.push_back(&threadDataVector[i]);
815 ipcExecutor = std::make_unique<Ndmspc::NDimensionalExecutor>(std::vector<int>{0}, std::vector<int>{0});
817 const char * tcpPort = gSystem->Getenv(
"NDMSPC_TCP_PORT");
818 std::string tcpEndpoint = std::string(
"tcp://0.0.0.0:") + (tcpPort ? tcpPort :
"5555");
819 const char * resultsDirBase = gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR");
820 const char * macroParams = gSystem->Getenv(
"NDMSPC_MACRO_PARAMS");
824 if (workerMacro.empty()) {
825 if (
const char * envMacro = gSystem->Getenv(
"NDMSPC_MACRO")) workerMacro = envMacro;
827 ipcExecutor->StartProcessIpc(processWorkers, nProcesses, tcpEndpoint, resultsDir,
828 fTreeStorage->GetTree()->GetName(), workerMacro, tmpDir,
829 resultsDirBase ? resultsDirBase :
"",
830 macroParams ? macroParams :
"");
832 ipcExecutor->StartProcessIpc(processWorkers, nProcesses);
836 std::map<std::string, std::vector<Long64_t>>
837 defIdMapProcessedRemoved;
850 for (
auto & name : defNames) {
853 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
857 if (binningDef->GetIds().size() == 0) {
858 NLogWarning(
"NGnTree::Process: Binning definition '%s' has no entries, skipping ...", name.c_str());
862 const std::vector<Long64_t> originalDefinitionIds = binningDef->
GetIds();
864 std::vector<int> mins, maxs;
866 maxs.push_back(binningDef->GetIds().size() - 1);
867 NLogDebug(
"NGnTree::Process: Processing with binning definition '%s' with %zu entries", name.c_str(),
868 binningDef->GetIds().size());
871 NLogInfo(
"NGnTree::Process: Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
874 Printf(
"Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
876 start_par = std::chrono::high_resolution_clock::now();
877 processedEntries = 0;
878 totalEntries = maxs[0] + 1;
879 const size_t activeWorkers = useProcessIpc ? std::max<size_t>(1, std::min(nProcesses, processWorkers.size()))
880 : threadDataVector.size();
883 TString::Format(
"R%4zu", activeWorkers).Data());
886 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
888 threadDataVector[i].GetHnSparseBase()->GetBinning()->SetCurrentDefinitionName(name);
893 if (!useProcessIpc) {
897 Bool_t prevMustClean = gROOT->MustClean();
898 gROOT->SetMustClean(kFALSE);
905 Bool_t prevBatch = gROOT->IsBatch();
906 gROOT->SetBatch(kTRUE);
915 gROOT->SetMustClean(prevMustClean);
916 gROOT->SetBatch(prevBatch);
919 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
920 threadDataVector[i].FlushDeferredDeletes();
923 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
924 threadDataVector[i].ExecuteEndFunction();
928 ipcExecutor->SetBounds(mins, maxs);
932 size_t finalActiveWorkers = 0;
933 size_t acked = ipcExecutor->ExecuteCurrentBoundsProcessIpc(
934 name, &originalDefinitionIds,
939 size_t nRunning = std::min(progress.
activeWorkers, activeWorkers);
941 TString::Format(
"R%4zu", nRunning).Data());
944 processedEntries = acked;
950 size_t connected = finalActiveWorkers > 0 ? finalActiveWorkers : std::min(nProcesses, processWorkers.size());
951 const size_t processesToUse = std::max<size_t>(1, std::min(nProcesses, connected));
952 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
953 auto * workerDef = threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name);
955 workerDef->GetIds().clear();
959 for (
size_t taskIndex = 0; taskIndex < originalDefinitionIds.size(); ++taskIndex) {
960 const size_t workerIndex = taskIndex % processesToUse;
961 threadDataVector[workerIndex].SetNProcessed(threadDataVector[workerIndex].GetNProcessed() + 1);
963 auto * workerDef = threadDataVector[workerIndex].GetHnSparseBase()->GetBinning()->GetDefinition(name);
965 workerDef->GetIds().push_back(originalDefinitionIds[taskIndex]);
975 Printf(
"Finished processing binning definition '%s'. Post-processing results ...", name.c_str());
977 NLogDebug(
"NGnTree::Process: [BEGIN] ------------------------------------------------");
980 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
981 NLogDebug(
"NGnTree::Process: -> Thread %zu processed %lld entries", i, threadDataVector[i].GetNProcessed());
985 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().begin(),
986 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().end());
991 for (
size_t i = 0; i < defNames.size(); i++) {
993 std::string other_name = defNames[i];
999 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", other_name.c_str());
1003 for (
auto it = otherDef->GetIds().begin(); it != otherDef->GetIds().end();) {
1004 NLogTrace(
"NGnTree::Process: Checking entry %lld from definition '%s' against sumIds=%d", *it,
1005 other_name.c_str(), sumIds);
1007 NLogTrace(
"NGnTree::Process: Removing entry %lld from definition '%s'", *it, other_name.c_str());
1008 defIdMapProcessedRemoved[other_name].push_back(*it);
1009 it = otherDef->GetIds().erase(it);
1021 NLogDebug(
"NGnTree::Process: [END] ------------------------------------------------");
1024 catch (
const std::exception & ex) {
1026 ipcExecutor->FinishProcessIpc(
true);
1027 ipcExecutor.reset();
1030 TString what(ex.what());
1031 if (what.Contains(
"Interrupted by user")) {
1033 gROOT->SetInterrupt(kFALSE);
1035 NLogWarning(
"NGnTree::Process: Interrupted by user, stopping processing.");
1038 NLogError(
"NGnTree::Process: Processing failed: %s", ex.what());
1045 auto end_par = std::chrono::high_resolution_clock::now();
1046 std::chrono::duration<double, std::milli> par_duration = end_par - start_par_job;
1049 ipcExecutor->FinishProcessIpc();
1054 const std::set<size_t> registeredWorkers =
1055 (ipcExecutor && useTcp) ? ipcExecutor->GetRegisteredWorkerIndices() : std::set<size_t>{};
1056 const bool filterByRegistered = !registeredWorkers.empty();
1059 Printf(
"NGnTree::Process: Execution completed and it took %s .",
1063 NLogInfo(
"NGnTree::Process: Execution completed and it took %s .",
1067 NLogInfo(
"NGnTree::Process: Post processing %zu results ...", threadDataVector.size());
1068 for (
auto & data : threadDataVector) {
1069 if (useProcessIpc) {
1070 NLogTrace(
"NGnTree::Process: Releasing parent handle for worker %zu file without writing",
1071 data.GetAssignedIndex());
1075 NLogTrace(
"NGnTree::Process: Closing file from thread %zu with write", data.GetAssignedIndex());
1076 data.GetHnSparseBase()->GetStorageTree()->Close(
true);
1080 NLogDebug(
"NGnTree::Process: Merging %zu results ...", threadDataVector.size());
1082 Printf(
"NGnTree::Process: [phase] merge start (%zu workers)", threadDataVector.size());
1084 const auto mergeStart = std::chrono::high_resolution_clock::now();
1085 TList * mergeList =
new TList();
1087 outputData->
Init(0, func,
nullptr,
nullptr,
this, binningIn);
1091 for (
auto & data : threadDataVector) {
1092 if (filterByRegistered && registeredWorkers.find(data.GetAssignedIndex()) == registeredWorkers.end()) {
1093 NLogInfo(
"NGnTree::Process: Skipping worker %zu — never connected", data.GetAssignedIndex());
1096 NLogTrace(
"NGnTree::Process: Adding thread data %zu to merge list ...", data.GetAssignedIndex());
1097 mergeList->Add(&data);
1100 Long64_t nmerged = outputData->
Merge(mergeList);
1101 const auto mergeEnd = std::chrono::high_resolution_clock::now();
1103 const auto mergeSec = std::chrono::duration_cast<std::chrono::duration<double>>(mergeEnd - mergeStart).count();
1104 Printf(
"NGnTree::Process: [phase] merge done (%lld outputs, %.2f s)", nmerged, mergeSec);
1107 NLogError(
"NGnTree::Process: Failed to merge thread data, exiting ...");
1111 NLogInfo(
"NGnTree::Process: Merged %lld outputs successfully", nmerged);
1123 std::set<Long64_t> mergedContentIds;
1124 std::vector<std::pair<Long64_t, std::vector<int>>> mergedContentCoords;
1127 for (
size_t i = 0; i < defNames.size(); i++) {
1128 std::string name = defNames[i];
1130 auto def = mergedBinning->GetDefinition(name);
1132 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1135 for (
auto & [other_name, removedIds] : defIdMapProcessedRemoved) {
1136 if (other_name.compare(name) != 0) {
1139 for (
auto &
id : removedIds) {
1140 if (std::find(def->GetIds().begin(), def->GetIds().end(),
id) == def->GetIds().end()) {
1141 NLogTrace(
"NGnTree::Process: Adding missing entry %lld to definition '%s'",
id, name.c_str());
1142 def->GetIds().push_back(
id);
1146 sort(def->GetIds().begin(), def->GetIds().end());
1150 def->GetContent()->Reset();
1151 for (
auto id : def->GetIds()) {
1153 def->GetBinning()->GetContent()->GetBinContent(
id, point.
GetCoords());
1156 NLogTrace(
"NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld", name.c_str(),
id, bin);
1157 def->GetContent()->SetBinContent(bin,
id);
1159 if (mergedContentIds.insert(
id).second) {
1160 mergedContentCoords.emplace_back(
1170 for (
const auto & entry : mergedContentCoords) {
1171 Long64_t bin = mergedBinning->GetContent()->GetBin(entry.second.data());
1172 mergedBinning->GetContent()->SetBinContent(bin, entry.first);
1176 NLogDebug(
"NGnTree::Process: Final binning definitions after processing:");
1177 for (
auto & name : defNames) {
1181 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1184 binningDef->
Print();
1193 NLogInfo(
"NGnTree::Process: Processing completed successfully. Output was stored in '%s'.",
1197 Printf(
"Processing completed successfully. Output was stored in '%s'.",
fTreeStorage->GetFileName().c_str());
1202 Printf(
"NGnTree::Process: [phase] final close start (%s)",
1205 const auto closeStart = std::chrono::high_resolution_clock::now();
1207 const auto closeEnd = std::chrono::high_resolution_clock::now();
1209 const auto closeSec = std::chrono::duration_cast<std::chrono::duration<double>>(closeEnd - closeStart).count();
1210 Printf(
"NGnTree::Process: [phase] final close done (%.2f s)", closeSec);
1214 Printf(
"NGnTree::Process: [phase] cleanup start (%s)", jobDir.c_str());
1216 const auto cleanupStart = std::chrono::high_resolution_clock::now();
1217 gSystem->Exec(TString::Format(
"rm -fr %s", jobDir.c_str()));
1218 const auto cleanupEnd = std::chrono::high_resolution_clock::now();
1220 const auto cleanupSec = std::chrono::duration_cast<std::chrono::duration<double>>(cleanupEnd - cleanupStart).count();
1221 Printf(
"NGnTree::Process: [phase] cleanup done (%.2f s)", cleanupSec);
1223 gROOT->SetBatch(batch);
1385void NGnTree::Play(
int timeout, std::string binning, std::vector<int> outputPointIds,
1386 std::vector<std::vector<int>> ranges, Option_t * option)
1391 TString opt = option;
1394 std::string annimationTempDir =
1395 TString::Format(
"%s/.ndmspc/animation/%d", gSystem->Getenv(
"HOME"), gSystem->GetPid()).Data();
1396 gSystem->Exec(TString::Format(
"mkdir -p %s", annimationTempDir.c_str()));
1398 if (binning.empty()) {
1399 binning =
fBinning->GetCurrentDefinitionName();
1404 NLogError(
"NGnTree::Play: Binning definition '%s' not found in NGnTree !!!", binning.c_str());
1405 NLogError(
"Available binning definitions:");
1406 for (
auto & name :
fBinning->GetDefinitionNames()) {
1407 if (name ==
fBinning->GetCurrentDefinitionName())
1408 NLogError(
" [*] %s", name.c_str());
1410 NLogError(
" [ ] %s", name.c_str());
1415 THnSparse * bdContent = (THnSparse *)binningDef->
GetContent()->Clone();
1417 std::string bdContentName = TString::Format(
"bdContent_%s", binning.c_str()).Data();
1421 Long64_t linBin = 0;
1422 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{bdContent->CreateIter(
true )};
1423 std::vector<Long64_t> ids;
1425 while ((linBin = iter->Next()) >= 0) {
1427 ids.push_back(linBin);
1430 NLogWarning(
"NGnTree::Play: No entries found in binning definition '%s' !!!", binning.c_str());
1435 TCanvas * c1 =
nullptr;
1437 c1 = (TCanvas *)gROOT->GetListOfCanvases()->FindObject(
"c1");
1438 if (c1 ==
nullptr) c1 =
new TCanvas(
"c1",
"NGnTree::Play", 800, 600);
1441 c1->DivideSquare(outputPointIds.size() > 0 ? outputPointIds.size() + 1 : 1);
1442 gSystem->ProcessEvents();
1444 binningDef->
Print();
1448 for (
auto id : ids) {
1452 TList * l = (TList *)
fTreeStorage->GetBranch(
"_outputPoint")->GetObject();
1453 if (!l || l->IsEmpty()) {
1454 NLogWarning(
"NGnTree::Play: No 'outputPoint' for entry %lld !!!",
id);
1461 if (outputPointIds.empty()) {
1462 outputPointIds.resize(l->GetEntries());
1463 for (
int i = 0; i < l->GetEntries(); i++) {
1464 outputPointIds[i] = i;
1467 int n = outputPointIds.size();
1470 for (
int i = 0; i < n; i++) {
1474 TObject * obj = l->At(outputPointIds[i]);
1476 if (obj->InheritsFrom(TH1::Class())) {
1477 TH1 * h = (TH1 *)obj;
1478 h->SetDirectory(
nullptr);
1482 TH1 * hclone = (TH1 *)h->Clone();
1484 hclone->SetDirectory(
nullptr);
1490 if (obj->InheritsFrom(TH1::Class()) && i == 0) {
1491 TH1 * h = (TH1 *)obj;
1493 NLogDebug(
"Mean value from histogram [%s]: %f", h->GetName(), v);
1496 bdContent->SetBinContent(
fBinning->GetPoint()->GetStorageCoords(), 1);
1498 TH1 * bdProj = (TH1 *)gROOT->FindObjectAny(
"bdProj");
1503 if (bdContent->GetNdimensions() == 1) {
1504 bdProj = bdContent->Projection(0,
"O");
1506 else if (bdContent->GetNdimensions() == 2) {
1507 bdProj = bdContent->Projection(0, 1,
"O");
1509 else if (bdContent->GetNdimensions() == 3) {
1510 bdProj = bdContent->Projection(0, 1, 2,
"O");
1513 NLogError(
"NGnTree::Play: Cannot project THnSparse with %d dimensions", bdContent->GetNdimensions());
1516 bdProj->SetName(
"bdProj");
1517 bdProj->SetTitle(TString::Format(
"Binning '%s' content projection", binning.c_str()).Data());
1518 bdProj->SetMinimum(0);
1520 bdProj->Draw(
"colz");
1525 c1->ModifiedUpdate();
1526 c1->SaveAs(TString::Format(
"%s/ndmspc_play_%06lld.png", annimationTempDir.c_str(), bdContent->GetNbins()).Data());
1528 gSystem->ProcessEvents();
1529 if (timeout > 0) gSystem->Sleep(timeout);
1533 NLogInfo(
"Creating animation gif from %s/ndmspc_play_*.png ...", annimationTempDir.c_str());
1535 TString::Format(
"magick -delay 20 -loop 0 %s/ndmspc_play_*.png ndmspc_play.gif", annimationTempDir.c_str()));
1536 gSystem->Exec(TString::Format(
"rm -fr %s", annimationTempDir.c_str()));
1537 NLogInfo(
"Animation saved to ndmspc_play.gif");