XRootD
Loading...
Searching...
No Matches
XrdClLocalFileHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
3// Author: Paul-Niklas Kramp <p.n.kramp@gsi.de>
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// XRootD is free software: you can redistribute it and/or modify
7// it under the terms of the GNU Lesser General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10//
11// XRootD is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15//
16// You should have received a copy of the GNU Lesser General Public License
17// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
18//------------------------------------------------------------------------------
22#include "XrdCl/XrdClURL.hh"
26
27#include "XrdSys/XrdSysE2T.hh"
28#include "XrdSys/XrdSysXAttr.hh"
29#include "XrdSys/XrdSysFAttr.hh"
30#include "XrdSys/XrdSysFD.hh"
31
32#include <string>
33#include <memory>
34#include <stdexcept>
35
36#include <fcntl.h>
37#include <cstdio>
38#include <cstdlib>
39#include <unistd.h>
40#include <sys/stat.h>
41#include <arpa/inet.h>
42#include <aio.h>
43
44namespace
45{
46
47 class AioCtx
48 {
49 public:
50
51 enum Opcode
52 {
53 None,
54 Read,
55 Write,
56 Sync
57 };
58
59 AioCtx( const XrdCl::HostList &hostList, XrdCl::ResponseHandler *handler ) :
60 opcode( None ), hosts( new XrdCl::HostList( hostList ) ), handler( handler )
61 {
62 aiocb *ptr = new aiocb();
63 memset( ptr, 0, sizeof( aiocb ) );
64
65 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
66 int useSignals = XrdCl::DefaultAioSignal;
67 env->GetInt( "AioSignal", useSignals );
68
69 if( useSignals )
70 {
71 static SignalHandlerRegistrator registrator; // registers the signal handler
72
73 ptr->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
74 ptr->aio_sigevent.sigev_signo = SIGUSR1;
75 }
76 else
77 {
78 ptr->aio_sigevent.sigev_notify = SIGEV_THREAD;
79 ptr->aio_sigevent.sigev_notify_function = ThreadHandler;
80 }
81
82 ptr->aio_sigevent.sigev_value.sival_ptr = this;
83
84 cb.reset( ptr );
85 }
86
87
88 void SetWrite( int fd, size_t offset, size_t size, const void *buffer )
89 {
90 cb->aio_fildes = fd;
91 cb->aio_offset = offset;
92 cb->aio_buf = const_cast<void*>( buffer );
93 cb->aio_nbytes = size;
94 opcode = Opcode::Write;
95 }
96
97 void SetRead( int fd, size_t offset, size_t size, void *buffer )
98 {
99 cb->aio_fildes = fd;
100 cb->aio_offset = offset;
101 cb->aio_buf = buffer;
102 cb->aio_nbytes = size;
103 opcode = Opcode::Read;
104 }
105
106 void SetFsync( int fd )
107 {
108 cb->aio_fildes = fd;
109 opcode = Opcode::Sync;
110 }
111
112 static void ThreadHandler( sigval arg )
113 {
114 std::unique_ptr<AioCtx> me( reinterpret_cast<AioCtx*>( arg.sival_ptr ) );
115 Handler( std::move( me ) );
116 }
117
118 static void SignalHandler( int sig, siginfo_t *info, void *ucontext )
119 {
120 std::unique_ptr<AioCtx> me( reinterpret_cast<AioCtx*>( info->si_value.sival_ptr ) );
121 Handler( std::move( me ) );
122 }
123
124 operator aiocb*()
125 {
126 return cb.get();
127 }
128
129 private:
130
131 struct SignalHandlerRegistrator
132 {
133 SignalHandlerRegistrator()
134 {
135 struct sigaction newact, oldact;
136 newact.sa_sigaction = SignalHandler;
137 sigemptyset( &newact.sa_mask );
138 newact.sa_flags = SA_SIGINFO;
139 int rc = sigaction( SIGUSR1, &newact, &oldact );
140 if( rc < 0 )
141 throw std::runtime_error( XrdSysE2T( errno ) );
142 }
143 };
144
145 static void Handler( std::unique_ptr<AioCtx> me )
146 {
147 if( me->opcode == Opcode::None )
148 return;
149
150 using namespace XrdCl;
151
152 int rc = aio_return( me->cb.get() );
153 if( rc < 0 )
154 {
155 int errcode = aio_error( me->cb.get() );
156 Log *log = DefaultEnv::GetLog();
157 log->Error( FileMsg, GetErrMsg( me->opcode ), XrdSysE2T( errcode ) );
158 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errcode ) ;
159 QueueTask( error, 0, me->hosts, me->handler );
160 }
161 else
162 {
163 AnyObject *resp = 0;
164
165 if( me->opcode == Opcode::Read )
166 {
167 ChunkInfo *chunk = new ChunkInfo( me->cb->aio_offset, rc,
168 const_cast<void*>( me->cb->aio_buf ) );
169 resp = new AnyObject();
170 resp->Set( chunk );
171 }
172
173 QueueTask( new XRootDStatus(), resp, me->hosts, me->handler );
174 }
175 }
176
177 static const char* GetErrMsg( Opcode opcode )
178 {
179 static const char readmsg[] = "Read: failed %s";
180 static const char writemsg[] = "Write: failed %s";
181 static const char syncmsg[] = "Sync: failed %s";
182
183 switch( opcode )
184 {
185 case Opcode::Read: return readmsg;
186
187 case Opcode::Write: return writemsg;
188
189 case Opcode::Sync: return syncmsg;
190
191 default: return 0;
192 }
193 }
194
195 static void QueueTask( XrdCl::XRootDStatus *status, XrdCl::AnyObject *resp,
196 XrdCl::HostList *hosts, XrdCl::ResponseHandler *handler )
197 {
198 using namespace XrdCl;
199
200 // if it is simply the sync handler we can release the semaphore
201 // and return there is no need to execute this in the thread-pool
202 if(SyncResponseHandler *syncHandler = dynamic_cast<SyncResponseHandler*>( handler )) {
203 syncHandler->HandleResponse( status, resp );
204 } else if(auto postmaster = DefaultEnv::GetPostMaster()) {
205 if (JobManager *jmngr = postmaster->GetJobManager()) {
206 LocalFileTask *task = new LocalFileTask( status, resp, hosts, handler );
207 jmngr->QueueJob( task );
208 }
209 }
210 }
211
212 std::unique_ptr<aiocb> cb;
213 Opcode opcode;
214 XrdCl::HostList *hosts;
215 XrdCl::ResponseHandler *handler;
216 };
217
218};
219
220namespace XrdCl
221{
222
223 //------------------------------------------------------------------------
224 // Constructor
225 //------------------------------------------------------------------------
227 fd( -1 )
228 {
229 }
230
231 //------------------------------------------------------------------------
232 // Destructor
233 //------------------------------------------------------------------------
238
239 //------------------------------------------------------------------------
240 // Open
241 //------------------------------------------------------------------------
242 XRootDStatus LocalFileHandler::Open( const std::string& url, uint16_t flags,
243 uint16_t mode, ResponseHandler* handler, uint16_t timeout )
244 {
245 AnyObject *resp = 0;
246 XRootDStatus st = OpenImpl( url, flags, mode, resp );
247 if( !st.IsOK() && st.code != errLocalError )
248 return st;
249
250 return QueueTask( new XRootDStatus( st ), resp, handler );
251 }
252
253 XRootDStatus LocalFileHandler::Open( const URL *url, const Message *req, AnyObject *&resp )
254 {
255 const ClientOpenRequest* request =
256 reinterpret_cast<const ClientOpenRequest*>( req->GetBuffer() );
257 uint16_t flags = ntohs( request->options );
258 uint16_t mode = ntohs( request->mode );
259 return OpenImpl( url->GetURL(), flags, mode, resp );
260 }
261
262 //------------------------------------------------------------------------
263 // Close
264 //------------------------------------------------------------------------
266 uint16_t timeout )
267 {
268 if( close( fd ) == -1 )
269 {
270 Log *log = DefaultEnv::GetLog();
271 log->Error( FileMsg, "Close: file fd: %i %s", fd, XrdSysE2T( errno ) );
272 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
273 return QueueTask( error, 0, handler );
274 }
275
276 return QueueTask( new XRootDStatus(), 0, handler );
277 }
278
279 //------------------------------------------------------------------------
280 // Stat
281 //------------------------------------------------------------------------
283 uint16_t timeout )
284 {
285 Log *log = DefaultEnv::GetLog();
286
287 struct stat ssp;
288 if( fstat( fd, &ssp ) == -1 )
289 {
290 log->Error( FileMsg, "Stat: failed fd: %i %s", fd, XrdSysE2T( errno ) );
291 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
292 return QueueTask( error, 0, handler );
293 }
294 std::ostringstream data;
295 data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " "
296 << ssp.st_mtime;
297 log->Debug( FileMsg, "%s", data.str().c_str() );
298
299 StatInfo *statInfo = new StatInfo();
300 if( !statInfo->ParseServerResponse( data.str().c_str() ) )
301 {
302 log->Error( FileMsg, "Stat: ParseServerResponse failed." );
303 delete statInfo;
305 0, handler );
306 }
307
308 AnyObject *resp = new AnyObject();
309 resp->Set( statInfo );
310 return QueueTask( new XRootDStatus(), resp, handler );
311 }
312
313 //------------------------------------------------------------------------
314 // Read
315 //------------------------------------------------------------------------
316 XRootDStatus LocalFileHandler::Read( uint64_t offset, uint32_t size,
317 void* buffer, ResponseHandler* handler, uint16_t timeout )
318 {
319#if defined(__APPLE__)
320 Log *log = DefaultEnv::GetLog();
321 int read = 0;
322 if( ( read = pread( fd, buffer, size, offset ) ) == -1 )
323 {
324 log->Error( FileMsg, "Read: failed %s", XrdSysE2T( errno ) );
325 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
326 return QueueTask( error, 0, handler );
327 }
328 ChunkInfo *chunk = new ChunkInfo( offset, read, buffer );
329 AnyObject *resp = new AnyObject();
330 resp->Set( chunk );
331 return QueueTask( new XRootDStatus(), resp, handler );
332#else
333 AioCtx *ctx = new AioCtx( pHostList, handler );
334 ctx->SetRead( fd, offset, size, buffer );
335
336 int rc = aio_read( *ctx );
337
338 if( rc < 0 )
339 {
340 Log *log = DefaultEnv::GetLog();
341 log->Error( FileMsg, "Read: failed %s", XrdSysE2T( errno ) );
342 return XRootDStatus( stError, errLocalError, errno );
343 }
344
345 return XRootDStatus();
346#endif
347 }
348
349
350 //------------------------------------------------------------------------
351 // ReadV
352 //------------------------------------------------------------------------
354 struct iovec *iov,
355 int iovcnt,
356 ResponseHandler *handler,
357 uint16_t timeout )
358 {
359 Log *log = DefaultEnv::GetLog();
360#if defined(__APPLE__)
361 ssize_t ret = lseek( fd, offset, SEEK_SET );
362 if( ret >= 0 )
363 ret = readv( fd, iov, iovcnt );
364#else
365 ssize_t ret = preadv( fd, iov, iovcnt, offset );
366#endif
367 if( ret == -1 )
368 {
369 log->Error( FileMsg, "ReadV: failed %s", XrdSysE2T( errno ) );
370 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
371 return QueueTask( error, 0, handler );
372 }
373 VectorReadInfo *info = new VectorReadInfo();
374 info->SetSize( ret );
375 uint64_t choff = offset;
376 uint32_t left = ret;
377 for( int i = 0; i < iovcnt; ++i )
378 {
379 uint32_t chlen = iov[i].iov_len;
380 if( chlen > left ) chlen = left;
381 info->GetChunks().emplace_back( choff, chlen, iov[i].iov_base);
382 left -= chlen;
383 choff += chlen;
384 }
385 AnyObject *resp = new AnyObject();
386 resp->Set( info );
387 return QueueTask( new XRootDStatus(), resp, handler );
388 }
389
390 //------------------------------------------------------------------------
391 // Write
392 //------------------------------------------------------------------------
393 XRootDStatus LocalFileHandler::Write( uint64_t offset, uint32_t size,
394 const void* buffer, ResponseHandler* handler, uint16_t timeout )
395 {
396#if defined(__APPLE__)
397 const char *buff = reinterpret_cast<const char*>( buffer );
398 size_t bytesWritten = 0;
399 while( bytesWritten < size )
400 {
401 ssize_t ret = pwrite( fd, buff, size, offset );
402 if( ret < 0 )
403 {
404 Log *log = DefaultEnv::GetLog();
405 log->Error( FileMsg, "Write: failed %s", XrdSysE2T( errno ) );
406 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
407 return QueueTask( error, 0, handler );
408 }
409 offset += ret;
410 buff += ret;
411 bytesWritten += ret;
412 }
413 return QueueTask( new XRootDStatus(), 0, handler );
414#else
415 AioCtx *ctx = new AioCtx( pHostList, handler );
416 ctx->SetWrite( fd, offset, size, buffer );
417
418 int rc = aio_write( *ctx );
419
420 if( rc < 0 )
421 {
422 Log *log = DefaultEnv::GetLog();
423 log->Error( FileMsg, "Write: failed %s", XrdSysE2T( errno ) );
424 return XRootDStatus( stError, errLocalError, errno );
425 }
426
427 return XRootDStatus();
428#endif
429 }
430
431 //------------------------------------------------------------------------
432 // Sync
433 //------------------------------------------------------------------------
435 uint16_t timeout )
436 {
437#if defined(__APPLE__)
438 if( fsync( fd ) )
439 {
440 Log *log = DefaultEnv::GetLog();
441 log->Error( FileMsg, "Sync: failed %s", XrdSysE2T( errno ) );
443 XProtocol::mapError( errno ),
444 XrdSysE2T( errno ) );
445 return QueueTask( error, 0, handler );
446 }
447 return QueueTask( new XRootDStatus(), 0, handler );
448#else
449 AioCtx *ctx = new AioCtx( pHostList, handler );
450 ctx->SetFsync( fd );
451 int rc = aio_fsync( O_SYNC, *ctx );
452 if( rc < 0 )
453 {
454 Log *log = DefaultEnv::GetLog();
455 log->Error( FileMsg, "Sync: failed %s", XrdSysE2T( errno ) );
456 return XRootDStatus( stError, errLocalError, errno );
457 }
458#endif
459 return XRootDStatus();
460 }
461
462 //------------------------------------------------------------------------
463 // Truncate
464 //------------------------------------------------------------------------
466 ResponseHandler* handler, uint16_t timeout )
467 {
468 if( ftruncate( fd, size ) )
469 {
470 Log *log = DefaultEnv::GetLog();
471 log->Error( FileMsg, "Truncate: failed, file descriptor: %i, %s", fd,
472 XrdSysE2T( errno ) );
473 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
474 return QueueTask( error, 0, handler );
475 }
476
477 return QueueTask( new XRootDStatus( stOK ), 0, handler );
478 }
479
480 //------------------------------------------------------------------------
481 // VectorRead
482 //------------------------------------------------------------------------
484 void* buffer, ResponseHandler* handler, uint16_t timeout )
485 {
486 std::unique_ptr<VectorReadInfo> info( new VectorReadInfo() );
487 size_t totalSize = 0;
488 bool useBuffer( buffer );
489
490 for( auto itr = chunks.begin(); itr != chunks.end(); ++itr )
491 {
492 auto &chunk = *itr;
493 if( !useBuffer )
494 buffer = chunk.buffer;
495 ssize_t bytesRead = pread( fd, buffer, chunk.length,
496 chunk.offset );
497 if( bytesRead < 0 )
498 {
499 Log *log = DefaultEnv::GetLog();
500 log->Error( FileMsg, "VectorRead: failed, file descriptor: %i, %s",
501 fd, XrdSysE2T( errno ) );
502 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
503 return QueueTask( error, 0, handler );
504 }
505 totalSize += bytesRead;
506 info->GetChunks().push_back( ChunkInfo( chunk.offset, bytesRead, buffer ) );
507 if( useBuffer )
508 buffer = reinterpret_cast<char*>( buffer ) + bytesRead;
509 }
510
511 info->SetSize( totalSize );
512 AnyObject *resp = new AnyObject();
513 resp->Set( info.release() );
514 return QueueTask( new XRootDStatus(), resp, handler );
515 }
516
517 //------------------------------------------------------------------------
518 // VectorWrite
519 //------------------------------------------------------------------------
521 ResponseHandler *handler, uint16_t timeout )
522 {
523
524 for( auto itr = chunks.begin(); itr != chunks.end(); ++itr )
525 {
526 auto &chunk = *itr;
527 ssize_t bytesWritten = pwrite( fd, chunk.buffer, chunk.length,
528 chunk.offset );
529 if( bytesWritten < 0 )
530 {
531 Log *log = DefaultEnv::GetLog();
532 log->Error( FileMsg, "VectorWrite: failed, file descriptor: %i, %s",
533 fd, XrdSysE2T( errno ) );
534 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
535 return QueueTask( error, 0, handler );
536 }
537 }
538
539 return QueueTask( new XRootDStatus(), 0, handler );
540 }
541
542 //------------------------------------------------------------------------
543 // WriteV
544 //------------------------------------------------------------------------
546 ChunkList *chunks,
547 ResponseHandler *handler,
548 uint16_t timeout )
549 {
550 size_t iovcnt = chunks->size();
551 iovec iovcp[iovcnt];
552 ssize_t size = 0;
553 for( size_t i = 0; i < iovcnt; ++i )
554 {
555 iovcp[i].iov_base = (*chunks)[i].buffer;
556 iovcp[i].iov_len = (*chunks)[i].length;
557 size += (*chunks)[i].length;
558 }
559 iovec *iovptr = iovcp;
560
561 ssize_t bytesWritten = 0;
562 while( bytesWritten < size )
563 {
564#ifdef __APPLE__
565 ssize_t ret = lseek( fd, offset, SEEK_SET );
566 if( ret >= 0 )
567 ret = writev( fd, iovptr, iovcnt );
568#else
569 ssize_t ret = pwritev( fd, iovptr, iovcnt, offset );
570#endif
571 if( ret < 0 )
572 {
573 Log *log = DefaultEnv::GetLog();
574 log->Error( FileMsg, "WriteV: failed %s", XrdSysE2T( errno ) );
575 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
576 return QueueTask( error, 0, handler );
577 }
578
579 bytesWritten += ret;
580 while( ret )
581 {
582 if( size_t( ret ) > iovptr[0].iov_len )
583 {
584 ret -= iovptr[0].iov_len;
585 --iovcnt;
586 ++iovptr;
587 }
588 else
589 {
590 iovptr[0].iov_len -= ret;
591 iovptr[0].iov_base = reinterpret_cast<char*>( iovptr[0].iov_base ) + ret;
592 ret = 0;
593 }
594 }
595 }
596
597 return QueueTask( new XRootDStatus(), 0, handler );
598 }
599
600 //------------------------------------------------------------------------
601 // Fcntl
602 //------------------------------------------------------------------------
604 ResponseHandler *handler, uint16_t timeout )
605 {
607 }
608
609 //------------------------------------------------------------------------
610 // Visa
611 //------------------------------------------------------------------------
613 uint16_t timeout )
614 {
616 }
617
618 //------------------------------------------------------------------------
619 // Set extended attributes - async
620 //------------------------------------------------------------------------
621 XRootDStatus LocalFileHandler::SetXAttr( const std::vector<xattr_t> &attrs,
622 ResponseHandler *handler,
623 uint16_t timeout )
624 {
626 std::vector<XAttrStatus> response;
627
628 auto itr = attrs.begin();
629 for( ; itr != attrs.end(); ++itr )
630 {
631 std::string name = std::get<xattr_name>( *itr );
632 std::string value = std::get<xattr_value>( *itr );
633 int err = xattr->Set( name.c_str(), value.c_str(), value.size(), 0, fd );
634 XRootDStatus status = err < 0 ? XRootDStatus( stError, errLocalError, -err ) :
635 XRootDStatus();
636
637 response.push_back( XAttrStatus( name, status ) );
638 }
639
640 AnyObject *resp = new AnyObject();
641 resp->Set( new std::vector<XAttrStatus>( std::move( response ) ) );
642
643 return QueueTask( new XRootDStatus(), resp, handler );
644 }
645
646 //------------------------------------------------------------------------
647 // Get extended attributes - async
648 //------------------------------------------------------------------------
649 XRootDStatus LocalFileHandler::GetXAttr( const std::vector<std::string> &attrs,
650 ResponseHandler *handler,
651 uint16_t timeout )
652 {
654 std::vector<XAttr> response;
655
656 auto itr = attrs.begin();
657 for( ; itr != attrs.end(); ++itr )
658 {
659 std::string name = *itr;
660 std::unique_ptr<char[]> buffer;
661
662 int size = xattr->Get( name.c_str(), 0, 0, 0, fd );
663 if( size < 0 )
664 {
665 XRootDStatus status( stError, errLocalError, -size );
666 response.push_back( XAttr( *itr, "", status ) );
667 continue;
668 }
669 buffer.reset( new char[size] );
670 int ret = xattr->Get( name.c_str(), buffer.get(), size, 0, fd );
671
672 XRootDStatus status;
673 std::string value;
674
675 if( ret >= 0 )
676 value.append( buffer.get(), ret );
677 else if( ret < 0 )
678 status = XRootDStatus( stError, errLocalError, -ret );
679
680 response.push_back( XAttr( *itr, value, status ) );
681 }
682
683 AnyObject *resp = new AnyObject();
684 resp->Set( new std::vector<XAttr>( std::move( response ) ) );
685
686 return QueueTask( new XRootDStatus(), resp, handler );
687 }
688
689 //------------------------------------------------------------------------
690 // Delete extended attributes - async
691 //------------------------------------------------------------------------
692 XRootDStatus LocalFileHandler::DelXAttr( const std::vector<std::string> &attrs,
693 ResponseHandler *handler,
694 uint16_t timeout )
695 {
697 std::vector<XAttrStatus> response;
698
699 auto itr = attrs.begin();
700 for( ; itr != attrs.end(); ++itr )
701 {
702 std::string name = *itr;
703 int err = xattr->Del( name.c_str(), 0, fd );
704 XRootDStatus status = err < 0 ? XRootDStatus( stError, errLocalError, -err ) :
705 XRootDStatus();
706
707 response.push_back( XAttrStatus( name, status ) );
708 }
709
710 AnyObject *resp = new AnyObject();
711 resp->Set( new std::vector<XAttrStatus>( std::move( response ) ) );
712
713 return QueueTask( new XRootDStatus(), resp, handler );
714 }
715
716 //------------------------------------------------------------------------
717 // List extended attributes - async
718 //------------------------------------------------------------------------
720 uint16_t timeout )
721 {
723 std::vector<XAttr> response;
724
725 XrdSysXAttr::AList *alist = 0;
726 int err = xattr->List( &alist, 0, fd, 1 );
727
728 if( err < 0 )
729 {
730 XRootDStatus *status = new XRootDStatus( stError, XProtocol::mapError( -err ) );
731 return QueueTask( status, 0, handler );
732 }
733
734 XrdSysXAttr::AList *ptr = alist;
735 while( ptr )
736 {
737 std::string name( ptr->Name, ptr->Nlen );
738 int vlen = ptr->Vlen;
739 ptr = ptr->Next;
740
741 std::unique_ptr<char[]> buffer( new char[vlen] );
742 int ret = xattr->Get( name.c_str(),
743 buffer.get(), vlen, 0, fd );
744
745 std::string value = ret >= 0 ? std::string( buffer.get(), ret ) :
746 std::string();
747 XRootDStatus status = ret >= 0 ? XRootDStatus() :
749 response.push_back( XAttr( name, value, status ) );
750 }
751 xattr->Free( alist );
752
753 AnyObject *resp = new AnyObject();
754 resp->Set( new std::vector<XAttr>( std::move( response ) ) );
755
756 return QueueTask( new XRootDStatus(), resp, handler );
757 }
758
759 //------------------------------------------------------------------------
760 // QueueTask - queues error/success tasks for all operations.
761 // Must always return stOK.
762 // Is always creating the same HostList containing only localhost.
763 //------------------------------------------------------------------------
765 ResponseHandler *handler )
766 {
767 // if it is simply the sync handler we can release the semaphore
768 // and return there is no need to execute this in the thread-pool
769 if (SyncResponseHandler *syncHandler = dynamic_cast<SyncResponseHandler*>(handler)) {
770 syncHandler->HandleResponse( st, resp );
771 return XRootDStatus();
772 }
773
774 if (auto postmaster = DefaultEnv::GetPostMaster()) {
775 HostList *hosts = pHostList.empty() ? 0 : new HostList( pHostList );
776 LocalFileTask *task = new LocalFileTask( st, resp, hosts, handler );
777 postmaster->GetJobManager()->QueueJob( task );
778 }
779 return XRootDStatus();
780 }
781
782 //------------------------------------------------------------------------
783 // MkdirPath - creates the folders specified in file_path
784 // called if kXR_mkdir flag is set
785 //------------------------------------------------------------------------
786 XRootDStatus LocalFileHandler::MkdirPath( const std::string &path )
787 {
788 // first find the most up-front component that exists
789 size_t pos = path.rfind( '/' );
790 while( pos != std::string::npos && pos != 0 )
791 {
792 std::string tmp = path.substr( 0, pos );
793 struct stat st;
794 int rc = lstat( tmp.c_str(), &st );
795 if( rc == 0 ) break;
796 if( errno != ENOENT )
797 return XRootDStatus( stError, errLocalError, errno );
798 pos = path.rfind( '/', pos - 1 );
799 }
800
801 pos = path.find( '/', pos + 1 );
802 while( pos != std::string::npos && pos != 0 )
803 {
804 std::string tmp = path.substr( 0, pos );
805 if( mkdir( tmp.c_str(), 0755 ) )
806 {
807 if( errno != EEXIST )
808 return XRootDStatus( stError, errLocalError, errno );
809 }
810 pos = path.find( '/', pos + 1 );
811 }
812 return XRootDStatus();
813 }
814
815 XRootDStatus LocalFileHandler::OpenImpl( const std::string &url, uint16_t flags,
816 uint16_t mode, AnyObject *&resp)
817 {
818 Log *log = DefaultEnv::GetLog();
819
820 // safe the file URL for the HostList for later
821 pUrl = url;
822
823 URL fileUrl( url );
824 if( !fileUrl.IsValid() )
826
827 if( fileUrl.GetHostName() != "localhost" )
829
830 std::string path = fileUrl.GetPath();
831
832 //---------------------------------------------------------------------
833 // Prepare Flags
834 //---------------------------------------------------------------------
835 uint16_t openflags = 0;
836 if( flags & kXR_new )
837 openflags |= O_CREAT | O_EXCL;
838 if( flags & kXR_open_wrto )
839 openflags |= O_WRONLY;
840 else if( flags & kXR_open_updt )
841 openflags |= O_RDWR;
842 else
843 openflags |= O_RDONLY;
844 if( flags & kXR_delete )
845 openflags |= O_CREAT | O_TRUNC;
846
847 if( flags & (kXR_mkpath | kXR_async) )
848 {
849 XRootDStatus st = MkdirPath( path );
850 if( !st.IsOK() )
851 {
852 log->Error( FileMsg, "Open MkdirPath failed %s: %s", path.c_str(),
853 XrdSysE2T( st.errNo ) );
854 return st;
855 }
856
857 }
858 //---------------------------------------------------------------------
859 // Open File
860 //---------------------------------------------------------------------
861 if( mode == Access::Mode::None)
862 mode = 0644;
863 fd = XrdSysFD_Open( path.c_str(), openflags, mode );
864 if( fd == -1 )
865 {
866 log->Error( FileMsg, "Open: open failed: %s: %s", path.c_str(),
867 XrdSysE2T( errno ) );
868
869 return XRootDStatus( stError, errLocalError,
870 XProtocol::mapError( errno ) );
871 }
872 //---------------------------------------------------------------------
873 // Stat File and cache statInfo in openInfo
874 //---------------------------------------------------------------------
875 struct stat ssp;
876 if( fstat( fd, &ssp ) == -1 )
877 {
878 log->Error( FileMsg, "Open: stat failed." );
879 return XRootDStatus( stError, errLocalError,
880 XProtocol::mapError( errno ) );
881 }
882
883 std::ostringstream data;
884 data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " "
885 << ssp.st_mtime;
886
887 StatInfo *statInfo = new StatInfo();
888 if( !statInfo->ParseServerResponse( data.str().c_str() ) )
889 {
890 log->Error( FileMsg, "Open: ParseServerResponse failed." );
891 delete statInfo;
892 return XRootDStatus( stError, errLocalError, kXR_FSError );
893 }
894
895 // add the URL to hosts list
896 pHostList.push_back( HostInfo( pUrl, false ) );
897
898 //All went well
899 uint32_t ufd = fd;
900 OpenInfo *openInfo = new OpenInfo( (uint8_t*)&ufd, 1, statInfo );
901 resp = new AnyObject();
902 resp->Set( openInfo );
903 return XRootDStatus();
904 }
905
906 //------------------------------------------------------------------------
907 // Parses kXR_fattr request and calls respective XAttr operation
908 //------------------------------------------------------------------------
909 XRootDStatus LocalFileHandler::XAttrImpl( kXR_char code,
910 kXR_char numattr,
911 size_t bodylen,
912 char *body,
913 ResponseHandler *handler )
914 {
915 // shift body by 1 to omit the empty path
916 if( bodylen > 0 )
917 {
918 ++body;
919 --bodylen;
920 }
921
922 switch( code )
923 {
924 case kXR_fattrGet:
925 case kXR_fattrDel:
926 {
927 std::vector<std::string> attrs;
928 // parse namevec
929 for( kXR_char i = 0; i < numattr; ++i )
930 {
931 if( bodylen < sizeof( kXR_unt16 ) ) return XRootDStatus( stError, errDataError );
932 // shift by RC size
933 body += sizeof( kXR_unt16 );
934 bodylen -= sizeof( kXR_unt16 );
935 // get the size of attribute name
936 size_t len = strlen( body );
937 if( len > bodylen ) return XRootDStatus( stError, errDataError );
938 attrs.push_back( std::string( body, len ) );
939 body += len + 1; // +1 for the null terminating the string
940 bodylen -= len + 1; // +1 for the null terminating the string
941 }
942
943 if( code == kXR_fattrGet )
944 return GetXAttr( attrs, handler );
945
946 return DelXAttr( attrs, handler );
947 }
948
949 case kXR_fattrSet:
950 {
951 std::vector<xattr_t> attrs;
952 // parse namevec
953 for( kXR_char i = 0; i < numattr; ++i )
954 {
955 if( bodylen < sizeof( kXR_unt16 ) ) return XRootDStatus( stError, errDataError );
956 // shift by RC size
957 body += sizeof( kXR_unt16 );
958 bodylen -= sizeof( kXR_unt16 );
959 // get the size of attribute name
960 char *name = 0;
961 body = ClientFattrRequest::NVecRead( body, name );
962 attrs.push_back( std::make_tuple( std::string( name ), std::string() ) );
963 bodylen -= strlen( name ) + 1; // +1 for the null terminating the string
964 free( name );
965 }
966 // parse valuevec
967 for( kXR_char i = 0; i < numattr; ++i )
968 {
969 // get value length
970 if( bodylen < sizeof( kXR_int32 ) ) return XRootDStatus( stError, errDataError );
971 kXR_int32 len = 0;
972 body = ClientFattrRequest::VVecRead( body, len );
973 bodylen -= sizeof( kXR_int32 );
974 // get value
975 if( size_t( len ) > bodylen ) return XRootDStatus( stError, errDataError );
976 char *value = 0;
977 body = ClientFattrRequest::VVecRead( body, len, value );
978 bodylen -= len;
979 std::get<xattr_value>( attrs[i] ) = value;
980 free( value );
981 }
982
983 return SetXAttr( attrs, handler );
984 }
985
986 case kXR_fattrList:
987 {
988 return ListXAttr( handler );
989 }
990
991 default:
992 return XRootDStatus( stError, errInvalidArgs );
993 }
994
995 return XRootDStatus();
996 }
997
999 Message *msg,
1000 ResponseHandler *handler,
1001 MessageSendParams &sendParams )
1002 {
1003 ClientRequest *req = reinterpret_cast<ClientRequest*>( msg->GetBuffer() );
1004
1005 switch( req->header.requestid )
1006 {
1007 case kXR_open:
1008 {
1009 XRootDStatus st = Open( url.GetURL(), req->open.options,
1010 req->open.mode, handler, sendParams.timeout );
1011 delete msg; // in case of other operations msg is owned by the handler
1012 return st;
1013 }
1014
1015 case kXR_close:
1016 {
1017 return Close( handler, sendParams.timeout );
1018 }
1019
1020 case kXR_stat:
1021 {
1022 return Stat( handler, sendParams.timeout );
1023 }
1024
1025 case kXR_read:
1026 {
1027 if( msg->GetVirtReqID() == kXR_virtReadv )
1028 {
1029 auto &chunkList = *sendParams.chunkList;
1030 struct iovec iov[chunkList.size()];
1031 for( size_t i = 0; i < chunkList.size() ; ++i )
1032 {
1033 iov[i].iov_base = chunkList[i].buffer;
1034 iov[i].iov_len = chunkList[i].length;
1035 }
1036 return ReadV( chunkList.front().offset, iov, chunkList.size(),
1037 handler, sendParams.timeout );
1038 }
1039
1040 return Read( req->read.offset, req->read.rlen,
1041 sendParams.chunkList->front().buffer,
1042 handler, sendParams.timeout );
1043 }
1044
1045 case kXR_write:
1046 {
1047 ChunkList *chunks = sendParams.chunkList;
1048 if( chunks->size() == 1 )
1049 {
1050 // it's an ordinary write
1051 return Write( req->write.offset, req->write.dlen,
1052 chunks->front().buffer, handler,
1053 sendParams.timeout );
1054 }
1055 // it's WriteV call
1056 return WriteV( req->write.offset, sendParams.chunkList,
1057 handler, sendParams.timeout );
1058 }
1059
1060 case kXR_sync:
1061 {
1062 return Sync( handler, sendParams.timeout );
1063 }
1064
1065 case kXR_truncate:
1066 {
1067 return Truncate( req->truncate.offset, handler, sendParams.timeout );
1068 }
1069
1070 case kXR_writev:
1071 {
1072 return VectorWrite( *sendParams.chunkList, handler,
1073 sendParams.timeout );
1074 }
1075
1076 case kXR_readv:
1077 {
1078 return VectorRead( *sendParams.chunkList, 0,
1079 handler, sendParams.timeout );
1080 }
1081
1082 case kXR_fattr:
1083 {
1084 return XAttrImpl( req->fattr.subcode, req->fattr.numattr, req->fattr.dlen,
1085 msg->GetBuffer( sizeof(ClientRequestHdr ) ), handler );
1086 }
1087
1088 default:
1089 {
1091 }
1092 }
1093 }
1094}
@ kXR_FSError
Definition XProtocol.hh:995
struct ClientTruncateRequest truncate
Definition XProtocol.hh:875
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
struct ClientFattrRequest fattr
Definition XProtocol.hh:854
@ kXR_virtReadv
Definition XProtocol.hh:150
kXR_unt16 options
Definition XProtocol.hh:481
@ kXR_open_wrto
Definition XProtocol.hh:469
@ kXR_async
Definition XProtocol.hh:458
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_open_updt
Definition XProtocol.hh:457
@ kXR_mkpath
Definition XProtocol.hh:460
@ kXR_new
Definition XProtocol.hh:455
struct ClientOpenRequest open
Definition XProtocol.hh:860
struct ClientRequestHdr header
Definition XProtocol.hh:846
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_close
Definition XProtocol.hh:115
struct ClientReadRequest read
Definition XProtocol.hh:867
struct ClientWriteRequest write
Definition XProtocol.hh:876
int kXR_int32
Definition XPtypes.hh:89
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
struct stat Stat
Definition XrdCks.cc:49
int lstat(const char *path, struct stat *buf)
#define close(a)
Definition XrdPosix.hh:48
#define lseek(a, b, c)
Definition XrdPosix.hh:52
#define fsync(a)
Definition XrdPosix.hh:64
#define fstat(a, b)
Definition XrdPosix.hh:62
#define mkdir(a, b)
Definition XrdPosix.hh:74
#define writev(a, b, c)
Definition XrdPosix.hh:117
#define readv(a, b, c)
Definition XrdPosix.hh:84
#define stat(a, b)
Definition XrdPosix.hh:101
#define ftruncate(a, b)
Definition XrdPosix.hh:70
#define read(a, b, c)
Definition XrdPosix.hh:82
#define pwrite(a, b, c, d)
Definition XrdPosix.hh:107
#define pread(a, b, c, d)
Definition XrdPosix.hh:80
struct sigevent aio_sigevent
Definition XrdSfsAio.hh:51
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
static int mapError(int rc)
void Set(Type object, bool own=true)
Binary blob representation.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
XRootDStatus Truncate(uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorRead(const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Stat(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ReadV(uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus QueueTask(XRootDStatus *st, AnyObject *obj, ResponseHandler *handler)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus MkdirPath(const std::string &path)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus WriteV(uint64_t offset, ChunkList *chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus DelXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Visa(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorWrite(const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus GetXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
uint16_t GetVirtReqID() const
Get virtual request ID for the message.
Handle an async response.
Object stat info.
bool ParseServerResponse(const char *data)
Parse server response and fill up the object.
Synchronize the response.
URL representation.
Definition XrdClURL.hh:31
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
ChunkList & GetChunks()
Get chunks.
void SetSize(uint32_t size)
Set size.
static XrdSysXAttr * Xat
char Name[1]
Start of the name (size of struct is dynamic)
int Vlen
The length of the attribute value;.
virtual int List(AList **aPL, const char *Path, int fd=-1, int getSz=0)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
int Nlen
The length of the attribute name that follows.
virtual void Free(AList *aPL)=0
virtual int Del(const char *Aname, const char *Path, int fd=-1)=0
AList * Next
-> next element.
const uint16_t stError
An error occurred that could potentially be retried.
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t stOK
Everything went OK.
const uint64_t FileMsg
const uint16_t errOSError
const uint16_t errInvalidArgs
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errNotSupported
const uint16_t errLocalError
const int DefaultAioSignal
Response NullRef< Response >::value
XrdSysError Log
Definition XrdConfig.cc:113
static char * NVecRead(char *buffer, kXR_unt16 &rc)
Definition XProtocol.cc:205
static char * VVecRead(char *buffer, kXR_int32 &len)
Definition XProtocol.cc:224
Describe a data chunk for vector read.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
uint32_t errNo
Errno, if any.
Extended attribute operation status.
Extended attributes with status.