XRootD
Loading...
Searching...
No Matches
XrdFrmTransfer.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d F r m T r a n s f e r . c c */
4/* */
5/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32#include <strings.h>
33#include <cstdio>
34#include <fcntl.h>
35#include <unistd.h>
36#include <utime.h>
37#include <sys/param.h>
38#include <sys/types.h>
39#include <sys/stat.h>
40
41#include "XrdFrc/XrdFrcCID.hh"
43#include "XrdFrc/XrdFrcTrace.hh"
44#include "XrdFrc/XrdFrcXAttr.hh"
45#include "XrdFrm/XrdFrmCns.hh"
52#include "XrdOss/XrdOss.hh"
53#include "XrdOuc/XrdOucEnv.hh"
54#include "XrdOuc/XrdOucMsubs.hh"
55#include "XrdOuc/XrdOucProg.hh"
56#include "XrdOuc/XrdOucSxeq.hh"
57#include "XrdOuc/XrdOucUtils.hh"
58#include "XrdOuc/XrdOucXAttr.hh"
59#include "XrdSys/XrdSysError.hh"
60#include "XrdSys/XrdSysFD.hh"
62
63using namespace XrdFrc;
64using namespace XrdFrm;
65
66/******************************************************************************/
67/* L o c a l C l a s s e s */
68/******************************************************************************/
69
71{
75char *theSrc;
76char *theDst;
77char *theINS;
78char theMDP[8];
79
81 : theEnv(Env), theCmd(0), theVec(0), theSrc(0),
82 theDst(0), theINS(0)
83 {theMDP[0] = '0'; theMDP[1] = 0;}
85};
86
88{ struct stat *Stat;
89 int lkfd;
90 int lkfx;
91
92 XrdFrmTranChk(struct stat *sP) : Stat(sP), lkfd(-1), lkfx(0) {}
93 ~XrdFrmTranChk() {if (lkfd >= 0) close(lkfd);}
94};
95
96/******************************************************************************/
97/* S t a t i c s */
98/******************************************************************************/
99
100XrdSysMutex XrdFrmTransfer::pMutex;
101XrdOucHash<char> XrdFrmTransfer::pTab;
102
103/******************************************************************************/
104/* C o n s t r u c t o r */
105/******************************************************************************/
106
108{
109 int i;
110
111// Construct program objects
112//
113 for (i = 0; i < 4; i++)
114 xfrCmd[i] = (Config.xfrCmd[i].theVec ? new XrdOucProg(&Say) : 0);
115}
116
117/******************************************************************************/
118/* Public: c h e c k F F */
119/******************************************************************************/
120
121const char *XrdFrmTransfer::checkFF(const char *Path)
122{
123 EPNAME("checkFF");
124 struct stat buf;
125
126// Check for a fail file
127//
128 if (!stat(Path, &buf))
129 {if (buf.st_ctime+Config.FailHold >= time(0))
130 return "request previously failed";
131 if (Config.Test) {DEBUG("would have removed '" <<Path <<"'");}
132 else {unlink(Path);
133 DEBUG("removed '" <<Path <<"'");
134 }
135 }
136
137// Return all is well
138//
139 return 0;
140}
141
142/******************************************************************************/
143/* F e t c h */
144/******************************************************************************/
145
146const char *XrdFrmTransfer::Fetch()
147{
148 EPNAME("Fetch");
149 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
150 static const int crOpts = (O_CREAT|O_TRUNC)<<8|XRDOSS_mkpath;
151
152 XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
153 XrdFrmTranArg cmdArg(&myEnv);
154 struct stat pfnStat;
155 time_t xfrET;
156 const char *eTxt, *retMsg = 0;
157 char lfnpath[MAXPATHLEN+1024+512+8], *Lfn, Rfn[MAXPATHLEN+256], *theSrc;
158 char pdBuff[1024];
159 int iXfr, pdSZ, lfnEnd, rc, isURL = 0, doRM = 0;
160 long long fSize = 0;
161
162// The remote source is either the url-lfn or a translated lfn
163//
164 if ((isURL = xfrP->reqData.LFO)) theSrc = xfrP->reqData.LFN;
165 else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
166 return "lfn2rfn failed";
167 theSrc = Rfn;
168 isURL = (*Rfn != '/');
169 }
170
171// Check if we can actually handle this transfer
172//
173 if (isURL)
174 {if (xfrCmd[2]) iXfr = 2;
175 else return "url copies not configured";
176 } else {
177 if (xfrCmd[0]) iXfr = 0;
178 else return "non-url copies not configured";
179 }
180
181// Check for a fail file
182//
183 if ((eTxt = ffCheck())) return eTxt;
184
185// Check if the file exists
186//
187 Lfn = (xfrP->reqData.LFN)+xfrP->reqData.LFO;
188 if (!Config.Stat(Lfn, xfrP->PFN, &pfnStat))
189 {DEBUG(xfrP->PFN <<" exists; not fetched.");
190 return 0;
191 }
192
193// Construct the file name to which to we originally transfer the data. This is
194// the lfn if we do not pre-allocate files and "lfn.anew" otherwise.
195//
196 lfnEnd = strlen(Lfn);
197 strlcpy(lfnpath, Lfn, sizeof(lfnpath)-8);
198 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
199 {strcpy(&lfnpath[lfnEnd], ".anew");
200 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".anew");
201 }
202
203// Setup the command
204//
205 cmdArg.theCmd = xfrCmd[iXfr];
206 cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
207 cmdArg.theSrc = theSrc;
208 cmdArg.theDst = xfrP->PFN;
209 cmdArg.theINS = xfrP->reqData.iName;
210 if (!SetupCmd(&cmdArg)) return "incoming transfer setup failed";
211
212// If the copycmd needs a placeholder in the filesystem for this transfer, we
213// must create one. We first remove any existing "anew" file because we will
214// over-write it. The create process will create a lock file if need be.
215// However, we can ignore it as we are the only ones actually using it.
216//
217 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
218 {Config.ossFS->Unlink(lfnpath);
219 rc = Config.ossFS->Create(xfrP->reqData.User,lfnpath,fMode,myEnv,crOpts);
220 if (rc)
221 {Say.Emsg("Fetch", rc, "create placeholder for", lfnpath);
222 return "create failed";
223 }
224 doRM = 1;
225 } else doRM = Config.xfrCmd[iXfr].Opts & Config.cmdRME;
226
227// Setup program monitoring data
228//
229 pdSZ = (Config.xfrCmd[iXfr].Opts & Config.cmdXPD ? sizeof(pdBuff) : 0);
230
231// Now run the command to get the file and make sure the file is there
232// If it is, make sure that if a lock file exists its date/time is greater than
233// the file we just fetched; then rename it to be the correct name.
234//
235 xfrET = time(0);
236 if (!(rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
237 {if ((rc = Config.Stat(lfnpath, xfrP->PFN, &pfnStat)))
238 {Say.Emsg("Fetch", lfnpath, "fetched but not resident!"); fSize = 0;}
239 else {fSize = pfnStat.st_size;
240 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
241 FetchDone(lfnpath, pfnStat, rc);
242 }
243 }
244
245// Clean up if we failed otherwise tell the cmsd that we have a new file. Upon
246// failure we issue a a remove as we don't want the temp file to exist.
247//
248 xfrP->PFN[xfrP->pfnEnd] = '\0';
249 if (rc)
250 {if (doRM) Config.ossFS->Unlink(lfnpath);
251 ffMake(rc == -2);
252 if (rc == -2) {xfrP->RetCode = 2; retMsg = "file not found";}
253 else retMsg = "fetch failed";
254 } else if (Config.cmsPath) Config.cmsPath->Have(Lfn);
255
256// We completed, see if we need to do statistics
257//
259 || (Trace.What & TRACE_Debug))
260 {time_t eNow = time(0);
261 int inqT, xfrT;
262 inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
263 if ((xfrT = static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
264 if (((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
265 || (Trace.What & TRACE_Debug)) && !retMsg)
266 {char sbuff[80];
267 sprintf(sbuff, "Got: %lld qt: %d xt: %d up: ", fSize, inqT, xfrT);
268 lfnpath[lfnEnd] = '\0';
269 Say.Say(0, sbuff, xfrP->reqData.User, " ", lfnpath);
270 }
272 {if (rc < 0) rc = -rc;
273 snprintf(lfnpath+lfnEnd, sizeof(lfnpath)-lfnEnd-1,
274 "\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
275 static_cast<long long>(eNow), fSize, inqT, xfrT,
276 xfrP->Act, rc, (pdSZ ? "&pd=" : ""), (pdSZ ? pdBuff : ""));
277 XrdFrmMonitor::Map(XROOTD_MON_MAPSTAG,xfrP->reqData.User,lfnpath);
278 }
279 }
280
281// All done
282//
283 return retMsg;
284}
285
286/******************************************************************************/
287/* F e t c h D o n e */
288/******************************************************************************/
289
290const char *XrdFrmTransfer::FetchDone(char *lfnpath, struct stat &Stat, int &rc)
291{
292
293// If we are running in new mode, update file attributes
294//
295 rc = 0;
296 if (Config.runNew && Config.NeedsCTA(lfnpath))
297 {XrdOucXAttr<XrdFrcXAttrCpy> cpyInfo;
298 cpyInfo.Attr.cpyTime = static_cast<long long>(Stat.st_mtime);
299 if ((rc = cpyInfo.Set(xfrP->PFN)))
300 Say.Emsg("Fetch", rc, "set copy time xattr on", xfrP->PFN);
301 }
302
303// Check for a lock file and if we have one, reset it's time or delete it
304//
305 if (Config.runOld && Config.NeedsCTA(lfnpath))
306 {struct stat lkfStat;
307 strcpy(&xfrP->PFN[xfrP->pfnEnd+5], ".lock");
308 if (!stat(xfrP->PFN, &lkfStat))
309 {if (Config.runNew && !rc) unlink(xfrP->PFN);
310 else {struct utimbuf tbuff;
311 tbuff.actime = tbuff.modtime = Stat.st_mtime;
312 if ((rc = utime(xfrP->PFN, &tbuff)))
313 Say.Emsg("Fetch", rc, "set utime on", xfrP->PFN);
314 }
315 }
316 }
317
318// Now rename the lfn to be what it needs to be in the end
319//
320 if (!rc && (rc=Config.ossFS->Rename(lfnpath,xfrP->reqData.LFN)))
321 Say.Emsg("Fetch", rc, "rename", lfnpath);
322 else XrdFrmCns::Add(xfrP->reqData.User, xfrP->reqData.LFN,
323 Stat.st_size, Stat.st_mode);
324
325// Done
326//
327 return (rc ? "Failed" : 0);
328}
329
330/******************************************************************************/
331/* Private: f f C h e c k */
332/******************************************************************************/
333
334const char *XrdFrmTransfer::ffCheck()
335{
336 const char *eTxt;
337
338// Generate proper fail file path and check if it exists
339//
340 if (Config.xfrFdir)
341 {char ffPath[MAXPATHLEN+8];
342 if (Config.xfrFdln+xfrP->pfnEnd+5 >= int(sizeof(ffPath))) return 0;
343 strcpy(ffPath, Config.xfrFdir);
344 strcpy(ffPath+Config.xfrFdln, xfrP->PFN);
345 strcpy(ffPath+Config.xfrFdln+xfrP->pfnEnd, ".fail");
346 eTxt = checkFF(ffPath);
347 } else {
348 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
349 eTxt = checkFF(xfrP->PFN);
350 xfrP->PFN[xfrP->pfnEnd] = '\0';
351 }
352
353// Determine result
354//
355 if (eTxt) xfrP->RetCode = 1;
356 return eTxt;
357}
358
359/******************************************************************************/
360/* Private: f f M a k e */
361/******************************************************************************/
362
363void XrdFrmTransfer::ffMake(int nofile){
364 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
365 static const mode_t dMode = S_IXUSR|S_IWGRP|S_IXGRP|S_IXOTH | fMode;
366 char ffPath[MAXPATHLEN+8], *ffP;
367 int myFD;
368
369// Generate fail file path
370//
371 if (Config.xfrFdir)
372 {if (Config.xfrFdln+xfrP->pfnEnd+5 >= int(sizeof(ffPath))) return;
373 strcpy(ffPath, Config.xfrFdir);
374 strcpy(ffPath+Config.xfrFdln, xfrP->PFN);
375 strcpy(ffPath+Config.xfrFdln+xfrP->pfnEnd, ".fail");
376 XrdOucUtils::makePath(ffPath, dMode);
377 ffP = ffPath;
378 } else {
379 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
380 ffP = xfrP->PFN;
381 }
382
383// Create a fail file and if failure is due to "file not found" set the mtime
384// to 2 so that the oss layer picks up the same error in the future.
385//
386 myFD = open(ffP, O_CREAT, fMode);
387 if (myFD >= 0)
388 {close(myFD);
389 if (nofile)
390 {struct utimbuf tbuff;
391 tbuff.actime = time(0); tbuff.modtime = 2;
392 utime(ffP, &tbuff);
393 }
394 }
395 if (!Config.xfrFdir) xfrP->PFN[xfrP->pfnEnd] = '\0';
396}
397
398/******************************************************************************/
399/* I n i t */
400/******************************************************************************/
401
402void *InitXfer(void *parg)
404 if (parg) xP->Start(*(int *)parg);
405 return (void *)0;
406}
407
409{
410 static int anyQ = XrdFrmXfrQueue::useAnyQ;
411 static int inpQ = XrdFrmXfrQueue::useInpQ;
412 static int outQ = XrdFrmXfrQueue::useOutQ;
413 void *qWant;
414 pthread_t tid;
415 int retc, n;
416
417// Initialize the cluster identification object first
418//
419 CID.Init(Config.QPath);
420
421// Initialize the transfer queue first
422//
423 if (!XrdFrmXfrQueue::Init()) return 0;
424
425// Start the required number of transfer threads. Note we can split these
426// as dedicated in threads and dedicated out threads.
427//
428 n = Config.xfrMax;
429 while(n--)
430 { if (Config.xfrMaxIn)
431 { qWant = (void *)&inpQ; Config.xfrMaxIn--;}
432 else if (Config.xfrMaxOt)
433 { qWant = (void *)&outQ; Config.xfrMaxOt--;}
434 else qWant = (void *)&anyQ;
435
436 if ((retc = XrdSysThread::Run(&tid, InitXfer, qWant,
437 XRDSYSTHREAD_BIND, "transfer")))
438 {Say.Emsg("main", retc, "create xfr thread"); return 0;}
439 }
440
441// All done
442//
443 return 1;
444}
445
446/******************************************************************************/
447/* Private: S e t u p C m d */
448/******************************************************************************/
449
450int XrdFrmTransfer::SetupCmd(XrdFrmTranArg *argP)
451{
452 char *pdata[XrdOucMsubs::maxElem + 2], *cP;
453 int pdlen[XrdOucMsubs::maxElem + 2], i, k, n;
454
456 Info(xfrP->reqData.User, argP->theEnv, Config.the_N2N,
457 xfrP->reqData.LFN+xfrP->reqData.LFO,
458 argP->theSrc, xfrP->reqData.Prty,
459 xfrP->reqData.Options & XrdFrcRequest::makeRW?O_RDWR:O_RDONLY,
460 argP->theMDP, xfrP->reqData.ID, xfrP->PFN, argP->theDst);
461
462// We must establish the host, cluster and instance name if we have one
463//
464 if (argP->theEnv)
465 {argP->theEnv->Put(SEC_HOST, Config.myName);
466 if (argP->theINS)
467 {CID.Get(argP->theINS, CMS_CID, argP->theEnv);
468 argP->theEnv->Put(XRD_INS, argP->theINS);
469 }
470 }
471
472// Substitute in the parameters
473//
474 k = argP->theVec->Subs(Info, pdata, pdlen);
475
476// Catenate all of the arguments
477//
478 *cmdBuff = '\0'; n = sizeof(cmdBuff) - 4; cP = cmdBuff;
479 for (i = 0; i < k; i++)
480 {n -= pdlen[i];
481 if (n < 0)
482 {Say.Emsg("Setup",E2BIG,"build command line for", xfrP->reqData.LFN);
483 return 0;
484 }
485 strcpy(cP, pdata[i]); cP += pdlen[i];
486 }
487
488// Now setup the command
489//
490 return (argP->theCmd->Setup(cmdBuff, &Say) == 0);
491}
492
493/******************************************************************************/
494/* Public: S t a r t */
495/******************************************************************************/
496
497void XrdFrmTransfer::Start(int ioqType)
498{
499 EPNAME("Transfer"); // Wrong but looks better
500 const char *Msg;
501
502// Prime I/O queue selection
503
504// Endless loop looking for transfer jobs
505//
506 while(1)
507 {xfrP = XrdFrmXfrQueue::Get(ioqType);
508
509 DEBUG(xfrP->Type <<" starting " <<xfrP->reqData.LFN
510 <<" for " <<xfrP->reqData.User);
511
512 Msg = (xfrP->qNum & XrdFrcRequest::outQ ? Throw() : Fetch());
513 if (Msg && !(xfrP->RetCode)) xfrP->RetCode = 1;
514 xfrP->PFN[xfrP->pfnEnd] = 0;
515
516 if (xfrP->RetCode || Config.Verbose)
517 {char buff1[280], buff2[80];
518 sprintf(buff1, "%s for %s", xfrP->RetCode ? "failed" : "complete",
519 xfrP->reqData.User);
520 if (xfrP->RetCode == 0) *buff2 = 0;
521 else sprintf(buff2, "; %s", (Msg ? Msg : "reason unknown"));
522 Say.Say(0, xfrP->Type, buff1, xfrP->reqData.LFN,buff2);
523 } else {
524 DEBUG(xfrP->Type
525 <<(xfrP->RetCode ? " failed " : " complete ")
526 << xfrP->reqData.LFN <<" rc=" <<xfrP->RetCode
527 <<' ' <<(Msg ? Msg : ""));
528 }
529
530 XrdFrmXfrQueue::Done(xfrP, Msg);
531 }
532}
533
534/******************************************************************************/
535/* Private: T r a c k D C */
536/******************************************************************************/
537
538int XrdFrmTransfer::TrackDC(char *Lfn, char *Mdp, char *Rfn)
539{
540 (void)Lfn;
541 char *FName, *Slash, *Slush = 0, *begRfn = Rfn;
542 int n = -1;
543
544// If this is a url, then don't back space into the url part
545//
546 if (*Rfn != '/'
547 && (Slash = index(Rfn, '/')) && *(Slash+1) == '/'
548 && (Slash = index(Slash+2, '/')) && *(Slash+1) == '/') begRfn = Slash+1;
549
550// Discard the filename component
551//
552 if (!(FName = rindex(begRfn, '/')) || FName == begRfn) return 0;
553 *FName = 0; Slash = Slush = FName;
554
555// Try to find the created directory path
556//
557 pMutex.Lock();
558 while(Slash != begRfn && !pTab.Find(Rfn))
559 {do {Slash--;} while(Slash != begRfn && *Slash != '/');
560 if (Slush) *Slush = '/';
561 *Slash = 0; Slush = Slash;
562 n++;
563 }
564 pMutex.UnLock();
565
566// Compute offset of uncreated part
567//
568 *Slash = '/';
569 if (Slash == begRfn) n = 0;
570 else n = (n >= 0 ? Slash - begRfn : FName - begRfn);
571 sprintf(Mdp, "%d", n);
572
573// All done
574//
575 return n;
576}
577
578/******************************************************************************/
579
580int XrdFrmTransfer::TrackDC(char *Rfn)
581{
582 char *Slash;
583
584// Trim off the trailing end
585//
586 if (!(Slash = rindex(Rfn, '/')) || Slash == Rfn) return 0;
587 *Slash = 0;
588
589// The path has been added, do insert it into the table of created paths
590//
591 pMutex.Lock();
592 pTab.Add(Rfn, 0, 0, Hash_data_is_key);
593 pMutex.UnLock();
594 *Slash = '/';
595 return 0;
596}
597
598/******************************************************************************/
599/* T h r o w */
600/******************************************************************************/
601
602const char *XrdFrmTransfer::Throw()
603{
604 XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
605 XrdFrmTranArg cmdArg(&myEnv);
606 struct stat begStat, endStat;
607 XrdFrmTranChk Chk(&begStat);
608 time_t xfrET;
609 const char *eTxt, *retMsg = 0;
610 char Rfn[MAXPATHLEN+256] = "";
611 char *lfnpath = xfrP->reqData.LFN, *theDest = nullptr;
612 char pdBuff[1024] = "";
613 int isMigr = xfrP->reqData.Options & XrdFrcRequest::Migrate;
614 int iXfr, isURL, pdSZ, rc, mDP = -1;
615
616// The remote source is either the url-lfn or a translated lfn
617//
618 if ((isURL = xfrP->reqData.LFO)) theDest = xfrP->reqData.LFN;
619 else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
620 return "lfn2rfn failed";
621 theDest = Rfn;
622 isURL = (*Rfn != '/');
623 }
624
625// Check if we can actually handle this transfer
626//
627 if (isURL)
628 {if (xfrCmd[3]) iXfr = 3;
629 else return "url copies not configured";
630 } else {
631 if (xfrCmd[1]) iXfr = 1;
632 else return "non-url copies not configured";
633 }
634
635// Check if the file exists (we only copy resident files)
636//
637 if (Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &begStat))
638 return (xfrP->reqFQ ? "file not found" : 0);
639
640// Check for a fail file
641//
642 if ((eTxt = ffCheck())) return eTxt;
643
644// If this is an mss migration request, then recheck if the file can and
645// need to be migrated based on the lock file. This also obtains a directory
646// lock and lock file lock, as needed. If the file need not be migrated but
647// should be purge, we will get a null string error.
648//
649 if (isMigr && (eTxt = ThrowOK(&Chk)))
650 {if (*eTxt) return eTxt;
651 if (!(xfrP->reqData.Options & XrdFrcRequest::Purge)) return "logic error";
652 Throwaway();
653 return 0;
654 }
655
656// Setup the command, including directory tracking, as needed
657//
658 cmdArg.theCmd = xfrCmd[iXfr];
659 cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
660 cmdArg.theDst = theDest;
661 cmdArg.theSrc = xfrP->PFN;
662 cmdArg.theINS = xfrP->reqData.iName;
663 if (Config.xfrCmd[iXfr].Opts & Config.cmdMDP)
664 mDP = TrackDC(lfnpath+xfrP->reqData.LFO, cmdArg.theMDP, Rfn);
665 if (!SetupCmd(&cmdArg)) return "outgoing transfer setup failed";
666
667// Setup program monitoring data
668//
669 pdSZ = (Config.xfrCmd[iXfr].Opts & Config.cmdXPD ? sizeof(pdBuff) : 0);
670
671// Now run the command to put the file. If the command fails and this is a
672// migration request, cretae a fail file if one does not exist.
673//
674 xfrET = time(0);
675 if ((rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
676 {if (isMigr) ffMake(rc == -2);
677 retMsg = "copy failed";
678 }
679
680// Track directory creations if we need to track them
681//
682 if (!rc && mDP >= 0) TrackDC(Rfn);
683
684// Obtain state of the file after the copy and make sure the file was not
685// modified during the copy. This is an error for queued requests but
686// internally generated requests will simply be retried.
687//
688 if (!rc)
689 {if ((rc = Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &endStat)))
690 {Say.Emsg("Throw", lfnpath, "transferred but not found!");
691 retMsg = "unable to verify copy";
692 } else {
693 if (begStat.st_mtime != endStat.st_mtime
694 || begStat.st_size != endStat.st_size)
695 {Say.Emsg("Throw", lfnpath, "modified during transfer!");
696 retMsg = "file modified during copy"; rc = 1;
697 }
698 }
699 }
700
701// Purge the file if so wanted. Otherwise, if this is a migration request,
702// make sure that if a lock file exists its date/time is equal to the file
703// we just copied to prevent the file from being copied again (we have a lock).
704//
705 if (!rc)
706 {if (xfrP->reqData.Options & XrdFrcRequest::Purge) Throwaway();
707 else if (isMigr) ThrowDone(&Chk, endStat.st_mtime);
708 }
709
710// Do statistics if so wanted
711//
713 || (Trace.What & TRACE_Debug))
714 {time_t eNow = time(0);
715 int inqT, xfrT;
716 long long Fsize = begStat.st_size;
717 inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
718 if ((xfrT = static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
719 if (((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
720 || (Trace.What & TRACE_Debug)) && !rc)
721 {char sbuff[80];
722 sprintf(sbuff, "Put: %lld qt: %d xt: %d up: ",Fsize,inqT,xfrT);
723 Say.Say(0, sbuff, xfrP->reqData.User, " ", xfrP->reqData.LFN);
724 }
726 {char monBuff[MAXPATHLEN+1024+512+8];
727 if (rc < 0) rc = -rc;
728 snprintf(monBuff, sizeof(monBuff),
729 "%s\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
730 xfrP->reqData.LFN, static_cast<long long>(eNow), Fsize,
731 inqT, xfrT, xfrP->Act, rc,
732 (pdSZ ? "&pd=" : ""), (pdSZ ? pdBuff : ""));
733 XrdFrmMonitor::Map(XROOTD_MON_MAPMIGR,xfrP->reqData.User,monBuff);
734 }
735 }
736
737// All done
738//
739 return retMsg;
740}
741
742/******************************************************************************/
743/* Private: T h r o w a w a y */
744/******************************************************************************/
745
746void XrdFrmTransfer::Throwaway()
747{
748 EPNAME("Throwaway");
749
750// Purge the file. We do this via the pfn but also indicate we want all
751// migration support suffixes removed it they exist. Notify the cmsd & cnsd.
752//
753 if (Config.Test) {DEBUG("Would have removed '" <<xfrP->PFN <<"'");}
754 else {Config.ossFS->Unlink(xfrP->PFN, XRDOSS_isPFN|XRDOSS_isMIG);
755 DEBUG("removed '" <<xfrP->PFN <<"'");
756 if (Config.cmsPath) Config.cmsPath->Gone(xfrP->PFN);
757 XrdFrmCns::Rm(xfrP->PFN);
758 }
759}
760
761/******************************************************************************/
762/* Private: T h r o w D o n e */
763/******************************************************************************/
764
765void XrdFrmTransfer::ThrowDone(XrdFrmTranChk *cP, time_t endTime)
766{
767
768// Update file attributes if we are running in new mode, otherwise do
769//
770 if (Config.runNew)
771 {XrdOucXAttr<XrdFrcXAttrCpy> cpyInfo;
772 cpyInfo.Attr.cpyTime = static_cast<long long>(endTime);
773 if (cpyInfo.Set(xfrP->PFN, cP->lkfd))
774 Say.Emsg("Throw", "Unable to set copy time xattr for", xfrP->PFN);
775 else if (cP->lkfx)
776 {strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
777 unlink(xfrP->PFN);
778 xfrP->PFN[xfrP->pfnEnd] = '\0';
779 }
780 } else {
781 struct stat Stat;
782 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
783 if (!stat(xfrP->PFN, &Stat))
784 {struct utimbuf tbuff;
785 tbuff.actime = tbuff.modtime = endTime;
786 if (utime(xfrP->PFN, &tbuff))
787 Say.Emsg("Throw", errno, "set utime for", xfrP->PFN);
788 }
789 xfrP->PFN[xfrP->pfnEnd] = '\0';
790 }
791}
792
793/******************************************************************************/
794/* Private: T h r o w O K */
795/******************************************************************************/
796
797const char *XrdFrmTransfer::ThrowOK(XrdFrmTranChk *cP)
798{
799 class fdClose
800 {public:
801 int Num;
802 fdClose() : Num(-1) {}
803 ~fdClose() {if (Num >= 0) close(Num);}
804 } fnFD;
805
806 XrdOucXAttr<XrdFrcXAttrCpy> cpyInfo;
807 struct stat lokStat;
808 int statRC;
809
810// Check if the file is in use by checking if we got an exclusive lock
811//
812 if ((fnFD.Num = XrdSysFD_Open(xfrP->PFN, O_RDWR)) < 0)
813 return "unable to open file";
814 if (XrdOucSxeq::Serialize(fnFD.Num,XrdOucSxeq::noWait)) return "file in use";
815
816// Get the info on the lock file (enabled if old mode is in effect
817//
818 if (Config.runOld)
819 {strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
820 statRC = stat(xfrP->PFN, &lokStat);
821 xfrP->PFN[xfrP->pfnEnd] = '\0';
822 } else statRC = 1;
823 if (statRC && !Config.runNew) return "missing lock file";
824
825// If running in new mode then we must get the extended attribute for this file
826// unless we got the lock file time which takes precendence.
827//
828 if (Config.runNew)
829 {if (!statRC)
830 cpyInfo.Attr.cpyTime = static_cast<long long>(lokStat.st_mtime);
831 else if (cpyInfo.Get(xfrP->PFN, fnFD.Num) <= 0)
832 return "unable to get copy time xattr";
833 }
834
835// Verify the information
836//
837 if (cpyInfo.Attr.cpyTime >= static_cast<long long>(cP->Stat->st_mtime))
838 {if (xfrP->reqData.Options & XrdFrcRequest::Purge) return "";
839 return "already migrated";
840 }
841
842// Keep the lock on the base file until we are through. No one is allowed to
843// modify this file until we have migrate it.
844//
845 cP->lkfd = fnFD.Num;
846 cP->lkfx = statRC == 0;
847 fnFD.Num = -1;
848 return 0;
849}
#define DEBUG(x)
#define EPNAME(x)
struct stat Stat
Definition XrdCks.cc:49
XrdOucPup XrdCmsParser::Pup & Say
#define TRACE_Debug
void * InitXfer(void *parg)
char PFN[MAXPATHLEN+16]
XrdFrcRequest reqData
#define XRDOSS_isPFN
Definition XrdOss.hh:469
#define XRDOSS_isMIG
Definition XrdOss.hh:470
#define XRDOSS_mkpath
Definition XrdOss.hh:466
@ Hash_data_is_key
Definition XrdOucHash.hh:52
#define SEC_HOST
#define CMS_CID
#define XRD_INS
#define close(a)
Definition XrdPosix.hh:48
#define open
Definition XrdPosix.hh:76
#define unlink(a)
Definition XrdPosix.hh:113
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString Path
size_t strlcpy(char *dst, const char *src, size_t sz)
#define XRDSYSTHREAD_BIND
const kXR_char XROOTD_MON_MAPMIGR
const kXR_char XROOTD_MON_MAPSTAG
char LFN[3072]
static const int Purge
static const int makeRW
static const int outQ
static const int Migrate
signed char Prty
static void Rm(const char *Path, int islfn=0)
Definition XrdFrmCns.hh:53
static void Add(const char *tID, const char *Path, long long Size, mode_t Mode)
Definition XrdFrmCns.cc:67
struct XrdFrmConfig::Cmd xfrCmd[4]
int NeedsCTA(const char *Lfn)
XrdNetCmsNotify * cmsPath
static const int cmdStats
static const int cmdAlloc
static const int cmdRME
int RemotePath(const char *oldp, char *newp, int newpsz)
XrdOss * ossFS
XrdOucMsubs * theVec
static const int cmdXPD
int Stat(const char *xLfn, const char *xPfn, struct stat *buff)
static const int cmdMDP
static kXR_unt32 Map(char code, const char *uname, const char *path)
static char monMIGR
static char monSTAGE
static int Init()
static const char * checkFF(const char *Path)
void Start(int ioqType)
static const int useInpQ
static void Done(XrdFrmXfrJob *xP, const char *Msg)
static XrdFrmXfrJob * Get(int ioQType)
static int Init()
static const int useOutQ
static const int useAnyQ
int Have(const char *Path, int isPfn=1)
int Gone(const char *Path, int isPfn=1)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
T * Find(const char *KeyVal, time_t *KeyTime=0)
int Subs(XrdOucMsubsInfo &Info, char **Data, int *Dlen)
static const int maxElem
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
int Serialize(int Opts=0)
static const int noWait
Definition XrdOucSxeq.hh:37
static int makePath(char *path, mode_t mode, bool reset=false)
int Get(const char *Path, int fd=-1)
int Set(const char *Path, int fd=-1)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdOucTrace Trace
XrdFrcCID CID
Definition XrdFrcCID.cc:56
XrdFrmConfig Config
XrdOucEnv * theEnv
XrdFrmTranArg(XrdOucEnv *Env)
XrdOucMsubs * theVec
XrdOucProg * theCmd
struct stat * Stat
XrdFrmTranChk(struct stat *sP)