XRootD
Loading...
Searching...
No Matches
XrdEc::StrmWriter Class Reference

#include <XrdEcStrmWriter.hh>

+ Collaboration diagram for XrdEc::StrmWriter:

Public Member Functions

 StrmWriter (const ObjCfg &objcfg)
 Constructor.
 
virtual ~StrmWriter ()
 Destructor.
 
void Close (XrdCl::ResponseHandler *handler, uint16_t timeout=0)
 
uint64_t GetSize ()
 
void Open (XrdCl::ResponseHandler *handler, uint16_t timeout=0)
 
void Write (uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
 

Detailed Description

The Stream Writer objects, responsible for writing erasure coded data into selected placement group.

Definition at line 52 of file XrdEcStrmWriter.hh.

Constructor & Destructor Documentation

◆ StrmWriter()

XrdEc::StrmWriter::StrmWriter ( const ObjCfg & objcfg)
inline

Constructor.

Definition at line 64 of file XrdEcStrmWriter.hh.

64 : objcfg( objcfg ),
65 writer_thread_stop( false ),
66 writer_thread( writer_routine, this ),
67 next_blknb( 0 ),
68 global_status( this )
69 {
70 }

◆ ~StrmWriter()

virtual XrdEc::StrmWriter::~StrmWriter ( )
inlinevirtual

Destructor.

Definition at line 75 of file XrdEcStrmWriter.hh.

76 {
77 writer_thread_stop = true;
78 buffers.interrupt();
79 writer_thread.join();
80 }

Member Function Documentation

◆ Close()

void XrdEc::StrmWriter::Close ( XrdCl::ResponseHandler * handler,
uint16_t timeout = 0 )

Close the data object

Parameters
handler: user callback

Definition at line 108 of file XrdEcStrmWriter.cc.

109 {
110 //-------------------------------------------------------------------------
111 // First, check the global status, if we are in an error state just
112 // fail the request.
113 //-------------------------------------------------------------------------
114 XrdCl::XRootDStatus gst = global_status.get();
115 if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
116 //-------------------------------------------------------------------------
117 // Take care of the left-over data ...
118 //-------------------------------------------------------------------------
119 if( wrtbuff && !wrtbuff->Empty() ) EnqueueBuff( std::move( wrtbuff ) );
120 //-------------------------------------------------------------------------
121 // Let the global status handle the close
122 //-------------------------------------------------------------------------
123 global_status.issue_close( handler, timeout );
124 }
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
bool IsOK() const
We're fine.

References XrdCl::Status::IsOK(), and XrdEc::ScheduleHandler().

+ Here is the call graph for this function:

◆ GetSize()

uint64_t XrdEc::StrmWriter::GetSize ( )
inline
Returns
: get file size

Definition at line 108 of file XrdEcStrmWriter.hh.

109 {
110 return global_status.get_btswritten();
111 }

◆ Open()

void XrdEc::StrmWriter::Open ( XrdCl::ResponseHandler * handler,
uint16_t timeout = 0 )

Open the data object for writting

Parameters
handler: user callback

Definition at line 44 of file XrdEcStrmWriter.cc.

45 {
46 const size_t size = objcfg.plgr.size();
47
48 std::vector<XrdCl::Pipeline> opens;
49 opens.reserve( size );
50 // initialize all zip archive objects
51 for( size_t i = 0; i < size; ++i )
52 dataarchs.emplace_back( std::make_shared<XrdCl::ZipArchive>(
54
55 for( size_t i = 0; i < size; ++i )
56 {
57 std::string url = objcfg.GetDataUrl( i );
58 XrdCl::Ctx<XrdCl::ZipArchive> zip( *dataarchs[i] );
59 opens.emplace_back( XrdCl::OpenArchive( zip, url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write ) );
60 }
61
62 XrdCl::Async( XrdCl::Parallel( opens ).AtLeast( objcfg.nbchunks ) >>
63 [=]( XrdCl::XRootDStatus &st )
64 {
65 if( !st.IsOK() ) global_status.report_open( st );
66 handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
67 }, timeout );
68 }
static Config & Instance()
Singleton access.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
@ Write
Open only for writing.

References XrdCl::Async(), XrdEc::Config::Instance(), XrdCl::OpenFlags::New, XrdCl::OpenArchive(), XrdCl::Parallel(), and XrdCl::OpenFlags::Write.

+ Here is the call graph for this function:

◆ Write()

void XrdEc::StrmWriter::Write ( uint32_t size,
const void * buff,
XrdCl::ResponseHandler * handler )

Write data to the data object

Parameters
size: number of bytes to be written
buff: buffer with data to be written
handler: user callback

Definition at line 73 of file XrdEcStrmWriter.cc.

74 {
75 //-------------------------------------------------------------------------
76 // First, check the global status, if we are in an error state just
77 // fail the request.
78 //-------------------------------------------------------------------------
79 XrdCl::XRootDStatus gst = global_status.get();
80 if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
81
82 //-------------------------------------------------------------------------
83 // Update the number of bytes left to be written
84 //-------------------------------------------------------------------------
85 global_status.issue_write( size );
86
87 const char* buffer = reinterpret_cast<const char*>( buff );
88 uint32_t wrtsize = size;
89 while( wrtsize > 0 )
90 {
91 if( !wrtbuff ) wrtbuff.reset( new WrtBuff( objcfg ) );
92 uint64_t written = wrtbuff->Write( wrtsize, buffer );
93 buffer += written;
94 wrtsize -= written;
95 if( wrtbuff->Complete() ) EnqueueBuff( std::move( wrtbuff ) );
96 }
97
98 //-------------------------------------------------------------------------
99 // We can tell the user it's done as we have the date cached in the
100 // buffer
101 //-------------------------------------------------------------------------
102 ScheduleHandler( handler );
103 }

References XrdCl::Status::IsOK(), and XrdEc::ScheduleHandler().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: