ndmspc  v1.2.0-0.1.rc3
NDimensionalExecutor.h
1 #ifndef N_DIMENSIONAL_EXECUTOR_H
2 #define N_DIMENSIONAL_EXECUTOR_H
3 
4 #include <iomanip>
5 #include <sstream>
6 #include <set>
7 #include <vector>
8 #include <functional>
9 #include <cstddef>
10 #include <thread>
11 #include <mutex>
12 #include <condition_variable>
13 #include <atomic>
14 #include <queue>
15 #include <stdexcept>
16 #include <utility>
17 #include <exception>
18 #include <memory>
19 #include <string>
20 #include <Rtypes.h>
21 #include <THnSparse.h>
22 #include "NLogger.h"
23 #include "NThreadData.h"
24 
25 namespace Ndmspc {
26 
33  public:
/// Constructor from min/max bounds for each dimension.
/// @param minBounds Minimum bounds for each dimension.
/// @param maxBounds Maximum bounds for each dimension.
39  NDimensionalExecutor(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
40 
/// Constructor taking a THnSparse histogram.
/// @param hist Histogram pointer. NOTE(review): presumably the per-dimension bounds are
///        derived from its axes - confirm against the implementation (not visible here).
/// @param onlyfilled Defaults to false. NOTE(review): presumably restricts iteration to
///        filled bins - confirm.
46  NDimensionalExecutor(THnSparse * hist, bool onlyfilled = false);
47 
49 
/// Sets the minimum and maximum bounds.
/// @param minBounds New minimum bounds, one entry per dimension.
/// @param maxBounds New maximum bounds, one entry per dimension.
/// NOTE(review): the definition is not visible here; whether the dimension count and the
/// current coordinates are also reset should be confirmed in the implementation.
50  void SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
51 
/// Execute a function over all coordinates in the N-dimensional space.
/// @param func Callback that receives the coordinates of a point as a const std::vector<int> &.
56  void Execute(const std::function<void(const std::vector<int> & coords)> & func);
57 
/// Execute a function in parallel over all coordinates, using thread-local objects.
/// @tparam TObject Type of the object handed to the callback alongside the coordinates.
/// @param func Callback that receives the coordinates and a mutable reference to a TObject.
/// @param thread_objects Objects passed to the callback.
///        NOTE(review): presumably one element per worker thread - confirm in the definition.
64  template <typename TObject>
65  void ExecuteParallel(const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
66  std::vector<TObject> & thread_objects);
67 
/// Execute fixed-contract processing in multiple child processes over IPC.
/// @param workerObjects Non-owning-looking NThreadData pointers used by the workers.
///        NOTE(review): ownership and required vector size are not visible here - confirm.
/// @param processCount NOTE(review): presumably the number of child processes - confirm.
/// @return A size_t whose meaning is not visible in this header.
///         NOTE(review): presumably a count of processed items - confirm.
74  size_t ExecuteParallelProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount);
/// Starts process-based IPC execution.
/// NOTE(review): only the declaration is visible; the parameter descriptions below are
/// inferred from names and must be confirmed against the implementation.
/// @param workerObjects NThreadData pointers for the workers.
/// @param processCount Presumably the number of worker processes.
/// @param tcpBindEndpoint Defaults to "". Presumably a TCP endpoint to bind for remote workers.
/// @param jobDir Defaults to "".
/// @param treeName Defaults to "ngnt".
/// @param macroList Defaults to "".
/// @param tmpDir Defaults to "".
/// @param tmpResultsDir Defaults to "".
75  void StartProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount,
76  const std::string & tcpBindEndpoint = "", const std::string & jobDir = "",
77  const std::string & treeName = "ngnt", const std::string & macroList = "",
78  const std::string & tmpDir = "", const std::string & tmpResultsDir = "");
/// Runs IPC processing for the currently set bounds.
/// NOTE(review): only the declaration is visible; semantics below are inferred from names
/// and must be confirmed against the implementation.
/// @param definitionName Defaults to "".
/// @param definitionIds Optional pointer to a list of Long64_t ids; defaults to nullptr.
/// @param progressCallback Optional callback taking two size_t values; defaults to an empty
///        std::function. Presumably (done, total) - confirm.
/// @return A size_t whose meaning is not visible in this header.
79  size_t ExecuteCurrentBoundsProcessIpc(const std::string & definitionName = "",
80  const std::vector<Long64_t> * definitionIds = nullptr,
81  const std::function<void(size_t, size_t)> & progressCallback = nullptr);
/// Finishes process-based IPC execution.
/// @param abort Defaults to false. NOTE(review): presumably requests an abnormal/early
///        shutdown of the workers when true - confirm in the implementation.
82  void FinishProcessIpc(bool abort = false);
83 
/// Returns the worker indices that completed registration (TCP mode).
/// @return A copy of fRegisteredWorkerIndices (returned by value).
84  std::set<size_t> GetRegisteredWorkerIndices() const { return fRegisteredWorkerIndices; }
85 
/// Returns the number of dimensions.
/// @return The value of fNumDimensions.
90  size_t Dimensions() const { return fNumDimensions; }
91 
/// Returns the minimum bounds for each dimension.
/// @return Const reference to fMinBounds.
96  const std::vector<int> & GetMinBounds() const { return fMinBounds; }
97 
/// Returns the maximum bounds for each dimension.
/// @return Const reference to fMaxBounds.
102  const std::vector<int> & GetMaxBounds() const { return fMaxBounds; }
103 
104  private:
/// Number of dimensions.
105  size_t fNumDimensions;
/// Minimum bounds for each dimension.
106  std::vector<int> fMinBounds;
/// Maximum bounds for each dimension.
107  std::vector<int> fMaxBounds;
/// Current coordinates during iteration.
108  std::vector<int> fCurrentCoords;
109 
/// Increment the current coordinates to the next point in the N-dimensional space.
/// @return NOTE(review): presumably true while a next point exists and false once the
///         space is exhausted - confirm in the implementation.
114  bool Increment();
115 
/// Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it.
/// @param identity Identity string of the worker.
/// @return NOTE(review): presumably true on successful registration - confirm.
118  bool InitTcpWorker(const std::string & identity);
119 
/// Handles a BOOTSTRAP message from a worker: assigns the next sequential index and
/// replies to the worker.
/// @param identity Identity string of the worker that sent the message.
/// @return NOTE(review): presumably true when the message was handled successfully - confirm.
122  bool HandleBootstrap(const std::string & identity);
123 
/// Forward declaration only; the definition of IpcSession is not part of this header.
124  struct IpcSession;
/// Uniquely-owned IpcSession instance.
/// NOTE(review): presumably non-null only while an IPC session is active - confirm.
125  std::unique_ptr<IpcSession> fIpcSession;
/// Worker indices that completed registration (TCP mode).
126  std::set<size_t> fRegisteredWorkerIndices;
127 };
128 
129 
130 
131 } // namespace Ndmspc
132 
133 #endif
Executes a function over all points in an N-dimensional space, optionally in parallel.
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / wor...
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode)
const std::vector< int > & GetMaxBounds() const
Returns the maximum bounds for each dimension.
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFI...
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
size_t Dimensions() const
Returns the number of dimensions.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
const std::vector< int > & GetMinBounds() const
Returns the minimum bounds for each dimension.