1 #include "ConfigZyre.hh"
3 #include "PollerZmq.hh"
/// Builds the node tree described by the salsa section of mConfig.
///
/// The salsa.type key must equal zyre and salsa.spec must be present; both
/// failures are logged with SPD_ERROR. For every spec entry one NodeZyre is
/// created per replica, configured from the jobinfo, timeout and nodes keys,
/// added to a root node named SALSA and appended to targetActors.
///
/// @param targetActors Output vector that receives every created NodeZyre.
///        A null pointer makes the function return nullptr immediately.
/// @return NOTE(review): presumably the root node; the return statements
///         (and several other original lines, see the gaps in the embedded
///         numbering) are missing from this listing - confirm in the full file.
18 std::shared_ptr<Salsa::Node>
ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> * targetActors)
// Without an output vector there is nowhere to store the actors, so bail out.
24 if (!targetActors)
return nullptr;
26 std::shared_ptr<Salsa::Node> node =
nullptr;
// Only the zyre flavour is handled here; a missing or different type is reported.
28 if (!
mConfig[
"salsa"][
"type"] ||
mConfig[
"salsa"][
"type"].as<std::string>() !=
"zyre") {
29 SPD_ERROR(
"Salsa type is not [zyre] !!! ");
// The spec sequence drives everything below, so its absence is reported.
33 if (!
mConfig[
"salsa"][
"spec"]) {
34 SPD_ERROR(
"Salsa spec was not found !!!");
// Start from a placeholder hostname and overwrite it with the zsys_hostname() result.
// NOTE(review): original lines 41 and 43 are missing here; confirm that the
// pointer is null-checked and that the returned C string is released.
38 SPD_TRACE(
"Caching hostname via zsys_hostname()");
39 char * pHostnameCStr_ = zsys_hostname();
40 std::string hostname =
"nohostname";
42 hostname = pHostnameCStr_;
45 SPD_TRACE(
"Cached hostname [{}]", hostname);
// Root node that collects one NodeZyre per configured replica.
47 node = std::make_shared<Salsa::Node>(
"SALSA");
// Walk every entry of salsa.spec.
50 for (
auto spec :
mConfig[
"salsa"][
"spec"]) {
// NOTE(review): the condition guarding this error (original lines 51-52) is
// missing from the listing.
53 SPD_ERROR(
"Nodes array is missing for [{}] !!!", spec[
"name"].as<std::string>());
56 auto name = spec[
"name"].as<std::string>();
// When the spec name equals filter.first, a replicas value found in opt
// overwrites the replicas value of the spec.
// NOTE(review): the code that produces filter and opt is not visible here.
65 if (name ==
filter.first) {
67 if (opt[
"replicas"]) {
68 spec[
"replicas"] = opt[
"replicas"].as<
int>();
76 SPD_TRACE(
"name [{}]", name);
// Replica count comes from the spec when present.
// NOTE(review): count is declared (with its default) on a line missing here.
79 if (spec[
"replicas"]) {
80 count = spec[
"replicas"].as<
int>();
// Create one zyre node per replica.
82 for (
int iCount = 0; iCount < count; iCount++) {
// Node identity is name:hostname:pid:nodeId.
// NOTE(review): nodeId is defined on a line missing from this listing.
84 std::string zyreName = fmt::format(
"{}:{}:{}:{}", name, hostname, getpid(), nodeId);
85 std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
// Optional job-info group name.
87 if (spec[
"jobinfo"][
"group"]) {
88 SPD_INFO(
"Setting jobInfoGroup [{}] from config", spec[
"jobinfo"][
"group"].as<std::string>());
89 pNodeZyre->jobInfoGroupName(spec[
"jobinfo"][
"group"].as<std::string>());
// The job-info broker URL is built only when protocol, ip and port are all
// configured; an empty ip falls back to the local hostname.
// NOTE(review): the format string puts nothing between protocol and host, so
// the protocol value presumably carries its own separator - confirm.
92 if (spec[
"jobinfo"][
"broker"][
"protocol"] && spec[
"jobinfo"][
"broker"][
"ip"] &&
93 spec[
"jobinfo"][
"broker"][
"port"]) {
94 std::string url = spec[
"jobinfo"][
"broker"][
"ip"].as<std::string>();
95 if (url.empty()) url = hostname;
96 url = fmt::format(
">{}{}:{}", spec[
"jobinfo"][
"broker"][
"protocol"].as<std::string>(), url,
97 spec[
"jobinfo"][
"broker"][
"port"].as<std::string>());
98 SPD_INFO(
"Setting jobInfoBrokerUrl [{}] from config", url);
99 pNodeZyre->jobInfoBrokerUrl(url);
// Same construction for the job-info client URL.
101 if (spec[
"jobinfo"][
"client"][
"protocol"] && spec[
"jobinfo"][
"client"][
"ip"] &&
102 spec[
"jobinfo"][
"client"][
"port"]) {
103 std::string url = spec[
"jobinfo"][
"client"][
"ip"].as<std::string>();
104 if (url.empty()) url = hostname;
105 url = fmt::format(
">{}{}:{}", spec[
"jobinfo"][
"client"][
"protocol"].as<std::string>(), url,
106 spec[
"jobinfo"][
"client"][
"port"].as<std::string>());
107 SPD_INFO(
"Broker url for client : {}", url);
108 pNodeZyre->jobInfoClientUrl(url);
// The poller timeout is applied to the node directly.
110 if (spec[
"timeout"][
"poller"]) {
111 pNodeZyre->timeout(spec[
"timeout"][
"poller"].as<int>());
112 SPD_INFO(
"Setting poller timeout [{}]", pNodeZyre->timeout());
// The job timeouts are handed over through environment variables; the final
// setenv argument requests overwriting any existing value.
115 if (spec[
"timeout"][
"jobfinished"]) {
116 SPD_INFO(
"Setting jobfinished timeout [{}]", spec[
"timeout"][
"jobfinished"].as<std::string>());
117 setenv(
"SALSA_FINISHED_JOB_TIMEOUT", spec[
"timeout"][
"jobfinished"].as<std::string>().data(),
true);
119 if (spec[
"timeout"][
"jobcheck"]) {
120 SPD_INFO(
"Setting jobcheck timeout [{}]", spec[
"timeout"][
"jobcheck"].as<std::string>());
121 setenv(
"SALSA_FINISHED_JOB_CHECK_TIMEOUT", spec[
"timeout"][
"jobcheck"].as<std::string>().data(),
true);
// Each entry under nodes configures the sockets of this zyre node.
124 for (
auto nodes : spec[
"nodes"]) {
125 SPD_TRACE(
" name [{}]", nodes[
"name"].as<std::string>());
126 SPD_TRACE(
" zyreName [{}]", zyreName);
127 SPD_TRACE(
" type [{}]", nodes[
"type"].as<std::string>());
// The submit socket is optional and needs protocol, ip and port; the $all
// placeholder is mapped to the wildcard address.
131 if (nodes[
"submit"][
"protocol"] && nodes[
"submit"][
"ip"] && nodes[
"submit"][
"port"]) {
132 if (nodes[
"submit"][
"ip"].as<std::string>() ==
"$all") nodes[
"submit"][
"ip"] =
"*";
// NOTE(review): the declaration receiving this formatted URL (original line
// 133) is missing from the listing.
134 fmt::format(
"{}://{}:{}", nodes[
"submit"][
"protocol"].as<std::string>(),
135 nodes[
"submit"][
"ip"].as<std::string>(), nodes[
"submit"][
"port"].as<int>());
137 SPD_INFO(
"Submit : url [{}]", url);
// Router socket created on the submit URL.
138 zsock_t * s = zsock_new_router(url.c_str());
// NOTE(review): the check guarding this failure message (original line 139)
// is not in the listing.
140 SPD_CRIT(
"Failed to bind submitter on '{}' !!!", url);
143 pNodeZyre->addSocket(static_cast<zsock_t *>(s));
// The client submit URL swaps the configured ip for the local hostname.
144 std::string submit_url_client =
145 fmt::format(
"{}://{}:{}", nodes[
"submit"][
"protocol"].as<std::string>(), hostname,
146 nodes[
"submit"][
"port"].as<int>());
147 pNodeZyre->submitClientUrl(submit_url_client);
// Record the identity in the node info.
150 Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
151 pNodeInfo->set_name(zyreName);
152 pNodeInfo->set_hostname(hostname);
// The header map handed to SocketZyre; the X-SALSA-NODE-TYPE pair holds the
// configured node type.
// NOTE(review): the call that stores the pair (original line 155) is missing.
154 std::map<std::string, std::string> headers;
156 std::pair<std::string, std::string>(
"X-SALSA-NODE-TYPE", nodes[
"type"].as<std::string>()));
159 std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
160 std::make_shared<Salsa::SocketZyre>(zyreName, headers);
// Discovery settings; SALSA_ENDPOINT from the environment, when set, supplies
// the endpoint string.
164 if (nodes[
"discovery"][
"type"]) {
166 std::string url, endpoint;
167 if (getenv(
"SALSA_ENDPOINT")) endpoint = getenv(
"SALSA_ENDPOINT");
169 std::string discoveryType = nodes[
"discovery"][
"type"].as<std::string>();
// udp discovery: an optional port override is applied to the socket.
// NOTE(review): the default for port is declared on a line missing here.
171 if (discoveryType ==
"udp") {
173 if (nodes[
"discovery"][
"port"]) port = nodes[
"discovery"][
"port"].as<
int>();
174 SPD_TRACE(
"Using discovery [{}] via port [{}]...", discoveryType, port);
175 pSocketZyre->port(port);
// gossip discovery: protocol, ip and port overrides are combined into a URL.
// NOTE(review): defaults for p, i and port are declared on lines missing here.
177 else if (discoveryType ==
"gossip") {
180 if (nodes[
"discovery"][
"protocol"]) p = nodes[
"discovery"][
"protocol"].as<std::string>();
181 if (nodes[
"discovery"][
"ip"]) i = nodes[
"discovery"][
"ip"].as<std::string>();
182 if (nodes[
"discovery"][
"port"]) port = nodes[
"discovery"][
"port"].as<
int>();
184 url = fmt::format(
"{}://{}:{}", p, i, port);
186 SPD_INFO(
"Using discovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
// A non-empty endpoint is set before the gossip bind or connect below.
187 if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(),
"%s", endpoint.c_str());
// With discovery.bind set to true this node binds the gossip URL, mapping $all
// to the wildcard address first.
// NOTE(review): the connect call further down belongs to a branch whose
// opening line is missing - presumably the else case.
189 if (nodes[
"discovery"][
"bind"] && nodes[
"discovery"][
"bind"].as<bool>() ==
true) {
190 if (i ==
"$all") i =
"*";
191 url = fmt::format(
"{}://{}:{}", p, i, port);
192 zyre_gossip_bind(pSocketZyre->zyre(),
"%s", url.c_str());
195 zyre_gossip_connect(pSocketZyre->zyre(),
"%s", url.c_str());
// Optional evasive and expired timeouts from salsa.options are forwarded to zyre.
198 if (mConfig[
"salsa"][
"options"][
"evasive"]) {
199 SPD_INFO(
"Setting 'evasive' timeout to [{}] msec ...",
200 mConfig[
"salsa"][
"options"][
"evasive"].as<int>());
201 zyre_set_evasive_timeout(pSocketZyre->zyre(),
202 mConfig[
"salsa"][
"options"][
"evasive"].as<
int>());
204 if (mConfig[
"salsa"][
"options"][
"expired"]) {
205 SPD_INFO(
"Setting 'expired' timeout to [{}] msec ...",
206 mConfig[
"salsa"][
"options"][
"expired"].as<int>());
207 zyre_set_expired_timeout(pSocketZyre->zyre(),
208 mConfig[
"salsa"][
"options"][
"expired"].as<
int>());
// NOTE(review): presumably the branch taken when discovery.type is absent; its
// opening line is missing from the listing.
212 SPD_WARN(
"No discovery type specified !!!");
// A non-empty SALSA_INTERFACE selects the network interface for zyre.
215 const char * zyreInterface = getenv(
"SALSA_INTERFACE");
216 if (zyreInterface && strcmp(zyreInterface,
"")) {
217 SPD_INFO(
"Using SALSA_INTERFACE [{}]", zyreInterface);
218 zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
// Connect the zyre socket and hand it to the node.
221 pSocketZyre->connect();
222 pNodeZyre->addSocket(pSocketZyre);
// NOTE(review): the tail of this format string (original line 225) is missing
// from the listing.
224 SPD_INFO(
"Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
226 name, zyreName, discoveryType, url, port, endpoint);
229 zyre_print(pSocketZyre->zyre());
// Register the finished node with the root and with the caller-supplied vector.
232 node->add(pNodeZyre);
233 targetActors->push_back(pNodeZyre);
// NOTE(review): presumably the body of applyOptions; its signature line is not
// part of this listing.
// Each override copies a value from opt into src only when opt has the key and
// src already has the matching entry: type, protocol, ip and port go to the
// discovery section, submitport goes to submit.port.
250 if (opt[
"type"] && src[
"discovery"][
"type"]) {
251 src[
"discovery"][
"type"] = opt[
"type"].as<std::string>();
253 if (opt[
"protocol"] && src[
"discovery"][
"protocol"]) {
254 src[
"discovery"][
"protocol"] = opt[
"protocol"].as<std::string>();
256 if (opt[
"ip"] && src[
"discovery"][
"ip"]) {
257 src[
"discovery"][
"ip"] = opt[
"ip"].as<std::string>();
259 if (opt[
"port"] && src[
"discovery"][
"port"]) {
260 src[
"discovery"][
"port"] = opt[
"port"].as<
int>();
// The submit port uses its own option name so it cannot collide with the
// discovery port option handled just above.
262 if (opt[
"submitport"] && src[
"submit"][
"port"]) {
263 src[
"submit"][
"port"] = opt[
"submitport"].as<
int>();
std::map< std::string, YAML::Node > mFilter
Filter list.
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
void filter(std::string const &f)
YAML::Node mConfig
YAML Configuration.
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq >> *targetActors)