XRootD
Loading...
Searching...
No Matches
XrdSendQ Class Reference

#include <XrdSendQ.hh>

Inheritance diagram for XrdSendQ:
Collaboration diagram for XrdSendQ:

Public Member Functions

 XrdSendQ (XrdLink &lP, XrdSysMutex &mP)
unsigned int Backlog ()
virtual void DoIt ()
int Send (const char *buff, int blen)
int Send (const struct iovec *iov, int iovcnt, int iotot)
void Terminate (XrdLink *lP=0)
Public Member Functions inherited from XrdJob
 XrdJob (const char *desc="")
virtual ~XrdJob ()

Static Public Member Functions

static void SetAQ (bool onoff)
static void SetQM (unsigned int qmVal)
static void SetQW (unsigned int qwVal)

Additional Inherited Members

Public Attributes inherited from XrdJob
const char * Comment
XrdJob * NextJob

Detailed Description

Definition at line 42 of file XrdSendQ.hh.

Constructor & Destructor Documentation

◆ XrdSendQ()

XrdSendQ::XrdSendQ ( XrdLink & lP,
XrdSysMutex & mP )

Definition at line 91 of file XrdSendQ.cc.

92 : XrdJob("sendQ runner"), // description string handed to the XrdJob base
93 mLink(lP), wMutex(mP), // link and mutex supplied by the caller
94 fMsg(0), lMsg(0), delQ(0), theFD(lP.FDnum()), // empty queues; FD number taken from the link
95 inQ(0), qWmsg(qWarn), discards(0), // nothing queued; qWmsg starts at the class-wide qWarn
96 active(false), terminate(false) {} // no runner active, no termination pending
XrdJob(const char *desc="")
Definition XrdJob.hh:51

References XrdJob::XrdJob().

Here is the call graph for this function:

Member Function Documentation

◆ Backlog()

unsigned int XrdSendQ::Backlog ( )
inline

Definition at line 46 of file XrdSendQ.hh.

46{return inQ;} // current value of the inQ message counter

◆ DoIt()

void XrdSendQ::DoIt ( )
virtual

Implements XrdJob.

Definition at line 102 of file XrdSendQ.cc.

103{
104 mBuff *theMsg;
105 int myFD, rc;
106 bool theEnd;
107
108// Queue runner: sends the messages queued on this object. Obtain the lock
109// first; the queue state below is only touched while wMutex is held.
110 wMutex.Lock();
111
112// Before we start check if we should delete any messages
113//
114 if (delQ) {RelMsgs(delQ); delQ = 0;}
115
116// Send all queued messages (we can use a blocking send here). Each message is
117// unlinked and theFD copied under the lock; the lock is dropped around send().
118 while(!terminate && (theMsg = fMsg))
119 {if (!(fMsg = fMsg->next)) lMsg = 0;
120 inQ--; myFD = theFD;
121 wMutex.UnLock();
122 rc = send(myFD, theMsg->mData, theMsg->mLen, 0);
123 free(theMsg);
124 wMutex.Lock();
125 if (rc < 0) {Scuttle(); break;}
126 }
127
128// Before we exit check if we should delete any messages. On termination any
129// still-queued messages are released as well.
130 if (delQ) {RelMsgs(delQ); delQ = 0;}
131 if ((theEnd = terminate) && fMsg) RelMsgs(fMsg);
132 active = false;
133 qWmsg = qWarn;
134
135// Release the lock. The terminate flag was captured in theEnd while the lock
136// was held because nothing may be referenced via "this" once the mutex is
137// unlocked (we may have been deleted by then). When theEnd is set this
138// thread deletes the object itself.
139 wMutex.UnLock();
140 if (theEnd) delete this;
141}

◆ Send() [1/2]

int XrdSendQ::Send ( const char * buff,
int blen )

Definition at line 230 of file XrdSendQ.cc.

231{
232 mBuff *theMsg;
233 int bleft, bsent;
234
235// If there is an active thread handling messages then we have to queue it.
236// Otherwise try to send it. We need to hold the lock here to prevent messing
237// up the message if only part of it could be sent. This is a non-blocking call.
238// SendNB() result: 0 -> all sent (return blen), <0 -> error (-1), >0 -> bytes unsent.
239 if (active) bleft = blen;
240 else if ((bleft = SendNB(buff, blen)) <= 0) return (bleft ? -1 : blen);
241
242// Allocate buffer for the message: the mBuff header plus bleft bytes of data
243//
244 if (!(theMsg = (mBuff *)malloc(sizeof(mBuff) + bleft)))
245 {errno = ENOMEM; return -1;}
246
247// Copy the unsent message fragment (the last bleft bytes of buff)
248//
249 bsent = blen - bleft;
250 memcpy(theMsg->mData, buff+bsent, bleft);
251 theMsg->mLen = bleft;
252
253// Queue the message. The full length blen is reported when QMsg() accepts the
254// remainder; -1 is returned when QMsg() returns false.
255 return (QMsg(theMsg) ? blen : -1);
256}

◆ Send() [2/2]

int XrdSendQ::Send ( const struct iovec * iov,
int iovcnt,
int iotot )

Definition at line 262 of file XrdSendQ.cc.

263{
264 mBuff *theMsg;
265 char *body;
266 int bleft, bmore, iovX;
267
268// If there is an active thread handling messages then we have to queue it.
269// Otherwise try to send it. We need to hold the lock here to prevent messing
270// up the message if only part of it could be sent. This is a non-blocking call.
271// When active, iovX/bleft are set to the first non-empty element and its length.
272 if (active)
273 {bleft = 0;
274 for (iovX = 0; iovX < iovcnt; iovX++)
275 if ((bleft = iov[iovX].iov_len)) break;
276 if (!bleft) return iotot;
277 } else {
278 if ((bleft = SendNB(iov, iovcnt, iotot, iovX)) <= 0)
279 return (bleft ? -1 : 0);
280 }
281
282// Readjust the total amount not sent based on where we stopped in the iovec.
283//
284 bmore = bleft;
285 for (int i = iovX+1; i < iovcnt; i++) bmore += iov[i].iov_len;
286
287// Allocate one buffer for the unsent data: the bleft unsent bytes of element
288// iovX plus every element after it.
289 if (!(theMsg = (mBuff *)malloc(bmore+sizeof(mBuff))))
290 {errno = ENOMEM; return -1;}
291
292// Setup the message length
293//
294 theMsg->mLen = bmore;
295
296// Copy the first fragment (it cannot be zero length): the last bleft bytes of
297// element iovX.
298 body = theMsg->mData;
299 memcpy(body, ((char *)iov[iovX].iov_base)+(iov[iovX].iov_len-bleft), bleft);
300 body += bleft;
301
302// All remaining items
303//
304 for (int i = iovX+1; i < iovcnt; i++)
305 {if (iov[i].iov_len)
306 {memcpy(body, iov[i].iov_base, iov[i].iov_len);
307 body += iov[i].iov_len;
308 }
309 }
310
311// Queue the message. NOTE(review): a QMsg() failure returns 0 here, while
312// Send(buff, blen) returns -1 for the same case - confirm this is intended.
313 return (QMsg(theMsg) ? iotot : 0);
314}

◆ SetAQ()

void XrdSendQ::SetAQ ( bool onoff)
inline static

Definition at line 54 of file XrdSendQ.hh.

54{qPerm = onoff;} // stores the flag in the class-wide qPerm setting

◆ SetQM()

void XrdSendQ::SetQM ( unsigned int qmVal)
inline static

Definition at line 56 of file XrdSendQ.hh.

56{qMax = qmVal;} // stores the value in the class-wide qMax setting

Referenced by XrdCmsConfig::Configure1().

Here is the caller graph for this function:

◆ SetQW()

void XrdSendQ::SetQW ( unsigned int qwVal)
inline static

Definition at line 58 of file XrdSendQ.hh.

58{qWarn = qwVal;} // stores the class-wide qWarn value that instances copy into qWmsg

◆ Terminate()

void XrdSendQ::Terminate ( XrdLink * lP = 0)

Definition at line 396 of file XrdSendQ.cc.

397{
398// First step is to see if we need to schedule a shutdown prior to quitting: a
399// non-null lP gets a LinkShutdown job handed to the scheduler.
400 if (lP) Sched.Schedule((XrdJob *)new LinkShutdown(lP));
401
402// If there is an active thread then we need to let the thread handle the
403// termination of this object (DoIt() deletes it once it sees terminate set).
404// Otherwise, we can do it now; either way the object must not be used again.
405 if (active)
406 {Scuttle();
407 terminate = true;
408 theFD =-1;
409 } else {
410 if (fMsg) {RelMsgs(fMsg); fMsg = lMsg = 0;}
411 if (delQ) {RelMsgs(delQ); delQ = 0;}
412 delete this;
413 }
414}
void Schedule(XrdJob *jp)
XrdScheduler Sched
Definition XrdLinkCtl.cc:54

References XrdJob::XrdJob(), and XrdGlobal::Sched.

Here is the call graph for this function:

The documentation for this class was generated from the following files: