4 Job::Job(std::string uuid, std::string type) :
Object(), mUUID(uuid), mType(type)
12 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
20 for (
int iType = EQueueType::pending; iType < EQueueType::all; iType++) {
21 for (
auto iTask :
mTasks[iType]) {
23 iTask.second =
nullptr;
39 SPD_CRIT(
"EQueueType is out of range [{}]", type);
44 mUid = pTaskInfo->clientid();
45 mGid = pTaskInfo->groupid();
48 mTasks[type].insert(std::make_pair(
id, pTaskInfo));
58 return moveTask(
id,
nullptr, from, to);
66 auto iFound =
mTasks[from].find(
id);
67 if (iFound !=
mTasks[from].end()) {
68 if (pTaskInfo ==
nullptr) {
69 pTaskInfo = iFound->second;
72 delete iFound->second;
78 mTasks[from].erase(iFound);
84 SPD_WARN(
"Job with id [{}] was not found in queue [{}] !!!",
id, from);
96 auto found =
mTasks[from].find(
id);
97 if (found !=
mTasks[from].end()) {
111 auto iAvailTask =
mTasks[EQueueType::pending].begin();
112 if (iAvailTask ==
mTasks[EQueueType::pending].end()) {
116 TaskInfo * pNewTask = iAvailTask->second;
117 moveTask(iAvailTask->first, EQueueType::pending, EQueueType::assigned);
127 for (
auto task :
mTasks[type]) {
128 targetVec.push_back(task.second);
158 SPD_DEBUG(
"{} P[{}] A[{}] R[{}] D[{}] F[{}] started[{}] finished[{}]",
mUUID,
mTasks[EQueueType::pending].
size(),
174 Json::UInt64 ts =
static_cast<Json::UInt64
>(
mTimeStarted);
176 d[
"time"][
"started"] = ts;
177 if (tf) d[
"time"][
"finished"] = tf;
178 d[
"P"] =
static_cast<Json::Value::UInt64
>(
mTasks[EQueueType::pending].size());
179 d[
"A"] =
static_cast<Json::Value::UInt64
>(
mTasks[EQueueType::assigned].size());
180 d[
"R"] =
static_cast<Json::Value::UInt64
>(
mTasks[EQueueType::running].size());
181 d[
"D"] =
static_cast<Json::Value::UInt64
>(
mTasks[EQueueType::done].size());
182 d[
"F"] =
static_cast<Json::Value::UInt64
>(
mTasks[EQueueType::failed].size());
184 d[
"rc"][
"done"] = Json::arrayValue;
185 for (
auto ti :
mTasks[EQueueType::done]) {
186 d[
"rc"][
"done"].append(ti.second->taskid());
188 d[
"rc"][
"failed"] = Json::arrayValue;
189 for (
auto ti :
mTasks[EQueueType::failed]) {
190 d[
"rc"][
"failed"].append(ti.second->taskid());
202 if (type >= EQueueType::all) {
203 size_t sum =
mTasks[EQueueType::pending].size();
204 sum +=
mTasks[EQueueType::assigned].size();
205 sum +=
mTasks[EQueueType::running].size();
206 sum +=
mTasks[EQueueType::done].size();
207 sum +=
mTasks[EQueueType::failed].size();
212 return mTasks[type].size();
221 size_t sum =
mTasks[EQueueType::pending].size();
222 sum +=
mTasks[EQueueType::assigned].size();
223 sum +=
mTasks[EQueueType::running].size();
266 return !
mTasks[Job::pending].empty();
277 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
bool removeTask(uint32_t id, EQueueType from)
size_t size(EQueueType t=all) const
uint32_t JobID_t
Job ID type alias.
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Job(std::string uuid="", std::string type="NONE")
std::string mFeederUUID
Feeder UUID.
bool isFinished()
Returns whether the job is finished.
std::string uuid() const
Returns the job UUID.
std::string mConsumerUUID
Source (consumer) UUID.
std::string mUUID
Job UUID.
std::map< uint32_t, TaskInfo * > mTasks[all]
Task maps (task id to TaskInfo pointer), one per queue type.
bool haveMoreTasks() const
Task statuses.
bool mChanged
Flag if job was changed.
std::string consumer() const
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
void json(Json::Value &json)
size_t sizeNotFinished() const
std::string feeder() const
uint32_t mUid
Job user id (nobody : 99)
uint32_t mGid
Job group id (nogroup : 99)
uint64_t mTimeFinished
Time finished.
uint64_t mTimeStarted
Time started.
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)