1#include "NTaskStateManager.h"
9 throw std::runtime_error(
"Task " + std::to_string(
id) +
" already exists in state manager");
12 fTaskPayloads[id] = payload;
17 if (fPending.empty() || fPendingIds.find(
id) == fPendingIds.end()) {
22 std::queue<std::pair<TaskId, TaskPayload>> tempQueue;
25 while (!fPending.empty()) {
26 auto [taskId, payload] = std::move(fPending.front());
29 if (taskId ==
id && !found) {
32 fRunning[id] = payload;
33 fWorkerToTasks[worker].insert(
id);
34 fTaskToWorker[id] = worker;
35 fPendingIds.erase(
id);
37 tempQueue.emplace(taskId, payload);
41 fPending = std::move(tempQueue);
47 if (fPending.empty()) {
51 auto item = std::move(fPending.front());
53 fPendingIds.erase(item.first);
56 payload = std::move(item.second);
58 fRunning[id] = payload;
59 fWorkerToTasks[worker].insert(
id);
60 fTaskToWorker[id] = worker;
66 auto it = fRunning.find(
id);
67 if (it == fRunning.end()) {
76 auto workerIt = fTaskToWorker.find(
id);
77 if (workerIt != fTaskToWorker.end()) {
78 const WorkerId & worker = workerIt->second;
79 fWorkerToTasks[worker].erase(
id);
80 if (fWorkerToTasks[worker].empty()) {
81 fWorkerToTasks.erase(worker);
83 fTaskToWorker.erase(workerIt);
91 auto it = fRunning.find(
id);
92 if (it == fRunning.end()) {
97 TaskPayload payload = it->second;
102 auto workerIt = fTaskToWorker.find(
id);
103 if (workerIt != fTaskToWorker.end()) {
104 const WorkerId & worker = workerIt->second;
105 fWorkerToTasks[worker].erase(
id);
106 if (fWorkerToTasks[worker].empty()) {
107 fWorkerToTasks.erase(worker);
109 fTaskToWorker.erase(workerIt);
118 auto runningIt = fRunning.find(
id);
119 if (runningIt != fRunning.end()) {
120 TaskPayload payload = runningIt->second;
121 fRunning.erase(runningIt);
124 auto workerIt = fTaskToWorker.find(
id);
125 if (workerIt != fTaskToWorker.end()) {
126 const WorkerId & worker = workerIt->second;
127 auto wkIt = fWorkerToTasks.find(worker);
128 if (wkIt != fWorkerToTasks.end()) {
129 wkIt->second.erase(
id);
130 if (wkIt->second.empty()) {
131 fWorkerToTasks.erase(wkIt);
134 fTaskToWorker.erase(workerIt);
140 auto doneIt = fDone.find(
id);
141 if (doneIt != fDone.end()) {
142 auto payloadIt = fTaskPayloads.find(
id);
143 if (payloadIt == fTaskPayloads.end()) {
157 auto it = fWorkerToTasks.find(worker);
158 if (it != fWorkerToTasks.end()) {
166 if (fPending.empty()) {
170 auto [taskId, payload] = fPending.front();
171 return {taskId, payload};
176 return !fPending.empty();
179std::vector<std::pair<NTaskStateManager::TaskId, NTaskStateManager::TaskPayload>>
182 std::vector<std::pair<TaskId, TaskPayload>> recovered;
184 auto workerIt = fWorkerToTasks.find(worker);
185 if (workerIt == fWorkerToTasks.end()) {
189 const auto & taskIds = workerIt->second;
190 for (TaskId
id : taskIds) {
191 auto runningIt = fRunning.find(
id);
192 if (runningIt != fRunning.end()) {
194 const TaskPayload & payload = runningIt->second;
195 recovered.emplace_back(
id, payload);
197 fRunning.erase(runningIt);
199 fTaskToWorker.erase(
id);
202 fWorkerToTasks.erase(workerIt);
208 auto workerIt = fWorkerToTasks.find(worker);
209 if (workerIt == fWorkerToTasks.end()) {
213 size_t erased = workerIt->second.erase(
id);
215 if (workerIt->second.empty()) {
216 fWorkerToTasks.erase(workerIt);
218 fTaskToWorker.erase(
id);
227 auto it = fTaskToWorker.find(
id);
228 if (it != fTaskToWorker.end()) {
236 return fDone.find(
id) != fDone.end();
241 fPending = std::queue<std::pair<TaskId, TaskPayload>>();
245 fWorkerToTasks.clear();
246 fTaskToWorker.clear();
247 fTaskPayloads.clear();
252 return fPendingIds.find(
id) != fPendingIds.end() ||
253 fRunning.find(
id) != fRunning.end() ||
254 fDone.find(
id) != fDone.end() ||
255 fTaskPayloads.find(
id) != fTaskPayloads.end();
260 if (fPendingIds.insert(
id).second) {
261 fPending.emplace(
id, payload);
std::pair< TaskId, TaskPayload > GetNextPending()
Get the next pending task for dispatch.
void Clear()
Clear all state (for reuse or cleanup)
bool IsDone(TaskId id) const
Check if a task has been completed.
WorkerId GetTaskWorker(TaskId id) const
Get the worker currently executing a task.
bool AssignToWorker(const WorkerId &worker, TaskId id)
Assign a pending task to a worker (transitions to running)
std::vector< std::pair< TaskId, TaskPayload > > RecoverWorkerTasks(const WorkerId &worker)
Recover all tasks from a failed worker.
bool HasPending() const
Check if there are pending tasks.
bool ClaimNextPendingForWorker(const WorkerId &worker, TaskId &id, TaskPayload &payload)
Atomically pop the next pending task and assign it to a worker.
bool RequeueTask(TaskId id)
Requeue a task to pending state from running or done state.
std::set< TaskId > GetWorkerTasks(const WorkerId &worker) const
Get all tasks currently assigned to a worker.
void AddPending(TaskId id, const TaskPayload &payload)
Add a new task to the pending queue.
bool TaskExists(TaskId id) const
Check if task exists in any state.
bool MarkFailed(TaskId id)
Mark a running task as failed (returns to pending for redistribution)
bool MarkDone(TaskId id)
Mark a running task as completed.
bool RemoveTaskFromWorker(const WorkerId &worker, TaskId id)
Remove a specific task ID from a worker (e.g., after ACK but worker later fails)
void EnqueuePending(TaskId id, const TaskPayload &payload)
Push task into pending queue and pending-id index.
Global callback function for libwebsockets client events.