ndmspc v1.2.0-0.1.rc5
Loading...
Searching...
No Matches
NDimensionalExecutor.h
1#ifndef N_DIMENSIONAL_EXECUTOR_H
2#define N_DIMENSIONAL_EXECUTOR_H
3
4#include <iomanip>
5#include <sstream>
6#include <set>
7#include <vector>
8#include <functional>
9#include <cstddef>
10#include <thread>
11#include <mutex>
12#include <condition_variable>
13#include <atomic>
14#include <queue>
15#include <stdexcept>
16#include <utility>
17#include <exception>
18#include <memory>
19#include <string>
20#include <Rtypes.h>
21#include <THnSparse.h>
22#include "NLogger.h"
23#include "NThreadData.h"
24#include "NTaskStateManager.h"
25
26namespace Ndmspc {
27
30 size_t tasksAcked{0};
31 size_t tasksPending{0};
32 size_t tasksRunning{0};
33 size_t tasksDone{0};
34 size_t activeWorkers{0};
35};
36
43 public:
49 NDimensionalExecutor(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
50
56 NDimensionalExecutor(THnSparse * hist, bool onlyfilled = false);
57
59
60 void SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
61
66 void Execute(const std::function<void(const std::vector<int> & coords)> & func);
67
74 template <typename TObject>
75 void ExecuteParallel(const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
76 std::vector<TObject> & thread_objects);
77
84 size_t ExecuteParallelProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount);
85 void StartProcessIpc(std::vector<NThreadData *> & workerObjects, size_t processCount,
86 const std::string & tcpBindEndpoint = "", const std::string & jobDir = "",
87 const std::string & treeName = "ngnt", const std::string & macroList = "",
88 const std::string & tmpDir = "", const std::string & tmpResultsDir = "",
89 const std::string & macroParams = "");
90 size_t ExecuteCurrentBoundsProcessIpc(const std::string & definitionName = "",
91 const std::vector<Long64_t> * definitionIds = nullptr,
92 const std::function<void(const ExecutionProgress&)> & progressCallback = nullptr);
93 void FinishProcessIpc(bool abort = false);
94
95 std::set<size_t> GetRegisteredWorkerIndices() const { return fRegisteredWorkerIndices; }
96
101 size_t Dimensions() const { return fNumDimensions; }
102
107 const std::vector<int> & GetMinBounds() const { return fMinBounds; }
108
113 const std::vector<int> & GetMaxBounds() const { return fMaxBounds; }
114
115 private:
117 std::vector<int> fMinBounds;
118 std::vector<int> fMaxBounds;
119 std::vector<int> fCurrentCoords;
120
125 bool Increment();
126
129 bool InitTcpWorker(const std::string & identity);
130
134 bool HandleBootstrap(const std::string & identity);
135
138 size_t HandleWorkerFailure(const std::string & failedIdentity,
139 const std::string & failureReason,
140 size_t & outstanding,
141 size_t & acked);
142
143 struct IpcSession;
144 std::unique_ptr<IpcSession> fIpcSession;
145 std::set<size_t> fRegisteredWorkerIndices;
146};
147
148
149
150} // namespace Ndmspc
151
152#endif
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / wor...
const std::vector< int > & GetMinBounds() const
Returns the minimum bounds for each dimension.
size_t HandleWorkerFailure(const std::string &failedIdentity, const std::string &failureReason, size_t &outstanding, size_t &acked)
Centralized worker failure handling: recovers tasks, removes worker, updates state.
const std::vector< int > & GetMaxBounds() const
Returns the maximum bounds for each dimension.
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode).
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFI...
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
size_t Dimensions() const
Returns the number of dimensions.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
Global callback function for libwebsockets client events.
Ndmspc::ExecutionProgress
Execution progress metrics for IPC-based distributed processing.
size_t tasksRunning
Tasks currently assigned to workers.
size_t activeWorkers
Number of workers currently active.
size_t tasksAcked
Tasks completed and ACKed by workers.
size_t tasksDone
Tasks completed locally (state machine).
size_t tasksPending
Tasks waiting to be dispatched.