nemea-common  1.6.3
real_time_sending.h
Go to the documentation of this file.
1 
7 /*
8  * Copyright (C) 2014 CESNET
9  *
10  * LICENSE TERMS
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  * 1. Redistributions of source code must retain the above copyright
16  * notice, this list of conditions and the following disclaimer.
17  * 2. Redistributions in binary form must reproduce the above copyright
18  * notice, this list of conditions and the following disclaimer in
19  * the documentation and/or other materials provided with the
20  * distribution.
21  * 3. Neither the name of the Company nor the names of its contributors
22  * may be used to endorse or promote products derived from this
23  * software without specific prior written permission.
24  *
25  * ALTERNATIVELY, provided that this notice is retained in full, this
26  * product may be distributed under the terms of the GNU General Public
27  * License (GPL) version 2 or later, in which case the provisions
28  * of the GPL apply INSTEAD OF those given above.
29  *
30  * This software is provided ``as is'', and any express or implied
31  * warranties, including, but not limited to, the implied warranties of
32  * merchantability and fitness for a particular purpose are disclaimed.
33  * In no event shall the company or contributors be liable for any
34  * direct, indirect, incidental, special, exemplary, or consequential
35  * damages (including, but not limited to, procurement of substitute
36  * goods or services; loss of use, data, or profits; or business
37  * interruption) however caused and on any theory of liability, whether
38  * in contract, strict liability, or tort (including negligence or
39  * otherwise) arising in any way out of the use of this software, even
40  * if advised of the possibility of such damage.
41  *
42  */
43 #ifndef _REAL_TIME_SENDING_
44 #define _REAL_TIME_SENDING_
45 
46 #include <sys/time.h>
47 
48 #define RT_PAR_SET_DEFAULT 0
49 
50 #define DEFAULT_POOL_SIZE 10
51 #define DEFAULT_INIT_TS_CNT 1000
52 #define DEFAULT_SAMPLE_RATE 100
53 #define DEFAULT_TS_DIFF_THRESHOLD 3.5
54 
57 typedef struct rt_state_s {
58  uint32_t *mins; // pool for n minimal timestamps to compute average minimum
59  uint16_t min_pool_size; // size of pool for minimal timestamps (n from above)
60  uint16_t act_min_cnt;
61  uint16_t sample_rate;
62 
63  uint64_t init_ts_count; // from how many first flows should be minimal timestamp determined
64 
65  struct timeval start;
66  struct timeval end;
67  uint32_t init_timestamp;
68  uint32_t ts_diff_cnt;
71 
72  float ts_diff_threshold; // holds "magic" value which represents change in timestamps of records in real traffic
73 } rt_state_t;
74 
85 #define RT_INIT(rt_state, pool_size, init_timestamp_count, par_sample_rate, timestamp_diff_threshold, err_command) \
86  do { \
87  if (pool_size == RT_PAR_SET_DEFAULT){ \
88  rt_state.min_pool_size = DEFAULT_POOL_SIZE;\
89  } else { \
90  rt_state.min_pool_size = pool_size;\
91  } \
92  rt_state.mins = (uint32_t *) malloc (rt_state.min_pool_size * sizeof(uint32_t)); \
93  if (rt_state.mins == NULL){ \
94  err_command; \
95  } \
96  rt_state.act_min_cnt = 0; \
97  if (init_timestamp_count == RT_PAR_SET_DEFAULT){ \
98  rt_state.init_ts_count = DEFAULT_INIT_TS_CNT; \
99  } else { \
100  rt_state.init_ts_count = init_timestamp_count; \
101  } \
102  if (par_sample_rate == RT_PAR_SET_DEFAULT){ \
103  rt_state.sample_rate = DEFAULT_SAMPLE_RATE; \
104  } else { \
105  rt_state.sample_rate = par_sample_rate; \
106  } \
107  if (timestamp_diff_threshold == RT_PAR_SET_DEFAULT){ \
108  rt_state.ts_diff_threshold = DEFAULT_TS_DIFF_THRESHOLD; \
109  } else { \
110  rt_state.ts_diff_threshold = timestamp_diff_threshold; \
111  } \
112  rt_state.ts_diff_cnt = 0; \
113  rt_state.ts_diff_sum = 0; \
114  rt_state.ts_diff_total = 0; \
115  } while(0)
116 
119 #define RT_DESTROY(rt_state) \
120  free(rt_state.mins);
121 
130 #define RT_CHECK_DELAY(record_counter, actual_timestamp, rt_state) \
131 do{ \
132  /* Get minimal timestamp >> */ \
133  if (record_counter < rt_state.init_ts_count){ /* check if we have initial timestamp; initial timestamp is average of <min_pool_size> minimal timestamps */\
134  /* first fill pool with first <min_pool_size> timestamp and sort them */ \
135  if (rt_state.act_min_cnt < rt_state.min_pool_size){ \
136  rt_state.mins[rt_state.act_min_cnt] = actual_timestamp; \
137  if (++rt_state.act_min_cnt == rt_state.min_pool_size){ /* array of first timestamps were filled, sort it */ \
138  /* simple bubble-sort */ \
139  for (int i = 0; i < (rt_state.act_min_cnt - 1); ++i){ \
140  for (int j = 0; j < (rt_state.act_min_cnt - i - 1); ++j){ \
141  if (rt_state.mins[j] > rt_state.mins[j+1]){ \
142  uint32_t tmp = rt_state.mins[j+1]; \
143  rt_state.mins[j+1] = rt_state.mins[j]; \
144  rt_state.mins[j] = tmp; \
145  } \
146  } \
147  } \
148  /* array of minimal timestamps is now sorted, minimal at 0 index */ \
149  } \
150  /* pool with minimal timestamps is full, now add only smaller timestamps */ \
151  } else { \
152  if (actual_timestamp < rt_state.mins[rt_state.act_min_cnt - 1]){ \
153  for (int i = (rt_state.act_min_cnt - 2); i >= 0; --i){ \
154  if (actual_timestamp < rt_state.mins[i]){ \
155  rt_state.mins[i + 1] = rt_state.mins[i]; \
156  if (i == 0){ \
157  rt_state.mins[i] = actual_timestamp; \
158  } \
159  } else { \
160  rt_state.mins[i + 1] = actual_timestamp; \
161  break; \
162  } \
163  } \
164  } \
165  } \
166  } else if (record_counter == rt_state.init_ts_count){ \
167  /* get average minimal timestamp */ \
168  uint64_t tmp_sum = 0; \
169  for (int i = 0; i < rt_state.act_min_cnt; ++i){ \
170  tmp_sum += rt_state.mins[i]; \
171  } \
172  rt_state.init_timestamp = tmp_sum / rt_state.act_min_cnt; \
173  /* adjust samplre rate if it is same as initial timestamp count - to do not to consider end of sample in next step */ \
174  if (!(record_counter % rt_state.sample_rate)){ \
175  --rt_state.sample_rate; \
176  } \
177  /*get actual (real) time */ \
178  gettimeofday(&rt_state.start, NULL); \
179  /* << Got minimal timestamp now << */ \
180  } else { \
181  long ts_diff = (long) actual_timestamp - (long) rt_state.init_timestamp; \
182  if (ts_diff > 400 || ts_diff < -400){ /* some huge change in input data (like another file) */ \
183  rt_state.init_timestamp = actual_timestamp; \
184  ts_diff = 0; \
185  rt_state.ts_diff_sum = 0; \
186  rt_state.ts_diff_cnt = 0; \
187  } \
188  rt_state.ts_diff_sum += ts_diff; \
189  rt_state.ts_diff_cnt++; \
190  if (!(record_counter % rt_state.sample_rate)){ /* check real-time sending each <sample_rate>-record */ \
191  if (rt_state.ts_diff_cnt){ /* it should be ... */ \
192  rt_state.ts_diff_total = ((float) rt_state.ts_diff_sum / (float) rt_state.ts_diff_cnt); \
193  } \
194  if (rt_state.ts_diff_total > rt_state.ts_diff_threshold){ \
195  /* check if records was sendend in proper time, wait (sleep) if was send too fast */ \
196  gettimeofday(&rt_state.end, NULL); \
197  long rt_diff = ((rt_state.end.tv_sec * 1000000 + rt_state.end.tv_usec) - (rt_state.start.tv_sec * 1000000 + rt_state.start.tv_usec)); \
198  if (rt_diff < 1000000){ \
199  usleep(1000000 - rt_diff); \
200  } \
201  /* clear & update counters */ \
202  rt_state.ts_diff_sum = 0; \
203  rt_state.ts_diff_cnt = 0; \
204  rt_state.init_timestamp++; \
205  /* adjust and correct sample rate */ \
206  float inc_index = (rt_state.ts_diff_total - rt_state.ts_diff_threshold) / rt_state.ts_diff_threshold; \
207  if (inc_index > 5){ \
208  inc_index = 5; \
209  } else if (inc_index < 0.2){ \
210  inc_index = 0.2; \
211  } \
212  rt_state.sample_rate /= inc_index; \
213  if (rt_state.sample_rate < 100){ \
214  rt_state.sample_rate = 100; \
215  }else if (rt_state.sample_rate > 10000){ \
216  rt_state.sample_rate = 10000; \
217  } \
218  rt_state.ts_diff_total = 0; \
219  gettimeofday(&rt_state.start, NULL); /* get actual (real) time */ \
220  } \
221  } \
222  } \
223 }while (0)
224 
225 #endif // _REAL_TIME_SENDING_
float ts_diff_threshold
uint16_t sample_rate
struct rt_state_s rt_state_t
State of real-time delaying (for real-time sending).
uint32_t * mins
uint16_t act_min_cnt
struct timeval end
uint16_t min_pool_size
uint32_t ts_diff_cnt
struct timeval start
State of real-time delaying (for real-time sending).
uint32_t init_timestamp
uint64_t init_ts_count