ndmspc v1.2.0-0.1.rc7
Loading...
Searching...
No Matches
NTaskStateManager.h
1#ifndef N_TASK_STATE_MANAGER_H
2#define N_TASK_STATE_MANAGER_H
3
4#include <queue>
5#include <set>
6#include <unordered_map>
7#include <unordered_set>
8#include <vector>
9#include <cstddef>
10#include <string>
11#include <utility>
12
13namespace Ndmspc {
14
/**
 * @class NTaskStateManager
 * @brief Bookkeeping for tasks as they move between the pending, running and
 *        done states, together with the worker <-> task assignment maps.
 *
 * Tasks flow pending -> running -> done. A running task can be returned to the
 * pending queue (see MarkFailed(), RequeueTask() and RecoverWorkerTasks()).
 *
 * NOTE(review): no mutex or other synchronization member is declared in this
 * class, so callers presumably have to serialize access themselves — confirm
 * against the implementation file.
 *
 * The class follows the Rule of Zero: every member is a standard container, so
 * no destructor is declared. Declaring one (even as `= default`) would
 * suppress the implicit move constructor / move assignment and silently turn
 * every move of the manager into a deep copy of all containers.
 */
class NTaskStateManager {
 public:
  using TaskId      = std::size_t;      ///< Task identifier
  using WorkerId    = std::string;      ///< Worker identifier
  using TaskPayload = std::vector<int>; ///< Task payload (coordinates)

  /// Construct an empty manager (all containers empty).
  NTaskStateManager() = default;

  /**
   * @brief Add a new task to the pending queue.
   * @param id      Task identifier
   * @param payload Task payload (coordinates)
   */
  void AddPending(TaskId id, const TaskPayload & payload);

  /**
   * @brief Assign a pending task to a worker (transitions to running).
   * @param worker Worker receiving the task
   * @param id     Task identifier
   * @return presumably true when the transition happened — confirm in the implementation
   */
  bool AssignToWorker(const WorkerId & worker, TaskId id);

  /**
   * @brief Atomically pop the next pending task and assign it to a worker.
   * @param worker       Worker receiving the task
   * @param[out] id      Identifier of the claimed task
   * @param[out] payload Payload of the claimed task
   * @return presumably false when nothing could be claimed — confirm in the implementation
   */
  bool ClaimNextPendingForWorker(const WorkerId & worker, TaskId & id, TaskPayload & payload);

  /**
   * @brief Mark a running task as completed.
   * @param id Task identifier
   * @return presumably true on success — confirm in the implementation
   */
  bool MarkDone(TaskId id);

  /**
   * @brief Mark a running task as failed (returns to pending for redistribution).
   * @param id Task identifier
   * @return presumably true on success — confirm in the implementation
   */
  bool MarkFailed(TaskId id);

  /**
   * @brief Requeue a task to pending state from running or done state.
   * @param id Task identifier
   * @return presumably true on success — confirm in the implementation
   */
  bool RequeueTask(TaskId id);

  /**
   * @brief Get all tasks currently assigned to a worker.
   * @param worker Worker identifier
   * @return Set of task identifiers (returned by value)
   */
  std::set<TaskId> GetWorkerTasks(const WorkerId & worker) const;

  /**
   * @brief Get the next pending task for dispatch.
   * @return Pair of task identifier and payload
   *
   * NOTE(review): behaviour on an empty queue is not visible from this
   * header; callers should presumably check HasPending() first.
   */
  std::pair<TaskId, TaskPayload> GetNextPending();

  /// Check if there are pending tasks.
  bool HasPending() const;

  /**
   * @brief Recover all tasks from a failed worker.
   * @param worker Identifier of the failed worker
   * @return Recovered (task id, payload) pairs
   */
  std::vector<std::pair<TaskId, TaskPayload>> RecoverWorkerTasks(const WorkerId & worker);

  /**
   * @brief Remove a specific task ID from a worker (e.g., after ACK but worker later fails).
   * @param worker Worker identifier
   * @param id     Task identifier
   * @return presumably true when the task was removed — confirm in the implementation
   */
  bool RemoveTaskFromWorker(const WorkerId & worker, TaskId id);

  /**
   * @brief Get the worker currently executing a task.
   * @param id Task identifier
   * @return Worker identifier (value for an unassigned task is not visible from this header)
   */
  WorkerId GetTaskWorker(TaskId id) const;

  /**
   * @brief Check if a task has been completed.
   * @param id Task identifier
   */
  bool IsDone(TaskId id) const;

  /**
   * @brief Get number of pending tasks.
   *
   * NOTE(review): this reports the size of the queue, not of fPendingIds. A
   * std::queue cannot erase from the middle, so if the implementation ever
   * drops an id from fPendingIds without popping the queue entry this count
   * would over-report — confirm against the implementation file.
   */
  std::size_t PendingCount() const { return fPending.size(); }

  /// Get number of running tasks.
  std::size_t RunningCount() const { return fRunning.size(); }

  /// Get number of completed tasks.
  std::size_t DoneCount() const { return fDone.size(); }

  /// Get total tasks tracked (pending + running + done).
  std::size_t TotalCount() const { return PendingCount() + RunningCount() + DoneCount(); }

  /// Clear all state (for reuse or cleanup).
  void Clear();

 private:
  // State buckets: tasks flow pending -> running -> done
  std::queue<std::pair<TaskId, TaskPayload>> fPending;    ///< Not yet dispatched
  std::unordered_set<TaskId>                 fPendingIds; ///< Pending-id index
  std::unordered_map<TaskId, TaskPayload>    fRunning;    ///< Assigned to workers
  std::set<TaskId>                           fDone;       ///< Successfully completed

  // Mappings for efficient lookup
  std::unordered_map<WorkerId, std::set<TaskId>> fWorkerToTasks; ///< Current assignments
  std::unordered_map<TaskId, WorkerId>           fTaskToWorker;  ///< Reverse mapping
  std::unordered_map<TaskId, TaskPayload>        fTaskPayloads;  ///< All payloads (for recovery)

  /// Check if task exists in any state.
  bool TaskExists(TaskId id) const;

  /// Push task into pending queue and pending-id index.
  void EnqueuePending(TaskId id, const TaskPayload & payload);
};
159
160} // namespace Ndmspc
161
162#endif // N_TASK_STATE_MANAGER_H
std::pair< TaskId, TaskPayload > GetNextPending()
Get the next pending task for dispatch.
void Clear()
Clear all state (for reuse or cleanup)
bool IsDone(TaskId id) const
Check if a task has been completed.
size_t DoneCount() const
Get number of completed tasks.
WorkerId GetTaskWorker(TaskId id) const
Get the worker currently executing a task.
bool AssignToWorker(const WorkerId &worker, TaskId id)
Assign a pending task to a worker (transitions to running)
std::vector< std::pair< TaskId, TaskPayload > > RecoverWorkerTasks(const WorkerId &worker)
Recover all tasks from a failed worker.
bool HasPending() const
Check if there are pending tasks.
bool ClaimNextPendingForWorker(const WorkerId &worker, TaskId &id, TaskPayload &payload)
Atomically pop the next pending task and assign it to a worker.
bool RequeueTask(TaskId id)
Requeue a task to pending state from running or done state.
std::set< TaskId > GetWorkerTasks(const WorkerId &worker) const
Get all tasks currently assigned to a worker.
void AddPending(TaskId id, const TaskPayload &payload)
Add a new task to the pending queue.
size_t RunningCount() const
Get number of running tasks.
bool TaskExists(TaskId id) const
Check if task exists in any state.
bool MarkFailed(TaskId id)
Mark a running task as failed (returns to pending for redistribution)
size_t TotalCount() const
Get total tasks tracked (pending + running + done)
size_t PendingCount() const
Get number of pending tasks.
bool MarkDone(TaskId id)
Mark a running task as completed.
bool RemoveTaskFromWorker(const WorkerId &worker, TaskId id)
Remove a specific task ID from a worker (e.g., after an ACK, when the worker later fails).
void EnqueuePending(TaskId id, const TaskPayload &payload)
Push task into pending queue and pending-id index.
Global callback function for libwebsockets client events.