salsa  0.4.15
ConfigZyre.cc
1 #include "ConfigZyre.hh"
2 #include "NodeZyre.hh"
3 #include "PollerZmq.hh"
4 namespace Salsa {
6 {
10 }
12 {
16 }
17 
18 std::shared_ptr<Salsa::Node> ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> * targetActors)
19 {
23 
24  if (!targetActors) return nullptr;
25 
26  std::shared_ptr<Salsa::Node> node = nullptr;
27 
28  if (!mConfig["salsa"]["type"] || mConfig["salsa"]["type"].as<std::string>() != "zyre") {
29  SPD_ERROR("Salsa type is not [zyre] !!! ");
30  return nullptr;
31  }
32 
33  if (!mConfig["salsa"]["spec"]) {
34  SPD_ERROR("Salsa spec was not found !!!");
35  return nullptr;
36  }
37 
38  SPD_TRACE("Caching hostname via zsys_hostname()");
39  char * pHostnameCStr_ = zsys_hostname();
40  std::string hostname = "nohostname";
41  if (pHostnameCStr_) {
42  hostname = pHostnameCStr_;
43  free(pHostnameCStr_);
44  }
45  SPD_TRACE("Cached hostname [{}]", hostname);
46 
47  node = std::make_shared<Salsa::Node>("SALSA");
48  int nodeId = 0;
49 
50  for (auto spec : mConfig["salsa"]["spec"]) {
51 
52  if (!spec["nodes"]) {
53  SPD_ERROR("Nodes array is missing for [{}] !!!", spec["name"].as<std::string>());
54  return nullptr;
55  }
56  auto name = spec["name"].as<std::string>();
57  bool found = false;
58  YAML::Node opt;
59 
60  if (mFilter.size() == 0) {
61  found = true;
62  }
63  else {
64  for (auto filter : mFilter) {
65  if (name == filter.first) {
66  opt = filter.second;
67  if (opt["replicas"]) {
68  spec["replicas"] = opt["replicas"].as<int>();
69  }
70  found = true;
71  }
72  }
73  }
74 
75  if (!found) continue;
76  SPD_TRACE("name [{}]", name);
77  int count = 1;
78 
79  if (spec["replicas"]) {
80  count = spec["replicas"].as<int>();
81  }
82  for (int iCount = 0; iCount < count; iCount++) {
83  // Create zyre node
84  std::string zyreName = fmt::format("{}:{}:{}:{}", name, hostname, getpid(), nodeId);
85  std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
86 
87  if (spec["jobinfo"]["group"]) {
88  SPD_INFO("Setting jobInfoGroup [{}] from config", spec["jobinfo"]["group"].as<std::string>());
89  pNodeZyre->jobInfoGroupName(spec["jobinfo"]["group"].as<std::string>());
90  }
91 
92  if (spec["jobinfo"]["broker"]["protocol"] && spec["jobinfo"]["broker"]["ip"] &&
93  spec["jobinfo"]["broker"]["port"]) {
94  std::string url = spec["jobinfo"]["broker"]["ip"].as<std::string>();
95  if (url.empty()) url = hostname;
96  url = fmt::format(">{}{}:{}", spec["jobinfo"]["broker"]["protocol"].as<std::string>(), url,
97  spec["jobinfo"]["broker"]["port"].as<std::string>());
98  SPD_INFO("Setting jobInfoBrokerUrl [{}] from config", url);
99  pNodeZyre->jobInfoBrokerUrl(url);
100  }
101  if (spec["jobinfo"]["client"]["protocol"] && spec["jobinfo"]["client"]["ip"] &&
102  spec["jobinfo"]["client"]["port"]) {
103  std::string url = spec["jobinfo"]["client"]["ip"].as<std::string>();
104  if (url.empty()) url = hostname;
105  url = fmt::format(">{}{}:{}", spec["jobinfo"]["client"]["protocol"].as<std::string>(), url,
106  spec["jobinfo"]["client"]["port"].as<std::string>());
107  SPD_INFO("Broker url for client : {}", url);
108  pNodeZyre->jobInfoClientUrl(url);
109  }
110  if (spec["timeout"]["poller"]) {
111  pNodeZyre->timeout(spec["timeout"]["poller"].as<int>());
112  SPD_INFO("Setting poller timeout [{}]", pNodeZyre->timeout());
113  }
114 
115  if (spec["timeout"]["jobfinished"]) {
116  SPD_INFO("Setting jobfinished timeout [{}]", spec["timeout"]["jobfinished"].as<std::string>());
117  setenv("SALSA_FINISHED_JOB_TIMEOUT", spec["timeout"]["jobfinished"].as<std::string>().data(), true);
118  }
119  if (spec["timeout"]["jobcheck"]) {
120  SPD_INFO("Setting jobcheck timeout [{}]", spec["timeout"]["jobcheck"].as<std::string>());
121  setenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT", spec["timeout"]["jobcheck"].as<std::string>().data(), true);
122  }
123 
124  for (auto nodes : spec["nodes"]) {
125  SPD_TRACE(" name [{}]", nodes["name"].as<std::string>());
126  SPD_TRACE(" zyreName [{}]", zyreName);
127  SPD_TRACE(" type [{}]", nodes["type"].as<std::string>());
128 
129  applyOptions(nodes, opt);
130 
131  if (nodes["submit"]["protocol"] && nodes["submit"]["ip"] && nodes["submit"]["port"]) {
132  if (nodes["submit"]["ip"].as<std::string>() == "$all") nodes["submit"]["ip"] = "*";
133  std::string url =
134  fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(),
135  nodes["submit"]["ip"].as<std::string>(), nodes["submit"]["port"].as<int>());
136 
137  SPD_INFO("Submit : url [{}]", url);
138  zsock_t * s = zsock_new_router(url.c_str());
139  if (s == nullptr) {
140  SPD_CRIT("Failed to bind submitter on '{}' !!!", url);
141  return nullptr;
142  }
143  pNodeZyre->addSocket(static_cast<zsock_t *>(s));
144 
145  std::string submitter_hostname = hostname;
146  if (getenv("SALSA_SUBMIT_HOSTNAME")) submitter_hostname = getenv("SALSA_SUBMIT_HOSTNAME");
147  std::string submit_url_client =
148  fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(), submitter_hostname,
149  nodes["submit"]["port"].as<int>());
150 
151  SPD_INFO("Submit url for client [{}] ...", submit_url_client);
152  pNodeZyre->submitClientUrl(submit_url_client);
153  }
154  else {
155  Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
156  pNodeInfo->set_name(zyreName);
157  pNodeInfo->set_hostname(hostname);
158 
159  std::map<std::string, std::string> headers;
160  headers.insert(
161  std::pair<std::string, std::string>("X-SALSA-NODE-TYPE", nodes["type"].as<std::string>()));
162 
163  // Create zyre socket for node
164  std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
165  std::make_shared<Salsa::SocketZyre>(zyreName, headers);
166 
167  applyOptions(nodes, opt);
168 
169  if (nodes["discovery"]["type"]) {
170  int port;
171  std::string url, endpoint;
172  if (getenv("SALSA_ENDPOINT")) endpoint = getenv("SALSA_ENDPOINT");
173 
174  std::string discoveryType = nodes["discovery"]["type"].as<std::string>();
175 
176  if (discoveryType == "udp") {
177  port = 10000;
178  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
179  SPD_TRACE("Using discovery [{}] via port [{}]...", discoveryType, port);
180  pSocketZyre->port(port); // Set socket's port
181  }
182  else if (discoveryType == "gossip") {
183  std::string p, i;
184  port = 20000;
185  if (nodes["discovery"]["protocol"]) p = nodes["discovery"]["protocol"].as<std::string>();
186  if (nodes["discovery"]["ip"]) i = nodes["discovery"]["ip"].as<std::string>();
187  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
188 
189  url = fmt::format("{}://{}:{}", p, i, port);
190 
191  SPD_INFO("Using discovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
192  if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(), "%s", endpoint.c_str());
193 
194  if (nodes["discovery"]["bind"] && nodes["discovery"]["bind"].as<bool>() == true) {
195  if (i == "$all") i = "*";
196  url = fmt::format("{}://{}:{}", p, i, port);
197  zyre_gossip_bind(pSocketZyre->zyre(), "%s", url.c_str());
198  }
199  else {
200  zyre_gossip_connect(pSocketZyre->zyre(), "%s", url.c_str());
201  }
202 
203  if (mConfig["salsa"]["options"]["evasive"]) {
204  SPD_INFO("Setting 'evasive' timeout to [{}] msec ...",
205  mConfig["salsa"]["options"]["evasive"].as<int>());
206  zyre_set_evasive_timeout(pSocketZyre->zyre(),
207  mConfig["salsa"]["options"]["evasive"].as<int>());
208  }
209  if (mConfig["salsa"]["options"]["expired"]) {
210  SPD_INFO("Setting 'expired' timeout to [{}] msec ...",
211  mConfig["salsa"]["options"]["expired"].as<int>());
212  zyre_set_expired_timeout(pSocketZyre->zyre(),
213  mConfig["salsa"]["options"]["expired"].as<int>());
214  }
215  }
216  else {
217  SPD_WARN("No discovery type specified !!!");
218  }
219 
220  const char * zyreInterface = getenv("SALSA_INTERFACE");
221  if (zyreInterface && strcmp(zyreInterface, "")) {
222  SPD_INFO("Using SALSA_INTERFACE [{}]", zyreInterface);
223  zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
224  }
225 
226  pSocketZyre->connect(); // Connect to socket
227  pNodeZyre->addSocket(pSocketZyre); // Add socket to zyre node
228 
229  SPD_INFO("Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
230  "endpoint [{}]",
231  name, zyreName, discoveryType, url, port, endpoint);
232 
233  if (Salsa::Object::getConsoleOutput()->level() < static_cast<int>(spdlog::level::info)) {
234  zyre_print(pSocketZyre->zyre());
235  }
236  }
237  node->add(pNodeZyre); // Add zyre node to main node
238  targetActors->push_back(pNodeZyre); // Add zyre node to actor index
239  nodeId++;
240  }
241  }
242  }
243  SPD_TRACE("---");
244  nodeId++;
245  }
246 
247  return node;
248 }
249 
250 void ConfigZyre::applyOptions(YAML::detail::iterator_value & src, YAML::Node & opt)
251 {
255  if (opt["type"] && src["discovery"]["type"]) {
256  src["discovery"]["type"] = opt["type"].as<std::string>();
257  }
258  if (opt["protocol"] && src["discovery"]["protocol"]) {
259  src["discovery"]["protocol"] = opt["protocol"].as<std::string>();
260  }
261  if (opt["ip"] && src["discovery"]["ip"]) {
262  src["discovery"]["ip"] = opt["ip"].as<std::string>();
263  }
264  if (opt["port"] && src["discovery"]["port"]) {
265  src["discovery"]["port"] = opt["port"].as<int>();
266  }
267  if (opt["submitport"] && src["submit"]["port"]) {
268  src["submit"]["port"] = opt["submitport"].as<int>();
269  }
270 }
271 
272 } // namespace Salsa
virtual ~ConfigZyre()
Definition: ConfigZyre.cc:11
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
Definition: ConfigZyre.cc:250
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq >> *targetActors)
Definition: ConfigZyre.cc:18
Base Config class.
Definition: Config.hh:17
void filter(std::string const &f)
Definition: Config.cc:27
std::map< std::string, YAML::Node > mFilter
Filter list.
Definition: Config.hh:31
YAML::Node mConfig
YAML Configuration.
Definition: Config.hh:30
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
Definition: Object.hh:21