salsa 0.7.1
Loading...
Searching...
No Matches
Consumer.cc
1#include "Consumer.hh"
2#include "NodeManager.hh"
3namespace Salsa {
4Consumer::Consumer(std::string uuid, std::shared_ptr<Socket> pPipe, NodeManager * pNM) : Distributor(uuid, pPipe, pNM)
5{
9}
16
17void Consumer::onEnter(Message * pInMsg, std::vector<std::string> & out, std::string /*type*/)
18{
22
23 std::vector<std::string> & content = pInMsg->content();
24 for (auto data : content) {
25 SPD_TRACE("::onEnter IN [{}]", data.c_str());
26 }
27
28 out.push_back("AUTHOK");
29 SPD_TRACE("AUTHOK");
30}
31
32void Consumer::onExit(Message * pInMsg, std::vector<std::string> & /*out*/)
33{
37 for (auto data : pInMsg->content()) {
38 SPD_TRACE("::onExit IN [{}]", data.c_str());
39 }
40
41 SPD_TRACE("Handling EXIT from [{}]", pInMsg->uuid());
42 std::vector<std::string> jobs;
43 mpNodeManager->jobs(pInMsg->uuid(), jobs);
44 for (auto job : jobs) {
45 SPD_TRACE("Terminating job [{}] from upstream [{}]", job, pInMsg->uuid());
46 mpNodeManager->terminateJob(job);
47 }
48}
49void Consumer::onWhisper(Message * pInMsg, std::vector<std::string> & out)
50{
54
55 std::vector<std::string> & content = pInMsg->content();
56 for (auto data : content) {
57 SPD_TRACE("::onWhisper IN [{}]", data.c_str());
58 }
59
60 if (content[0] == "START") {
61 int32_t nTasks = 1;
62 nTasks = mpNodeManager->nSlots(1.5);
63 if (getenv("SALSA_FAKE")) {
64 nTasks *= 10;
65 }
66 if (nTasks != 0) {
67 out.push_back("SENDTASKS");
68 out.push_back(fmt::format("{}", nTasks));
69 SPD_TRACE("SENDTASKS");
70 }
71 }
72 else if (content[0] == "TASK") {
73 SPD_TRACE("TASK");
74 TaskInfo * ti = new TaskInfo();
75 {
76 if (!ti->ParseFromString(content[1].c_str())) {
77 SPD_ERROR("Message does not contain ProtoBuf message!");
78 return;
79 }
80 }
81 // TODO : now we need to tell all feeders that that they should subscribe to workers
82 // (probably in addTask when creating new Job)
83 mpNodeManager->addTask(ti, mUUID, pInMsg->uuid());
84 }
85 else if (content[0] == "NOMORETASKS") {
86 mpNodeManager->noMoreTasks(content[1]);
87 }
88 else {
89 out.push_back("START");
90 SPD_TRACE("START");
91 }
92}
93} // namespace Salsa
virtual ~Consumer()
Definition Consumer.cc:10
virtual void onExit(Message *pInMsg, std::vector< std::string > &out)
TODO Consumer action on EXIT event.
Definition Consumer.cc:32
Consumer(std::string uuid, std::shared_ptr< Socket > pPipe, NodeManager *pNM)
Definition Consumer.cc:4
virtual void onWhisper(Message *pInMsg, std::vector< std::string > &out)
TODO Consumer action on WHISPER event.
Definition Consumer.cc:49
virtual void onEnter(Message *pInMsg, std::vector< std::string > &out, std::string type)
TODO Consumer action on ENTER event.
Definition Consumer.cc:17
std::string mUUID
Self UUID.
NodeManager * mpNodeManager
Node Manager.
std::string uuid() const
Returns distributor's UUID.
Distributor(std::string uuid, std::shared_ptr< Socket > pPipe, NodeManager *pNM)
Definition Distributor.cc:3
Base Message class.
Definition Message.hh:15
virtual std::string uuid() const =0
Returns node uuid.
virtual std::vector< std::string > & content()=0
Retursn vector of partial messages as strings.
NodeManager class.