XRootD
Loading...
Searching...
No Matches
XrdOfsTPCJob.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O f s T P C J o b . c c */
4/* */
5/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include "XrdOfs/XrdOfsStats.hh"
36
37/******************************************************************************/
38/* G l o b a l O b j e c t s */
39/******************************************************************************/
40
43
44/******************************************************************************/
45/* S t a t i c O b j e c t s */
46/******************************************************************************/
47
// Class-wide state for the list of jobs that are waiting for a copy program.
// jobMutex is locked by the code in this file before it examines or changes
// the list or a job's status. jobQ points to the first waiting job and jobLast
// to the last one; both are zero when no job is waiting.
//
48XrdSysMutex XrdOfsTPCJob::jobMutex;
49XrdOfsTPCJob *XrdOfsTPCJob::jobQ = 0;
50XrdOfsTPCJob *XrdOfsTPCJob::jobLast = 0;
51
52/******************************************************************************/
53/* C o n s t r u c t o r */
54/******************************************************************************/
55
56XrdOfsTPCJob::XrdOfsTPCJob(const char *Url, const char *Org,
57 const char *Lfn, const char *Pfn,
58 const char *Cks, short lfnLoc[2],
59 const char *Spr, const char *Tpr)
60 : XrdOfsTPC(Url, Org, Lfn, Pfn, Cks, Spr, Tpr),
61 Next(0), myProg(0), eCode(0), Status(isWaiting)
62{ lfnPos[0] = lfnLoc[0]; lfnPos[1] = lfnLoc[1]; }
63
64/******************************************************************************/
65/* D e l */
66/******************************************************************************/
67
69{
// Body of the "Del" section (the signature line is not present in this
// listing). Releases the caller's hold on this job:
//   1) a job that is still queued is unlinked from the wait list, a job that
//      is running has its program cancelled;
//   2) if either happened and a callback is pending, the callback is answered
//      with ECANCELED;
//   3) the object is deleted when Refs is 1 or less, otherwise Refs is
//      decremented.
//
70 XrdOfsTPCJob *pP = 0; // Predecessor of this job in the list; stays 0 for the head
71 bool tpcCan = false; // Set when a queued or running copy was cancelled here
72
73// Remove from queue if we are still in the queue
74//
75 jobMutex.Lock();
76 if (inQ)
77 {if (this == jobQ) jobQ = Next;
78 else {pP = jobQ;
79 while(pP && pP->Next != this) pP = pP->Next;
80 if (pP) pP->Next = Next;
81 }
// If this job was the tail, the predecessor becomes the tail. pP is 0 when
// this job was also the head, which leaves the list with no tail.
82 if (this == jobLast) jobLast = pP;
83 inQ = 0; tpcCan = true;
84 } else if (Status == isRunning && myProg)
85 {myProg->Cancel(); tpcCan = true;}
86
// Tell a waiting client that the copy will not complete. jobMutex is handed to
// Reply(), so it has to be re-acquired afterwards before Refs is touched again.
87 if (tpcCan && Info.cbP)
88 {Refs++; // Make sure this object cannot get deleted
89 Info.Reply(SFS_ERROR, ECANCELED, "destination file prematurely closed",
90 &jobMutex); // Mutex is unlocked upon return!
91 jobMutex.Lock();
92 Refs--; // Undo the extra ref increase
93 }
94
95// Delete the element if possible. The mutex is released before the delete;
96// when Refs is above 1 only the count is dropped and the object is kept.
//
97 if (Refs <= 1) {jobMutex.UnLock(); delete this;}
98 else {Refs--; jobMutex.UnLock();}
99}
100
101/******************************************************************************/
102/* D o n e */
103/******************************************************************************/
104
// Record the completion of this job and hand the program to the next waiter.
//
// pgmP  the program that is assigned to the first queued job, if there is one.
// eTxt  error text; duplicated into Info.Key and sent to the callback when rc
//       is not zero.
//       NOTE(review): eTxt is passed to strdup() without a null check, so it
//       is assumed to be non-null whenever rc is not zero - confirm at callers.
// rc    completion code; zero is treated as success.
//
// Returns the job taken from the head of the wait list (now marked isRunning
// with pgmP as its program) or zero when no job was waiting.
//
// This object may have been deleted by the time the method returns, see the
// end of the body.
//
105XrdOfsTPCJob *XrdOfsTPCJob::Done(XrdOfsTPCProg *pgmP, const char *eTxt, int rc)
106{
107 XrdSysMutexHelper jobMon(&jobMutex); // presumably holds jobMutex until scope exit - confirm
108 XrdOfsTPCJob *jP;
109
110// Indicate job status. Info.Key is replaced by a copy of the error text on
111// failure and by zero on success.
//
112 eCode = rc; Status = isDone;
113 if (Info.Key) free(Info.Key);
114 Info.Key = (rc ? strdup(eTxt) : 0);
115
116// Check if we need to do a callback
117//
118 if (Info.cbP)
119 {if (rc) Info.Reply(SFS_ERROR, rc, eTxt);
120 else Info.Reply(SFS_OK, 0, "");
121 }
122
123// Check if anyone is waiting for a program. The head of the list is removed,
124// given pgmP plus an extra reference, and its pending callback is answered.
//
125 if ((jP = jobQ))
126 {if (jP == jobLast) jobQ = jobLast = 0;
127 else jobQ = jP->Next;
128 jP->myProg = pgmP; jP->Refs++; jP->inQ = 0; jP->Status = isRunning;
129 if (jP->Info.cbP) jP->Info.Reply(SFS_OK, 0, "");
130 }
131
132// Free up this job and return the next job, if any. jobMutex is a static
133// member, so it is not part of the object that "delete this" destroys.
//
134 myProg = 0;
135 if (Refs <= 1) delete this;
136 else Refs--;
137 return jP;
138}
139
140/******************************************************************************/
141/* S y n c */
142/******************************************************************************/
143
145{
146 static const int cbWaitTime = 1800;
147 XrdSysMutexHelper jobMon(&jobMutex);
148 int rc;
149
150// If we are running then simply wait for the copy to complete
151//
152 if (Status == isRunning)
153 {if (Info.SetCB(eRR)) return SFS_ERROR;
154 eRR->setErrCode(cbWaitTime);
155 Info.Engage();
156 return SFS_STARTED;
157 }
158
159// If we are done then return what we have (this can't change)
160//
161 if (Status == isDone)
162 {if (eCode) {eRR->setErrInfo(eCode, Info.Key); return SFS_ERROR;}
163 return SFS_OK;
164 }
165
166// The only thing left is that we are an unstarted job, so try to start it.
167//
168 if (inQ) {myProg = 0; rc = 0;}
169 else if ((myProg = XrdOfsTPCProg::Start(this, rc)))
170 {Refs++; Status = isRunning; return SFS_OK;}
171
172// We could not allocate a program to this job. Check if this is due to an err
173//
174 if (rc)
175 {OfsEroute.Emsg("TPC", rc, "create tpc job thread");
176 Status = isDone;
177 eCode = ECANCELED;
178 if (Info.Key) free(Info.Key);
179 Info.Key = strdup("Copy failed; resources unavailable.");
180 return Info.Fail(eRR, "resources unavailable", ECANCELED);
181 }
182
183// No programs available, place this job in callback mode
184//
185 if (Info.SetCB(eRR)) return SFS_ERROR;
186 if (jobLast) {jobLast->Next = this; jobLast = this;}
187 else jobQ = jobLast = this;
188 inQ = 1;
189 eRR->setErrCode(cbWaitTime);
190 Info.Engage();
191 return SFS_STARTED;
192}
XrdSysError OfsEroute(0)
XrdOfsStats OfsStats
Definition XrdOfs.cc:113
#define SFS_ERROR
#define SFS_STARTED
#define SFS_OK
XrdOucCallBack * cbP
void Reply(int rC, int eC, const char *eMsg, XrdSysMutex *mP=0)
XrdOfsTPCJob(const char *Url, const char *Org, const char *Lfn, const char *Pfn, const char *Cks, short lfnLoc[2], const char *Spr, const char *Tpr)
XrdOfsTPCJob * Done(XrdOfsTPCProg *pgmP, const char *eTxt, int rc)
int Sync(XrdOucErrInfo *eRR)
static XrdOfsTPCProg * Start(XrdOfsTPCJob *jP, int &rc)
XrdOfsTPCInfo Info
Definition XrdOfsTPC.hh:109
int setErrInfo(int code, const char *emsg)
int setErrCode(int code)