1#ifndef N_TASK_STATE_MANAGER_H
2#define N_TASK_STATE_MANAGER_H
6#include <unordered_map>
7#include <unordered_set>
22class NTaskStateManager {
24 using TaskId = size_t;
25 using WorkerId = std::string;
26 using TaskPayload = std::vector<int>;
28 NTaskStateManager() =
default;
29 ~NTaskStateManager() =
default;
36 void AddPending(TaskId
id,
const TaskPayload & payload);
102 std::vector<std::pair<TaskId, TaskPayload>>
RecoverWorkerTasks(
const WorkerId & worker);
124 bool IsDone(TaskId
id)
const;
143 std::queue<std::pair<TaskId, TaskPayload>> fPending;
144 std::unordered_set<TaskId> fPendingIds;
145 std::unordered_map<TaskId, TaskPayload> fRunning;
146 std::set<TaskId> fDone;
149 std::unordered_map<WorkerId, std::set<TaskId>> fWorkerToTasks;
150 std::unordered_map<TaskId, WorkerId> fTaskToWorker;
151 std::unordered_map<TaskId, TaskPayload> fTaskPayloads;
std::pair< TaskId, TaskPayload > GetNextPending()
Get the next pending task for dispatch.
void Clear()
Clear all state (for reuse or cleanup)
bool IsDone(TaskId id) const
Check if a task has been completed.
size_t DoneCount() const
Get number of completed tasks.
WorkerId GetTaskWorker(TaskId id) const
Get the worker currently executing a task.
bool AssignToWorker(const WorkerId &worker, TaskId id)
Assign a pending task to a worker (transitions to running)
std::vector< std::pair< TaskId, TaskPayload > > RecoverWorkerTasks(const WorkerId &worker)
Recover all tasks from a failed worker.
bool HasPending() const
Check if there are pending tasks.
bool ClaimNextPendingForWorker(const WorkerId &worker, TaskId &id, TaskPayload &payload)
Atomically pop the next pending task and assign it to a worker.
bool RequeueTask(TaskId id)
Requeue a task to pending state from running or done state.
std::set< TaskId > GetWorkerTasks(const WorkerId &worker) const
Get all tasks currently assigned to a worker.
void AddPending(TaskId id, const TaskPayload &payload)
Add a new task to the pending queue.
size_t RunningCount() const
Get number of running tasks.
bool TaskExists(TaskId id) const
Check if task exists in any state.
bool MarkFailed(TaskId id)
Mark a running task as failed (returns to pending for redistribution)
size_t TotalCount() const
Get total tasks tracked (pending + running + done)
size_t PendingCount() const
Get number of pending tasks.
bool MarkDone(TaskId id)
Mark a running task as completed.
bool RemoveTaskFromWorker(const WorkerId &worker, TaskId id)
Remove a specific task ID from a worker (e.g., after ACK but worker later fails)
void EnqueuePending(TaskId id, const TaskPayload &payload)
Push task into pending queue and pending-id index.
Global callback function for libwebsockets client events.