XRootD
Loading...
Searching...
No Matches
XrdFrmXfrAgent.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d F r m X f r A g e n t . c c */
4/* */
5/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdlib>
32#include <cstring>
33#include <strings.h>
34#include <unistd.h>
35#include <fcntl.h>
36#include <sys/types.h>
37#include <sys/stat.h>
38
40#include "XrdFrc/XrdFrcTrace.hh"
41#include "XrdFrc/XrdFrcUtils.hh"
46
47using namespace XrdFrc;
48using namespace XrdFrm;
49
50/******************************************************************************/
51/* S t a t i c V a r i a b l e s */
52/******************************************************************************/
53
54XrdFrcReqAgent XrdFrmXfrAgent::GetAgent("getf", XrdFrcRequest::getQ);
55
56XrdFrcReqAgent XrdFrmXfrAgent::MigAgent("migr", XrdFrcRequest::migQ);
57
58XrdFrcReqAgent XrdFrmXfrAgent::StgAgent("pstg", XrdFrcRequest::stgQ);
59
60XrdFrcReqAgent XrdFrmXfrAgent::PutAgent("putf", XrdFrcRequest::putQ);
61
62/******************************************************************************/
63/* Private: A d d */
64/******************************************************************************/
65
66void XrdFrmXfrAgent::Add(XrdOucStream &Request, char *Tok,
67 XrdFrcReqAgent &Server)
68{
69 XrdFrcRequest myReq;
70 const char *Miss = 0;
71 char *tp, *op;
72
73// Handle: op[<traceid>] <requestid> <npath> <prty> <mode> <path> [. . .]
74//
75// op: + | & | ^ | < | = | >
76//
77 memset(&myReq, 0, sizeof(myReq));
78 myReq.OPc = *Tok;
79 if (*Tok == '=' || *Tok == '^') myReq.Options |= XrdFrcRequest::Purge;
80 Tok++;
81
82 if (*Tok) strlcpy(myReq.User, Tok, sizeof(myReq.User));
83 else strlcpy(myReq.User, Config.myProg, sizeof(myReq.User));
84
85 if (!(tp = Request.GetToken())) Miss = "request id";
86 else strlcpy(myReq.ID, tp, sizeof(myReq.ID));
87
88 if (!Miss)
89 {if (!(tp = Request.GetToken())) Miss = "notify path";
90 else strlcpy(myReq.Notify, tp, sizeof(myReq.Notify));
91 }
92
93 if (!Miss)
94 {if (!(tp = Request.GetToken())) Miss = "priority";
95 else {myReq.Prty = atoi(tp);
96 if (myReq.Prty < 0) myReq.Prty = 0;
97 else if (myReq.Prty > XrdFrcRequest::maxPrty)
99 }
100 }
101
102 if (!Miss)
103 {if (!(tp = Request.GetToken())) Miss = "mode";
104 else myReq.Options |= XrdFrcUtils::MapM2O(myReq.Notify, tp);
105 }
106
107 if (!Miss && !(tp = Request.GetToken())) Miss = "path";
108
109// Check for any errors
110//
111 if (Miss) {Say.Emsg("Agent_Add", Miss, "missing in '+' request.");
112 return;
113 }
114
115// Add all paths in the request
116//
117 do {strlcpy(myReq.LFN, tp, sizeof(myReq.LFN));
118 if ((op = index(tp, '?'))) {myReq.Opaque = op-tp+1; *op = '\0';}
119 else myReq.Opaque = 0;
120 myReq.LFO = 0;
121 if (myReq.LFN[0] != '/' && !(myReq.LFO = XrdFrcUtils::chkURL(myReq.LFN)))
122 Say.Emsg("Agent_Add", "Invalid url -", myReq.LFN);
123 else Server.Add(myReq);
124 if ((tp = Request.GetToken())) memset(myReq.LFN, 0, sizeof(myReq.LFN));
125 } while(tp);
126}
127
128/******************************************************************************/
129/* Private: A g e n t */
130/******************************************************************************/
131
132XrdFrcReqAgent *XrdFrmXfrAgent::Agent(char bType)
133{
134
135// Return the agent corresponding to the type
136//
137 switch(bType)
138 {case 0 : return &StgAgent;
139 case '+': return &StgAgent;
140 case '^':
141 case '&': return &MigAgent;
142 case '<': return &GetAgent;
143 case '=':
144 case '>': return &PutAgent;
145 default: break;
146 }
147 return 0;
148}
149
150/******************************************************************************/
151/* Private: D e l */
152/******************************************************************************/
153
154void XrdFrmXfrAgent::Del(XrdOucStream &Request, char *Tok,
155 XrdFrcReqAgent &Server)
156{
157 XrdFrcRequest myReq;
158
159// If the requestid is adjacent to the operation, use it o/w get it
160//
161 if (!(*Tok) && (!(Tok = Request.GetToken()) || !(*Tok)))
162 {Say.Emsg("Del", "request id missing in cancel request.");
163 return;
164 }
165
166// Copy the request ID into the request and remove it from peer server
167//
168 memset(&myReq, 0, sizeof(myReq));
169 strlcpy(myReq.ID, Tok, sizeof(myReq.ID));
170 Server.Del(myReq);
171}
172
173/******************************************************************************/
174/* Private: L i s t */
175/******************************************************************************/
176
177void XrdFrmXfrAgent::List(XrdOucStream &Request, char *Tok)
178{
180 XrdFrcReqAgent *agentP;
181 int n = 0;
182 char *tp;
183
184 while((tp = Request.GetToken()) && n < XrdFrcRequest::getLast)
185 {if (XrdFrcUtils::MapV2I(tp, Items[n])) n++;}
186
187// List entries queued for specific servers
188//
189 if (!(*Tok)) {StgAgent.List(Items, n); GetAgent.List(Items, n);}
190 else do {if ((agentP = Agent(*Tok))) agentP->List(Items, n);
191 } while(*(++Tok));
192 std::cout <<std::endl;
193}
194
195/******************************************************************************/
196/* Public: P r o c e s s */
197/******************************************************************************/
198
200{
201 char *tp;
202
203// Each frm request comes in as:
204//
205// Copy in: <[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
206// Copy purge: =[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
207// Copy out: >[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
208// Migrate: &[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
209// Migr+Purge: ^[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
210// Stage: +[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
211// Cancel in: - <requestid>
212// Cancel out: ~ <requestid>
213// List: ?[<][+][&][>]
214// Wakeup: ![<][+][&][>]
215//
216 if ((tp = Request.GetToken()))
217 switch(*tp)
218 {case '+': Add(Request, tp, StgAgent); break;
219 case '<': Add(Request, tp, GetAgent); break;
220 case '=':
221 case '>': Add(Request, tp, PutAgent); break;
222 case '&':
223 case '^': Add(Request, tp, MigAgent); break;
224 case '-': Del(Request, tp+1, StgAgent);
225 Del(Request, tp+1, GetAgent);
226 break;
227 case '~': Del(Request, tp+1, MigAgent);
228 Del(Request, tp+1, PutAgent);
229 break;
230 case '?': List(Request, tp+1); break;
231 case '!': GetAgent.Ping(tp); break;
232 default: Say.Emsg("Agent", "Invalid request, '", tp, "'.");
233 }
234}
235
236/******************************************************************************/
237/* Public: S t a r t */
238/******************************************************************************/
239
241{
242 EPNAME("Agent");
243 XrdOucStream Request;
244 char *tp;
245
246// Prepare our agents
247//
248 if (!StgAgent.Start(Config.QPath, Config.AdminMode)
249 || !MigAgent.Start(Config.QPath, Config.AdminMode)
250 || !GetAgent.Start(Config.QPath, Config.AdminMode)
251 || !PutAgent.Start(Config.QPath, Config.AdminMode)) return 2;
252
253// Attach stdin to the Request stream
254//
255 Request.Attach(STDIN_FILENO, 8*1024);
256
257// Process all input
258//
259 while((tp = Request.GetLine()))
260 {DEBUG ("Request: '" <<tp <<"'");
261 Process(Request);
262 }
263
264// If we exit then we lost the connection
265//
266 Say.Emsg("Agent", "Exiting; lost request connection!");
267 return 8;
268}
#define DEBUG(x)
#define EPNAME(x)
XrdOucPup XrdCmsParser::Pup & Say
size_t strlcpy(char *dst, const char *src, size_t sz)
void Add(XrdFrcRequest &Request)
void Del(XrdFrcRequest &Request)
int List(XrdFrcRequest::Item *Items, int Num)
static const int stgQ
static const int getQ
char LFN[3072]
static const int migQ
static const int putQ
static const int Purge
signed char Prty
char Notify[512]
static const int maxPrty
static int chkURL(const char *Url)
static int MapV2I(const char *Opc, XrdFrcRequest::Item &ICode)
static int MapM2O(const char *Nop, const char *Pop)
const char * myProg
static int Start()
static void Process(XrdOucStream &Request)
char * GetLine()
int Attach(int FileDescriptor, int bsz=2047)
char * GetToken(int lowcase=0)
XrdFrmConfig Config