salsa 0.7.1
Loading...
Searching...
No Matches
Job.cc
1#include "Job.hh"
2
3namespace Salsa {
4Job::Job(std::string uuid, std::string type)
5 : Object()
6 , mUUID(uuid)
7 , mType(type)
8{
12
13 // mTimeStarted = std::chrono::system_clock::now();
15 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
16}
17
19{
23 for (int iType = EQueueType::pending; iType < EQueueType::all; iType++) {
24 for (auto iTask : mTasks[iType]) {
25 delete iTask.second;
26 iTask.second = nullptr;
27 }
28 mTasks[iType].clear();
29 }
30}
31
32bool Job::addTask(uint32_t id, TaskInfo * pTaskInfo, EQueueType type)
33{
37 if (!pTaskInfo) {
38 return false;
39 }
40
41 if (type >= all) {
42 SPD_CRIT("EQueueType is out of range [{}]", static_cast<int>(type));
43 return false;
44 }
45
46 if (mUid == 99 && mGid == 99) {
47 mUid = pTaskInfo->clientid();
48 mGid = pTaskInfo->groupid();
49 }
50
51 mTasks[type].insert(std::make_pair(id, pTaskInfo));
52
53 return true;
54}
55
56bool Job::moveTask(uint32_t id, EQueueType from, EQueueType to)
57{
61
62 return moveTask(id, nullptr, from, to);
63}
64
65bool Job::moveTask(JobID_t id, TaskInfo * pTaskInfo, EQueueType from, EQueueType to)
66{
70 auto iFound = mTasks[from].find(id);
71 if (iFound != mTasks[from].end()) {
72 if (pTaskInfo == nullptr) {
73 pTaskInfo = iFound->second;
74 }
75 else {
76 delete iFound->second;
77 }
78 // TODO This is asking for trouble...
79 // Possible fix would be... std::shared_ptr;
80 // Also, why do we even need to supply pTaskInfo anyways?
81
82 if (to < EQueueType::done && from != EQueueType::assigned) mChanged = true;
83
84 mTasks[from].erase(iFound);
85 addTask(id, pTaskInfo, to);
86
87 // mChanged = true;
88 return true;
89 }
90 else {
91 SPD_WARN("Job with id [{}] was not found in queue [{}] !!!", static_cast<int>(id), static_cast<int>(from));
92 return false;
93 }
94}
95
96bool Job::removeTask(uint32_t id, EQueueType from)
97{
101
102 // TODO This could cause problems at some point...
103 auto found = mTasks[from].find(id);
104 if (found != mTasks[from].end()) {
105
106 // if (from == EQueueType::running) mChanged = true;
107 mTasks[from].erase(found);
108 return true;
109 }
110
111 return false;
112}
113
114TaskInfo * Job::nextTask()
115{
119
120 auto iAvailTask = mTasks[EQueueType::pending].begin();
121 if (iAvailTask == mTasks[EQueueType::pending].end()) {
122 return nullptr;
123 }
124
125 TaskInfo * pNewTask = iAvailTask->second;
126 moveTask(iAvailTask->first, EQueueType::pending, EQueueType::assigned);
127 return pNewTask;
128}
129
130void Job::tasks(std::vector<TaskInfo *> & targetVec, EQueueType type, bool shouldClear)
131{
135
136 for (auto task : mTasks[type]) {
137 targetVec.push_back(task.second);
138 }
139
140 if (shouldClear) {
141 mTasks[type].clear();
142 }
143}
144
146{
150
151 return (mTasks[type].find(id) != mTasks[type].end());
152
153 // auto found = mTasks[type].find(id);
154 // if (found != mTasks[type].end()) {
155 // return true;
156 //}
157 // else {
158 // return false;
159 //}
160}
161
162void Job::print() const
163{
167 SPD_DEBUG("{} P[{}] A[{}] R[{}] D[{}] F[{}] started[{}] finished[{}]", mUUID, mTasks[EQueueType::pending].size(),
168 mTasks[EQueueType::assigned].size(), mTasks[EQueueType::running].size(), mTasks[EQueueType::done].size(),
169 mTasks[EQueueType::failed].size(), mTimeStarted, mTimeFinished);
170 SPD_TRACE("Feeder [{}] Consumer [{}]", mFeederUUID, mConsumerUUID);
171}
172
173void Job::json(Json::Value & json)
174{
178
179 Json::Value d;
180 d["name"] = mUUID;
181 d["uid"] = mUid;
182 d["gid"] = mGid;
183 Json::UInt64 ts = static_cast<Json::UInt64>(mTimeStarted);
184 Json::UInt64 tf = static_cast<Json::UInt64>(mTimeFinished);
185 d["time"]["started"] = ts;
186 if (tf) d["time"]["finished"] = tf;
187 d["P"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::pending].size());
188 // d["A"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::assigned].size());
189 d["R"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::running].size() + mTasks[EQueueType::assigned].size());
190 d["D"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::done].size());
191 d["F"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::failed].size());
192
193 d["rc"]["done"] = Json::arrayValue;
194 d["rc"]["failed"] = Json::arrayValue;
195 if (size(EQueueType::all) <= mMaxIdsInJson) {
196 for (auto ti : mTasks[EQueueType::done]) {
197 d["rc"]["done"].append(ti.second->taskid());
198 }
199 for (auto ti : mTasks[EQueueType::failed]) {
200 d["rc"]["failed"].append(ti.second->taskid());
201 }
202 }
203
204 json.append(d);
205}
206
207size_t Job::size(EQueueType type) const
208{
213 if (type >= EQueueType::all) {
214 size_t sum = mTasks[EQueueType::pending].size();
215 sum += mTasks[EQueueType::assigned].size();
216 sum += mTasks[EQueueType::running].size();
217 sum += mTasks[EQueueType::done].size();
218 sum += mTasks[EQueueType::failed].size();
219 return sum;
220 }
221 else {
222
223 return mTasks[type].size();
224 }
225}
226
228{
232 size_t sum = mTasks[EQueueType::pending].size();
233 sum += mTasks[EQueueType::assigned].size();
234 sum += mTasks[EQueueType::running].size();
235 return sum;
236}
237
238void Job::consumer(std::string uuid)
239{
244}
245
246std::string Job::consumer() const
247{
251 // TODO Also potentialy DANGEROUS
252 return std::move(mConsumerUUID);
253}
254
255void Job::feeder(std::string uuid)
256{
261}
262
263std::string Job::feeder() const
264{
268 // TODO Potentialy DANGEROUS!
269 return std::move(mFeederUUID);
270}
271
273{
277 return !mTasks[Job::pending].empty();
278}
279
281{
285 if (sizeNotFinished() > 0) return false;
286
287 if (mTimeFinished == 0) {
288 mJustFinished = true;
290 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
291 .count();
292 }
293 return true;
294}
295
297{
298 if (mJustFinished) {
299 mJustFinished = false;
300 return true;
301 }
302 return mJustFinished;
303};
304
305} // namespace Salsa
bool mJustFinished
Flag if job is finished. Note that it is reported only once.
Definition Job.hh:103
bool haveMoreTasks() const
Task statuses.
Definition Job.cc:272
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition Job.cc:32
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition Job.cc:130
std::string consumer() const
Definition Job.cc:246
void json(Json::Value &json)
Definition Job.cc:173
std::map< uint32_t, TaskInfo * > mTasks[all]
Lists of jobs.
Definition Job.hh:91
TaskInfo * nextTask()
Definition Job.cc:114
size_t sizeNotFinished() const
Definition Job.cc:227
uint64_t mTimeFinished
Time finished.
Definition Job.hh:99
void print() const
Definition Job.cc:162
std::string mUUID
Job UUID.
Definition Job.hh:92
std::string mType
Job type.
Definition Job.hh:97
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition Job.cc:56
Job(std::string uuid="", std::string type="NONE")
Definition Job.cc:4
size_t size(EQueueType t=all) const
Definition Job.cc:207
uint64_t mTimeStarted
Time started.
Definition Job.hh:98
bool isJustFinished()
Returns if job was just finished.
Definition Job.cc:296
virtual ~Job()
Definition Job.cc:18
std::string mFeederUUID
Feeder UUID.
Definition Job.hh:96
bool mChanged
Flag if job was changed.
Definition Job.hh:106
uint32_t JobID_t
Job ID type alias.
Definition Job.hh:68
bool removeTask(uint32_t id, EQueueType from)
Definition Job.cc:96
std::string mConsumerUUID
Source (consumer) UUID.
Definition Job.hh:95
std::string feeder() const
Definition Job.cc:263
uint32_t mUid
Job user id (nobody : 99)
Definition Job.hh:93
EQueueType
Queue types.
Definition Job.hh:19
uint32_t mGid
Job group id (nogroup : 99)
Definition Job.hh:94
size_t mMaxIdsInJson
Maximum number of tasks for which job ids are still produced in json.
Definition Job.hh:102
bool isFinished()
Returns whether the job is finished.
Definition Job.cc:280
std::string uuid() const
returns UUID
Definition Job.hh:28
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition Job.cc:145