XRootD
Loading...
Searching...
No Matches
XrdOfsPoscq.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O f s P o s c q . c c */
4/* */
5/* (c) 2009 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32#include <strings.h>
33#include <stddef.h>
34#include <cstdio>
35#include <fcntl.h>
36#include <unistd.h>
37#include <cerrno>
38#include <sys/param.h>
39#include <sys/types.h>
40#include <sys/stat.h>
41
42#include "XrdOfs/XrdOfsPoscq.hh"
43#include "XrdOss/XrdOss.hh"
44#include "XrdSfs/XrdSfsFlags.hh"
45#include "XrdSys/XrdSysError.hh"
46#include "XrdSys/XrdSysFD.hh"
48
49/******************************************************************************/
50/* C o n s t r u c t o r */
51/******************************************************************************/
52
53XrdOfsPoscq::XrdOfsPoscq(XrdSysError *erp, XrdOss *oss, const char *fn, int sv)
54{
55 eDest = erp;
56 ossFS = oss;
57 pocFN = strdup(fn);
58 pocFD = -1;
59 pocSZ = 0;
60 pocIQ = 0;
61 SlotList = SlotLust = 0;
62
63 if (sv > 32767) sv = 32767;
64 else if (sv < 0) sv = 0;
65 pocWS = pocSV = sv-1;
66}
67
68/******************************************************************************/
69/* A d d */
70/******************************************************************************/
71
72int XrdOfsPoscq::Add(const char *Tident, const char *Lfn, bool isNew)
73{
74 XrdSysMutexHelper myHelp(myMutex);
75 std::map<std::string,int>::iterator it = pqMap.end();
77 struct stat Stat;
78 FileSlot *freeSlot;
79 int fP;
80
81// Add is only called when file is to be created. Therefore, it must not exist
82// unless it is being replaced typically due to a retry. If not being replaced
83// then We need to check this to avoid deleting already created files.
84// Otherwise, we need to see if the file is already in the queue to avoid it
85// being deleted after the fact because it would be in the queue twice.
86//
87 if (!ossFS->Stat(Lfn, &Stat))
88 {if (isNew) return -EEXIST;
89 it = pqMap.find(std::string(Lfn));
90 if (it != pqMap.end() && VerOffset(Lfn, it->second)) return it->second;
91 }
92
93// Construct the request
94//
95 tmpReq.addT = 0;
96 strlcpy(tmpReq.LFN, Lfn, sizeof(tmpReq.LFN));
97 strlcpy(tmpReq.User, Tident, sizeof(tmpReq.User));
98 memset(tmpReq.Reserved, 0, sizeof(tmpReq.Reserved));
99
100// Obtain a free slot
101//
102 if ((freeSlot = SlotList))
103 {fP = freeSlot->Offset;
104 SlotList = freeSlot->Next;
105 freeSlot->Next = SlotLust;
106 SlotLust = freeSlot;
107 } else {fP = pocSZ; pocSZ += ReqSize;}
108 pocIQ++;
109
110// Write out the record
111//
112 if (!reqWrite((void *)&tmpReq, sizeof(tmpReq), fP))
113 {eDest->Emsg("Add", Lfn, "not added to the persist queue.");
114 myMutex.Lock(); pocIQ--; myMutex.UnLock();
115 return -EIO;
116 }
117
118// Check if we update the map or simply add it to the map
119//
120 if (it != pqMap.end()) it->second = fP;
121 else pqMap[std::string(Lfn)] = fP;
122
123// Return the record offset
124//
125 return fP;
126}
127
128/******************************************************************************/
129/* C o m m i t */
130/******************************************************************************/
131
132int XrdOfsPoscq::Commit(const char *Lfn, int Offset)
133{
134 long long addT = static_cast<long long>(time(0));
135
136// Verify the offset it must be correct
137//
138 if (!VerOffset(Lfn, Offset)) return -EINVAL;
139
140// Indicate the record is free
141//
142 if (!reqWrite((void *)&addT, sizeof(addT), Offset))
143 {eDest->Emsg("Commit", Lfn, "not committed to the persist queue.");
144 return -EIO;
145 }
146
147// Remove entry from the map and return
148//
149 myMutex.Lock();
150 pqMap.erase(std::string(Lfn));
151 myMutex.UnLock();
152 return 0;
153}
154
155/******************************************************************************/
156/* D e l */
157/******************************************************************************/
158
159int XrdOfsPoscq::Del(const char *Lfn, int Offset, int Unlink)
160{
161 static int Zero = 0;
162 FileSlot *freeSlot;
163 int retc;
164
165// Verify the offset it must be correct
166//
167 if (!VerOffset(Lfn, Offset)) return -EINVAL;
168
169// Unlink the file if need be
170//
171 if (Unlink && (retc = ossFS->Unlink(Lfn)) && retc != -ENOENT)
172 {eDest->Emsg("Del", retc, "remove", Lfn);
173 return (retc < 0 ? retc : -retc);
174 }
175
176// Indicate the record is free
177//
178 if (!reqWrite((void *)&Zero, sizeof(Zero), Offset+offsetof(Request,LFN)))
179 {eDest->Emsg("Del", Lfn, "not removed from the persist queue.");
180 return -EIO;
181 }
182
183// Serialize and place this on the free queue
184//
185 myMutex.Lock();
186 if ((freeSlot = SlotLust)) SlotLust = freeSlot->Next;
187 else freeSlot = new FileSlot;
188 freeSlot->Offset = Offset;
189 freeSlot->Next = SlotList;
190 SlotList = freeSlot;
191 if (pocIQ > 0) pocIQ--;
192
193// Remove item from the map
194//
195 pqMap.erase(std::string(Lfn));
196 myMutex.UnLock();
197
198// All done
199//
200 return 0;
201}
202
203/******************************************************************************/
204/* I n i t */
205/******************************************************************************/
206
208{
209 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
210 Request tmpReq;
211 struct stat buf, Stat;
212 recEnt *First = 0;
213 char Buff[80];
214 int rc, Offs, numreq = 0;
215
216// Assume we will fail
217//
218 Ok = 0;
219
220// Open the file first in r/w mode
221//
222 if ((pocFD = XrdSysFD_Open(pocFN, O_RDWR|O_CREAT, Mode)) < 0)
223 {eDest->Emsg("Init",errno,"open",pocFN);
224 return 0;
225 }
226
227// Get file status
228//
229 if (fstat(pocFD, &buf)) {FailIni("stat"); return 0;}
230
231// Check for a new file here
232//
233 if (buf.st_size < ReqSize)
234 {pocSZ = ReqOffs;
235 if (ftruncate(pocFD, ReqOffs)) FailIni("trunc");
236 else Ok = 1;
237 return 0;
238 }
239
240// Read the full file
241//
242 for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
243 {do {rc = pread(pocFD, (void *)&tmpReq, ReqSize, Offs);}
244 while(rc < 0 && errno == EINTR);
245 if (rc < 0) {eDest->Emsg("Init",errno,"read",pocFN); return First;}
246 if (*tmpReq.LFN == '\0'
247 || ossFS->Stat(tmpReq.LFN, &Stat)
248 || !(S_ISREG(Stat.st_mode) || !(Stat.st_mode & XRDSFS_POSCPEND))) continue;
249 First = new recEnt(tmpReq, Stat.st_mode & S_IAMB, First); numreq++;
250 }
251
252// Now write out the file and return
253//
254 sprintf(Buff, " %d pending create%s", numreq, (numreq != 1 ? "s" : ""));
255 eDest->Say("Init", Buff, " recovered from ", pocFN);
256 if (ReWrite(First)) Ok = 1;
257 return First;
258}
259
260/******************************************************************************/
261/* L i s t */
262/******************************************************************************/
263
265{
267 struct stat buf;
268 recEnt *First = 0;
269 int rc, theFD, Offs;
270
271// Open the file first in r/o mode
272//
273 if ((theFD = XrdSysFD_Open(theFN, O_RDONLY)) < 0)
274 {Say->Emsg("Init",errno,"open",theFN);
275 return 0;
276 }
277
278// Get file status
279//
280 if (fstat(theFD, &buf))
281 {Say->Emsg("Init",errno,"stat",theFN);
282 close(theFD);
283 return 0;
284 }
285 if (buf.st_size < ReqSize) buf.st_size = 0;
286
287// Read the full file
288//
289 for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
290 {do {rc = pread(theFD, (void *)&tmpReq, ReqSize, Offs);}
291 while(rc < 0 && errno == EINTR);
292 if (rc < 0) {Say->Emsg("List",errno,"read",theFN);
293 close(theFD); return First;
294 }
295 if (*tmpReq.LFN != '\0') First = new recEnt(tmpReq, 0, First);
296 }
297
298// All done
299//
300 close(theFD);
301 return First;
302}
303
304/******************************************************************************/
305/* F a i l I n i */
306/******************************************************************************/
307
308void XrdOfsPoscq::FailIni(const char *txt)
309{
310 eDest->Emsg("Init", errno, txt, pocFN);
311}
312
313/******************************************************************************/
314/* r e q W r i t e */
315/******************************************************************************/
316
317bool XrdOfsPoscq::reqWrite(void *Buff, int Bsz, int Offs)
318{
319 int rc = 0;
320
321 do {rc = pwrite(pocFD, Buff, Bsz, Offs);} while(rc < 0 && errno == EINTR);
322
323 if (rc >= 0 && Bsz > 8)
324 {if (!pocWS) {pocWS = pocSV; rc = fsync(pocFD);}
325 else pocWS--;
326 }
327
328 if (rc < 0) {eDest->Emsg("reqWrite",errno,"write", pocFN); return false;}
329 return true;
330}
331
332/******************************************************************************/
333/* R e W r i t e */
334/******************************************************************************/
335
336bool XrdOfsPoscq::ReWrite(XrdOfsPoscq::recEnt *rP)
337{
338 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
339 char newFN[MAXPATHLEN], *oldFN;
340 int newFD, oldFD, Offs = ReqOffs;
341 bool aOK = true;
342
343// Construct new file and open it
344//
345 strcpy(newFN, pocFN); strcat(newFN, ".new");
346 if ((newFD = XrdSysFD_Open(newFN, O_RDWR|O_CREAT|O_TRUNC, Mode)) < 0)
347 {eDest->Emsg("ReWrite",errno,"open",newFN); return false;}
348
349// Setup to write/swap the file
350//
351 oldFD = pocFD; pocFD = newFD;
352 oldFN = pocFN; pocFN = newFN;
353
354// Rewrite all records if we have any
355//
356 while(rP)
357 {rP->Offset = Offs;
358 if (!reqWrite((void *)&rP->reqData, ReqSize, Offs))
359 {aOK = false; break;}
360 pqMap[std::string(rP->reqData.LFN)] = Offs;
361 Offs += ReqSize;
362 rP = rP->Next;
363 }
364
365// If all went well, rename the file
366//
367 if (aOK && rename(newFN, oldFN) < 0)
368 {eDest->Emsg("ReWrite",errno,"rename",newFN); aOK = false;}
369
370// Perform post processing
371//
372 if (aOK) close(oldFD);
373 else {close(newFD); pocFD = oldFD;}
374 pocFN = oldFN;
375 pocSZ = Offs;
376 return aOK;
377}
378
379/******************************************************************************/
380/* V e r O f f s e t */
381/******************************************************************************/
382
383bool XrdOfsPoscq::VerOffset(const char *Lfn, int Offset)
384{
385
386// Verify the offset
387//
388 if (Offset < ReqOffs || (Offset-ReqOffs)%ReqSize)
389 {char buff[128];
390 sprintf(buff, "Invalid slot %d for", Offset);
391 eDest->Emsg("VerOffset", buff, Lfn);
392 return false;
393 }
394 return true;
395}
struct stat Stat
Definition XrdCks.cc:49
XrdOucPup XrdCmsParser::Pup & Say
#define S_IAMB
Definition XrdConfig.cc:160
static XrdSysError eDest(0,"crypto_")
#define close(a)
Definition XrdPosix.hh:48
#define fsync(a)
Definition XrdPosix.hh:64
#define fstat(a, b)
Definition XrdPosix.hh:62
#define stat(a, b)
Definition XrdPosix.hh:101
#define rename(a, b)
Definition XrdPosix.hh:92
#define ftruncate(a, b)
Definition XrdPosix.hh:70
#define pwrite(a, b, c, d)
Definition XrdPosix.hh:107
#define pread(a, b, c, d)
Definition XrdPosix.hh:80
int Mode
#define XRDSFS_POSCPEND
size_t strlcpy(char *dst, const char *src, size_t sz)
static const int ReqOffs
int Del(const char *Lfn, int Offset, int Unlink=0)
static recEnt * List(XrdSysError *Say, const char *theFN)
XrdOfsPoscq(XrdSysError *erp, XrdOss *oss, const char *fn, int sv=1)
recEnt * Init(int &Ok)
static const int ReqSize
int Commit(const char *Lfn, int Offset)
int Add(const char *Tident, const char *Lfn, bool isNew)
struct Request reqData