XRootD
Loading...
Searching...
No Matches
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20#define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClStream.hh"
28
29#include <memory>
30
31namespace XrdCl
32{
33 //----------------------------------------------------------------------------
35 //----------------------------------------------------------------------------
37 {
38 public:
39 //------------------------------------------------------------------------
47 //------------------------------------------------------------------------
49 Socket &socket,
50 const std::string &strmname,
51 Stream &strm,
52 uint16_t substrmnb) : readstage( ReadStart ),
53 xrdTransport( xrdTransport ),
54 socket( socket ),
55 strmname( strmname ),
56 strm( strm ),
57 substrmnb( substrmnb ),
58 inmsgsize( 0 ),
59 inhandler( nullptr )
60 {
61 }
62
63 //------------------------------------------------------------------------
65 //------------------------------------------------------------------------
66 virtual ~AsyncMsgReader(){ }
67
68 //------------------------------------------------------------------------
70 //------------------------------------------------------------------------
71 inline void Reset()
72 {
73 readstage = ReadStart;
74 inmsg.reset();
75 inmsgsize = 0;
76 inhandler = nullptr;
77 }
78
79 //------------------------------------------------------------------------
81 //------------------------------------------------------------------------
83 {
84 Log *log = DefaultEnv::GetLog();
85
86 while( true )
87 {
88 switch( readstage )
89 {
90 //------------------------------------------------------------------
91 // There is no incoming message currently being processed so we
92 // create a new one
93 //------------------------------------------------------------------
94 case ReadStart:
95 {
96 inmsg = std::make_shared<Message>();
97 //----------------------------------------------------------------
98 // The next step is to read the header
99 //----------------------------------------------------------------
100 readstage = ReadHeader;
101 continue;
102 }
103 //------------------------------------------------------------------
104 // We need to read the header
105 //------------------------------------------------------------------
106 case ReadHeader:
107 {
108 XRootDStatus st = xrdTransport.GetHeader( *inmsg, &socket );
109 if( !st.IsOK() || st.code == suRetry )
110 return st;
111
112 log->Dump( AsyncSockMsg, "[%s] Received message header for %p size: %d",
113 strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114
115 ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116 if( rsp->hdr.status == kXR_attn )
117 {
118 log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119 "of message %p", strmname.c_str(), inmsg.get() );
120 inmsg->ReAllocate( 16 ); // header (bytes 8) + action code (8 bytes)
121 readstage = ReadAttn;
122 continue;
123 }
124
125 inmsgsize = inmsg->GetCursor();
126 inhandler = strm.InstallIncHandler( inmsg, substrmnb );
127
128 if( inhandler )
129 {
130 log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131 "of message %p", strmname.c_str(), inmsg.get() );
132 //--------------------------------------------------------------
133 // The next step is to read raw data
134 //--------------------------------------------------------------
135 readstage = ReadRawData;
136 continue;
137 }
138
139 //----------------------------------------------------------------
140 // The next step is to read the message body
141 //----------------------------------------------------------------
142 readstage = ReadMsgBody;
143 continue;
144 }
145 //------------------------------------------------------------------
146 // Before proceeding we need to figure out the attn action code
147 //------------------------------------------------------------------
148 case ReadAttn:
149 {
150 XRootDStatus st = ReadAttnActnum();
151 if( !st.IsOK() || st.code == suRetry )
152 return st;
153
154 //----------------------------------------------------------------
155 // There is an embedded response, overwrite the message with that
156 //----------------------------------------------------------------
157 if( HasEmbeddedRsp() )
158 {
159 inmsg->Free();
160 readstage = ReadHeader;
161 continue;
162 }
163
164 //----------------------------------------------------------------
165 // Readout the rest of the body
166 //----------------------------------------------------------------
167 inmsgsize = inmsg->GetCursor();
168 readstage = ReadMsgBody;
169 continue;
170 }
171 //------------------------------------------------------------------
172 // kXR_status is special as it can have both body and raw data,
173 // handle it separately
174 //------------------------------------------------------------------
175 case ReadMore:
176 {
177 XRootDStatus st = xrdTransport.GetMore( *inmsg, &socket );
178 if( !st.IsOK() || st.code == suRetry )
179 return st;
180 inmsgsize = inmsg->GetCursor();
181
182 //----------------------------------------------------------------
183 // The next step is to finalize the read
184 //----------------------------------------------------------------
185 readstage = ReadDone;
186 continue;
187 }
188 //------------------------------------------------------------------
189 // We need to call a raw message handler to get the data from the
190 // socket
191 //------------------------------------------------------------------
192 case ReadRawData:
193 {
194 uint32_t bytesRead = 0;
195 XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
196 if( !st.IsOK() )
197 return st;
198 inmsgsize += bytesRead;
199 if( st.code == suRetry )
200 return st;
201 //----------------------------------------------------------------
202 // The next step is to finalize the read
203 //----------------------------------------------------------------
204 readstage = ReadDone;
205 continue;
206 }
207 //------------------------------------------------------------------
208 // No raw handler, so we read the message to the buffer
209 //------------------------------------------------------------------
210 case ReadMsgBody:
211 {
212 XRootDStatus st = xrdTransport.GetBody( *inmsg, &socket );
213 if( !st.IsOK() || st.code == suRetry )
214 return st;
215 inmsgsize = inmsg->GetCursor();
216
217
218 //----------------------------------------------------------------
219 // kXR_status response needs special handling as it can have
220 // either (body + raw data) or (body + additional body data)
221 //----------------------------------------------------------------
222 if( IsStatusRsp() )
223 {
224 uint16_t action = strm.InspectStatusRsp( substrmnb,
225 inhandler );
226
227 if( action & MsgHandler::Corrupted )
229
230 if( action & MsgHandler::Raw )
231 {
232 //--------------------------------------------------------------
233 // The next step is to read the raw data
234 //--------------------------------------------------------------
235 readstage = ReadRawData;
236 continue;
237 }
238
239 if( action & MsgHandler::More )
240 {
241
242 //--------------------------------------------------------------
243 // The next step is to read the additional data in the message
244 // body
245 //--------------------------------------------------------------
246 readstage = ReadMore;
247 continue;
248 }
249 }
250
251 //----------------------------------------------------------------
252 // The next step is to finalize the read
253 //----------------------------------------------------------------
254 readstage = ReadDone;
255 continue;
256 }
257
258 case ReadDone:
259 {
260 //----------------------------------------------------------------
261 // Report the incoming message
262 //----------------------------------------------------------------
263 log->Dump( AsyncSockMsg, "[%s] Received message %p of %d bytes",
264 strmname.c_str(), inmsg.get(), inmsgsize );
265
266 strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
267 }
268 }
269 // just in case
270 break;
271 }
272
273 //----------------------------------------------------------------------
274 // We are done
275 //----------------------------------------------------------------------
276 return XRootDStatus();
277 }
278
279 private:
280
281 XRootDStatus ReadAttnActnum()
282 {
283 //----------------------------------------------------------------------
284 // Readout the action code from the socket. We are reading out 8 bytes
285 // into the message, the 8 byte header is already there.
286 //----------------------------------------------------------------------
287 size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
288 while( btsleft > 0 )
289 {
290 int btsrd = 0;
291 XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
292 if( !st.IsOK() || st.code == suRetry )
293 return st;
294 btsleft -= btsrd;
295 inmsg->AdvanceCursor( btsrd );
296 }
297
298 //----------------------------------------------------------------------
299 // Marshal the action code
300 //----------------------------------------------------------------------
301 ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
302 attn->actnum = ntohl( attn->actnum );
303
304 return XRootDStatus();
305 }
306
307 inline bool IsStatusRsp()
308 {
309 ServerResponseHeader *hdr = (ServerResponseHeader*)inmsg->GetBuffer();
310 return ( hdr->status == kXR_status );
311 }
312
313 inline bool HasEmbeddedRsp()
314 {
315 ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
316 return ( attn->actnum == kXR_asynresp );
317 }
318
319 //------------------------------------------------------------------------
321 //------------------------------------------------------------------------
322 enum Stage
323 {
324 ReadStart, //< the next step is to initialize the read
325 ReadHeader, //< the next step is to read the header
326 ReadAttn, //< the next step is to read attn action code
327 ReadMore, //< the next step is to read more status body
328 ReadMsgBody, //< the next step is to read the body
329 ReadRawData, //< the next step is to read the raw data
330 ReadDone //< the next step is to finalize the read
331 };
332
333 //------------------------------------------------------------------------
334 // Current read stage
335 //------------------------------------------------------------------------
336 Stage readstage;
337
338 //------------------------------------------------------------------------
339 // The context of the read operation
340 //------------------------------------------------------------------------
341 TransportHandler &xrdTransport;
342 Socket &socket;
343 const std::string &strmname;
344 Stream &strm;
345 uint16_t substrmnb;
346
347
348 //------------------------------------------------------------------------
349 // The internal state of the the reader
350 //------------------------------------------------------------------------
351 std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
352 uint32_t inmsgsize;
353 MsgHandler *inhandler;
354
355 };
356
357} /* namespace XrdCl */
358
359#endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
@ kXR_asynresp
Definition XProtocol.hh:938
@ kXR_status
Definition XProtocol.hh:907
@ kXR_attn
Definition XProtocol.hh:901
ServerResponseHeader hdr
void Reset()
Reset the state of the object (makes it ready to read out next msg)
XRootDStatus Read()
Read out the response from the socket.
virtual ~AsyncMsgReader()
Destructor.
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition XrdClLog.hh:101
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
@ More
there are more (non-raw) data to be read
A network socket.
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
Perform the handshake and the authentication for each physical stream.
const uint16_t suRetry
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t AsyncSockMsg
const uint16_t errCorruptedHeader
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.