XRootD
XrdCl::XRootDTransport Class Reference

XRootD transport handler.

#include <XrdClXRootDTransport.hh>


Public Member Functions

 XRootDTransport ()
 Constructor.
 ~XRootDTransport ()
 Destructor.
virtual void DecFileInstCnt (AnyObject &channelData)
 Decrement file object instance count bound to this channel.
virtual void Disconnect (AnyObject &channelData, uint16_t subStreamId)
 The stream has been disconnected, do the cleanups.
virtual void FinalizeChannel (AnyObject &channelData)
 Finalize channel.
virtual URL GetBindPreference (const URL &url, AnyObject &channelData)
 Get bind preference for the next data stream.
virtual XRootDStatus GetBody (Message &message, Socket *socket)
virtual XRootDStatus GetHeader (Message &message, Socket *socket)
virtual XRootDStatus GetMore (Message &message, Socket *socket)
virtual Status GetSignature (Message *toSign, Message *&sign, AnyObject &channelData)
 Get signature for given message.
virtual Status GetSignature (Message *toSign, Message *&sign, XRootDChannelInfo *info)
 Get signature for given message.
virtual XRootDStatus HandShake (HandShakeData *handShakeData, AnyObject &channelData)
 HandShake.
virtual bool HandShakeDone (HandShakeData *handShakeData, AnyObject &channelData)
virtual void InitializeChannel (const URL &url, AnyObject &channelData)
 Initialize channel.
virtual Status IsStreamBroken (time_t inactiveTime, AnyObject &channelData)
virtual bool IsStreamTTLElapsed (time_t time, AnyObject &channelData)
 Check if the stream should be disconnected.
virtual uint32_t MessageReceived (Message &msg, uint16_t subStream, AnyObject &channelData)
 Check if the message invokes a stream action.
virtual void MessageSent (Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
 Notify the transport about a message having been sent.
virtual PathID Multiplex (Message *msg, AnyObject &channelData, PathID *hint=0)
virtual PathID MultiplexSubStream (Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedControlConnection ()
virtual bool NeedEncryption (HandShakeData *handShakeData, AnyObject &channelData)
virtual Status Query (uint16_t query, AnyObject &result, AnyObject &channelData)
 Query the channel.
virtual uint16_t SubStreamNumber (AnyObject &channelData)
 Return the number of substreams per stream that should be created.
virtual void WaitBeforeExit ()
 Wait until the program can safely exit.
Public Member Functions inherited from XrdCl::TransportHandler
virtual ~TransportHandler ()

Static Public Member Functions

static void GenerateDescription (char *msg, std::ostringstream &o)
 Get the description of a message.
static void LogErrorResponse (const Message &msg)
 Log server error response.
static XRootDStatus MarshallRequest (char *msg)
 Marshal the outgoing message.
static XRootDStatus MarshallRequest (Message *msg)
 Marshal the outgoing message.
static uint16_t NbConnectedStrm (AnyObject &channelData)
 Number of currently connected data streams.
static void SetDescription (Message *msg)
 Set the description of a message.
static XRootDStatus UnMarchalStatusMore (Message &msg)
 Unmarshall the correction-segment of the status response for pgwrite.
static XRootDStatus UnMarshallBody (Message *msg, uint16_t reqType)
 Unmarshall the body of the incoming message.
static void UnMarshallHeader (Message &msg)
 Unmarshall the header of the incoming message.
static XRootDStatus UnMarshallRequest (Message *msg)
static XRootDStatus UnMarshalStatusBody (Message &msg, uint16_t reqType)
 Unmarshall the body of the status response.

Friends

struct PluginUnloadHandler

Additional Inherited Members

Public Types inherited from XrdCl::TransportHandler
enum StreamAction {
  NoAction = 0x0000,
  DigestMsg = 0x0001,
  AbortStream = 0x0002,
  CloseStream = 0x0004,
  ResumeStream = 0x0008,
  HoldStream = 0x0010,
  RequestClose = 0x0020
}
 Stream actions that may be triggered by incoming control messages.
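
Each StreamAction value occupies its own bit, so a single integer (MessageReceived returns uint32_t) can carry several actions at once. A minimal sketch of combining and testing them follows. It uses a local copy of the enum values so that it builds without the XRootD headers, and `HasAction` and `ExampleMask` are hypothetical helpers, not part of XrdCl.

```cpp
#include <cstdint>

// Local mirror of the StreamAction values, for illustration only.
enum StreamAction : uint32_t
{
  NoAction     = 0x0000,
  DigestMsg    = 0x0001,
  AbortStream  = 0x0002,
  CloseStream  = 0x0004,
  ResumeStream = 0x0008,
  HoldStream   = 0x0010,
  RequestClose = 0x0020
};

// Hypothetical helper: true if `action` is set in the combined mask.
// Note that NoAction is zero, so it can never be "set" in this sense.
inline bool HasAction( uint32_t mask, StreamAction action )
{
  return ( mask & action ) != 0;
}

// Example mask: digest the message and request that the stream be closed.
inline uint32_t ExampleMask()
{
  return DigestMsg | RequestClose;
}
```

With this layout, testing for "no action at all" is a comparison against zero rather than a bit test.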

Detailed Description

XRootD transport handler.

Definition at line 47 of file XrdClXRootDTransport.hh.

Constructor & Destructor Documentation

◆ XRootDTransport()

XrdCl::XRootDTransport::XRootDTransport ( )

Constructor.

Definition at line 291 of file XrdClXRootDTransport.cc.

291 :
292 pSecUnloadHandler( new PluginUnloadHandler() )
293 {
294 }

References PluginUnloadHandler.


◆ ~XRootDTransport()

XrdCl::XRootDTransport::~XRootDTransport ( )

Destructor.

Definition at line 299 of file XrdClXRootDTransport.cc.

300 {
301 delete pSecUnloadHandler; pSecUnloadHandler = 0;
302 }

Member Function Documentation

◆ DecFileInstCnt()

void XrdCl::XRootDTransport::DecFileInstCnt ( AnyObject & channelData)
virtual

Decrement file object instance count bound to this channel.

Implements XrdCl::TransportHandler.

Definition at line 1812 of file XrdClXRootDTransport.cc.

1813 {
1814 XRootDChannelInfo *info = 0;
1815 channelData.Get( info );
1816 if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1817 info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1818 }

References XrdCl::XRootDChannelInfo::finstcnt, and XrdCl::AnyObject::Get().
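
The listing checks the counter and then decrements it as two separate relaxed operations. Whether two callers can ever race between the check and the decrement depends on how the channel serializes calls, which this page does not show. As a hedged alternative, the check and the decrement can be folded into one step with a compare-exchange loop. The sketch below works on a standalone `std::atomic<uint32_t>` (the exact type of finstcnt is assumed here), and `DecrementIfPositive` is a hypothetical name.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical helper: decrement `counter` unless it is already zero.
// Returns true if a decrement happened.
inline bool DecrementIfPositive( std::atomic<uint32_t> &counter )
{
  uint32_t current = counter.load( std::memory_order_relaxed );
  while( current > 0 )
  {
    // On failure `current` is refreshed with the latest value and the
    // loop condition re-checks it before trying again.
    if( counter.compare_exchange_weak( current, current - 1,
                                       std::memory_order_relaxed ) )
      return true;
  }
  return false;
}
```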


◆ Disconnect()

void XrdCl::XRootDTransport::Disconnect ( AnyObject & channelData,
uint16_t subStreamId )
virtual

The stream has been disconnected, do the cleanups.

Implements XrdCl::TransportHandler.

Definition at line 1544 of file XrdClXRootDTransport.cc.

1546 {
1547 XRootDChannelInfo *info = 0;
1548 channelData.Get( info );
1549
1550 if (!info) {
1551 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1552 return;
1553 }
1554
1555 XrdSysMutexHelper scopedLock( info->mutex );
1556
1557 CleanUpProtection( info );
1558
1559 if( !info->stream.empty() )
1560 {
1561 XRootDStreamInfo &sInfo = info->stream[subStreamId];
1562 sInfo.status = XRootDStreamInfo::Disconnected;
1563 }
1564
1565 if( subStreamId == 0 )
1566 {
1567 info->sidManager->ReleaseAllTimedOut();
1568 info->sentOpens.clear();
1569 info->sentCloses.clear();
1570 info->openFiles = 0;
1571 info->waitBarrier = 0;
1572 }
1573 }

References XrdCl::XRootDStreamInfo::Disconnected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.
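
The cleanup in the listing has two levels: every call marks the affected substream as disconnected, and a call for substream 0 additionally resets channel-wide bookkeeping. A minimal sketch of that shape follows, using `std::mutex` in place of XrdSysMutexHelper. `ChannelState`, `StreamStatus` and `OnDisconnect` are hypothetical stand-ins, the container types are assumptions, and the bounds check on the substream index is an addition of this sketch (the listing only checks that the vector is not empty).

```cpp
#include <cstdint>
#include <mutex>
#include <set>
#include <vector>

enum class StreamStatus { Connected, Disconnected };

// Hypothetical stand-in for the per-channel bookkeeping.
struct ChannelState
{
  std::mutex                mutex;
  std::vector<StreamStatus> stream;
  std::set<uint16_t>        sentOpens;
  uint32_t                  openFiles = 0;
};

inline void OnDisconnect( ChannelState &state, uint16_t subStreamId )
{
  // Hold the channel lock for the whole cleanup, as the listing does.
  std::lock_guard<std::mutex> lock( state.mutex );

  // Mark only the affected substream.
  if( subStreamId < state.stream.size() )
    state.stream[subStreamId] = StreamStatus::Disconnected;

  // Losing substream 0 also resets the channel-wide state.
  if( subStreamId == 0 )
  {
    state.sentOpens.clear();
    state.openFiles = 0;
  }
}
```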


◆ FinalizeChannel()

void XrdCl::XRootDTransport::FinalizeChannel ( AnyObject & channelData)
virtual

Finalize channel.

Implements XrdCl::TransportHandler.

Definition at line 460 of file XrdClXRootDTransport.cc.

461 {
462 }

◆ GenerateDescription()

void XrdCl::XRootDTransport::GenerateDescription ( char * msg,
std::ostringstream & o )
static

Get the description of a message.

Definition at line 2975 of file XrdClXRootDTransport.cc.

2976 {
2977 Log *log = DefaultEnv::GetLog();
2978 if( log->GetLevel() < Log::ErrorMsg )
2979 return;
2980
2981 ClientRequestHdr *req = (ClientRequestHdr *)msg;
2982 switch( req->requestid )
2983 {
2984 //------------------------------------------------------------------------
2985 // kXR_open
2986 //------------------------------------------------------------------------
2987 case kXR_open:
2988 {
2989 ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2990 o << "kXR_open (";
2991 char *fn = GetDataAsString( msg );
2992 o << "file: " << fn << ", ";
2993 delete [] fn;
2994 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2995 o << std::setbase(10);
2996 o << "flags: ";
2997 if( sreq->options == 0 )
2998 o << "none";
2999 else
3000 {
3001 if( sreq->options & kXR_compress )
3002 o << "kXR_compress ";
3003 if( sreq->options & kXR_delete )
3004 o << "kXR_delete ";
3005 if( sreq->options & kXR_force )
3006 o << "kXR_force ";
3007 if( sreq->options & kXR_mkpath )
3008 o << "kXR_mkpath ";
3009 if( sreq->options & kXR_new )
3010 o << "kXR_new ";
3011 if( sreq->options & kXR_nowait )
3012 o << "kXR_nowait ";
3013 if( sreq->options & kXR_open_apnd )
3014 o << "kXR_open_apnd ";
3015 if( sreq->options & kXR_open_read )
3016 o << "kXR_open_read ";
3017 if( sreq->options & kXR_open_updt )
3018 o << "kXR_open_updt ";
3019 if( sreq->options & kXR_open_wrto )
3020 o << "kXR_open_wrto ";
3021 if( sreq->options & kXR_posc )
3022 o << "kXR_posc ";
3023 if( sreq->options & kXR_prefname )
3024 o << "kXR_prefname ";
3025 if( sreq->options & kXR_refresh )
3026 o << "kXR_refresh ";
3027 if( sreq->options & kXR_4dirlist )
3028 o << "kXR_4dirlist ";
3029 if( sreq->options & kXR_replica )
3030 o << "kXR_replica ";
3031 if( sreq->options & kXR_seqio )
3032 o << "kXR_seqio ";
3033 if( sreq->options & kXR_async )
3034 o << "kXR_async ";
3035 if( sreq->options & kXR_retstat )
3036 o << "kXR_retstat ";
3037 }
3038 o << ")";
3039 break;
3040 }
3041
3042 //------------------------------------------------------------------------
3043 // kXR_close
3044 //------------------------------------------------------------------------
3045 case kXR_close:
3046 {
3047 ClientCloseRequest *sreq = (ClientCloseRequest *)msg;
3048 o << "kXR_close (";
3049 o << "handle: " << FileHandleToStr( sreq->fhandle );
3050 o << ")";
3051 break;
3052 }
3053
3054 //------------------------------------------------------------------------
3055 // kXR_stat
3056 //------------------------------------------------------------------------
3057 case kXR_stat:
3058 {
3059 ClientStatRequest *sreq = (ClientStatRequest *)msg;
3060 o << "kXR_stat (";
3061 if( sreq->dlen )
3062 {
3063 char *fn = GetDataAsString( msg );;
3064 o << "path: " << fn << ", ";
3065 delete [] fn;
3066 }
3067 else
3068 {
3069 o << "handle: " << FileHandleToStr( sreq->fhandle );
3070 o << ", ";
3071 }
3072 o << "flags: ";
3073 if( sreq->options == 0 )
3074 o << "none";
3075 else
3076 {
3077 if( sreq->options & kXR_vfs )
3078 o << "kXR_vfs";
3079 }
3080 o << ")";
3081 break;
3082 }
3083
3084 //------------------------------------------------------------------------
3085 // kXR_read
3086 //------------------------------------------------------------------------
3087 case kXR_read:
3088 {
3089 ClientReadRequest *sreq = (ClientReadRequest *)msg;
3090 o << "kXR_read (";
3091 o << "handle: " << FileHandleToStr( sreq->fhandle );
3092 o << std::setbase(10);
3093 o << ", ";
3094 o << "offset: " << sreq->offset << ", ";
3095 o << "size: " << sreq->rlen << ")";
3096 break;
3097 }
3098
3099 //------------------------------------------------------------------------
3100 // kXR_pgread
3101 //------------------------------------------------------------------------
3102 case kXR_pgread:
3103 {
3104 ClientPgReadRequest *sreq = (ClientPgReadRequest *)msg;
3105 o << "kXR_pgread (";
3106 o << "handle: " << FileHandleToStr( sreq->fhandle );
3107 o << std::setbase(10);
3108 o << ", ";
3109 o << "offset: " << sreq->offset << ", ";
3110 o << "size: " << sreq->rlen << ")";
3111 break;
3112 }
3113
3114 //------------------------------------------------------------------------
3115 // kXR_write
3116 //------------------------------------------------------------------------
3117 case kXR_write:
3118 {
3119 ClientWriteRequest *sreq = (ClientWriteRequest *)msg;
3120 o << "kXR_write (";
3121 o << "handle: " << FileHandleToStr( sreq->fhandle );
3122 o << std::setbase(10);
3123 o << ", ";
3124 o << "offset: " << sreq->offset << ", ";
3125 o << "size: " << sreq->dlen << ")";
3126 break;
3127 }
3128
3129 //------------------------------------------------------------------------
3130 // kXR_pgwrite
3131 //------------------------------------------------------------------------
3132 case kXR_pgwrite:
3133 {
3134 ClientPgWriteRequest *sreq = (ClientPgWriteRequest *)msg;
3135 o << "kXR_pgwrite (";
3136 o << "handle: " << FileHandleToStr( sreq->fhandle );
3137 o << std::setbase(10);
3138 o << ", ";
3139 o << "offset: " << sreq->offset << ", ";
3140 o << "size: " << sreq->dlen << ")";
3141 break;
3142 }
3143
3144 //------------------------------------------------------------------------
3145 // kXR_fattr
3146 //------------------------------------------------------------------------
3147 case kXR_fattr:
3148 {
3149 ClientFattrRequest *sreq = (ClientFattrRequest *)msg;
3150 int nattr = sreq->numattr;
3151 int options = sreq->options;
3152 o << "kXR_fattr";
3153 switch (sreq->subcode) {
3154 case kXR_fattrGet:
3155 o << "Get";
3156 break;
3157 case kXR_fattrSet:
3158 o << "Set";
3159 break;
3160 case kXR_fattrList:
3161 o << "List";
3162 break;
3163 case kXR_fattrDel:
3164 o << "Delete";
3165 break;
3166 default:
3167 o << " unknown subcode: " << sreq->subcode;
3168 break;
3169 }
3170 o << " (handle: " << FileHandleToStr( sreq->fhandle );
3171 o << std::setbase(10);
3172 if (nattr)
3173 o << ", numattr: " << nattr;
3174 if (options) {
3175 o << ", options: ";
3176 if (options & 0x01)
3177 o << "new";
3178 if (options & 0x10)
3179 o << "list values";
3180 }
3181 o << ", total size: " << req->dlen << ")";
3182 break;
3183 }
3184
3185 //------------------------------------------------------------------------
3186 // kXR_sync
3187 //------------------------------------------------------------------------
3188 case kXR_sync:
3189 {
3190 ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3191 o << "kXR_sync (";
3192 o << "handle: " << FileHandleToStr( sreq->fhandle );
3193 o << ")";
3194 break;
3195 }
3196
3197 //------------------------------------------------------------------------
3198 // kXR_truncate
3199 //------------------------------------------------------------------------
3200 case kXR_truncate:
3201 {
3202 ClientTruncateRequest *sreq = (ClientTruncateRequest *)msg;
3203 o << "kXR_truncate (";
3204 if( !sreq->dlen )
3205 o << "handle: " << FileHandleToStr( sreq->fhandle );
3206 else
3207 {
3208 char *fn = GetDataAsString( msg );
3209 o << "file: " << fn;
3210 delete [] fn;
3211 }
3212 o << std::setbase(10);
3213 o << ", ";
3214 o << "offset: " << sreq->offset;
3215 o << ")";
3216 break;
3217 }
3218
3219 //------------------------------------------------------------------------
3220 // kXR_readv
3221 //------------------------------------------------------------------------
3222 case kXR_readv:
3223 {
3224 unsigned char *fhandle = 0;
3225 o << "kXR_readv (";
3226
3227 o << "handle: ";
3228 readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3229 fhandle = dataChunk[0].fhandle;
3230 if( fhandle )
3231 o << FileHandleToStr( fhandle );
3232 else
3233 o << "unknown";
3234 o << ", ";
3235 o << std::setbase(10);
3236 o << "chunks: [";
3237 uint64_t size = 0;
3238 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3239 {
3240 size += dataChunk[i].rlen;
3241 o << "(offset: " << dataChunk[i].offset;
3242 o << ", size: " << dataChunk[i].rlen << "); ";
3243 }
3244 o << "], ";
3245 o << "total size: " << size << ")";
3246 break;
3247 }
3248
3249 //------------------------------------------------------------------------
3250 // kXR_writev
3251 //------------------------------------------------------------------------
3252 case kXR_writev:
3253 {
3254 unsigned char *fhandle = 0;
3255 o << "kXR_writev (";
3256
3257 XrdProto::write_list *wrtList =
3258 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3259 uint64_t size = 0;
3260 uint32_t numChunks = 0;
3261 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3262 {
3263 fhandle = wrtList[i].fhandle;
3264 size += wrtList[i].wlen;
3265 ++numChunks;
3266 }
3267 o << "handle: ";
3268 if( fhandle )
3269 o << FileHandleToStr( fhandle );
3270 else
3271 o << "unknown";
3272 o << ", ";
3273 o << std::setbase(10);
3274 o << "chunks: " << numChunks << ", ";
3275 o << "total size: " << size << ")";
3276 break;
3277 }
3278
3279 //------------------------------------------------------------------------
3280 // kXR_locate
3281 //------------------------------------------------------------------------
3282 case kXR_locate:
3283 {
3284 ClientLocateRequest *sreq = (ClientLocateRequest *)msg;
3285 char *fn = GetDataAsString( msg );;
3286 o << "kXR_locate (";
3287 o << "path: " << fn << ", ";
3288 delete [] fn;
3289 o << "flags: ";
3290 if( sreq->options == 0 )
3291 o << "none";
3292 else
3293 {
3294 if( sreq->options & kXR_refresh )
3295 o << "kXR_refresh ";
3296 if( sreq->options & kXR_prefname )
3297 o << "kXR_prefname ";
3298 if( sreq->options & kXR_nowait )
3299 o << "kXR_nowait ";
3300 if( sreq->options & kXR_force )
3301 o << "kXR_force ";
3302 if( sreq->options & kXR_compress )
3303 o << "kXR_compress ";
3304 }
3305 o << ")";
3306 break;
3307 }
3308
3309 //------------------------------------------------------------------------
3310 // kXR_mv
3311 //------------------------------------------------------------------------
3312 case kXR_mv:
3313 {
3314 ClientMvRequest *sreq = (ClientMvRequest *)msg;
3315 o << "kXR_mv (";
3316 o << "source: ";
3317 o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3318 o << ", ";
3319 o << "destination: ";
3320 o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3321 o << ")";
3322 break;
3323 }
3324
3325 //------------------------------------------------------------------------
3326 // kXR_query
3327 //------------------------------------------------------------------------
3328 case kXR_query:
3329 {
3330 ClientQueryRequest *sreq = (ClientQueryRequest *)msg;
3331 o << "kXR_query (";
3332 o << "code: ";
3333 switch( sreq->infotype )
3334 {
3335 case kXR_Qconfig: o << "kXR_Qconfig"; break;
3336 case kXR_Qckscan: o << "kXR_Qckscan"; break;
3337 case kXR_Qcksum: o << "kXR_Qcksum"; break;
3338 case kXR_Qopaque: o << "kXR_Qopaque"; break;
3339 case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3340 case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3341 case kXR_QPrep: o << "kXR_QPrep"; break;
3342 case kXR_Qspace: o << "kXR_Qspace"; break;
3343 case kXR_QStats: o << "kXR_QStats"; break;
3344 case kXR_Qvisa: o << "kXR_Qvisa"; break;
3345 case kXR_Qxattr: o << "kXR_Qxattr"; break;
3346 default: o << sreq->infotype; break;
3347 }
3348 o << ", ";
3349
3350 if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3351 {
3352 o << "handle: " << FileHandleToStr( sreq->fhandle );
3353 o << ", ";
3354 }
3355
3356 o << "arg length: " << sreq->dlen << ")";
3357 break;
3358 }
3359
3360 //------------------------------------------------------------------------
3361 // kXR_rm
3362 //------------------------------------------------------------------------
3363 case kXR_rm:
3364 {
3365 o << "kXR_rm (";
3366 char *fn = GetDataAsString( msg );;
3367 o << "path: " << fn << ")";
3368 delete [] fn;
3369 break;
3370 }
3371
3372 //------------------------------------------------------------------------
3373 // kXR_mkdir
3374 //------------------------------------------------------------------------
3375 case kXR_mkdir:
3376 {
3377 ClientMkdirRequest *sreq = (ClientMkdirRequest *)msg;
3378 o << "kXR_mkdir (";
3379 char *fn = GetDataAsString( msg );
3380 o << "path: " << fn << ", ";
3381 delete [] fn;
3382 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3383 o << std::setbase(10);
3384 o << "flags: ";
3385 if( sreq->options[0] == 0 )
3386 o << "none";
3387 else
3388 {
3389 if( sreq->options[0] & kXR_mkdirpath )
3390 o << "kXR_mkdirpath";
3391 }
3392 o << ")";
3393 break;
3394 }
3395
3396 //------------------------------------------------------------------------
3397 // kXR_rmdir
3398 //------------------------------------------------------------------------
3399 case kXR_rmdir:
3400 {
3401 o << "kXR_rmdir (";
3402 char *fn = GetDataAsString( msg );
3403 o << "path: " << fn << ")";
3404 delete [] fn;
3405 break;
3406 }
3407
3408 //------------------------------------------------------------------------
3409 // kXR_chmod
3410 //------------------------------------------------------------------------
3411 case kXR_chmod:
3412 {
3413 ClientChmodRequest *sreq = (ClientChmodRequest *)msg;
3414 o << "kXR_chmod (";
3415 char *fn = GetDataAsString( msg );
3416 o << "path: " << fn << ", ";
3417 delete [] fn;
3418 o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3419 break;
3420 }
3421
3422 //------------------------------------------------------------------------
3423 // kXR_ping
3424 //------------------------------------------------------------------------
3425 case kXR_ping:
3426 {
3427 o << "kXR_ping ()";
3428 break;
3429 }
3430
3431 //------------------------------------------------------------------------
3432 // kXR_protocol
3433 //------------------------------------------------------------------------
3434 case kXR_protocol:
3435 {
3436 ClientProtocolRequest *sreq = (ClientProtocolRequest *)msg;
3437 o << "kXR_protocol (";
3438 o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3439 break;
3440 }
3441
3442 //------------------------------------------------------------------------
3443 // kXR_dirlist
3444 //------------------------------------------------------------------------
3445 case kXR_dirlist:
3446 {
3447 o << "kXR_dirlist (";
3448 char *fn = GetDataAsString( msg );;
3449 o << "path: " << fn << ")";
3450 delete [] fn;
3451 break;
3452 }
3453
3454 //------------------------------------------------------------------------
3455 // kXR_set
3456 //------------------------------------------------------------------------
3457 case kXR_set:
3458 {
3459 o << "kXR_set (";
3460 char *fn = GetDataAsString( msg );;
3461 o << "data: " << fn << ")";
3462 delete [] fn;
3463 break;
3464 }
3465
3466 //------------------------------------------------------------------------
3467 // kXR_prepare
3468 //------------------------------------------------------------------------
3469 case kXR_prepare:
3470 {
3471 ClientPrepareRequest *sreq = (ClientPrepareRequest *)msg;
3472 o << "kXR_prepare (";
3473 o << "flags: ";
3474
3475 if( sreq->options == 0 )
3476 o << "none";
3477 else
3478 {
3479 if( sreq->options & kXR_stage )
3480 o << "kXR_stage ";
3481 if( sreq->options & kXR_wmode )
3482 o << "kXR_wmode ";
3483 if( sreq->options & kXR_coloc )
3484 o << "kXR_coloc ";
3485 if( sreq->options & kXR_fresh )
3486 o << "kXR_fresh ";
3487 }
3488
3489 o << ", priority: " << (int) sreq->prty << ", ";
3490
3491 char *fn = GetDataAsString( msg );
3492 char *cursor;
3493 for( cursor = fn; *cursor; ++cursor )
3494 if( *cursor == '\n' ) *cursor = ' ';
3495
3496 o << "paths: " << fn << ")";
3497 delete [] fn;
3498 break;
3499 }
3500
3501 case kXR_chkpoint:
3502 {
3503 ClientChkPointRequest *sreq = (ClientChkPointRequest*)msg;
3504 o << "kXR_chkpoint (";
3505 o << "opcode: ";
3506 if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3507 else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3508 else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3509 else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3510 else if( sreq->opcode == kXR_ckpXeq )
3511 {
3512 o << "kXR_ckpXeq) ";
3513 // In this case our request body will be one of kXR_pgwrite,
3514 // kXR_truncate, kXR_write, or kXR_writev request.
3515 GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3516 }
3517
3518 break;
3519 }
3520
3521 //------------------------------------------------------------------------
3522 // Default
3523 //------------------------------------------------------------------------
3524 default:
3525 {
3526 o << "kXR_unknown (length: " << req->dlen << ")";
3527 break;
3528 }
3529 };
3530 }

References ClientMvRequest::arg1len, ClientProtocolRequest::clientpv, ClientMvRequest::dlen, ClientPgWriteRequest::dlen, ClientQueryRequest::dlen, ClientRequestHdr::dlen, ClientStatRequest::dlen, ClientTruncateRequest::dlen, ClientWriteRequest::dlen, XrdCl::Log::ErrorMsg, ClientCloseRequest::fhandle, ClientFattrRequest::fhandle, ClientPgReadRequest::fhandle, ClientPgWriteRequest::fhandle, ClientQueryRequest::fhandle, ClientReadRequest::fhandle, ClientStatRequest::fhandle, ClientSyncRequest::fhandle, ClientTruncateRequest::fhandle, ClientWriteRequest::fhandle, readahead_list::fhandle, XrdProto::write_list::fhandle, GenerateDescription(), XrdCl::Log::GetLevel(), XrdCl::DefaultEnv::GetLog(), ClientQueryRequest::infotype, kXR_4dirlist, kXR_async, kXR_chkpoint, kXR_chmod, kXR_ckpBegin, kXR_ckpCommit, kXR_ckpQuery, kXR_ckpRollback, kXR_ckpXeq, kXR_close, kXR_coloc, kXR_compress, kXR_delete, kXR_dirlist, kXR_fattr, kXR_fattrDel, kXR_fattrGet, kXR_fattrList, kXR_fattrSet, kXR_force, kXR_fresh, kXR_locate, kXR_mkdir, kXR_mkdirpath, kXR_mkpath, kXR_mv, kXR_new, kXR_nowait, kXR_open, kXR_open_apnd, kXR_open_read, kXR_open_updt, kXR_open_wrto, kXR_pgread, kXR_pgwrite, kXR_ping, kXR_posc, kXR_prefname, kXR_prepare, kXR_protocol, kXR_Qckscan, kXR_Qcksum, kXR_Qconfig, kXR_Qopaque, kXR_Qopaquf, kXR_Qopaqug, kXR_QPrep, kXR_Qspace, kXR_QStats, kXR_query, kXR_Qvisa, kXR_Qxattr, kXR_read, kXR_readv, kXR_refresh, kXR_replica, kXR_retstat, kXR_rm, kXR_rmdir, kXR_seqio, kXR_set, kXR_stage, kXR_stat, kXR_sync, kXR_truncate, kXR_vfs, kXR_wmode, kXR_write, kXR_writev, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientFattrRequest::numattr, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, readahead_list::offset, ClientChkPointRequest::opcode, ClientFattrRequest::options, ClientLocateRequest::options, ClientMkdirRequest::options, ClientOpenRequest::options, 
ClientPrepareRequest::options, ClientStatRequest::options, ClientPrepareRequest::prty, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientFattrRequest::subcode, and XrdProto::write_list::wlen.

Referenced by GenerateDescription(), and SetDescription().
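
Two formatting idioms recur throughout the listing: printing a mode in octal and then switching the stream back to decimal, and decoding an option word into a space-separated list of flag names. A reduced sketch of both follows. `Describe`, `kFlagA` and `kFlagB` are hypothetical and exist only for illustration; the real kXR_* constants are defined in XProtocol.hh.

```cpp
#include <cstdint>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical flag values, for illustration only.
constexpr uint16_t kFlagA = 0x0001;
constexpr uint16_t kFlagB = 0x0002;

inline std::string Describe( uint16_t mode, uint16_t options, uint32_t size )
{
  std::ostringstream o;
  // std::setbase is sticky: it affects every later integer insertion,
  // so the stream is put back into decimal before the size is printed.
  o << "mode: 0" << std::setbase( 8 ) << mode << ", ";
  o << std::setbase( 10 );
  o << "flags: ";
  if( options == 0 )
    o << "none";
  else
  {
    // Each set flag appends its name followed by a space, as in the listing.
    if( options & kFlagA ) o << "flagA ";
    if( options & kFlagB ) o << "flagB ";
  }
  o << ", size: " << size;
  return o.str();
}
```

Because each flag name carries its own trailing space, a non-empty flag list ends with a space before the next separator.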


◆ GetBindPreference()

URL XrdCl::XRootDTransport::GetBindPreference ( const URL & url,
AnyObject & channelData )
virtual

Get bind preference for the next data stream.

Implements XrdCl::TransportHandler.

Definition at line 1911 of file XrdClXRootDTransport.cc.

1913 {
1914 XRootDChannelInfo *info = 0;
1915 channelData.Get( info );
1916
1917 if(!info || !info->bindSelector)
1918 return url;
1919
1920 return URL( info->bindSelector->Get() );
1921 }

References XrdCl::XRootDChannelInfo::bindSelector, and XrdCl::AnyObject::Get().


◆ GetBody()

XRootDStatus XrdCl::XRootDTransport::GetBody ( Message & message,
Socket * socket )
virtual

Read the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message: the message buffer containing the header
socket: the socket
Returns
stOK & suDone if the whole message has been processed
stOK & suRetry if more data is needed
stError on failure

Implements XrdCl::TransportHandler.

Definition at line 347 of file XrdClXRootDTransport.cc.

348 {
349 //--------------------------------------------------------------------------
350 // Retrieve the body
351 //--------------------------------------------------------------------------
352 size_t leftToBeRead = 0;
353 uint32_t bodySize = 0;
354 ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
355 bodySize = rsphdr->dlen;
356
357 if( message.GetSize() < bodySize + 8 )
358 message.ReAllocate( bodySize + 8 );
359
360 leftToBeRead = bodySize-(message.GetCursor()-8);
361 while( leftToBeRead )
362 {
363 int bytesRead = 0;
364 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365
366 if( !status.IsOK() || status.code == suRetry )
367 return status;
368
369 leftToBeRead -= bytesRead;
370 message.AdvanceCursor( bytesRead );
371 }
372
373 return XRootDStatus( stOK, suDone );
374 }

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Status::code, ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), XrdCl::stOK, XrdCl::suDone, and XrdCl::suRetry.

◆ GetHeader()

XRootDStatus XrdCl::XRootDTransport::GetHeader ( Message & message,
Socket * socket )
virtual

Read a message header from the socket. The socket is non-blocking, so if not enough data is available the function should return suRetry, in which case it will be called again when more data arrives, with the data previously read stored in the message buffer.

Parameters
message  the message buffer
socket   the socket
Returns
stOK & suDone if the whole message has been processed
stOK & suRetry if more data is needed
stError on failure

Implements XrdCl::TransportHandler.

Definition at line 307 of file XrdClXRootDTransport.cc.

308 {
309 //--------------------------------------------------------------------------
310 // A new message - allocate the space needed for the header
311 //--------------------------------------------------------------------------
312 if( message.GetCursor() == 0 && message.GetSize() < 8 )
313 message.Allocate( 8 );
314
315 //--------------------------------------------------------------------------
316 // Read the message header
317 //--------------------------------------------------------------------------
318 if( message.GetCursor() < 8 )
319 {
320 size_t leftToBeRead = 8 - message.GetCursor();
321 while( leftToBeRead )
322 {
323 int bytesRead = 0;
324 XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325 leftToBeRead, bytesRead );
326 if( !status.IsOK() || status.code == suRetry )
327 return status;
328
329 leftToBeRead -= bytesRead;
330 message.AdvanceCursor( bytesRead );
331 }
332 UnMarshallHeader( message );
333
334 uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335 Log *log = DefaultEnv::GetLog();
336 log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337 "body", &message, bodySize );
338
339 return XRootDStatus( stOK, suDone );
340 }
341 return XRootDStatus( stError, errInternal );
342 }

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Buffer::Allocate(), XrdCl::Status::code, XrdCl::Log::Dump(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarshallHeader(), and XrdCl::XRootDTransportMsg.
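
The resumable read that GetHeader relies on can be sketched in isolation. The following is a minimal illustration, not the XrdCl implementation: FakeSocket, HeaderReader, ReadResult and kHeaderSize are hypothetical names standing in for Socket, Message and the status codes, and the 8-byte header size is taken from the listing above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for the stOK/suDone, stOK/suRetry and stError outcomes.
enum class ReadResult { Done, Retry, Error };

constexpr std::size_t kHeaderSize = 8; // header length used in the listing

// Hypothetical non-blocking socket: returns whatever has arrived so far and
// reports Retry when no bytes are currently available.
struct FakeSocket
{
  std::vector<std::uint8_t> pending;     // bytes received from the "network"
  std::size_t               offset = 0;  // how many of them were consumed

  ReadResult Read( std::uint8_t *dst, std::size_t want, std::size_t &got )
  {
    assert( offset <= pending.size() );
    got = std::min( want, pending.size() - offset );
    if( got == 0 ) return ReadResult::Retry;
    std::memcpy( dst, pending.data() + offset, got );
    offset += got;
    return ReadResult::Done;
  }
};

// The cursor lives outside the call, so a partial read is resumed rather
// than restarted when the function is invoked again.
struct HeaderReader
{
  std::uint8_t buffer[kHeaderSize] = {};
  std::size_t  cursor = 0;

  ReadResult GetHeader( FakeSocket &socket )
  {
    while( cursor < kHeaderSize )
    {
      std::size_t got = 0;
      ReadResult r = socket.Read( buffer + cursor, kHeaderSize - cursor, got );
      if( r != ReadResult::Done ) return r; // Retry or Error: caller calls again
      cursor += got;
    }
    return ReadResult::Done;
  }
};
```

Keeping the cursor in the message object rather than in a local variable is what lets the caller invoke the function again after a Retry without losing the bytes already received.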

◆ GetMore()

XRootDStatus XrdCl::XRootDTransport::GetMore ( Message & message,
Socket * socket )
virtual

Read more of the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message  the message buffer containing the header
socket   the socket
Returns
stOK & suDone if the whole message has been processed
stOK & suRetry if more data is needed
stError on failure

Implements XrdCl::TransportHandler.

Definition at line 379 of file XrdClXRootDTransport.cc.

380 {
381 ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
382 if( rsphdr->status != kXR_status )
383 return XRootDStatus( stError, errInvalidOp );
384
385 //--------------------------------------------------------------------------
386 // In case of non kXR_status responses we read all the response, including
387 // data. For kXR_status responses we first read only the remainder of the
388 // header. The header must then be unmarshalled, and then a second call to
389 // GetMore (repeated for suRetry as needed) will read the data.
390 //--------------------------------------------------------------------------
391
392 uint32_t bodySize = rsphdr->dlen;
393 if( bodySize+8 < sizeof( ServerResponseStatus ) )
394 return XRootDStatus( stError, errInvalidMessage, 0,
395 "kXR_status: invalid message size." );
396
397 ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer();
398 bodySize += rspst->bdy.dlen;
399
400 if( message.GetSize() < bodySize + 8 )
401 message.ReAllocate( bodySize + 8 );
402
403 size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404 while( leftToBeRead )
405 {
406 int bytesRead = 0;
407 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408
409 if( !status.IsOK() || status.code == suRetry )
410 return status;
411
412 leftToBeRead -= bytesRead;
413 message.AdvanceCursor( bytesRead );
414 }
415
416 // Unmarchal to message body
417 Log *log = DefaultEnv::GetLog();
418 XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message );
419 if( !st.IsOK() && st.code == errDataError )
420 {
421 log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422 st.GetErrorMessage().c_str() );
423 return st;
424 }
425
426 if( !st.IsOK() )
427 {
428 log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429 &message );
430 return st;
431 }
432
433 return XRootDStatus( stOK, suDone );
434 }

References XrdCl::Buffer::AdvanceCursor(), ServerResponseStatus::bdy, XrdCl::Status::code, ServerResponseBody_Status::dlen, ServerResponseHeader::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errInvalidOp, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), kXR_status, XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarchalStatusMore(), and XrdCl::XRootDTransportMsg.
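
The size bookkeeping for kXR_status responses can be illustrated on its own. This is a sketch under stated assumptions: StatusSizes and ComputeStatusSizes are hypothetical names, and kStatusStructSize = 24 is an assumed value for sizeof( ServerResponseStatus ), which the listing does not state.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint32_t kHeaderSize       = 8;  // from the listing
constexpr std::uint32_t kStatusStructSize = 24; // ASSUMED sizeof( ServerResponseStatus )

struct StatusSizes
{
  bool          valid;       // false if the header announces too little data
  std::uint32_t bodySize;    // bytes following the 8-byte header
  std::uint32_t leftToRead;  // bytes still missing given the current cursor
};

// hdrDlen : dlen from the response header (covers the status part)
// bdyDlen : dlen from the status body (length of the data that follows)
// cursor  : number of bytes already stored in the message buffer
StatusSizes ComputeStatusSizes( std::uint32_t hdrDlen, std::uint32_t bdyDlen,
                                std::uint32_t cursor )
{
  assert( cursor >= kHeaderSize ); // the header must have been read already
  if( hdrDlen + kHeaderSize < kStatusStructSize )
    return { false, 0, 0 };

  std::uint32_t bodySize = hdrDlen + bdyDlen;
  return { true, bodySize, bodySize - ( cursor - kHeaderSize ) };
}
```

With a 16-byte status part, 100 bytes of data and the status structure already in the buffer (cursor at 24), the sketch reports 100 bytes left to read.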

◆ GetSignature() [1/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
AnyObject & channelData )
virtual

Get signature for given message.

Implements XrdCl::TransportHandler.

Definition at line 1772 of file XrdClXRootDTransport.cc.

1773 {
1774 XRootDChannelInfo *info = 0;
1775 channelData.Get( info );
1776 return GetSignature( toSign, sign, info );
1777 }

References XrdCl::AnyObject::Get(), and GetSignature().

Referenced by GetSignature().

◆ GetSignature() [2/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
XRootDChannelInfo * info )
virtual

Get signature for given message.

Definition at line 1782 of file XrdClXRootDTransport.cc.

1785 {
1786 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1787 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1788
1789 ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1790 if( !info ) return Status( stError, errInternal );
1791 if( info->protection )
1792 {
1793 SecurityRequest *newreq = 0;
1794 // check if we have to secure the request in the first place
1795 if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1796 // secure (sign/encrypt) the request
1797 int rc = info->protection->Secure( newreq, *thereq, 0 );
1798 // there was an error
1799 if( rc < 0 )
1800 return Status( stError, errInternal, -rc );
1801
1802 sign = new Message();
1803 sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1804 }
1805
1806 return Status();
1807 }

References XrdCl::errInternal, XrdCl::errInvalidOp, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::Grab(), NEED2SECURE, XrdCl::XRootDChannelInfo::protection, XrdSecProtect::Secure(), and XrdCl::stError.

◆ HandShake()

XRootDStatus XrdCl::XRootDTransport::HandShake ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual

HandShake.

Implements XrdCl::TransportHandler.

Definition at line 467 of file XrdClXRootDTransport.cc.

469 {
470 XRootDChannelInfo *info = 0;
471 channelData.Get( info );
472
473 if (!info)
474 return XRootDStatus(stFatal, errInternal);
475
476 XrdSysMutexHelper scopedLock( info->mutex );
477
478 if( info->stream.size() <= handShakeData->subStreamId )
479 {
480 Log *log = DefaultEnv::GetLog();
481 log->Error( XRootDTransportMsg,
482 "[%s] Internal error: not enough substreams",
483 handShakeData->streamName.c_str() );
484 return XRootDStatus( stFatal, errInternal );
485 }
486
487 if( handShakeData->subStreamId == 0 )
488 {
489 info->streamName = handShakeData->streamName;
490 return HandShakeMain( handShakeData, channelData );
491 }
492 return HandShakeParallel( handShakeData, channelData );
493 }

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::stFatal, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::XRootDChannelInfo::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.

◆ HandShakeDone()

bool XrdCl::XRootDTransport::HandShakeDone ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual

Implements XrdCl::TransportHandler.

Definition at line 746 of file XrdClXRootDTransport.cc.

748 {
749 XRootDChannelInfo *info = 0;
750 channelData.Get( info );
751
752 if (!info) {
753 DefaultEnv::GetLog()->Error(XRootDTransportMsg,
754 "[%s] Internal error: no channel info",
755 handShakeData->streamName.c_str());
756 return false;
757 }
758
759 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
760 return ( sInfo.status == XRootDStreamInfo::Connected );
761 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.

◆ InitializeChannel()

void XrdCl::XRootDTransport::InitializeChannel ( const URL & url,
AnyObject & channelData )
virtual

Initialize channel.

Implements XrdCl::TransportHandler.

Definition at line 439 of file XrdClXRootDTransport.cc.

441 {
442 XRootDChannelInfo *info = new XRootDChannelInfo( url );
443 XrdSysMutexHelper scopedLock( info->mutex );
444 channelData.Set( info );
445
446 Env *env = DefaultEnv::GetEnv();
447 int streams = DefaultSubStreamsPerChannel;
448 env->GetInt( "SubStreamsPerChannel", streams );
449 if( streams < 1 ) streams = 1;
450 info->stream.resize( streams );
451 info->strmSelector.reset( new StreamSelector( streams ) );
452 info->encrypted = url.IsSecure();
453 info->istpc = url.IsTPC();
454 info->logintoken = url.GetLoginToken();
455 }

References XrdCl::DefaultSubStreamsPerChannel, XrdCl::XRootDChannelInfo::encrypted, XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::URL::GetLoginToken(), XrdCl::URL::IsSecure(), XrdCl::URL::IsTPC(), XrdCl::XRootDChannelInfo::istpc, XrdCl::XRootDChannelInfo::logintoken, XrdCl::XRootDChannelInfo::mutex, XrdCl::AnyObject::Set(), XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.

◆ IsStreamBroken()

Status XrdCl::XRootDTransport::IsStreamBroken ( time_t inactiveTime,
AnyObject & channelData )
virtual

Check whether the stream is broken, i.e. the TCP connection was lost and the loss went undetected by the TCP stack.

Implements XrdCl::TransportHandler.

Definition at line 819 of file XrdClXRootDTransport.cc.

821 {
822 XRootDChannelInfo *info = 0;
823 channelData.Get( info );
824 Env *env = DefaultEnv::GetEnv();
825 Log *log = DefaultEnv::GetLog();
826
827 if (!info) {
828 log->Error(XRootDTransportMsg,
829 "Internal error: no channel info, behaving as if stream is broken");
830 return true;
831 }
832
833 int streamTimeout = DefaultStreamTimeout;
834 env->GetInt( "StreamTimeout", streamTimeout );
835
836 XrdSysMutexHelper scopedLock( info->mutex );
837
838 const time_t now = time(0);
839 const bool anySID =
840 info->sidManager->IsAnySIDOldAs( now - streamTimeout );
841
842 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
843 "stream timeout: %d, any SID: %d, wait barrier: %s",
844 info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
845 anySID, Utils::TimeToString(info->waitBarrier).c_str() );
846
847 if( inactiveTime < streamTimeout )
848 return Status();
849
850 if( now < info->waitBarrier )
851 return Status();
852
853 if( !anySID )
854 return Status();
855
856 return Status( stError, errSocketTimeout );
857 }

References XrdCl::DefaultStreamTimeout, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::errSocketTimeout, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::sidManager, XrdCl::stError, XrdCl::XRootDChannelInfo::streamName, XrdCl::Utils::TimeToString(), XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.
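
The decision made at the end of the listing can be restated as a pure function, which makes the three early exits easier to see. This is only a sketch: StreamLooksBroken and its parameter names are hypothetical, and the values that the listing obtains from the environment, the SID manager and the channel info are passed in explicitly.

```cpp
#include <cassert>
#include <ctime>

// All inputs are passed in explicitly; in the listing they come from the
// environment ("StreamTimeout"), the SID manager and the channel info.
bool StreamLooksBroken( std::time_t inactiveTime, int streamTimeout,
                        std::time_t now, std::time_t waitBarrier,
                        bool anySidOldEnough )
{
  assert( streamTimeout >= 0 );
  if( inactiveTime < streamTimeout ) return false; // recent activity
  if( now < waitBarrier )            return false; // server asked us to wait
  if( !anySidOldEnough )             return false; // no request is overdue
  return true;                                     // treat as a socket timeout
}
```

A stream is therefore reported as broken only when all three conditions hold at once: it has been idle for at least the timeout, no wait barrier is pending, and at least one outstanding request is older than the timeout.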

◆ IsStreamTTLElapsed()

bool XrdCl::XRootDTransport::IsStreamTTLElapsed ( time_t time,
AnyObject & channelData )
virtual

Check if the stream should be disconnected.

Implements XrdCl::TransportHandler.

Definition at line 766 of file XrdClXRootDTransport.cc.

768 {
769 XRootDChannelInfo *info = 0;
770 channelData.Get( info );
771
772 Env *env = DefaultEnv::GetEnv();
773 Log *log = DefaultEnv::GetLog();
774
775 if (!info) {
776 log->Error(XRootDTransportMsg,
777 "Internal error: no channel info, behaving as if TTL has elapsed");
778 return true;
779 }
780
781 //--------------------------------------------------------------------------
782 // Check the TTL settings for the current server
783 //--------------------------------------------------------------------------
784 int ttl;
785 if( info->serverFlags & kXR_isServer )
786 {
787 ttl = DefaultDataServerTTL;
788 env->GetInt( "DataServerTTL", ttl );
789 }
790 else
791 {
792 ttl = DefaultLoadBalancerTTL;
793 env->GetInt( "LoadBalancerTTL", ttl );
794 }
795
796 //--------------------------------------------------------------------------
797 // See whether we can give a go-ahead for the disconnection
798 //--------------------------------------------------------------------------
799 XrdSysMutexHelper scopedLock( info->mutex );
800 uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
801 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
802 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
803 info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
804 info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
805
806 if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
807 return false;
808
809 if( !allocatedSIDs && inactiveTime > ttl )
810 return true;
811
812 return false;
813 }

References XrdCl::DefaultDataServerTTL, XrdCl::DefaultLoadBalancerTTL, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), kXR_isServer, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDChannelInfo::streamName, and XrdCl::XRootDTransportMsg.
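
The go-ahead logic for disconnecting an idle stream can be summarised as a small predicate. The sketch below mirrors the conditions in the listing; TtlElapsed and its parameter names are hypothetical, and the TTL is assumed to have been resolved by the caller.

```cpp
#include <cassert>
#include <ctime>

bool TtlElapsed( std::time_t inactiveTime, int ttl, unsigned allocatedSIDs,
                 unsigned openFiles, unsigned boundFileObjects )
{
  assert( ttl >= 0 );
  // Files still open and still bound to File objects keep the stream alive.
  if( openFiles != 0 && boundFileObjects != 0 )
    return false;
  // Otherwise disconnect only when no request is in flight and the stream
  // has been idle for longer than the TTL.
  return allocatedSIDs == 0 && inactiveTime > ttl;
}
```

Note that the comparison is strict: a stream idle for exactly the TTL is kept.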

◆ LogErrorResponse()

void XrdCl::XRootDTransport::LogErrorResponse ( const Message & msg)
static

Log server error response.

Definition at line 1507 of file XrdClXRootDTransport.cc.

1508 {
1509 Log *log = DefaultEnv::GetLog();
1510 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1511 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1512 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1513 log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1514 rsp->body.error.errnum, errmsg );
1515 delete [] errmsg;
1516 }

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, and XrdCl::XRootDTransportMsg.
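
The body layout implied by the listing (a 4-byte error number followed by dlen - 4 bytes of message text) can be illustrated with a standalone parser. This is a sketch, not the XrdCl code: ErrorInfo and ParseErrorBody are hypothetical names, the explicit length check is an addition of the sketch, and the error number is assumed to still be in network byte order, which may not match the state of the buffer when LogErrorResponse is called.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

struct ErrorInfo
{
  std::uint32_t errnum;
  std::string   message;
};

// body : bytes that follow the 8-byte response header
// dlen : body length as announced by the header
// Returns false if the body is too short to hold the error number.
bool ParseErrorBody( const std::uint8_t *body, std::uint32_t dlen,
                     ErrorInfo &out )
{
  assert( body != nullptr );
  if( dlen < 4 ) return false;

  // ASSUMPTION: the error number is still in network (big-endian) order.
  out.errnum = ( std::uint32_t( body[0] ) << 24 ) |
               ( std::uint32_t( body[1] ) << 16 ) |
               ( std::uint32_t( body[2] ) << 8 )  |
                 std::uint32_t( body[3] );

  // The remaining dlen - 4 bytes are the message text.
  out.message.assign( reinterpret_cast<const char*>( body + 4 ), dlen - 4 );

  // Drop a trailing NUL terminator if the sender included one.
  if( !out.message.empty() && out.message.back() == '\0' )
    out.message.pop_back();
  return true;
}
```

Using std::string for the text avoids the manual allocation and termination of a temporary character array.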

◆ MarshallRequest() [1/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( char * msg)
static

Marshal the outgoing message.

Definition at line 1103 of file XrdClXRootDTransport.cc.

1104 {
1105 ClientRequest *req = (ClientRequest*)msg;
1106 switch( req->header.requestid )
1107 {
1108 //------------------------------------------------------------------------
1109 // kXR_protocol
1110 //------------------------------------------------------------------------
1111 case kXR_protocol:
1112 req->protocol.clientpv = htonl( req->protocol.clientpv );
1113 break;
1114
1115 //------------------------------------------------------------------------
1116 // kXR_login
1117 //------------------------------------------------------------------------
1118 case kXR_login:
1119 req->login.pid = htonl( req->login.pid );
1120 break;
1121
1122 //------------------------------------------------------------------------
1123 // kXR_locate
1124 //------------------------------------------------------------------------
1125 case kXR_locate:
1126 req->locate.options = htons( req->locate.options );
1127 break;
1128
1129 //------------------------------------------------------------------------
1130 // kXR_query
1131 //------------------------------------------------------------------------
1132 case kXR_query:
1133 req->query.infotype = htons( req->query.infotype );
1134 break;
1135
1136 //------------------------------------------------------------------------
1137 // kXR_truncate
1138 //------------------------------------------------------------------------
1139 case kXR_truncate:
1140 req->truncate.offset = htonll( req->truncate.offset );
1141 break;
1142
1143 //------------------------------------------------------------------------
1144 // kXR_mkdir
1145 //------------------------------------------------------------------------
1146 case kXR_mkdir:
1147 req->mkdir.mode = htons( req->mkdir.mode );
1148 break;
1149
1150 //------------------------------------------------------------------------
1151 // kXR_chmod
1152 //------------------------------------------------------------------------
1153 case kXR_chmod:
1154 req->chmod.mode = htons( req->chmod.mode );
1155 break;
1156
1157 //------------------------------------------------------------------------
1158 // kXR_open
1159 //------------------------------------------------------------------------
1160 case kXR_open:
1161 req->open.mode = htons( req->open.mode );
1162 req->open.options = htons( req->open.options );
1163 break;
1164
1165 //------------------------------------------------------------------------
1166 // kXR_read
1167 //------------------------------------------------------------------------
1168 case kXR_read:
1169 req->read.offset = htonll( req->read.offset );
1170 req->read.rlen = htonl( req->read.rlen );
1171 break;
1172
1173 //------------------------------------------------------------------------
1174 // kXR_write
1175 //------------------------------------------------------------------------
1176 case kXR_write:
1177 req->write.offset = htonll( req->write.offset );
1178 break;
1179
1180 //------------------------------------------------------------------------
1181 // kXR_mv
1182 //------------------------------------------------------------------------
1183 case kXR_mv:
1184 req->mv.arg1len = htons( req->mv.arg1len );
1185 break;
1186
1187 //------------------------------------------------------------------------
1188 // kXR_readv
1189 //------------------------------------------------------------------------
1190 case kXR_readv:
1191 {
1192 uint16_t numChunks = (req->readv.dlen)/16;
1193 readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1194 for( size_t i = 0; i < numChunks; ++i )
1195 {
1196 dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1197 dataChunk[i].offset = htonll( dataChunk[i].offset );
1198 }
1199 break;
1200 }
1201
1202 //------------------------------------------------------------------------
1203 // kXR_writev
1204 //------------------------------------------------------------------------
1205 case kXR_writev:
1206 {
1207 uint16_t numChunks = (req->writev.dlen)/16;
1208 XrdProto::write_list *wrtList =
1209 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1210 for( size_t i = 0; i < numChunks; ++i )
1211 {
1212 wrtList[i].wlen = htonl( wrtList[i].wlen );
1213 wrtList[i].offset = htonll( wrtList[i].offset );
1214 }
1215
1216 break;
1217 }
1218
1219 case kXR_pgread:
1220 {
1221 req->pgread.offset = htonll( req->pgread.offset );
1222 req->pgread.rlen = htonl( req->pgread.rlen );
1223 break;
1224 }
1225
1226 case kXR_pgwrite:
1227 {
1228 req->pgwrite.offset = htonll( req->pgwrite.offset );
1229 break;
1230 }
1231
1232 //------------------------------------------------------------------------
1233 // kXR_prepare
1234 //------------------------------------------------------------------------
1235 case kXR_prepare:
1236 {
1237 req->prepare.optionX = htons( req->prepare.optionX );
1238 req->prepare.port = htons( req->prepare.port );
1239 break;
1240 }
1241
1242 case kXR_chkpoint:
1243 {
1244 if( req->chkpoint.opcode == kXR_ckpXeq )
1245 MarshallRequest( msg + 24 );
1246 break;
1247 }
1248 };
1249
1250 req->header.requestid = htons( req->header.requestid );
1251 req->header.dlen = htonl( req->header.dlen );
1252 return XRootDStatus();
1253 }

References ClientMvRequest::arg1len, ClientRequest::chkpoint, ClientRequest::chmod, ClientProtocolRequest::clientpv, ClientReadVRequest::dlen, ClientRequestHdr::dlen, ClientWriteVRequest::dlen, ClientRequest::header, ClientQueryRequest::infotype, kXR_chkpoint, kXR_chmod, kXR_ckpXeq, kXR_locate, kXR_login, kXR_mkdir, kXR_mv, kXR_open, kXR_pgread, kXR_pgwrite, kXR_prepare, kXR_protocol, kXR_query, kXR_read, kXR_readv, kXR_truncate, kXR_write, kXR_writev, ClientRequest::locate, ClientRequest::login, MarshallRequest(), ClientRequest::mkdir, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientRequest::mv, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, readahead_list::offset, XrdProto::write_list::offset, ClientChkPointRequest::opcode, ClientRequest::open, ClientLocateRequest::options, ClientOpenRequest::options, ClientPrepareRequest::optionX, ClientRequest::pgread, ClientRequest::pgwrite, ClientLoginRequest::pid, ClientPrepareRequest::port, ClientRequest::prepare, ClientRequest::protocol, ClientRequest::query, ClientRequest::read, ClientRequest::readv, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientRequest::truncate, XrdProto::write_list::wlen, ClientRequest::write, and ClientRequest::writev.
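
Each case in the listing converts multi-byte fields from host to network byte order in place. The sketch below shows the same idea for the two fields handled by the kXR_read case. It deliberately swaps in shift-based helpers (ToNetwork32, ToNetwork64) for htonl and htonll so that it compiles without platform headers; those helpers, ReadFieldsSketch and MarshallReadFields are hypothetical names and are not part of XrdCl.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Portable equivalents of htonl / htonll built from shifts, so the sketch
// does not depend on platform headers or on the host byte order.
std::uint32_t ToNetwork32( std::uint32_t v )
{
  const std::uint8_t b[4] = { std::uint8_t( v >> 24 ), std::uint8_t( v >> 16 ),
                              std::uint8_t( v >> 8 ),  std::uint8_t( v ) };
  std::uint32_t out;
  std::memcpy( &out, b, sizeof( out ) );
  return out;
}

std::uint64_t ToNetwork64( std::uint64_t v )
{
  std::uint8_t b[8];
  for( int i = 0; i < 8; ++i )
    b[i] = std::uint8_t( v >> ( 56 - 8 * i ) );
  std::uint64_t out;
  std::memcpy( &out, b, sizeof( out ) );
  return out;
}

// Hypothetical cut-down request holding only the two fields that the
// kXR_read case converts.
struct ReadFieldsSketch
{
  std::uint64_t offset;
  std::uint32_t rlen;
};

// In-place conversion, mirroring the style of the kXR_read case.
void MarshallReadFields( ReadFieldsSketch &req )
{
  req.offset = ToNetwork64( req.offset );
  req.rlen   = ToNetwork32( req.rlen );
}
```

After the call, the in-memory bytes of each field are in most-significant-first order regardless of the host architecture, so the structure can be copied to the wire as is.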

◆ MarshallRequest() [2/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( Message * msg)
inline static

Marshal the outgoing message.

Definition at line 175 of file XrdClXRootDTransport.hh.

176 {
177 MarshallRequest( msg->GetBuffer() );
178 msg->SetIsMarshalled( true );
179 return XRootDStatus();
180 }

References XrdCl::Buffer::GetBuffer(), MarshallRequest(), and XrdCl::Message::SetIsMarshalled().

Referenced by MarshallRequest(), MarshallRequest(), MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), XrdCl::MessageUtils::SendMessage(), and UnMarshallRequest().

◆ MessageReceived()

uint32_t XrdCl::XRootDTransport::MessageReceived ( Message & msg,
uint16_t subStream,
AnyObject & channelData )
virtual

Check if the message invokes a stream action.

Implements XrdCl::TransportHandler.

Definition at line 1630 of file XrdClXRootDTransport.cc.

1633 {
1634 XRootDChannelInfo *info = 0;
1635 channelData.Get( info );
1636 XrdSysMutexHelper scopedLock( info->mutex );
1637 Log *log = DefaultEnv::GetLog();
1638
1639 //--------------------------------------------------------------------------
1640 // Update the substream queues
1641 //--------------------------------------------------------------------------
1642 info->strmSelector->MsgReceived( subStream );
1643
1644 //--------------------------------------------------------------------------
1645 // Check whether this message is a response to a request that has
1646 // timed out, and if so, drop it
1647 //--------------------------------------------------------------------------
1648 ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1649 if( rsp->hdr.status == kXR_attn )
1650 {
1651 return NoAction;
1652 }
1653
1654 if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1655 {
1656 log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1657 "response that we're no longer interested in (timed out)",
1658 &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1659 //------------------------------------------------------------------------
1660 // If it is kXR_waitresp there will be another one,
1661 // so we don't release the sid yet
1662 //------------------------------------------------------------------------
1663 if( rsp->hdr.status != kXR_waitresp )
1664 info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1665 //------------------------------------------------------------------------
1666 // If it is a successful response to an open request
1667 // that timed out, we need to send a close
1668 //------------------------------------------------------------------------
1669 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1670 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1671 if( sidIt != info->sentOpens.end() )
1672 {
1673 info->sentOpens.erase( sidIt );
1674 if( rsp->hdr.status == kXR_ok ) return RequestClose;
1675 }
1676 return DigestMsg;
1677 }
1678
1679 //--------------------------------------------------------------------------
1680 // If we have a wait or waitresp
1681 //--------------------------------------------------------------------------
1682 uint32_t seconds = 0;
1683 if( rsp->hdr.status == kXR_wait )
1684 seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1685 // to re-send the request
1686 else if( rsp->hdr.status == kXR_waitresp )
1687 {
1688 seconds = ntohl( rsp->body.waitresp.seconds );
1689
1690 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1691 "setting up wait barrier.",
1692 info->streamName.c_str(),
1693 seconds );
1694 }
1695
1696 time_t barrier = time(0) + seconds;
1697 if( info->waitBarrier < barrier )
1698 info->waitBarrier = barrier;
1699
1700 //--------------------------------------------------------------------------
1701 // If we got a response to an open request, we may need to bump the counter
1702 // of open files
1703 //--------------------------------------------------------------------------
1704 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1705 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1706 if( sidIt != info->sentOpens.end() )
1707 {
1708 if( rsp->hdr.status == kXR_waitresp )
1709 return NoAction;
1710 info->sentOpens.erase( sidIt );
1711 if( rsp->hdr.status == kXR_ok )
1712 {
1713 ++info->openFiles;
1714 info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another file File object instance has been bound with this connection
1715 }
1716 return NoAction;
1717 }
1718
1719 //--------------------------------------------------------------------------
1720 // If we got a response to a close, we may need to decrement the counter of
1721 // open files
1722 //--------------------------------------------------------------------------
1723 sidIt = info->sentCloses.find( sid );
1724 if( sidIt != info->sentCloses.end() )
1725 {
1726 if( rsp->hdr.status == kXR_waitresp )
1727 return NoAction;
1728 info->sentCloses.erase( sidIt );
1729 --info->openFiles;
1730 return NoAction;
1731 }
1732 return NoAction;
1733 }

References ServerResponse::body, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, kXR_attn, kXR_ok, kXR_wait, kXR_waitresp, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportHandler::NoAction, XrdCl::XRootDChannelInfo::openFiles, XrdCl::TransportHandler::RequestClose, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, ServerResponseHeader::status, ServerResponseHeader::streamid, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, XrdCl::XRootDChannelInfo::waitBarrier, XrdCl::XRootDMsg, and XrdCl::XRootDTransportMsg.
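
The wait-barrier update in the middle of the listing can be isolated into a small function. This is a sketch under stated assumptions: WaitKind and UpdateWaitBarrier are hypothetical names, and the server-supplied delay is assumed to have been converted to host byte order already.

```cpp
#include <cassert>
#include <cstdint>
#include <ctime>

// Hypothetical; mirrors the kXR_wait / kXR_waitresp distinction.
enum class WaitKind { None, Wait, WaitResp };

// Returns the new barrier. The barrier only ever moves forward.
std::time_t UpdateWaitBarrier( std::time_t current, std::time_t now,
                               WaitKind kind, std::uint32_t serverSeconds )
{
  std::uint32_t seconds = 0;
  if( kind == WaitKind::Wait )
    seconds = serverSeconds + 5;   // extra time to re-send the request
  else if( kind == WaitKind::WaitResp )
    seconds = serverSeconds;

  const std::time_t barrier = now + seconds;
  return current < barrier ? barrier : current;
}
```

Because the barrier never moves backwards, a short wait received after a long one does not shorten the period during which IsStreamBroken refrains from declaring a timeout.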

◆ MessageSent()

void XrdCl::XRootDTransport::MessageSent ( Message * msg,
uint16_t subStream,
uint32_t bytesSent,
AnyObject & channelData )
virtual

Notify the transport about a message having been sent.

Implements XrdCl::TransportHandler.

Definition at line 1738 of file XrdClXRootDTransport.cc.

1742 {
1743 // Called when a message has been sent. For messages that return on a
1744 // different pathid (and hence may use a different poller) it is possible
1745 // that the server has already replied and the reply will trigger
1746 // MessageReceived() before this method has been called. However for open
1747 // and close this is never the case and this method is used for tracking
1748 // only those.
1749 XRootDChannelInfo *info = 0;
1750 channelData.Get( info );
1751 XrdSysMutexHelper scopedLock( info->mutex );
1752 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1753 uint16_t reqid = ntohs( req->header.requestid );
1754
1755
1756 //--------------------------------------------------------------------------
1757 // We need to track opens to know if we can close streams due to idleness
1758 //--------------------------------------------------------------------------
1759 uint16_t sid;
1760 memcpy( &sid, req->header.streamid, 2 );
1761
1762 if( reqid == kXR_open )
1763 info->sentOpens.insert( sid );
1764 else if( reqid == kXR_close )
1765 info->sentCloses.insert( sid );
1766 }

References XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_close, kXR_open, XrdCl::XRootDChannelInfo::mutex, ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, and ClientRequestHdr::streamid.
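
The open/close bookkeeping shared by MessageSent() and MessageReceived() can be sketched in isolation. This is a minimal illustration, not the library's code: `ChannelTracker`, `ReqKind`, `RspStatus`, `OnSent` and `OnReply` are hypothetical names. It also assumes that a reply to an open removes its entry from the set of sent opens and that a deferred answer leaves open entries pending, because that part of MessageReceived() is not shown in the listing above.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical stand-ins for the request and response codes.
enum class ReqKind   { Open, Close, Other };
enum class RspStatus { Ok, WaitResp, Error };

// Simplified stand-in for the open/close fields of XRootDChannelInfo.
struct ChannelTracker
{
  std::set<uint16_t> sentOpens;   // stream IDs of opens awaiting a reply
  std::set<uint16_t> sentCloses;  // stream IDs of closes awaiting a reply
  uint32_t           openFiles = 0;

  // Mirrors MessageSent(): only opens and closes are remembered.
  void OnSent( ReqKind kind, uint16_t sid )
  {
    if( kind == ReqKind::Open )       sentOpens.insert( sid );
    else if( kind == ReqKind::Close ) sentCloses.insert( sid );
  }

  // Mirrors the reply handling: the reply is matched by its stream ID.
  void OnReply( uint16_t sid, RspStatus status )
  {
    // A deferred answer is not final, so the entry stays pending.
    if( status == RspStatus::WaitResp ) return;

    auto it = sentOpens.find( sid );
    if( it != sentOpens.end() )
    {
      sentOpens.erase( it );                      // assumption, see lead-in
      if( status == RspStatus::Ok ) ++openFiles;  // count successful opens only
      return;
    }

    it = sentCloses.find( sid );
    if( it != sentCloses.end() )
    {
      sentCloses.erase( it );
      assert( openFiles > 0 );  // a close must follow a counted open
      --openFiles;
    }
  }
};
```

Calling `OnSent( ReqKind::Open, sid )` followed by `OnReply( sid, RspStatus::Ok )` raises `openFiles` by one; a later close on the same channel lowers it again once its final reply arrives.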

◆ Multiplex()

PathID XrdCl::XRootDTransport::Multiplex ( Message * msg,
AnyObject & channelData,
PathID * hint = 0 )
virtual

Return the ID of the up stream on which this message should be sent and of the down stream on which the answer is expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 862 of file XrdClXRootDTransport.cc.

863 {
864 return PathID( 0, 0 );
865 }

◆ MultiplexSubStream()

PathID XrdCl::XRootDTransport::MultiplexSubStream ( Message * msg,
AnyObject & channelData,
PathID * hint = 0 )
virtual

Return the ID of the up substream on which this message should be sent and of the down substream on which the answer is expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 870 of file XrdClXRootDTransport.cc.

873 {
874 XRootDChannelInfo *info = 0;
875 channelData.Get( info );
876
877 if (!info) {
878 DefaultEnv::GetLog()->Error(XRootDTransportMsg,
879 "Internal error: no channel info, cannot multiplex");
880 return PathID(0,0);
881 }
882
883 XrdSysMutexHelper scopedLock( info->mutex );
884
885 //--------------------------------------------------------------------------
886 // If we're not connected to a data server or we don't know that yet
887 // we stream through 0
888 //--------------------------------------------------------------------------
889 if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
890 return PathID( 0, 0 );
891
892 //--------------------------------------------------------------------------
893 // Select the streams
894 //--------------------------------------------------------------------------
895 Log *log = DefaultEnv::GetLog();
896 uint16_t upStream = 0;
897 uint16_t downStream = 0;
898
899 if( hint )
900 {
901 upStream = hint->up;
902 downStream = hint->down;
903 }
904 else
905 {
906 upStream = 0;
907 std::vector<bool> connected;
908 connected.reserve( info->stream.size() - 1 );
909 size_t nbConnected = 0;
910 for( size_t i = 1; i < info->stream.size(); ++i )
911 if( info->stream[i].status == XRootDStreamInfo::Connected )
912 {
913 connected.push_back( true );
914 ++nbConnected;
915 }
916 else
917 connected.push_back( false );
918
919 if( nbConnected == 0 )
920 downStream = 0;
921 else
922 downStream = info->strmSelector->Select( connected );
923 }
924
925 if( upStream >= info->stream.size() )
926 {
927 log->Debug( XRootDTransportMsg,
928 "[%s] Up link stream %d does not exist, using 0",
929 info->streamName.c_str(), upStream );
930 upStream = 0;
931 }
932
933 if( downStream >= info->stream.size() )
934 {
935 log->Debug( XRootDTransportMsg,
936 "[%s] Down link stream %d does not exist, using 0",
937 info->streamName.c_str(), downStream );
938 downStream = 0;
939 }
940
941 //--------------------------------------------------------------------------
942 // Modify the message
943 //--------------------------------------------------------------------------
944 UnMarshallRequest( msg );
945 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
946 switch( hdr->requestid )
947 {
948 //------------------------------------------------------------------------
949 // Read - we update the path id to tell the server where we want to
950 // get the response, but we still send the request through stream 0
951 // We need to allocate space for read_args if we don't have it
952 // included yet
953 //------------------------------------------------------------------------
954 case kXR_read:
955 {
956 if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
957 {
958 msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
959 void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
960 memset( newBuf, 0, 8 );
961 ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
962 req->dlen += 8;
963 }
964 read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
965 args->pathid = info->stream[downStream].pathId;
966 break;
967 }
968
969
970 //------------------------------------------------------------------------
971 // PgRead - we update the path id to tell the server where we want to
972 // get the response, but we still send the request through stream 0
973 // We need to allocate space for ClientPgReadReqArgs if we don't have it
974 // included yet
975 //------------------------------------------------------------------------
976 case kXR_pgread:
977 {
978 if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
979 {
980 msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
981 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
982 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
983 ClientPgReadRequest *req = (ClientPgReadRequest*)msg->GetBuffer();
984 req->dlen += sizeof( ClientPgReadReqArgs );
985 }
986 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
987 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
988 args->pathid = info->stream[downStream].pathId;
989 break;
990 }
991
992 //------------------------------------------------------------------------
993 // ReadV - the situation is identical to read but we don't need any
994 // additional structures to specify the return path
995 //------------------------------------------------------------------------
996 case kXR_readv:
997 {
998 ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
999 req->pathid = info->stream[downStream].pathId;
1000 break;
1001 }
1002
1003 //------------------------------------------------------------------------
1004 // Write - multiplexing writes doesn't work properly in the server
1005 //------------------------------------------------------------------------
1006 case kXR_write:
1007 {
1008// ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
1009// req->pathid = info->stream[downStream].pathId;
1010 break;
1011 }
1012
1013 //------------------------------------------------------------------------
1014 // WriteV - multiplexing writes doesn't work properly in the server
1015 //------------------------------------------------------------------------
1016 case kXR_writev:
1017 {
1018// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1019// req->pathid = info->stream[downStream].pathId;
1020 break;
1021 }
1022
1023 //------------------------------------------------------------------------
1024 // PgWrite - multiplexing writes doesn't work properly in the server
1025 //------------------------------------------------------------------------
1026 case kXR_pgwrite:
1027 {
1028// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1029// req->pathid = info->stream[downStream].pathId;
1030 break;
1031 }
1032 };
1033 MarshallRequest( msg );
1034 return PathID( upStream, downStream );
1035 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Debug(), ClientPgReadRequest::dlen, ClientReadRequest::dlen, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), kXR_isServer, kXR_pgread, kXR_pgwrite, kXR_read, kXR_readv, kXR_write, kXR_writev, MarshallRequest(), XrdCl::XRootDChannelInfo::mutex, ClientPgReadReqArgs::pathid, ClientReadVRequest::pathid, read_args::pathid, XrdCl::Buffer::ReAllocate(), ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, UnMarshallRequest(), XrdCl::PathID::up, and XrdCl::XRootDTransportMsg.
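
The stream selection step can be illustrated with a small standalone sketch. The policy behind `strmSelector->Select()` is not shown in this listing, so the least-loaded rule below is an assumption made for illustration only; `StreamState`, `PickDownStream` and `ClampStream` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified view of one substream.
struct StreamState
{
  bool     connected;  // corresponds to status == Connected in the source
  uint32_t pending;    // assumed load metric, not part of the source
};

// Pick the substream on which the answer is expected. Substream 0 is the
// control stream and is used only when no data stream is connected.
uint16_t PickDownStream( const std::vector<StreamState> &streams )
{
  uint16_t best  = 0;
  bool     found = false;
  for( size_t i = 1; i < streams.size(); ++i )
  {
    if( !streams[i].connected ) continue;
    if( !found || streams[i].pending < streams[best].pending )
    {
      best  = static_cast<uint16_t>( i );
      found = true;
    }
  }
  return best;
}

// Mirrors the range checks in the listing: an index that does not exist
// falls back to substream 0.
uint16_t ClampStream( uint16_t stream, size_t nbStreams )
{
  assert( nbStreams > 0 );  // the control stream always exists
  return stream < nbStreams ? stream : 0;
}
```

With streams `{control, disconnected, connected(5 pending), connected(2 pending)}` the sketch selects substream 3; with no connected data stream it selects 0, which matches the `nbConnected == 0` branch of the listing.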

◆ NbConnectedStrm()

uint16_t XrdCl::XRootDTransport::NbConnectedStrm ( AnyObject & channelData)
static

Number of currently connected data streams.

Definition at line 1521 of file XrdClXRootDTransport.cc.

1522 {
1523 XRootDChannelInfo *info = 0;
1524 channelData.Get( info );
1525
1526 if (!info) {
1527 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1528 return 0;
1529 }
1530
1531 XrdSysMutexHelper scopedLock( info->mutex );
1532
1533 uint16_t nbConnected = 0;
1534 for( size_t i = 1; i < info->stream.size(); ++i )
1535 if( info->stream[i].status == XRootDStreamInfo::Connected )
1536 ++nbConnected;
1537
1538 return nbConnected;
1539 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDTransportMsg.

Referenced by XrdCl::Channel::NbConnectedStrm().

◆ NeedControlConnection()

virtual bool XrdCl::XRootDTransport::NeedControlConnection ( )
inlinevirtual

Return whether a control connection must be valid before other connections are established.

Definition at line 167 of file XrdClXRootDTransport.hh.

168 {
169 return true;
170 }

◆ NeedEncryption()

bool XrdCl::XRootDTransport::NeedEncryption ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual
Returns true if encryption should be turned on, false otherwise.

Implements XrdCl::TransportHandler.

Definition at line 1832 of file XrdClXRootDTransport.cc.

1834 {
1835 XRootDChannelInfo *info = 0;
1836 channelData.Get( info );
1837
1838 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
1839 int notlsok = DefaultNoTlsOK;
1840 env->GetInt( "NoTlsOK", notlsok );
1841
1842
1843 if( notlsok )
1844 return info->encrypted;
1845
1846 // Did the server instruct us to switch to TLS right away?
1847 if( info->serverFlags & kXR_gotoTLS )
1848 {
1849 info->encrypted = true;
1850 return true ;
1851 }
1852
1853 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1854
1855 //--------------------------------------------------------------------------
1856 // The control stream (sub-stream 0) might need to switch to TLS before
1857 // login or after login
1858 //--------------------------------------------------------------------------
1859 if( handShakeData->subStreamId == 0 )
1860 {
1861 //------------------------------------------------------------------------
1862 // We are about to login and the server asked to start encrypting
1863 // before login
1864 //------------------------------------------------------------------------
1865 if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1866 ( info->serverFlags & kXR_tlsLogin ) )
1867 {
1868 info->encrypted = true;
1869 return true;
1870 }
1871
1872 //--------------------------------------------------------------------
1873 // The hand-shake is done and the server requested to encrypt the session
1874 //--------------------------------------------------------------------
1875 if( (sInfo.status == XRootDStreamInfo::Connected ||
1876 //--------------------------------------------------------------------
1877 // we really need to turn on TLS before we send kXR_endsess and we
1878 // are about to do so (1st enable encryption, then send kXR_endsess)
1879 //--------------------------------------------------------------------
1880 sInfo.status == XRootDStreamInfo::EndSessionSent ) &&
1881 ( info->serverFlags & kXR_tlsSess ) )
1882 {
1883 info->encrypted = true;
1884 return true;
1885 }
1886 }
1887 //--------------------------------------------------------------------------
1888 // A data stream (sub-stream > 0) will, if need be, be switched to TLS before
1889 // bind.
1890 //--------------------------------------------------------------------------
1891 else
1892 {
1893 //------------------------------------------------------------------------
1894 // We are about to bind a data stream and the server asked to start
1895 // encrypting before bind
1896 //------------------------------------------------------------------------
1897 if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1898 ( info->serverFlags & kXR_tlsData ) )
1899 {
1900 info->encrypted = true;
1901 return true;
1902 }
1903 }
1904
1905 return false;
1906 }

References XrdCl::XRootDStreamInfo::BindSent, XrdCl::XRootDStreamInfo::Connected, XrdCl::DefaultNoTlsOK, XrdCl::XRootDChannelInfo::encrypted, XrdCl::XRootDStreamInfo::EndSessionSent, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), kXR_gotoTLS, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDStreamInfo::LoginSent, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.
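
The decision made by NeedEncryption() can be restated as a pure function, which makes the order of the checks easier to follow. This is a sketch under stated assumptions: the flag values below are placeholders and are not the values defined in XProtocol.hh, `StreamStatus` and `NeedTls` are hypothetical names, and the side effect of setting `info->encrypted` is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Placeholder flag bits, NOT the real values from XProtocol.hh.
constexpr uint32_t GotoTLS  = 1u << 0;
constexpr uint32_t TlsLogin = 1u << 1;
constexpr uint32_t TlsSess  = 1u << 2;
constexpr uint32_t TlsData  = 1u << 3;

enum class StreamStatus { Disconnected, LoginSent, Connected,
                          EndSessionSent, BindSent };

bool NeedTls( bool noTlsOK, bool alreadyEncrypted, uint32_t serverFlags,
              const std::vector<StreamStatus> &streams, size_t subStreamId )
{
  // The user accepts plain connections: keep whatever state we are in.
  if( noTlsOK ) return alreadyEncrypted;

  // The server wants TLS immediately, regardless of the substream.
  if( serverFlags & GotoTLS ) return true;

  assert( subStreamId < streams.size() );
  const StreamStatus status = streams[subStreamId];

  if( subStreamId == 0 )
  {
    // Control stream: encrypt before login ...
    if( status == StreamStatus::LoginSent && ( serverFlags & TlsLogin ) )
      return true;
    // ... or once the session is established or about to be ended.
    if( ( status == StreamStatus::Connected ||
          status == StreamStatus::EndSessionSent ) &&
        ( serverFlags & TlsSess ) )
      return true;
    return false;
  }

  // Data stream: encrypt before bind if the server asks for it.
  return status == StreamStatus::BindSent && ( serverFlags & TlsData ) != 0;
}
```

Writing the logic this way shows that the `NoTlsOK` setting short-circuits every server request, including `kXR_gotoTLS`.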

◆ Query()

Status XrdCl::XRootDTransport::Query ( uint16_t query,
AnyObject & result,
AnyObject & channelData )
virtual

Query the channel.

Implements XrdCl::TransportHandler.

Definition at line 1578 of file XrdClXRootDTransport.cc.

1581 {
1582 XRootDChannelInfo *info = 0;
1583 channelData.Get( info );
1584
1585 if (!info)
1586 return XRootDStatus(stFatal, errInternal);
1587
1588 XrdSysMutexHelper scopedLock( info->mutex );
1589
1590 switch( query )
1591 {
1592 //------------------------------------------------------------------------
1593 // Protocol name
1594 //------------------------------------------------------------------------
1595 case TransportQuery::Name:
1596 result.Set( (const char*)"XRootD", false );
1597 return Status();
1598
1599 //------------------------------------------------------------------------
1600 // Authentication
1601 //------------------------------------------------------------------------
1602 case TransportQuery::Auth:
1603 result.Set( new std::string( info->authProtocolName ), false );
1604 return Status();
1605
1606 //------------------------------------------------------------------------
1607 // Server flags
1608 //------------------------------------------------------------------------
1609 case XRootDQuery::ServerFlags:
1610 result.Set( new int( info->serverFlags ), false );
1611 return Status();
1612
1613 //------------------------------------------------------------------------
1614 // Protocol version
1615 //------------------------------------------------------------------------
1616 case XRootDQuery::ProtocolVersion:
1617 result.Set( new int( info->protocolVersion ), false );
1618 return Status();
1619
1620 case XRootDQuery::IsEncrypted:
1621 result.Set( new bool( info->encrypted ), false );
1622 return Status();
1623 };
1624 return Status( stError, errQueryNotSupported );
1625 }

References XrdCl::TransportQuery::Auth, XrdCl::XRootDChannelInfo::authProtocolName, XrdCl::XRootDChannelInfo::encrypted, XrdCl::errInternal, XrdCl::errQueryNotSupported, XrdCl::AnyObject::Get(), XrdCl::XRootDQuery::IsEncrypted, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportQuery::Name, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::XRootDChannelInfo::protocolVersion, XrdCl::XRootDQuery::ServerFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::AnyObject::Set(), XrdCl::stError, and XrdCl::stFatal.

◆ SetDescription()

void XrdCl::XRootDTransport::SetDescription ( Message * msg)
inlinestatic

Generate the description of a message and attach it to the message.

Definition at line 245 of file XrdClXRootDTransport.hh.

246 {
247 std::ostringstream o;
248 GenerateDescription( msg->GetBuffer(), o );
249 msg->SetDescription( o.str() );
250 }

References GenerateDescription(), XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetDescription().

Referenced by XrdCl::FileStateHandler::Checkpoint(), XrdCl::FileStateHandler::ChkptWrt(), XrdCl::FileStateHandler::ChkptWrtV(), XrdCl::FileSystem::ChMod(), XrdCl::FileStateHandler::Close(), XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::Fcntl(), XrdCl::FileSystem::Locate(), XrdCl::FileSystem::MkDir(), XrdCl::FileSystem::Mv(), XrdCl::FileStateHandler::Open(), XrdCl::FileStateHandler::PgReadImpl(), XrdCl::FileStateHandler::PgWriteImpl(), XrdCl::FileSystem::Ping(), XrdCl::FileSystem::Prepare(), XrdCl::FileSystem::Protocol(), XrdCl::FileSystem::Query(), XrdCl::FileStateHandler::Read(), XrdCl::FileStateHandler::ReadV(), XrdCl::MessageUtils::RewriteCGIAndPath(), XrdCl::FileSystem::Rm(), XrdCl::FileSystem::RmDir(), XrdCl::FileStateHandler::Stat(), XrdCl::FileSystem::Stat(), XrdCl::FileSystem::StatVFS(), XrdCl::FileStateHandler::Sync(), XrdCl::FileStateHandler::Truncate(), XrdCl::FileSystem::Truncate(), XrdCl::FileStateHandler::VectorRead(), XrdCl::FileStateHandler::VectorWrite(), XrdCl::FileStateHandler::Visa(), XrdCl::FileStateHandler::Write(), and XrdCl::FileStateHandler::WriteV().

◆ SubStreamNumber()

uint16_t XrdCl::XRootDTransport::SubStreamNumber ( AnyObject & channelData)
virtual

Return the number of substreams per stream that should be created.

Implements XrdCl::TransportHandler.

Definition at line 1042 of file XrdClXRootDTransport.cc.

1043 {
1044 XRootDChannelInfo *info = 0;
1045 channelData.Get( info );
1046
1047 if (!info) {
1048 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1049 return 1;
1050 }
1051
1052 XrdSysMutexHelper scopedLock( info->mutex );
1053
1054 //--------------------------------------------------------------------------
1055 // If the connection has been opened in order to orchestrate a TPC or
1056 // the remote server is a Manager or Metamanager we will need only one
1057 // (control) stream.
1058 //--------------------------------------------------------------------------
1059 if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1060
1061 //--------------------------------------------------------------------------
1062 // Number of streams requested by user
1063 //--------------------------------------------------------------------------
1064 uint16_t ret = info->stream.size();
1065
1066 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
1067 int nodata = DefaultTlsNoData;
1068 env->GetInt( "TlsNoData", nodata );
1069
1070 // Does the server require the stream 0 to be encrypted?
1071 bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1072 ( info->serverFlags & kXR_tlsLogin ) ||
1073 ( info->serverFlags & kXR_tlsSess );
1074 // Does the server NOT require the data streams to be encrypted?
1075 bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1076 // Does the user require the stream 0 to be encrypted?
1077 bool usrTlsStrm0 = info->encrypted;
1078 // Does the user NOT require the data streams to be encrypted?
1079 bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1080
1081 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1082 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1083 {
1084 //------------------------------------------------------------------------
1085 // The server or user asked us to encrypt stream 0, but to send the data
1086 // (read/write) using a plain TCP connection
1087 //------------------------------------------------------------------------
1088 if( ret == 1 ) ++ret;
1089 }
1090
1091 if( ret > info->stream.size() )
1092 {
1093 info->stream.resize( ret );
1094 info->strmSelector->AdjustQueues( ret );
1095 }
1096
1097 return ret;
1098 }

References XrdCl::DefaultTlsNoData, XrdCl::XRootDChannelInfo::encrypted, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::istpc, kXR_gotoTLS, kXR_isServer, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::strmSelector, and XrdCl::XRootDTransportMsg.
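
The rule for adding a second substream can be isolated from the channel state. The sketch below restates the boolean logic of the listing as a pure function; `StreamPolicy` and `SubStreamCount` are hypothetical names, and the resizing of the stream vector and of the selector queues is omitted.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical bundle of the inputs SubStreamNumber() reads.
struct StreamPolicy
{
  uint16_t configured;    // current number of substreams (info->stream.size())
  bool     isTpc;         // connection orchestrates a third-party copy
  bool     isDataServer;  // kXR_isServer is set in the server flags
  bool     srvTlsStrm0;   // server set gotoTLS, tlsLogin or tlsSess
  bool     srvTlsData;    // server set tlsData
  bool     usrEncrypted;  // user asked for an encrypted channel
  bool     tlsNoData;     // user setting TlsNoData
};

uint16_t SubStreamCount( const StreamPolicy &p )
{
  assert( p.configured >= 1 );  // the control stream always exists

  // TPC orchestration and (meta)managers need the control stream only.
  if( p.isTpc || !p.isDataServer ) return 1;

  const bool srvNoTlsData = !p.srvTlsData;
  const bool usrTlsStrm0  = p.usrEncrypted;
  // Same truth table as !encrypted || ( encrypted && nodata ).
  const bool usrNoTlsData = !p.usrEncrypted || p.tlsNoData;

  // Stream 0 is encrypted but data may travel in plain text: a separate
  // data substream is needed so that data does not share the TLS stream.
  const bool split = ( usrTlsStrm0   && usrNoTlsData && srvNoTlsData ) ||
                     ( p.srvTlsStrm0 && srvNoTlsData && usrNoTlsData );

  if( split && p.configured == 1 ) return 2;
  return p.configured;
}
```

The extra substream is added only when exactly one substream is configured; a channel that already has data substreams keeps its configured count.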

◆ UnMarchalStatusMore()

XRootDStatus XrdCl::XRootDTransport::UnMarchalStatusMore ( Message & msg)
static

Unmarshall the correction-segment of the status response for pgwrite.

Definition at line 1434 of file XrdClXRootDTransport.cc.

1435 {
1436 ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer();
1437 uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1438
1439 switch( reqType )
1440 {
1441 case kXR_pgwrite:
1442 {
1443 //--------------------------------------------------------------------------
1444 // If there's no additional data there's nothing to unmarshal
1445 //--------------------------------------------------------------------------
1446 if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1447 //--------------------------------------------------------------------------
1448 // If there's not enough data to form correction-segment report an error
1449 //--------------------------------------------------------------------------
1450 if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1451 return XRootDStatus( stError, errInvalidMessage, 0,
1452 "kXR_status: invalid message size." );
1453
1454 //--------------------------------------------------------------------------
1455 // Calculate the crc32c for the additional data
1456 //--------------------------------------------------------------------------
1457 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) );
1458 cse->cseCRC = ntohl( cse->cseCRC );
1459 size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1460 void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1461 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1462
1463 //--------------------------------------------------------------------------
1464 // Do the integrity checks
1465 //--------------------------------------------------------------------------
1466 if( crcval != cse->cseCRC )
1467 {
1468 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1469 "corrupted (crc32c integrity check failed)." );
1470 }
1471
1472 cse->dlFirst = ntohs( cse->dlFirst );
1473 cse->dlLast = ntohs( cse->dlLast );
1474
1475 size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1476 sizeof( kXR_int64 );
1477 kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1478 sizeof( ServerResponseBody_pgWrCSE ) );
1479
1480 for( size_t i = 0; i < pgcnt; ++i )
1481 pgoffs[i] = ntohll( pgoffs[i] );
1482
1483 return XRootDStatus();
1484 break;
1485 }
1486
1487 default:
1488 break;
1489 }
1490
1491 return XRootDStatus( stError, errNotSupported );
1492 }

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_pgWrCSE::cseCRC, ServerResponseBody_Status::dlen, ServerResponseBody_pgWrCSE::dlFirst, ServerResponseBody_pgWrCSE::dlLast, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errNotSupported, XrdCl::Buffer::GetBuffer(), kXR_1stRequest, kXR_pgwrite, ServerResponseBody_Status::requestid, ServerResponseV2::status, and XrdCl::stError.

Referenced by GetMore().
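
The length arithmetic for the correction segment can be checked with a standalone sketch. It assumes the fixed part of the segment is 8 bytes (a 4-byte checksum followed by two 2-byte lengths), which is consistent with the fields converted in the listing but is not stated there. `LoadBE64` and `ParsePageOffsets` are hypothetical helpers, explicit byte assembly is used in place of `ntohll`, and the CRC check is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed size of the fixed part: cseCRC (4) + dlFirst (2) + dlLast (2).
constexpr size_t kCseSize = 8;

// Assemble a 64-bit value from 8 bytes in network (big-endian) order.
uint64_t LoadBE64( const unsigned char *p )
{
  uint64_t v = 0;
  for( int i = 0; i < 8; ++i )
    v = ( v << 8 ) | p[i];
  return v;
}

// body points at the start of the correction segment and dlen is its total
// length. Returns false if the segment is too short to be valid.
bool ParsePageOffsets( const unsigned char *body, size_t dlen,
                       std::vector<int64_t> &offsets )
{
  assert( body != nullptr || dlen == 0 );
  offsets.clear();
  if( dlen == 0 ) return true;         // nothing to unmarshal
  if( dlen < kCseSize ) return false;  // fixed part incomplete

  // Everything after the fixed part is a list of 64-bit page offsets.
  const size_t pgcnt = ( dlen - kCseSize ) / sizeof( int64_t );
  for( size_t i = 0; i < pgcnt; ++i )
    offsets.push_back(
      static_cast<int64_t>( LoadBE64( body + kCseSize + i * 8 ) ) );
  return true;
}
```

A 24-byte segment therefore carries two page offsets, and any trailing bytes that do not fill a whole 64-bit value are ignored by the integer division, as in the listing.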

◆ UnMarshallBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshallBody ( Message * msg,
uint16_t reqType )
static

Unmarshall the body of the incoming message.

Definition at line 1280 of file XrdClXRootDTransport.cc.

1281 {
1282 ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1283
1284 //--------------------------------------------------------------------------
1285 // kXR_ok
1286 //--------------------------------------------------------------------------
1287 if( m->hdr.status == kXR_ok )
1288 {
1289 switch( reqType )
1290 {
1291 //----------------------------------------------------------------------
1292 // kXR_protocol
1293 //----------------------------------------------------------------------
1294 case kXR_protocol:
1295 if( m->hdr.dlen < 8 )
1296 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1297 m->body.protocol.pval = ntohl( m->body.protocol.pval );
1298 m->body.protocol.flags = ntohl( m->body.protocol.flags );
1299 break;
1300 }
1301 }
1302 //--------------------------------------------------------------------------
1303 // kXR_error
1304 //--------------------------------------------------------------------------
1305 else if( m->hdr.status == kXR_error )
1306 {
1307 if( m->hdr.dlen < 4 )
1308 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1309 m->body.error.errnum = ntohl( m->body.error.errnum );
1310 }
1311
1312 //--------------------------------------------------------------------------
1313 // kXR_wait
1314 //--------------------------------------------------------------------------
1315 else if( m->hdr.status == kXR_wait )
1316 {
1317 if( m->hdr.dlen < 4 )
1318 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1319 m->body.wait.seconds = htonl( m->body.wait.seconds );
1320 }
1321
1322 //--------------------------------------------------------------------------
1323 // kXR_redirect
1324 //--------------------------------------------------------------------------
1325 else if( m->hdr.status == kXR_redirect )
1326 {
1327 if( m->hdr.dlen < 4 )
1328 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1329 m->body.redirect.port = htonl( m->body.redirect.port );
1330 }
1331
1332 //--------------------------------------------------------------------------
1333 // kXR_waitresp
1334 //--------------------------------------------------------------------------
1335 else if( m->hdr.status == kXR_waitresp )
1336 {
1337 if( m->hdr.dlen < 4 )
1338 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1339 m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1340 }
1341
1342 //--------------------------------------------------------------------------
1343 // kXR_attn
1344 //--------------------------------------------------------------------------
1345 else if( m->hdr.status == kXR_attn )
1346 {
1347 if( m->hdr.dlen < 4 )
1348 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1349 m->body.attn.actnum = htonl( m->body.attn.actnum );
1350 }
1351
1352 return XRootDStatus();
1353 }

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), ServerResponse::hdr, kXR_attn, kXR_error, kXR_ok, kXR_protocol, kXR_redirect, kXR_wait, kXR_waitresp, ServerResponseHeader::status, and XrdCl::stError.

Referenced by XrdCl::XRootDMsgHandler::Process().

◆ UnMarshallHeader()

void XrdCl::XRootDTransport::UnMarshallHeader ( Message & msg)
static

Unmarshall the header of the incoming message.

Definition at line 1497 of file XrdClXRootDTransport.cc.

1498 {
1499 ServerResponseHeader *header = (ServerResponseHeader *)msg.GetBuffer();
1500 header->status = ntohs( header->status );
1501 header->dlen = ntohl( header->dlen );
1502 }

References ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), and ServerResponseHeader::status.

Referenced by GetHeader().
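
For illustration, the same conversion can be written as an explicit decode of the 8-byte wire header. This sketch assembles the fields byte by byte instead of calling `ntohs` and `ntohl` in place, so it does not depend on the host byte order. `ResponseHeader` and `DecodeHeader` are hypothetical names, and `dlen` is treated as unsigned here for simplicity.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for the response header: 2 + 2 + 4 bytes on the wire.
struct ResponseHeader
{
  uint8_t  streamid[2];  // opaque, copied as is
  uint16_t status;       // big-endian on the wire
  uint32_t dlen;         // big-endian on the wire
};

ResponseHeader DecodeHeader( const unsigned char *wire )
{
  assert( wire != nullptr );
  ResponseHeader h;
  // The stream ID is never byte-swapped; it is echoed back verbatim.
  h.streamid[0] = wire[0];
  h.streamid[1] = wire[1];
  h.status = static_cast<uint16_t>( ( wire[2] << 8 ) | wire[3] );
  h.dlen   = ( static_cast<uint32_t>( wire[4] ) << 24 ) |
             ( static_cast<uint32_t>( wire[5] ) << 16 ) |
             ( static_cast<uint32_t>( wire[6] ) <<  8 ) |
               static_cast<uint32_t>( wire[7] );
  return h;
}
```

Decoding the bytes `01 02 0F A3 00 00 01 00` yields a status of `0x0FA3` and a body length of 256.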

◆ UnMarshallRequest()

XRootDStatus XrdCl::XRootDTransport::UnMarshallRequest ( Message * msg)
static

Unmarshall the request - sometimes the requests need to be rewritten, so we need to unmarshall them

Definition at line 1259 of file XrdClXRootDTransport.cc.

1260 {
1261 if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1262 // We rely on the marshaling process to be symmetric!
1263 // First we unmarshall the request ID and the length because
1264 // MarshallRequest() relies on these, and then we need to unmarshall these
1265 // two again, because they get marshalled in MarshallRequest().
1266 // All this is pretty damn ugly and should be rewritten.
1267 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1268 req->header.requestid = htons( req->header.requestid );
1269 req->header.dlen = htonl( req->header.dlen );
1270 XRootDStatus st = MarshallRequest( msg );
1271 req->header.requestid = htons( req->header.requestid );
1272 req->header.dlen = htonl( req->header.dlen );
1273 msg->SetIsMarshalled( false );
1274 return st;
1275 }

References ClientRequestHdr::dlen, XrdCl::Buffer::GetBuffer(), ClientRequest::header, XrdCl::Message::IsMarshalled(), MarshallRequest(), ClientRequestHdr::requestid, XrdCl::Message::SetIsMarshalled(), XrdCl::stOK, and XrdCl::suAlreadyDone.

Referenced by MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

◆ UnMarshalStatusBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshalStatusBody ( Message & msg,
uint16_t reqType )
static

Unmarshall the body of the status response.

Definition at line 1358 of file XrdClXRootDTransport.cc.

1359 {
1360 //--------------------------------------------------------------------------
1361 // Calculate the crc32c before unmarshalling the body!
1362 //--------------------------------------------------------------------------
1363 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
1364 char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1365 size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1366 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1367
1368 size_t stlen = sizeof( ServerResponseStatus );
1369 switch( reqType )
1370 {
1371 case kXR_pgread:
1372 {
1373 stlen += sizeof( ServerResponseBody_pgRead );
1374 break;
1375 }
1376
1377 case kXR_pgwrite:
1378 {
1379 stlen += sizeof( ServerResponseBody_pgWrite );
1380 break;
1381 }
1382 }
1383
1384 if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1385 "kXR_status: invalid message size." );
1386
1387 rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1388 rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1389
1390 switch( reqType )
1391 {
1392 case kXR_pgread:
1393 {
1394 ServerResponseBody_pgRead *pgrdbdy = (ServerResponseBody_pgRead*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1395 pgrdbdy->offset = ntohll( pgrdbdy->offset );
1396 break;
1397 }
1398
1399 case kXR_pgwrite:
1400 {
1401 ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1402 pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1403 break;
1404 }
1405 }
1406
1407 //--------------------------------------------------------------------------
1408 // Do the integrity checks
1409 //--------------------------------------------------------------------------
1410 if( crcval != rspst->bdy.crc32c )
1411 {
1412 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1413 "corrupted (crc32c integrity check failed)." );
1414 }
1415
1416 if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1417 rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1418 {
1419 return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1420 "(stream ID mismatch)." );
1421 }
1422
1423
1424
1425 if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1426 {
1427 return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1428 "(request ID mismatch)." );
1429 }
1430
1431 return XRootDStatus();
1432 }

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_Status::crc32c, ServerResponseBody_Status::dlen, ServerResponseHeader::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetSize(), ServerResponseStatus::hdr, kXR_1stRequest, kXR_pgread, kXR_pgwrite, ServerResponseBody_pgRead::offset, ServerResponseBody_pgWrite::offset, ServerResponseBody_Status::requestid, XrdCl::stError, ServerResponseBody_Status::streamID, and ServerResponseHeader::streamid.

Referenced by XrdCl::XRootDMsgHandler::InspectStatusRsp().
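
The integrity check in the listing computes a CRC-32C over the bytes that follow the checksum field and compares the result with the value carried in the message. The sketch below illustrates that pattern with a plain bitwise CRC-32C (Castagnoli polynomial, reflected form 0x82F63B78). It assumes the conventional CRC-32C parameters (initial value and final XOR of 0xFFFFFFFF); whether XrdOucCRC::Calc32C uses exactly these parameters is an assumption here, and Crc32c and PayloadIntact are hypothetical names, not XRootD functions.

```cpp
#include <cstddef>
#include <cstdint>

// Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78.
// Slow but dependency-free; production code would use a table-driven or
// hardware-accelerated variant.
inline uint32_t Crc32c( const void *data, size_t length )
{
  const unsigned char *p = static_cast<const unsigned char*>( data );
  uint32_t crc = 0xFFFFFFFFu;
  for( size_t i = 0; i < length; ++i )
  {
    crc ^= p[i];
    for( int bit = 0; bit < 8; ++bit )
      crc = ( crc >> 1 ) ^ ( 0x82F63B78u & ( 0u - ( crc & 1u ) ) );
  }
  return ~crc;
}

// Hypothetical helper: true if the checksum received with a message
// (already converted to host byte order) matches the locally computed one.
inline bool PayloadIntact( const void *payload, size_t length,
                           uint32_t receivedCrc )
{
  return Crc32c( payload, length ) == receivedCrc;
}
```

As in the listing, the checksum has to be computed over the bytes as they arrived on the wire, that is, before any field covered by it is converted to host byte order.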


◆ WaitBeforeExit()

void XrdCl::XRootDTransport::WaitBeforeExit ( )
virtual

Wait until the program can safely exit.

Implements XrdCl::TransportHandler.

Definition at line 1823 of file XrdClXRootDTransport.cc.

1824 {
1825 XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1826 pSecUnloadHandler->unloaded = true;
1827 }

◆ PluginUnloadHandler

friend struct PluginUnloadHandler
friend

Definition at line 432 of file XrdClXRootDTransport.hh.

References PluginUnloadHandler.

Referenced by XRootDTransport(), and PluginUnloadHandler.


The documentation for this class was generated from the following files: