XRootD
Loading...
Searching...
No Matches
XrdOfsEvr.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O f s E v r . c c */
4/* */
5/* (c) 2006 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdlib>
32#include <cstdio>
33#include <cstring>
34
36#include "XrdOfs/XrdOfsEvr.hh"
37#include "XrdOfs/XrdOfsStats.hh"
38#include "XrdOfs/XrdOfsTrace.hh"
39#include "XrdSys/XrdSysError.hh"
40#include "XrdSys/XrdSysTimer.hh"
41#include "XrdOuc/XrdOucEnv.hh"
42#include "XrdNet/XrdNetOpts.hh"
45
46/******************************************************************************/
47/* E x t e r n a l L i n k a g e s */
48/******************************************************************************/
49
51
53
54void *XrdOfsEvRecv(void *pp)
55{
56 XrdOfsEvr *evr = (XrdOfsEvr *)pp;
57 evr->recvEvents();
58 return (void *)0;
59}
60
61void *XrdOfsEvFlush(void *pp)
62{
63 XrdOfsEvr *evr = (XrdOfsEvr *)pp;
64 evr->flushEvents();
65 return (void *)0;
66}
67
68int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp)
69 {return 0;}
70
71/******************************************************************************/
72/* D e s t r u c t o r */
73/******************************************************************************/
74
76{
77
78// Close the FIFO. This will cause the reader to exit
79//
80 myMutex.Lock();
81 eventFIFO.Close();
82 myMutex.UnLock();
83}
84
85/******************************************************************************/
86/* f l u s h E v e n t s */
87/******************************************************************************/
88
90{
91 theClient *tp, *ntp;
92 int expWait, expClock;
93
94// Compute the hash flush interval
95//
96 if ((expWait = maxLife/4) == 0) expWait = 60;
97 expClock = expWait;
98
99// We wait for the right period of time, unless there is a deferred event
100//
101 do {myMutex.Lock();
102 if ((ntp = deferQ)) deferQ = 0;
103 else runQ = 0;
104 myMutex.UnLock();
105 while(ntp)
106 {XrdSysTimer::Wait(1000*60);
107 expClock -= 60;
108 myMutex.Lock();
109 while((tp = ntp))
110 {Events.Del(tp->Path);
111 ntp = tp->Next;
112 delete tp;
113 }
114 if ((ntp = deferQ)) deferQ = 0;
115 else runQ = 0;
116 myMutex.UnLock();
117 if (expClock <= 0)
118 {myMutex.Lock();
119 Events.Apply(XrdOfsScrubScan, (void *)0);
120 myMutex.UnLock();
121 expClock = expWait;
122 }
123 }
124 mySem.Wait();
125 } while(1);
126}
127
128/******************************************************************************/
129/* I n i t */
130/******************************************************************************/
131
132int XrdOfsEvr::Init(XrdSysError *eobj) // Must be called 1st!
133{
134 XrdNetSocket *msgSock;
135 char *p, path[2048];
136 int n;
137
138// Set he error object (need to do only once)
139//
140 eDest = eobj;
141
142// Create path to the pipe we will creat
143//
144 if (!(p = getenv("XRDADMINPATH")) || !*p)
145 {eobj->Emsg("Events", "XRDADMINPATH not defined");
146 return 0;
147 }
148 strcpy(path, p); n = strlen(p);
149 if (path[n-1] != '/') {path[n] = '/'; n++;}
150 strcpy(&path[n], "ofsEvents");
151 XrdOucEnv::Export("XRDOFSEVENTS", path);
152
153// Now create a socket to a path
154//
155 if (!(msgSock = XrdNetSocket::Create(eobj,path,0,0660,XRDNET_FIFO)))
156 return 0;
157 msgFD = msgSock->Detach();
158 delete msgSock;
159
160// We succeeded and are now ready for the call to he second stage below
161//
162 return 1;
163}
164
165/******************************************************************************/
166
168{
169 pthread_t tid;
170 int rc;
171
172// Set the balancer pointers (err object set in 1st phase Init).
173//
174 Balancer = trgp;
175
176// Now start a thread to get incoming messages
177//
178 if ((rc = XrdSysThread::Run(&tid, XrdOfsEvRecv, static_cast<void *>(this),
179 0, "Event receiver")))
180 {eDest->Emsg("Evr", rc, "create event reader thread");
181 return 0;
182 }
183
184// Now start a thread to flush posted events
185//
186 if ((rc = XrdSysThread::Run(&tid, XrdOfsEvFlush,static_cast<void *>(this),
187 0, "Event flusher")))
188 {eDest->Emsg("Evr", rc, "create event flush thread");
189 return 0;
190 }
191
192// All done
193//
194 return 1;
195}
196
197/******************************************************************************/
198/* r e c v E v e n t s */
199/******************************************************************************/
200
202{
203 EPNAME("recvEvent");
204 const char *tident = 0;
205 char *lp,*tp;
206
207// Attach the fifo FD to the stream
208//
209 eventFIFO.Attach(msgFD);
210
211// Now just start reading the events until the FD is closed
212//
213 while((lp = eventFIFO.GetLine()))
214 {DEBUG("-->" <<lp);
215 if ((tp = eventFIFO.GetToken()) && *tp)
216 {if (!strcmp(tp, "stage")) eventStage();
217 else eDest->Emsg("Evr", "Unknown event name -", tp);
218 }
219 }
220}
221
222/******************************************************************************/
223/* W a i t 4 E v e n t */
224/******************************************************************************/
225
226void XrdOfsEvr::Wait4Event(const char *path, XrdOucErrInfo *einfo)
227{
228
229// Replace original callback with our callback so we can queue this event
230// after the wait request has been sent to the client. This avoids a race
231// where the client might get the resume signal before the wait request.
232//
233 einfo->setErrCB((XrdOucEICB *)new theClient(this, einfo, path));
234}
235
236/******************************************************************************/
237/* W o r k 4 E v e n t */
238/******************************************************************************/
239
241{
242 struct theEvent *anEvent;
243 theClient *aClient = 0;
244
245// First ste is to see if this event was posted
246//
247 myMutex.Lock();
248 if (!(anEvent = Events.Find(Client->Path)))
249 Events.Add(Client->Path, new theEvent(0, 0, Client), maxLife);
250 else {aClient = anEvent->aClient;
251 while(aClient)
252 {if (aClient->evtCB->Same(Client->evtCBarg,aClient->evtCBarg))
253 {aClient->evtCBarg = Client->evtCBarg;
254 break;
255 }
256 aClient = aClient->Next;
257 }
258 if (!aClient) {Client->Next = anEvent->aClient;
259 anEvent->aClient = Client;
260 }
261 if (anEvent->Happened) sendEvent(anEvent);
262 }
263 myMutex.UnLock();
264
265// Delete the Client object if we really don't need it
266//
267 if (aClient) delete Client;
268}
269
270/******************************************************************************/
271/* P r i v a t e M e t h o d s */
272/******************************************************************************/
273/******************************************************************************/
274/* e v e n t S t a g e */
275/******************************************************************************/
276
277// stage {OK | ENOENT | BAD} <path> [<msg>] \n
278
279void XrdOfsEvr::eventStage()
280{
281 int rc;
282 char *tp, *eMsg, *altMsg = 0;
283 struct theEvent *anEvent;
284
285// Get the status token and decode it
286//
287 if (!(tp = eventFIFO.GetToken()))
288 {eDest->Emsg("Evr", "Missing stage event status"); return;}
289
290 if (!strcmp(tp, "OK")) {rc = 0;
292 }
293 else if (!strcmp(tp, "ENOENT")) {rc = ENOENT;
294 altMsg = (char *)"file does not exist.";
295 }
296 else if (!strcmp(tp, "BAD")) {rc = -1;
298 altMsg = (char *)"Dynamic staging failed.";
299 }
300 else {rc = -1;
301 eDest->Emsg("Evr", "Invalid stage event status -", tp);
302 altMsg = (char *)"Dynamic staging malfunctioned.";
304 }
305
306// Get the path and optional message
307//
308 if (!(tp = eventFIFO.GetToken(&eMsg)))
309 {eDest->Emsg("Evr", "Missing stage event path"); return;}
310 if (rc)
311 if (eMsg) {while(*eMsg == ' ') eMsg++;
312 if (!*eMsg) eMsg = altMsg;
313 } else eMsg = altMsg;
314 else eMsg = 0;
315
316// At this point if we have a balancer, tell it what happened
317//
318 if (Balancer)
319 {if (rc == 0) Balancer->Added(tp);
320 else Balancer->Removed(tp);
321 }
322
323// Either people are waiting for this event or it is preposted event.
324//
325 myMutex.Lock();
326 if (!(anEvent = Events.Find(tp)))
327 Events.Add(tp, new theEvent(rc, eMsg), maxLife);
328 else {if (anEvent->finalRC == 0)
329 {anEvent->finalRC = rc;
330 if (eMsg) anEvent->finalMsg = strdup(eMsg);
331 anEvent->Happened = 1;
332 }
333 if (anEvent->aClient) sendEvent(anEvent);
334 }
335 myMutex.UnLock();
336}
337
338/******************************************************************************/
339/* s e n d E v e n t */
340/******************************************************************************/
341
342void XrdOfsEvr::sendEvent(theEvent *ep)
343{
344 theClient *cp;
345 XrdOucErrInfo *einfo;
346 int doDel = 0, Result = (ep->finalRC ? SFS_ERROR : SFS_OK);
347
348// For each client, issue a call back sending the result back
349// The event also goes in the deferred delete queue as we need to hold on
350// to it just in case a client is in-transit
351//
352 while((cp = ep->aClient))
353 {einfo = new XrdOucErrInfo(cp->User, (XrdOucEICB *)0, cp->evtCBarg);
354 einfo->setErrInfo(ep->finalRC, (ep->finalMsg ? ep->finalMsg : ""));
355 cp->evtCB->Done(Result, einfo);
356 ep->aClient = cp->Next;
357 if (doDel) delete cp;
358 else {cp->Next = deferQ; deferQ = cp; doDel = 1;}
359 }
360
361// Post the defer queue handler
362//
363 if (!runQ) {runQ = 1; mySem.Post();}
364}
#define tident
#define DEBUG(x)
#define EPNAME(x)
static XrdSysError eDest(0,"crypto_")
#define XRDNET_FIFO
Definition XrdNetOpts.hh:83
int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp)
Definition XrdOfsEvr.cc:68
void * XrdOfsEvRecv(void *pp)
Definition XrdOfsEvr.cc:54
void * XrdOfsEvFlush(void *pp)
Definition XrdOfsEvr.cc:61
XrdSysTrace OfsTrace("ofs")
XrdOfsStats OfsStats
Definition XrdOfs.cc:113
#define eMsg(x)
#define SFS_ERROR
#define SFS_OK
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
XrdOucEICB * evtCB
Definition XrdOfsEvr.hh:76
unsigned long long evtCBarg
Definition XrdOfsEvr.hh:77
void Work4Event(theClient *Client)
Definition XrdOfsEvr.cc:240
int Init(XrdSysError *eObj)
Definition XrdOfsEvr.cc:132
void Wait4Event(const char *path, XrdOucErrInfo *einfo)
Definition XrdOfsEvr.cc:226
void flushEvents()
Definition XrdOfsEvr.cc:89
void recvEvents()
Definition XrdOfsEvr.cc:201
struct XrdOfsStats::StatsData Data
void Add(int &Cntr)
virtual int Same(unsigned long long arg1, unsigned long long arg2)=0
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
char * GetToken(int lowcase=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
theClient * aClient
Definition XrdOfsEvr.hh:89