19std::shared_ptr<Salsa::Node>
ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> * targetActors)
25 if (!targetActors)
return nullptr;
27 std::shared_ptr<Salsa::Node> node =
nullptr;
29 if (!
mConfig[
"salsa"][
"type"] ||
mConfig[
"salsa"][
"type"].as<std::string>() !=
"zyre") {
30 SPD_ERROR(
"Salsa type is not [zyre] !!! ");
34 if (!
mConfig[
"salsa"][
"spec"]) {
35 SPD_ERROR(
"Salsa spec was not found !!!");
39 SPD_TRACE(
"Caching hostname via zsys_hostname()");
40 char * pHostnameCStr_ = zsys_hostname();
41 std::string hostname =
"nohostname";
43 hostname = pHostnameCStr_;
46 SPD_TRACE(
"Cached hostname [{}]", hostname);
48 node = std::make_shared<Salsa::Node>(
"SALSA");
51 for (
auto spec :
mConfig[
"salsa"][
"spec"]) {
54 SPD_ERROR(
"Nodes array is missing for [{}] !!!", spec[
"name"].as<std::string>());
57 auto name = spec[
"name"].as<std::string>();
66 if (name ==
filter.first) {
68 if (opt[
"replicas"]) {
69 spec[
"replicas"] = opt[
"replicas"].as<
int>();
77 SPD_TRACE(
"name [{}]", name);
80 if (spec[
"replicas"]) {
81 count = spec[
"replicas"].as<
int>();
83 for (
int iCount = 0; iCount < count; iCount++) {
85 std::string zyreName = fmt::format(
"{}:{}:{}:{}", name, hostname, getpid(), nodeId);
86 std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
88 if (spec[
"jobinfo"][
"group"]) {
89 SPD_INFO(
"Setting jobInfoGroup [{}] from config", spec[
"jobinfo"][
"group"].as<std::string>());
90 pNodeZyre->jobInfoGroupName(spec[
"jobinfo"][
"group"].as<std::string>());
93 if (spec[
"jobinfo"][
"broker"][
"protocol"] && spec[
"jobinfo"][
"broker"][
"ip"] &&
94 spec[
"jobinfo"][
"broker"][
"port"]) {
95 std::string url = spec[
"jobinfo"][
"broker"][
"ip"].as<std::string>();
96 if (url.empty()) url = hostname;
97 url = fmt::format(
">{}{}:{}", spec[
"jobinfo"][
"broker"][
"protocol"].as<std::string>(), url,
98 spec[
"jobinfo"][
"broker"][
"port"].as<std::string>());
99 SPD_INFO(
"Setting jobInfoBrokerUrl [{}] from config", url);
100 pNodeZyre->jobInfoBrokerUrl(url);
102 if (spec[
"jobinfo"][
"client"][
"protocol"] && spec[
"jobinfo"][
"client"][
"ip"] &&
103 spec[
"jobinfo"][
"client"][
"port"]) {
104 std::string url = spec[
"jobinfo"][
"client"][
"ip"].as<std::string>();
105 if (url.empty()) url = hostname;
106 url = fmt::format(
">{}{}:{}", spec[
"jobinfo"][
"client"][
"protocol"].as<std::string>(), url,
107 spec[
"jobinfo"][
"client"][
"port"].as<std::string>());
108 SPD_INFO(
"Broker url for client : {}", url);
109 pNodeZyre->jobInfoClientUrl(url);
111 if (spec[
"timeout"][
"poller"]) {
112 pNodeZyre->timeout(spec[
"timeout"][
"poller"].as<int>());
113 SPD_INFO(
"Setting poller timeout [{}]", pNodeZyre->timeout());
116 if (spec[
"timeout"][
"jobfinished"]) {
117 SPD_INFO(
"Setting jobfinished timeout [{}]", spec[
"timeout"][
"jobfinished"].as<std::string>());
118 setenv(
"SALSA_FINISHED_JOB_TIMEOUT", spec[
"timeout"][
"jobfinished"].as<std::string>().data(),
true);
120 if (spec[
"timeout"][
"jobcheck"]) {
121 SPD_INFO(
"Setting jobcheck timeout [{}]", spec[
"timeout"][
"jobcheck"].as<std::string>());
122 setenv(
"SALSA_FINISHED_JOB_CHECK_TIMEOUT", spec[
"timeout"][
"jobcheck"].as<std::string>().data(),
true);
125 for (
auto nodes : spec[
"nodes"]) {
126 SPD_TRACE(
" name [{}]", nodes[
"name"].as<std::string>());
127 SPD_TRACE(
" zyreName [{}]", zyreName);
128 SPD_TRACE(
" type [{}]", nodes[
"type"].as<std::string>());
132 if (nodes[
"submit"][
"protocol"] && nodes[
"submit"][
"ip"] && nodes[
"submit"][
"port"]) {
133 if (nodes[
"submit"][
"ip"].as<std::string>() ==
"$all") nodes[
"submit"][
"ip"] =
"*";
135 fmt::format(
"{}://{}:{}", nodes[
"submit"][
"protocol"].as<std::string>(),
136 nodes[
"submit"][
"ip"].as<std::string>(), nodes[
"submit"][
"port"].as<int>());
138 SPD_INFO(
"Submit : url [{}]", url);
139 zsock_t * s = zsock_new_router(url.c_str());
141 SPD_CRIT(
"Failed to bind submitter on '{}' !!!", url);
144 pNodeZyre->addSocket(
static_cast<zsock_t *
>(s));
146 std::string submitter_hostname = hostname;
147 if (getenv(
"SALSA_SUBMIT_HOSTNAME")) submitter_hostname = getenv(
"SALSA_SUBMIT_HOSTNAME");
148 std::string submit_url_client =
149 fmt::format(
"{}://{}:{}", nodes[
"submit"][
"protocol"].as<std::string>(), submitter_hostname,
150 nodes[
"submit"][
"port"].as<int>());
152 SPD_INFO(
"Submit url for client [{}] ...", submit_url_client);
153 pNodeZyre->submitClientUrl(submit_url_client);
156 pNodeZyre->type(nodes[
"type"].as<std::string>());
157 if (nodes[
"alias"]) pNodeZyre->clusterAlias(nodes[
"alias"].as<std::string>());
159 SPD_INFO(
"clusterAlias[{}] ...", pNodeZyre->clusterAlias());
160 Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
161 pNodeInfo->set_name(zyreName);
162 pNodeInfo->set_hostname(hostname);
164 std::map<std::string, std::string> headers;
166 std::pair<std::string, std::string>(
"X-SALSA-NODE-TYPE", nodes[
"type"].as<std::string>()));
169 std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
170 std::make_shared<Salsa::SocketZyre>(zyreName, headers);
174 if (nodes[
"discovery"][
"type"]) {
176 std::string url, endpoint;
177 if (getenv(
"SALSA_ENDPOINT")) endpoint = getenv(
"SALSA_ENDPOINT");
179 std::string discoveryType = nodes[
"discovery"][
"type"].as<std::string>();
181 if (discoveryType ==
"udp") {
183 if (nodes[
"discovery"][
"port"]) port = nodes[
"discovery"][
"port"].as<
int>();
184 SPD_TRACE(
"Using discovery [{}] via port [{}]...", discoveryType, port);
185 pSocketZyre->port(port);
187 else if (discoveryType ==
"gossip") {
190 if (nodes[
"discovery"][
"protocol"]) p = nodes[
"discovery"][
"protocol"].as<std::string>();
191 if (nodes[
"discovery"][
"ip"]) i = nodes[
"discovery"][
"ip"].as<std::string>();
192 if (nodes[
"discovery"][
"port"]) port = nodes[
"discovery"][
"port"].as<
int>();
194 url = fmt::format(
"{}://{}:{}", p, i, port);
196 SPD_INFO(
"Using discovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
197 if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(),
"%s", endpoint.c_str());
199 if (nodes[
"discovery"][
"bind"] && nodes[
"discovery"][
"bind"].as<bool>() ==
true) {
200 if (i ==
"$all") i =
"*";
201 url = fmt::format(
"{}://{}:{}", p, i, port);
202 zyre_gossip_bind(pSocketZyre->zyre(),
"%s", url.c_str());
205 zyre_gossip_connect(pSocketZyre->zyre(),
"%s", url.c_str());
208 if (
mConfig[
"salsa"][
"options"][
"evasive"]) {
209 SPD_INFO(
"Setting 'evasive' timeout to [{}] msec ...",
210 mConfig[
"salsa"][
"options"][
"evasive"].as<int>());
211 zyre_set_evasive_timeout(pSocketZyre->zyre(),
212 mConfig[
"salsa"][
"options"][
"evasive"].as<
int>());
214 if (
mConfig[
"salsa"][
"options"][
"expired"]) {
215 SPD_INFO(
"Setting 'expired' timeout to [{}] msec ...",
216 mConfig[
"salsa"][
"options"][
"expired"].as<int>());
217 zyre_set_expired_timeout(pSocketZyre->zyre(),
218 mConfig[
"salsa"][
"options"][
"expired"].as<
int>());
222 SPD_WARN(
"No discovery type specified !!!");
225 const char * zyreInterface = getenv(
"SALSA_INTERFACE");
226 if (zyreInterface && strcmp(zyreInterface,
"")) {
227 SPD_INFO(
"Using SALSA_INTERFACE [{}]", zyreInterface);
228 zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
231 pSocketZyre->connect();
232 pNodeZyre->addSocket(pSocketZyre);
234 SPD_INFO(
"Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
236 name, zyreName, discoveryType, url, port, endpoint);
239 zyre_print(pSocketZyre->zyre());
242 node->add(pNodeZyre);
243 targetActors->push_back(pNodeZyre);