2#include <NStorageTree.h>
7#include "NBinningPoint.h"
10#include "NGnThreadData.h"
21 const std::string & treename)
29 TH1::AddDirectory(kFALSE);
39 if (ngnt ==
nullptr) {
40 NLogError(
"NGnThreadData::Init: NGnTree is nullptr !!!");
49 NLogError(
"NGnThreadData::Init: Binning Source is nullptr !!!");
59 NLogError(
"NGnThreadData::InitStorage: Binning is not set !!!");
64 NLogError(
"NGnThreadData::InitStorage: Storage tree is not set !!!");
76 ts->
InitTree(filename.empty() ? fn : filename, treename);
81 NLogTrace(
"NGnThreadData::Init: Adding branch '%s' to thread %zu", kv.first.c_str(),
id);
88 ts->
AddBranch(kv.first,
nullptr, kv.second.GetObjectClassName());
91 if (!b) ts->
AddBranch(
"_outputPoint",
nullptr,
"TList");
94 NLogTrace(
"NGnThreadData::Init: Setting parameters branch for thread %zu",
id);
96 if (!b) ts->
AddBranch(
"_params",
nullptr,
"Ndmspc::NParameters");
100 fHnSparseBase->GetBinning()->GetPoint()->SetParameters(params);
114 for (
const auto & name :
fHnSparseBase->GetBinning()->GetDefinitionNames()) {
123 NLogTrace(
"NGnThreadData::Init: Setting input NGnTree for thread %zu '%s'",
id,
132 ExecuteBeginFunction();
142 TH1::AddDirectory(kFALSE);
152 static std::mutex sPadMutex;
153 std::lock_guard<std::mutex> lk(sPadMutex);
157 auto * c =
new TCanvas(cname, cname, 1, 1);
168 NLogError(
"NGnThreadData::Process: NGnTree is not set in NGnThreadData !!!");
173 NLogError(
"NGnThreadData::Process: Process function is not set in NGnThreadData !!!");
179 if (binningDef ==
nullptr) {
180 NLogError(
"NGnThreadData::Process: Binning definition is not set in NGnThreadData !!!");
189 Long64_t entry =
fBiningSource->GetDefinition()->GetId(coords[0]);
192 NLogDebug(
"NGnThreadData::Process: [%zu] Skipping entry=%lld, because it was already process !!!",
221 TList * outputPoint =
new TList();
230 if (!point->
GetCfg()[
"_ndmspc"].is_null()) {
238 if (outputPoint->GetEntries() > 0) {
240 "NGnThreadData::Process: [%zu] Entry '%lld' was accepted. %s",
GetAssignedIndex(), entry,
248 Int_t bytes = ts->
Fill(point,
nullptr,
false, {},
false);
254 fHnSparseBase->GetBinning()->GetDefinition()->GetIds().push_back(entry);
259 NLogTrace(
"NGnThreadData::Process: [%zu] Entry '%lld' Fill was done with 0 bytes. Skipping ...",
270 "NGnThreadData::Process: [%zu] Entry '%lld' No output %s. Skipping ...",
GetAssignedIndex(), entry,
281 NLogTrace(
"NGnThreadData::Process: [%zu] Cleaning output list with %d entries for entry '%lld' ...",
284 TObject * obj =
nullptr;
285 while ((obj = outputPoint->First())) {
286 outputPoint->Remove(obj);
293void NGnThreadData::SetCurrentDefinitionName(
const std::string & name)
303void NGnThreadData::SyncCurrentDefinitionIds(
const std::vector<Long64_t> & ids)
310 if (
auto * def =
fHnSparseBase->GetBinning()->GetDefinition()) {
311 def->GetIds().clear();
328 Long64_t nmerged = 0;
330 NLogTrace(
"NGnThreadData::Merge: BEGIN ------------------------------------------------");
331 NLogTrace(
"NGnThreadData::Merge: Merging thread data from %zu threads ...", list->GetEntries());
334 std::map<std::string, TList *> listOutputs;
337 TList * listTreeStorage =
new TList();
339 for (
auto obj : *list) {
340 if (obj->IsA() == NGnThreadData::Class()) {
342 NLogDebug(
"NGnThreadData::Merge: Merging thread %zu processed %lld ...", hnsttd->
GetAssignedIndex(),
346 NLogError(
"NGnThreadData::Merge: Storage tree is not set in NGnTree !!!");
352 NLogTrace(
"NGnThreadData::Merge: Found output list '%s' with %d objects", kv.first.c_str(),
353 kv.second ? kv.second->GetEntries() : 0);
354 if (kv.second && !kv.second->IsEmpty()) {
355 NLogTrace(
"NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
356 kv.second->GetEntries());
357 if (listOutputs.find(kv.first) == listOutputs.end()) {
359 listOutputs[kv.first] =
new TList();
364 listOutputs[kv.first]->Add(kv.second);
376 const std::string mergeFilename =
380 NLogError(
"NGnThreadData::Merge: Failed to open NGnTree from file '%s' !!!", mergeFilename.c_str());
412 for (
const auto & name :
fBiningSource->GetDefinitionNames()) {
414 if (
auto * mergedDef =
fHnSparseBase->GetBinning()->GetDefinition(name)) {
415 mergedDef->GetIds().clear();
417 for (
auto id : targetBinningDef->
GetIds()) {
421 NLogTrace(
"NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld",name.c_str(),
id, bin);
422 fHnSparseBase->GetBinning()->GetContent()->SetBinContent(bin,
id);
427 NLogDebug(
"NGnThreadData::Merge: Total entries to merge: %lld", nmerged);
429 for (
const auto & name :
fHnSparseBase->GetBinning()->GetDefinitionNames()) {
430 auto binningDef =
fHnSparseBase->GetBinning()->GetDefinition(name);
432 NLogError(
"NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
438 NLogTrace(
"NGnThreadData::Merge: Final IDs in definition '%s': %s", name.c_str(),
446 NLogTrace(
"NGnThreadData::Merge: Merging %d storage trees ...", listTreeStorage->GetEntries());
456 for (
auto & kv : listOutputs) {
457 if (kv.second && !kv.second->IsEmpty()) {
458 NLogTrace(
"NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
459 kv.second->GetEntries() + 1);
467 for (
const auto & name :
fHnSparseBase->GetBinning()->GetDefinitionNames()) {
470 NLogError(
"NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
476 for (
auto id : binningDef->
GetIds()) {
479 binningDef->
GetContent()->SetBinContent(bin,
id);
481 NLogTrace(
"NGnThreadData::Merge: -> Setting content bin %lld to id %lld", bin,
id);
499 NLogTrace(
"NGnThreadData::Merge: END ------------------------------------------------");
507void NGnThreadData::ExecuteBeginFunction()
514void NGnThreadData::ExecuteEndFunction()
525 NLogTrace(
"NGnThreadData::FlushDeferredDeletes: [%zu] Deleting %zu deferred objects ...",
Defines binning mapping and content for NDMSPC histograms.
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
THnSparse * GetContent() const
Get the template content histogram.
Represents a single point in multi-dimensional binning.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
void SetCfg(json cfg)
Set configuration JSON object.
json & GetCfg()
Get reference to configuration JSON object.
Int_t * GetCoords() const
Get pointer to content coordinates array.
void SetInput(NGnTree *input)
Set input NGnTree object pointer.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
NBinning object for managing multi-dimensional binning and axis definitions.
NBinningPoint * GetPoint()
Get the current binning point.
void SetCurrentDefinitionName(const std::string &name)
Set current definition name.
Thread-local data object for NDMSPC processing.
NGnEndFuncPtr fEndFunc
Function pointer to the end function.
NGnBeginFuncPtr fBeginFunc
Function pointer to the begin function.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
NGnThreadData()
Constructor.
NBinning * fBiningSource
Pointer to the source binning (from the original NGnTree)
std::unordered_set< Long64_t > fProcessedBinIds
Set of already-processed global bin IDs (duplicate guard)
virtual void Process(const std::vector< int > &coords)
Process coordinates (virtual).
const std::string & GetResultsFilename() const
Get the results filename for TCP mode.
Long64_t GetNProcessed() const
Get number of processed entries.
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
virtual ~NGnThreadData()
Destructor.
json fCfg
Configuration object.
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
std::vector< TObject * > fDeferredDeletes
Objects deferred for single-threaded deletion.
NGnTree * fHnSparseBase
Pointer to the base class.
Long64_t fNProcessed
Number of processed entries.
bool fIsPureCopy
Flag indicating pure copy mode.
NGnProcessFuncPtr fProcessFunc
Function pointer to the processing function.
NDMSPC tree object for managing multi-dimensional data storage and processing.
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
bool IsPureCopy() const
Checks if the tree is a pure copy.
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
NParameters * GetParameters() const
Returns the parameters associated with this tree.
NBinning * GetBinning() const
Get pointer to binning object.
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
Monitors and records resource usage (CPU, memory, wall time) for processes or threads.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
virtual void Clear(Option_t *option="")
Clear storage tree data.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
std::vector< std::string > GetBrancheNames(bool onlyEnabled=false) const
Branches handling.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
Int_t Fill(NBinningPoint *point, NStorageTree *hnstIn=nullptr, bool ignoreFilledCheck=false, std::vector< std::vector< int > > ranges={}, bool useProjection=false)
Fill tree with NBinningPoint and optional input tree.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
size_t GetAssignedIndex() const
Get the assigned index for the thread.
void SetAssignedIndex(size_t assignedIndex)
Set the assigned index for the thread.
NResourceMonitor * fResourceMonitor
Pointer to resource monitor.
NThreadData()
Default constructor.
void SetThreadId(std::thread::id threadId)
Set the thread's unique identifier.
NDMSPC tree branch object for managing ROOT TBranch and associated data.
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
static void SafeDeleteObjects(std::vector< TObject * > &objects)
Safely delete a vector of ROOT objects, bypassing GarbageCollect.
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.
Global callback function for libwebsockets client events.
void(*)(Ndmspc::NBinningPoint *, TList *, TList *, int) NGnProcessFuncPtr
Function pointer type for processing binning points and lists.
void(*)(Ndmspc::NBinningPoint *, int) NGnEndFuncPtr
Function pointer type for termination functions used in NGnTree.
void(*)(Ndmspc::NBinningPoint *, int) NGnBeginFuncPtr
Function pointer type for the beginning of a tree operation.