43 int bytes, alignment, pagsz = getpagesize();
48 bytes = maxfd *
sizeof(
struct pollfd);
49 alignment = (bytes < pagsz ? 1024 : pagsz);
50 if (posix_memalign((
void **)&pp, alignment, bytes))
51 {
Log.
Emsg(
"Poll", ENOMEM,
"create poll table");
57 memset((
void *)pp, 0, bytes);
58 return (
XrdPoll *)
new XrdPollPoll(pp, maxfd);
82 if (PollTab) free(PollTab);
101 while((ptnum < PollTNum) && (PollTab[ptnum].fd != -1)) ptnum++;
106 {Log.
Emsg(
"Attach",
"Attach",pInfo.
Link.
ID,
"failed; poll table overflow.");
113 pfd = &(PollTab[ptnum]);
115 pfd->events = POLLIN | POLLRDNORM;
121 if (ptnum == PollTNum) PollTNum++;
141 if (pInfo.
inQ) dqLink(&pInfo);
149 TRACEI(POLL,
"Poller " <<
PID <<
" async disabling link FD " <<pInfo.
FD);
154 memset(&cmdbuff, 0,
sizeof(cmdbuff));
155 cmdbuff[0].
req = PipeData::DiFD;
156 cmdbuff[0].
Parms.Arg.fd = pInfo.
FD;
158 cmdbuff[1].
req = PipeData::Post;
159 cmdbuff[1].
Parms.theSem = &mySem;
161 if (
write(
CmdFD, &cmdbuff,
sizeof(cmdbuff)) < 0) myerrno = errno;
166 if (myerrno) Log.
Emsg(
"Poll", myerrno,
"disable link", pInfo.
Link.
ID);
168 if (etxt &&
Finish(pInfo, etxt))
196 TRACEI(POLL,
"sending poller " <<
PID <<
" enable for link " <<pInfo.
FD);
197 cmdbuff.
req = PipeData::EnFD;
198 cmdbuff.
Parms.Arg.fd = pInfo.
FD;
201 if (
write(
CmdFD, &cmdbuff,
sizeof(cmdbuff)) < 0) myerrno = errno;
207 {Log.
Emsg(
"Poll", myerrno,
"enable link", pInfo.
Link.
ID);
return 0;}
227 {Log.
Emsg(
"Poll",
"Detach of enabled link", pInfo.
Link.
ID);
230 else if (pInfo.
inQ) dqLink(&pInfo);
234 TRACEI(POLL,
"sending poller " <<
PID <<
" detach for link " <<pInfo.
FD);
235 cmdbuff[0].
req = PipeData::RmFD;
236 cmdbuff[0].
Parms.Arg.fd = pInfo.
FD;
238 cmdbuff[1].
req = PipeData::Post;
239 cmdbuff[1].
Parms.theSem = &mySem;
241 if (
write(
CmdFD, &cmdbuff,
sizeof(cmdbuff)) < 0) myerrno = errno;
246 if (myerrno) Log.
Emsg(
"Poll", myerrno,
"detach link", pInfo.
Link.
ID);
256 int numpolled, num2sched;
261 const short pollOK = POLLIN | POLLRDNORM;
265 PollTab[0].fd =
ReqFD;
266 PollTab[0].events = pollOK;
267 PollTab[0].revents = 0;
277 do {
do {numpolled = poll(PollTab, PollTNum, -1);}
278 while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
283 {
if (errno != EINTR) Restart(errno);
291 if (PollTab[0].revents & pollOK)
292 {doRequests(numpolled);
293 if (--numpolled <= 0)
continue;
299 plp = 0; nlp = PollQ; jfirst = jlast = 0; num2sched = 0;
300 while ((pInfo = nlp) && numpolled > 0)
301 {
if ((pollevents = pInfo->
PollEnt->revents))
303 if (plp) nlp = plp->
Next = pInfo->
Next;
304 else nlp = PollQ = pInfo->
Next;
305 numpolled--; pInfo->
inQ =
false;
306 if (!(pollevents & pollOK))
310 Log.
Emsg(
"Poll",
"Disabled event occurred for", lp->
ID);
313 if (!jlast) jlast=(
XrdJob *)lp;
318 plp = pInfo; nlp = pInfo->
Next;
320 if (numpolled) Recover(numpolled);
325 if (num2sched == 1) Sched.
Schedule(jfirst);
326 else if (num2sched) Sched.
Schedule(num2sched, jfirst, jlast);
344 if ((lastent = PollTNum-1) < 0)
345 {Log.
Emsg(
"Poll",
"Underflow during detach"); abort();}
348 do {PollTNum--;}
while(PollTNum && PollTab[PollTNum-1].fd == -1);
356void XrdPollPoll::doRequests(
int maxreq)
359 int pti, ptfd, num2do;
365 num2do = (maxreq < 3 ? -1 : maxreq);
375 if ((ptfd = abs(PollTab[pti].fd)) !=
ReqBuff.Parms.Arg.fd)
380 {LogEvent(
ReqBuff.req, -1, ptfd);
continue;}
381 if (
ReqBuff.req == PipeData::EnFD)
382 {PollTab[pti].events = POLLIN | POLLRDNORM;
383 PollTab[pti].fd = ptfd;
385 act =
" enabled fd ";
387 else if (
ReqBuff.req == PipeData::DiFD)
388 {PollTab[pti].fd = -ptfd;
389 act =
" disabled fd ";
392 else if (
ReqBuff.req == PipeData::RmFD)
393 {PollTab[pti].fd = -1;
395 act =
" detached fd ";
398 else {
Log.
Emsg(
"Poll",
"Received an invalid poll pipe request");
402 <<
" entry " <<pti <<
" now at " <<PollTNum);
412 XrdPollInfo *plp, *nlp;
418 plp = 0; nlp = PollQ;
419 while (nlp && (pInfo != nlp)) {plp=nlp; nlp = nlp->
Next;}
423 if (nlp) {
if (plp) plp->
Next = nlp->
Next;
424 else PollQ = nlp->
Next;
427 else {PollMutex.UnLock();
436void XrdPollPoll::LogEvent(
int req,
int pollfd,
int cmdfd)
438 const char *opn, *id1, *id2;
442 if (
ReqBuff.req == PipeData::EnFD) opn =
"enable";
443 else if (
ReqBuff.req == PipeData::DiFD) opn =
"disable";
444 else if (
ReqBuff.req == PipeData::RmFD) opn =
"detach";
448 {sprintf(buff,
"poll %d failed; FD %d",
PID, cmdfd);
449 Log.
Emsg(
"Poll", opn, buff,
"does not map to a link");
454 else id1 =
"unknown";
456 else id2 =
"unknown";
457 snprintf(buff,
sizeof(buff)-1,
458 "%d poll fd=%d (%s) not equal %s cmd fd=%d (%s).",
459 PID, pollfd, id1, opn, cmdfd, id2);
461 Log.
Emsg(
"Poll",
"cmd/poll mismatch:", buff);
468void XrdPollPoll::Recover(
int numleft)
475 for (i = 1; i < PollTNum; i++)
476 if (PollTab[i].revents)
480 PollTab[i].fd = -PollTab[i].fd;
490void XrdPollPoll::Restart(
int ecode)
496 TRACE(POLL,
PID <<
'-' <<
TID <<
" Poll error " <<ecode);
497 Log.
Emsg(
"Poll", errno,
"poll");
502 while((pInfo = PollQ))
503 {PollQ = pInfo->
Next;
505 Finish(*pInfo,
"Unexpected polling error");
static XrdLink * fd2link(int fd)
static XrdPollInfo * fd2PollInfo(int fd)
char * ID
Pointer to the client's link identity.
int Include(XrdPollInfo &pInfo)
XrdPollPoll(struct pollfd *pp, int numfd)
void Start(XrdSysSemaphore *syncp, int &rc)
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
void Exclude(XrdPollInfo &pInfo)
int Enable(XrdPollInfo &pInfo)
static char * Poll2Text(short events)
static XrdPoll * newPoller(int pollid, int numfd)
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
union XrdPoll::PipeData::@304241167226111014042165265274261105310352236352 Parms