XRootD
Loading...
Searching...
No Matches
XrdPoll.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d P o l l . c c */
4/* */
5/* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <unistd.h>
31#include <cstdio>
32#include <cstdlib>
33
34#include "XrdSys/XrdSysError.hh"
35#include "XrdSys/XrdSysFD.hh"
38#include "Xrd/XrdLink.hh"
39#include "Xrd/XrdProtocol.hh"
40
41#define TRACE_IDENT pInfo.Link.ID
42#include "Xrd/XrdTrace.hh"
43
44#if defined( __linux__ )
45#include "Xrd/XrdPollE.hh"
46//#include "Xrd/XrdPollPoll.hh"
47#else
48#include "Xrd/XrdPollPoll.hh"
49#endif
50
51#include "Xrd/XrdPollInfo.hh"
52
53/******************************************************************************/
54/* L o c a l C l a s s e s */
55/******************************************************************************/
56
58{
59public:
60
61void DoIt() {}
62
64
65int Process(XrdLink *lp) {return -1;}
66
67void Recycle(XrdLink *lp, int x, const char *y) {}
68
69int Stats(char *buff, int blen, int do_sync=0) {return 0;}
70
71 XrdPoll_End() : XrdProtocol("link termination") {}
73};
74
75/******************************************************************************/
76/* G l o b a l D a t a */
77/******************************************************************************/
78
80
81 XrdSysMutex XrdPoll::doingAttach;
82
83 const char *XrdPoll::TraceID = "Poll";
84
85namespace XrdGlobal
86{
87extern XrdSysError Log;
88extern XrdScheduler Sched;
89}
90
91using namespace XrdGlobal;
92
93/******************************************************************************/
94/* T h r e a d S t a r t u p I n t e r f a c e */
95/******************************************************************************/
96
105
106
107void *XrdStartPolling(void *parg)
108{
109 struct XrdPollArg *PArg = (struct XrdPollArg *)parg;
110 PArg->Poller->Start(&(PArg->PollSync), PArg->retcode);
111 return (void *)0;
112}
113
114/******************************************************************************/
115/* C o n s t r u c t o r */
116/******************************************************************************/
117
119{
120 int fildes[2];
121
122 TID=0;
123 numAttached=numEnabled=numEvents=numInterrupts=0;
124
125 if (XrdSysFD_Pipe(fildes) == 0)
126 {CmdFD = fildes[1];
127 ReqFD = fildes[0];
128 } else {
129 CmdFD = ReqFD = -1;
130 Log.Emsg("Poll", errno, "create poll pipe");
131 }
132 PipeBuff = 0;
133 PipeBlen = 0;
134 PipePoll.fd = ReqFD;
135 PipePoll.events = POLLIN | POLLRDNORM;
136}
137
138/******************************************************************************/
139/* A t t a c h */
140/******************************************************************************/
141
142int XrdPoll__Attach(XrdLink *lp) {return lp->Activate();}
143
145{
146 int i;
147 XrdPoll *pp;
148
149// We allow only one attach at a time to simplify the processing
150//
151 doingAttach.Lock();
152
153// Find a poller with the smallest number of entries
154//
155 pp = Pollers[0];
156 for (i = 1; i < XRD_NUMPOLLERS; i++)
157 if (pp->numAttached > Pollers[i]->numAttached) pp = Pollers[i];
158
159// Include this FD into the poll set of the poller
160//
161 if (!pp->Include(pInfo)) {doingAttach.UnLock(); return 0;}
162
163// Complete the link setup
164//
165 pInfo.Poller = pp;
166 pp->numAttached++;
167 doingAttach.UnLock();
168 TRACEI(POLL, "FD " <<pInfo.FD <<" attached to poller " <<pp->PID
169 <<"; num=" <<pp->numAttached);
170 return 1;
171}
172
173/******************************************************************************/
174/* D e t a c h */
175/******************************************************************************/
176
178{
179 XrdPoll *pp;
180
181// If link is not attached, simply return
182//
183 if (!(pp = pInfo.Poller)) return;
184
185// Exclude this link from the associated poll set
186//
187 pp->Exclude(pInfo);
188
189// Make sure we are consistent
190//
191 doingAttach.Lock();
192 if (!pp->numAttached)
193 {Log.Emsg("Poll","Underflow detaching", pInfo.Link.ID); abort();}
194 pp->numAttached--;
195 doingAttach.UnLock();
196 TRACEI(POLL, "FD " <<pInfo.FD <<" detached from poller " <<pp->PID
197 <<"; num=" <<pp->numAttached);
198}
199
200/******************************************************************************/
201/* F i n i s h */
202/******************************************************************************/
203
204int XrdPoll::Finish(XrdPollInfo &pInfo, const char *etxt)
205{
206 static XrdPoll_End LinkEnd;
207
208// If this link is already scheduled for termination, ignore this call.
209//
210 if (pInfo.Link.getProtocol() == &LinkEnd)
211 {TRACEI(POLL, "Link " <<pInfo.FD <<" already terminating; "
212 <<(etxt ? etxt : "") <<" request ignored.");
213 return 0;
214 }
215
216// Set the protocol pointer to be link termination
217//
218 pInfo.Link.setProtocol(&LinkEnd, false, true);
219 if (!etxt) etxt = "reason unknown";
220 pInfo.Link.setEtext(etxt);
221 TRACEI(POLL, "Link " <<pInfo.FD <<" terminating: " <<etxt);
222 return 1;
223}
224
225/******************************************************************************/
226/* g e t R e q u e s t */
227/******************************************************************************/
228
229// Warning: This method runs unlocked. The caller must have exclusive use of
230// the ReqBuff otherwise unpredictable results will occur.
231
233{
234 ssize_t rlen;
235 int rc;
236
237// See if we are to resume a read or start a fresh one
238//
239 if (!PipeBlen)
240 {PipeBuff = (char *)&ReqBuff; PipeBlen = sizeof(ReqBuff);}
241
242// Wait for the next request. Some OS's (like Linux) don't support non-blocking
243// pipes. So, we must front the read with a poll.
244//
245 do {rc = poll(&PipePoll, 1, 0);}
246 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
247 if (rc < 1) return 0;
248
249// Now we can put up a read without a delay. Normally a full command will be
250// present. Under some heavy conditions, this may not be the case.
251//
252 do {rlen = read(ReqFD, PipeBuff, PipeBlen);}
253 while(rlen < 0 && errno == EINTR);
254 if (rlen <= 0)
255 {if (rlen) Log.Emsg("Poll", errno, "read from request pipe");
256 return 0;
257 }
258
259// Check if all the data has arrived. If not all the data is present, defer
260// this request until more data arrives.
261//
262 if (!(PipeBlen -= rlen)) return 1;
263 PipeBuff += rlen;
264 TRACE(POLL, "Poller " <<PID <<" still needs " <<PipeBlen <<" req pipe bytes");
265 return 0;
266}
267
268/******************************************************************************/
269/* P o l l 2 T e x t */
270/******************************************************************************/
271
272char *XrdPoll::Poll2Text(short events)
273{
274 if (events & POLLERR) return strdup("socket error");
275
276 if (events & POLLHUP) return strdup("hangup");
277
278 if (events & POLLNVAL) return strdup("socket closed");
279
280 {char buff[64];
281 sprintf(buff, "unusual event (%.4x)", events);
282 return strdup(buff);
283 }
284 return (char *)0;
285}
286
287/******************************************************************************/
288/* S e t u p */
289/******************************************************************************/
290
291int XrdPoll::Setup(int numfd)
292{
293 pthread_t tid;
294 int maxfd, retc, i;
295 struct XrdPollArg PArg;
296
297// Calculate the number of table entries per poller
298//
299 maxfd = (numfd / XRD_NUMPOLLERS) + 16;
300
301// Verify that we initialized the poller table
302//
303 for (i = 0; i < XRD_NUMPOLLERS; i++)
304 {if (!(Pollers[i] = newPoller(i, maxfd))) return 0;
305 Pollers[i]->PID = i;
306
307 // Now start a thread to handle this poller object
308 //
309 PArg.Poller = Pollers[i];
310 PArg.retcode= 0;
311 TRACE(POLL, "Starting poller " <<i);
312 if ((retc = XrdSysThread::Run(&tid,XrdStartPolling,(void *)&PArg,
313 XRDSYSTHREAD_BIND, "Poller")))
314 {Log.Emsg("Poll", retc, "create poller thread"); return 0;}
315 Pollers[i]->TID = tid;
316 PArg.PollSync.Wait();
317 if (PArg.retcode)
318 {Log.Emsg("Poll", PArg.retcode, "start poller");
319 return 0;
320 }
321 }
322
323// All done
324//
325 return 1;
326}
327
328/******************************************************************************/
329/* S t a t s */
330/******************************************************************************/
331
332int XrdPoll::Stats(char *buff, int blen, int do_sync)
333{
334 static const char statfmt[] = "<stats id=\"poll\"><att>%d</att>"
335 "<en>%d</en><ev>%d</ev><int>%d</int></stats>";
336 int i, numatt = 0, numen = 0, numev = 0, numint = 0;
337 XrdPoll *pp;
338
339// Return number of bytes if so wanted
340//
341 if (!buff) return (sizeof(statfmt)+(4*16))*XRD_NUMPOLLERS;
342
343// Get statistics. While we wish we could honor do_sync, doing so would be
344// costly and hardly worth it. So, we do not include code such as:
345// x = pp->y; if (do_sync) while(x != pp->y) x = pp->y; tot += x;
346//
347 for (i = 0; i < XRD_NUMPOLLERS; i++)
348 {pp = Pollers[i];
349 numatt += pp->numAttached;
350 numen += pp->numEnabled;
351 numev += pp->numEvents;
352 numint += pp->numInterrupts;
353 }
354
355// Format and return
356//
357 return snprintf(buff, blen, statfmt, numatt, numen, numev, numint);
358}
359
360/******************************************************************************/
361/* I m p l e m e n t a t i o n S p e c i f i c s */
362/******************************************************************************/
363
364#if defined( __linux__ )
365#include "Xrd/XrdPollE.icc"
366//#include "Xrd/XrdPollPoll.icc"
367#else
368#include "Xrd/XrdPollPoll.icc"
369#endif
void * XrdStartPolling(void *parg)
Definition XrdPoll.cc:107
int XrdPoll__Attach(XrdLink *lp)
Definition XrdPoll.cc:142
#define XRD_NUMPOLLERS
Definition XrdPoll.hh:35
#define read(a, b, c)
Definition XrdPosix.hh:77
#define XRDSYSTHREAD_BIND
#define TRACE(act, x)
Definition XrdTrace.hh:63
#define TRACEI(act, x)
Definition XrdTrace.hh:66
XrdLink & Link
XrdPoll * Poller
int Stats(char *buff, int blen, int do_sync=0)
Definition XrdPoll.cc:69
void DoIt()
Definition XrdPoll.cc:61
XrdProtocol * Match(XrdLink *lp)
Definition XrdPoll.cc:63
void Recycle(XrdLink *lp, int x, const char *y)
Definition XrdPoll.cc:67
int Process(XrdLink *lp)
Definition XrdPoll.cc:65
static const char * TraceID
Definition XrdPoll.hh:94
int numInterrupts
Definition XrdPoll.hh:134
XrdPoll()
Definition XrdPoll.cc:118
static XrdPoll * Pollers[XRD_NUMPOLLERS]
Definition XrdPoll.hh:79
pthread_t TID
Definition XrdPoll.hh:83
int PID
Definition XrdPoll.hh:82
struct pollfd PipePoll
Definition XrdPoll.hh:116
virtual int Include(XrdPollInfo &pInfo)=0
virtual void Start(XrdSysSemaphore *syncp, int &rc)=0
virtual void Exclude(XrdPollInfo &pInfo)=0
int ReqFD
Definition XrdPoll.hh:118
int PipeBlen
Definition XrdPoll.hh:128
int numEvents
Definition XrdPoll.hh:133
int getRequest()
Definition XrdPoll.cc:232
PipeData ReqBuff
Definition XrdPoll.hh:126
char * PipeBuff
Definition XrdPoll.hh:127
static char * Poll2Text(short events)
Definition XrdPoll.cc:272
static XrdPoll * newPoller(int pollid, int numfd)
Definition XrdPollE.icc:45
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition XrdPoll.cc:204
static void Detach(XrdPollInfo &pInfo)
Definition XrdPoll.cc:177
int numEnabled
Definition XrdPoll.hh:132
int CmdFD
Definition XrdPoll.hh:117
static int Setup(int numfd)
Definition XrdPoll.cc:291
static int Stats(char *buff, int blen, int do_sync=0)
Definition XrdPoll.cc:332
static int Attach(XrdPollInfo &pInfo)
Definition XrdPoll.cc:144
XrdProtocol(const char *jname)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdSysError Log
Definition XrdConfig.cc:112
XrdScheduler Sched
Definition XrdLinkCtl.cc:54
XrdPoll * Poller
Definition XrdPoll.cc:98
int retcode
Definition XrdPoll.cc:99
XrdSysSemaphore PollSync
Definition XrdPoll.cc:100