5 #include <spdlog/fmt/ostr.h>
6 #include <spdlog/sinks/stdout_color_sinks.h>
7 #include <spdlog/spdlog.h>
// File-scope logger handle. It starts out null and is assigned in main()
// from spdlog::get with the name console, so it must not be dereferenced
// before that assignment has run.
12 static shared_ptr<spdlog::logger> logger =
nullptr;
16 logger->info(
"obmon-broker -i tcp://*:5000 -o tcp://*:5001 -t 1000 -r 3");
// Broker entry point. Sets up the console logger, reads the command-line
// options and the OBMON_LOG environment variable, creates a SUB socket on
// in and a PUB socket on out, then loops: every received message updates
// the per-(cluster, id) state, and once per publish interval the stored
// JSON is sent out grouped by cluster and silent ids are dropped.
19 int main(
int argc,
char **argv) {
// Register a colored stdout logger named console and fetch it back from
// the spdlog registry into the file-scope handle.
22 auto console = spdlog::stdout_color_mt(
"console");
23 logger = spdlog::get(
"console");
// NOTE(review): strdup allocates this buffer and no matching free is
// visible; a plain string literal would avoid the allocation. TODO confirm.
25 char *short_options = strdup(
"i:o:t:r:h");
// getopt_long table: in, out, timeout and retry take an argument
// (has_arg = 1), help does not; the all-zero entry terminates the table.
26 struct option long_options[] = {
27 {
"in", 1,
nullptr,
'i'}, {
"out", 1,
nullptr,
'o'},
28 {
"timeout", 1,
nullptr,
't'}, {
"retry", 1,
nullptr,
'r'},
29 {
"help", 0,
nullptr,
'h'}, {
nullptr, 0,
nullptr, 0}};
// Initial endpoint values. NOTE(review): the leading at-sign presumably
// selects bind in the czmq endpoint syntax; verify against the zsock docs.
31 string in =
"@tcp://*:5000";
32 string out =
"@tcp://*:5001";
// timeout_publish is compared against zclock_time() differences, and
// timeout_poller is handed to zpoller_wait; both are presumably milliseconds.
33 int timeout_publish = 1000;
34 int timeout_poller = 100;
// Multiplier on timeout_publish that decides when an id counts as inactive.
35 int retry_plublish = 3;
38 getopt_long(argc, argv, short_options, long_options,
nullptr);
// Consume options until getopt_long reports -1.
39 while (nextOption != -1) {
// NOTE(review): atoi returns 0 for non-numeric text, so a malformed value
// is accepted silently here and for the retry option below.
50 timeout_publish = atoi(optarg);
53 retry_plublish = atoi(optarg);
63 nextOption = getopt_long(argc, argv, short_options, long_options,
nullptr);
// When OBMON_LOG is set, parse it as a decimal integer into debugLevel.
68 if (getenv(
"OBMON_LOG")) {
69 int rc = sscanf(getenv(
"OBMON_LOG"),
"%d", &debugLevel);
71 logger->error(
"OBMON_LOG is not a number: {}", getenv(
"OBMON_LOG"));
// debugLevel is mapped onto an spdlog level as (6 - debugLevel).
// NOTE(review): the cast assumes 6 - debugLevel stays inside the
// level_enum range; TODO confirm debugLevel is validated beforehand.
81 logger->info(
"Setting log level to '{}' ...",
82 spdlog::level::to_c_str(
83 static_cast<spdlog::level::level_enum>(6 - debugLevel)));
84 spdlog::set_level(static_cast<spdlog::level::level_enum>(6 - debugLevel));
// The poller is created without sockets; the SUB socket is added below.
88 zpoller_t *poller = zpoller_new(
nullptr);
// NOTE(review): this message is emitted before either socket is created.
90 logger->info(
"Connected to in={} out={}", in, out);
// SUB socket on in; the empty second argument is the subscription prefix.
93 zsock_t *sub = zsock_new_sub(in.data(),
"");
// PUB socket on out.
97 zsock_t *pub = zsock_new_pub(out.data());
101 zpoller_add(poller, sub);
// state: last JSON payload stored per (cluster, id) key.
104 map<pair<string, string>,
string> state;
// subscribers: id mapped to the zclock_time() of its most recent message.
105 map<string, int64_t> subscribers;
106 map<string, int64_t>::iterator it_subs;
// Start of the current publish interval.
108 int64_t cur_time = zclock_time();
// Main loop; runs until zsys_interrupted becomes true.
109 while (!zsys_interrupted) {
// Wait up to timeout_poller for a socket registered with the poller.
112 static_cast<zsock_t *
>(zpoller_wait(poller, timeout_poller));
113 logger->trace(
"Poller released socket={}", static_cast<void *>(socket));
115 if (zpoller_terminated(poller))
// A non-null socket is read with zmsg_recv.
118 if (socket !=
nullptr) {
119 zmsg_t *msg = zmsg_recv(socket);
// Strings are popped in this order: sub_str, cluster, id_str, json.
// NOTE(review): zmsg_popstr presumably returns caller-owned strings and
// null when nothing is left; TODO confirm each one and msg are released.
124 char *sub_str = zmsg_popstr(msg);
126 char *cluster = zmsg_popstr(msg);
127 char *id_str = zmsg_popstr(msg);
128 char *json = zmsg_popstr(msg);
// NOTE(review): id_str is converted to a std::string key here, ahead of
// the null check further down; a short message would pass a null pointer
// to the std::string constructor unless it is guarded earlier. TODO confirm.
130 auto it_sub = subscribers.find(id_str);
// Known id: refresh its timestamp.
131 if (it_sub != subscribers.end()) {
132 it_sub->second = zclock_time();
133 logger->trace(
"Updating time str={}", it_sub->first);
// Insert path: add the id with the current time, passing it_sub as hint.
136 subscribers.insert(it_sub,
137 std::pair<string, int64_t>(id_str, zclock_time()));
138 logger->trace(
"Inserting str={} ", id_str);
139 logger->trace(
"Sensor_list count={}", subscribers.size());
// The payload is only stored when cluster, id_str and json are all non-null.
142 if (cluster !=
nullptr && id_str !=
nullptr && json !=
nullptr) {
// Insert the payload for a new (cluster, id) key; an existing key has its
// stored payload overwritten.
144 auto it_state = state.find(pair<string, string>(cluster, id_str));
148 if (it_state == state.end()) {
149 state.insert(pair<pair<string, string>,
string>(
150 pair<string, string>(cluster, id_str), json));
152 it_state->second = json;
// NOTE(review): spdlog format strings use brace placeholders; the %s below
// is not a placeholder and is emitted literally.
155 logger->warn(
"Defective message: cluster={} id_str={} json=%s", cluster,
// Publish once at least timeout_publish has elapsed since cur_time.
165 if ((zclock_time() - cur_time) >= timeout_publish) {
// std::map keeps keys ordered by (cluster, id), so all entries of one
// cluster are adjacent; the inner loop appends their payloads to one string.
167 auto it_state = state.begin();
168 while (it_state != state.end()) {
170 string cur = it_state->first.first;
171 std::string json =
"[";
176 while (it_state != state.end() && cur == it_state->first.first) {
178 json += it_state->second;
// Check whether the accumulated string ends with a comma.
185 if (json.back() ==
',')
// Outgoing message: an empty string, the cluster name, then the
// accumulated JSON string.
189 zmsg_t *msg = zmsg_new();
190 zmsg_addstr(msg,
"");
191 zmsg_addstr(msg, cur.data());
192 zmsg_addstrf(msg,
"%s", json.data());
193 logger->debug(
"Publishing cluster={}", cur.data());
194 logger->trace(
"JSON={}", json.data());
// msg is passed by address; zmsg_send presumably takes ownership of it.
195 zmsg_send(&msg, pub);
// Restart the publish interval.
198 cur_time = zclock_time();
// Collect every id whose last message is at least
// retry_plublish * timeout_publish old.
200 vector<string> inactive_subcribers;
201 for (
auto it = subscribers.cbegin(); it != subscribers.cend(); ++it) {
202 if ((zclock_time() - it->second) >= retry_plublish * timeout_publish) {
203 logger->trace(
"Putting to inactive_subcribers id(s) = {}", it->first);
204 inactive_subcribers.push_back(it->first);
207 logger->debug(
"Size : subscribers={} state={}", subscribers.size(),
// For each inactive id: scan state for entries with that id, then remove
// the id from subscribers.
// NOTE(review): auto s copies every string; a const reference would not.
209 for (
auto s : inactive_subcribers) {
// NOTE(review): if a matching entry is erased inside this loop, the
// iterator has to continue from the value returned by erase rather than
// being incremented afterwards. TODO confirm.
211 for (
auto it = state.cbegin(); it != state.cend(); ++it) {
212 logger->trace(
"Checking state cluster={} id={} erease_str={}",
213 it->first.first, it->first.second, s);
// compare returns 0 on equality, so this branch handles a matching id.
214 if (!s.compare(it->first.second)) {
215 logger->trace(
"Removing state cluster={} id={}", it->first.first,
// s was copied out of subscribers above, so this lookup is expected to hit.
221 auto it_sub = subscribers.find(s);
222 logger->trace(
"Removing sensor id={}", it_sub->first);
223 subscribers.erase(it_sub);
// Release the poller.
231 zpoller_destroy(&poller);