ndmspc v1.2.0-0.1.rc7
Loading...
Searching...
No Matches
NGnThreadData.cxx
1#include <NGnTree.h>
2#include <NStorageTree.h>
3#include <TROOT.h>
4#include <TCanvas.h>
5#include <mutex>
6#include "THnSparse.h"
7#include "NBinningPoint.h"
8#include "NLogger.h"
9#include "NUtils.h"
10#include "NGnThreadData.h"
11
13ClassImp(Ndmspc::NGnThreadData);
15
16namespace Ndmspc {
19bool NGnThreadData::Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr funcBegin, NGnEndFuncPtr endFunc,
20 NGnTree * ngnt, NBinning * binningIn, NGnTree * input, const std::string & filename,
21 const std::string & treename)
22{
27 SetThreadId(std::this_thread::get_id());
28
29 TH1::AddDirectory(kFALSE); // Disable ROOT auto directory management
30
31 // if (!func) {
32 // NLogError("NGnThreadData::Init: Process function is not set !!!");
33 // return false;
34 // }
35 fBeginFunc = funcBegin;
36 fProcessFunc = func;
37 fEndFunc = endFunc;
38
39 if (ngnt == nullptr) {
40 NLogError("NGnThreadData::Init: NGnTree is nullptr !!!");
41 return false;
42 }
43
44 fIsPureCopy = ngnt->IsPureCopy();
45
46 fBiningSource = binningIn;
47
48 if (fBiningSource == nullptr) {
49 NLogError("NGnThreadData::Init: Binning Source is nullptr !!!");
50 return false;
51 }
52
53 fHnSparseBase = (NGnTree *)ngnt->Clone();
54 // fHnSparseBase = new NGnTree(hnsb->GetBinning(), (NStorageTree *)hnsb->GetStorageTree()->Clone());
55 // fHnSparseBase = new NGnTree(hnsb->GetBinning(), new NStorageTree(hnsb->GetBinning()));
56 // fHnSparseBase = new NGnTree(hnsb->GetBinning(), nullptr);
57
58 if (fHnSparseBase->GetBinning() == nullptr) {
59 NLogError("NGnThreadData::InitStorage: Binning is not set !!!");
60 return false;
61 }
62
63 if (fHnSparseBase->GetStorageTree() == nullptr) {
64 NLogError("NGnThreadData::InitStorage: Storage tree is not set !!!");
65 return false;
66 }
67
68 // NLogDebug("NGnThreadData::Init: Initializing storage for thread %zu", id);
69 // NGnTree * ngntIn = new NGnTree(ngnt->GetInput(), "");
70 //
71 // fHnSparseBase->SetInput(ngntIn); // Clear input for the thread local NGnTree
72
73 NStorageTree * ts = fHnSparseBase->GetStorageTree();
74 std::string fn = ts->GetFileName();
75 ts->Clear("F");
76 ts->InitTree(filename.empty() ? fn : filename, treename);
77
78 NTreeBranch * b = nullptr;
79 // loop over all branches and add them to the new storage tree
80 for (auto & kv : ngnt->GetStorageTree()->GetBranchesMap()) {
81 NLogTrace("NGnThreadData::Init: Adding branch '%s' to thread %zu", kv.first.c_str(), id);
82 b = ts->GetBranch(kv.first);
83 if (b) continue;
84
85 b = ts->GetBranch(kv.first);
86 if (b) continue;
87
88 ts->AddBranch(kv.first, nullptr, kv.second.GetObjectClassName());
89 }
90 b = ts->GetBranch("_outputPoint");
91 if (!b) ts->AddBranch("_outputPoint", nullptr, "TList");
92
93 if (ngnt->GetParameters()) {
94 NLogTrace("NGnThreadData::Init: Setting parameters branch for thread %zu", id);
95 b = ts->GetBranch("_params");
96 if (!b) ts->AddBranch("_params", nullptr, "Ndmspc::NParameters");
97
98 NParameters * params = (NParameters *)ngnt->GetParameters()->Clone();
99 ts->GetBranch("_params")->SetAddress(params);
100 fHnSparseBase->GetBinning()->GetPoint()->SetParameters(params);
101 }
102
103 // Recreate the point and set the storage tree
104 fHnSparseBase->GetBinning()->GetPoint()->SetTreeStorage(fHnSparseBase->GetStorageTree());
105
106 // for (auto & kv : ts->GetBranchesMap()) {
107 // NLogTrace("NGnThreadData::Init: Adding branch '%s' to thread %zu", kv.first.c_str(), id);
108 // fHnSparseBase->GetStorageTree()->AddBranch(kv.first, nullptr, kv.second.GetObjectClassName());
109 // }
110
111 // TODO: check if needed or move it somewhere else like Reset();
112 //
113 // Loop over aoll definitions and reset their content and ids
114 for (const auto & name : fHnSparseBase->GetBinning()->GetDefinitionNames()) {
115 NBinningDef * def = fHnSparseBase->GetBinning()->GetDefinition(name);
116 if (def) {
117 def->GetContent()->Reset();
118 def->GetIds().clear();
119 }
120 }
121
122 if (input) {
123 NLogTrace("NGnThreadData::Init: Setting input NGnTree for thread %zu '%s'", id,
124 input->GetStorageTree()->GetFileName().c_str());
125 std::string branches = NUtils::Join(input->GetStorageTree()->GetBrancheNames(true), ',');
126 fHnSparseBase->SetInput(NGnTree::Open(input->GetStorageTree()->GetFileName(), branches)); // Set the input NGnTree
127 }
128
129 // fHnSparseBase->GetBinning()->GetDefinition()->GetContent()->Reset();
130 // fHnSparseBase->GetBinning()->GetDefinition()->GetIds().clear();
131
132 ExecuteBeginFunction();
133
134 return true;
135}
136
// Process one entry on the calling worker thread.
//
// Visible flow: make sure the thread has a current pad, translate coords[0]
// into an entry id through the source binning definition, skip ids already in
// fProcessedBinIds, load the point coordinates for the entry, call
// fProcessFunc, fill the storage tree when the callback produced output, and
// finally move the produced objects to fDeferredDeletes.
//
// NOTE(review): this listing has fused line numbers and is missing listing
// lines 199, 228, 241 and 271; the notes below mark where that matters.
137void NGnThreadData::Process(const std::vector<int> & coords)
138{
142 TH1::AddDirectory(kFALSE); // Disable ROOT auto directory management
143
144 // Ensure this thread has a current pad so that user code calling h->Fit()
145 // does not trigger the non-thread-safe TCanvas::MakeDefCanvas().
146 // gPad is thread_local in ROOT 6: each worker thread starts with nullptr.
147 // We create a minimal batch canvas (batch mode is already set by
148 // NGnTree::Process before ExecuteParallel) and serialise the one-time
149 // TCanvas::Constructor call with a static mutex. After this block gPad is
150 // set for this thread and subsequent Fit() calls find it non-null.
151 if (!gPad) {
152 static std::mutex sPadMutex;
153 std::lock_guard<std::mutex> lk(sPadMutex);
154 // gPad is still nullptr for THIS thread even inside the lock (thread-local);
155 // the mutex only serialises concurrent TCanvas::Constructor calls.
156 TString cname = TString::Format("_ndmspc_wk%zu", GetAssignedIndex());
157 auto * c = new TCanvas(cname, cname, 1, 1);
158 fDeferredDeletes.push_back(c); // cleaned up by FlushDeferredDeletes
159 }
160
// The counter is incremented before the validity checks below, so calls that
// return early are counted as well.
161 fNProcessed++;
162 // NThreadData::Process(coords);
163
164 // NLogDebug("NGnThreadData::Process: Thread %d processing coordinates %s", GetAssignedIndex(),
165 // NUtils::GetCoordsString(coords).c_str());
166
167 if (!fHnSparseBase) {
168 NLogError("NGnThreadData::Process: NGnTree is not set in NGnThreadData !!!");
169 return;
170 }
171
172 if (!fProcessFunc) {
173 NLogError("NGnThreadData::Process: Process function is not set in NGnThreadData !!!");
174 return;
175 }
176
177 // NBinning * binning = fBiningSource;
178 NBinningDef * binningDef = fBiningSource->GetDefinition();
179 if (binningDef == nullptr) {
180 NLogError("NGnThreadData::Process: Binning definition is not set in NGnThreadData !!!");
181 return;
182 }
183
184 NStorageTree * ts = fHnSparseBase->GetStorageTree();
185 NGnTree * in = fHnSparseBase->GetInput();
186
187 NBinningPoint * point = fHnSparseBase->GetBinning()->GetPoint();
188
// NOTE(review): coords[0] is read without checking that coords is non-empty.
189 Long64_t entry = fBiningSource->GetDefinition()->GetId(coords[0]);
190
// Duplicate guard: each entry id is handled at most once per thread-data object.
191 if (fProcessedBinIds.count(entry)) {
192 NLogDebug("NGnThreadData::Process: [%zu] Skipping entry=%lld, because it was already process !!!",
193 GetAssignedIndex(), entry);
194 return;
195 }
196 fProcessedBinIds.insert(entry);
197
// NOTE(review): listing line 199 is missing. As shown, this branch calls
// Initialize() through a null fResourceMonitor; presumably the missing line
// allocates the monitor - confirm against the repository.
198 if (fResourceMonitor == nullptr) {
200 fResourceMonitor->Initialize(binningDef->GetContent());
201 fHnSparseBase->GetOutput()->Add(fResourceMonitor->GetHnSparse());
202 }
203
204 // Long64_t entry = binningDef->GetId(coords[0]);
205 // NLogDebug("NGnThreadData::Process: [%zu] Entry in global content mapping: %lld",
206 // GetAssignedIndex(), entry);
// Load the content coordinates of this entry into the thread-local point.
207 fBiningSource->GetContent()->GetBinContent(entry, point->GetCoords());
208 point->RecalculateStorageCoords(entry, false);
209 point->SetCfg(fCfg); // Set configuration to the point
210 // point->Print("C");
211
212 // TODO: check if entry was already processed
213 // So we dont execute the function again
214
215 // NLogDebug(
216 // "AAA NGnThreadData::Process: Thread %zu processing entry %lld for coordinates %s", GetAssignedIndex(),
217 // entry,
218 // NUtils::GetCoordsString(NUtils::ArrayToVector(point->GetCoords(), point->GetNDimensionsContent())).c_str());
219 point->SetTreeStorage(ts); // Set the storage tree to the binning point
220 point->SetInput(in); // Set the input NGnTree to the binning point
221 TList * outputPoint = new TList();
222
// The user callback is bracketed by the resource monitor's Start()/End().
223 fResourceMonitor->Start();
224
225 fProcessFunc(point, fHnSparseBase->GetOutput(), outputPoint, GetAssignedIndex());
226
227 fResourceMonitor->End();
229
// Carry the "_ndmspc" section written by the callback over to the next call.
230 if (!point->GetCfg()["_ndmspc"].is_null()) {
231 fCfg["_ndmspc"] = point->GetCfg()["_ndmspc"]; // Get configuration from the point
232 }
233
234 // NLogTrace(
235 // "NGnThreadData::Process: [%zu] entry=%lld coords=%s outputPoint=%d", GetAssignedIndex(), entry,
236 // NUtils::GetCoordsString(NUtils::ArrayToVector(point->GetCoords(), point->GetNDimensionsContent())).c_str(),
237 // outputPoint->GetEntries());
// An entry is written only when the callback put at least one object in outputPoint.
// NOTE(review): listing line 241 is missing, so the NLogTrace call below is
// cut off before its final argument and closing parenthesis.
238 if (outputPoint->GetEntries() > 0) {
239 NLogTrace(
240 "NGnThreadData::Process: [%zu] Entry '%lld' was accepted. %s", GetAssignedIndex(), entry,
242
243 if (!fIsPureCopy) {
244 ts->GetBranch("_outputPoint")->SetAddress(outputPoint); // Set the output list as branch address
245 }
246 //
247 // ts->Fill(point, nullptr, false, {}, false);
248 Int_t bytes = ts->Fill(point, nullptr, false, {}, false);
249 if (bytes > 0) {
250 // Long64_t entryInBinDef = binningDefgcc->GetId(coords[0]);
251 // NLogDebug("NGnThreadData::Process: Thread %zu: Filled %d bytes for coordinates %s entry=%lld",
252 // GetAssignedIndex(), bytes, NUtils::GetCoordsString(coords).c_str(), entry);
253
// Record the entry id in the thread-local definition only after a non-empty Fill.
254 fHnSparseBase->GetBinning()->GetDefinition()->GetIds().push_back(entry);
255 // NLogInfo("Entry number in storage tree: %lld", point->GetEntryNumber());
256 // fHnSparseBase->GetBinning()->GetDefinition()->GetIds().push_back(point->GetEntryNumber());
257 }
258 else {
259 NLogTrace("NGnThreadData::Process: [%zu] Entry '%lld' Fill was done with 0 bytes. Skipping ...",
260 GetAssignedIndex(), entry);
261 // NLogError("NGnThreadData::Process: Thread %zu: zero bytes were writtent for coordinates %s
262 // entry=%lld",
263 // GetAssignedIndex(), NUtils::GetCoordsString(coords).c_str(), entry);
264 }
265 // outputPoint->Print();
266 // outputPoint->Clear(); // Clear the list to avoid memory leaks
267 }
// NOTE(review): listing line 271 is missing, so the NLogTrace call below is
// also cut off before its final argument and closing parenthesis.
268 else {
269 NLogTrace(
270 "NGnThreadData::Process: [%zu] Entry '%lld' No output %s. Skipping ...", GetAssignedIndex(), entry,
272 // NLogTrace(
273 // "No output for coordinates %s",
274 // NUtils::GetCoordsString(NUtils::ArrayToVector(point->GetCoords(),
275 // point->GetNDimensionsContent())).c_str());
276 }
277
278 // Defer deletion to FlushDeferredDeletes() on the main thread.
279 // ROOT's cleanup machinery (GarbageCollect, RecursiveRemove) is not thread-safe.
280 {
281 NLogTrace("NGnThreadData::Process: [%zu] Cleaning output list with %d entries for entry '%lld' ...",
282 GetAssignedIndex(), outputPoint->GetEntries(), entry);
283
// Detach every object from the list and queue it; only the list itself is deleted here.
284 TObject * obj = nullptr;
285 while ((obj = outputPoint->First())) {
286 outputPoint->Remove(obj);
287 fDeferredDeletes.push_back(obj);
288 }
289 delete outputPoint;
290 }
291}
292
// Takes the name of a binning definition; judging by the method name it makes
// that definition the current one.
// NOTE(review): listing lines 295-296 and 299 are missing from this listing.
// Only a closing brace and a branch on fBiningSource are visible, so the
// statements that apply `name` cannot be seen here - confirm against the
// repository before editing.
293void NGnThreadData::SetCurrentDefinitionName(const std::string & name)
294{
297 }
298 if (fBiningSource) {
300 }
301}
302
303void NGnThreadData::SyncCurrentDefinitionIds(const std::vector<Long64_t> & ids)
304{
305 // fHnSparseBase tracks only the entries actually written by this worker.
306 // Clear it so Process() builds the list from scratch (same as thread mode via Init).
307 // Do NOT assign the full 'ids' list here — that would include unprocessed entries
308 // (e.g., even-indexed entries when onlyOddPoints=true) and corrupt the merge step.
309 if (fHnSparseBase && fHnSparseBase->GetBinning()) {
310 if (auto * def = fHnSparseBase->GetBinning()->GetDefinition()) {
311 def->GetIds().clear();
312 }
313 }
314
315 // fBiningSource is used for GetId() coordinate lookups — it needs the full list.
316 if (fBiningSource) {
317 if (auto * def = fBiningSource->GetDefinition()) {
318 def->GetIds() = ids;
319 }
320 }
321}
322
323Long64_t NGnThreadData::Merge(TCollection * list)
324{
328 Long64_t nmerged = 0;
329
330 NLogTrace("NGnThreadData::Merge: BEGIN ------------------------------------------------");
331 NLogTrace("NGnThreadData::Merge: Merging thread data from %zu threads ...", list->GetEntries());
332
333 NStorageTree * ts = nullptr;
334 std::map<std::string, TList *> listOutputs;
335
336 // TList * listOut = new TList();
337 TList * listTreeStorage = new TList();
338
339 for (auto obj : *list) {
340 if (obj->IsA() == NGnThreadData::Class()) {
341 NGnThreadData * hnsttd = (NGnThreadData *)obj;
342 NLogDebug("NGnThreadData::Merge: Merging thread %zu processed %lld ...", hnsttd->GetAssignedIndex(),
343 hnsttd->GetNProcessed());
344 ts = hnsttd->GetHnSparseBase()->GetStorageTree();
345 if (!ts) {
346 NLogError("NGnThreadData::Merge: Storage tree is not set in NGnTree !!!");
347 continue;
348 }
349 // hnsttd->Print();
350
351 for (auto & kv : hnsttd->GetHnSparseBase()->GetOutputs()) {
352 NLogTrace("NGnThreadData::Merge: Found output list '%s' with %d objects", kv.first.c_str(),
353 kv.second ? kv.second->GetEntries() : 0);
354 if (kv.second && !kv.second->IsEmpty()) {
355 NLogTrace("NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
356 kv.second->GetEntries());
357 if (listOutputs.find(kv.first) == listOutputs.end()) {
358 if (fHnSparseBase->GetOutput(kv.first)->IsEmpty()) {
359 listOutputs[kv.first] = new TList();
360 fHnSparseBase->GetOutput(kv.first)->AddAll(kv.second);
361 }
362 }
363 else {
364 listOutputs[kv.first]->Add(kv.second);
365 }
366 }
367 }
368
369 // if (fOutput == nullptr) {
370 // fOutput = hnsttd->GetOutput();
371 // }
372 // else {
373 // listOut->Add(hnsttd->GetOutput());
374 // }
375
376 const std::string mergeFilename =
377 hnsttd->GetResultsFilename().empty() ? ts->GetFileName() : hnsttd->GetResultsFilename();
378 NGnTree * hnsb = NGnTree::Open(mergeFilename);
379 if (!hnsb) {
380 NLogError("NGnThreadData::Merge: Failed to open NGnTree from file '%s' !!!", mergeFilename.c_str());
381 continue;
382 }
383
384 // hnsb->Print();
385 listTreeStorage->Add(hnsb->GetStorageTree());
386
387 // TODO: check if needed
388 // fHnSparseBase->GetBinning()->GetDefinition()->GetContent()->Add(
389 // hnsb->GetBinning()->GetDefinition()->GetContent());
390 nmerged++;
391 }
392 }
393
394
395 // for (const auto & name : fBiningSource->GetDefinitionNames()) {
396 // auto binningDef = fBiningSource->GetDefinition(name);
397 // if (!binningDef) {
398 // NLogError("NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
399 // continue;
400 // }
401 // // add ids from fBiningSource to binningDef
402 // // binningDef->GetIds().insert(binningDef->GetIds().end(), fBiningSource->GetDefinition(name)->GetIds().begin(),
403 // // fBiningSource->GetDefinition(name)->GetIds().end());
404 // NLogDebug("NGnThreadData::Merge: BEFORE Final IDs in definition '%s': %s", name.c_str(),
405 // NUtils::GetCoordsString(binningDef->GetIds(), -1).c_str());
406 // }
407
408
409 // FIXME: Fix this properly [it should be ok now]
410 fHnSparseBase->GetBinning()->GetContent()->Reset();
411 // Print hnsb binning definition ids
412 for (const auto & name : fBiningSource->GetDefinitionNames()) {
413 NBinningDef * targetBinningDef = fBiningSource->GetDefinition(name);
414 if (auto * mergedDef = fHnSparseBase->GetBinning()->GetDefinition(name)) {
415 mergedDef->GetIds().clear();
416 }
417 for (auto id : targetBinningDef->GetIds()) {
418 NBinningPoint point(fHnSparseBase->GetBinning());
419 fBiningSource->GetContent()->GetBinContent(id, point.GetCoords());
420 Long64_t bin = fHnSparseBase->GetBinning()->GetContent()->GetBin(point.GetCoords());
421 NLogTrace("NGnThreadData::Merge: [%s] Adding def_id=%lld to content_bin=%lld",name.c_str(), id, bin);
422 fHnSparseBase->GetBinning()->GetContent()->SetBinContent(bin, id);
423 }
424 // fHnSparseBase->GetBinning()->GetDefinition(name)->Print();
425 }
426 // FIXME: End
427 NLogDebug("NGnThreadData::Merge: Total entries to merge: %lld", nmerged);
428
429 for (const auto & name : fHnSparseBase->GetBinning()->GetDefinitionNames()) {
430 auto binningDef = fHnSparseBase->GetBinning()->GetDefinition(name);
431 if (!binningDef) {
432 NLogError("NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
433 continue;
434 }
435 // add ids from fBiningSource to binningDef
436 // binningDef->GetIds().insert(binningDef->GetIds().end(), fBiningSource->GetDefinition(name)->GetIds().begin(),
437 // fBiningSource->GetDefinition(name)->GetIds().end());
438 NLogTrace("NGnThreadData::Merge: Final IDs in definition '%s': %s", name.c_str(),
439 NUtils::GetCoordsString(binningDef->GetIds(), -1).c_str());
440 }
441
442 // fHnSparseBase->GetBinning()->GetContent()->Projection(5)->Print("all");
443 // NLogDebug("Not ready to merge, exiting ...");
444 // return 0;
445
446 NLogTrace("NGnThreadData::Merge: Merging %d storage trees ...", listTreeStorage->GetEntries());
447 // fHnSparseBase->GetBinning()->GetPoint()->Reset();
448 // fHnSparseBase->GetBinning()->GetDefinition("default")->Print();
449 // fHnSparseBase->GetBinning()->GetDefinition("b2")->Print();
450 fHnSparseBase->GetStorageTree()->SetBinning(fHnSparseBase->GetBinning()); // Update binning to the merged one
451 fHnSparseBase->GetStorageTree()->Merge(listTreeStorage);
452 // NLogDebug("Not ready to merge after Storage merge, exiting ...");
453 // return 0;
454
455 // loop over all output lists and merge them
456 for (auto & kv : listOutputs) {
457 if (kv.second && !kv.second->IsEmpty()) {
458 NLogTrace("NGnThreadData::Merge: Merging output list '%s' with %d objects", kv.first.c_str(),
459 kv.second->GetEntries() + 1);
460 fHnSparseBase->GetOutput(kv.first)->Merge(kv.second);
461 // fHnSparseBase->GetOutput(kv.first)->Print();
462 }
463 }
464
465 // Loop over binning definitions and merge their contents
466 fHnSparseBase->GetStorageTree()->SetEnabledBranches({}, 0);
467 for (const auto & name : fHnSparseBase->GetBinning()->GetDefinitionNames()) {
468 NBinningDef * binningDef = fHnSparseBase->GetBinning()->GetDefinition(name);
469 if (!binningDef) {
470 NLogError("NGnThreadData::Merge: Binning definition '%s' not found in NGnTree !!!", name.c_str());
471 continue;
472 }
473 // binningDef->Print();
474 // Recalculate binningDef content based on ids
475 binningDef->GetContent()->Reset();
476 for (auto id : binningDef->GetIds()) {
477 fHnSparseBase->GetEntry(id, false);
478 Long64_t bin = binningDef->GetContent()->GetBin(fHnSparseBase->GetBinning()->GetPoint()->GetStorageCoords());
479 binningDef->GetContent()->SetBinContent(bin, id);
480 // binningDef->GetIds().push_back(id);
481 NLogTrace("NGnThreadData::Merge: -> Setting content bin %lld to id %lld", bin, id);
482 }
483 }
484
485 // // print all definitions
486 // for (const auto & name : fHnSparseBase->GetBinning()->GetDefinitionNames()) {
487 // fHnSparseBase->GetBinning()->GetDefinition(name)->Print();
488 // }
489
490 if (fHnSparseBase->GetInput()) {
491 fHnSparseBase->GetInput()->Close(false);
492 }
493
494 // Set default setting
495 fHnSparseBase->GetBinning()->GetPoint()->Reset();
496 fHnSparseBase->GetStorageTree()->SetEnabledBranches({}, 1);
497 fHnSparseBase->GetBinning()->SetCurrentDefinitionName(fHnSparseBase->GetBinning()->GetDefinitionNames().front());
498
499 NLogTrace("NGnThreadData::Merge: END ------------------------------------------------");
500
502 // NLogError("NGnThreadData::Merge: Not implemented !!!");
504 return nmerged;
505}
506
// Acts only when a begin callback (fBeginFunc) has been stored.
// NOTE(review): listing line 510 - the body of the branch - is missing from
// this listing. Presumably it invokes fBeginFunc the way ExecuteEndFunction
// invokes fEndFunc; confirm against the repository.
507void NGnThreadData::ExecuteBeginFunction()
508{
509 if (fBeginFunc) {
511 }
512}
513
514void NGnThreadData::ExecuteEndFunction()
515{
516 if (fEndFunc) {
517 fEndFunc(fHnSparseBase->GetBinning()->GetPoint(), GetAssignedIndex());
518 }
519}
520
// Body of the routine that logs as "NGnThreadData::FlushDeferredDeletes".
// NOTE(review): the signature line (listing line 521) and listing lines 526
// and 528 are missing from this listing, so the NLogTrace call is cut off and
// the statement that releases fDeferredDeletes is not visible - confirm
// against the repository.
522{
// Early exit when nothing has been queued for deletion.
523 if (fDeferredDeletes.empty()) return;
524
525 NLogTrace("NGnThreadData::FlushDeferredDeletes: [%zu] Deleting %zu deferred objects ...",
527
529}
530
531} // namespace Ndmspc
Defines binning mapping and content for NDMSPC histograms.
Definition NBinningDef.h:26
std::vector< Long64_t > GetIds() const
Get list of bin IDs.
Definition NBinningDef.h:93
THnSparse * GetContent() const
Get the template content histogram.
Represents a single point in multi-dimensional binning.
void SetTreeStorage(NStorageTree *s)
Set storage tree object pointer.
bool RecalculateStorageCoords(Long64_t entry=-1, bool useBinningDefCheck=false)
Recalculate storage coordinates for the point.
void SetCfg(json cfg)
Set configuration JSON object.
json & GetCfg()
Get reference to configuration JSON object.
Int_t * GetCoords() const
Get pointer to content coordinates array.
void SetInput(NGnTree *input)
Set input NGnTree object pointer.
Int_t GetNDimensionsContent() const
Get number of dimensions in content histogram.
Int_t * GetStorageCoords() const
Get pointer to storage coordinates array.
NBinning object for managing multi-dimensional binning and axis definitions.
Definition NBinning.h:45
NBinningPoint * GetPoint()
Get the current binning point.
void SetCurrentDefinitionName(const std::string &name)
Set current definition name.
Thread-local data object for NDMSPC processing.
NGnEndFuncPtr fEndFunc
Function pointer to the end function.
NGnBeginFuncPtr fBeginFunc
Function pointer to the begin function.
bool Init(size_t id, NGnProcessFuncPtr func, NGnBeginFuncPtr beginFunc, NGnEndFuncPtr endFunc, NGnTree *ngnt, NBinning *binningIn, NGnTree *input=nullptr, const std::string &filename="", const std::string &treename="ngnt")
Initialize thread data for processing.
NGnThreadData()
Constructor.
NBinning * fBiningSource
Pointer to the source binning (from the original NGnTree)
std::unordered_set< Long64_t > fProcessedBinIds
Set of already-processed global bin IDs (duplicate guard)
virtual void Process(const std::vector< int > &coords)
Process coordinates (virtual).
const std::string & GetResultsFilename() const
Get the results filename for TCP mode.
Long64_t GetNProcessed() const
Get number of processed entries.
virtual Long64_t Merge(TCollection *list)
Merge thread data from a collection (virtual).
NGnTree * GetHnSparseBase() const
Get pointer to base NGnTree object.
virtual ~NGnThreadData()
Destructor.
json fCfg
Configuration object.
void FlushDeferredDeletes()
Delete deferred ROOT objects, skipping TCanvas/TPad (leaked safely).
std::vector< TObject * > fDeferredDeletes
Objects deferred for single-threaded deletion.
NGnTree * fHnSparseBase
Pointer to the base class.
Long64_t fNProcessed
Number of processed entries.
bool fIsPureCopy
Flag indicating pure copy mode.
NGnProcessFuncPtr fProcessFunc
Function pointer to the processing function.
NDMSPC tree object for managing multi-dimensional data storage and processing.
Definition NGnTree.h:75
NStorageTree * GetStorageTree() const
Get pointer to storage tree object.
Definition NGnTree.h:173
bool IsPureCopy() const
Checks if the tree is a pure copy.
Definition NGnTree.h:223
std::map< std::string, TList * > GetOutputs() const
Get outputs map.
Definition NGnTree.h:179
NParameters * GetParameters() const
Returns the parameters associated with this tree.
Definition NGnTree.h:329
NBinning * GetBinning() const
Get pointer to binning object.
Definition NGnTree.h:161
static NGnTree * Open(const std::string &filename, const std::string &branches="", const std::string &treename="ngnt")
Open NGnTree from file.
Definition NGnTree.cxx:1243
NParameters object.
Definition NParameters.h:13
Monitors and records resource usage (CPU, memory, wall time) for processes or threads.
NDMSPC storage tree object for managing ROOT TTree-based data storage.
virtual void Clear(Option_t *option="")
Clear storage tree data.
std::string GetFileName() const
Get file name.
bool AddBranch(const std::string &name, void *address, const std::string &className)
Add a branch to the tree.
std::vector< std::string > GetBrancheNames(bool onlyEnabled=false) const
Branches handling.
std::map< std::string, NTreeBranch > GetBranchesMap() const
Get map of branch names to NTreeBranch objects.
NTreeBranch * GetBranch(const std::string &name)
Get pointer to NTreeBranch by name.
Int_t Fill(NBinningPoint *point, NStorageTree *hnstIn=nullptr, bool ignoreFilledCheck=false, std::vector< std::vector< int > > ranges={}, bool useProjection=false)
Fill tree with NBinningPoint and optional input tree.
bool InitTree(const std::string &filename="", const std::string &treename="ngnt")
Initialize tree from file and tree name.
size_t GetAssignedIndex() const
Get the assigned index for the thread.
Definition NThreadData.h:96
void SetAssignedIndex(size_t assignedIndex)
Set the assigned index for the thread.
Definition NThreadData.h:53
NResourceMonitor * fResourceMonitor
Pointer to resource monitor.
NThreadData()
Default constructor.
void SetThreadId(std::thread::id threadId)
Set the thread's unique identifier.
Definition NThreadData.h:41
NDMSPC tree branch object for managing ROOT TBranch and associated data.
Definition NTreeBranch.h:18
void SetAddress(void *address, bool deleteExisting=false)
Set address for branch data.
static std::string Join(const std::vector< std::string > &values, const char delim=',')
Join vector of strings into a single string with delimiter.
Definition NUtils.cxx:1115
static std::string GetCoordsString(const std::vector< int > &coords, int index=-1, int width=0)
Get string representation of coordinates.
Definition NUtils.cxx:1592
static void SafeDeleteObjects(std::vector< TObject * > &objects)
Safely delete a vector of ROOT objects, bypassing GarbageCollect.
Definition NUtils.cxx:1957
static std::vector< int > ArrayToVector(Int_t *v1, int size)
Convert array to vector.
Definition NUtils.cxx:1555
Global callback function for libwebsockets client events.
void(*)(Ndmspc::NBinningPoint *, TList *, TList *, int) NGnProcessFuncPtr
Function pointer type for processing binning points and lists.
Definition NGnTree.h:40
void(*)(Ndmspc::NBinningPoint *, int) NGnEndFuncPtr
Function pointer type for termination functions used in NGnTree.
Definition NGnTree.h:62
void(*)(Ndmspc::NBinningPoint *, int) NGnBeginFuncPtr
Function pointer type for the beginning of a tree operation.
Definition NGnTree.h:52