XRootD
Loading...
Searching...
No Matches
XrdLinkXeq.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d L i n k X e q . c c */
4/* */
5/* (c) 2018 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <limits.h>
31#include <poll.h>
32#include <signal.h>
33#include <cstdio>
34#include <cstring>
35#include <unistd.h>
36#include <sys/types.h>
37#include <sys/uio.h>
38
39#if defined(__linux__) || defined(__GNU__)
40#include <netinet/tcp.h>
41#if !defined(TCP_CORK)
42#undef HAVE_SENDFILE
43#endif
44#endif
45
46#ifdef HAVE_SENDFILE
47
48#if defined(__solaris__) || defined(__linux__) || defined(__GNU__)
49#include <sys/sendfile.h>
50#endif
51
52#endif
53
55#include "XrdSys/XrdSysError.hh"
56#include "XrdSys/XrdSysFD.hh"
58
59#include "Xrd/XrdBuffer.hh"
60#include "Xrd/XrdLink.hh"
61#include "Xrd/XrdLinkCtl.hh"
62#include "Xrd/XrdLinkXeq.hh"
63#include "Xrd/XrdPoll.hh"
64#include "Xrd/XrdScheduler.hh"
65#include "Xrd/XrdSendQ.hh"
66#include "Xrd/XrdTcpMonPin.hh"
67
68#define TRACE_IDENT ID
69#include "Xrd/XrdTrace.hh"
70
71/******************************************************************************/
72/* G l o b a l s */
73/******************************************************************************/
74
75namespace XrdGlobal
76{
77extern XrdSysError Log;
78extern XrdScheduler Sched;
79extern XrdTlsContext *tlsCtx;
81extern int devNull;
83};
84
85using namespace XrdGlobal;
86
87/******************************************************************************/
88/* S t a t i c s */
89/******************************************************************************/
90
91 const char *XrdLinkXeq::TraceID = "LinkXeq";
92
93 long long XrdLinkXeq::LinkBytesIn = 0;
94 long long XrdLinkXeq::LinkBytesOut = 0;
95 long long XrdLinkXeq::LinkConTime = 0;
96 long long XrdLinkXeq::LinkCountTot = 0;
103
104/******************************************************************************/
105/* C o n s t r u c t o r */
106/******************************************************************************/
107
109{
111}
112
114{
115 memcpy(Uname+sizeof(Uname)-7, "anon.0@", 7);
116 strcpy(Lname, "somewhere");
117 ID = &Uname[sizeof(Uname)-5];
118 Comment = ID;
119 sendQ = 0;
120 stallCnt = stallCntTot = 0;
121 tardyCnt = tardyCntTot = 0;
122 SfIntr = 0;
123 isIdle = 0;
125 LockReads= false;
126 KeepFD = false;
127 Protocol = 0;
128 ProtoAlt = 0;
129
130 LinkInfo.Reset();
131 PollInfo.Zorch();
132 ResetLink();
133}
134
135/******************************************************************************/
136/* B a c k l o g */
137/******************************************************************************/
138
140{
142
143// Return backlog information
144//
145 return (sendQ ? sendQ->Backlog() : 0);
146}
147
148/******************************************************************************/
149/* C l i e n t */
150/******************************************************************************/
151
152int XrdLinkXeq::Client(char *nbuf, int nbsz)
153{
154 int ulen;
155
156// Generate full client name
157//
158 if (nbsz <= 0) return 0;
159 ulen = (Lname - ID);
160 if ((ulen + HNlen) >= nbsz) ulen = 0;
161 else {strncpy(nbuf, ID, ulen);
162 strcpy(nbuf+ulen, HostName);
163 ulen += HNlen;
164 }
165 return ulen;
166}
167
168/******************************************************************************/
169/* C l o s e */
170/******************************************************************************/
171
172int XrdLinkXeq::Close(bool defer)
173{ XrdSysMutexHelper opHelper(LinkInfo.opMutex);
174 int csec, fd, rc = 0;
175
176// If a defer close is requested, we can close the descriptor but we must
177// keep the slot number to prevent a new client getting the same fd number.
178// Linux is peculiar in that any in-progress operations will remain in that
179// state even after the FD is closed unless there is some activity either on
180// the connection or an event occurs that causes an operation restart. We
181// portably solve this problem by issuing a shutdown() on the socket prior
182// closing it. On most platforms, this informs readers that the connection is
183// gone (though not on old (i.e. <= 2.3) versions of Linux, sigh). Also, if
184// nonblocking mode is enabled, we need to do this in a separate thread as
185// a shutdown may block for a pretty long time if lots\ of messages are queued.
186// We will ask the SendQ object to schedule the shutdown for us before it
187// commits suicide.
188// Note that we can hold the opMutex while we also get the wrMutex.
189//
190 if (defer)
191 {if (!sendQ) Shutdown(false);
192 else {TRACEI(DEBUG, "Shutdown FD " <<LinkInfo.FD<<" only via SendQ");
193 LinkInfo.InUse++;
194 LinkInfo.FD = -LinkInfo.FD; // Leave poll version untouched!
195 wrMutex.Lock();
196 sendQ->Terminate(this);
197 sendQ = 0;
198 wrMutex.UnLock();
199 }
200 return 0;
201 }
202
203// If we got here then this is not a deferred close so we just need to check
204// if there is a sendq appendage we need to get rid of.
205//
206 if (sendQ)
207 {wrMutex.Lock();
208 sendQ->Terminate();
209 sendQ = 0;
210 wrMutex.UnLock();
211 }
212
213// Multiple protocols may be bound to this link. If it is in use, defer the
214// actual close until the use count drops to one.
215//
216 while(LinkInfo.InUse > 1)
217 {opHelper.UnLock();
218 TRACEI(DEBUG, "Close FD "<<LinkInfo.FD <<" deferred, use count="
219 <<LinkInfo.InUse);
220 Serialize();
221 opHelper.Lock(&LinkInfo.opMutex);
222 }
223 LinkInfo.InUse--;
224 Instance = 0;
225
226// Add up the statistic for this link
227//
228 syncStats(&csec);
229
230// Cleanup TLS if it is active
231//
232 if (isTLS) tlsIO.Shutdown();
233
234// Clean this link up
235//
236 if (Protocol) {Protocol->Recycle(this, csec, LinkInfo.Etext); Protocol = 0;}
237 if (ProtoAlt) {ProtoAlt->Recycle(this, csec, LinkInfo.Etext); ProtoAlt = 0;}
238 if (LinkInfo.Etext) {free(LinkInfo.Etext); LinkInfo.Etext = 0;}
239 LinkInfo.InUse = 0;
240
241// At this point we can have no lock conflicts, so if someone is waiting for
242// us to terminate let them know about it. Note that we will get the condvar
243// mutex while we hold the opMutex. This is the required order! We will also
244// zero out the pointer to the condvar while holding the opmutex.
245//
246 if (LinkInfo.KillcvP)
247 {LinkInfo.KillcvP->Lock();
248 LinkInfo.KillcvP->Signal();
249 LinkInfo.KillcvP->UnLock();
250 LinkInfo.KillcvP = 0;
251 }
252
253// Remove ourselves from the poll table and then from the Link table. We may
254// not hold on to the opMutex when we acquire the LTMutex. However, the link
255// table needs to be cleaned up prior to actually closing the socket. So, we
256// do some fancy footwork to prevent multiple closes of this link.
257//
258 fd = abs(LinkInfo.FD);
259 if (PollInfo.FD > 0)
260 {if (PollInfo.Poller) {XrdPoll::Detach(PollInfo); PollInfo.Poller = 0;}
261 PollInfo.FD = -1;
262 opHelper.UnLock();
264 } else opHelper.UnLock();
265
266// Invoke the TCP monitor if it was loaded.
267//
268 if (TcpMonPin && fd > 2)
269 {XrdTcpMonPin::LinkInfo lnkInfo;
270 lnkInfo.tident = ID;
271 lnkInfo.fd = fd;
272 lnkInfo.consec = csec;
273 lnkInfo.bytesIn = BytesInTot;
274 lnkInfo.bytesOut = BytesOutTot;
275 TcpMonPin->Monitor(Addr, lnkInfo, sizeof(lnkInfo));
276 }
277
278// Close the file descriptor if it isn't being shared. Do it as the last
279// thing because closes and accepts and not interlocked.
280//
281 if (fd >= 2) {if (KeepFD) rc = 0;
282 else rc = (close(fd) < 0 ? errno : 0);
283 }
284 if (rc) Log.Emsg("Link", rc, "close", ID);
285 return rc;
286}
287
288/******************************************************************************/
289/* D o I t */
290/******************************************************************************/
291
293{
294 int rc;
295
296// The Process() return code tells us what to do:
297// < 0 -> Stop getting requests,
298// -EINPROGRESS leave link disabled but otherwise all is well
299// -n Error, disable and close the link
300// = 0 -> OK, get next request, if allowed, o/w enable the link
301// > 0 -> Slow link, stop getting requests and enable the link
302//
303 if (Protocol)
304 do {rc = Protocol->Process(this);} while (!rc && Sched.canStick());
305 else {Log.Emsg("Link", "Dispatch on closed link", ID);
306 return;
307 }
308
309// Either re-enable the link and cycle back waiting for a new request, leave
310// disabled, or terminate the connection.
311//
312 if (rc >= 0)
313 {if (PollInfo.Poller && !PollInfo.Poller->Enable(PollInfo)) Close();}
314 else if (rc != -EINPROGRESS) Close();
315}
316
317/******************************************************************************/
318/* g e t P e e r C e r t s */
319/******************************************************************************/
320
322{
323 return (isTLS ? tlsIO.getCerts(true) : 0);
324}
325
326/******************************************************************************/
327/* P e e k */
328/******************************************************************************/
329
330int XrdLinkXeq::Peek(char *Buff, int Blen, int timeout)
331{
332 XrdSysMutexHelper theMutex;
333 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
334 ssize_t mlen;
335 int retc;
336
337// Lock the read mutex if we need to, the helper will unlock it upon exit
338//
339 if (LockReads) theMutex.Lock(&rdMutex);
340
341// Wait until we can actually read something
342//
343 isIdle = 0;
344 do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
345 if (retc != 1)
346 {if (retc == 0) return 0;
347 return Log.Emsg("Link", -errno, "poll", ID);
348 }
349
350// Verify it is safe to read now
351//
352 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
353 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
354 return -1;
355 }
356
357// Do the peek.
358//
359 do {mlen = recv(LinkInfo.FD, Buff, Blen, MSG_PEEK);}
360 while(mlen < 0 && errno == EINTR);
361
362// Return the result
363//
364 if (mlen >= 0) return int(mlen);
365 Log.Emsg("Link", errno, "peek on", ID);
366 return -1;
367}
368
369/******************************************************************************/
370/* R e c v */
371/******************************************************************************/
372
373int XrdLinkXeq::Recv(char *Buff, int Blen)
374{
375 ssize_t rlen;
376
377// Note that we will read only as much as is queued. Use Recv() with a
378// timeout to receive as much data as possible.
379//
380 if (LockReads) rdMutex.Lock();
381 isIdle = 0;
382 do {rlen = read(LinkInfo.FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
383 if (rlen > 0) AtomicAdd(BytesIn, rlen);
384 if (LockReads) rdMutex.UnLock();
385
386 if (rlen >= 0) return int(rlen);
387 if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
388 return -1;
389}
390
391/******************************************************************************/
392
393int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
394{
395 XrdSysMutexHelper theMutex;
396 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
397 ssize_t rlen, totlen = 0;
398 int retc;
399
400// Lock the read mutex if we need to, the helper will unlock it upon exit
401//
402 if (LockReads) theMutex.Lock(&rdMutex);
403
404// Wait up to timeout milliseconds for data to arrive
405//
406 isIdle = 0;
407 while(Blen > 0)
408 {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
409 if (retc != 1)
410 {if (retc == 0)
411 {tardyCnt++;
412 if (totlen)
413 {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
414 AtomicAdd(BytesIn, totlen);
415 }
416 return int(totlen);
417 }
418 return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
419 }
420
421 // Verify it is safe to read now
422 //
423 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
424 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
425 "polling", ID);
426 return -1;
427 }
428
429 // Read as much data as you can. Note that we will force an error
430 // if we get a zero-length read after poll said it was OK.
431 //
432 do {rlen = recv(LinkInfo.FD, Buff, Blen, 0);}
433 while(rlen < 0 && errno == EINTR);
434 if (rlen <= 0)
435 {if (!rlen) return -ENOMSG;
436 if (LinkInfo.FD > 0) Log.Emsg("Link", -errno, "receive from", ID);
437 return -1;
438 }
439 totlen += rlen; Blen -= rlen; Buff += rlen;
440 }
441
442 AtomicAdd(BytesIn, totlen);
443 return int(totlen);
444}
445
446/******************************************************************************/
447
448int XrdLinkXeq::Recv(const struct iovec *iov, int iocnt, int timeout)
449{
450 XrdSysMutexHelper theMutex;
451 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
452 int retc, rlen;
453
454// Lock the read mutex if we need to, the helper will unlock it upon exit
455//
456 if (LockReads) theMutex.Lock(&rdMutex);
457
458// Wait up to timeout milliseconds for data to arrive
459//
460 isIdle = 0;
461 do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
462 if (retc != 1)
463 {if (retc == 0)
464 {tardyCnt++;
465 return 0;
466 }
467 return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
468 }
469
470// Verify it is safe to read now
471//
472 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
473 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
474 return -1;
475 }
476
477// If the iocnt is within limits then just go ahead and read once.
478//
479 if (iocnt <= maxIOV)
480 {rlen = RecvIOV(iov, iocnt);
481 if (rlen > 0) {AtomicAdd(BytesIn, rlen);}
482 return rlen;
483 }
484
485// We will have to break this up into allowable segments and we need to add up
486// the bytes in each segment so that we know when to stop reading.
487//
488 int seglen, segcnt = maxIOV, totlen = 0;
489 do {seglen = 0;
490 for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
491 if ((rlen = RecvIOV(iov, segcnt)) < 0) return rlen;
492 totlen += rlen;
493 if (rlen < seglen) break;
494 iov += segcnt;
495 iocnt -= segcnt;
496 if (iocnt <= maxIOV) segcnt = iocnt;
497 } while(iocnt > 0);
498
499// All done
500//
501 AtomicAdd(BytesIn, totlen);
502 return totlen;
503}
504
505/******************************************************************************/
506/* R e c v A l l */
507/******************************************************************************/
508
509int XrdLinkXeq::RecvAll(char *Buff, int Blen, int timeout)
510{
511 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
512 ssize_t rlen;
513 int retc;
514
515// Check if timeout specified. Notice that the timeout is the max we will
516// for some data. We will wait forever for all the data. Yeah, it's weird.
517//
518 if (timeout >= 0)
519 {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
520 if (retc != 1)
521 {if (!retc) return -ETIMEDOUT;
522 Log.Emsg("Link",errno,"poll",ID);
523 return -1;
524 }
525 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
526 {Log.Emsg("Link",XrdPoll::Poll2Text(polltab.revents),"polling",ID);
527 return -1;
528 }
529 }
530
531// Note that we will block until we receive all he bytes.
532//
533 if (LockReads) rdMutex.Lock();
534 isIdle = 0;
535 do {rlen = recv(LinkInfo.FD, Buff, Blen, MSG_WAITALL);}
536 while(rlen < 0 && errno == EINTR);
537 if (rlen > 0) AtomicAdd(BytesIn, rlen);
538 if (LockReads) rdMutex.UnLock();
539
540 if (int(rlen) == Blen) return Blen;
541 if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
542 else if (rlen > 0) Log.Emsg("RecvAll", "Premature end from", ID);
543 else if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
544 return -1;
545}
546
547/******************************************************************************/
548/* Protected: R e c v I O V */
549/******************************************************************************/
550
551int XrdLinkXeq::RecvIOV(const struct iovec *iov, int iocnt)
552{
553 ssize_t retc = 0;
554
555// Read the data in. On some version of Unix (e.g., Linux) a readv() may
556// end at any time without reading all the bytes when directed to a socket.
557// We always return the number bytes read (or an error). The caller needs to
558// restart the read at the appropriate place in the iovec when more data arrives.
559//
560 do {retc = readv(LinkInfo.FD, iov, iocnt);}
561 while(retc < 0 && errno == EINTR);
562
563// Check how we completed
564//
565 if (retc < 0) Log.Emsg("Link", errno, "receive from", ID);
566 return retc;
567}
568
569/******************************************************************************/
570/* R e g i s t e r */
571/******************************************************************************/
572
573bool XrdLinkXeq::Register(const char *hName)
574{
575
576// First see if we can register this name with the address object
577//
578 if (!Addr.Register(hName)) return false;
579
580// Make appropriate changes here
581//
582 if (HostName) free(HostName);
583 HostName = strdup(hName);
584 strlcpy(Lname, hName, sizeof(Lname));
585 return true;
586}
587
588/******************************************************************************/
589/* S e n d */
590/******************************************************************************/
591
592int XrdLinkXeq::Send(const char *Buff, int Blen)
593{
594 ssize_t retc = 0, bytesleft = Blen;
595
596// Get a lock
597//
598 wrMutex.Lock();
599 isIdle = 0;
600 AtomicAdd(BytesOut, Blen);
601
602// Do non-blocking writes if we are setup to do so.
603//
604 if (sendQ)
605 {retc = sendQ->Send(Buff, Blen);
606 wrMutex.UnLock();
607 return retc;
608 }
609
610// Write the data out
611//
612 while(bytesleft)
613 {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
614 {if (errno == EINTR) continue;
615 else break;
616 }
617 bytesleft -= retc; Buff += retc;
618 }
619
620// All done
621//
622 wrMutex.UnLock();
623 if (retc >= 0) return Blen;
624 Log.Emsg("Link", errno, "send to", ID);
625 return -1;
626}
627
628/******************************************************************************/
629
630int XrdLinkXeq::Send(const struct iovec *iov, int iocnt, int bytes)
631{
632 int retc;
633
634// Get a lock and assume we will be successful (statistically we are)
635//
636 wrMutex.Lock();
637 isIdle = 0;
638 AtomicAdd(BytesOut, bytes);
639
640// Do non-blocking writes if we are setup to do so.
641//
642 if (sendQ)
643 {retc = sendQ->Send(iov, iocnt, bytes);
644 wrMutex.UnLock();
645 return retc;
646 }
647
648// If the iocnt is within limits then just go ahead and write this out
649//
650 if (iocnt <= maxIOV)
651 {retc = SendIOV(iov, iocnt, bytes);
652 wrMutex.UnLock();
653 return retc;
654 }
655
656// We will have to break this up into allowable segments
657//
658 int seglen, segcnt = maxIOV, iolen = 0;
659 do {seglen = 0;
660 for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
661 if ((retc = SendIOV(iov, segcnt, seglen)) < 0)
662 {wrMutex.UnLock();
663 return retc;
664 }
665 iolen += retc;
666 iov += segcnt;
667 iocnt -= segcnt;
668 if (iocnt <= maxIOV) segcnt = iocnt;
669 } while(iocnt > 0);
670
671// All done
672//
673 wrMutex.UnLock();
674 return iolen;
675}
676
677/******************************************************************************/
678
679int XrdLinkXeq::Send(const sfVec *sfP, int sfN)
680{
681#if !defined(HAVE_SENDFILE)
682
683 return -1;
684
685#elif defined(__solaris__)
686
687 sendfilevec_t vecSF[XrdOucSFVec::sfMax], *vecSFP = vecSF;
688 size_t xframt, totamt, bytes = 0;
689 ssize_t retc;
690 int i = 0;
691
692// Construct the sendfilev() vector
693//
694 for (i = 0; i < sfN; sfP++, i++)
695 {if (sfP->fdnum < 0)
696 {vecSF[i].sfv_fd = SFV_FD_SELF;
697 vecSF[i].sfv_off = (off_t)sfP->buffer;
698 } else {
699 vecSF[i].sfv_fd = sfP->fdnum;
700 vecSF[i].sfv_off = sfP->offset;
701 }
702 vecSF[i].sfv_flag = 0;
703 vecSF[i].sfv_len = sfP->sendsz;
704 bytes += sfP->sendsz;
705 }
706 totamt = bytes;
707
708// Lock the link, issue sendfilev(), and unlock the link. The documentation
709// is very spotty and inconsistent. We can only retry this operation under
710// very limited conditions.
711//
712 wrMutex.Lock();
713 isIdle = 0;
714do{retc = sendfilev(LinkInfo.FD, vecSFP, sfN, &xframt);
715
716// Check if all went well and return if so (usual case)
717//
718 if (xframt == bytes)
719 {AtomicAdd(BytesOut, bytes);
720 wrMutex.UnLock();
721 return totamt;
722 }
723
724// The only one we will recover from is EINTR. We cannot legally get EAGAIN.
725//
726 if (retc < 0 && errno != EINTR) break;
727
728// Try to resume the transfer
729//
730 if (xframt > 0)
731 {AtomicAdd(BytesOut, xframt); bytes -= xframt; SfIntr++;
732 while(xframt > 0 && sfN)
733 {if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
734 {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt; break;}
735 xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
736 }
737 }
738 } while(sfN > 0);
739
740// See if we can recover without destroying the connection
741//
742 retc = (retc < 0 ? errno : ECANCELED);
743 wrMutex.UnLock();
744 Log.Emsg("Link", retc, "send file to", ID);
745 return -1;
746
747#elif defined(__linux__) || defined(__GNU__)
748
749 static const int setON = 1, setOFF = 0;
750 ssize_t retc = 0, bytesleft;
751 off_t myOffset;
752 int i, xfrbytes = 0, uncork = 1, xIntr = 0;
753
754// lock the link
755//
756 wrMutex.Lock();
757 isIdle = 0;
758
759// In linux we need to cork the socket. On permanent errors we do not uncork
760// the socket because it will be closed in short order.
761//
762 if (setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
763 {Log.Emsg("Link", errno, "cork socket for", ID);
764 uncork = 0; sfOK = 0;
765 }
766
767// Send the header first
768//
769 for (i = 0; i < sfN; sfP++, i++)
770 {if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
771 else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
772 while(bytesleft
773 && (retc=sendfile(LinkInfo.FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
774 {bytesleft -= retc; xIntr++;}
775 }
776 if (retc < 0 && errno == EINTR) continue;
777 if (retc <= 0) break;
778 xfrbytes += sfP->sendsz;
779 }
780
781// Diagnose any sendfile errors
782//
783 if (retc <= 0)
784 {if (retc == 0) errno = ECANCELED;
785 wrMutex.UnLock();
786 Log.Emsg("Link", errno, "send file to", ID);
787 return -1;
788 }
789
790// Now uncork the socket
791//
792 if (uncork
793 && setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
794 Log.Emsg("Link", errno, "uncork socket for", ID);
795
796// All done
797//
798 if (xIntr > sfN) SfIntr += (xIntr - sfN);
799 AtomicAdd(BytesOut, xfrbytes);
800 wrMutex.UnLock();
801 return xfrbytes;
802
803#else
804
805 return -1;
806
807#endif
808}
809
810/******************************************************************************/
811/* Protected: s e n d D a t a */
812/******************************************************************************/
813
814int XrdLinkXeq::sendData(const char *Buff, int Blen)
815{
816 ssize_t retc = 0, bytesleft = Blen;
817
818// Write the data out
819//
820 while(bytesleft)
821 {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
822 {if (errno == EINTR) continue;
823 else break;
824 }
825 bytesleft -= retc; Buff += retc;
826 }
827
828// All done
829//
830 return retc;
831}
832
833/******************************************************************************/
834/* Protected: S e n d I O V */
835/******************************************************************************/
836
837int XrdLinkXeq::SendIOV(const struct iovec *iov, int iocnt, int bytes)
838{
839 ssize_t bytesleft, n, retc = 0;
840 const char *Buff;
841
842// Write the data out. On some version of Unix (e.g., Linux) a writev() may
843// end at any time without writing all the bytes when directed to a socket.
844// So, we attempt to resume the writev() using a combination of write() and
845// a writev() continuation. This approach slowly converts a writev() to a
846// series of writes if need be. We must do this inline because we must hold
847// the lock until all the bytes are written or an error occurs.
848//
849 bytesleft = static_cast<ssize_t>(bytes);
850 while(bytesleft)
851 {do {retc = writev(LinkInfo.FD, iov, iocnt);}
852 while(retc < 0 && errno == EINTR);
853 if (retc >= bytesleft || retc < 0) break;
854 bytesleft -= retc;
855 while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
856 {retc -= n; iov++; iocnt--;}
857 Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
858 while(n) {if ((retc = write(LinkInfo.FD, Buff, n)) < 0)
859 {if (errno == EINTR) continue;
860 else break;
861 }
862 n -= retc; Buff += retc; bytesleft -= retc;
863 }
864 if (retc < 0 || iocnt < 1) break;
865 }
866
867// All done
868//
869 if (retc >= 0) return bytes;
870 Log.Emsg("Link", errno, "send to", ID);
871 return -1;
872}
873
874/******************************************************************************/
875/* s e t I D */
876/******************************************************************************/
877
878void XrdLinkXeq::setID(const char *userid, int procid)
879{
880 char buff[sizeof(Uname)], *bp, *sp;
881 int ulen;
882
883 snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, PollInfo.FD);
884 ulen = strlen(buff);
885 sp = buff + ulen - 1;
886 bp = &Uname[sizeof(Uname)-1];
887 if (ulen > (int)sizeof(Uname)) ulen = sizeof(Uname);
888 *bp = '@'; bp--;
889 while(ulen--) {*bp = *sp; bp--; sp--;}
890 ID = bp+1;
891 Comment = (const char *)ID;
892
893// Update the ID in the TLS socket if enabled
894//
895 if (isTLS) tlsIO.SetTraceID(ID);
896}
897
898/******************************************************************************/
899/* s e t N B */
900/******************************************************************************/
901
903{
904// We don't support non-blocking output except for Linux at the moment
905//
906#if !defined(__linux__)
907 return false;
908#else
909// Trace this request
910//
911 TRACEI(DEBUG,"enabling non-blocking output");
912
913// If we don't already have a sendQ object get one. This is a one-time call
914// so to optimize checking if this object exists we also get the opMutex.'
915//
916 LinkInfo.opMutex.Lock();
917 if (!sendQ)
918 {wrMutex.Lock();
919 sendQ = new XrdSendQ(*this, wrMutex);
920 wrMutex.UnLock();
921 }
922 LinkInfo.opMutex.UnLock();
923 return true;
924#endif
925}
926
927/******************************************************************************/
928/* s e t P r o t o c o l */
929/******************************************************************************/
930
932{
933
934// Set new protocol.
935//
936 LinkInfo.opMutex.Lock();
937 XrdProtocol *op = Protocol;
938 if (push) ProtoAlt = Protocol;
939 Protocol = pp;
940 LinkInfo.opMutex.UnLock();
941 return op;
942}
943
944/******************************************************************************/
945/* s e t P r o t N a m e */
946/******************************************************************************/
947
948void XrdLinkXeq::setProtName(const char *name)
949{
950
951// Set the protocol name.
952//
953 LinkInfo.opMutex.Lock();
954 Addr.SetDialect(name);
955 LinkInfo.opMutex.UnLock();
956}
957
958/******************************************************************************/
959/* s e t T L S */
960/******************************************************************************/
961
962bool XrdLinkXeq::setTLS(bool enable, XrdTlsContext *ctx)
963{ //???
964// static const XrdTlsConnection::RW_Mode rwMode=XrdTlsConnection::TLS_RNB_WBL;
967 const char *eNote;
968 XrdTls::RC rc;
969
970// If we are already in a compatible mode, we are done
971//
972
973 if (isTLS == enable) return true;
974
975// If this is a shutdown, then do it now.
976//
977 if (!enable)
978 {tlsIO.Shutdown();
979 isTLS = enable;
980 Addr.SetTLS(enable);
981 return true;
982 }
983// We want to initialize TLS, do so now.
984//
985 if (!ctx) ctx = tlsCtx;
986 eNote = tlsIO.Init(*ctx, PollInfo.FD, rwMode, hsMode, false, false, ID);
987
988// Check for errors
989//
990 if (eNote)
991 {char buff[1024];
992 snprintf(buff, sizeof(buff), "Unable to enable tls for %s;", ID);
993 Log.Emsg("LinkXeq", buff, eNote);
994 return false;
995 }
996
997// Now we need to accept this TLS connection
998//
999 std::string eMsg;
1000 rc = tlsIO.Accept(&eMsg);
1001
1002// Diagnose return state
1003//
1004 if (rc != XrdTls::TLS_AOK) Log.Emsg("LinkXeq", eMsg.c_str());
1005 else {isTLS = enable;
1006 Addr.SetTLS(enable);
1007 Log.Emsg("LinkXeq", ID, "connection upgraded to", verTLS());
1008 }
1009 return rc == XrdTls::TLS_AOK;
1010}
1011
1012/******************************************************************************/
1013/* S F E r r o r */
1014/******************************************************************************/
1015
1017{
1018 Log.Emsg("TLS", rc, "send file to", ID);
1019 return -1;
1020}
1021
1022/******************************************************************************/
1023/* S h u t d o w n */
1024/******************************************************************************/
1025
1026void XrdLinkXeq::Shutdown(bool getLock)
1027{
1028 int temp;
1029
1030// Trace the entry
1031//
1032 TRACEI(DEBUG, (getLock ? "Async" : "Sync") <<" link shutdown in progress");
1033
1034// Get the lock if we need too (external entry via another thread)
1035//
1036 if (getLock) LinkInfo.opMutex.Lock();
1037
1038// If there is something to do, do it now
1039//
1040 temp = Instance; Instance = 0;
1041 if (!KeepFD)
1042 {shutdown(PollInfo.FD, SHUT_RDWR);
1043 if (dup2(devNull, PollInfo.FD) < 0)
1044 {Instance = temp;
1045 Log.Emsg("Link", errno, "shutdown FD for", ID);
1046 }
1047 }
1048
1049// All done
1050//
1051 if (getLock) LinkInfo.opMutex.UnLock();
1052}
1053
1054/******************************************************************************/
1055/* S t a t s */
1056/******************************************************************************/
1057
1058int XrdLinkXeq::Stats(char *buff, int blen, bool do_sync)
1059{
1060 static const char statfmt[] = "<stats id=\"link\"><num>%d</num>"
1061 "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
1062 "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
1063 "<sfps>%d</sfps></stats>";
1064 int i;
1065
1066// Check if actual length wanted
1067//
1068 if (!buff) return sizeof(statfmt)+17*6;
1069
1070// We must synchronize the statistical counters
1071//
1072 if (do_sync) XrdLinkCtl::SyncAll();
1073
1074// Obtain lock on the stats area and format it
1075//
1077 i = snprintf(buff, blen, statfmt, AtomicGet(LinkCount),
1087 return i;
1088}
1089
1090/******************************************************************************/
1091/* s y n c S t a t s */
1092/******************************************************************************/
1093
1095{
1096 long long tmpLL;
1097 int tmpI4;
1098
1099// If this is dynamic, get the opMutex lock
1100//
1101 if (!ctime) LinkInfo.opMutex.Lock();
1102
1103// Either the caller has the opMutex or this is called out of close. In either
1104// case, we need to get the read and write mutexes; each followed by the stats
1105// mutex. This order is important because we should not hold the stats mutex
1106// for very long and the r/w mutexes may take a long time to acquire. If we
1107// must maintain the link count we need to actually acquire the stats mutex as
1108// we will be doing compound operations. Atomics are still used to keep other
1109// threads from seeing partial results.
1110//
1112
1113 if (ctime)
1114 {*ctime = time(0) - LinkInfo.conTime;
1115 AtomicAdd(LinkConTime, *ctime);
1116 statsMutex.Lock();
1117 if (LinkCount > 0) AtomicDec(LinkCount);
1118 statsMutex.UnLock();
1119 }
1120
1122
1123 tmpLL = AtomicFAZ(BytesIn);
1124 AtomicAdd(LinkBytesIn, tmpLL); AtomicAdd(BytesInTot, tmpLL);
1125 tmpI4 = AtomicFAZ(tardyCnt);
1127 tmpI4 = AtomicFAZ(stallCnt);
1128 AtomicAdd(LinkStalls, tmpI4); AtomicAdd(stallCntTot, tmpI4);
1130
1132 tmpLL = AtomicFAZ(BytesOut);
1134 tmpI4 = AtomicFAZ(SfIntr);
1135 AtomicAdd(LinkSfIntr, tmpI4);
1137
1138// Make sure the protocol updates it's statistics as well
1139//
1140 if (Protocol) Protocol->Stats(0, 0, 1);
1141
1142// All done
1143//
1144 if (!ctime) LinkInfo.opMutex.UnLock();
1145}
1146
1147/******************************************************************************/
1148/* Protected: T L S _ E r r o r */
1149/******************************************************************************/
1150
1151int XrdLinkXeq::TLS_Error(const char *act, XrdTls::RC rc)
1152{
1153 std::string reason = XrdTls::RC2Text(rc);
1154 char msg[512];
1155
1156 snprintf(msg, sizeof(msg), "Unable to %s %s;", act, ID);
1157 Log.Emsg("TLS", msg, reason.c_str());
1158 return -1;
1159}
1160
1161/******************************************************************************/
1162/* T L S _ P e e k */
1163/******************************************************************************/
1164
1165int XrdLinkXeq::TLS_Peek(char *Buff, int Blen, int timeout)
1166{
1167 XrdSysMutexHelper theMutex;
1168 XrdTls::RC retc;
1169 int rc, rlen;
1170
1171// Lock the read mutex if we need to, the helper will unlock it upon exit
1172//
1173 if (LockReads) theMutex.Lock(&rdMutex);
1174
1175// Wait until we can actually read something
1176//
1177 isIdle = 0;
1178 if (timeout)
1179 {rc = Wait4Data(timeout);
1180 if (rc < 1) return rc;
1181 }
1182
1183// Do the peek and if sucessful, the number of bytes available.
1184//
1185 retc = tlsIO.Peek(Buff, Blen, rlen);
1186 if (retc == XrdTls::TLS_AOK) return rlen;
1187
1188// Dianose the TLS error and return failure
1189//
1190 return TLS_Error("peek on", retc);
1191}
1192
1193/******************************************************************************/
1194/* T L S _ R e c v */
1195/******************************************************************************/
1196
1197int XrdLinkXeq::TLS_Recv(char *Buff, int Blen)
1198{
1199 XrdSysMutexHelper theMutex;
1200 XrdTls::RC retc;
1201 int rlen;
1202
1203// Lock the read mutex if we need to, the helper will unlock it upon exit
1204//
1205 if (LockReads) theMutex.Lock(&rdMutex);
1206
1207// Note that we will read only as much as is queued. Use Recv() with a
1208// timeout to receive as much data as possible.
1209//
1210 isIdle = 0;
1211 retc = tlsIO.Read(Buff, Blen, rlen);
1212 if (retc != XrdTls::TLS_AOK) return TLS_Error("receive from", retc);
1213 if (rlen > 0) AtomicAdd(BytesIn, rlen);
1214 return rlen;
1215}
1216
1217/******************************************************************************/
1218
1219int XrdLinkXeq::TLS_Recv(char *Buff, int Blen, int timeout, bool havelock)
1220{
1221 XrdSysMutexHelper theMutex;
1222 XrdTls::RC retc;
1223 int pend, rlen, totlen = 0;
1224
1225// Lock the read mutex if we need to, the helper will unlock it upon exit
1226//
1227 if (LockReads && !havelock) theMutex.Lock(&rdMutex);
1228
1229// Wait up to timeout milliseconds for data to arrive
1230//
1231 isIdle = 0;
1232 while(Blen > 0)
1233 {pend = tlsIO.Pending(true);
1234 if (!pend) pend = Wait4Data(timeout);
1235 if (pend < 1)
1236 {if (pend < 0) return -1;
1237 tardyCnt++;
1238 if (totlen)
1239 {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
1240 AtomicAdd(BytesIn, totlen);
1241 }
1242 return totlen;
1243 }
1244
1245 // Read as much data as you can. Note that we will force an error
1246 // if we get a zero-length read after poll said it was OK. However,
1247 // if we never read anything, then we simply return -ENOMSG to avoid
1248 // generating a "read link error" as clearly there was a hangup.
1249 //
1250 retc = tlsIO.Read(Buff, Blen, rlen);
1251 if (retc != XrdTls::TLS_AOK)
1252 {if (!totlen) return -ENOMSG;
1253 AtomicAdd(BytesIn, totlen);
1254 return TLS_Error("receive from", retc);
1255 }
1256 if (rlen <= 0) break;
1257 totlen += rlen; Blen -= rlen; Buff += rlen;
1258 }
1259
1260 AtomicAdd(BytesIn, totlen);
1261 return totlen;
1262}
1263
1264/******************************************************************************/
1265
1266int XrdLinkXeq::TLS_Recv(const struct iovec *iov, int iocnt, int timeout)
1267{
1268 XrdSysMutexHelper theMutex;
1269 char *Buff;
1270 int Blen, rlen, totlen = 0;
1271
1272// Lock the read mutex if we need to, the helper will unlock it upon exit
1273//
1274 if (LockReads) theMutex.Lock(&rdMutex);
1275
1276// Individually process each element until we can't read any more
1277//
1278 isIdle = 0;
1279 for (int i = 0; i < iocnt; i++)
1280 {Buff = (char *)iov[i].iov_base;
1281 Blen = iov[i].iov_len;
1282 rlen = TLS_Recv(Buff, Blen, timeout, true);
1283 if (rlen <= 0) break;
1284 totlen += rlen;
1285 if (rlen < Blen) break;
1286 }
1287
1288 if (totlen) {AtomicAdd(BytesIn, totlen);}
1289 return totlen;
1290}
1291
1292/******************************************************************************/
1293/* T L S _ R e c v A l l */
1294/******************************************************************************/
1295
1296int XrdLinkXeq::TLS_RecvAll(char *Buff, int Blen, int timeout)
1297{
1298 int retc;
1299
1300// Check if timeout specified. Notice that the timeout is the max we will
1301// wait for some data. We will wait forever for all the data. Yeah, it's weird.
1302//
1303 if (timeout >= 0)
1304 {retc = tlsIO.Pending(true);
1305 if (!retc) retc = Wait4Data(timeout);
1306 if (retc < 1) return (retc ? -1 : -ETIMEDOUT);
1307 }
1308
1309// Note that we will block until we receive all the bytes.
1310//
1311 return TLS_Recv(Buff, Blen, -1);
1312}
1313
1314/******************************************************************************/
1315/* T L S _ S e n d */
1316/******************************************************************************/
1317
1318int XrdLinkXeq::TLS_Send(const char *Buff, int Blen)
1319{
1321 ssize_t bytesleft = Blen;
1322 XrdTls::RC retc;
1323 int byteswritten;
1324
1325// Prepare to send
1326//
1327 isIdle = 0;
1328 AtomicAdd(BytesOut, Blen);
1329
1330// Do non-blocking writes if we are setup to do so.
1331//
1332 if (sendQ) return sendQ->Send(Buff, Blen);
1333
1334// Write the data out
1335//
1336 while(bytesleft)
1337 {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1338 if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1339 bytesleft -= byteswritten; Buff += byteswritten;
1340 }
1341
1342// All done
1343//
1344 return Blen;
1345}
1346
1347/******************************************************************************/
1348
1349int XrdLinkXeq::TLS_Send(const struct iovec *iov, int iocnt, int bytes)
1350{
1352 XrdTls::RC retc;
1353 int byteswritten;
1354
1355// Get a lock and assume we will be successful (statistically we are). Note
1356// that the calling interface gauranteed bytes are not zero.
1357//
1358 isIdle = 0;
1359 AtomicAdd(BytesOut, bytes);
1360
1361// Do non-blocking writes if we are setup to do so.
1362//
1363 if (sendQ) return sendQ->Send(iov, iocnt, bytes);
1364
1365// Write the data out.
1366//
1367 for (int i = 0; i < iocnt; i++)
1368 {ssize_t bytesleft = iov[i].iov_len;
1369 char *Buff = (char *)iov[i].iov_base;
1370 while(bytesleft)
1371 {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1372 if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1373 bytesleft -= byteswritten; Buff += byteswritten;
1374 }
1375 }
1376
1377// All done
1378//
1379 return bytes;
1380}
1381
1382/******************************************************************************/
1383
1384int XrdLinkXeq::TLS_Send(const sfVec *sfP, int sfN)
1385{
1387 int bytes, buffsz, fileFD, retc;
1388 off_t offset;
1389 ssize_t totamt = 0;
1390 char myBuff[65536];
1391
1392// Convert the sendfile to a regular send. The conversion is not particularly
1393// fast and caller are advised to avoid using sendfile on TLS connections.
1394//
1395 isIdle = 0;
1396 for (int i = 0; i < sfN; sfP++, i++)
1397 {if (!(bytes = sfP->sendsz)) continue;
1398 totamt += bytes;
1399 if (sfP->fdnum < 0)
1400 {if (!TLS_Write(sfP->buffer, bytes)) return -1;
1401 continue;
1402 }
1403 offset = sfP->offset;
1404 fileFD = sfP->fdnum;
1405 buffsz = (bytes < (int)sizeof(myBuff) ? bytes : sizeof(myBuff));
1406 do {do {retc = pread(fileFD, myBuff, buffsz, offset);}
1407 while(retc < 0 && errno == EINTR);
1408 if (retc < 0) return SFError(errno);
1409 if (!retc) break;
1410 if (!TLS_Write(myBuff, buffsz)) return -1;
1411 offset += buffsz; bytes -= buffsz; totamt += retc;
1412 } while(bytes > 0);
1413 }
1414
1415// We are done
1416//
1417 AtomicAdd(BytesOut, totamt);
1418 return totamt;
1419}
1420
1421/******************************************************************************/
1422/* Protected: T L S _ W r i t e */
1423/******************************************************************************/
1424
1425bool XrdLinkXeq::TLS_Write(const char *Buff, int Blen)
1426{
1427 XrdTls::RC retc;
1428 int byteswritten;
1429
1430// Write the data out
1431//
1432 while(Blen)
1433 {retc = tlsIO.Write(Buff, Blen, byteswritten);
1434 if (retc != XrdTls::TLS_AOK)
1435 {TLS_Error("write to", retc);
1436 return false;
1437 }
1438 Blen -= byteswritten; Buff += byteswritten;
1439 }
1440
1441// All done
1442//
1443 return true;
1444}
1445
1446/******************************************************************************/
1447/* v e r T L S */
1448/******************************************************************************/
1449
1451{
1452 return tlsIO.Version();
1453}
#define DEBUG(x)
#define close(a)
Definition XrdPosix.hh:48
#define write(a, b, c)
Definition XrdPosix.hh:115
#define writev(a, b, c)
Definition XrdPosix.hh:117
#define readv(a, b, c)
Definition XrdPosix.hh:84
#define read(a, b, c)
Definition XrdPosix.hh:82
#define pread(a, b, c, d)
Definition XrdPosix.hh:80
#define eMsg(x)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicDec(x)
#define AtomicGet(x)
#define AtomicEnd(Mtx)
#define AtomicAdd(x, y)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACEI(act, x)
Definition XrdTrace.hh:66
const char * Comment
Definition XrdJob.hh:47
static void SyncAll()
Synchronize statustics for ll links.
static void Unhook(int fd)
Unhook a link from the active table of links.
static const char * TraceID
int TLS_Send(const char *Buff, int Blen)
long long BytesOut
int TLS_Error(const char *act, XrdTls::RC rc)
int TLS_Peek(char *Buff, int Blen, int timeout)
int Client(char *buff, int blen)
char Uname[24]
XrdTlsPeerCerts * getPeerCerts()
static int LinkCountMax
XrdLinkInfo LinkInfo
XrdProtocol * ProtoAlt
int Close(bool defer=false)
XrdNetAddr Addr
int TLS_Recv(char *Buff, int Blen)
int sendData(const char *Buff, int Blen)
long long BytesInTot
bool TLS_Write(const char *Buff, int Blen)
int SendIOV(const struct iovec *iov, int iocnt, int bytes)
XrdProtocol * setProtocol(XrdProtocol *pp, bool push)
static long long LinkCountTot
long long BytesOutTot
void Shutdown(bool getLock)
int Peek(char *buff, int blen, int timeout=-1)
static int LinkCount
void Reset()
int Backlog()
XrdSysMutex wrMutex
static int Stats(char *buff, int blen, bool do_sync=false)
XrdSendQ * sendQ
XrdPollInfo PollInfo
void setID(const char *userid, int procid)
int Recv(char *buff, int blen)
static long long LinkBytesIn
int TLS_RecvAll(char *Buff, int Blen, int timeout)
int SFError(int rc)
long long BytesIn
int Send(const char *buff, int blen)
XrdSysMutex rdMutex
const char * verTLS()
bool setNB()
int RecvIOV(const struct iovec *iov, int iocnt)
char Lname[256]
static long long LinkConTime
static int LinkSfIntr
XrdTlsSocket tlsIO
void DoIt()
int RecvAll(char *buff, int blen, int timeout=-1)
XrdProtocol * Protocol
bool Register(const char *hName)
static XrdSysMutex statsMutex
void setProtName(const char *name)
static int LinkStalls
static long long LinkBytesOut
void syncStats(int *ctime=0)
bool setTLS(bool enable, XrdTlsContext *ctx=0)
static int LinkTimeOuts
static char * Poll2Text(short events)
Definition XrdPoll.cc:272
static void Detach(XrdPollInfo &pInfo)
Definition XrdPoll.cc:177
void Lock(XrdSysMutex *Mutex)
int fd
Socket file descriptor.
long long bytesOut
Bytes written to the socket.
int consec
Seconds connected.
long long bytesIn
Bytes read from the socket.
const char * tident
Pointer to the client's trace identifier.
@ TLS_HS_BLOCK
Always block during handshake.
@ TLS_RBL_WBL
blocking read blocking write
static std::string RC2Text(XrdTls::RC rc, bool dbg=false)
Definition XrdTls.cc:127
@ TLS_AOK
All went well, will always be zero.
Definition XrdTls.hh:40
XrdTlsContext * tlsCtx
Definition XrdGlobals.cc:52
XrdTcpMonPin * TcpMonPin
Definition XrdLinkXeq.cc:80
const int maxIOV
Definition XrdLinkXeq.cc:82
XrdSysError Log
Definition XrdConfig.cc:113
XrdScheduler Sched
Definition XrdLinkCtl.cc:54
int getIovMax()
int fdnum
File descriptor for data.
int sendsz
Length of data at offset.