XRootD
Loading...
Searching...
No Matches
XrdXrootdTransit.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d T r a n s i t . c c */
4/* */
5/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32#include <unistd.h>
33#include <sys/uio.h>
34
36
38
39#include "Xrd/XrdBuffer.hh"
40#include "Xrd/XrdLink.hh"
42#include "XrdOuc/XrdOucUtils.hh"
50
51/******************************************************************************/
52/* C l o b a l S y m b o l s */
53/******************************************************************************/
54
56
57#undef TRACELINK
58#define TRACELINK Link
59
60#define XRD_GETNUM(x)\
61 ntohl(*(static_cast<unsigned int *>(static_cast<void *>(x))))
62
63/******************************************************************************/
64/* S t a t i c M e m b e r s */
65/******************************************************************************/
66
67const char *XrdXrootdTransit::reqTab = XrdXrootdTransit::ReqTable();
68
70 XrdXrootdTransit::TranStack("TranStack",
71 "transit protocol anchor");
72
73/******************************************************************************/
74/* A l l o c */
75/******************************************************************************/
76
78 XrdLink *linkP,
79 XrdSecEntity *seceP,
80 const char *nameP,
81 const char *protP
82 )
83{
85
86// Simply return a new transit object masquerading as a bridge
87//
88 if (!(xp = TranStack.Pop())) xp = new XrdXrootdTransit();
89 xp->Init(rsltP, linkP, seceP, nameP, protP);
90 return xp;
91}
92
93/******************************************************************************/
94/* A t t n */
95/******************************************************************************/
96
97int XrdXrootdTransit::Attn(XrdLink *lP, short *theSID, int rcode,
98 const struct iovec *ioV, int ioN, int ioL)
99{
101
102// Find the request
103//
104 if (!(tP = XrdXrootdTransPend::Remove(lP, *theSID)))
105 {TRACE(REQ, "Unable to find request for " <<lP->ID <<" sid=" <<*theSID);
106 return 0;
107 }
108
109// Resume the request as we have been waiting for the response.
110//
111 return tP->bridge->AttnCont(tP, rcode, ioV, ioN, ioL);
112}
113
114/******************************************************************************/
115/* A t t n C o n t */
116/******************************************************************************/
117
118int XrdXrootdTransit::AttnCont(XrdXrootdTransPend *tP, int rcode,
119 const struct iovec *ioV, int ioN, int ioL)
120{
121 int rc;
122
123// Refresh the request structure
124//
125 memcpy(&Request, &(tP->Pend.Request), sizeof(Request));
126 delete tP;
127 runWait = 0;
128
129// Reissue the request if it's a wait 0 response.
130//
131 if (rcode==kXR_wait
132 && (!ioN || XRD_GETNUM(ioV[0].iov_base) == 0))
133 {Sched->Schedule((XrdJob *)&waitJob);
134 return 0;
135 }
136
137// Send off the deferred response
138//
139 rc = Send(rcode, ioV, ioN, ioL);
140
141// Handle end based on current state
142//
143 if (rc >= 0 && !runWait)
144 {if (runDone)
145 {AtomicBeg(runMutex);
146 AtomicZAP(runStatus);
147 AtomicEnd(runMutex);
148 }
149 if (reInvoke) Sched->Schedule((XrdJob *)&respJob);
150 else Link->Enable();
151 }
152
153// All done
154//
155 return rc;
156}
157
158/******************************************************************************/
159/* D i s c */
160/******************************************************************************/
161
163{
164 char buff[128];
165 int rc;
166
167// We do not allow disconnection while we are active
168//
169 AtomicBeg(runMutex);
170 rc = AtomicInc(runStatus);
171 AtomicEnd(runMutex);
172 if (rc) return false;
173
174// Reconnect original protocol to the link
175//
176 Link->setProtocol(realProt);
177
178// Now we need to recycle our xrootd part
179//
180 sprintf(buff, "%s disconnection", pName);
181 XrdXrootdProtocol::Recycle(Link, time(0)-cTime, buff);
182
183// Now just free up our object.
184//
185 TranStack.Push(&TranLink);
186 return true;
187}
188
189/******************************************************************************/
190/* Private: F a i l */
191/******************************************************************************/
192
193bool XrdXrootdTransit::Fail(int ecode, const char *etext)
194{
195 runError = ecode;
196 runEText = etext;
197 return true;
198}
199
200/******************************************************************************/
201/* F a t a l */
202/******************************************************************************/
203
204int XrdXrootdTransit::Fatal(int rc)
205{
206 XrdXrootd::Bridge::Context rInfo(Link, Request.header.streamid,
207 Request.header.requestid);
208
209 return (respObj->Error(rInfo, runError, runEText) ? rc : -1);
210}
211
212/******************************************************************************/
213/* I n i t */
214/******************************************************************************/
215
216void XrdXrootdTransit::Init(XrdScheduler *schedP, int qMax, int qTTL)
217{
218 TranStack.Set(schedP, &XrdXrootdTrace, TRACE_MEM);
219 TranStack.Set(qMax, qTTL);
220}
221
222/******************************************************************************/
223
225 XrdLink *linkP,
226 XrdSecEntity *seceP,
227 const char *nameP,
228 const char *protP
229 )
230{
231 XrdNetAddrInfo *addrP;
232 const char *who;
233 char uname[sizeof(Request.login.username)+1];
234
235// Set standard stuff
236//
237 runArgs = 0;
238 runALen = 0;
239 runABsz = 0;
240 runError = 0;
241 runStatus = 0;
242 runWait = 0;
243 runWTot = 0;
244 runWMax = 3600;
245 runWCall = false;
246 runDone = false;
247 reInvoke = false;
248 wBuff = 0;
249 wBLen = 0;
250 respObj = respP;
251 pName = protP;
252 mySID = getSID();
253
254// Bind the protocol to the link
255//
256 SI->Bump(SI->Count);
257 Link = linkP;
258 Response.Set(linkP);
259 Response.Set(this);
260 strcpy(Entity.prot, "host");
261 Entity.host = (char *)linkP->Host();
262
263// Develop a trace identifier
264//
265 strncpy(uname, nameP, sizeof(uname)-1);
266 uname[sizeof(uname)-1] = 0;
268 linkP->setID(uname, mySID);
269
270// Place trace identifier everywhere is should be located
271
272// Indicate that this brige supports asynchronous responses
273//
275
276// Mark the client as IPv4 if they came in as IPv4 or mapped IPv4. Note
277// there is no way we can figure out if this is a dual-stack client.
278//
279 addrP = Link->AddrInfo();
280 if (addrP->isIPType(XrdNetAddrInfo::IPv4) || addrP->isMapped())
282
283// Mark the client as being on a private net if the address is private
284//
285 if (addrP->isPrivate()) {clientPV |= XrdOucEI::uPrip; rdType = 1;}
286 else rdType = 0;
287
288// Now tie the security information
289//
290 Client = (seceP ? seceP : &Entity);
291 Client->ueid = mySID;
292 Client->tident = Client->pident = Link->ID;
293 Client->addrInfo = addrP;
294
295// Allocate a monitoring object, if needed for this connection and record login
296//
297 if (Monitor.Ready())
298 {Monitor.Register(linkP->ID, linkP->Host(), protP);
299 if (Monitor.Logins())
300 {if (Monitor.Auths() && seceP) MonAuth();
301 else Monitor.Report(Monitor.Auths() ? "" : 0);
302 }
303 }
304
305// Complete the request ID object
306//
307 ReqID.setID(Request.header.streamid, linkP->FDnum(), linkP->Inst());
308
309// Substitute our protocol for the existing one
310//
311 realProt = linkP->setProtocol(this);
312 linkP->setProtName(protP);
313 linkP->armBridge();
314
315// Document this login
316//
317 who = (seceP && seceP->name ? seceP->name : "nobody");
318 eDest.Log(SYS_LOG_01, "Bridge", Link->ID, "login as", who);
319
320// All done, indicate we are logged in
321//
323 cTime = time(0);
324
325// Propogate a connect through the whole system
326//
327 osFS->Connect(Client);
328}
329
330/******************************************************************************/
331/* P r o c e e d */
332/******************************************************************************/
333
335{
336 int rc;
337
338// If we were interrupted in a reinvoke state, resume that state.
339//
340 if (reInvoke) rc = Process(Link);
341 else rc = 0;
342
343// Handle ending status
344//
345 if (rc >= 0) Link->Enable();
346 else if (rc != -EINPROGRESS) Link->Close();
347}
348
349/******************************************************************************/
350/* P r o c e s s */
351/******************************************************************************/
352
354{
355 int rc;
356
357// This entry is serialized via link processing and data is now available.
358// One of the following will be returned.
359//
360// < 0 -> Stop getting requests,
361// -EINPROGRESS leave link disabled but otherwise all is well
362// -n Error, disable and close the link
363// = 0 -> OK, get next request, if allowed, o/w enable the link
364// > 0 -> Slow link, stop getting requests and enable the link
365//
366
367// Reflect data is present to the underlying protocol and if Run() has been
368// called we need to dispatch that request. This may be iterative.
369//
370do{rc = realProt->Process((reInvoke ? 0 : lp));
371 if (rc >= 0 && runStatus)
372 {reInvoke = (rc == 0);
373 if (runError) rc = Fatal(rc);
374 else {runDone = false;
376 if (rc >= 0)
377 {if (runWait) rc = -EINPROGRESS;
378 if (!runDone) return rc;
379 AtomicBeg(runMutex);
380 AtomicZAP(runStatus);
381 AtomicEnd(runMutex);
382 }
383 }
384 } else reInvoke = false;
385 } while(rc >= 0 && reInvoke);
386
387// Make sure that we indicate that we are no longer active
388//
389 if (runStatus)
390 {AtomicBeg(runMutex);
391 AtomicZAP(runStatus);
392 AtomicEnd(runMutex);
393 }
394
395// All done
396//
397 return (rc ? rc : 1);
398}
399
400/******************************************************************************/
401/* R e c y c l e */
402/******************************************************************************/
403
404void XrdXrootdTransit::Recycle(XrdLink *lp, int consec, const char *reason)
405{
406
407// Set ourselves as active so we can't get more requests
408//
409 AtomicBeg(runMutex);
410 AtomicInc(runStatus);
411 AtomicEnd(runMutex);
412
413// If we were active then we will need to quiesce before dismantling ourselves.
414// Note that Recycle() can only be called if the link is enabled. So, this bit
415// of code is improbable but we check it anyway.
416//
417 if (runWait > 0) Sched->Cancel(&waitJob);
418
419// First we need to recycle the real protocol
420//
421 if (realProt) realProt->Recycle(lp, consec, reason);
422
423// Now we need to recycle our xrootd part
424//
425 XrdXrootdProtocol::Recycle(lp, consec, reason);
426
427// Release the argument buffer
428//
429 if (runArgs) {free(runArgs); runArgs = 0;}
430
431// Delete all pending requests
432//
434
435// Now just free up our object.
436//
437 TranStack.Push(&TranLink);
438}
439
440/******************************************************************************/
441/* R e d r i v e */
442/******************************************************************************/
443
445{
446 static int eCode = htonl(kXR_NoMemory);
447 static char eText[] = "Insufficent memory to re-issue request";
448 static struct iovec ioV[] = {{(char *)&eCode,sizeof(eCode)},
449 {(char *)&eText,sizeof(eText)}};
450 int rc;
451
452// Do some tracing
453//
454 TRACEP(REQ, "Bridge redrive runStatus="<<runStatus<<" runError="<<runError
455 <<" runWait="<<runWait<<" runWTot="<<runWTot);
456
457// Update wait statistics
458//
459 runWTot += runWait;
460 runWait = 0;
461
462// While we are running asynchronously, there is no way that this object can
463// be deleted while a timer is outstanding as the link has been disabled. So,
464// we can reissue the request with little worry.
465//
466// This is a bit tricky here as a redriven request may result in a wait. If
467// this happens we cannot hand the result off to the real protocol until we
468// wait and successfully redrive. The wait handling occurs asynchronously
469// so all we need to do is honor it here.
470//
471 if (!runALen || RunCopy(runArgs, runALen)) {
472 do{rc = Process2();
473 TRACEP(REQ, "Bridge redrive Process2 rc="<<rc
474 <<" runError="<<runError<<" runWait="<<runWait);
475 if (rc == 0 && !runWait && !runError) {
476 rc = realProt->Process(NULL);
477 TRACEP(REQ, "Bridge redrive callback rc="<<rc
478 <<" runStatus="<<runStatus);
479 }
480 if (runStatus)
481 {AtomicBeg(runMutex);
482 AtomicZAP(runStatus);
483 AtomicEnd(runMutex);
484 }
485 } while((rc == 0) && !runError && !runWait);
486 }
487 else rc = Send(kXR_error, ioV, 2, 0);
488
489// Defer the request if need be
490//
491 if (rc >= 0 && runWait) return;
492 runWTot = 0;
493
494// Indicate we are no longer active
495//
496 if (runStatus)
497 {AtomicBeg(runMutex);
498 AtomicZAP(runStatus);
499 AtomicEnd(runMutex);
500 }
501
502// If the link needs to be terminated, terminate the link. Otherwise, we can
503// enable the link for new requests at this point.
504//
505 if (rc < 0) Link->Close();
506 else Link->Enable();
507}
508
509/******************************************************************************/
510/* R e q T a b l e */
511/******************************************************************************/
512
513#define KXR_INDEX(x) x-kXR_auth
514
516{
517 static char rTab[kXR_truncate-kXR_auth+1];
518
519// Initialize the table
520//
521 memset(rTab, 0, sizeof(rTab));
522 rTab[KXR_INDEX(kXR_chmod)] = 1;
523 rTab[KXR_INDEX(kXR_close)] = 1;
524 rTab[KXR_INDEX(kXR_dirlist)] = 1;
525 rTab[KXR_INDEX(kXR_locate)] = 1;
526 rTab[KXR_INDEX(kXR_mkdir)] = 1;
527 rTab[KXR_INDEX(kXR_mv)] = 1;
528 rTab[KXR_INDEX(kXR_open)] = 1;
529 rTab[KXR_INDEX(kXR_prepare)] = 1;
530 rTab[KXR_INDEX(kXR_protocol)] = 1;
531 rTab[KXR_INDEX(kXR_query)] = 1;
532 rTab[KXR_INDEX(kXR_read)] = 2;
533 rTab[KXR_INDEX(kXR_readv)] = 2;
534 rTab[KXR_INDEX(kXR_rm)] = 1;
535 rTab[KXR_INDEX(kXR_rmdir)] = 1;
536 rTab[KXR_INDEX(kXR_set)] = 1;
537 rTab[KXR_INDEX(kXR_stat)] = 1;
538 rTab[KXR_INDEX(kXR_statx)] = 1;
539 rTab[KXR_INDEX(kXR_sync)] = 1;
540 rTab[KXR_INDEX(kXR_truncate)] = 1;
541 rTab[KXR_INDEX(kXR_write)] = 2;
542
543// Now return the address
544//
545 return rTab;
546}
547
548/******************************************************************************/
549/* Private: R e q W r i t e */
550/******************************************************************************/
551
552bool XrdXrootdTransit::ReqWrite(char *xdataP, int xdataL)
553{
554
555// Make sure we always transit to the resume point
556//
557 myBlen = 0;
558
559// If nothing was read, then this is a straight-up write
560//
561 if (!xdataL || !xdataP || !Request.header.dlen)
562 {Resume = 0; wBuff = xdataP; wBLen = xdataL;
563 return true;
564 }
565
566// Partial data was read, we may have to split this between a direct write
567// and a network read/write -- somewhat complicated.
568//
569 myBuff = wBuff = xdataP;
570 myBlast = wBLen = xdataL;
572 return true;
573}
574
575/******************************************************************************/
576/* R u n */
577/******************************************************************************/
578
579bool XrdXrootdTransit::Run(const char *xreqP, char *xdataP, int xdataL)
580{
581 int movLen, rc;
582
583// We do not allow re-entry if we are curently processing a request.
584// It will be reset, as need, when a response is effected.
585//
586 AtomicBeg(runMutex);
587 rc = AtomicInc(runStatus);
588 AtomicEnd(runMutex);
589 if (rc)
590 {TRACEP(REQ, "Bridge request failed due to re-entry");
591 return false;
592 }
593
594// Copy the request header
595//
596 memcpy((void *)&Request, (void *)xreqP, sizeof(Request));
597
598// Validate that we can actually handle this request
599//
600 Request.header.requestid = ntohs(Request.header.requestid);
601 if (Request.header.requestid & 0x8000
602 || Request.header.requestid > static_cast<kXR_unt16>(kXR_truncate)
603 || !reqTab[Request.header.requestid - kXR_auth])
604 {TRACEP(REQ, "Unsupported bridge request");
605 return Fail(kXR_Unsupported, "Unsupported bridge request");
606 }
607
608// Validate the data length
609//
610 Request.header.dlen = ntohl(Request.header.dlen);
611 if (Request.header.dlen < 0)
612 {TRACEP(REQ, "Invalid request data length");
613 return Fail(kXR_ArgInvalid, "Invalid request data length");
614 }
615
616// Copy the stream id and trace this request
617//
618 Response.Set(Request.header.streamid);
619 TRACEP(REQ, "Bridge req=" <<Request.header.requestid
620 <<" dlen=" <<Request.header.dlen <<" blen=" <<xdataL);
621
622// If this is a write request, we will need to do a lot more
623//
624 if (Request.header.requestid == kXR_write) return ReqWrite(xdataP, xdataL);
625
626// Obtain any needed buffer and handle any existing data arguments. Also, we
627// need to keep a shadow copy of the request arguments should we get a wait
628// and will need to re-issue the request (the server mangles the args).
629//
630 if (Request.header.dlen)
631 {movLen = (xdataL < Request.header.dlen ? xdataL : Request.header.dlen);
632 if (!RunCopy(xdataP, movLen)) return true;
633 if (!runArgs || movLen > runABsz)
634 {if (runArgs) free(runArgs);
635 if (!(runArgs = (char *)malloc(movLen)))
636 {TRACEP(REQ, "Failed to allocate memory");
637 return Fail(kXR_NoMemory, "Insufficient memory");
638 }
639 runABsz = movLen;
640 }
641 memcpy(runArgs, xdataP, movLen); runALen = movLen;
642 if ((myBlen = Request.header.dlen - movLen))
643 {myBuff = argp->buff + movLen;
645 return true;
646 }
647 } else runALen = 0;
648
649// If we have all the data, indicate request accepted.
650//
651 runError = 0;
652 Resume = 0;
653 return true;
654}
655
656/******************************************************************************/
657/* Privae: R u n C o p y */
658/******************************************************************************/
659
660bool XrdXrootdTransit::RunCopy(char *buffP, int buffL)
661{
662
663// Allocate a buffer if we do not have one or it is too small
664//
665 if (!argp || Request.header.dlen+1 > argp->bsize)
666 {if (argp) BPool->Release(argp);
667 if (!(argp = BPool->Obtain(Request.header.dlen+1)))
668 {Fail(kXR_ArgTooLong, "Request argument too long"); return false;}
669 hcNow = hcPrev; halfBSize = argp->bsize >> 1;
670 }
671
672// Copy the arguments to the buffer
673//
674 memcpy(argp->buff, buffP, buffL);
675 argp->buff[buffL] = 0;
676 return true;
677}
678
679/******************************************************************************/
680/* S e n d */
681/******************************************************************************/
682
683int XrdXrootdTransit::Send(int rcode, const struct iovec *ioV, int ioN, int ioL)
684{
685 XrdXrootd::Bridge::Context rInfo(Link, Request.header.streamid,
686 Request.header.requestid);
687 const char *eMsg;
688 int rc;
689 bool aOK;
690
691// Invoke the result object (we initially assume this is the final result)
692//
693 runDone = true;
694 switch(rcode)
695 {case kXR_error:
696 rc = XRD_GETNUM(ioV[0].iov_base);
697 eMsg = (ioN < 2 ? "" : (const char *)ioV[1].iov_base);
698 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
699 aOK = respObj->Error(rInfo, rc, eMsg);
700 break;
701 case kXR_ok:
702 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
703 aOK = (ioN ? respObj->Data(rInfo, ioV, ioN, ioL, true)
704 : respObj->Done(rInfo));
705 break;
706 case kXR_oksofar:
707 aOK = respObj->Data(rInfo, ioV, ioN, ioL, false);
708 runDone = false;
709 break;
710 case kXR_redirect:
711 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
712 rc = XRD_GETNUM(ioV[0].iov_base);
713 aOK = respObj->Redir(rInfo,rc,(const char *)ioV[1].iov_base);
714 break;
715 case kXR_wait:
716 return Wait(rInfo, ioV, ioN, ioL);
717 break;
718 case kXR_waitresp:
719 runDone = false;
720 return WaitResp(rInfo, ioV, ioN, ioL);
721 break;
722 default: if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
723 aOK = respObj->Error(rInfo, kXR_ServerError,
724 "internal logic error");
725 break;
726 };
727
728// All done
729//
730 return (aOK ? 0 : -1);
731}
732
733/******************************************************************************/
734
735int XrdXrootdTransit::Send(long long offset, int dlen, int fdnum)
736{
737 XrdXrootdTransSend sfInfo(Link, Request.header.streamid,
738 Request.header.requestid,
739 offset, dlen, fdnum);
740
741// Effect callback (this is always a final result)
742//
743 runDone = true;
744 return (respObj->File(sfInfo, dlen) ? 0 : -1);
745}
746
747/******************************************************************************/
748
749int XrdXrootdTransit::Send(XrdOucSFVec *sfvec, int sfvnum, int dlen)
750{
751 XrdXrootdTransSend sfInfo(Link, Request.header.streamid,
752 Request.header.requestid,
753 sfvec, sfvnum, dlen);
754
755// Effect callback (this is always a final result)
756//
757 runDone = true;
758 return (respObj->File(sfInfo, dlen) ? 0 : -1);
759}
760
761/******************************************************************************/
762/* Private: W a i t */
763/******************************************************************************/
764
765int XrdXrootdTransit::Wait(XrdXrootd::Bridge::Context &rInfo,
766 const struct iovec *ioV, int ioN, int ioL)
767{
768 const char *eMsg;
769
770// Trace this request if need be
771//
772 runWait = XRD_GETNUM(ioV[0].iov_base);
773 eMsg = (ioN < 2 ? "reason unknown" : (const char *)ioV[1].iov_base);
774
775// Check if the protocol wants to handle all waits
776//
777 if (runWMax <= 0)
778 {int wtime = runWait;
779 runWait = 0;
780 return (respObj->Wait(rInfo, wtime, eMsg) ? 0 : -1);
781 }
782
783// Check if we have exceeded the maximum wait time
784//
785 if (runWTot >= runWMax)
786 {runDone = true;
787 runWait = 0;
788 return (respObj->Error(rInfo, kXR_Cancelled, eMsg) ? 0 : -1);
789 }
790
791// Readjust wait time
792//
793 if (runWait > runWMax) runWait = runWMax;
794
795// Check if the protocol wants a wait notification
796//
797 if (runWCall && !(respObj->Wait(rInfo, runWait, eMsg))) return -1;
798
799// All done, schedule the wait
800//
801 TRACEP(REQ, "Bridge delaying request " <<runWait <<" sec (" <<eMsg <<")");
802 Sched->Schedule((XrdJob *)&waitJob, time(0)+runWait);
803 return 0;
804}
805
806/******************************************************************************/
807/* Private: W a i t R e s p */
808/******************************************************************************/
809
810int XrdXrootdTransit::WaitResp(XrdXrootd::Bridge::Context &rInfo,
811 const struct iovec *ioV, int ioN, int ioL)
812{
813 XrdXrootdTransPend *trP;
814 const char *eMsg;
815 int wTime;
816
817// Trace this request if need be
818//
819 wTime = XRD_GETNUM(ioV[0].iov_base);
820 eMsg = (ioN < 2 ? "reason unknown" : (const char *)ioV[1].iov_base);
821 TRACEP(REQ, "Bridge waiting for resp; sid=" <<rInfo.sID.num
822 <<" wt=" <<wTime <<" (" <<eMsg <<")");
823
824// We would issue callback to see how we should handle this. However, we can't
825// predictably handle a waitresp. So that means we will just wait for a resp.
826//
827// XrdXrootd::Bridge::Result *newCBP = respObj->WaitResp(rInfo, runWait, eMsg);
828
829// Save the current state
830//
831 trP = new XrdXrootdTransPend(Link, this, &Request);
832 trP->Queue();
833
834// Effect a wait
835//
836 runWait = -1;
837 return 0;
838}
@ kXR_ArgInvalid
Definition XProtocol.hh:990
@ kXR_Unsupported
@ kXR_Cancelled
@ kXR_ServerError
@ kXR_ArgTooLong
Definition XProtocol.hh:992
@ kXR_NoMemory
Definition XProtocol.hh:998
kXR_char username[8]
Definition XProtocol.hh:396
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ClientRequestHdr header
Definition XProtocol.hh:846
struct ClientLoginRequest login
Definition XProtocol.hh:857
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_auth
Definition XProtocol.hh:112
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_statx
Definition XProtocol.hh:134
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_prepare
Definition XProtocol.hh:133
@ kXR_asyncap
Definition XProtocol.hh:378
@ kXR_ver002
Definition XProtocol.hh:386
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
XrdOucTrace * XrdXrootdTrace
#define eMsg(x)
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicZAP(x)
#define AtomicEnd(Mtx)
const int SYS_LOG_01
#define TRACE_MEM
Definition XrdTrace.hh:38
#define TRACE(act, x)
Definition XrdTrace.hh:63
#define XRD_LOGGEDIN
#define TRACEP(act, x)
#define XRD_GETNUM(x)
#define KXR_INDEX(x)
void Release(XrdBuffer *bp)
Definition XrdBuffer.cc:221
XrdBuffer * Obtain(int bsz)
Definition XrdBuffer.cc:140
friend class XrdScheduler
Definition XrdJob.hh:44
XrdJob(const char *desc="")
Definition XrdJob.hh:51
bool isMapped() const
bool isIPType(IPType ipType) const
void Bump(int &val)
static void Sanitize(char *instr, char subc='_')
void Schedule(XrdJob *jp)
char prot[XrdSecPROTOIDSIZE]
Auth protocol used (e.g. krb5)
char * name
Entity's name.
char * host
Entity's host name dnr dependent.
static XrdXrootdStats * SI
static XrdSysError & eDest
static unsigned int getSID()
XrdXrootdMonitor::User Monitor
int(XrdXrootdProtocol::* Resume)()
static XrdScheduler * Sched
int Process(XrdLink *lp) override
void Recycle(XrdLink *lp, int consec, const char *reason) override
XrdXrootdResponse Response
static XrdBuffManager * BPool
static XrdSfsFileSystem * osFS
void Set(XrdLink *lp)
static XrdXrootdTransPend * Remove(XrdLink *lP, short sid)
union XrdXrootdTransPend::@371012140333040222300212162025004307132302363251 Pend
XrdXrootdTransit * bridge
static void Clear(XrdXrootdTransit *trP)
bool Run(const char *xreqP, char *xdataP=0, int xdataL=0)
Inject an xrootd request into the protocol stack.
static const char * ReqTable()
Initialize the valid request table.
void Redrive()
Redrive a request after a wait.
int Send(int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle request data response.
void Recycle(XrdLink *lp, int consec, const char *reason)
Handle link shutdown.
static void Init(XrdScheduler *schedP, int qMax, int qTTL)
Perform one-time initialization.
static XrdXrootdTransit * Alloc(XrdXrootd::Bridge::Result *respP, XrdLink *linkP, XrdSecEntity *seceP, const char *nameP, const char *protP)
Get a new transit object.
static int Attn(XrdLink *lP, short *theSID, int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle attention response (i.e. async response)
XrdXrootdTransit()
Constructor & Destructor.
void Proceed()
Resume processing after a waitresp completion.
bool Disc()
Handle dismantlement.
int Process(XrdLink *lp)
Handle link activation (replaces parent activation).
union XrdXrootd::Bridge::Context::@216053020250347016153077031152206173164054152024 sID
associated request stream ID
virtual bool Wait(Bridge::Context &info, int wtime, const char *wtext)
static const int uIPv4
ucap: Supports read redirects
static const int uPrip