obmon  1.3.1
 All Classes Functions Variables Typedefs Enumerations Groups Pages
obmon-broker.cpp
1 #include <czmq.h>
2 #include <getopt.h>
3 #include <iostream>
4 #include <map>
5 #include <spdlog/fmt/ostr.h>
6 #include <spdlog/sinks/stdout_color_sinks.h>
7 #include <spdlog/spdlog.h>
8 #include <utility>
9 
10 using namespace std;
11 
12 static shared_ptr<spdlog::logger> logger = nullptr;
13 
14 void help();
15 void help() {
16  logger->info("obmon-broker -i tcp://*:5000 -o tcp://*:5001 -t 1000 -r 3");
17 }
18 
19 int main(int argc, char **argv) {
20 
21  // creating logger
22  auto console = spdlog::stdout_color_mt("console");
23  logger = spdlog::get("console");
24 
25  char *short_options = strdup("i:o:t:r:h");
26  struct option long_options[] = {
27  {"in", 1, nullptr, 'i'}, {"out", 1, nullptr, 'o'},
28  {"timeout", 1, nullptr, 't'}, {"retry", 1, nullptr, 'r'},
29  {"help", 0, nullptr, 'h'}, {nullptr, 0, nullptr, 0}};
30 
31  string in = "@tcp://*:5000";
32  string out = "@tcp://*:5001";
33  int timeout_publish = 1000;
34  int timeout_poller = 100;
35  int retry_plublish = 3;
36 
37  int nextOption =
38  getopt_long(argc, argv, short_options, long_options, nullptr);
39  while (nextOption != -1) {
40  switch (nextOption) {
41  case 'i':
42  in = "@";
43  in.append(optarg);
44  break;
45  case 'o':
46  out = "@";
47  out.append(optarg);
48  break;
49  case 't':
50  timeout_publish = atoi(optarg);
51  break;
52  case 'r':
53  retry_plublish = atoi(optarg);
54  break;
55  case 'h':
56  help();
57  exit(0);
58  // break; <- Unreachable code - emits warning
59  default:
60  help();
61  exit(1);
62  }
63  nextOption = getopt_long(argc, argv, short_options, long_options, nullptr);
64  }
65 
66  // Setting log level
67  int debugLevel = 0;
68  if (getenv("OBMON_LOG")) {
69  int rc = sscanf(getenv("OBMON_LOG"), "%d", &debugLevel);
70  if (!rc) {
71  logger->error("OBMON_LOG is not a number: {}", getenv("OBMON_LOG"));
72  return 2;
73  }
74 
75  if (debugLevel < 0)
76  debugLevel = 0;
77 
78  if (debugLevel > 6)
79  debugLevel = 6;
80 
81  logger->info("Setting log level to '{}' ...",
82  spdlog::level::to_c_str(
83  static_cast<spdlog::level::level_enum>(6 - debugLevel)));
84  spdlog::set_level(static_cast<spdlog::level::level_enum>(6 - debugLevel));
85  }
86 
87  // Creating poller
88  zpoller_t *poller = zpoller_new(nullptr);
89  assert(poller);
90  logger->info("Connected to in={} out={}", in, out);
91 
92  // Subscriber
93  zsock_t *sub = zsock_new_sub(in.data(), "");
94  assert(sub);
95 
96  // Publisher
97  zsock_t *pub = zsock_new_pub(out.data());
98  assert(pub);
99 
100  // Adding subscriber to poller
101  zpoller_add(poller, sub);
102 
103  // Initialization of container "map" with double key
104  map<pair<string, string>, string> state;
105  map<string, int64_t> subscribers;
106  map<string, int64_t>::iterator it_subs;
107 
108  int64_t cur_time = zclock_time();
109  while (!zsys_interrupted) {
110 
111  zsock_t *socket =
112  static_cast<zsock_t *>(zpoller_wait(poller, timeout_poller));
113  logger->trace("Poller released socket={}", static_cast<void *>(socket));
114 
115  if (zpoller_terminated(poller))
116  break;
117 
118  if (socket != nullptr) {
119  zmsg_t *msg = zmsg_recv(socket);
120  if (!msg)
121  break;
122 
123  // zmsg_print(msg);
124  char *sub_str = zmsg_popstr(msg);
125  free(sub_str);
126  char *cluster = zmsg_popstr(msg);
127  char *id_str = zmsg_popstr(msg);
128  char *json = zmsg_popstr(msg);
129 
130  auto it_sub = subscribers.find(id_str);
131  if (it_sub != subscribers.end()) {
132  it_sub->second = zclock_time();
133  logger->trace("Updating time str={}", it_sub->first);
134  } else {
135 
136  subscribers.insert(it_sub,
137  std::pair<string, int64_t>(id_str, zclock_time()));
138  logger->trace("Inserting str={} ", id_str);
139  logger->trace("Sensor_list count={}", subscribers.size());
140  }
141 
142  if (cluster != nullptr && id_str != nullptr && json != nullptr) {
143  // Search item in map with double key <cluster, id_str>
144  auto it_state = state.find(pair<string, string>(cluster, id_str));
145 
146  // If the iterator has reached the end of the map, add new item.
147  // Otherwise update the element pointed to by the iterator.
148  if (it_state == state.end()) {
149  state.insert(pair<pair<string, string>, string>(
150  pair<string, string>(cluster, id_str), json));
151  } else {
152  it_state->second = json;
153  }
154  } else {
155  logger->warn("Defective message: cluster={} id_str={} json=%s", cluster,
156  id_str, json);
157  }
158 
159  free(id_str);
160  free(cluster);
161  free(json);
162  zmsg_destroy(&msg);
163  }
164 
165  if ((zclock_time() - cur_time) >= timeout_publish) {
166 
167  auto it_state = state.begin();
168  while (it_state != state.end()) {
169  // Current type of the node
170  string cur = it_state->first.first;
171  std::string json = "[";
172  int i = 0;
173 
174  // Whlie identical type node and not the end of map, fill in the
175  // message.
176  while (it_state != state.end() && cur == it_state->first.first) {
177  json += "{";
178  json += it_state->second;
179  i++;
180  it_state++;
181  json += "}";
182  json += ",";
183  }
184 
185  if (json.back() == ',')
186  json.pop_back();
187  json += "]";
188 
189  zmsg_t *msg = zmsg_new();
190  zmsg_addstr(msg, "");
191  zmsg_addstr(msg, cur.data());
192  zmsg_addstrf(msg, "%s", json.data());
193  logger->debug("Publishing cluster={}", cur.data());
194  logger->trace("JSON={}", json.data());
195  zmsg_send(&msg, pub);
196  zmsg_destroy(&msg);
197  }
198  cur_time = zclock_time();
199 
200  vector<string> inactive_subcribers;
201  for (auto it = subscribers.cbegin(); it != subscribers.cend(); ++it) {
202  if ((zclock_time() - it->second) >= retry_plublish * timeout_publish) {
203  logger->trace("Putting to inactive_subcribers id(s) = {}", it->first);
204  inactive_subcribers.push_back(it->first);
205  }
206  }
207  logger->debug("Size : subscribers={} state={}", subscribers.size(),
208  state.size());
209  for (auto s : inactive_subcribers) {
210 
211  for (auto it = state.cbegin(); it != state.cend(); ++it) {
212  logger->trace("Checking state cluster={} id={} erease_str={}",
213  it->first.first, it->first.second, s);
214  if (!s.compare(it->first.second)) {
215  logger->trace("Removing state cluster={} id={}", it->first.first,
216  it->first.second);
217  state.erase(it);
218  }
219  }
220 
221  auto it_sub = subscribers.find(s);
222  logger->trace("Removing sensor id={}", it_sub->first);
223  subscribers.erase(it_sub);
224  }
225  }
226  }
227 
228  zsock_destroy(&sub);
229  zsock_destroy(&pub);
230 
231  zpoller_destroy(&poller);
232 
233  free(short_options);
234 
235  // cleaning spdlog
236  spdlog::drop_all();
237 
238  return 0;
239 }