salsa  0.7.1
Job.cc
1 #include "Job.hh"
2 
3 namespace Salsa {
4 Job::Job(std::string uuid, std::string type)
5  : Object()
6  , mUUID(uuid)
7  , mType(type)
8 {
12 
13  // mTimeStarted = std::chrono::system_clock::now();
14  mTimeStarted =
15  std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
16 }
17 
19 {
23  for (int iType = EQueueType::pending; iType < EQueueType::all; iType++) {
24  for (auto iTask : mTasks[iType]) {
25  delete iTask.second;
26  iTask.second = nullptr;
27  }
28  mTasks[iType].clear();
29  }
30 }
31 
32 bool Job::addTask(uint32_t id, TaskInfo * pTaskInfo, EQueueType type)
33 {
37  if (!pTaskInfo) {
38  return false;
39  }
40 
41  if (type >= all) {
42  SPD_CRIT("EQueueType is out of range [{}]", static_cast<int>(type));
43  return false;
44  }
45 
46  if (mUid == 99 && mGid == 99) {
47  mUid = pTaskInfo->clientid();
48  mGid = pTaskInfo->groupid();
49  }
50 
51  mTasks[type].insert(std::make_pair(id, pTaskInfo));
52 
53  return true;
54 }
55 
56 bool Job::moveTask(uint32_t id, EQueueType from, EQueueType to)
57 {
61 
62  return moveTask(id, nullptr, from, to);
63 }
64 
65 bool Job::moveTask(JobID_t id, TaskInfo * pTaskInfo, EQueueType from, EQueueType to)
66 {
70  auto iFound = mTasks[from].find(id);
71  if (iFound != mTasks[from].end()) {
72  if (pTaskInfo == nullptr) {
73  pTaskInfo = iFound->second;
74  }
75  else {
76  delete iFound->second;
77  }
78  // TODO This is asking for trouble...
79  // Possible fix would be... std::shared_ptr;
80  // Also, why do we even need to supply pTaskInfo anyways?
81 
82  if (to < EQueueType::done && from != EQueueType::assigned) mChanged = true;
83 
84  mTasks[from].erase(iFound);
85  addTask(id, pTaskInfo, to);
86 
87  // mChanged = true;
88  return true;
89  }
90  else {
91  SPD_WARN("Job with id [{}] was not found in queue [{}] !!!", static_cast<int>(id), static_cast<int>(from));
92  return false;
93  }
94 }
95 
96 bool Job::removeTask(uint32_t id, EQueueType from)
97 {
101 
102  // TODO This could cause problems at some point...
103  auto found = mTasks[from].find(id);
104  if (found != mTasks[from].end()) {
105 
106  // if (from == EQueueType::running) mChanged = true;
107  mTasks[from].erase(found);
108  return true;
109  }
110 
111  return false;
112 }
113 
114 TaskInfo * Job::nextTask()
115 {
119 
120  auto iAvailTask = mTasks[EQueueType::pending].begin();
121  if (iAvailTask == mTasks[EQueueType::pending].end()) {
122  return nullptr;
123  }
124 
125  TaskInfo * pNewTask = iAvailTask->second;
126  moveTask(iAvailTask->first, EQueueType::pending, EQueueType::assigned);
127  return pNewTask;
128 }
129 
130 void Job::tasks(std::vector<TaskInfo *> & targetVec, EQueueType type, bool shouldClear)
131 {
135 
136  for (auto task : mTasks[type]) {
137  targetVec.push_back(task.second);
138  }
139 
140  if (shouldClear) {
141  mTasks[type].clear();
142  }
143 }
144 
146 {
150 
151  return (mTasks[type].find(id) != mTasks[type].end());
152 
153  // auto found = mTasks[type].find(id);
154  // if (found != mTasks[type].end()) {
155  // return true;
156  //}
157  // else {
158  // return false;
159  //}
160 }
161 
162 void Job::print() const
163 {
167  SPD_DEBUG("{} P[{}] A[{}] R[{}] D[{}] F[{}] started[{}] finished[{}]", mUUID, mTasks[EQueueType::pending].size(),
168  mTasks[EQueueType::assigned].size(), mTasks[EQueueType::running].size(), mTasks[EQueueType::done].size(),
169  mTasks[EQueueType::failed].size(), mTimeStarted, mTimeFinished);
170  SPD_TRACE("Feeder [{}] Consumer [{}]", mFeederUUID, mConsumerUUID);
171 }
172 
173 void Job::json(Json::Value & json)
174 {
178 
179  Json::Value d;
180  d["name"] = mUUID;
181  d["uid"] = mUid;
182  d["gid"] = mGid;
183  Json::UInt64 ts = static_cast<Json::UInt64>(mTimeStarted);
184  Json::UInt64 tf = static_cast<Json::UInt64>(mTimeFinished);
185  d["time"]["started"] = ts;
186  if (tf) d["time"]["finished"] = tf;
187  d["P"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::pending].size());
188  // d["A"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::assigned].size());
189  d["R"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::running].size() + mTasks[EQueueType::assigned].size());
190  d["D"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::done].size());
191  d["F"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::failed].size());
192 
193  d["rc"]["done"] = Json::arrayValue;
194  d["rc"]["failed"] = Json::arrayValue;
195  if (size(EQueueType::all) <= mMaxIdsInJson) {
196  for (auto ti : mTasks[EQueueType::done]) {
197  d["rc"]["done"].append(ti.second->taskid());
198  }
199  for (auto ti : mTasks[EQueueType::failed]) {
200  d["rc"]["failed"].append(ti.second->taskid());
201  }
202  }
203 
204  json.append(d);
205 }
206 
207 size_t Job::size(EQueueType type) const
208 {
213  if (type >= EQueueType::all) {
214  size_t sum = mTasks[EQueueType::pending].size();
215  sum += mTasks[EQueueType::assigned].size();
216  sum += mTasks[EQueueType::running].size();
217  sum += mTasks[EQueueType::done].size();
218  sum += mTasks[EQueueType::failed].size();
219  return sum;
220  }
221  else {
222 
223  return mTasks[type].size();
224  }
225 }
226 
227 size_t Job::sizeNotFinished() const
228 {
232  size_t sum = mTasks[EQueueType::pending].size();
233  sum += mTasks[EQueueType::assigned].size();
234  sum += mTasks[EQueueType::running].size();
235  return sum;
236 }
237 
238 void Job::consumer(std::string uuid)
239 {
244 }
245 
246 std::string Job::consumer() const
247 {
251  // TODO Also potentialy DANGEROUS
252  return std::move(mConsumerUUID);
253 }
254 
255 void Job::feeder(std::string uuid)
256 {
260  mFeederUUID = uuid;
261 }
262 
263 std::string Job::feeder() const
264 {
268  // TODO Potentialy DANGEROUS!
269  return std::move(mFeederUUID);
270 }
271 
272 bool Job::haveMoreTasks() const
273 {
277  return !mTasks[Job::pending].empty();
278 }
279 
281 {
285  if (sizeNotFinished() > 0) return false;
286 
287  if (mTimeFinished == 0) {
288  mJustFinished = true;
289  mTimeFinished =
290  std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
291  .count();
292  }
293  return true;
294 }
295 
297 {
298  if (mJustFinished) {
299  mJustFinished = false;
300  return true;
301  }
302  return mJustFinished;
303 };
304 
305 } // namespace Salsa
bool mJustFinished
Flag if job is finished. Note that it is reported only once.
Definition: Job.hh:103
bool haveMoreTasks() const
Task statuses.
Definition: Job.cc:272
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition: Job.cc:32
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition: Job.cc:130
std::string consumer() const
Definition: Job.cc:246
void json(Json::Value &json)
Definition: Job.cc:173
std::map< uint32_t, TaskInfo * > mTasks[all]
Lists of jobs.
Definition: Job.hh:91
TaskInfo * nextTask()
Definition: Job.cc:114
size_t sizeNotFinished() const
Definition: Job.cc:227
uint64_t mTimeFinished
Time finished.
Definition: Job.hh:99
void print() const
Definition: Job.cc:162
std::string mUUID
Job UUID.
Definition: Job.hh:92
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:56
Job(std::string uuid="", std::string type="NONE")
Definition: Job.cc:4
size_t size(EQueueType t=all) const
Definition: Job.cc:207
uint64_t mTimeStarted
Time started.
Definition: Job.hh:98
bool isJustFinished()
Returns whether the job has just finished.
Definition: Job.cc:296
virtual ~Job()
Definition: Job.cc:18
std::string mFeederUUID
Feeder UUID.
Definition: Job.hh:96
bool mChanged
Flag if job was changed.
Definition: Job.hh:106
uint32_t JobID_t
Job ID type alias.
Definition: Job.hh:68
bool removeTask(uint32_t id, EQueueType from)
Definition: Job.cc:96
std::string mConsumerUUID
Source (consumer) UUID.
Definition: Job.hh:95
std::string feeder() const
Definition: Job.cc:263
uint32_t mUid
Job user id (nobody : 99)
Definition: Job.hh:93
EQueueType
Queue types.
Definition: Job.hh:19
uint32_t mGid
Job group id (nogroup : 99)
Definition: Job.hh:94
size_t mMaxIdsInJson
Maximum number of tasks for which task ids are still listed in the JSON output.
Definition: Job.hh:102
bool isFinished()
Returns whether the job is finished.
Definition: Job.cc:280
std::string uuid() const
returns UUID
Definition: Job.hh:28
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:145
Base Salsa Object class.
Definition: Object.hh:15