salsa 0.7.1
Loading...
Searching...
No Matches
ConfigZyre.cc
1#include "ConfigZyre.hh"
2#include "NodeZyre.hh"
3#include "PollerZmq.hh"
4namespace Salsa {
18
19std::shared_ptr<Salsa::Node> ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> * targetActors)
20{
24
25 if (!targetActors) return nullptr;
26
27 std::shared_ptr<Salsa::Node> node = nullptr;
28
29 if (!mConfig["salsa"]["type"] || mConfig["salsa"]["type"].as<std::string>() != "zyre") {
30 SPD_ERROR("Salsa type is not [zyre] !!! ");
31 return nullptr;
32 }
33
34 if (!mConfig["salsa"]["spec"]) {
35 SPD_ERROR("Salsa spec was not found !!!");
36 return nullptr;
37 }
38
39 SPD_TRACE("Caching hostname via zsys_hostname()");
40 char * pHostnameCStr_ = zsys_hostname();
41 std::string hostname = "nohostname";
42 if (pHostnameCStr_) {
43 hostname = pHostnameCStr_;
44 free(pHostnameCStr_);
45 }
46 SPD_TRACE("Cached hostname [{}]", hostname);
47
48 node = std::make_shared<Salsa::Node>("SALSA");
49 int nodeId = 0;
50
51 for (auto spec : mConfig["salsa"]["spec"]) {
52
53 if (!spec["nodes"]) {
54 SPD_ERROR("Nodes array is missing for [{}] !!!", spec["name"].as<std::string>());
55 return nullptr;
56 }
57 auto name = spec["name"].as<std::string>();
58 bool found = false;
59 YAML::Node opt;
60
61 if (mFilter.size() == 0) {
62 found = true;
63 }
64 else {
65 for (auto filter : mFilter) {
66 if (name == filter.first) {
67 opt = filter.second;
68 if (opt["replicas"]) {
69 spec["replicas"] = opt["replicas"].as<int>();
70 }
71 found = true;
72 }
73 }
74 }
75
76 if (!found) continue;
77 SPD_TRACE("name [{}]", name);
78 int count = 1;
79
80 if (spec["replicas"]) {
81 count = spec["replicas"].as<int>();
82 }
83 for (int iCount = 0; iCount < count; iCount++) {
84 // Create zyre node
85 std::string zyreName = fmt::format("{}:{}:{}:{}", name, hostname, getpid(), nodeId);
86 std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
87
88 if (spec["jobinfo"]["group"]) {
89 SPD_INFO("Setting jobInfoGroup [{}] from config", spec["jobinfo"]["group"].as<std::string>());
90 pNodeZyre->jobInfoGroupName(spec["jobinfo"]["group"].as<std::string>());
91 }
92
93 if (spec["jobinfo"]["broker"]["protocol"] && spec["jobinfo"]["broker"]["ip"] &&
94 spec["jobinfo"]["broker"]["port"]) {
95 std::string url = spec["jobinfo"]["broker"]["ip"].as<std::string>();
96 if (url.empty()) url = hostname;
97 url = fmt::format(">{}{}:{}", spec["jobinfo"]["broker"]["protocol"].as<std::string>(), url,
98 spec["jobinfo"]["broker"]["port"].as<std::string>());
99 SPD_INFO("Setting jobInfoBrokerUrl [{}] from config", url);
100 pNodeZyre->jobInfoBrokerUrl(url);
101 }
102 if (spec["jobinfo"]["client"]["protocol"] && spec["jobinfo"]["client"]["ip"] &&
103 spec["jobinfo"]["client"]["port"]) {
104 std::string url = spec["jobinfo"]["client"]["ip"].as<std::string>();
105 if (url.empty()) url = hostname;
106 url = fmt::format(">{}{}:{}", spec["jobinfo"]["client"]["protocol"].as<std::string>(), url,
107 spec["jobinfo"]["client"]["port"].as<std::string>());
108 SPD_INFO("Broker url for client : {}", url);
109 pNodeZyre->jobInfoClientUrl(url);
110 }
111 if (spec["timeout"]["poller"]) {
112 pNodeZyre->timeout(spec["timeout"]["poller"].as<int>());
113 SPD_INFO("Setting poller timeout [{}]", pNodeZyre->timeout());
114 }
115
116 if (spec["timeout"]["jobfinished"]) {
117 SPD_INFO("Setting jobfinished timeout [{}]", spec["timeout"]["jobfinished"].as<std::string>());
118 setenv("SALSA_FINISHED_JOB_TIMEOUT", spec["timeout"]["jobfinished"].as<std::string>().data(), true);
119 }
120 if (spec["timeout"]["jobcheck"]) {
121 SPD_INFO("Setting jobcheck timeout [{}]", spec["timeout"]["jobcheck"].as<std::string>());
122 setenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT", spec["timeout"]["jobcheck"].as<std::string>().data(), true);
123 }
124
125 for (auto nodes : spec["nodes"]) {
126 SPD_TRACE(" name [{}]", nodes["name"].as<std::string>());
127 SPD_TRACE(" zyreName [{}]", zyreName);
128 SPD_TRACE(" type [{}]", nodes["type"].as<std::string>());
129
130 applyOptions(nodes, opt);
131
132 if (nodes["submit"]["protocol"] && nodes["submit"]["ip"] && nodes["submit"]["port"]) {
133 if (nodes["submit"]["ip"].as<std::string>() == "$all") nodes["submit"]["ip"] = "*";
134 std::string url =
135 fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(),
136 nodes["submit"]["ip"].as<std::string>(), nodes["submit"]["port"].as<int>());
137
138 SPD_INFO("Submit : url [{}]", url);
139 zsock_t * s = zsock_new_router(url.c_str());
140 if (s == nullptr) {
141 SPD_CRIT("Failed to bind submitter on '{}' !!!", url);
142 return nullptr;
143 }
144 pNodeZyre->addSocket(static_cast<zsock_t *>(s));
145
146 std::string submitter_hostname = hostname;
147 if (getenv("SALSA_SUBMIT_HOSTNAME")) submitter_hostname = getenv("SALSA_SUBMIT_HOSTNAME");
148 std::string submit_url_client =
149 fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(), submitter_hostname,
150 nodes["submit"]["port"].as<int>());
151
152 SPD_INFO("Submit url for client [{}] ...", submit_url_client);
153 pNodeZyre->submitClientUrl(submit_url_client);
154 }
155 else {
156 pNodeZyre->type(nodes["type"].as<std::string>());
157 if (nodes["alias"]) pNodeZyre->clusterAlias(nodes["alias"].as<std::string>());
158
159 SPD_INFO("clusterAlias[{}] ...", pNodeZyre->clusterAlias());
160 Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
161 pNodeInfo->set_name(zyreName);
162 pNodeInfo->set_hostname(hostname);
163
164 std::map<std::string, std::string> headers;
165 headers.insert(
166 std::pair<std::string, std::string>("X-SALSA-NODE-TYPE", nodes["type"].as<std::string>()));
167
168 // Create zyre socket for node
169 std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
170 std::make_shared<Salsa::SocketZyre>(zyreName, headers);
171
172 applyOptions(nodes, opt);
173
174 if (nodes["discovery"]["type"]) {
175 int port;
176 std::string url, endpoint;
177 if (getenv("SALSA_ENDPOINT")) endpoint = getenv("SALSA_ENDPOINT");
178
179 std::string discoveryType = nodes["discovery"]["type"].as<std::string>();
180
181 if (discoveryType == "udp") {
182 port = 10000;
183 if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
184 SPD_TRACE("Using discovery [{}] via port [{}]...", discoveryType, port);
185 pSocketZyre->port(port); // Set socket's port
186 }
187 else if (discoveryType == "gossip") {
188 std::string p, i;
189 port = 20000;
190 if (nodes["discovery"]["protocol"]) p = nodes["discovery"]["protocol"].as<std::string>();
191 if (nodes["discovery"]["ip"]) i = nodes["discovery"]["ip"].as<std::string>();
192 if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
193
194 url = fmt::format("{}://{}:{}", p, i, port);
195
196 SPD_INFO("Using discovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
197 if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(), "%s", endpoint.c_str());
198
199 if (nodes["discovery"]["bind"] && nodes["discovery"]["bind"].as<bool>() == true) {
200 if (i == "$all") i = "*";
201 url = fmt::format("{}://{}:{}", p, i, port);
202 zyre_gossip_bind(pSocketZyre->zyre(), "%s", url.c_str());
203 }
204 else {
205 zyre_gossip_connect(pSocketZyre->zyre(), "%s", url.c_str());
206 }
207
208 if (mConfig["salsa"]["options"]["evasive"]) {
209 SPD_INFO("Setting 'evasive' timeout to [{}] msec ...",
210 mConfig["salsa"]["options"]["evasive"].as<int>());
211 zyre_set_evasive_timeout(pSocketZyre->zyre(),
212 mConfig["salsa"]["options"]["evasive"].as<int>());
213 }
214 if (mConfig["salsa"]["options"]["expired"]) {
215 SPD_INFO("Setting 'expired' timeout to [{}] msec ...",
216 mConfig["salsa"]["options"]["expired"].as<int>());
217 zyre_set_expired_timeout(pSocketZyre->zyre(),
218 mConfig["salsa"]["options"]["expired"].as<int>());
219 }
220 }
221 else {
222 SPD_WARN("No discovery type specified !!!");
223 }
224
225 const char * zyreInterface = getenv("SALSA_INTERFACE");
226 if (zyreInterface && strcmp(zyreInterface, "")) {
227 SPD_INFO("Using SALSA_INTERFACE [{}]", zyreInterface);
228 zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
229 }
230
231 pSocketZyre->connect(); // Connect to socket
232 pNodeZyre->addSocket(pSocketZyre); // Add socket to zyre node
233
234 SPD_INFO("Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
235 "endpoint [{}]",
236 name, zyreName, discoveryType, url, port, endpoint);
237
238 if (Salsa::Object::getConsoleOutput()->level() < static_cast<int>(spdlog::level::info)) {
239 zyre_print(pSocketZyre->zyre());
240 }
241 }
242 node->add(pNodeZyre); // Add zyre node to main node
243 targetActors->push_back(pNodeZyre); // Add zyre node to actor index
244 nodeId++;
245 }
246 }
247 }
248 SPD_TRACE("---");
249 nodeId++;
250 }
251
252 return node;
253}
254
255void ConfigZyre::applyOptions(YAML::detail::iterator_value & src, YAML::Node & opt)
256{
260 if (opt["type"] && src["discovery"]["type"]) {
261 src["discovery"]["type"] = opt["type"].as<std::string>();
262 }
263 if (opt["protocol"] && src["discovery"]["protocol"]) {
264 src["discovery"]["protocol"] = opt["protocol"].as<std::string>();
265 }
266 if (opt["ip"] && src["discovery"]["ip"]) {
267 src["discovery"]["ip"] = opt["ip"].as<std::string>();
268 }
269 if (opt["port"] && src["discovery"]["port"]) {
270 src["discovery"]["port"] = opt["port"].as<int>();
271 }
272 if (opt["submitport"] && src["submit"]["port"]) {
273 src["submit"]["port"] = opt["submitport"].as<int>();
274 }
275}
276
277} // namespace Salsa
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq > > *targetActors)
Definition ConfigZyre.cc:19
virtual ~ConfigZyre()
Definition ConfigZyre.cc:12
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
Base Config class.
Definition Config.hh:17
void filter(std::string const &f)
Definition Config.cc:27
std::map< std::string, YAML::Node > mFilter
Filter list.
Definition Config.hh:31
YAML::Node mConfig
YAML Configuration.
Definition Config.hh:30
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
Definition Object.hh:21