2 #include <NStorageTree.h>
7 #include "NBinningPoint.h"
10 #include "NGnThreadData.h"
19 bool NGnThreadData::Init(
size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr funcBegin, NGnEndFuncPtr endFunc,
21 const std::string & treename)
29 TH1::AddDirectory(kFALSE);
39 if (ngnt ==
nullptr) {
40 NLogError(
"NGnThreadData::Init: NGnTree is nullptr !!!");
49 NLogError(
"NGnThreadData::Init: Binning Source is nullptr !!!");
59 NLogError(
"NGnThreadData::InitStorage: Binning is not set !!!");
64 NLogError(
"NGnThreadData::InitStorage: Storage tree is not set !!!");
76 ts->
InitTree(filename.empty() ? fn : filename, treename);
81 NLogTrace(
"NGnThreadData::Init: Adding branch '%s' to thread %zu", kv.first.c_str(),
id);
88 ts->
AddBranch(kv.first,
nullptr, kv.second.GetObjectClassName());
91 if (!b) ts->
AddBranch(
"_outputPoint",
nullptr,
"TList");
94 NLogTrace(
"NGnThreadData::Init: Setting parameters branch for thread %zu",
id);
96 if (!b) ts->
AddBranch(
"_params",
nullptr,
"Ndmspc::NParameters");
123 NLogTrace(
"NGnThreadData::Init: Setting input NGnTree for thread %zu '%s'",
id,
132 ExecuteBeginFunction();
142 TH1::AddDirectory(kFALSE);
152 static std::mutex sPadMutex;
153 std::lock_guard<std::mutex> lk(sPadMutex);
157 auto * c =
new TCanvas(cname, cname, 1, 1);
168 NLogError(
"NGnThreadData::Process: NGnTree is not set in NGnThreadData !!!");
173 NLogError(
"NGnThreadData::Process: Process function is not set in NGnThreadData !!!");
179 if (binningDef ==
nullptr) {
180 NLogError(
"NGnThreadData::Process: Binning definition is not set in NGnThreadData !!!");
191 if (entry < fHnSparseBase->GetBinning()->GetContent()->GetNbins()) {
195 NLogDebug(
"NGnThreadData::Process: [%zu] Skipping entry=%lld, because it was already process !!!",
223 TList * outputPoint =
new TList();
232 if (!point->
GetCfg()[
"_ndmspc"].is_null()) {
240 if (outputPoint->GetEntries() > 0) {
242 "NGnThreadData::Process: [%zu] Entry '%lld' was accepted. %s",
GetAssignedIndex(), entry,
250 Int_t bytes = ts->
Fill(point,
nullptr,
false, {},
false);
261 NLogTrace(
"NGnThreadData::Process: [%zu] Entry '%lld' Fill was done with 0 bytes. Skipping ...",
272 "NGnThreadData::Process: [%zu] Entry '%lld' No output %s. Skipping ...",
GetAssignedIndex(), entry,
283 NLogTrace(
"NGnThreadData::Process: [%zu] Cleaning output list with %d entries for entry '%lld' ...",
286 TObject * obj =
nullptr;
287 while ((obj = outputPoint->First())) {
288 outputPoint->Remove(obj);
295 void NGnThreadData::SetCurrentDefinitionName(
const std::string & name)
305 void NGnThreadData::SyncCurrentDefinitionIds(
const std::vector<Long64_t> & ids)
313 def->GetIds().clear();
330 Long64_t nmerged = 0;
332 NLogTrace(
"NGnThreadData::Merge: BEGIN ------------------------------------------------");
333 NLogTrace(
"NGnThreadData::Merge: Merging thread data from %zu threads ...", list->GetEntries());
336 std::map<std::string, TList *> listOutputs;
339 TList * listTreeStorage =
new TList();
341 for (
auto obj : *list) {
342 if (obj->IsA() == NGnThreadData::Class()) {
344 NLogDebug(
"NGnThreadData::Merge: Merging thread %zu processed %lld ...", hnsttd->
GetAssignedIndex(),
348 NLogError(
"NGnThreadData::Merge: Storage tree is not set in NGnTree !!!");
354 NLogTrace(
"NGnThreadData::Merge: Found output list '%s' with %d objects", kv.first.c_str(),
355 kv.second ? kv.second->GetEntries() : 0);
356 if (kv.second && !kv.second->IsEmpty()) {
357 NLogTrace(
"NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
358 kv.second->GetEntries());
359 if (listOutputs.find(kv.first) == listOutputs.end()) {
361 listOutputs[kv.first] =
new TList();
366 listOutputs[kv.first]->Add(kv.second);
378 const std::string mergeFilename =
382 NLogError(
"NGnThreadData::Merge: Failed to open NGnTree from file '%s' !!!", mergeFilename.c_str());
417 mergedDef->
GetIds().clear();
419 for (
auto id : targetBinningDef->
GetIds()) {
423 NLogTrace(
"NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld",name.c_str(),
id, bin);
429 NLogDebug(
"NGnThreadData::Merge: Total entries to merge: %lld", nmerged);
434 NLogError(
"NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
440 NLogTrace(
"NGnThreadData::Merge: Final IDs in definition '%s': %s", name.c_str(),
448 NLogTrace(
"NGnThreadData::Merge: Merging %d storage trees ...", listTreeStorage->GetEntries());
458 for (
auto & kv : listOutputs) {
459 if (kv.second && !kv.second->IsEmpty()) {
460 NLogTrace(
"NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
461 kv.second->GetEntries() + 1);
472 NLogError(
"NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
478 for (
auto id : binningDef->
GetIds()) {
481 binningDef->
GetContent()->SetBinContent(bin,
id);
483 NLogTrace(
"NGnThreadData::Merge: -> Setting content bin %lld to id %lld", bin,
id);
501 NLogTrace(
"NGnThreadData::Merge: END ------------------------------------------------");
509 void NGnThreadData::ExecuteBeginFunction()
516 void NGnThreadData::ExecuteEndFunction()
527 NLogTrace(
"NGnThreadData::FlushDeferredDeletes: [%zu] Deleting %zu deferred objects ...",
Defines binning mapping and content for NDMSPC histograms.
THnSparse * GetContent() const
Get the template content histogram.
Long64_t GetId(size_t index) const
Get bin ID at specified index.
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Represents a single point in multi-dimensional binning.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
void SetCfg(json cfg)
Set configuration JSON object.
Int_t * GetCoords() const
Get pointer to content coordinates array.
void SetParameters(NParameters *params)
Set the parameters for this binning point.
void SetInput(NGnTree *input)
Set input NGnTree object pointer.
virtual void Reset()
Reset the binning point to initial state.
json & GetCfg()
Get reference to configuration JSON object.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
NBinning object for managing multi-dimensional binning and axis definitions.
NBinningDef * GetDefinition(const std::string &name="")
Get binning definition by name.
std::vector< std::string > GetDefinitionNames() const
Get all definition names.
NBinningPoint * GetPoint()
Get the current binning point.
THnSparse * GetContent() const
Get the content histogram.
void SetCurrentDefinitionName(const std::string &name)
Set current definition name.
Thread-local data object for NDMSPC processing.
NGnEndFuncPtr fEndFunc
Function pointer to the end function.
NGnBeginFuncPtr fBeginFunc
Function pointer to the begin function.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
NGnThreadData()
Constructor.
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
NBinning * fBiningSource
Pointer to the source binning (from the original NGnTree)
virtual void Process(const std::vector< int > &coords)
Process coordinates (virtual).
Long64_t GetNProcessed() const
Get number of processed entries.
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
virtual ~NGnThreadData()
Destructor.
json fCfg
Configuration object.
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
std::vector< TObject * > fDeferredDeletes
Objects deferred for single-threaded deletion.
NGnTree * fHnSparseBase
Pointer to the base NGnTree object.
Long64_t fNProcessed
Number of processed entries.
const std::string & GetResultsFilename() const
Get the results filename for TCP mode.
bool fIsPureCopy
Flag indicating pure copy mode.
NGnProcessFuncPtr fProcessFunc
Function pointer to the processing function.
NDMSPC tree object for managing multi-dimensional data storage and processing.
NBinning * GetBinning() const
Get pointer to binning object.
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
bool Close(bool write=false)
Close the tree, optionally writing data.
bool IsPureCopy() const
Checks if the tree is a pure copy.
Int_t GetEntry(Long64_t entry, bool checkBinningDef=true)
Get entry by index.
void SetInput(NGnTree *input)
Set input NGnTree pointer.
TList * GetOutput(std::string name="")
Get output list by name.
NParameters * GetParameters() const
Returns the parameters associated with this tree.
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
NGnTree * GetInput() const
Get pointer to input NGnTree.
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
Monitors and records resource usage (CPU, memory, wall time) for processes or threads.
void Start()
Records the starting resource usage and wall time.
void End()
Records the ending resource usage and wall time.
void Fill(Int_t *coords, int threadId)
Fills resource usage data into the histogram.
THnSparse * GetHnSparse() const
Returns the pointer to the THnSparse histogram.
THnSparse * Initialize(THnSparse *hns)
Initializes the THnSparse histogram for resource data.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
Long64_t Merge(TCollection *list)
Merge storage trees from a collection.
void SetEnabledBranches(std::vector< std::string > branches, int status=1)
Set enabled/disabled status for branches.
virtual void Clear(Option_t *option="")
Clear storage tree data.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
std::vector< std::string > GetBrancheNames(bool onlyEnabled=false) const
Get the list of branch names, optionally restricted to enabled branches only.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
Int_t Fill(NBinningPoint *point, NStorageTree *hnstIn=nullptr, bool ignoreFilledCheck=false, std::vector< std::vector< int >> ranges={}, bool useProjection=false)
Fill tree with NBinningPoint and optional input tree.
void SetBinning(NBinning *binning)
Set binning object pointer.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
Thread-local data object for NDMSPC processing.
size_t GetAssignedIndex() const
Get the assigned index for the thread.
void SetAssignedIndex(size_t assignedIndex)
Set the assigned index for the thread.
NResourceMonitor * fResourceMonitor
Pointer to resource monitor.
void SetThreadId(std::thread::id threadId)
Set the thread's unique identifier.
NDMSPC tree branch object for managing ROOT TBranch and associated data.
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
static void SafeDeleteObjects(std::vector< TObject * > &objects)
Safely delete a vector of ROOT objects, bypassing GarbageCollect.
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.