nemea-common 1.6.3
real_time_sending.h
Go to the documentation of this file.
1
7/*
8 * Copyright (C) 2014 CESNET
9 *
10 * LICENSE TERMS
11 *
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted provided that the following conditions
14 * are met:
15 * 1. Redistributions of source code must retain the above copyright
16 * notice, this list of conditions and the following disclaimer.
17 * 2. Redistributions in binary form must reproduce the above copyright
18 * notice, this list of conditions and the following disclaimer in
19 * the documentation and/or other materials provided with the
20 * distribution.
21 * 3. Neither the name of the Company nor the names of its contributors
22 * may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
24 *
25 * ALTERNATIVELY, provided that this notice is retained in full, this
26 * product may be distributed under the terms of the GNU General Public
27 * License (GPL) version 2 or later, in which case the provisions
28 * of the GPL apply INSTEAD OF those given above.
29 *
30 * This software is provided ``as is'', and any express or implied
31 * warranties, including, but not limited to, the implied warranties of
32 * merchantability and fitness for a particular purpose are disclaimed.
33 * In no event shall the company or contributors be liable for any
34 * direct, indirect, incidental, special, exemplary, or consequential
35 * damages (including, but not limited to, procurement of substitute
36 * goods or services; loss of use, data, or profits; or business
37 * interruption) however caused and on any theory of liability, whether
38 * in contract, strict liability, or tort (including negligence or
39 * otherwise) arising in any way out of the use of this software, even
40 * if advised of the possibility of such damage.
41 *
42 */
43#ifndef _REAL_TIME_SENDING_
44#define _REAL_TIME_SENDING_
45
#include <stdint.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
47
48#define RT_PAR_SET_DEFAULT 0
49
50#define DEFAULT_POOL_SIZE 10
51#define DEFAULT_INIT_TS_CNT 1000
52#define DEFAULT_SAMPLE_RATE 100
53#define DEFAULT_TS_DIFF_THRESHOLD 3.5
54
/* State of real-time delaying (for real-time sending); set up by RT_INIT and updated by RT_CHECK_DELAY. */
/* NOTE(review): this listing shows no ts_diff_sum, ts_diff_total or init_timestamp members and no closing "} rt_state_t;", although the macros below access those members - confirm against the original header. */
57typedef struct rt_state_s {
58 uint32_t *mins; // pool for n minimal timestamps to compute average minimum
59 uint16_t min_pool_size; // size of pool for minimal timestamps (n from above)
60 uint16_t act_min_cnt; // number of timestamps currently stored in mins
61 uint16_t sample_rate; // the real-time check is done on every <sample_rate>-th record
62
63 uint64_t init_ts_count; // from how many first flows should be minimal timestamp determined
64
65 struct timeval start; // wall-clock time taken when the current sample started
66 struct timeval end; // wall-clock time taken when the current sample is checked
68 uint32_t ts_diff_cnt; // number of timestamp differences accumulated in the current sample
71
72 float ts_diff_threshold; // holds "magic" value which represents change in timestamps of records in real traffic
74
/**
 * Initialize a real-time delaying state and allocate its pool of minimal timestamps.
 *
 * Each numeric parameter may be RT_PAR_SET_DEFAULT, in which case the matching
 * DEFAULT_* constant is used. Parameters are expanded more than once, so pass
 * side-effect-free expressions.
 *
 * \param[in,out] rt_state             State structure to initialize (passed by name, e.g. `st` or `*ptr`).
 * \param[in] pool_size                Number of minimal timestamps kept in the pool.
 * \param[in] init_timestamp_count     Number of first records used to determine the initial timestamp.
 * \param[in] par_sample_rate          The real-time check is performed on every par_sample_rate-th record.
 * \param[in] timestamp_diff_threshold Average timestamp difference above which sending is delayed.
 * \param[in] err_command              Statement executed when the pool allocation fails. It is expected
 *                                     to leave the enclosing scope (return/goto/exit); otherwise the
 *                                     remaining fields are still initialized while mins stays NULL.
 */
#define RT_INIT(rt_state, pool_size, init_timestamp_count, par_sample_rate, timestamp_diff_threshold, err_command) \
   do { \
      if ((pool_size) == RT_PAR_SET_DEFAULT) { \
         (rt_state).min_pool_size = DEFAULT_POOL_SIZE; \
      } else { \
         (rt_state).min_pool_size = (pool_size); \
      } \
      /* the cast is kept so that the macro also compiles when expanded in C++ code */ \
      (rt_state).mins = (uint32_t *) malloc((rt_state).min_pool_size * sizeof(*(rt_state).mins)); \
      if ((rt_state).mins == NULL) { \
         err_command; \
      } \
      (rt_state).act_min_cnt = 0; \
      if ((init_timestamp_count) == RT_PAR_SET_DEFAULT) { \
         (rt_state).init_ts_count = DEFAULT_INIT_TS_CNT; \
      } else { \
         (rt_state).init_ts_count = (init_timestamp_count); \
      } \
      if ((par_sample_rate) == RT_PAR_SET_DEFAULT) { \
         (rt_state).sample_rate = DEFAULT_SAMPLE_RATE; \
      } else { \
         (rt_state).sample_rate = (par_sample_rate); \
      } \
      if ((timestamp_diff_threshold) == RT_PAR_SET_DEFAULT) { \
         (rt_state).ts_diff_threshold = DEFAULT_TS_DIFF_THRESHOLD; \
      } else { \
         (rt_state).ts_diff_threshold = (timestamp_diff_threshold); \
      } \
      /* start with clean counters so that RT_CHECK_DELAY never reads indeterminate values */ \
      (rt_state).init_timestamp = 0; \
      (rt_state).ts_diff_cnt = 0; \
      (rt_state).ts_diff_sum = 0; \
      (rt_state).ts_diff_total = 0; \
   } while (0)
116
/**
 * Release the pool of minimal timestamps allocated by RT_INIT.
 *
 * The pointer is reset to NULL afterwards, so destroying the same state twice is harmless.
 * The expansion deliberately keeps its historical trailing semicolon so that existing
 * call sites written without one keep compiling; inside an unbraced if/else, wrap the
 * call in braces.
 *
 * \param[in,out] rt_state State structure to release (passed by name, e.g. `st` or `*ptr`).
 */
#define RT_DESTROY(rt_state) \
   do { \
      free((rt_state).mins); \
      (rt_state).mins = NULL; \
   } while (0);
121
/**
 * Pace record sending so that it follows the timestamps carried by the records.
 *
 * Call once per record. The behavior depends on record_counter:
 *  - record_counter <  init_ts_count: collect the min_pool_size smallest timestamps seen so far;
 *  - record_counter == init_ts_count: set init_timestamp to the average of the collected
 *    timestamps and take the wall-clock start time;
 *  - record_counter >  init_ts_count: accumulate the difference between the record timestamp
 *    and init_timestamp; on every sample_rate-th record, if the average difference exceeds
 *    ts_diff_threshold, sleep for the remainder of one second since the sample started,
 *    advance init_timestamp by one and re-tune sample_rate (kept within 100..10000).
 *
 * All parameters are expanded several times, so pass side-effect-free expressions.
 *
 * \param[in] record_counter   Number of the current record (counter maintained by the caller).
 * \param[in] actual_timestamp Timestamp of the current record.
 * \param[in,out] rt_state     State structure set up by RT_INIT (passed by name, e.g. `st` or `*ptr`).
 */
#define RT_CHECK_DELAY(record_counter, actual_timestamp, rt_state) \
do { \
   if ((record_counter) < (rt_state).init_ts_count) { \
      /* Collecting phase: mins[] is kept sorted ascending and holds the smallest timestamps seen. */ \
      int rt_pos_ = -1; \
      if ((rt_state).act_min_cnt < (rt_state).min_pool_size) { \
         rt_pos_ = (rt_state).act_min_cnt++; /* pool not full yet: open a new slot at the end */ \
      } else if ((rt_state).act_min_cnt > 0 && (actual_timestamp) < (rt_state).mins[(rt_state).act_min_cnt - 1]) { \
         rt_pos_ = (rt_state).act_min_cnt - 1; /* pool full: the largest kept timestamp is dropped */ \
      } \
      if (rt_pos_ >= 0) { \
         /* shift larger timestamps up by one and insert (also correct for a pool of size 1) */ \
         while (rt_pos_ > 0 && (actual_timestamp) < (rt_state).mins[rt_pos_ - 1]) { \
            (rt_state).mins[rt_pos_] = (rt_state).mins[rt_pos_ - 1]; \
            --rt_pos_; \
         } \
         (rt_state).mins[rt_pos_] = (actual_timestamp); \
      } \
   } else if ((record_counter) == (rt_state).init_ts_count) { \
      /* Initial timestamp is the average of the collected minimal timestamps. */ \
      if ((rt_state).act_min_cnt > 0) { \
         uint64_t rt_sum_ = 0; \
         for (int rt_i_ = 0; rt_i_ < (rt_state).act_min_cnt; ++rt_i_) { \
            rt_sum_ += (rt_state).mins[rt_i_]; \
         } \
         (rt_state).init_timestamp = rt_sum_ / (rt_state).act_min_cnt; \
      } else { \
         /* nothing was collected: fall back to the current timestamp instead of dividing by zero */ \
         (rt_state).init_timestamp = (actual_timestamp); \
      } \
      /* Adjust the sample rate if this record would also end a sample, so that the next step */ \
      /* does not treat it as the end of a sample; never let the rate drop to zero. */ \
      if ((rt_state).sample_rate > 1 && !((record_counter) % (rt_state).sample_rate)) { \
         --(rt_state).sample_rate; \
      } \
      gettimeofday(&(rt_state).start, NULL); /* wall-clock start of the first sample */ \
   } else { \
      long rt_ts_diff_ = (long) (actual_timestamp) - (long) (rt_state).init_timestamp; \
      if (rt_ts_diff_ > 400 || rt_ts_diff_ < -400) { /* some huge change in input data (like another file) */ \
         (rt_state).init_timestamp = (actual_timestamp); \
         rt_ts_diff_ = 0; \
         (rt_state).ts_diff_sum = 0; \
         (rt_state).ts_diff_cnt = 0; \
      } \
      (rt_state).ts_diff_sum += rt_ts_diff_; \
      (rt_state).ts_diff_cnt++; \
      if (!((record_counter) % (rt_state).sample_rate)) { /* check real-time sending on each <sample_rate>-th record */ \
         if ((rt_state).ts_diff_cnt) { \
            (rt_state).ts_diff_total = ((float) (rt_state).ts_diff_sum / (float) (rt_state).ts_diff_cnt); \
         } \
         if ((rt_state).ts_diff_total > (rt_state).ts_diff_threshold) { \
            /* Records were sent too fast: sleep for the rest of the second since the sample started. */ \
            gettimeofday(&(rt_state).end, NULL); \
            /* subtract seconds first so the multiplication cannot overflow a 32-bit long */ \
            long rt_elapsed_us_ = (long) ((rt_state).end.tv_sec - (rt_state).start.tv_sec) * 1000000L \
                                  + (long) ((rt_state).end.tv_usec - (rt_state).start.tv_usec); \
            if (rt_elapsed_us_ < 0) { /* wall clock stepped backwards: do not sleep longer than one second */ \
               rt_elapsed_us_ = 0; \
            } \
            if (rt_elapsed_us_ < 1000000) { \
               usleep(1000000 - rt_elapsed_us_); \
            } \
            /* clear & update counters */ \
            (rt_state).ts_diff_sum = 0; \
            (rt_state).ts_diff_cnt = 0; \
            (rt_state).init_timestamp++; \
            /* adjust and correct the sample rate */ \
            float rt_inc_ = ((rt_state).ts_diff_total - (rt_state).ts_diff_threshold) / (rt_state).ts_diff_threshold; \
            if (rt_inc_ > 5) { \
               rt_inc_ = 5; \
            } else if (rt_inc_ < 0.2) { \
               rt_inc_ = 0.2; \
            } \
            /* clamp in floating point first: the quotient may not fit into uint16_t */ \
            float rt_rate_ = (float) (rt_state).sample_rate / rt_inc_; \
            if (rt_rate_ < 100) { \
               (rt_state).sample_rate = 100; \
            } else if (rt_rate_ > 10000) { \
               (rt_state).sample_rate = 10000; \
            } else { \
               (rt_state).sample_rate = (uint16_t) rt_rate_; \
            } \
            (rt_state).ts_diff_total = 0; \
            gettimeofday(&(rt_state).start, NULL); /* wall-clock start of the next sample */ \
         } \
      } \
   } \
} while (0)
224
225#endif // _REAL_TIME_SENDING_
struct rt_state_s rt_state_t
State of real-time delaying (for real-time sending).
State of real-time delaying (for real-time sending).
struct timeval end
uint16_t min_pool_size
uint32_t ts_diff_cnt
uint16_t sample_rate
uint32_t * mins
struct timeval start
uint64_t init_ts_count
float ts_diff_threshold
uint32_t init_timestamp
uint16_t act_min_cnt