salsa  0.4.15
Job.cc
1 #include "Job.hh"
2 
3 namespace Salsa {
4 Job::Job(std::string uuid, std::string type) : Object(), mUUID(uuid), mType(type)
5 {
9 
10  // mTimeStarted = std::chrono::system_clock::now();
11  mTimeStarted =
12  std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
13 }
14 
16 {
20  for (int iType = EQueueType::pending; iType < EQueueType::all; iType++) {
21  for (auto iTask : mTasks[iType]) {
22  delete iTask.second;
23  iTask.second = nullptr;
24  }
25  mTasks[iType].clear();
26  }
27 }
28 
29 bool Job::addTask(uint32_t id, TaskInfo * pTaskInfo, EQueueType type)
30 {
34  if (!pTaskInfo) {
35  return false;
36  }
37 
38  if (type >= all) {
39  SPD_CRIT("EQueueType is out of range [{}]", static_cast<int>(type));
40  return false;
41  }
42 
43  if (mUid == 99 && mGid == 99) {
44  mUid = pTaskInfo->clientid();
45  mGid = pTaskInfo->groupid();
46  }
47 
48  mTasks[type].insert(std::make_pair(id, pTaskInfo));
49  return true;
50 }
51 
52 bool Job::moveTask(uint32_t id, EQueueType from, EQueueType to)
53 {
57 
58  return moveTask(id, nullptr, from, to);
59 }
60 
61 bool Job::moveTask(JobID_t id, TaskInfo * pTaskInfo, EQueueType from, EQueueType to)
62 {
66  auto iFound = mTasks[from].find(id);
67  if (iFound != mTasks[from].end()) {
68  if (pTaskInfo == nullptr) {
69  pTaskInfo = iFound->second;
70  }
71  else {
72  delete iFound->second;
73  }
74  // TODO This is asking for trouble...
75  // Possible fix would be... std::shared_ptr;
76  // Also, why do we even need to supply pTaskInfo anyways?
77 
78  mTasks[from].erase(iFound);
79  addTask(id, pTaskInfo, to);
80  mChanged = true;
81  return true;
82  }
83  else {
84  SPD_WARN("Job with id [{}] was not found in queue [{}] !!!", static_cast<int>(id), static_cast<int>(from));
85  return false;
86  }
87 }
88 
89 bool Job::removeTask(uint32_t id, EQueueType from)
90 {
94 
95  // TODO This could cause problems at some point...
96  auto found = mTasks[from].find(id);
97  if (found != mTasks[from].end()) {
98  mTasks[from].erase(found);
99  return true;
100  }
101 
102  return false;
103 }
104 
105 TaskInfo * Job::nextTask()
106 {
110 
111  auto iAvailTask = mTasks[EQueueType::pending].begin();
112  if (iAvailTask == mTasks[EQueueType::pending].end()) {
113  return nullptr;
114  }
115 
116  TaskInfo * pNewTask = iAvailTask->second;
117  moveTask(iAvailTask->first, EQueueType::pending, EQueueType::assigned);
118  return pNewTask;
119 }
120 
121 void Job::tasks(std::vector<TaskInfo *> & targetVec, EQueueType type, bool shouldClear)
122 {
126 
127  for (auto task : mTasks[type]) {
128  targetVec.push_back(task.second);
129  }
130 
131  if (shouldClear) {
132  mTasks[type].clear();
133  }
134 }
135 
137 {
141 
142  return (mTasks[type].find(id) != mTasks[type].end());
143 
144  // auto found = mTasks[type].find(id);
145  // if (found != mTasks[type].end()) {
146  // return true;
147  //}
148  // else {
149  // return false;
150  //}
151 }
152 
153 void Job::print() const
154 {
158  SPD_DEBUG("{} P[{}] A[{}] R[{}] D[{}] F[{}] started[{}] finished[{}]", mUUID, mTasks[EQueueType::pending].size(),
159  mTasks[EQueueType::assigned].size(), mTasks[EQueueType::running].size(), mTasks[EQueueType::done].size(),
160  mTasks[EQueueType::failed].size(), mTimeStarted, mTimeFinished);
161  SPD_TRACE("Feeder [{}] Consumer [{}]", mFeederUUID, mConsumerUUID);
162 }
163 
164 void Job::json(Json::Value & json)
165 {
169 
170  Json::Value d;
171  d["name"] = mUUID;
172  d["uid"] = mUid;
173  d["gid"] = mGid;
174  Json::UInt64 ts = static_cast<Json::UInt64>(mTimeStarted);
175  Json::UInt64 tf = static_cast<Json::UInt64>(mTimeFinished);
176  d["time"]["started"] = ts;
177  if (tf) d["time"]["finished"] = tf;
178  d["P"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::pending].size());
179  // d["A"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::assigned].size());
180  d["R"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::running].size() + mTasks[EQueueType::assigned].size());
181  d["D"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::done].size());
182  d["F"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::failed].size());
183 
184  d["rc"]["done"] = Json::arrayValue;
185  d["rc"]["failed"] = Json::arrayValue;
186  if (size(EQueueType::all) <= mMaxIdsInJson) {
187  for (auto ti : mTasks[EQueueType::done]) {
188  d["rc"]["done"].append(ti.second->taskid());
189  }
190  for (auto ti : mTasks[EQueueType::failed]) {
191  d["rc"]["failed"].append(ti.second->taskid());
192  }
193  }
194 
195  json.append(d);
196 }
197 
198 size_t Job::size(EQueueType type) const
199 {
204  if (type >= EQueueType::all) {
205  size_t sum = mTasks[EQueueType::pending].size();
206  sum += mTasks[EQueueType::assigned].size();
207  sum += mTasks[EQueueType::running].size();
208  sum += mTasks[EQueueType::done].size();
209  sum += mTasks[EQueueType::failed].size();
210  return sum;
211  }
212  else {
213 
214  return mTasks[type].size();
215  }
216 }
217 
218 size_t Job::sizeNotFinished() const
219 {
223  size_t sum = mTasks[EQueueType::pending].size();
224  sum += mTasks[EQueueType::assigned].size();
225  sum += mTasks[EQueueType::running].size();
226  return sum;
227 }
228 
229 void Job::consumer(std::string uuid)
230 {
235 }
236 
237 std::string Job::consumer() const
238 {
242  // TODO Also potentialy DANGEROUS
243  return std::move(mConsumerUUID);
244 }
245 
246 void Job::feeder(std::string uuid)
247 {
251  mFeederUUID = uuid;
252 }
253 
254 std::string Job::feeder() const
255 {
259  // TODO Potentialy DANGEROUS!
260  return std::move(mFeederUUID);
261 }
262 
263 bool Job::haveMoreTasks() const
264 {
268  return !mTasks[Job::pending].empty();
269 }
270 
272 {
276  if (sizeNotFinished() > 0) return false;
277 
278  mTimeFinished =
279  std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
280 
281  return true;
282 }
283 
284 } // namespace Salsa
bool haveMoreTasks() const
Task statuses.
Definition: Job.cc:263
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition: Job.cc:29
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition: Job.cc:121
std::string consumer() const
Definition: Job.cc:237
void json(Json::Value &json)
Definition: Job.cc:164
std::map< uint32_t, TaskInfo * > mTasks[all]
Lists of jobs.
Definition: Job.hh:88
TaskInfo * nextTask()
Definition: Job.cc:105
size_t sizeNotFinished() const
Definition: Job.cc:218
uint64_t mTimeFinished
Time finished.
Definition: Job.hh:96
void print() const
Definition: Job.cc:153
std::string mUUID
Job UUID.
Definition: Job.hh:89
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:52
Job(std::string uuid="", std::string type="NONE")
Definition: Job.cc:4
size_t size(EQueueType t=all) const
Definition: Job.cc:198
uint64_t mTimeStarted
Time started.
Definition: Job.hh:95
virtual ~Job()
Definition: Job.cc:15
std::string mFeederUUID
Feeder UUID.
Definition: Job.hh:93
bool mChanged
Flag if job was changed.
Definition: Job.hh:102
uint32_t JobID_t
Job ID type alias.
Definition: Job.hh:65
bool removeTask(uint32_t id, EQueueType from)
Definition: Job.cc:89
std::string mConsumerUUID
Source (consumer) UUID.
Definition: Job.hh:92
std::string feeder() const
Definition: Job.cc:254
uint32_t mUid
Job user id (nobody : 99)
Definition: Job.hh:90
EQueueType
Queue types.
Definition: Job.hh:19
uint32_t mGid
Job group id (nogroup : 99)
Definition: Job.hh:91
size_t mMaxIdsInJson
Maximum number of tasks for which job ids are still produced in json.
Definition: Job.hh:99
bool isFinished()
Returns whether the job is finished.
Definition: Job.cc:271
std::string uuid() const
returns UUID
Definition: Job.hh:28
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:136
Base Salsa Object class.
Definition: Object.hh:15