XRootD
Loading...
Searching...
No Matches
XrdSys::IOEvents::PollKQ Class Reference
Inheritance diagram for XrdSys::IOEvents::PollKQ:
Collaboration diagram for XrdSys::IOEvents::PollKQ:

Public Member Functions

 PollKQ (struct kevent *ptab, int numfd, int pfd, int pFD[2])
 ~PollKQ ()
Public Member Functions inherited from XrdSys::IOEvents::Poller
 Poller (int cFD, int rFD)
virtual ~Poller ()
 Destructor. Stop() is effectively called when this object is deleted.
void Stop ()

Static Public Member Functions

static int AllocMem (void **memP, int slots)
Static Public Member Functions inherited from XrdSys::IOEvents::Poller
static Poller * Create (int &eNum, const char **eTxt=0, int crOpts=0)

Protected Member Functions

void Begin (XrdSysSemaphore *syncp, int &rc, const char **eMsg)
void Exclude (Channel *cP, bool &isLocked, bool dover=1)
bool Include (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Shutdown ()
Protected Member Functions inherited from XrdSys::IOEvents::Poller
void CbkTMO ()
bool CbkXeq (Channel *cP, int events, int eNum, const char *eTxt)
 CPP_ATOMIC_TYPE (bool) wakePend
int GetFault (Channel *cP)
int GetPollEnt (Channel *cP)
int GetRequest ()
bool Init (Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
void LockChannel (Channel *cP)
int Poll2Enum (short events)
int SendCmd (PipeData &cmd)
void SetPollEnt (Channel *cP, int ptEnt)
bool TmoAdd (Channel *cP, int tmoSet)
void TmoDel (Channel *cP)
int TmoGet ()
void UnLockChannel (Channel *cP)

Additional Inherited Members

Public Types inherited from XrdSys::IOEvents::Poller
enum  CreateOpts { optTOM }
Protected Attributes inherited from XrdSys::IOEvents::Poller
Channel * attBase
bool chDead
int cmdFD
int pipeBlen
char * pipeBuff
struct pollfd pipePoll
pthread_t pollTid
PipeData reqBuff
int reqFD
Channel * tmoBase
unsigned char tmoMask
Static Protected Attributes inherited from XrdSys::IOEvents::Poller
static time_t maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
static pid_t parentPID = getpid()

Detailed Description

Definition at line 52 of file XrdSysIOEventsPollKQ.icc.

Constructor & Destructor Documentation

◆ PollKQ()

XrdSys::IOEvents::PollKQ::PollKQ ( struct kevent * ptab,
int numfd,
int pfd,
int pFD[2] )
inline

Definition at line 58 of file XrdSysIOEventsPollKQ.icc.

59 : Poller(pFD[0], pFD[1]), pollTab(ptab), cbNext(0),
60 pollDfd(pfd), pollMax(numfd), pollNum(1), numPoll(0)
61 {EV_SET(&armPipe, reqFD, EVFILT_READ,
62 EV_ADD|EV_CLEAR|EV_ENABLE, 0, 0, 0);
63 }

References XrdSys::IOEvents::Poller::Poller(), and XrdSys::IOEvents::Poller::reqFD.

Here is the call graph for this function:

◆ ~PollKQ()

XrdSys::IOEvents::PollKQ::~PollKQ ( )
inline

Definition at line 64 of file XrdSysIOEventsPollKQ.icc.

References XrdSys::IOEvents::Poller::Stop().

Here is the call graph for this function:

Member Function Documentation

◆ AllocMem()

int XrdSys::IOEvents::PollKQ::AllocMem ( void ** memP,
int slots )
static

Definition at line 156 of file XrdSysIOEventsPollKQ.icc.

157{
158 int rc, bytes, alignment, pagsz = getpagesize();
159
160// Allocate a zero-filled poll table with room for `slots` kevent entries and hand it back through *memP.
161// Tables smaller than one page are aligned on 1024 bytes; larger ones are aligned on the page size.
162 bytes = slots * sizeof(struct kevent); // NOTE(review): the product is narrowed to int; assumes slots is small enough not to overflow
163 alignment = (bytes < pagsz ? 1024 : pagsz);
164 if (!(rc = posix_memalign(memP, alignment, bytes))) memset(*memP, 0, bytes); // the table is cleared only when the allocation succeeded
165 return rc; // posix_memalign() result: 0 on success, otherwise an error number
166}

◆ Begin()

void XrdSys::IOEvents::PollKQ::Begin ( XrdSysSemaphore * syncp,
int & rc,
const char ** eTxt )
protected virtual

Start the polling event loop. An implementation must be supplied. Begin() is called via the internal BootStrap class from a new thread.

Implements XrdSys::IOEvents::Poller.

Definition at line 198 of file XrdSysIOEventsPollKQ.icc.

201{
202 struct timespec *tmP, tmOut;
203 Channel *cP;
204 long long tmVal;
205 int numpolled, pollN;
206
207// Polling event loop. First indicate to the starting thread that all went well:
208// clear the return code and the error text, then post the caller's semaphore.
209 retcode = 0;
210 *eTxt = 0;
211 syncsem->Post();
212 tmOut.tv_nsec = 0; // never changed afterwards, so every timeout has whole-second granularity
213
214// Now start dispatching channels that are ready. We use the wakePend flag to
215// keep the chatter down when we actually wakeup.
216//
217 do {if ((tmVal = TmoGet()) < 0) tmP = 0; // negative value: pass a null timeout so kevent() blocks until an event arrives
218 else {tmOut.tv_sec = tmVal / 1000; tmP = &tmOut;} // NOTE(review): presumably tmVal is in milliseconds; the sub-second remainder is discarded - confirm
219 do {numpolled = kevent(pollDfd, 0, 0, pollTab, pollMax, tmP);}
220 while (numpolled < 0 && errno == EINTR); // simply retry when the wait was interrupted
221 wakePend = true; numPoll = numpolled;
222 if (numpolled == 0) CbkTMO(); // no event was returned: run the timeout callback processing
223 else if (numpolled < 0)
224 {int rc = errno;
225 //--------------------------------------------------------------
226 // If we are in a child process and the poll file descriptor
227 // has been closed, there is an immense chance the fork will be
228 // followed by an exec, in which case we don't want to abort
229 //--------------------------------------------------------------
230 if( rc == EBADF && parentPID != getpid() ) return;
231 std::cerr <<"KQ: " <<XrdSysE2T(rc) <<" polling for events" <<std::endl;
232 abort(); // any other polling error is treated as fatal
233 }
234 else for (int i = 0; i < numpolled; i++)
235 {if ((cP = (Channel *)pollTab[i].udata)) Dispatch(cP, i); // the event carries a channel pointer in udata
236 else if (!Process(i+1)) return; // null udata (presumably the request pipe - confirm); leave the loop when Process() returns false
237 }
238
239 pollN = AtomicGet(pollNum); // pollNum is adjusted by Include()/Exclude(), hence the atomic read
240 if (pollMax < pollN) AllocPT(pollN); // presumably enlarges the poll table to hold pollN entries - confirm
241
242 } while(1);
243}
#define AtomicGet(x)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104

References AtomicGet, XrdSys::IOEvents::Poller::CbkTMO(), XrdSys::IOEvents::Poller::Channel, XrdSys::IOEvents::Poller::parentPID, XrdSysSemaphore::Post(), XrdSys::IOEvents::Poller::TmoGet(), and XrdSysE2T().

Here is the call graph for this function:

◆ Exclude()

void XrdSys::IOEvents::PollKQ::Exclude ( Channel * cP,
bool & isLocked,
bool dover = 1 )
protected virtual

Remove a channel from the poll set. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implements XrdSys::IOEvents::Poller.

Definition at line 283 of file XrdSysIOEventsPollKQ.icc.

285{
286 struct kevent chlist[2];
287 int i = 0, theFD = cP->GetFD(), kqStatus = GetPollEnt(cP);
288
289// Remove channel cP from the kqueue. Queue one EV_DELETE change for each filter
290// that Modify() recorded as added for this descriptor; i counts the queued changes.
291//
292 if (kqStatus & rFilterX)
293 {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_DELETE, 0, 0, cP); i++;}
294 if (kqStatus & wFilterX)
295 {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_DELETE, 0, 0, cP); i++;}
296
297// Remove this channel from the poll set. We ignore errors as the descriptor
298// may have been closed prior to this call (though this shouldn't happen).
299//
300 if (i) kevent(pollDfd, chlist, i, 0, 0, 0);
301 SetPollEnt(cP, 0); // the channel no longer has any filter state
302 AtomicDec(pollNum);
303
304// If we need to verify this action, sync with the poller thread (note that the
305// poller thread will not ask for this action unless it wants to deadlock). We
306// may actually deadlock anyway if the channel lock is held. We are allowed to
307// release it if the caller locked it. This will prevent a deadlock. Otherwise,
308// if we are in a callback and this channel is not the one that initiated the
309// exclude then we must make sure that we cancel any pending callback to the
310// excluded channel as it may have been deleted and we won't know that here.
311//
312 if (dover)
313 {PipeData cmdbuff;
314 if (isLocked)
315 {isLocked = false; // tell the caller the channel lock has been released
316 UnLockChannel(cP);
317 }
318 cmdbuff.req = PipeData::RmFD;
319 cmdbuff.fd = theFD;
320 SendCmd(cmdbuff);
321 } else {
322 if (cbNext)
323 for (int n = cbNext; n < numPoll; n++) // scan the returned events from cbNext onward
324 {if (cP == (Channel *)pollTab[n].udata)
325 pollTab[n].udata = &deadChP; // redirect the pending event away from the excluded channel
326 }
327 }
328}
#define AtomicDec(x)
int GetPollEnt(Channel *cP)
int SendCmd(PipeData &cmd)
void UnLockChannel(Channel *cP)
void SetPollEnt(Channel *cP, int ptEnt)

References AtomicDec, XrdSys::IOEvents::Poller::Channel, XrdSys::IOEvents::Poller::PipeData::fd, XrdSys::IOEvents::Channel::GetFD(), XrdSys::IOEvents::Poller::GetPollEnt(), XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::Poller::SendCmd(), XrdSys::IOEvents::Poller::SetPollEnt(), and XrdSys::IOEvents::Poller::UnLockChannel().

Here is the call graph for this function:

◆ Include()

bool XrdSys::IOEvents::PollKQ::Include ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protected virtual

Add a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implements XrdSys::IOEvents::Poller.

Definition at line 334 of file XrdSysIOEventsPollKQ.icc.

338{
339
340// Add channel cP to the poll set. We simply call modify as this will add events
341// to the kqueue as needed. On failure eNum keeps the value Modify() stored.
342 if (!Modify(cP, eNum, eTxt, isLocked))
343 {if (eTxt) *eTxt = "adding channel"; // replaces the error text set by Modify()
344 return false;
345 }
346
347// All went well. Bump the number in the set. The poller thread will
348// reallocate the poll table if need be.
349//
350 AtomicInc(pollNum);
351 return true;
352}
#define AtomicInc(x)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)

References AtomicInc, and Modify().

Here is the call graph for this function:

◆ Modify()

bool XrdSys::IOEvents::PollKQ::Modify ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protected virtual

Modify the event status of a channel. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implements XrdSys::IOEvents::Poller.

Definition at line 358 of file XrdSysIOEventsPollKQ.icc.

362{
363 (void)isLocked; // unused: this method sends no command to the poller thread
364 struct kevent chlist[2]; // at most one change for the read filter and one for the write filter
365 int i = 0; // number of changes queued in chlist
366 int events = cP->GetEvents(), theFD = cP->GetFD();
367 int kqStatus = GetPollEnt(cP); // filter state previously saved for this channel
368
369// Establish new read event mask: enable the read filter when read events are
370// wanted and it is not enabled yet; disable it when they are no longer wanted.
371 if (events & Channel:: readEvents)
372 {if (!(kqStatus & rEnabled))
373 {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, cP);
374 kqStatus |= rEnabled | rFilterX; // rFilterX records that the filter was added; Exclude() tests it
375 i++;
376 }
377 } else {
378 if (kqStatus & rEnabled)
379 {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_DISABLE, 0, 0, cP);
380 kqStatus &= ~rEnabled; // rFilterX stays set: the filter is disabled, not deleted
381 i++;
382 }
383 }
384
385// Establish new write event mask: same scheme as for the read filter above.
386//
387 if (events & Channel::writeEvents)
388 {if (!(kqStatus & wEnabled))
389 {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, cP);
390 kqStatus |= wEnabled | wFilterX; // wFilterX records that the filter was added; Exclude() tests it
391 i++;
392 }
393 } else {
394 if (kqStatus & wEnabled)
395 {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_DISABLE, 0, 0, cP);
396 kqStatus &= ~wEnabled; // wFilterX stays set: the filter is disabled, not deleted
397 i++;
398 }
399 }
400
401// Modify this fd if anything changed. On failure report errno through eNum and
402// leave the saved filter state untouched.
403 if (i)
404 {if (kevent(pollDfd, chlist, i, 0, 0, 0) < 0)
405 {eNum = errno;
406 if (eTxt) *eTxt = "modifying poll events";
407 return false;
408 }
409 SetPollEnt(cP, kqStatus); // save the new state only after kevent() succeeded
410 }
411
412// All done
413//
414 return true;
415}
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.

References XrdSys::IOEvents::Channel::GetEvents(), XrdSys::IOEvents::Channel::GetFD(), XrdSys::IOEvents::Poller::GetPollEnt(), XrdSys::IOEvents::Channel::readEvents, XrdSys::IOEvents::Poller::SetPollEnt(), and XrdSys::IOEvents::Channel::writeEvents.

Referenced by Include().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Shutdown()

void XrdSys::IOEvents::PollKQ::Shutdown ( )
protected virtual

Shutdown the poller. An implementation must be supplied. The shutdown method must release any allocated storage and close private file descriptors. The polling thread will have already been terminated and x-thread pipe closed. Warning: the derived destructor must call Stop() and do nothing else!

Implements XrdSys::IOEvents::Poller.

Definition at line 455 of file XrdSysIOEventsPollKQ.icc.

456{
457 static XrdSysMutex shutMutex; // function-local static, so the same mutex is used by every call
458
459// Release the poll table and close the kqueue descriptor. To avoid race
460// conditions, we serialize this code
461 shutMutex.Lock();
462
463// Release the poll table; the pointer is cleared so a repeated call skips the free
464//
465 if (pollTab) {free(pollTab); pollTab = 0;}
466
467// Close the kqueue file descriptor; it is set to -1 so a repeated call skips the close
468//
469 if (pollDfd >= 0) {close(pollDfd); pollDfd = -1;}
470
471// All done
472//
473 shutMutex.UnLock();
474}
#define close(a)
Definition XrdPosix.hh:48

References close, XrdSysMutex::Lock(), and XrdSysMutex::UnLock().

Here is the call graph for this function:

The documentation for this class was generated from the following file: