salsa  0.7.1
Consumer.cc
1 #include "Consumer.hh"
2 #include "NodeManager.hh"
3 namespace Salsa {
4 Consumer::Consumer(std::string uuid, std::shared_ptr<Socket> pPipe, NodeManager * pNM) : Distributor(uuid, pPipe, pNM)
5 {
9 }
11 {
15 }
16 
17 void Consumer::onEnter(Message * pInMsg, std::vector<std::string> & out, std::string /*type*/)
18 {
22 
23  std::vector<std::string> & content = pInMsg->content();
24  for (auto data : content) {
25  SPD_TRACE("::onEnter IN [{}]", data.c_str());
26  }
27 
28  out.push_back("AUTHOK");
29  SPD_TRACE("AUTHOK");
30 }
31 
32 void Consumer::onExit(Message * pInMsg, std::vector<std::string> & /*out*/)
33 {
37  for (auto data : pInMsg->content()) {
38  SPD_TRACE("::onExit IN [{}]", data.c_str());
39  }
40 
41  SPD_TRACE("Handling EXIT from [{}]", pInMsg->uuid());
42  std::vector<std::string> jobs;
43  mpNodeManager->jobs(pInMsg->uuid(), jobs);
44  for (auto job : jobs) {
45  SPD_TRACE("Terminating job [{}] from upstream [{}]", job, pInMsg->uuid());
47  }
48 }
49 void Consumer::onWhisper(Message * pInMsg, std::vector<std::string> & out)
50 {
54 
55  std::vector<std::string> & content = pInMsg->content();
56  for (auto data : content) {
57  SPD_TRACE("::onWhisper IN [{}]", data.c_str());
58  }
59 
60  if (content[0] == "START") {
61  int32_t nTasks = 1;
62  nTasks = mpNodeManager->nSlots(1.5);
63  if (getenv("SALSA_FAKE")) {
64  nTasks *= 10;
65  }
66  if (nTasks != 0) {
67  out.push_back("SENDTASKS");
68  out.push_back(fmt::format("{}", nTasks));
69  SPD_TRACE("SENDTASKS");
70  }
71  }
72  else if (content[0] == "TASK") {
73  SPD_TRACE("TASK");
74  TaskInfo * ti = new TaskInfo();
75  {
76  if (!ti->ParseFromString(content[1].c_str())) {
77  SPD_ERROR("Message does not contain ProtoBuf message!");
78  return;
79  }
80  }
81  // TODO : now we need to tell all feeders that that they should subscribe to workers
82  // (probably in addTask when creating new Job)
83  mpNodeManager->addTask(ti, mUUID, pInMsg->uuid());
84  }
85  else if (content[0] == "NOMORETASKS") {
86  mpNodeManager->noMoreTasks(content[1]);
87  }
88  else {
89  out.push_back("START");
90  SPD_TRACE("START");
91  }
92 }
93 } // namespace Salsa
virtual ~Consumer()
Definition: Consumer.cc:10
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
TODO Consumer action on EXIT event.
Definition: Consumer.cc:32
Consumer(std::string uuid, std::shared_ptr< Socket > pPipe, NodeManager *pNM)
Definition: Consumer.cc:4
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
TODO Consumer action on WHISPER event.
Definition: Consumer.cc:49
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Consumer action on ENTER event.
Definition: Consumer.cc:17
Base Distributor class.
Definition: Distributor.hh:19
std::string mUUID
Self UUID.
Definition: Distributor.hh:54
NodeManager * mpNodeManager
Node Manager.
Definition: Distributor.hh:58
Base Message class.
Definition: Message.hh:15
virtual std::vector< std::string > & content()=0
Returns vector of partial messages as strings.
virtual std::string uuid() const =0
Returns node uuid.
NodeManager class.
Definition: NodeManager.hh:22
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:362
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:228
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
Definition: NodeManager.cc:529
int32_t nSlots(double mult=1.0) const
Definition: NodeManager.cc:541
virtual void noMoreTasks(std::string jobUUID)
Definition: NodeManager.cc:554