salsa  0.4.0
NodeManager.hh
1 #pragma once
2 #include "Consumer.hh"
3 #include "Feeder.hh"
4 #include "Job.hh"
5 #include "Object.hh"
6 #include "Publisher.hh"
7 #include "Socket.hh"
8 #include "TaskPool.hh"
9 #include "Worker.hh"
10 #include "TaskInfo.pb.h"
11 
12 namespace Salsa {
21 
22 class NodeManager : public Object {
23 public:
24  NodeManager();
25  virtual ~NodeManager();
26 
27  void print(std::string opt = "") const;
28 
29  // TODO Annotate add ops
30  void addConsumer(std::string uuid, std::shared_ptr<Socket> s);
31  void addFeeder(std::string uuid, std::shared_ptr<Socket> s);
32  void addWorker(std::string uuid, std::shared_ptr<Socket> s);
33  void addTask(TaskInfo * taskInfo, std::string cuuid, std::string fuuid,
34  Salsa::Job::EQueueType t = Salsa::Job::pending);
35 
36  // TODO Yet again...
37  virtual Socket * onEnter(std::string self, std::string fromType, Message * msg, std::vector<std::string> & out);
38  virtual Socket * onExit(std::string self, Message * msg, std::vector<std::string> & out);
39  virtual Socket * onWhisper(std::string self, Message * msg, std::vector<std::string> & out);
40 
41  // TODO Return ops
42  std::shared_ptr<Feeder> feeder(std::string uuid) const;
43  std::shared_ptr<Consumer> consumer(std::string uuid) const;
44  std::shared_ptr<Worker> worker(std::string uuid) const;
45  Job * job(std::string uuid);
46 
48  TaskPool * taskPool();
49 
50  // TODO Annotate
51  virtual void addTaskSlot();
52  bool hasJobs() const;
53  virtual bool terminateFinishedJobs();
55  uint64_t finishedJobTimeout() const { return mFinishedJobTimeout; }
57  void finishedJobTimeout(uint64_t t) { mFinishedJobTimeout = t; }
58 
59  int32_t nSlots(double mult = 1.0) const;
60  void jobs(std::string clientUUID, std::vector<std::string> & jobs) const; // TODO ehm, what?
61 
62  // TODO Task ops
63  TaskInfo * getNextTask();
64  virtual void resultTask(TaskInfo * task);
66  virtual void resultTaskToExternal(Job *, TaskInfo *){};
67  virtual void noMoreTasks(std::string jobUUID);
68  virtual bool haveMoreTasks();
69  virtual bool haveMoreTasks(std::string jobUUID);
70 
72  virtual void runTask(TaskState * ts, std::string wk, std::string upstream) = 0;
73 
74  // TODO Terminate ops
75  virtual void terminateJob(std::string uuid);
76  virtual void terminateAllJobs(bool finishedonly = false);
77 
78  virtual bool handleTaskPool(void * p);
79  virtual bool sendWhisper(Socket * s, std::string to, std::vector<std::string> & v);
80 
81  // TODO annotate ops
82  virtual void publisher(Publisher * p);
83  virtual Publisher * publisher() const;
84  virtual void publish(std::string id,
85  bool force = false) const; // TODO what does this do again? const and no return...
86 
87 protected:
88  std::map<std::string, Job *> mJobs{};
89  std::vector<std::string> mActiveJobs{};
90  std::vector<std::string> mFinishedJobs{};
92  std::map<std::string, std::shared_ptr<Worker>> mWorkers{};
93  std::map<std::string, std::shared_ptr<Consumer>> mConsumers{};
94  std::map<std::string, std::shared_ptr<Feeder>> mFeeders{};
95  TaskPool * mpTaskPool = nullptr;
96  Publisher * mpPublisher = nullptr;
97 };
98 } // namespace Salsa
std::vector< std::string > mActiveJobs
List of active jobs.
Definition: NodeManager.hh:89
Base Message class
Definition: Message.hh:15
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
Definition: NodeManager.hh:91
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
Definition: NodeManager.cc:527
Base salsa TaskState class
Definition: TaskState.hh:16
std::shared_ptr< Worker > worker(std::string uuid) const
Definition: NodeManager.cc:467
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:147
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:360
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
Definition: NodeManager.hh:93
virtual void addTaskSlot()
Definition: NodeManager.cc:493
virtual bool haveMoreTasks()
Definition: NodeManager.cc:565
virtual void noMoreTasks(std::string jobUUID)
Definition: NodeManager.cc:552
NodeManager class
Definition: NodeManager.hh:22
Job class
Definition: Job.hh:16
virtual void publish(std::string id, bool force=false) const
Definition: NodeManager.cc:623
int32_t nSlots(double mult=1.0) const
Definition: NodeManager.cc:539
EQueueType
Queue types.
Definition: Job.hh:19
virtual void resultTask(TaskInfo *task)
Definition: NodeManager.cc:286
uint64_t finishedJobTimeout() const
Returns finished job timeout.
Definition: NodeManager.hh:55
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:64
Base Publisher class
Definition: Publisher.hh:14
Job * job(std::string uuid)
Definition: NodeManager.cc:480
bool hasJobs() const
Definition: NodeManager.cc:519
void print(std::string opt="") const
Definition: NodeManager.cc:30
std::map< std::string, Job * > mJobs
List of jobs.
Definition: NodeManager.hh:88
virtual Publisher * publisher() const
Definition: NodeManager.cc:615
std::vector< std::string > mFinishedJobs
List of finished jobs.
Definition: NodeManager.hh:90
virtual void runTask(TaskState *ts, std::string wk, std::string upstream)=0
Run task interface.
TaskInfo * getNextTask()
Definition: NodeManager.cc:258
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:185
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
Definition: NodeManager.hh:92
Definition: Actor.cc:2
virtual ~NodeManager()
Definition: NodeManager.cc:12
Base Socket class
Definition: Socket.hh:15
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:55
TaskPool * mpTaskPool
Task pool.
Definition: NodeManager.hh:95
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:217
Publisher * mpPublisher
Publisher.
Definition: NodeManager.hh:96
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:73
virtual bool handleTaskPool(void *p)
Definition: NodeManager.cc:503
TaskPool * taskPool()
Get NM's task pool.
Definition: NodeManager.cc:511
Base Salsa Object class
Definition: Object.hh:15
virtual bool terminateFinishedJobs()
Definition: NodeManager.cc:389
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
Definition: NodeManager.hh:94
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:82
Base salsa TaskPool class
Definition: TaskPool.hh:18
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:226
void finishedJobTimeout(uint64_t t)
Sets finished job timeout.
Definition: NodeManager.hh:57
std::shared_ptr< Feeder > feeder(std::string uuid) const
Definition: NodeManager.cc:441
virtual void terminateAllJobs(bool finishedonly=false)
Definition: NodeManager.cc:420
std::shared_ptr< Consumer > consumer(std::string uuid) const
Definition: NodeManager.cc:455
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handle return of task and send it to external client.
Definition: NodeManager.hh:66