salsa  0.7.1
ConfigZyre.cc
1 #include "ConfigZyre.hh"
2 #include "NodeZyre.hh"
3 #include "PollerZmq.hh"
4 namespace Salsa {
6  : Config()
7 {
11 }
13 {
17 }
18 
19 std::shared_ptr<Salsa::Node> ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> * targetActors)
20 {
24 
25  if (!targetActors) return nullptr;
26 
27  std::shared_ptr<Salsa::Node> node = nullptr;
28 
29  if (!mConfig["salsa"]["type"] || mConfig["salsa"]["type"].as<std::string>() != "zyre") {
30  SPD_ERROR("Salsa type is not [zyre] !!! ");
31  return nullptr;
32  }
33 
34  if (!mConfig["salsa"]["spec"]) {
35  SPD_ERROR("Salsa spec was not found !!!");
36  return nullptr;
37  }
38 
39  SPD_TRACE("Caching hostname via zsys_hostname()");
40  char * pHostnameCStr_ = zsys_hostname();
41  std::string hostname = "nohostname";
42  if (pHostnameCStr_) {
43  hostname = pHostnameCStr_;
44  free(pHostnameCStr_);
45  }
46  SPD_TRACE("Cached hostname [{}]", hostname);
47 
48  node = std::make_shared<Salsa::Node>("SALSA");
49  int nodeId = 0;
50 
51  for (auto spec : mConfig["salsa"]["spec"]) {
52 
53  if (!spec["nodes"]) {
54  SPD_ERROR("Nodes array is missing for [{}] !!!", spec["name"].as<std::string>());
55  return nullptr;
56  }
57  auto name = spec["name"].as<std::string>();
58  bool found = false;
59  YAML::Node opt;
60 
61  if (mFilter.size() == 0) {
62  found = true;
63  }
64  else {
65  for (auto filter : mFilter) {
66  if (name == filter.first) {
67  opt = filter.second;
68  if (opt["replicas"]) {
69  spec["replicas"] = opt["replicas"].as<int>();
70  }
71  found = true;
72  }
73  }
74  }
75 
76  if (!found) continue;
77  SPD_TRACE("name [{}]", name);
78  int count = 1;
79 
80  if (spec["replicas"]) {
81  count = spec["replicas"].as<int>();
82  }
83  for (int iCount = 0; iCount < count; iCount++) {
84  // Create zyre node
85  std::string zyreName = fmt::format("{}:{}:{}:{}", name, hostname, getpid(), nodeId);
86  std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
87 
88  if (spec["jobinfo"]["group"]) {
89  SPD_INFO("Setting jobInfoGroup [{}] from config", spec["jobinfo"]["group"].as<std::string>());
90  pNodeZyre->jobInfoGroupName(spec["jobinfo"]["group"].as<std::string>());
91  }
92 
93  if (spec["jobinfo"]["broker"]["protocol"] && spec["jobinfo"]["broker"]["ip"] &&
94  spec["jobinfo"]["broker"]["port"]) {
95  std::string url = spec["jobinfo"]["broker"]["ip"].as<std::string>();
96  if (url.empty()) url = hostname;
97  url = fmt::format(">{}{}:{}", spec["jobinfo"]["broker"]["protocol"].as<std::string>(), url,
98  spec["jobinfo"]["broker"]["port"].as<std::string>());
99  SPD_INFO("Setting jobInfoBrokerUrl [{}] from config", url);
100  pNodeZyre->jobInfoBrokerUrl(url);
101  }
102  if (spec["jobinfo"]["client"]["protocol"] && spec["jobinfo"]["client"]["ip"] &&
103  spec["jobinfo"]["client"]["port"]) {
104  std::string url = spec["jobinfo"]["client"]["ip"].as<std::string>();
105  if (url.empty()) url = hostname;
106  url = fmt::format(">{}{}:{}", spec["jobinfo"]["client"]["protocol"].as<std::string>(), url,
107  spec["jobinfo"]["client"]["port"].as<std::string>());
108  SPD_INFO("Broker url for client : {}", url);
109  pNodeZyre->jobInfoClientUrl(url);
110  }
111  if (spec["timeout"]["poller"]) {
112  pNodeZyre->timeout(spec["timeout"]["poller"].as<int>());
113  SPD_INFO("Setting poller timeout [{}]", pNodeZyre->timeout());
114  }
115 
116  if (spec["timeout"]["jobfinished"]) {
117  SPD_INFO("Setting jobfinished timeout [{}]", spec["timeout"]["jobfinished"].as<std::string>());
118  setenv("SALSA_FINISHED_JOB_TIMEOUT", spec["timeout"]["jobfinished"].as<std::string>().data(), true);
119  }
120  if (spec["timeout"]["jobcheck"]) {
121  SPD_INFO("Setting jobcheck timeout [{}]", spec["timeout"]["jobcheck"].as<std::string>());
122  setenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT", spec["timeout"]["jobcheck"].as<std::string>().data(), true);
123  }
124 
125  for (auto nodes : spec["nodes"]) {
126  SPD_TRACE(" name [{}]", nodes["name"].as<std::string>());
127  SPD_TRACE(" zyreName [{}]", zyreName);
128  SPD_TRACE(" type [{}]", nodes["type"].as<std::string>());
129 
130  applyOptions(nodes, opt);
131 
132  if (nodes["submit"]["protocol"] && nodes["submit"]["ip"] && nodes["submit"]["port"]) {
133  if (nodes["submit"]["ip"].as<std::string>() == "$all") nodes["submit"]["ip"] = "*";
134  std::string url =
135  fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(),
136  nodes["submit"]["ip"].as<std::string>(), nodes["submit"]["port"].as<int>());
137 
138  SPD_INFO("Submit : url [{}]", url);
139  zsock_t * s = zsock_new_router(url.c_str());
140  if (s == nullptr) {
141  SPD_CRIT("Failed to bind submitter on '{}' !!!", url);
142  return nullptr;
143  }
144  pNodeZyre->addSocket(static_cast<zsock_t *>(s));
145 
146  std::string submitter_hostname = hostname;
147  if (getenv("SALSA_SUBMIT_HOSTNAME")) submitter_hostname = getenv("SALSA_SUBMIT_HOSTNAME");
148  std::string submit_url_client =
149  fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(), submitter_hostname,
150  nodes["submit"]["port"].as<int>());
151 
152  SPD_INFO("Submit url for client [{}] ...", submit_url_client);
153  pNodeZyre->submitClientUrl(submit_url_client);
154  }
155  else {
156  pNodeZyre->type(nodes["type"].as<std::string>());
157  if (nodes["alias"]) pNodeZyre->clusterAlias(nodes["alias"].as<std::string>());
158 
159  SPD_INFO("clusterAlias[{}] ...", pNodeZyre->clusterAlias());
160  Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
161  pNodeInfo->set_name(zyreName);
162  pNodeInfo->set_hostname(hostname);
163 
164  std::map<std::string, std::string> headers;
165  headers.insert(
166  std::pair<std::string, std::string>("X-SALSA-NODE-TYPE", nodes["type"].as<std::string>()));
167 
168  // Create zyre socket for node
169  std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
170  std::make_shared<Salsa::SocketZyre>(zyreName, headers);
171 
172  applyOptions(nodes, opt);
173 
174  if (nodes["discovery"]["type"]) {
175  int port;
176  std::string url, endpoint;
177  if (getenv("SALSA_ENDPOINT")) endpoint = getenv("SALSA_ENDPOINT");
178 
179  std::string discoveryType = nodes["discovery"]["type"].as<std::string>();
180 
181  if (discoveryType == "udp") {
182  port = 10000;
183  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
184  SPD_TRACE("Using discovery [{}] via port [{}]...", discoveryType, port);
185  pSocketZyre->port(port); // Set socket's port
186  }
187  else if (discoveryType == "gossip") {
188  std::string p, i;
189  port = 20000;
190  if (nodes["discovery"]["protocol"]) p = nodes["discovery"]["protocol"].as<std::string>();
191  if (nodes["discovery"]["ip"]) i = nodes["discovery"]["ip"].as<std::string>();
192  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
193 
194  url = fmt::format("{}://{}:{}", p, i, port);
195 
196  SPD_INFO("Using discovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
197  if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(), "%s", endpoint.c_str());
198 
199  if (nodes["discovery"]["bind"] && nodes["discovery"]["bind"].as<bool>() == true) {
200  if (i == "$all") i = "*";
201  url = fmt::format("{}://{}:{}", p, i, port);
202  zyre_gossip_bind(pSocketZyre->zyre(), "%s", url.c_str());
203  }
204  else {
205  zyre_gossip_connect(pSocketZyre->zyre(), "%s", url.c_str());
206  }
207 
208  if (mConfig["salsa"]["options"]["evasive"]) {
209  SPD_INFO("Setting 'evasive' timeout to [{}] msec ...",
210  mConfig["salsa"]["options"]["evasive"].as<int>());
211  zyre_set_evasive_timeout(pSocketZyre->zyre(),
212  mConfig["salsa"]["options"]["evasive"].as<int>());
213  }
214  if (mConfig["salsa"]["options"]["expired"]) {
215  SPD_INFO("Setting 'expired' timeout to [{}] msec ...",
216  mConfig["salsa"]["options"]["expired"].as<int>());
217  zyre_set_expired_timeout(pSocketZyre->zyre(),
218  mConfig["salsa"]["options"]["expired"].as<int>());
219  }
220  }
221  else {
222  SPD_WARN("No discovery type specified !!!");
223  }
224 
225  const char * zyreInterface = getenv("SALSA_INTERFACE");
226  if (zyreInterface && strcmp(zyreInterface, "")) {
227  SPD_INFO("Using SALSA_INTERFACE [{}]", zyreInterface);
228  zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
229  }
230 
231  pSocketZyre->connect(); // Connect to socket
232  pNodeZyre->addSocket(pSocketZyre); // Add socket to zyre node
233 
234  SPD_INFO("Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
235  "endpoint [{}]",
236  name, zyreName, discoveryType, url, port, endpoint);
237 
238  if (Salsa::Object::getConsoleOutput()->level() < static_cast<int>(spdlog::level::info)) {
239  zyre_print(pSocketZyre->zyre());
240  }
241  }
242  node->add(pNodeZyre); // Add zyre node to main node
243  targetActors->push_back(pNodeZyre); // Add zyre node to actor index
244  nodeId++;
245  }
246  }
247  }
248  SPD_TRACE("---");
249  nodeId++;
250  }
251 
252  return node;
253 }
254 
255 void ConfigZyre::applyOptions(YAML::detail::iterator_value & src, YAML::Node & opt)
256 {
260  if (opt["type"] && src["discovery"]["type"]) {
261  src["discovery"]["type"] = opt["type"].as<std::string>();
262  }
263  if (opt["protocol"] && src["discovery"]["protocol"]) {
264  src["discovery"]["protocol"] = opt["protocol"].as<std::string>();
265  }
266  if (opt["ip"] && src["discovery"]["ip"]) {
267  src["discovery"]["ip"] = opt["ip"].as<std::string>();
268  }
269  if (opt["port"] && src["discovery"]["port"]) {
270  src["discovery"]["port"] = opt["port"].as<int>();
271  }
272  if (opt["submitport"] && src["submit"]["port"]) {
273  src["submit"]["port"] = opt["submitport"].as<int>();
274  }
275 }
276 
277 } // namespace Salsa
virtual ~ConfigZyre()
Definition: ConfigZyre.cc:12
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
Definition: ConfigZyre.cc:255
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq >> *targetActors)
Definition: ConfigZyre.cc:19
Base Config class.
Definition: Config.hh:17
void filter(std::string const &f)
Definition: Config.cc:27
std::map< std::string, YAML::Node > mFilter
Filter list.
Definition: Config.hh:31
YAML::Node mConfig
YAML Configuration.
Definition: Config.hh:30
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
Definition: Object.hh:21