ndmspc v1.2.0-0.1.rc5
Loading...
Searching...
No Matches
NTaskStateManager.cxx
1#include "NTaskStateManager.h"
2#include <stdexcept>
3
4namespace Ndmspc {
5
6void NTaskStateManager::AddPending(TaskId id, const TaskPayload & payload)
7{
8 if (TaskExists(id)) {
9 throw std::runtime_error("Task " + std::to_string(id) + " already exists in state manager");
10 }
11 EnqueuePending(id, payload);
12 fTaskPayloads[id] = payload;
13}
14
15bool NTaskStateManager::AssignToWorker(const WorkerId & worker, TaskId id)
16{
17 if (fPending.empty() || fPendingIds.find(id) == fPendingIds.end()) {
18 return false;
19 }
20
21 // Find and remove from pending
22 std::queue<std::pair<TaskId, TaskPayload>> tempQueue;
23 bool found = false;
24
25 while (!fPending.empty()) {
26 auto [taskId, payload] = std::move(fPending.front());
27 fPending.pop();
28
29 if (taskId == id && !found) {
30 found = true;
31 // Move to running state
32 fRunning[id] = payload;
33 fWorkerToTasks[worker].insert(id);
34 fTaskToWorker[id] = worker;
35 fPendingIds.erase(id);
36 } else {
37 tempQueue.emplace(taskId, payload);
38 }
39 }
40
41 fPending = std::move(tempQueue);
42 return found;
43}
44
45bool NTaskStateManager::ClaimNextPendingForWorker(const WorkerId & worker, TaskId & id, TaskPayload & payload)
46{
47 if (fPending.empty()) {
48 return false;
49 }
50
51 auto item = std::move(fPending.front());
52 fPending.pop();
53 fPendingIds.erase(item.first);
54
55 id = item.first;
56 payload = std::move(item.second);
57
58 fRunning[id] = payload;
59 fWorkerToTasks[worker].insert(id);
60 fTaskToWorker[id] = worker;
61 return true;
62}
63
65{
66 auto it = fRunning.find(id);
67 if (it == fRunning.end()) {
68 return false; // Task not running
69 }
70
71 // Move from running to done
72 fDone.insert(id);
73 fRunning.erase(it);
74
75 // Remove from worker assignment
76 auto workerIt = fTaskToWorker.find(id);
77 if (workerIt != fTaskToWorker.end()) {
78 const WorkerId & worker = workerIt->second;
79 fWorkerToTasks[worker].erase(id);
80 if (fWorkerToTasks[worker].empty()) {
81 fWorkerToTasks.erase(worker);
82 }
83 fTaskToWorker.erase(workerIt);
84 }
85
86 return true;
87}
88
90{
91 auto it = fRunning.find(id);
92 if (it == fRunning.end()) {
93 return false; // Task not running
94 }
95
96 // Move from running back to pending
97 TaskPayload payload = it->second;
98 fRunning.erase(it);
99 EnqueuePending(id, payload);
100
101 // Remove from worker assignment
102 auto workerIt = fTaskToWorker.find(id);
103 if (workerIt != fTaskToWorker.end()) {
104 const WorkerId & worker = workerIt->second;
105 fWorkerToTasks[worker].erase(id);
106 if (fWorkerToTasks[worker].empty()) {
107 fWorkerToTasks.erase(worker);
108 }
109 fTaskToWorker.erase(workerIt);
110 }
111
112 return true;
113}
114
116{
117 // Running -> pending
118 auto runningIt = fRunning.find(id);
119 if (runningIt != fRunning.end()) {
120 TaskPayload payload = runningIt->second;
121 fRunning.erase(runningIt);
122 EnqueuePending(id, payload);
123
124 auto workerIt = fTaskToWorker.find(id);
125 if (workerIt != fTaskToWorker.end()) {
126 const WorkerId & worker = workerIt->second;
127 auto wkIt = fWorkerToTasks.find(worker);
128 if (wkIt != fWorkerToTasks.end()) {
129 wkIt->second.erase(id);
130 if (wkIt->second.empty()) {
131 fWorkerToTasks.erase(wkIt);
132 }
133 }
134 fTaskToWorker.erase(workerIt);
135 }
136 return true;
137 }
138
139 // Done -> pending
140 auto doneIt = fDone.find(id);
141 if (doneIt != fDone.end()) {
142 auto payloadIt = fTaskPayloads.find(id);
143 if (payloadIt == fTaskPayloads.end()) {
144 return false;
145 }
146 fDone.erase(doneIt);
147 EnqueuePending(id, payloadIt->second);
148 return true;
149 }
150
151 // Unknown or already pending.
152 return false;
153}
154
155std::set<NTaskStateManager::TaskId> NTaskStateManager::GetWorkerTasks(const WorkerId & worker) const
156{
157 auto it = fWorkerToTasks.find(worker);
158 if (it != fWorkerToTasks.end()) {
159 return it->second;
160 }
161 return {};
162}
163
164std::pair<NTaskStateManager::TaskId, NTaskStateManager::TaskPayload> NTaskStateManager::GetNextPending()
165{
166 if (fPending.empty()) {
167 return {0, {}};
168 }
169
170 auto [taskId, payload] = fPending.front();
171 return {taskId, payload};
172}
173
175{
176 return !fPending.empty();
177}
178
179std::vector<std::pair<NTaskStateManager::TaskId, NTaskStateManager::TaskPayload>>
181{
182 std::vector<std::pair<TaskId, TaskPayload>> recovered;
183
184 auto workerIt = fWorkerToTasks.find(worker);
185 if (workerIt == fWorkerToTasks.end()) {
186 return recovered; // No tasks assigned to this worker
187 }
188
189 const auto & taskIds = workerIt->second;
190 for (TaskId id : taskIds) {
191 auto runningIt = fRunning.find(id);
192 if (runningIt != fRunning.end()) {
193 // Move back to pending
194 const TaskPayload & payload = runningIt->second;
195 recovered.emplace_back(id, payload);
196 EnqueuePending(id, payload);
197 fRunning.erase(runningIt);
198 }
199 fTaskToWorker.erase(id);
200 }
201
202 fWorkerToTasks.erase(workerIt);
203 return recovered;
204}
205
206bool NTaskStateManager::RemoveTaskFromWorker(const WorkerId & worker, TaskId id)
207{
208 auto workerIt = fWorkerToTasks.find(worker);
209 if (workerIt == fWorkerToTasks.end()) {
210 return false;
211 }
212
213 size_t erased = workerIt->second.erase(id);
214 if (erased > 0) {
215 if (workerIt->second.empty()) {
216 fWorkerToTasks.erase(workerIt);
217 }
218 fTaskToWorker.erase(id);
219 return true;
220 }
221
222 return false;
223}
224
225NTaskStateManager::WorkerId NTaskStateManager::GetTaskWorker(TaskId id) const
226{
227 auto it = fTaskToWorker.find(id);
228 if (it != fTaskToWorker.end()) {
229 return it->second;
230 }
231 return {};
232}
233
234bool NTaskStateManager::IsDone(TaskId id) const
235{
236 return fDone.find(id) != fDone.end();
237}
238
240{
241 fPending = std::queue<std::pair<TaskId, TaskPayload>>();
242 fPendingIds.clear();
243 fRunning.clear();
244 fDone.clear();
245 fWorkerToTasks.clear();
246 fTaskToWorker.clear();
247 fTaskPayloads.clear();
248}
249
250bool NTaskStateManager::TaskExists(TaskId id) const
251{
252 return fPendingIds.find(id) != fPendingIds.end() ||
253 fRunning.find(id) != fRunning.end() ||
254 fDone.find(id) != fDone.end() ||
255 fTaskPayloads.find(id) != fTaskPayloads.end();
256}
257
258void NTaskStateManager::EnqueuePending(TaskId id, const TaskPayload & payload)
259{
260 if (fPendingIds.insert(id).second) {
261 fPending.emplace(id, payload);
262 }
263}
264
265} // namespace Ndmspc
std::pair< TaskId, TaskPayload > GetNextPending()
Get the next pending task for dispatch.
void Clear()
Clear all state (for reuse or cleanup)
bool IsDone(TaskId id) const
Check if a task has been completed.
WorkerId GetTaskWorker(TaskId id) const
Get the worker currently executing a task.
bool AssignToWorker(const WorkerId &worker, TaskId id)
Assign a pending task to a worker (transitions to running)
std::vector< std::pair< TaskId, TaskPayload > > RecoverWorkerTasks(const WorkerId &worker)
Recover all tasks from a failed worker.
bool HasPending() const
Check if there are pending tasks.
bool ClaimNextPendingForWorker(const WorkerId &worker, TaskId &id, TaskPayload &payload)
Atomically pop the next pending task and assign it to a worker.
bool RequeueTask(TaskId id)
Requeue a task to pending state from running or done state.
std::set< TaskId > GetWorkerTasks(const WorkerId &worker) const
Get all tasks currently assigned to a worker.
void AddPending(TaskId id, const TaskPayload &payload)
Add a new task to the pending queue.
bool TaskExists(TaskId id) const
Check if task exists in any state.
bool MarkFailed(TaskId id)
Mark a running task as failed (returns to pending for redistribution)
bool MarkDone(TaskId id)
Mark a running task as completed.
bool RemoveTaskFromWorker(const WorkerId &worker, TaskId id)
Remove a specific task ID from a worker (e.g., after ACK but worker later fails)
void EnqueuePending(TaskId id, const TaskPayload &payload)
Push task into pending queue and pending-id index.
Global callback function for libwebsockets client events.