vdr  2.7.6
ringbuffer.c
Go to the documentation of this file.
1 /*
2  * ringbuffer.c: A ring buffer
3  *
4  * See the main source file 'vdr.c' for copyright information and
5  * how to reach the author.
6  *
7  * Parts of this file were inspired by the 'ringbuffy.c' from the
8  * LinuxDVB driver (see linuxtv.org).
9  *
10  * $Id: ringbuffer.c 4.2 2017/03/19 12:43:36 kls Exp $
11  */
12 
13 #include "ringbuffer.h"
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include "tools.h"
17 
18 // --- cRingBuffer -----------------------------------------------------------
19 
// Minimum time between two overflow messages written by the overflow
// reporting code, which compares elapsed time() against this value.
20 #define OVERFLOWREPORTDELTA 5 // seconds between reports
// Fill percentages are rounded down to a multiple of this value before they
// are compared with the previously logged percentage.
21 #define PERCENTAGEDELTA 10
// Buffer usage is logged when the rounded fill percentage rises while at or
// above this value, or drops below it after having been at or above it.
22 #define PERCENTAGETHRESHOLD 70
// Fill percentages the usage tracking compares against when an ioThrottle
// object exists (below LOW / at or above HIGH).
// NOTE(review): the actions taken at these thresholds are not visible here - confirm.
23 #define IOTHROTTLELOW 20
24 #define IOTHROTTLEHIGH 50
25 
26 cRingBuffer::cRingBuffer(int Size, bool Statistics)
27 {
28  size = Size;
29  statistics = Statistics;
30  getThreadTid = 0;
31  maxFill = 0;
32  lastPercent = 0;
33  putTimeout = getTimeout = 0;
36  ioThrottle = NULL;
37 }
38 
40 {
41  delete ioThrottle;
42  if (statistics)
43  dsyslog("buffer stats: %d (%d%%) used", maxFill, maxFill * 100 / (size - 1));
44 }
45 
47 {
48  if (Fill > maxFill)
49  maxFill = Fill;
50  int percent = Fill * 100 / (Size() - 1) / PERCENTAGEDELTA * PERCENTAGEDELTA; // clamp down to nearest quantum
51  if (percent != lastPercent) {
52  if (percent >= PERCENTAGETHRESHOLD && percent > lastPercent || percent < PERCENTAGETHRESHOLD && lastPercent >= PERCENTAGETHRESHOLD) {
53  dsyslog("buffer usage: %d%% (tid=%d)", percent, getThreadTid);
54  lastPercent = percent;
55  }
56  }
57  if (ioThrottle) {
58  if (percent >= IOTHROTTLEHIGH)
60  else if (percent < IOTHROTTLELOW)
62  }
63 }
64 
66 {
67  if (putTimeout)
69 }
70 
72 {
73  if (getTimeout)
75 }
76 
78 {
79  if (putTimeout && Free() > Size() / 10)
81 }
82 
84 {
85  if (getTimeout && Available() > Size() / 10)
87 }
88 
89 void cRingBuffer::SetTimeouts(int PutTimeout, int GetTimeout)
90 {
91  putTimeout = PutTimeout;
92  getTimeout = GetTimeout;
93 }
94 
96 {
97  if (!ioThrottle)
98  ioThrottle = new cIoThrottle;
99 }
100 
102 {
103  overflowCount++;
104  overflowBytes += Bytes;
105  if (time(NULL) - lastOverflowReport > OVERFLOWREPORTDELTA) {
106  esyslog("ERROR: %d ring buffer overflow%s (%d bytes dropped)", overflowCount, overflowCount > 1 ? "s" : "", overflowBytes);
108  lastOverflowReport = time(NULL);
109  }
110 }
111 
112 // --- cRingBufferLinear -----------------------------------------------------
113 
114 #ifdef DEBUGRINGBUFFERS
// Number of slots in the debug registry of linear ring buffers (RBLS).
115 #define MAXRBLS 30
// Width, in characters, of the fill diagram PrintDebugRBL() draws per buffer.
116 #define DEBUGRBLWIDTH 45
117 
// Debug registry of linear ring buffers: AddDebugRBL() fills the first NULL
// slot, DelDebugRBL() resets a slot to NULL, PrintDebugRBL() walks all slots.
118 cRingBufferLinear *cRingBufferLinear::RBLS[MAXRBLS] = { NULL };
119 
120 void cRingBufferLinear::AddDebugRBL(cRingBufferLinear *RBL)
121 {
122  for (int i = 0; i < MAXRBLS; i++) {
123  if (!RBLS[i]) {
124  RBLS[i] = RBL;
125  break;
126  }
127  }
128 }
129 
130 void cRingBufferLinear::DelDebugRBL(cRingBufferLinear *RBL)
131 {
132  for (int i = 0; i < MAXRBLS; i++) {
133  if (RBLS[i] == RBL) {
134  RBLS[i] = NULL;
135  break;
136  }
137  }
138 }
139 
140 void cRingBufferLinear::PrintDebugRBL(void)
141 {
142  bool printed = false;
143  for (int i = 0; i < MAXRBLS; i++) {
144  cRingBufferLinear *p = RBLS[i];
145  if (p) {
146  printed = true;
147  int lh = p->lastHead;
148  int lt = p->lastTail;
149  int h = lh * DEBUGRBLWIDTH / p->Size();
150  int t = lt * DEBUGRBLWIDTH / p->Size();
151  char buf[DEBUGRBLWIDTH + 10];
152  memset(buf, '-', DEBUGRBLWIDTH);
153  if (lt <= lh)
154  memset(buf + t, '*', max(h - t, 1));
155  else {
156  memset(buf, '*', h);
157  memset(buf + t, '*', DEBUGRBLWIDTH - t);
158  }
159  buf[t] = '<';
160  buf[h] = '>';
161  buf[DEBUGRBLWIDTH] = 0;
162  printf("%2d %s %8d %8d %s\n", i, buf, p->lastPut, p->lastGet, p->description);
163  }
164  }
165  if (printed)
166  printf("\n");
167  }
168 #endif
169 
170 cRingBufferLinear::cRingBufferLinear(int Size, int Margin, bool Statistics, const char *Description)
171 :cRingBuffer(Size, Statistics)
172 {
173  description = Description ? strdup(Description) : NULL;
174  tail = head = margin = Margin;
175  gotten = 0;
176  buffer = NULL;
177  if (Size > 1) { // 'Size - 1' must not be 0!
178  if (Margin <= Size / 2) {
179  buffer = MALLOC(uchar, Size);
180  if (!buffer)
181  esyslog("ERROR: can't allocate ring buffer (size=%d)", Size);
182  Clear();
183  }
184  else
185  esyslog("ERROR: invalid margin for ring buffer (%d > %d)", Margin, Size / 2);
186  }
187  else
188  esyslog("ERROR: invalid size for ring buffer (%d)", Size);
189 #ifdef DEBUGRINGBUFFERS
190  lastHead = head;
191  lastTail = tail;
192  lastPut = lastGet = -1;
193  AddDebugRBL(this);
194 #endif
195 }
196 
198 {
199 #ifdef DEBUGRINGBUFFERS
200  DelDebugRBL(this);
201 #endif
202  free(buffer);
203  free(description);
204 }
205 
206 int cRingBufferLinear::DataReady(const uchar *Data, int Count)
207 {
208  return Count >= margin ? Count : 0;
209 }
210 
212 {
213  int diff = head - tail;
214  return (diff >= 0) ? diff : Size() + diff - margin;
215 }
216 
218 {
219  int Head = head;
220  tail = Head;
221 #ifdef DEBUGRINGBUFFERS
222  lastHead = Head;
223  lastTail = tail;
224  lastPut = lastGet = -1;
225 #endif
226  maxFill = 0;
227  EnablePut();
228 }
229 
230 int cRingBufferLinear::Read(int FileHandle, int Max)
231 {
232  int Tail = tail;
233  int diff = Tail - head;
234  int free = (diff > 0) ? diff - 1 : Size() - head;
235  if (Tail <= margin)
236  free--;
237  int Count = -1;
238  errno = EAGAIN;
239  if (free > 0) {
240  if (0 < Max && Max < free)
241  free = Max;
242  Count = safe_read(FileHandle, buffer + head, free);
243  if (Count > 0) {
244  int Head = head + Count;
245  if (Head >= Size())
246  Head = margin;
247  head = Head;
248  if (statistics) {
249  int fill = head - Tail;
250  if (fill < 0)
251  fill = Size() + fill;
252  else if (fill >= Size())
253  fill = Size() - 1;
254  UpdatePercentage(fill);
255  }
256  }
257  }
258 #ifdef DEBUGRINGBUFFERS
259  lastHead = head;
260  lastPut = Count;
261 #endif
262  EnableGet();
263  if (free == 0)
264  WaitForPut();
265  return Count;
266 }
267 
269 {
270  int Tail = tail;
271  int diff = Tail - head;
272  int free = (diff > 0) ? diff - 1 : Size() - head;
273  if (Tail <= margin)
274  free--;
275  int Count = -1;
276  errno = EAGAIN;
277  if (free > 0) {
278  if (0 < Max && Max < free)
279  free = Max;
280  Count = File->Read(buffer + head, free);
281  if (Count > 0) {
282  int Head = head + Count;
283  if (Head >= Size())
284  Head = margin;
285  head = Head;
286  if (statistics) {
287  int fill = head - Tail;
288  if (fill < 0)
289  fill = Size() + fill;
290  else if (fill >= Size())
291  fill = Size() - 1;
292  UpdatePercentage(fill);
293  }
294  }
295  }
296 #ifdef DEBUGRINGBUFFERS
297  lastHead = head;
298  lastPut = Count;
299 #endif
300  EnableGet();
301  if (free == 0)
302  WaitForPut();
303  return Count;
304 }
305 
306 int cRingBufferLinear::Put(const uchar *Data, int Count)
307 {
308  if (Count > 0) {
309  int Tail = tail;
310  int rest = Size() - head;
311  int diff = Tail - head;
312  int free = ((Tail < margin) ? rest : (diff > 0) ? diff : Size() + diff - margin) - 1;
313  if (statistics) {
314  int fill = Size() - free - 1 + Count;
315  if (fill >= Size())
316  fill = Size() - 1;
317  UpdatePercentage(fill);
318  }
319  if (free > 0) {
320  if (free < Count)
321  Count = free;
322  if (Count >= rest) {
323  memcpy(buffer + head, Data, rest);
324  if (Count - rest)
325  memcpy(buffer + margin, Data + rest, Count - rest);
326  head = margin + Count - rest;
327  }
328  else {
329  memcpy(buffer + head, Data, Count);
330  head += Count;
331  }
332  }
333  else
334  Count = 0;
335 #ifdef DEBUGRINGBUFFERS
336  lastHead = head;
337  lastPut = Count;
338 #endif
339  EnableGet();
340  if (Count == 0)
341  WaitForPut();
342  }
343  return Count;
344 }
345 
347 {
348  int Head = head;
349  if (getThreadTid <= 0)
351  int rest = Size() - tail;
352  if (rest < margin && Head < tail) {
353  int t = margin - rest;
354  memcpy(buffer + t, buffer + tail, rest);
355  tail = t;
356  rest = Head - tail;
357  }
358  int diff = Head - tail;
359  int cont = (diff >= 0) ? diff : Size() + diff - margin;
360  if (cont > rest)
361  cont = rest;
362  uchar *p = buffer + tail;
363  if ((cont = DataReady(p, cont)) > 0) {
364  Count = gotten = cont;
365  return p;
366  }
367  WaitForGet();
368  return NULL;
369 }
370 
371 void cRingBufferLinear::Del(int Count)
372 {
373  if (Count > gotten) {
374  esyslog("ERROR: invalid Count in cRingBufferLinear::Del: %d (limited to %d)", Count, gotten);
375  Count = gotten;
376  }
377  if (Count > 0) {
378  int Tail = tail;
379  Tail += Count;
380  gotten -= Count;
381  if (Tail >= Size())
382  Tail = margin;
383  tail = Tail;
384  EnablePut();
385  }
386 #ifdef DEBUGRINGBUFFERS
387  lastTail = tail;
388  lastGet = Count;
389 #endif
390 }
391 
392 // --- cFrame ----------------------------------------------------------------
393 
394 cFrame::cFrame(const uchar *Data, int Count, eFrameType Type, int Index, uint32_t Pts, bool Independent)
395 {
396  count = abs(Count);
397  type = Type;
398  index = Index;
399  pts = Pts;
400  independent = Type == ftAudio ? true : Independent;
401  if (Count < 0)
402  data = (uchar *)Data;
403  else {
404  data = MALLOC(uchar, count);
405  if (data)
406  memcpy(data, Data, count);
407  else
408  esyslog("ERROR: can't allocate frame buffer (count=%d)", count);
409  }
410  next = NULL;
411 }
412 
414 {
415  free(data);
416 }
417 
418 // --- cRingBufferFrame ------------------------------------------------------
419 
420 cRingBufferFrame::cRingBufferFrame(int Size, bool Statistics)
421 :cRingBuffer(Size, Statistics)
422 {
423  head = NULL;
424  currentFill = 0;
425 }
426 
428 {
429  Clear();
430 }
431 
433 {
434  Lock();
435  cFrame *p;
436  while ((p = Get()) != NULL)
437  Drop(p);
438  Unlock();
439  EnablePut();
440  EnableGet();
441 }
442 
444 {
445  if (Frame->Count() <= Free()) {
446  Lock();
447  if (head) {
448  Frame->next = head->next;
449  head->next = Frame;
450  head = Frame;
451  }
452  else {
453  head = Frame->next = Frame;
454  }
455  currentFill += Frame->Count();
456  Unlock();
457  EnableGet();
458  return true;
459  }
460  return false;
461 }
462 
464 {
465  Lock();
466  cFrame *p = head ? head->next : NULL;
467  Unlock();
468  return p;
469 }
470 
472 {
473  currentFill -= Frame->Count();
474  delete Frame;
475 }
476 
478 {
479  Lock();
480  if (head) {
481  if (Frame == head->next) {
482  if (head->next != head) {
483  head->next = Frame->next;
484  Delete(Frame);
485  }
486  else {
487  Delete(head);
488  head = NULL;
489  }
490  }
491  else
492  esyslog("ERROR: attempt to drop wrong frame from ring buffer!");
493  }
494  Unlock();
495  EnablePut();
496 }
497 
499 {
500  Lock();
501  int av = currentFill;
502  Unlock();
503  return av;
504 }
bool Wait(int TimeoutMs=0)
Waits at most TimeoutMs milliseconds for a call to Signal(), or forever if TimeoutMs is 0.
Definition: thread.c:79
void Signal(void)
Signals a caller of Wait() that the condition it is waiting for is met.
Definition: thread.c:101
uchar * Data(void) const
Definition: ringbuffer.h:125
eFrameType type
Definition: ringbuffer.h:115
uint32_t pts
Definition: ringbuffer.h:117
uint32_t Pts(void) const
Definition: ringbuffer.h:129
bool independent
Definition: ringbuffer.h:118
int Index(void) const
Definition: ringbuffer.h:128
eFrameType Type(void) const
Definition: ringbuffer.h:127
uchar * data
Definition: ringbuffer.h:113
int index
Definition: ringbuffer.h:116
cFrame * next
Definition: ringbuffer.h:112
bool Independent(void) const
Definition: ringbuffer.h:130
~cFrame()
Definition: ringbuffer.c:413
int count
Definition: ringbuffer.h:114
cFrame(const uchar *Data, int Count, eFrameType=ftUnknown, int Index=-1, uint32_t Pts=0, bool independent=false)
Creates a new cFrame object.
Definition: ringbuffer.c:394
int Count(void) const
Definition: ringbuffer.h:126
void Activate(void)
Activates the global I/O throttling mechanism.
Definition: thread.c:906
void Release(void)
Releases the global I/O throttling mechanism.
Definition: thread.c:917
virtual void Clear(void) override
Definition: ringbuffer.c:432
void Unlock(void)
Definition: ringbuffer.h:140
cFrame * Get(void)
Definition: ringbuffer.c:463
virtual int Available(void) override
Definition: ringbuffer.c:498
void Delete(cFrame *Frame)
Definition: ringbuffer.c:471
bool Put(cFrame *Frame)
Definition: ringbuffer.c:443
cRingBufferFrame(int Size, bool Statistics=false)
Definition: ringbuffer.c:420
virtual ~cRingBufferFrame() override
Definition: ringbuffer.c:427
void Lock(void)
Definition: ringbuffer.h:139
void Drop(cFrame *Frame)
Definition: ringbuffer.c:477
cRingBufferLinear(int Size, int Margin=0, bool Statistics=false, const char *Description=NULL)
Creates a linear ring buffer.
Definition: ringbuffer.c:170
void Del(int Count)
Deletes at most Count bytes from the ring buffer.
Definition: ringbuffer.c:371
virtual void Clear(void) override
Immediately clears the ring buffer.
Definition: ringbuffer.c:217
virtual ~cRingBufferLinear() override
Definition: ringbuffer.c:197
int Put(const uchar *Data, int Count)
Puts at most Count bytes of Data into the ring buffer.
Definition: ringbuffer.c:306
virtual int DataReady(const uchar *Data, int Count)
By default a ring buffer has data ready as soon as there are at least 'margin' bytes available.
Definition: ringbuffer.c:206
uchar * Get(int &Count)
Gets data from the ring buffer.
Definition: ringbuffer.c:346
char * description
Definition: ringbuffer.h:64
int Read(int FileHandle, int Max=0)
Reads at most Max bytes from FileHandle and stores them in the ring buffer.
Definition: ringbuffer.c:230
virtual int Available(void) override
Definition: ringbuffer.c:211
int Size(void)
Definition: ringbuffer.h:39
void WaitForGet(void)
Definition: ringbuffer.c:71
int overflowCount
Definition: ringbuffer.h:23
void SetTimeouts(int PutTimeout, int GetTimeout)
Definition: ringbuffer.c:89
void SetIoThrottle(void)
Definition: ringbuffer.c:95
int lastPercent
Definition: ringbuffer.h:29
time_t lastOverflowReport
Definition: ringbuffer.h:22
cRingBuffer(int Size, bool Statistics=false)
Definition: ringbuffer.c:26
void EnablePut(void)
Definition: ringbuffer.c:77
bool statistics
Definition: ringbuffer.h:30
void WaitForPut(void)
Definition: ringbuffer.c:65
int getTimeout
Definition: ringbuffer.h:20
virtual ~cRingBuffer()
Definition: ringbuffer.c:39
void EnableGet(void)
Definition: ringbuffer.c:83
void UpdatePercentage(int Fill)
Definition: ringbuffer.c:46
tThreadId getThreadTid
Definition: ringbuffer.h:27
void ReportOverflow(int Bytes)
Definition: ringbuffer.c:101
cIoThrottle * ioThrottle
Definition: ringbuffer.h:25
virtual int Free(void)
Definition: ringbuffer.h:38
int putTimeout
Definition: ringbuffer.h:19
int overflowBytes
Definition: ringbuffer.h:24
cCondWait readyForGet
Definition: ringbuffer.h:18
virtual int Available(void)=0
cCondWait readyForPut
Definition: ringbuffer.h:18
static tThreadId ThreadId(void)
Definition: thread.c:373
cUnbufferedFile is used for large files that are mainly written or read in a streaming manner,...
Definition: tools.h:494
ssize_t Read(void *Data, size_t Size)
Definition: tools.c:1878
#define OVERFLOWREPORTDELTA
Definition: ringbuffer.c:20
#define IOTHROTTLELOW
Definition: ringbuffer.c:23
#define PERCENTAGETHRESHOLD
Definition: ringbuffer.c:22
#define IOTHROTTLEHIGH
Definition: ringbuffer.c:24
#define PERCENTAGEDELTA
Definition: ringbuffer.c:21
eFrameType
Definition: ringbuffer.h:107
@ ftAudio
Definition: ringbuffer.h:107
ssize_t safe_read(int filedes, void *buffer, size_t size)
Definition: tools.c:53
unsigned char uchar
Definition: tools.h:31
#define dsyslog(a...)
Definition: tools.h:37
#define MALLOC(type, size)
Definition: tools.h:47
T max(T a, T b)
Definition: tools.h:64
#define esyslog(a...)
Definition: tools.h:35