1 #include "NodeManagerZyre.hh" 2 #include <MessageZyre.hh> 3 #include <PublisherZmq.hh> 4 #include <SocketZyre.hh> 5 #include <TaskExecutorFake.hh> 6 #include <TaskExecutorForkZmq.hh> 14 char * pPubUrl = getenv(
"SALSA_PUB_URL");
16 SPD_INFO(
"Publisher url [{}]", pPubUrl);
28 std::vector<std::string> & values)
79 if (getenv(
"SALSA_FAKE")) {
80 SPD_DEBUG(
"Fake jobs");
102 SPD_TRACE(
"Task [{}:{}] wk [{}] upstream [{}]", pTaskState->
task()->jobid(), pTaskState->
task()->taskid(), wk,
126 zmsg_t * pMsg =
nullptr;
128 if (vec[0] ==
"&") vec.erase(vec.begin());
129 for (
auto str : vec) {
130 if (pMsg ==
nullptr) {
134 zyre_whisper(pSocketZyre->
zyre(), to.c_str(), &pMsg);
139 zmsg_addstr(pMsg, str.c_str());
142 zyre_whisper(pSocketZyre->
zyre(), to.c_str(), &pMsg);
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
virtual bool run(std::string, std::string)=0
Run task.
bool handlePipe(void *pPipe)
NodeManagerZyre(NodeZyre *pNodeZyre)
Base salsa TaskState class
void taskState(TaskState *pTS)
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual zyre_t * zyre() const
Returns zyre pointer.
void add(void *p, TaskState *t)
NodeZyre * mpNodeZyre
Current zyre node.
virtual ~NodeManagerZyre()
virtual std::string uuid() const =0
Returns node uuid.
virtual bool sendWhisper(Socket *pSocket, std::string to, std::vector< std::string > &vect)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
TaskExecutorForkZmq class
TaskExecutor * executor()
virtual void runTask(TaskState *pTaskState, std::string wk, std::string upstream)
Run task interface.
virtual void add(SocketZyre *pSocket)
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
virtual bool handleTaskPool(void *pPool)
TaskPool * mpTaskPool
Task pool.
virtual void * pipe() const
Publisher * mpPublisher
Publisher.
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Base salsa TaskPool class
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
PollerZmq * pollerZmq() const
virtual void addTaskSlot()