// Fragment of the THnSparse import path. Only part of the original statements
// is visible here (the embedded numbers skip lines), so the notes below cover
// just what is shown. Log messages in this region are tagged "NGnTree::Import".
254NGnTree::NGnTree(THnSparse * hns, std::string parameterAxis,
const std::string & outFileName, json cfg)
255 : TObject(),
fInput(nullptr)
// Map keyed by axis name; entries are set to {{1}} in the axis handling below.
261 std::map<std::string, std::vector<std::vector<int>>> b;
262 TObjArray * axes =
new TObjArray();
// Stays -1 unless an axis named like `parameterAxis` is found in `hns`.
263 int parameterAxisIdx = -1;
264 std::vector<std::string> labels;
// Walk every axis of the input THnSparse, working on a clone of each one.
265 for (
int i = 0; i < hns->GetNdimensions(); i++) {
266 TAxis * axisIn = (TAxis *)hns->GetAxis(i);
267 TAxis * axis = (TAxis *)axisIn->Clone();
// The axis whose name equals `parameterAxis` supplies the bin labels collected in `labels`.
270 if (parameterAxis.compare(axis->GetName()) == 0) {
271 parameterAxisIdx = i;
// NOTE(review): this inner `axis` shadows the clone declared above and refers to the input's own axis.
272 TAxis * axis = hns->GetAxis(parameterAxisIdx);
273 for (
int bin = 1; bin <= axis->GetNbins(); bin++) {
275 labels.push_back(axis->GetBinLabel(bin));
// Alphanumeric input axes get their bin labels copied onto the clone.
281 if (axisIn->IsAlphanumeric()) {
282 NLogTrace(
"Setting axis '%s' labels from input THnSparse", axis->GetName());
283 for (
int bin = 1; bin <= axisIn->GetNbins(); bin++) {
284 std::string label = axisIn->GetBinLabel(bin);
// NOTE(review): the guard tests `labels` (the vector filled above) while the `label`
// read on the previous line is not used in the visible code - possibly `label`
// was intended; confirm.
285 if (!labels.empty()) axis->SetBinLabel(bin, axisIn->GetBinLabel(bin));
290 b[axis->GetName()] = {{1}};
// The parameter-axis index and its labels travel through the config under
// the keys "_parameterAxis" and "_labels"; both are read back further down.
294 cfg[
"_parameterAxis"] = parameterAxisIdx;
295 cfg[
"_labels"] = labels;
298 NLogDebug(
"Importing THnSparse as NGnTree with parameter axis '%s' (index %d) ...", parameterAxis.c_str(),
301 NLogDebug(
"Created NGnTree for THnSparse import ...");
302 if (ngnt->IsZombie()) {
303 NLogError(
"NGnTree::Import: Failed to create NGnTree !!!");
// Temporary input file: directory comes from NDMSPC_TMP_DIR, the file name embeds the process id.
309 const char * tmpDirStr = gSystem->Getenv(
"NDMSPC_TMP_DIR");
313 if (!tmpDirStr || tmpDir.empty()) {
316 std::string tmpFilename = tmpDir +
"/ngnt_imported_input" + std::to_string(gSystem->GetPid()) +
".root";
318 if (ngntIn->IsZombie()) {
319 NLogError(
"NGnTree::Import: Failed to create NGnTree for input !!!");
// A clone of the input THnSparse (named "test") is added to the "default" output of the input tree.
325 ngntIn->
GetOutput(
"default")->Add(hns->Clone(
"test"));
336 ngnt->
InitParameters(cfg[
"_labels"].get<std::vector<std::string>>());
341 TH1::AddDirectory(kFALSE);
// NOTE(review): from here on the code reads `point`, `outputPoint` and a fresh `cfg`;
// presumably this is the body of the `processFunc` callback handed to Process() below - confirm.
343 json cfg = point->
GetCfg();
347 NLogError(
"NGnTree::Import: Input NGnTree is nullptr !!!");
// The THnSparse is taken back from slot 0 of the "default" output.
352 THnSparse * hns = (THnSparse *)ngntIn->
GetOutput(
"default")->At(0);
353 if (hns ==
nullptr) {
354 NLogError(
"NGnTree::Import: THnSparse 'hns' not found in storage tree !!!");
358 int axisIdx = cfg[
"_parameterAxis"].get<
int>();
359 std::vector<std::vector<int>> ranges;
// Every axis except the parameter axis contributes a single-bin range {axis, coord, coord}.
362 for (
int i = 0; i < hns->GetNdimensions(); i++) {
363 if (i == axisIdx)
continue;
365 ranges.push_back({i, coord, coord});
// Project onto the parameter axis; each bin's content and error become one parameter.
370 TH1 * h = hns->Projection(axisIdx,
"O");
372 NLogError(
"NGnTree::Import: Projection of THnSparse failed !!!");
375 if (h->GetEntries() > 0) {
378 for (
int bin = 1; bin <= h->GetNbinsX(); bin++) {
379 params->
SetParameter(bin, h->GetBinContent(bin), h->GetBinError(bin));
// File handling: (re)open the file named in cfg["filename"] when none is open
// or the currently open one has a different name.
385 std::string filename = cfg[
"filename"].get<std::string>();
387 if (!f || filename.compare(f->GetName()) != 0) {
389 NLogDebug(
"NGnTree::Import: Closing previously opened file '%s' ...", f->GetName());
392 NLogDebug(
"NGnTree::Import: Opening file '%s' ...", filename.c_str());
393 f = TFile::Open(filename.c_str());
394 if (!f || f->IsZombie()) {
395 NLogError(
"NGnTree::Import: Cannot open file '%s' !!!", filename.c_str());
// Optional config strings with built-in fallbacks ("%.2f_%.2f" and "/") when the key is not a string.
401 std::string axisObjectDefaultFormat =
402 cfg[
"axisObjectDefaultFormat"].is_string() ? cfg[
"axisObjectDefaultFormat"].get<std::string>() :
"%.2f_%.2f";
403 std::string axisDefaultSeparator =
404 cfg[
"axisDefaultSeparator"].is_string() ? cfg[
"axisDefaultSeparator"].get<std::string>() :
"/";
// "dryrun" is honoured only when present and boolean.
406 if (cfg.contains(
"dryrun") && cfg[
"dryrun"].is_boolean()) {
407 dryrun = cfg[
"dryrun"].get<
bool>();
411 NLogInfo(
"NGnTree::Import (dryrun): '%s' ...", point->
GetString().c_str());
// One iteration per entry of cfg["objects"].
415 for (
auto & [objName, objCfg] : cfg[
"objects"].items()) {
419 NLogInfo(
"NGnTree::Import (dryrun): would retrieve object '%s'", objPath.c_str());
424 NLogInfo(
"NGnTree::Import: Retrieving object '%s' from file '%s' ...", objPath.c_str(),
425 cfg[
"filename"].get<std::string>().c_str());
427 TObject * obj = f->Get(objPath.c_str());
430 NLogWarning(
"NGnTree::Import: Cannot get object '%s' from file '%s' !!!", objPath.c_str(),
431 cfg[
"filename"].get<std::string>().c_str());
// Canvases are renamed in place; a clone named after the config key is added to the output point.
436 if (obj->InheritsFrom(TCanvas::Class())) {
437 TCanvas * cObj = (TCanvas *)obj;
438 cObj->SetName(objName.c_str());
440 outputPoint->Add(obj->Clone(objName.c_str()));
447 ngnt->
Process(processFunc, cfg);
// NOTE(review): the temporary file is removed through a shell command with an unquoted path;
// a path containing spaces or shell metacharacters would not be handled as one argument.
450 gSystem->Exec(TString::Format(
"rm -f %s", tmpFilename.c_str()));
// Fragment tagged "NGnTree::Process" in its log messages: start-up and the
// remote-worker path. The function signature is not visible in this view.
544 NLogInfo(
"NGnTree::Process: Starting processing with %zu definitions ...", defNames.size());
// Remember the current batch flag and force batch mode; the saved value is
// handed back via gROOT->SetBatch(batch) on the exits visible below.
545 bool batch = gROOT->IsBatch();
546 gROOT->SetBatch(kTRUE);
547 TH1::AddDirectory(kFALSE);
// storagePostfix: base name of the storage file, or "ndmspc.root" when that is empty.
551 const std::string storageFileName =
fTreeStorage->GetFileName();
552 if (!storageFileName.empty()) {
553 storagePostfix = gSystem->BaseName(storageFileName.c_str());
556 if (storagePostfix.empty()) {
557 storagePostfix =
"ndmspc.root";
// Worker mode is entered when NDMSPC_WORKER_ENDPOINT is set.
561 if (
const char * workerEndpoint = gSystem->Getenv(
"NDMSPC_WORKER_ENDPOINT")) {
562 size_t workerIndex = 0;
563 if (
const char * envIdx = gSystem->Getenv(
"NDMSPC_WORKER_INDEX")) {
// A value that cannot be parsed is ignored silently; workerIndex then stays 0.
564 try { workerIndex =
static_cast<size_t>(std::stoul(envIdx)); }
catch (...) {}
566 NLogInfo(
"NGnTree::Process: Worker mode — connecting to %s as worker %zu", workerEndpoint, workerIndex);
// DEALER socket with an explicit identity and a 1000 ms receive timeout;
// after connecting, the worker announces itself with a "READY" frame.
// NOTE(review): return values of the zmq_* calls are not checked in the visible lines.
573 void * ctx = zmq_ctx_new();
574 void * dealer = zmq_socket(ctx, ZMQ_DEALER);
575 const std::string identity = Ndmspc::NDimensionalIpcRunner::BuildWorkerIdentity(workerIndex);
576 zmq_setsockopt(dealer, ZMQ_IDENTITY, identity.data(), identity.size());
577 int timeoutMs = 1000;
578 zmq_setsockopt(dealer, ZMQ_RCVTIMEO, &timeoutMs,
sizeof(timeoutMs));
579 zmq_connect(dealer, workerEndpoint);
580 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"READY"});
// Receiving keeps retrying on timeouts until this 30 s deadline has passed.
583 const auto initDeadline = std::chrono::steady_clock::now() + std::chrono::seconds(30);
586 std::vector<std::string> frames;
587 if (!Ndmspc::NDimensionalIpcRunner::ReceiveFrames(dealer, frames)) {
588 if (errno == EAGAIN || errno == EWOULDBLOCK) {
589 if (std::chrono::steady_clock::now() > initDeadline)
break;
// A "STOP" frame arriving before "INIT" is logged and the batch flag is restored.
595 if (frames.size() >= 1 && frames[0] ==
"STOP") {
596 NLogPrint(
"NGnTree::Process: Worker received STOP before INIT — session already finished, exiting.");
599 gROOT->SetBatch(batch);
// INIT frame layout as used here: [1] worker index, [2] session id,
// [3] results directory, [4] tree name, optional [5]/[6] directory overrides.
602 if (frames.size() >= 5 && frames[0] ==
"INIT") {
// NOTE(review): std::stoul throws on non-numeric input; no handler is visible around this call.
603 workerIndex =
static_cast<size_t>(std::stoul(frames[1]));
604 const std::string & sessionId = frames[2];
605 const std::string & initResultsDir = frames[3];
606 const std::string & initTreeName = frames[4];
// Non-empty frames 5 and 6 overwrite NDMSPC_TMP_DIR and NDMSPC_TMP_RESULTS_DIR.
609 if (frames.size() >= 7) {
610 if (!frames[5].empty())
611 gSystem->Setenv(
"NDMSPC_TMP_DIR", frames[5].c_str());
612 if (!frames[6].empty())
613 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", frames[6].c_str());
// When NDMSPC_TMP_RESULTS_DIR is unset or empty it is set to a non-empty NDMSPC_TMP_DIR.
616 if (!gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR") || gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR")[0] ==
'\0') {
617 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
618 if (tmpDirEnv && tmpDirEnv[0] !=
'\0')
619 gSystem->Setenv(
"NDMSPC_TMP_RESULTS_DIR", tmpDirEnv);
// Local scratch file: <NDMSPC_TMP_DIR or /tmp>/.ndmspc/tmp/<session>/<worker>/<storagePostfix>.
623 const char * localTmpEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
624 const std::string localBase = localTmpEnv ? localTmpEnv :
"/tmp";
625 const std::string localFile = localBase +
"/.ndmspc/tmp/" + sessionId +
"/" +
626 std::to_string(workerIndex) +
"/" + storagePostfix;
629 const std::string resultsFile = initResultsDir +
"/" + std::to_string(workerIndex) +
"/" +
632 bool rc = workerData.
Init(workerIndex, func, beginFunc, endFunc,
this, binningIn,
fInput, localFile, initTreeName);
634 NLogError(
"NGnTree::Process: Worker failed to initialize NGnThreadData");
640 if (resultsFile != localFile) {
// An "ACK" frame is sent to the peer after initialisation.
644 Ndmspc::NDimensionalIpcRunner::SendFrames(dealer, {
"ACK"});
649 NLogError(
"NGnTree::Process: Worker did not receive INIT from supervisor");
// The task loop is delegated to the IPC runner; the batch flag is restored afterwards.
655 Ndmspc::NDimensionalIpcRunner::TaskLoop(dealer, workerIndex, &workerData);
658 gROOT->SetBatch(batch);
// Fragment of NGnTree::Process: choice of execution mode, worker count and
// per-worker file layout. Some original lines are not visible in this view.
// Thread count comes from ROOT's thread pool size, with a floor of 1.
665 int nThreads = ROOT::GetThreadPoolSize();
666 if (nThreads < 1) nThreads = 1;
// Default mode is "thread"; a non-empty NDMSPC_EXECUTION_MODE counts as explicit.
668 std::string executionMode =
"thread";
669 const char * envMode = gSystem->Getenv(
"NDMSPC_EXECUTION_MODE");
670 const bool modeExplicit = (envMode && envMode[0] !=
'\0');
672 executionMode = envMode;
// The mode is compared case-insensitively; "process" is treated as an alias of "ipc".
675 std::string normalizedMode = executionMode;
676 std::transform(normalizedMode.begin(), normalizedMode.end(), normalizedMode.begin(),
677 [](
unsigned char c) { return static_cast<char>(std::tolower(c)); });
678 if (normalizedMode ==
"process") normalizedMode =
"ipc";
680 bool useProcessIpc = (normalizedMode ==
"ipc" || normalizedMode ==
"tcp");
681 bool useTcp = (normalizedMode ==
"tcp");
682 size_t nProcesses =
static_cast<size_t>(nThreads);
683 bool ndmspcNProcExplicit =
false;
// Process count: NDMSPC_MAX_PROCESSES is consulted first, ROOT_MAX_THREADS otherwise.
// NOTE(review): a negative number parsed by std::stoll would wrap to a very large
// size_t in the cast before std::max is applied - confirm inputs are validated.
685 if (
const char * envNdmspcNProc = gSystem->Getenv(
"NDMSPC_MAX_PROCESSES")) {
686 ndmspcNProcExplicit =
true;
688 nProcesses = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envNdmspcNProc)));
691 NLogWarning(
"NGnTree::Process: Invalid NDMSPC_MAX_PROCESSES='%s', using default=%zu", envNdmspcNProc,
695 else if (
const char * envNProc = gSystem->Getenv(
"ROOT_MAX_THREADS")) {
698 nProcesses = std::max<size_t>(1,
static_cast<size_t>(std::stoll(envNProc)));
701 NLogWarning(
"NGnTree::Process: Invalid ROOT_MAX_THREADS='%s', using default=%zu", envNProc, nProcesses);
// Mode resolution: "thread" disables IPC, "tcp"/"ipc" enable it, any other
// value falls back to IPC only when more than one process is requested.
708 if (normalizedMode ==
"thread") {
709 useProcessIpc =
false;
712 else if (normalizedMode ==
"tcp") {
713 useProcessIpc =
true;
716 else if (normalizedMode ==
"ipc") {
717 useProcessIpc =
true;
721 NLogWarning(
"NGnTree::Process: Unknown NDMSPC_EXECUTION_MODE='%s', falling back to auto mode selection.",
722 executionMode.c_str());
723 useProcessIpc = (nProcesses > 1);
727 else if (nProcesses > 1) {
728 useProcessIpc =
true;
730 executionMode =
"ipc";
731 normalizedMode =
"ipc";
734 if (ndmspcNProcExplicit && normalizedMode ==
"thread" && nProcesses > 1) {
735 NLogWarning(
"NGnTree::Process: NDMSPC_MAX_PROCESSES=%zu is set, but NDMSPC_EXECUTION_MODE=thread disables IPC.",
// One worker object per thread, or per max(threads, processes) when IPC is used.
739 const size_t workerObjectCount = useProcessIpc ? std::max(
static_cast<size_t>(nThreads), nProcesses)
740 :
static_cast<size_t>(nThreads);
741 std::vector<Ndmspc::NGnThreadData> threadDataVector(workerObjectCount);
743 NLogInfo(
"NGnTree::Process: executionMode='%s', useProcessIpc=%d, ROOT threads=%d, ipcProcesses=%zu, workerObjects=%zu",
744 executionMode.c_str(), useProcessIpc ? 1 : 0, nThreads, nProcesses, workerObjectCount);
// Scratch directory: NDMSPC_TMP_DIR is used unless it starts with root://, http://
// or https://; an empty result falls back to "/tmp".
746 const char * tmpDirEnv = gSystem->Getenv(
"NDMSPC_TMP_DIR");
748 if (tmpDirEnv && tmpDirEnv[0] !=
'\0') {
753 if (!(tmpDirPrefix.BeginsWith(
"root://") || tmpDirPrefix.BeginsWith(
"http://") ||
754 tmpDirPrefix.BeginsWith(
"https://"))) {
755 tmpDir = tmpDirPrefix.Data();
757 if (tmpDir.empty()) tmpDir =
"/tmp";
// Per-job directory is keyed by the process id.
760 std::string jobDir = tmpDir +
"/.ndmspc/tmp/" + std::to_string(gSystem->GetPid());
// Results go to jobDir unless NDMSPC_TMP_RESULTS_DIR is set to something other than tmpDir.
764 const char * resultsDirEnv = gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR");
765 const bool sameDir = !resultsDirEnv || std::string(resultsDirEnv) == tmpDir;
766 std::string resultsDir = sameDir ? jobDir
767 : (std::string(resultsDirEnv) +
"/" +
768 std::to_string(gSystem->GetPid()));
// Each worker object gets <jobDir>/<i>/<storagePostfix> as its file and
// <resultsDir>/<i>/<storagePostfix> as its results file name.
770 std::string filePrefix = jobDir;
771 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
772 std::string filename = filePrefix +
"/" + std::to_string(i) +
"/" + storagePostfix;
773 bool rc = threadDataVector[i].Init(i, func, beginFunc, endFunc,
this, binningIn,
fInput, filename,
776 NLogError(
"Failed to initialize thread data %zu, exiting ...", i);
779 threadDataVector[i].SetCfg(cfg);
784 std::string resultsFile = resultsDir +
"/" + std::to_string(i) +
"/" + storagePostfix;
785 threadDataVector[i].SetResultsFilename(resultsFile);
// Fragment of NGnTree::Process: progress bookkeeping, IPC start-up and the
// loop over binning definitions. Several original lines (including the `try`
// that matches the `catch` below) are not visible in this view.
788 size_t processedEntries = 0;
789 size_t totalEntries = 0;
// start_par is reset per definition further down; start_par_job is used for the total duration.
790 auto start_par = std::chrono::high_resolution_clock::now();
791 auto start_par_job = std::chrono::high_resolution_clock::now();
796 thread_obj.Process(coords);
// Progress label "R<n>": n is the number of workers, capped by the entries still pending.
799 size_t nRunning = (totalEntries - processedEntries >= threadDataVector.size()) ? threadDataVector.size()
800 : totalEntries - processedEntries;
801 NUtils::ProgressBar(processedEntries, totalEntries, start_par, TString::Format(
"R%4zu", nRunning).Data());
// For IPC execution the worker objects are handed to the executor as plain pointers.
808 std::vector<Ndmspc::NThreadData *> processWorkers;
809 std::unique_ptr<Ndmspc::NDimensionalExecutor> ipcExecutor;
811 processWorkers.reserve(threadDataVector.size());
812 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
813 processWorkers.push_back(&threadDataVector[i]);
// The executor is created with placeholder bounds {0}..{0}; real bounds are set per definition.
815 ipcExecutor = std::make_unique<Ndmspc::NDimensionalExecutor>(std::vector<int>{0}, std::vector<int>{0});
// TCP endpoint binds on all interfaces; the port defaults to 5555 when NDMSPC_TCP_PORT is unset.
817 const char * tcpPort = gSystem->Getenv(
"NDMSPC_TCP_PORT");
818 std::string tcpEndpoint = std::string(
"tcp://0.0.0.0:") + (tcpPort ? tcpPort :
"5555");
819 const char * resultsDirBase = gSystem->Getenv(
"NDMSPC_TMP_RESULTS_DIR");
820 const char * macroParams = gSystem->Getenv(
"NDMSPC_MACRO_PARAMS");
// An empty workerMacro is filled from NDMSPC_MACRO when that variable is set.
824 if (workerMacro.empty()) {
825 if (
const char * envMacro = gSystem->Getenv(
"NDMSPC_MACRO")) workerMacro = envMacro;
827 ipcExecutor->StartProcessIpc(processWorkers, nProcesses, tcpEndpoint, resultsDir,
828 fTreeStorage->GetTree()->GetName(), workerMacro, tmpDir,
829 resultsDirBase ? resultsDirBase :
"",
830 macroParams ? macroParams :
"");
832 ipcExecutor->StartProcessIpc(processWorkers, nProcesses);
// Ids erased from other definitions while processing, keyed by definition name.
836 std::map<std::string, std::vector<Long64_t>>
837 defIdMapProcessedRemoved;
// One pass per binning definition name.
850 for (
auto & name : defNames) {
853 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
857 if (binningDef->GetIds().size() == 0) {
858 NLogWarning(
"NGnTree::Process: Binning definition '%s' has no entries, skipping ...", name.c_str());
// Snapshot of the ids taken before the per-worker id lists are modified below.
862 const std::vector<Long64_t> originalDefinitionIds = binningDef->
GetIds();
// NOTE(review): maxs[0] is read below with %d; the visible code pushes only one
// value into maxs, and `mins` is filled on a line that is not shown.
864 std::vector<int> mins, maxs;
866 maxs.push_back(binningDef->GetIds().size() - 1);
867 NLogDebug(
"NGnTree::Process: Processing with binning definition '%s' with %zu entries", name.c_str(),
868 binningDef->GetIds().size());
871 NLogInfo(
"NGnTree::Process: Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
874 Printf(
"Processing binning definition '%s' with %d tasks ...", name.c_str(), maxs[0] + 1);
// Progress counters restart for every definition.
876 start_par = std::chrono::high_resolution_clock::now();
877 processedEntries = 0;
878 totalEntries = maxs[0] + 1;
879 const size_t activeWorkers = useProcessIpc ? std::max<size_t>(1, std::min(nProcesses, processWorkers.size()))
880 : threadDataVector.size();
883 TString::Format(
"R%4zu", activeWorkers).Data());
886 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
888 threadDataVector[i].GetHnSparseBase()->GetBinning()->SetCurrentDefinitionName(name);
// Threaded path: MustClean and batch flags are saved, changed, and restored around the run.
893 if (!useProcessIpc) {
897 Bool_t prevMustClean = gROOT->MustClean();
898 gROOT->SetMustClean(kFALSE);
905 Bool_t prevBatch = gROOT->IsBatch();
906 gROOT->SetBatch(kTRUE);
915 gROOT->SetMustClean(prevMustClean);
916 gROOT->SetBatch(prevBatch);
919 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
920 threadDataVector[i].FlushDeferredDeletes();
923 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
924 threadDataVector[i].ExecuteEndFunction();
// IPC path: the executor runs the current bounds and reports acknowledged tasks through the callback.
928 ipcExecutor->SetBounds(mins, maxs);
929 size_t acked = ipcExecutor->ExecuteCurrentBoundsProcessIpc(
930 name, &originalDefinitionIds, [&, activeWorkers](
size_t ackCount,
size_t activeWorkersNow) {
931 processedEntries = ackCount;
933 size_t nRunning = std::min(activeWorkersNow, activeWorkers);
935 TString::Format(
"R%4zu", nRunning).Data());
938 processedEntries = acked;
// Parent-side bookkeeping after an IPC run: per-worker id lists are cleared and the
// original ids are then assigned to worker objects by task index modulo the process count.
942 const size_t processesToUse = std::max<size_t>(1, std::min(nProcesses, processWorkers.size()));
943 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
944 auto * workerDef = threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name);
946 workerDef->GetIds().clear();
950 for (
size_t taskIndex = 0; taskIndex < originalDefinitionIds.size(); ++taskIndex) {
951 const size_t workerIndex = taskIndex % processesToUse;
952 threadDataVector[workerIndex].SetNProcessed(threadDataVector[workerIndex].GetNProcessed() + 1);
954 auto * workerDef = threadDataVector[workerIndex].GetHnSparseBase()->GetBinning()->GetDefinition(name);
956 workerDef->GetIds().push_back(originalDefinitionIds[taskIndex]);
966 Printf(
"Finished processing binning definition '%s'. Post-processing results ...", name.c_str());
968 NLogDebug(
"NGnTree::Process: [BEGIN] ------------------------------------------------");
971 for (
size_t i = 0; i < threadDataVector.size(); ++i) {
972 NLogDebug(
"NGnTree::Process: -> Thread %zu processed %lld entries", i, threadDataVector[i].GetNProcessed());
976 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().begin(),
977 threadDataVector[i].GetHnSparseBase()->GetBinning()->GetDefinition(name)->GetIds().end());
// Entries are erased from the other definitions' id lists and remembered in
// defIdMapProcessedRemoved; the erase condition itself is on lines not shown here.
982 for (
size_t i = 0; i < defNames.size(); i++) {
984 std::string other_name = defNames[i];
990 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", other_name.c_str());
// The iterator is advanced via the return value of erase() when an entry is removed.
994 for (
auto it = otherDef->GetIds().begin(); it != otherDef->GetIds().end();) {
995 NLogTrace(
"NGnTree::Process: Checking entry %lld from definition '%s' against sumIds=%d", *it,
996 other_name.c_str(), sumIds);
998 NLogTrace(
"NGnTree::Process: Removing entry %lld from definition '%s'", *it, other_name.c_str());
999 defIdMapProcessedRemoved[other_name].push_back(*it);
1000 it = otherDef->GetIds().erase(it);
1012 NLogDebug(
"NGnTree::Process: [END] ------------------------------------------------");
// Failure path: the IPC executor is finished with `true` and released; an exception whose
// message contains "Interrupted by user" is reported as a warning, anything else as an error.
1015 catch (
const std::exception & ex) {
1017 ipcExecutor->FinishProcessIpc(
true);
1018 ipcExecutor.reset();
1021 TString what(ex.what());
1022 if (what.Contains(
"Interrupted by user")) {
1024 gROOT->SetInterrupt(kFALSE);
1026 NLogWarning(
"NGnTree::Process: Interrupted by user, stopping processing.");
1029 NLogError(
"NGnTree::Process: Processing failed: %s", ex.what());
// Fragment of NGnTree::Process: wrap-up after all definitions were processed -
// closing worker files, merging, rebuilding definition content, final close and
// scratch cleanup. Some original lines are not visible in this view.
// Total duration is measured from start_par_job, in milliseconds.
1036 auto end_par = std::chrono::high_resolution_clock::now();
1037 std::chrono::duration<double, std::milli> par_duration = end_par - start_par_job;
1040 ipcExecutor->FinishProcessIpc();
// In TCP mode the set of registered worker indices (when non-empty) filters the merge input below.
1045 const std::set<size_t> registeredWorkers =
1046 (ipcExecutor && useTcp) ? ipcExecutor->GetRegisteredWorkerIndices() : std::set<size_t>{};
1047 const bool filterByRegistered = !registeredWorkers.empty();
1050 Printf(
"NGnTree::Process: Execution completed and it took %s .",
1054 NLogInfo(
"NGnTree::Process: Execution completed and it took %s .",
1058 NLogInfo(
"NGnTree::Process: Post processing %zu results ...", threadDataVector.size());
// Per-worker files: the IPC branch logs a release without writing, the other branch closes with write.
1059 for (
auto & data : threadDataVector) {
1060 if (useProcessIpc) {
1061 NLogTrace(
"NGnTree::Process: Releasing parent handle for worker %zu file without writing",
1062 data.GetAssignedIndex());
1066 NLogTrace(
"NGnTree::Process: Closing file from thread %zu with write", data.GetAssignedIndex());
1067 data.GetHnSparseBase()->GetStorageTree()->Close(
true);
1071 NLogDebug(
"NGnTree::Process: Merging %zu results ...", threadDataVector.size());
1073 Printf(
"NGnTree::Process: [phase] merge start (%zu workers)", threadDataVector.size());
1075 const auto mergeStart = std::chrono::high_resolution_clock::now();
// NOTE(review): mergeList is allocated with `new`; no matching release is visible in the shown lines.
1076 TList * mergeList =
new TList();
1078 outputData->
Init(0, func,
nullptr,
nullptr,
this, binningIn);
// Workers that never registered are left out of the merge when filtering is active.
1082 for (
auto & data : threadDataVector) {
1083 if (filterByRegistered && registeredWorkers.find(data.GetAssignedIndex()) == registeredWorkers.end()) {
1084 NLogInfo(
"NGnTree::Process: Skipping worker %zu — never connected", data.GetAssignedIndex());
1087 NLogTrace(
"NGnTree::Process: Adding thread data %zu to merge list ...", data.GetAssignedIndex());
// The list stores addresses of elements of threadDataVector.
1088 mergeList->Add(&data);
1091 Long64_t nmerged = outputData->
Merge(mergeList);
1092 const auto mergeEnd = std::chrono::high_resolution_clock::now();
1094 const auto mergeSec = std::chrono::duration_cast<std::chrono::duration<double>>(mergeEnd - mergeStart).count();
1095 Printf(
"NGnTree::Process: [phase] merge done (%lld outputs, %.2f s)", nmerged, mergeSec);
1098 NLogError(
"NGnTree::Process: Failed to merge thread data, exiting ...");
1102 NLogInfo(
"NGnTree::Process: Merged %lld outputs successfully", nmerged);
// Ids seen for the first time are collected with coordinates and written to the
// merged binning content after the definition loop.
1114 std::set<Long64_t> mergedContentIds;
1115 std::vector<std::pair<Long64_t, std::vector<int>>> mergedContentCoords;
1118 for (
size_t i = 0; i < defNames.size(); i++) {
1119 std::string name = defNames[i];
1121 auto def = mergedBinning->GetDefinition(name);
1123 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
// Ids previously removed from this definition are appended again when missing, then the list is sorted.
// NOTE(review): std::find over the id list per removed id is a linear scan each time.
1126 for (
auto & [other_name, removedIds] : defIdMapProcessedRemoved) {
1127 if (other_name.compare(name) != 0) {
1130 for (
auto &
id : removedIds) {
1131 if (std::find(def->GetIds().begin(), def->GetIds().end(),
id) == def->GetIds().end()) {
1132 NLogTrace(
"NGnTree::Process: Adding missing entry %lld to definition '%s'",
id, name.c_str());
1133 def->GetIds().push_back(
id);
1137 sort(def->GetIds().begin(), def->GetIds().end());
// The definition content is reset and refilled: each id is stored as the content of its bin.
1141 def->GetContent()->Reset();
1142 for (
auto id : def->GetIds()) {
1144 def->GetBinning()->GetContent()->GetBinContent(
id, point.
GetCoords());
1147 NLogTrace(
"NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld", name.c_str(),
id, bin);
1148 def->GetContent()->SetBinContent(bin,
id);
1150 if (mergedContentIds.insert(
id).second) {
1151 mergedContentCoords.emplace_back(
// Write the collected (id, coordinates) pairs into the merged binning content.
1161 for (
const auto & entry : mergedContentCoords) {
1162 Long64_t bin = mergedBinning->GetContent()->GetBin(entry.second.data());
1163 mergedBinning->GetContent()->SetBinContent(bin, entry.first);
1167 NLogDebug(
"NGnTree::Process: Final binning definitions after processing:");
1168 for (
auto & name : defNames) {
1172 NLogError(
"NGnTree::Process: Binning definition '%s' not found in NGnTree !!!", name.c_str());
1175 binningDef->
Print();
1184 NLogInfo(
"NGnTree::Process: Processing completed successfully. Output was stored in '%s'.",
1188 Printf(
"Processing completed successfully. Output was stored in '%s'.",
fTreeStorage->GetFileName().c_str());
1193 Printf(
"NGnTree::Process: [phase] final close start (%s)",
// Final close is timed; the close call itself is on a line not shown in this view.
1196 const auto closeStart = std::chrono::high_resolution_clock::now();
1198 const auto closeEnd = std::chrono::high_resolution_clock::now();
1200 const auto closeSec = std::chrono::duration_cast<std::chrono::duration<double>>(closeEnd - closeStart).count();
1201 Printf(
"NGnTree::Process: [phase] final close done (%.2f s)", closeSec);
1205 Printf(
"NGnTree::Process: [phase] cleanup start (%s)", jobDir.c_str());
1207 const auto cleanupStart = std::chrono::high_resolution_clock::now();
// NOTE(review): the job directory is removed recursively through a shell command with an
// unquoted path built from NDMSPC_TMP_DIR; unusual characters in that path are not escaped.
1208 gSystem->Exec(TString::Format(
"rm -fr %s", jobDir.c_str()));
1209 const auto cleanupEnd = std::chrono::high_resolution_clock::now();
1211 const auto cleanupSec = std::chrono::duration_cast<std::chrono::duration<double>>(cleanupEnd - cleanupStart).count();
1212 Printf(
"NGnTree::Process: [phase] cleanup done (%.2f s)", cleanupSec);
// Restore the batch flag saved at the start of processing.
1214 gROOT->SetBatch(batch);
// Steps through the entries of a binning definition, draws selected output
// objects on canvas "c1", saves a PNG per step and finally builds an animated GIF.
// @param timeout        passed to gSystem->Sleep() after each step when greater than 0
// @param binning        binning definition name; empty selects the current definition
// @param outputPointIds indices into the "_outputPoint" list; empty selects all entries
// @param ranges         not referenced in the lines visible here
// @param option         copied into a TString `opt`
// NOTE(review): several original lines are missing from this view; the comments
// describe only the statements that are shown.
1376void NGnTree::Play(
int timeout, std::string binning, std::vector<int> outputPointIds,
1377 std::vector<std::vector<int>> ranges, Option_t * option)
1382 TString opt = option;
// Frames are written to $HOME/.ndmspc/animation/<pid>, created through a shell `mkdir -p`.
// NOTE(review): Getenv("HOME") can return nullptr, which would then be formatted with %s - confirm.
1385 std::string annimationTempDir =
1386 TString::Format(
"%s/.ndmspc/animation/%d", gSystem->Getenv(
"HOME"), gSystem->GetPid()).Data();
1387 gSystem->Exec(TString::Format(
"mkdir -p %s", annimationTempDir.c_str()));
1389 if (binning.empty()) {
1390 binning =
fBinning->GetCurrentDefinitionName();
// Unknown definition: log the error and list the available names, marking the current one with [*].
1395 NLogError(
"NGnTree::Play: Binning definition '%s' not found in NGnTree !!!", binning.c_str());
1396 NLogError(
"Available binning definitions:");
1397 for (
auto & name :
fBinning->GetDefinitionNames()) {
1398 if (name ==
fBinning->GetCurrentDefinitionName())
1399 NLogError(
" [*] %s", name.c_str());
1401 NLogError(
" [ ] %s", name.c_str());
// Work on a clone of the definition content so the original is left untouched by SetBinContent below.
1406 THnSparse * bdContent = (THnSparse *)binningDef->
GetContent()->Clone();
1408 std::string bdContentName = TString::Format(
"bdContent_%s", binning.c_str()).Data();
// Collect the linear bin numbers returned by the iterator until it yields a negative value.
1412 Long64_t linBin = 0;
1413 std::unique_ptr<ROOT::Internal::THnBaseBinIter> iter{bdContent->CreateIter(
true )};
1414 std::vector<Long64_t> ids;
1416 while ((linBin = iter->Next()) >= 0) {
1418 ids.push_back(linBin);
1421 NLogWarning(
"NGnTree::Play: No entries found in binning definition '%s' !!!", binning.c_str());
// Reuse an existing canvas named "c1" or create an 800x600 one.
// NOTE(review): c1 starts as nullptr and the condition guarding the lookup and creation is
// not visible; the calls on c1 below assume it was assigned - confirm.
1426 TCanvas * c1 =
nullptr;
1428 c1 = (TCanvas *)gROOT->GetListOfCanvases()->FindObject(
"c1");
1429 if (c1 ==
nullptr) c1 =
new TCanvas(
"c1",
"NGnTree::Play", 800, 600);
// One pad per requested output object plus one more; a single pad when none were requested.
1432 c1->DivideSquare(outputPointIds.size() > 0 ? outputPointIds.size() + 1 : 1);
1433 gSystem->ProcessEvents();
1435 binningDef->
Print();
// One step per collected id.
1439 for (
auto id : ids) {
1443 TList * l = (TList *)
fTreeStorage->GetBranch(
"_outputPoint")->GetObject();
1444 if (!l || l->IsEmpty()) {
1445 NLogWarning(
"NGnTree::Play: No 'outputPoint' for entry %lld !!!",
id);
// With no explicit selection, every index of the list is used.
1452 if (outputPointIds.empty()) {
1453 outputPointIds.resize(l->GetEntries());
1454 for (
int i = 0; i < l->GetEntries(); i++) {
1455 outputPointIds[i] = i;
1458 int n = outputPointIds.size();
1461 for (
int i = 0; i < n; i++) {
// NOTE(review): `obj` is used below without a visible null check; TList::At returns
// nullptr for an index outside the list - confirm indices are validated.
1465 TObject * obj = l->At(outputPointIds[i]);
// Histograms are detached from any directory, and so is the clone made from them.
1467 if (obj->InheritsFrom(TH1::Class())) {
1468 TH1 * h = (TH1 *)obj;
1469 h->SetDirectory(
nullptr);
1473 TH1 * hclone = (TH1 *)h->Clone();
1475 hclone->SetDirectory(
nullptr);
// Only the first selected object (i == 0) reaches this branch.
1481 if (obj->InheritsFrom(TH1::Class()) && i == 0) {
1482 TH1 * h = (TH1 *)obj;
1484 NLogDebug(
"Mean value from histogram [%s]: %f", h->GetName(), v);
// Mark the current point in the cloned content with the value 1.
1487 bdContent->SetBinContent(
fBinning->GetPoint()->GetStorageCoords(), 1);
1489 TH1 * bdProj = (TH1 *)gROOT->FindObjectAny(
"bdProj");
// Projection is supported for 1, 2 or 3 dimensions; anything else is reported as an error.
1494 if (bdContent->GetNdimensions() == 1) {
1495 bdProj = bdContent->Projection(0,
"O");
1497 else if (bdContent->GetNdimensions() == 2) {
1498 bdProj = bdContent->Projection(0, 1,
"O");
1500 else if (bdContent->GetNdimensions() == 3) {
1501 bdProj = bdContent->Projection(0, 1, 2,
"O");
1504 NLogError(
"NGnTree::Play: Cannot project THnSparse with %d dimensions", bdContent->GetNdimensions());
1507 bdProj->SetName(
"bdProj");
1508 bdProj->SetTitle(TString::Format(
"Binning '%s' content projection", binning.c_str()).Data());
1509 bdProj->SetMinimum(0);
1511 bdProj->Draw(
"colz");
1516 c1->ModifiedUpdate();
// NOTE(review): the frame number in the file name is bdContent->GetNbins(), not the loop id;
// presumably this grows as bins are filled above - confirm frames cannot overwrite each other.
1517 c1->SaveAs(TString::Format(
"%s/ndmspc_play_%06lld.png", annimationTempDir.c_str(), bdContent->GetNbins()).Data());
1519 gSystem->ProcessEvents();
1520 if (timeout > 0) gSystem->Sleep(timeout);
1524 NLogInfo(
"Creating animation gif from %s/ndmspc_play_*.png ...", annimationTempDir.c_str());
// The GIF is assembled by the external `magick` command into ndmspc_play.gif in the
// working directory; the frame directory is then removed through the shell.
1526 TString::Format(
"magick -delay 20 -loop 0 %s/ndmspc_play_*.png ndmspc_play.gif", annimationTempDir.c_str()));
1527 gSystem->Exec(TString::Format(
"rm -fr %s", annimationTempDir.c_str()));
1528 NLogInfo(
"Animation saved to ndmspc_play.gif");