ndmspc v1.2.0-0.1.rc4
Loading...
Searching...
No Matches
NDimensionalExecutor.h
1#ifndef N_DIMENSIONAL_EXECUTOR_H
2#define N_DIMENSIONAL_EXECUTOR_H
3
4#include <iomanip>
5#include <sstream>
6#include <set>
7#include <vector>
8#include <functional>
9#include <cstddef>
10#include <thread>
11#include <mutex>
12#include <condition_variable>
13#include <atomic>
14#include <queue>
15#include <stdexcept>
16#include <utility>
17#include <exception>
18#include <memory>
19#include <string>
20#include <Rtypes.h>
21#include <THnSparse.h>
22#include "NLogger.h"
23#include "NThreadData.h"
24
25namespace Ndmspc {
26
33 public:
/// Constructor from min/max bounds for each dimension.
/// @param minBounds Minimum bound for each dimension.
/// @param maxBounds Maximum bound for each dimension.
39 NDimensionalExecutor(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
40
/// Constructs an executor from a THnSparse histogram.
/// @param hist       Histogram the executor is built from.
/// @param onlyfilled Defaults to false.
/// NOTE(review): presumably the bounds are taken from the histogram axes and `onlyfilled`
/// restricts iteration to filled bins — confirm against the implementation.
/// NOTE(review): callable with a single argument and not marked `explicit`, so a
/// THnSparse* converts implicitly to an executor — confirm this is intended.
46 NDimensionalExecutor(THnSparse * hist, bool onlyfilled = false);
47
49
/// Sets the iteration bounds from per-dimension minimum and maximum vectors.
/// @param minBounds Minimum bound for each dimension.
/// @param maxBounds Maximum bound for each dimension.
/// NOTE(review): handling of mismatched vector sizes is not visible here — confirm.
50 void SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
51
/// Execute a function over all coordinates in the N-dimensional space.
/// @param func Callback that receives the coordinate vector.
56 void Execute(const std::function<void(const std::vector<int> & coords)> & func);
57
/// Execute a function in parallel over all coordinates, using thread-local objects.
/// @tparam TObject       Type of the per-thread object handed to the callback.
/// @param func           Callback that receives the coordinate vector and a thread object.
/// @param thread_objects Objects passed to the callback by reference.
/// NOTE(review): presumably one worker thread is used per element of `thread_objects` — confirm.
64 template <typename TObject>
65 void ExecuteParallel(const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
66 std::vector<TObject> & thread_objects);
67
/// Execute fixed-contract processing in multiple child processes over IPC.
/// @param workerObjects Non-owning pointers to the per-worker NThreadData objects.
/// @param processCount  Number of processes requested.
/// NOTE(review): the meaning of the returned size_t is not visible in this header — confirm.
74 size_t ExecuteParallelProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount);
/// Starts an IPC processing session.
/// All string parameters default to "" except `treeName`, which defaults to "ngnt".
/// @param workerObjects   Per-worker NThreadData pointers.
/// @param processCount    Number of processes requested.
/// @param tcpBindEndpoint TCP endpoint string.
/// @param jobDir          Job directory string.
/// @param treeName        Tree name.
/// @param macroList       Macro list string.
/// @param tmpDir          Temporary directory string.
/// @param tmpResultsDir   Temporary results directory string.
/// @param macroParams     Macro parameters string.
/// NOTE(review): presumably paired with ExecuteCurrentBoundsProcessIpc() and
/// FinishProcessIpc(); parameter semantics are inferred from their names — confirm.
75 void StartProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount,
76 const std::string & tcpBindEndpoint = "", const std::string & jobDir = "",
77 const std::string & treeName = "ngnt", const std::string & macroList = "",
78 const std::string & tmpDir = "", const std::string & tmpResultsDir = "",
79 const std::string & macroParams = "");
/// Runs IPC processing for the currently configured bounds.
/// @param definitionName   Definition name; defaults to "".
/// @param definitionIds    Optional list of definition ids; defaults to nullptr.
/// @param progressCallback Optional callback taking two size_t values; defaults to an empty function.
/// NOTE(review): presumably the callback receives (done, total) and the return value is the
/// number of processed items — neither is visible in this header; confirm.
80 size_t ExecuteCurrentBoundsProcessIpc(const std::string & definitionName = "",
81 const std::vector<Long64_t> * definitionIds = nullptr,
82 const std::function<void(size_t, size_t)> & progressCallback = nullptr);
/// Finishes the IPC processing session.
/// @param abort Defaults to false.
/// NOTE(review): presumably `abort = true` tears the session down without a clean shutdown — confirm.
83 void FinishProcessIpc(bool abort = false);
84
/// Returns a copy of the worker indices that completed registration (TCP mode).
/// NOTE(review): returns by value, unlike GetMinBounds()/GetMaxBounds() — confirm the copy is intended.
85 std::set<size_t> GetRegisteredWorkerIndices() const { return fRegisteredWorkerIndices; }
86
/// Returns the number of dimensions.
91 size_t Dimensions() const { return fNumDimensions; }
92
/// Returns the minimum bounds for each dimension.
97 const std::vector<int> & GetMinBounds() const { return fMinBounds; }
98
/// Returns the maximum bounds for each dimension.
103 const std::vector<int> & GetMaxBounds() const { return fMaxBounds; }
104
105 private:
107 std::vector<int> fMinBounds; ///< Minimum bounds for each dimension.
108 std::vector<int> fMaxBounds; ///< Maximum bounds for each dimension.
109 std::vector<int> fCurrentCoords; ///< Current coordinates during iteration.
110
/// Increment the current coordinates to the next point in the N-dimensional space.
/// NOTE(review): presumably returns false once every point has been visited — confirm.
115 bool Increment();
116
/// Sends INIT to a newly-connected TCP worker, waits for ACK, and registers the worker.
/// @param identity Identity string of the worker.
/// NOTE(review): presumably returns true when the worker was registered — confirm.
119 bool InitTcpWorker(const std::string & identity);
120
/// Handles a BOOTSTRAP message from a worker: assigns the next sequential index and sends a reply.
/// @param identity Identity string of the worker.
/// NOTE(review): presumably returns true when the reply was sent — confirm.
124 bool HandleBootstrap(const std::string & identity);
125
/// Forward declaration only; the definition is not visible in this header.
126 struct IpcSession;
/// Owning pointer to the IPC session.
/// NOTE(review): std::unique_ptr to an incomplete type needs the class destructor to be
/// defined where IpcSession is complete; no destructor declaration is visible here — confirm.
127 std::unique_ptr<IpcSession> fIpcSession;
128 std::set<size_t> fRegisteredWorkerIndices; ///< Worker indices that completed registration (TCP mode)
129};
130
131
132
133} // namespace Ndmspc
134
135#endif
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / wor...
const std::vector< int > & GetMinBounds() const
Returns the minimum bounds for each dimension.
const std::vector< int > & GetMaxBounds() const
Returns the maximum bounds for each dimension.
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFI...
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
size_t Dimensions() const
Returns the number of dimensions.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
Global callback function for libwebsockets client events.