37#if defined(__linux__) || defined(__GNU__)
38#include <netinet/tcp.h>
47#if !defined(__FreeBSD__)
48#include <sys/sendfile.h>
51#include <sys/socket.h>
74#define ETIME ETIMEDOUT
92#if defined(HAVE_SENDFILE)
100const unsigned char KillMax = 60;
101const unsigned char KillMsk = 0x7f;
102const unsigned char KillXwt = 0x80;
104const char *TraceID =
"Link";
190 return linkXQ.PollInfo.FD;
205 int &numstall,
int &numtardy)
206 {
return linkXQ.getIOStats(inbytes, outbytes,
227 return linkXQ.getPeerCerts();
242 (lk ?
linkXQ.LinkInfo.opMutex.Lock() :
linkXQ.LinkInfo.opMutex.UnLock());
276 if (
isTLS)
return linkXQ.TLS_Peek(Buff, Blen, timeout);
277 else return linkXQ.Peek (Buff, Blen, timeout);
287 else return linkXQ.Recv (Buff, Blen);
294 if (
isTLS)
return linkXQ.TLS_Recv(Buff, Blen, timeout);
295 else return linkXQ.Recv (Buff, Blen, timeout);
305 else return linkXQ.Recv (
iov, iocnt, timeout);
314 if (
isTLS)
return linkXQ.TLS_RecvAll(Buff, Blen, timeout);
315 else return linkXQ.RecvAll (Buff, Blen, timeout);
324 return linkXQ.Register(hName);
334 else return linkXQ.Send (Buff, Blen);
343 if (!bytes)
for (
int i = 0; i < iocnt; i++) bytes +=
iov[i].iov_len;
348 else return linkXQ.Send (
iov, iocnt, bytes);
358 {
Log.Emsg(
"Link", E2BIG,
"send file to",
ID);
365 else return linkXQ.Send (sfP, sfN);
379 linkXQ.LinkInfo.opMutex.Lock();
380 if (
linkXQ.LinkInfo.InUse <= 1)
linkXQ.LinkInfo.opMutex.UnLock();
381 else {
linkXQ.LinkInfo.doPost++;
382 linkXQ.LinkInfo.opMutex.UnLock();
383 TRACEI(
DEBUG,
"Waiting for link serialization; use="
385 linkXQ.LinkInfo.IOSemaphore.Wait();
395 linkXQ.LinkInfo.opMutex.Lock();
397 linkXQ.LinkInfo.Etext = (text ? strdup(text) : 0);
398 linkXQ.LinkInfo.opMutex.UnLock();
407 {
linkXQ.setID(userid, procid);}
420 {
linkXQ.setLocation(loc);}
457 linkXQ.LinkInfo.opMutex.Lock();
459 <<
linkXQ.LinkInfo.InUse <<
'+'
460 <<use <<
" post=" <<
linkXQ.LinkInfo.doPost);
461 linkXQ.LinkInfo.InUse += use;
463 if (!
linkXQ.LinkInfo.InUse)
464 {
linkXQ.LinkInfo.InUse = 1;
linkXQ.LinkInfo.opMutex.UnLock();
465 Log.Emsg(
"Link",
"Zero use count for",
ID);
467 else if (
linkXQ.LinkInfo.InUse == 1 &&
linkXQ.LinkInfo.doPost)
468 {
while(
linkXQ.LinkInfo.doPost)
469 {
linkXQ.LinkInfo.IOSemaphore.Post();
470 TRACEI(CONN,
"setRef posted link");
473 linkXQ.LinkInfo.opMutex.UnLock();
475 else if (
linkXQ.LinkInfo.InUse < 0)
476 {
linkXQ.LinkInfo.InUse = 1;
477 linkXQ.LinkInfo.opMutex.UnLock();
478 Log.Emsg(
"Link",
"Negative use count for",
ID);
480 else linkXQ.LinkInfo.opMutex.UnLock();
491 if (
isTLS == enable)
return true;
493 return linkXQ.setTLS(enable, ctx);
529 if (lp ==
this)
return 0;
531 if (!(cp = index(
ID,
':')) || strncmp(lp->
ID,
ID, cp-
ID)
546 || !
linkXQ.PollInfo.Poller || !
linkXQ.getProtocol())
return -EPIPE;
550 int wTime, killTries;
551 killTries =
linkXQ.LinkInfo.KillCnt & KillMsk;
552 if (killTries > KillMax)
return -
ETIME;
557 linkXQ.LinkInfo.KillCnt = killTries | KillXwt;
562 if (!
linkXQ.PollInfo.isEnabled ||
linkXQ.LinkInfo.InUse > 1
563 ||
linkXQ.LinkInfo.KillcvP)
565 return (wTime > 60 ? 60: wTime);
571 linkXQ.LinkInfo.KillcvP = &killDone;
577 snprintf(buff,
sizeof(buff),
"ended by %s", owner);
578 buff[
sizeof(buff)-1] =
'\0';
580 linkXQ.LinkInfo.opMutex.UnLock();
593 linkXQ.LinkInfo.opMutex.Lock();
594 linkXQ.LinkInfo.KillcvP = 0;
595 linkXQ.LinkInfo.opMutex.UnLock();
599 TRACEI(
DEBUG,
"Terminate " << (wTime <= 0 ?
"complete ":
"timeout ") <<wTime);
630 struct pollfd polltab = {
linkXQ.PollInfo.FD, POLLIN|POLLRDNORM, 0};
635 do {retc = poll(&polltab, 1, timeout);}
while(retc < 0 && errno == EINTR);
637 {
if (retc == 0)
return 0;
638 Log.Emsg(
"Link", -errno,
"poll",
ID);
644 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
XrdJob(const char *desc="")
static XrdLink * fd2link(int fd)
static XrdLink * Find(int &curr, XrdLinkMatch *who=0)
static short killWait
Link destruction control constants.
static int getName(int &curr, char *bname, int blen, XrdLinkMatch *who=0)
static int Stats(char *buff, int blen, bool do_sync=false)
XrdProtocol * setProtocol(XrdProtocol *pp, bool runit=false, bool push=false)
void Serialize()
Wait for all outstanding requests to be completed on the link.
int Wait4Data(int timeout)
static XrdLink * Find(int &curr, XrdLinkMatch *who=0)
int setEtext(const char *text)
bool isInstance(unsigned int inst) const
bool setTLS(bool enable, XrdTlsContext *ctx=0)
Enable or disable TLS on the link.
int Peek(char *buff, int blen, int timeout=-1)
int Recv(char *buff, int blen)
void syncStats(int *ctime=0)
static int getName(int &curr, char *bname, int blen, XrdLinkMatch *who=0)
time_t timeCon() const
Return the time the link was made active (i.e. time of connection).
int Close(bool defer=false)
int Terminate(const char *owner, int fdnum, unsigned int inst)
void setID(const char *userid, int procid)
int UseCnt() const
Return link's reference count.
const XrdNetAddr * NetAddr() const
int RecvAll(char *buff, int blen, int timeout=-1)
void Enable()
Enable the link to field interrupts.
XrdTlsPeerCerts * getPeerCerts()
XrdNetAddrInfo * AddrInfo()
char * ID
Pointer to the client's link identity.
int Send(const char *buff, int blen)
bool Register(const char *hName)
void armBridge()
Mark this link as an in-memory communications bridge (internal use only).
void setProtName(const char *name)
void setLocation(XrdNetAddrInfo::LocInfo &loc)
int getIOStats(long long &inbytes, long long &outbytes, int &numstall, int &numtardy)
XrdProtocol * getProtocol()
Obtain current protocol object pointer.
int Client(char *buff, int blen)
const char * Host() const
const char * Name() const
void Shutdown(bool getLock)
static int Stats(char *buff, int blen, bool do_sync=0)
static char * Poll2Text(short events)
static int Attach(XrdPollInfo &pInfo)