1 #include "NodeManagerZyre.hh" 2 #include "MessageZyre.hh" 3 #include "PublisherZmq.hh" 4 #include "SocketZyre.hh" 5 #include "TaskExecutorFake.hh" 6 #include "TaskExecutorForkZmq.hh" 22 std::vector<std::string> & values)
73 if (getenv(
"SALSA_FAKE")) {
74 SPD_DEBUG(
"Fake jobs");
96 SPD_TRACE(
"Task [{}:{}] wk [{}] upstream [{}]", pTaskState->
task()->jobid(), pTaskState->
task()->taskid(), wk,
106 SPD_TRACE(
"Sending results to external taskid [{}:{}] rc[{}]", task->jobid(), task->taskid(), task->returncode());
108 zmsg_t * msg_out = zmsg_new();
111 zmsg_addstr(msg_out,
"");
112 zmsg_addstr(msg_out,
"TASK_RESULT");
113 zmsg_addstr(msg_out, task->jobid().data());
114 zmsg_addstr(msg_out, fmt::format(
"{}", task->taskid()).data());
115 zmsg_addstr(msg_out, fmt::format(
"{}", task->returncode()).data());
117 zmsg_destroy(&msg_out);
139 zmsg_t * pMsg =
nullptr;
141 if (vec[0] ==
"&") vec.erase(vec.begin());
142 for (
auto str : vec) {
143 if (pMsg ==
nullptr) {
147 zyre_whisper(pSocketZyre->
zyre(), to.c_str(), &pMsg);
152 zmsg_addstr(pMsg, str.c_str());
155 zyre_whisper(pSocketZyre->
zyre(), to.c_str(), &pMsg);
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
int submitterSocketIndex() const
Returns submitter socket index.
virtual bool run(std::string, std::string)=0
Run task.
bool handlePipe(void *pPipe)
NodeManagerZyre(NodeZyre *pNodeZyre)
Base salsa TaskState class
void taskState(TaskState *pTS)
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual zyre_t * zyre() const
Returns zyre pointer.
void add(void *p, TaskState *t)
NodeZyre * mpNodeZyre
Current zyre node.
virtual ~NodeManagerZyre()
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
virtual bool sendWhisper(Socket *pSocket, std::string to, std::vector< std::string > &vect)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
virtual void resultTaskToExternal(Job *job, TaskInfo *task)
Handle return of task and send it to external client.
TaskExecutorForkZmq class
TaskExecutor * executor()
virtual void runTask(TaskState *pTaskState, std::string wk, std::string upstream)
Run task interface.
virtual void add(SocketZyre *pSocket)
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
zsock_t * socketExternal(int i)
Returns external socket.
virtual bool handleTaskPool(void *pPool)
TaskPool * mpTaskPool
Task pool.
virtual void * pipe() const
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Base salsa TaskPool class
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
void * submitterSocketID() const
Returns submitter socket identity.
PollerZmq * pollerZmq() const
virtual void addTaskSlot()