82 XrdXrootdPgwFob *fobP = fP->
pgwFob;
83 int numErrs, numFixes, numLeft;
87 if (!fobP)
return true;
91 numLeft = fobP->
numOffs(&numErrs, &numFixes);
99 snprintf(ebuff,
sizeof(ebuff),
"%d uncorrected checksum errors",numLeft);
113int XrdXrootdProtocol::do_PgRead()
116 XrdXrootdFHandle fh(
Request.pgread.fhandle);
131 if (!
FTab || !(
IO.File =
FTab->Get(fh.handle)))
133 "pgread does not refer to an open file");
138 if (!
Request.header.dlen) pathID = 0;
139 else {ClientPgReadReqArgs *rargs=(ClientPgReadReqArgs *)(
argp->buff);
140 pathID =
static_cast<int>(rargs->
pathid);
142 IO.Flags =
static_cast<unsigned short>(rargs->
reqflags);
147 TRACEP(FSIO,pathID<<
" pgread "<<
IO.IOLen<<
'@'<<
IO.Offset
148 <<
" fn=" <<
IO.File->FileKey);
165 if (
IO.File->AsyncMode &&
IO.IOLen >= pgAioMin
166 &&
IO.Offset+
IO.IOLen <=
IO.File->Stats.fSize+pgAioHalf
170 XrdXrootdPgrwAio *aioP;
173 if (!pathID) pP =
this;
174 else {
if (!(pP =
VerifyStream(rc, pathID,
false)))
return rc;
179 {
if (!
IO.File->aioFob)
IO.File->aioFob =
new XrdXrootdAioFob;
188 if (pathID)
return do_Offload(&XrdXrootdProtocol::do_PgRIO, pathID);
203int XrdXrootdProtocol::do_PgRIO()
209 static const int maxCSSZ = maxIOVZ/2 - 1;
210 static const int maxPGRD = maxCSSZ*pgPageSize;
211 static const int infoLen =
sizeof(
kXR_int64);
213 struct pgReadResponse
214 {ServerResponseStatus rsp;
218 XrdSfsFile *sfsP =
IO.File->XrdSfsp;
219 uint64_t pgrOpts = 0;
220 int dlen, fLen, lLen, rc, xframt, Quantum;
221 uint32_t csVec[maxCSSZ];
222 struct iovec iov[maxIOVZ];
232 memset(pgrResp.rsp.bdy.reserved, 0,
sizeof(pgrResp.rsp.bdy.reserved));
237 int pgOff, rPages, rLen =
IO.IOLen;
243 if (rPages < Quantum) Quantum = rPages;
248 if (!
argp || Quantum < halfBSize || Quantum >
argp->bsize)
249 {
if ((rc = getBuff(1, Quantum)) <= 0)
return rc;}
251 if (
argp->bsize > maxPGRD) Quantum = maxPGRD;
257 int items = Quantum / pgPageSize;
263 uint32_t *csVP = csVec;
264 char *buff =
argp->buff;
265 int i = 1, n = items * 2;
267 {iov[i ].iov_base = csVP++;
268 iov[i++].iov_len =
sizeof(uint32_t);
269 iov[i ].iov_base = buff;
270 iov[i++].iov_len = pgPageSize;
278 if ((pgOff =
IO.Offset & pgPageMask))
279 {rLen = pgPageSize - pgOff;
280 buff =
argp->buff + pgOff;
281 iov[2].iov_base = buff;
282 iov[2].iov_len = rLen;
283 rLen += Quantum - pgPageSize;
288 if (
IO.IOLen < rLen) rLen =
IO.IOLen;
294 long long ioOffset =
IO.Offset;
295 do {
if ((xframt = sfsP->
pgRead(
IO.Offset, buff, rLen, csVec, pgrOpts)) <= 0)
299 iov[2].iov_len = fLen;
300 if (items > 1) iov[items<<1].iov_len = lLen;
302 if (xframt < rLen || xframt ==
IO.IOLen)
306 IO.IOLen -= xframt;
IO.Offset += xframt;
307 rLen = (
IO.IOLen < Quantum ?
IO.IOLen : Quantum);
310 for (
int i = 0; i < items; i++) csVec[i] = htonl(csVec[i]);
312 pgrResp.ofs = htonll(ioOffset);
317 dlen = xframt + (items *
sizeof(uint32_t));
318 if ((rc =
Response.Send(pgrResp.rsp, infoLen, iov, items*2+1, dlen)) < 0)
322 {iov[2].iov_base =
argp->buff;
323 iov[2].iov_len = pgPageSize;
328 ioOffset =
IO.Offset;
329 }
while(
IO.IOLen > 0);
333 if (xframt < 0)
return fsError(xframt, 0, sfsP->
error, 0, 0);
339 pgrResp.rsp.bdy.dlen = 0;
340 pgrResp.ofs = htonll(
IO.Offset);
341 return Response.Send(pgrResp.rsp, infoLen);
350int XrdXrootdProtocol::do_PgWrite()
352 XrdXrootdFHandle fh(
Request.pgwrite.fhandle);
359 n2hll(
Request.pgwrite.offset,
IO.Offset);
360 pathID =
Request.pgwrite.pathid;
361 IO.Flags =
static_cast<unsigned short>(
Request.pgwrite.reqflags);
367 return Link->setEtext(
"pgwrite protocol violation");
378 if (!
FTab || !(
IO.File =
FTab->Get(fh.handle)))
380 return do_WriteNone(pathID);
385 if (
IO.File->pgwFob == 0)
IO.File->pgwFob =
new XrdXrootdPgwFob(
IO.File);
389 TRACEP(FSIO, pathID<<
" pgwrite "
391 <<
IO.IOLen<<
'@'<<
IO.Offset<<
" fn=" <<
IO.File->FileKey);
407 if (pathID)
return do_Offload(&XrdXrootdProtocol::do_PgWIO, pathID);
411 return do_PgWIO(
true);
423bool XrdXrootdProtocol::do_PgWAIO(
int &rc)
425 XrdXrootdPgrwAio *aioP;
457int XrdXrootdProtocol::do_PgWIO() {
return do_PgWIO(
true);}
459int XrdXrootdProtocol::do_PgWIO(
bool isFresh)
462 XrdSfsFile *sfsP =
IO.File->XrdSfsp;
466 int n, rc, Quantum, iovLen, iovNum, csNum;
472 if (!
IO.File->pgwFob)
480 {
if (
IO.File->AsyncMode &&
IO.IOLen >= pgAioMin
482 && !isRetry && do_PgWAIO(rc))
return rc;
483 if (isRetry && !do_PgWIORetry(rc))
return rc;
484 if (!do_PgWIOSetup(
pgwCtl))
return -1;
492 {
if (!(ioV =
pgwCtl->FrameInfo(iovNum, iovLen)))
break;
494 if ((rc =
getData(
this,
"pgwrite", ioV, iovNum)))
return rc;
499 if (!(csVec =
pgwCtl->FrameInfo(csNum, buff, Quantum,
argp)))
504 for (
int i = 0; i < csNum; i++) csVec[i] = ntohl(csVec[i]);
508 XrdOucPgrwUtils::dataInfo dInfo(buff, csVec,
IO.Offset, Quantum);
521 if ((rc = sfsP->
pgWrite(
IO.Offset, buff, Quantum, csVec)) <= 0)
522 {
IO.EInfo[0] = rc;
IO.EInfo[1] = 0;
523 return do_WriteNone();
529 IO.File->pgwFob->delOffs(
IO.Offset, Quantum);
533 IO.Offset += Quantum;
536 }
while(
pgwCtl->Advance());
554bool XrdXrootdProtocol::do_PgWIORetry(
int &rc)
556 static const int csLen =
sizeof(
kXR_unt32);
563 if (
IO.Offset & pgPageMask)
564 {
int n = pgPageSize - (
IO.Offset & pgPageMask);
565 isBad =
IO.IOLen > (n + csLen);
566 }
else isBad =
IO.IOLen > pgUnitSize;
572 "pgwrite retry of more than one page not allowed");
579 if (!
IO.File->pgwFob->hasOffs(
IO.Offset,
IO.IOLen - csLen))
581 snprintf(buff,
sizeof(buff),
"retry %d@%lld",
IO.IOLen-csLen,
IO.Offset);
582 eDest.Emsg(
"pgwRetry", buff,
"not in error; fn=",
IO.File->FileKey);
583 IO.Flags &= ~XrdProto::kXR_pgRetry;
608 Quantum = (
IO.IOLen < pgPageSize ? pgPageSize :
IO.IOLen);
613 if (!
argp || Quantum < halfBSize || argp->bsize < Quantum
615 {
if (getBuff(0, Quantum) <= 0)
return -1;}
624 Link->setEtext(
"pgwrite protocol violation");
XrdOucTrace * XrdXrootdTrace
static bool csVer(dataInfo &dInfo, off_t &bado, int &badc)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static const uint64_t Verify
Options for pgRead() and pgWrite() as noted below.
virtual XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0)
virtual XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize wrlen, uint32_t *csvec, uint64_t opts=0)
void pgUpdt(int wErrs, int wFixd, int wUnc)
void Read(long long offs, int dlen) override
static XrdXrootdPgrwAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP, XrdXrootdPgwBadCS *bcsP=0)
int Write(long long offs, int dlen) override
static const int maxBSize
int numOffs(int *errs=0, int *fixs=0)
static XrdXrootdStats * SI
XrdXrootdProtocol * VerifyStream(int &rc, int pID, bool lok=true)
XrdXrootdProtocol * Stream[maxStreams]
XrdXrootdFileTable * FTab
static XrdSysError & eDest
int getData(gdCallBack *gdcbP, const char *dtype, char *buff, int blen)
XrdXrootdMonitor::User Monitor
XrdXrootdResponse Response
static const int maxStreams
static RAtomic_int srvrAioOps
static const int kXR_pgUnitSZ
static const int kXR_pgPageSZ
static const int kXR_pgRetry