XRootD
Loading...
Searching...
No Matches
XrdEcStrmWriter.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27
29
30#include "XrdZip/XrdZipLFH.hh"
31#include "XrdZip/XrdZipCDFH.hh"
32#include "XrdZip/XrdZipEOCD.hh"
33#include "XrdZip/XrdZipUtils.hh"
34
35#include <numeric>
36#include <algorithm>
37#include <future>
38
39namespace XrdEc
40{
41 //---------------------------------------------------------------------------
42 // Open the data object for writting
43 //---------------------------------------------------------------------------
44 void StrmWriter::Open( XrdCl::ResponseHandler *handler, uint16_t timeout )
45 {
46 const size_t size = objcfg.plgr.size();
47
48 std::vector<XrdCl::Pipeline> opens;
49 opens.reserve( size );
50 // initialize all zip archive objects
51 for( size_t i = 0; i < size; ++i )
52 dataarchs.emplace_back( std::make_shared<XrdCl::ZipArchive>(
53 Config::Instance().enable_plugins ) );
54
55 for( size_t i = 0; i < size; ++i )
56 {
57 std::string url = objcfg.GetDataUrl( i );
58 XrdCl::Ctx<XrdCl::ZipArchive> zip( *dataarchs[i] );
59 opens.emplace_back( XrdCl::OpenArchive( zip, url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write ) );
60 }
61
62 XrdCl::Async( XrdCl::Parallel( opens ).AtLeast( objcfg.nbchunks ) >>
63 [handler,this]( XrdCl::XRootDStatus &st )
64 {
65 if( !st.IsOK() ) this->global_status.report_open( st );
66 handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
67 }, timeout );
68 }
69
70 //---------------------------------------------------------------------------
71 // Write data to the data object
72 //---------------------------------------------------------------------------
73 void StrmWriter::Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler )
74 {
75 //-------------------------------------------------------------------------
76 // First, check the global status, if we are in an error state just
77 // fail the request.
78 //-------------------------------------------------------------------------
79 XrdCl::XRootDStatus gst = global_status.get();
80 if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
81
82 //-------------------------------------------------------------------------
83 // Update the number of bytes left to be written
84 //-------------------------------------------------------------------------
85 global_status.issue_write( size );
86
87 const char* buffer = reinterpret_cast<const char*>( buff );
88 uint32_t wrtsize = size;
89 while( wrtsize > 0 )
90 {
91 if( !wrtbuff ) wrtbuff.reset( new WrtBuff( objcfg ) );
92 uint64_t written = wrtbuff->Write( wrtsize, buffer );
93 buffer += written;
94 wrtsize -= written;
95 if( wrtbuff->Complete() ) EnqueueBuff( std::move( wrtbuff ) );
96 }
97
98 //-------------------------------------------------------------------------
99 // We can tell the user it's done as we have the date cached in the
100 // buffer
101 //-------------------------------------------------------------------------
102 ScheduleHandler( handler );
103 }
104
105 //---------------------------------------------------------------------------
106 // Close the data object
107 //---------------------------------------------------------------------------
108 void StrmWriter::Close( XrdCl::ResponseHandler *handler, uint16_t timeout )
109 {
110 //-------------------------------------------------------------------------
111 // First, check the global status, if we are in an error state just
112 // fail the request.
113 //-------------------------------------------------------------------------
114 XrdCl::XRootDStatus gst = global_status.get();
115 if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
116 //-------------------------------------------------------------------------
117 // Take care of the left-over data ...
118 //-------------------------------------------------------------------------
119 if( wrtbuff && !wrtbuff->Empty() ) EnqueueBuff( std::move( wrtbuff ) );
120 //-------------------------------------------------------------------------
121 // Let the global status handle the close
122 //-------------------------------------------------------------------------
123 global_status.issue_close( handler, timeout );
124 }
125
126 //---------------------------------------------------------------------------
127 // Issue the write requests for the given write buffer
128 //---------------------------------------------------------------------------
129 void StrmWriter::WriteBuff( std::unique_ptr<WrtBuff> buff )
130 {
131 //-------------------------------------------------------------------------
132 // Our buffer with the data block, will be shared between all pipelines
133 // writing to different servers.
134 //-------------------------------------------------------------------------
135 std::shared_ptr<WrtBuff> wrtbuff( std::move( buff ) );
136
137 //-------------------------------------------------------------------------
138 // Shuffle the servers so every block has a different placement
139 //-------------------------------------------------------------------------
140 static std::default_random_engine random_engine( std::chrono::system_clock::now().time_since_epoch().count() );
141 std::shared_ptr<sync_queue<size_t>> servers = std::make_shared<sync_queue<size_t>>();
142 std::vector<size_t> zipid( dataarchs.size() );
143 std::iota( zipid.begin(), zipid.end(), 0 );
144 std::shuffle( zipid.begin(), zipid.end(), random_engine );
145 auto itr = zipid.begin();
146 for( ; itr != zipid.end() ; ++itr ) servers->enqueue( std::move( *itr ) );
147
148 //-------------------------------------------------------------------------
149 // Create the write pipelines for updating stripes
150 //-------------------------------------------------------------------------
151 const size_t nbchunks = objcfg.nbchunks;
152 std::vector<XrdCl::Pipeline> writes;
153 writes.reserve( nbchunks );
154 size_t blknb = next_blknb++;
155 uint64_t blksize = 0;
156 for( size_t strpnb = 0; strpnb < nbchunks; ++strpnb )
157 {
158 std::string fn = objcfg.GetFileName( blknb, strpnb );
159 uint32_t crc32c = wrtbuff->GetCrc32c( strpnb );
160 uint64_t strpsize = wrtbuff->GetStrpSize( strpnb );
161 char* strpbuff = wrtbuff->GetStrpBuff( strpnb );
162 if( strpnb < objcfg.nbdata ) blksize += strpsize;
163
164 //-----------------------------------------------------------------------
165 // Find a server where we can append the next data chunk
166 //-----------------------------------------------------------------------
168 size_t srvid;
169 if( !servers->dequeue( srvid ) )
170 {
172 0, "No more data servers to try." );
173 //---------------------------------------------------------------------
174 // calculate the full block size, otherwise the user handler
175 // will be never called
176 //---------------------------------------------------------------------
177 for( size_t i = strpnb + 1; i < objcfg.nbdata; ++i )
178 blksize += wrtbuff->GetStrpSize( i );
179 global_status.report_wrt( err, blksize );
180 return;
181 }
182 zip = *dataarchs[srvid];
183
184 //-----------------------------------------------------------------------
185 // Create the Write request
186 //-----------------------------------------------------------------------
187 XrdCl::Pipeline p = XrdCl::AppendFile( zip, fn, crc32c, strpsize, strpbuff ) >>
188 [servers,srvid,wrtbuff,zip,this]( XrdCl::XRootDStatus &st ) mutable
189 {
190 //------------------------------------------------
191 // Try to recover from error
192 //------------------------------------------------
193 if( !st.IsOK() )
194 {
195 //----------------------------------------------
196 // Select another server
197 //----------------------------------------------
198 if( !servers->dequeue( srvid ) ) return; // if there are no more servers we simply fail
199 zip = *this->dataarchs[srvid];
200 //----------------------------------------------
201 // Retry this operation at different server
202 //----------------------------------------------
204 }
205 //------------------------------------------------
206 // Make sure the buffer is only deallocated
207 // after the handler is called
208 //------------------------------------------------
209 wrtbuff.reset();
210 };
211 writes.emplace_back( std::move( p ) );
212 }
213
214 XrdCl::WaitFor( XrdCl::Parallel( writes ) >> [blksize,this]( XrdCl::XRootDStatus &st ){ this->global_status.report_wrt( st, blksize ); } );
215 }
216
217 //---------------------------------------------------------------------------
218 // Get a buffer with metadata (CDFH and EOCD records)
219 //---------------------------------------------------------------------------
220 XrdZip::buffer_t StrmWriter::GetMetadataBuffer()
221 {
222 using namespace XrdZip;
223
224 const size_t cdcnt = objcfg.plgr.size();
225 std::vector<buffer_t> buffs; buffs.reserve( cdcnt ); // buffers with raw data
226 std::vector<LFH> lfhs; lfhs.reserve( cdcnt ); // LFH records
227 std::vector<CDFH> cdfhs; cdfhs.reserve( cdcnt ); // CDFH records
228
229 //-------------------------------------------------------------------------
230 // prepare data structures (LFH and CDFH records)
231 //-------------------------------------------------------------------------
232 uint64_t offset = 0;
233 uint64_t cdsize = 0;
234 mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
235 for( size_t i = 0; i < cdcnt; ++i )
236 {
237 std::string fn = std::to_string( i ); // file name (URL of the data archive)
238 buffer_t buff( dataarchs[i]->GetCD() ); // raw data buffer (central directory of the data archive)
239 uint32_t cksum = objcfg.digest( 0, buff.data(), buff.size() ); // digest (crc) of the buffer
240 lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) ); // LFH record for the buffer
241 LFH &lfh = lfhs.back();
242 cdfhs.emplace_back( &lfh, mode, offset ); // CDFH record for the buffer
243 offset += LFH::lfhBaseSize + fn.size() + buff.size(); // shift the offset
244 cdsize += cdfhs.back().cdfhSize; // update central directory size
245 buffs.emplace_back( std::move( buff ) ); // keep the buffer for later
246 }
247
248 uint64_t zipsize = offset + cdsize + EOCD::eocdBaseSize;
249 buffer_t zipbuff; zipbuff.reserve( zipsize );
250
251 //-------------------------------------------------------------------------
252 // write into the final buffer LFH records + raw data
253 //-------------------------------------------------------------------------
254 for( size_t i = 0; i < cdcnt; ++i )
255 {
256 lfhs[i].Serialize( zipbuff );
257 std::copy( buffs[i].begin(), buffs[i].end(), std::back_inserter( zipbuff ) );
258 }
259 //-------------------------------------------------------------------------
260 // write into the final buffer CDFH records
261 //-------------------------------------------------------------------------
262 for( size_t i = 0; i < cdcnt; ++i )
263 cdfhs[i].Serialize( zipbuff );
264 //-------------------------------------------------------------------------
265 // prepare and write into the final buffer the EOCD record
266 //-------------------------------------------------------------------------
267 EOCD eocd( offset, cdcnt, cdsize );
268 eocd.Serialize( zipbuff );
269
270 return zipbuff;
271 }
272
273 //---------------------------------------------------------------------------
274 // Close the data object (implementation)
275 //---------------------------------------------------------------------------
276 void StrmWriter::CloseImpl( XrdCl::ResponseHandler *handler, uint16_t timeout )
277 {
278 //-------------------------------------------------------------------------
279 // First, check the global status, if we are in an error state just
280 // fail the request.
281 //-------------------------------------------------------------------------
282 XrdCl::XRootDStatus gst = global_status.get();
283 if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
284
285 const size_t size = objcfg.plgr.size();
286 //-------------------------------------------------------------------------
287 // prepare the metadata (the Central Directory of each data ZIP)
288 //-------------------------------------------------------------------------
289 auto zipbuff = objcfg.nomtfile ? nullptr :
290 std::make_shared<XrdZip::buffer_t>( GetMetadataBuffer() );
291 //-------------------------------------------------------------------------
292 // prepare the pipelines ...
293 //-------------------------------------------------------------------------
294 std::vector<XrdCl::Pipeline> closes;
295 std::vector<XrdCl::Pipeline> save_metadata;
296 closes.reserve( size );
297 std::string closeTime = std::to_string( time(NULL) );
298
299 std::vector<XrdCl::xattr_t> xav{ {"xrdec.filesize", std::to_string(GetSize())},
300 {"xrdec.strpver", closeTime.c_str()} };
301
302 for( size_t i = 0; i < size; ++i )
303 {
304 //-----------------------------------------------------------------------
305 // close ZIP archives with data
306 //-----------------------------------------------------------------------
307 if( dataarchs[i]->IsOpen() )
308 {
309 XrdCl::Pipeline p = XrdCl::SetXAttr( dataarchs[i]->GetFile(), xav )
310 | XrdCl::CloseArchive( *dataarchs[i] );
311 closes.emplace_back( std::move( p ) );
312 }
313 //-----------------------------------------------------------------------
314 // replicate the metadata
315 //-----------------------------------------------------------------------
316 if( zipbuff )
317 {
318 std::string url = objcfg.GetMetadataUrl( i );
319 metadataarchs.emplace_back( std::make_shared<XrdCl::File>(
320 Config::Instance().enable_plugins ) );
321 XrdCl::Pipeline p = XrdCl::Open( *metadataarchs[i], url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write )
322 | XrdCl::Write( *metadataarchs[i], 0, zipbuff->size(), zipbuff->data() )
323 | XrdCl::Close( *metadataarchs[i] )
324 | XrdCl::Final( [zipbuff]( const XrdCl::XRootDStatus& ){ } );
325
326 save_metadata.emplace_back( std::move( p ) );
327 }
328 }
329
330 //-------------------------------------------------------------------------
331 // If we were instructed not to create the the additional metadata file
332 // do the simplified close
333 //-------------------------------------------------------------------------
334 if( save_metadata.empty() )
335 {
336 XrdCl::Pipeline p = XrdCl::Parallel( closes ).AtLeast( objcfg.nbchunks ) >> handler;
337 XrdCl::Async( std::move( p ), timeout );
338 return;
339 }
340
341 //-------------------------------------------------------------------------
342 // compose closes & save_metadata:
343 // - closes must be successful at least for #data + #parity
344 // - save_metadata must be successful at least for #parity + 1
345 //-------------------------------------------------------------------------
346 XrdCl::Pipeline p = XrdCl::Parallel(
347 XrdCl::Parallel( closes ).AtLeast( objcfg.nbchunks ),
348 XrdCl::Parallel( save_metadata ).AtLeast( objcfg.nbparity + 1 )
349 ) >> handler;
350 XrdCl::Async( std::move( p ), timeout );
351 }
352}
uint32_t crc32c(uint32_t crc, void const *buf, size_t len)
static void Repeat()
Repeat current operation.
Handle an async response.
static Config & Instance()
Singleton access.
void Open(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
void Write(uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
void Close(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
AppendFileImpl< false > AppendFile(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< uint32_t > crc32, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating ArchiveReadImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
FinalOperation Final
const uint16_t errNoMoreReplicas
No more replicas to try.
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
std::vector< char > buffer_t
a buffer type
std::vector< char > buffer_t
Utility class for storing a pointer to operation context.
Definition XrdClCtx.hh:39
@ Write
Open only for writing.
bool IsOK() const
We're fine.
const uint8_t nbdata
std::string GetFileName(size_t blknb, size_t strpnb) const
const uint8_t nbchunks