1#ifndef N_DIMENSIONAL_EXECUTOR_H
2#define N_DIMENSIONAL_EXECUTOR_H
12#include <condition_variable>
23#include "NThreadData.h"
24#include "NTaskStateManager.h"
/// @brief Sets the minimum and maximum bounds for each dimension.
/// @param minBounds Minimum bound for each dimension.
/// @param maxBounds Maximum bound for each dimension.
/// NOTE(review): whether the bounds are inclusive, and whether both vectors must have
/// the same length, is not visible from this declaration — confirm against the implementation.
void SetBounds(const std::vector<int> & minBounds, const std::vector<int> & maxBounds);
/// @brief Execute a function over all coordinates in the N-dimensional space.
/// @param func Callback that receives the coordinate vector of each visited point.
void Execute(const std::function<void(const std::vector<int> & coords)> & func);
/// @brief Execute a function in parallel over all coordinates, using thread-local objects.
/// @tparam TObject Type of the object handed to the callback alongside the coordinates.
/// @param func Callback that receives the coordinate vector and a mutable reference to a TObject.
/// @param thread_objects Objects made available to func.
/// NOTE(review): presumably one element per worker thread — confirm against the implementation.
template <typename TObject>
void ExecuteParallel(const std::function<void(const std::vector<int> & coords, TObject & thread_object)> & func,
                     std::vector<TObject> & thread_objects);
85 void StartProcessIpc(std::vector<NThreadData *> & workerObjects,
size_t processCount,
86 const std::string & tcpBindEndpoint =
"",
const std::string & jobDir =
"",
87 const std::string & treeName =
"ngnt",
const std::string & macroList =
"",
88 const std::string & tmpDir =
"",
const std::string & tmpResultsDir =
"",
89 const std::string & macroParams =
"");
90 size_t ExecuteCurrentBoundsProcessIpc(
const std::string & definitionName =
"",
91 const std::vector<Long64_t> * definitionIds =
nullptr,
92 const std::function<
void(
const ExecutionProgress&)> & progressCallback =
nullptr);
/// @brief Finishes the process-based IPC execution mode.
/// NOTE(review): the exact teardown behaviour is not visible from this declaration.
/// @param abort Defaults to false; presumably requests an abortive rather than an
///              orderly shutdown — confirm against the implementation.
void FinishProcessIpc(bool abort = false);
139 const std::string & failureReason,
140 size_t & outstanding,
144 std::unique_ptr<IpcSession> fIpcSession;
std::vector< int > fMinBounds
Minimum bounds for each dimension.
bool InitTcpWorker(const std::string &identity)
Sends INIT to a newly-connected TCP worker, waits for ACK, and registers it in identityToWorker / wor...
const std::vector< int > & GetMinBounds() const
Returns the minimum bounds for each dimension.
size_t HandleWorkerFailure(const std::string &failedIdentity, const std::string &failureReason, size_t &outstanding, size_t &acked)
Centralized worker failure handling: recovers tasks, removes worker, updates state....
const std::vector< int > & GetMaxBounds() const
Returns the maximum bounds for each dimension.
std::set< size_t > fRegisteredWorkerIndices
Worker indices that completed registration (TCP mode).
std::vector< int > fMaxBounds
Maximum bounds for each dimension.
size_t ExecuteParallelProcessIpc(std::vector< NThreadData * > &workerObjects, size_t processCount)
Execute fixed-contract processing in multiple child processes over IPC.
bool HandleBootstrap(const std::string &identity)
Handles a BOOTSTRAP message from a worker: assigns the next sequential index and replies with a CONFI...
std::vector< int > fCurrentCoords
Current coordinates during iteration.
bool Increment()
Increment the current coordinates to the next point in the N-dimensional space.
NDimensionalExecutor(const std::vector< int > &minBounds, const std::vector< int > &maxBounds)
Constructor from min/max bounds for each dimension.
size_t Dimensions() const
Returns the number of dimensions.
void ExecuteParallel(const std::function< void(const std::vector< int > &coords, TObject &thread_object)> &func, std::vector< TObject > &thread_objects)
Execute a function in parallel over all coordinates, using thread-local objects.
size_t fNumDimensions
Number of dimensions.
void Execute(const std::function< void(const std::vector< int > &coords)> &func)
Execute a function over all coordinates in the N-dimensional space.
Global callback function for libwebsockets client events.
Execution progress metrics for IPC-based distributed processing.
size_t tasksRunning
Tasks currently assigned to workers.
size_t activeWorkers
Number of workers currently active.
size_t tasksAcked
Tasks completed and ACKed by workers.
size_t tasksDone
Tasks completed locally (state machine).
size_t tasksPending
Tasks waiting to be dispatched.