XRootD
Loading...
Searching...
No Matches
XrdEcWrtBuff.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef SRC_XRDEC_XRDECWRTBUFF_HH_
26#define SRC_XRDEC_XRDECWRTBUFF_HH_
27
29#include "XrdEc/XrdEcObjCfg.hh"
30#include "XrdEc/XrdEcConfig.hh"
32
33#include "XrdCl/XrdClBuffer.hh"
35
37
38#include <vector>
39#include <condition_variable>
40#include <mutex>
41#include <future>
42
43namespace XrdEc
44{
45 //---------------------------------------------------------------------------
47 //---------------------------------------------------------------------------
48 class BufferPool
49 {
50 public:
51
52 //-----------------------------------------------------------------------
54 //-----------------------------------------------------------------------
55 static BufferPool& Instance()
56 {
57 static BufferPool instance;
58 return instance;
59 }
60
61 //-----------------------------------------------------------------------
63 //-----------------------------------------------------------------------
64 XrdCl::Buffer Create( const ObjCfg &objcfg )
65 {
66 std::unique_lock<std::mutex> lck( mtx );
67 //---------------------------------------------------------------------
68 // If pool is not empty, recycle existing buffer
69 //---------------------------------------------------------------------
70 if( !pool.empty() )
71 {
72 XrdCl::Buffer buffer( std::move( pool.front() ) );
73 pool.pop();
74 return buffer;
75 }
76 //---------------------------------------------------------------------
77 // Check if we can create a new buffer object without exceeding the
78 // the maximum size of the pool
79 //---------------------------------------------------------------------
80 if( currentsize < totalsize )
81 {
82 XrdCl::Buffer buffer( objcfg.blksize );
83 ++currentsize;
84 return buffer;
85 }
86 //---------------------------------------------------------------------
87 // If not, we have to wait until there is a buffer we can recycle
88 //---------------------------------------------------------------------
89 while( pool.empty() ) cv.wait( lck );
90 XrdCl::Buffer buffer( std::move( pool.front() ) );
91 pool.pop();
92 return buffer;
93 }
94
95 //-----------------------------------------------------------------------
97 //-----------------------------------------------------------------------
98 void Recycle( XrdCl::Buffer && buffer )
99 {
100 if( !buffer.GetBuffer() ) return;
101 std::unique_lock<std::mutex> lck( mtx );
102 buffer.SetCursor( 0 );
103 pool.emplace( std::move( buffer ) );
104 cv.notify_all();
105 }
106
107 private:
108
109 //-----------------------------------------------------------------------
110 // Default constructor
111 //-----------------------------------------------------------------------
112 BufferPool() : totalsize( 1024 ), currentsize( 0 )
113 {
114 }
115
116 BufferPool( const BufferPool& ) = delete; //< Copy constructor
117 BufferPool( BufferPool&& ) = delete; //< Move constructor
118 BufferPool& operator=( const BufferPool& ) = delete; //< Copy assigment operator
119 BufferPool& operator=( BufferPool&& ) = delete; //< Move assigment operator
120
121 const size_t totalsize; //< maximum size of the pool
122 size_t currentsize; //< current size of the pool
123 std::condition_variable cv;
124 std::mutex mtx;
125 std::queue<XrdCl::Buffer> pool; //< the pool itself
126 };
127
128 //---------------------------------------------------------------------------
131 //---------------------------------------------------------------------------
133 {
134 public:
135 //-----------------------------------------------------------------------
139 //-----------------------------------------------------------------------
140 WrtBuff( const ObjCfg &objcfg ) : objcfg( objcfg ),
141 wrtbuff( BufferPool::Instance().Create( objcfg ) )
142 {
143 stripes.reserve( objcfg.nbchunks );
144 memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
145 }
146 //-----------------------------------------------------------------------
148 //-----------------------------------------------------------------------
149 WrtBuff( WrtBuff && wrtbuff ) : objcfg( wrtbuff.objcfg ),
150 wrtbuff( std::move( wrtbuff.wrtbuff ) ),
151 stripes( std::move( wrtbuff.stripes ) ),
152 cksums( std::move( wrtbuff.cksums ) )
153 {
154 }
155 //-----------------------------------------------------------------------
156 // Destructor
157 //-----------------------------------------------------------------------
159 {
160 BufferPool::Instance().Recycle( std::move( wrtbuff ) );
161 }
162 //-----------------------------------------------------------------------
168 //-----------------------------------------------------------------------
169 uint32_t Write( uint32_t size, const char *buffer )
170 {
171 uint64_t bytesAccepted = size; // bytes accepted by the buffer
172 if( wrtbuff.GetCursor() + bytesAccepted > objcfg.datasize )
173 bytesAccepted = objcfg.datasize - wrtbuff.GetCursor();
174 memcpy( wrtbuff.GetBufferAtCursor(), buffer, bytesAccepted );
175 wrtbuff.AdvanceCursor( bytesAccepted );
176 return bytesAccepted;
177 }
178 //-----------------------------------------------------------------------
182 //-----------------------------------------------------------------------
183 void Pad( uint32_t size )
184 {
185 // if the buffer exist we only need to move the cursor
186 if( wrtbuff.GetSize() != 0 )
187 {
188 wrtbuff.AdvanceCursor( size );
189 return;
190 }
191 // otherwise we allocate the buffer and set the cursor
192 wrtbuff.Allocate( objcfg.datasize );
193 memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
194 wrtbuff.SetCursor( size );
195 return;
196 }
197 //-----------------------------------------------------------------------
201 //-----------------------------------------------------------------------
202 inline char* GetStrpBuff( uint8_t strpnb )
203 {
204 return stripes[strpnb].buffer;
205 }
206 //-----------------------------------------------------------------------
210 //-----------------------------------------------------------------------
211 uint32_t GetStrpSize( uint8_t strp )
212 {
213 // Check if it is a data chunk?
214 if( strp < objcfg.nbdata )
215 {
216 // If the cursor is at least at the expected size
217 // it means we have the full chunk.
218 uint64_t expsize = ( strp + 1) * objcfg.chunksize;
219 if( expsize <= wrtbuff.GetCursor() )
220 return objcfg.chunksize;
221 // If the cursor is of by less than the chunk size
222 // it means we have a partial chunk
223 uint64_t delta = expsize - wrtbuff.GetCursor();
224 if( delta < objcfg.chunksize )
225 return objcfg.chunksize - delta;
226 // otherwise we are handling an empty chunk
227 return 0;
228 }
229 // It is a parity chunk so its size has to be equal
230 // to the size of the first chunk
231 return GetStrpSize( 0 );
232 }
233 //-----------------------------------------------------------------------
235 //-----------------------------------------------------------------------
236 inline uint32_t GetBlkSize()
237 {
238 return wrtbuff.GetCursor();
239 }
240 //-----------------------------------------------------------------------
242 //-----------------------------------------------------------------------
243 inline bool Complete()
244 {
245 return wrtbuff.GetCursor() == objcfg.datasize;
246 }
247 //-----------------------------------------------------------------------
249 //-----------------------------------------------------------------------
250 inline bool Empty()
251 {
252 return ( wrtbuff.GetSize() == 0 || wrtbuff.GetCursor() == 0 );
253 }
254 //-----------------------------------------------------------------------
256 //-----------------------------------------------------------------------
257 inline void Encode()
258 {
259 // first calculate the parity
260 uint8_t i ;
261 for( i = 0; i < objcfg.nbchunks; ++i )
262 stripes.emplace_back( wrtbuff.GetBuffer( i * objcfg.chunksize ), i < objcfg.nbdata );
263 Config &cfg = Config::Instance();
264 cfg.GetRedundancy( objcfg ).compute( stripes );
265 // then calculate the checksums
266 cksums.reserve( objcfg.nbchunks );
267 for( uint8_t strpnb = 0; strpnb < objcfg.nbchunks; ++strpnb )
268 {
269 size_t chunksize = GetStrpSize( strpnb );
270 std::future<uint32_t> ftr = ThreadPool::Instance().Execute( objcfg.digest, 0, stripes[strpnb].buffer, chunksize );
271 cksums.emplace_back( std::move( ftr ) );
272 }
273 }
274 //-----------------------------------------------------------------------
279 //-----------------------------------------------------------------------
280 inline uint32_t GetCrc32c( size_t strpnb )
281 {
282 return cksums[strpnb].get();
283 }
284
285 private:
286
287 ObjCfg objcfg; //< configuration for the data object
288 XrdCl::Buffer wrtbuff; //< the buffer for the data
289 stripes_t stripes; //< data stripes
290 std::vector<std::future<uint32_t>> cksums; //< crc32cs for the data stripes
291 };
292
293
294} /* namespace XrdEc */
295
296#endif /* SRC_XRDEC_XRDECWRTBUFF_HH_ */
bool Create
Binary blob representation.
Pool of buffers for caching writes.
void Recycle(XrdCl::Buffer &&buffer)
Give back a buffer to the pool.
static BufferPool & Instance()
Singleton access to the object.
XrdCl::Buffer Create(const ObjCfg &objcfg)
Create new buffer (or recycle existing one)
Global configuration for the EC module.
static Config & Instance()
Singleton access.
RedundancyProvider & GetRedundancy(const ObjCfg &objcfg)
Get redundancy provider for given data object configuration.
void compute(stripes_t &stripes)
static ThreadPool & Instance()
Singleton access.
std::future< std::invoke_result_t< FUNC, ARGs... > > Execute(FUNC func, ARGs... args)
Schedule a functional (together with its arguments) for execution.
uint32_t Write(uint32_t size, const char *buffer)
WrtBuff(const ObjCfg &objcfg)
uint32_t GetBlkSize()
Get size of the data in the buffer.
uint32_t GetStrpSize(uint8_t strp)
void Pad(uint32_t size)
uint32_t GetCrc32c(size_t strpnb)
void Encode()
Calculate the parity for the data stripes and the crc32cs.
bool Empty()
True if there are no data in the buffer, false otherwise.
bool Complete()
True if the buffer is full, false otherwise.
char * GetStrpBuff(uint8_t strpnb)
WrtBuff(WrtBuff &&wrtbuff)
Move constructor.
std::vector< stripe_t > stripes_t
All stripes in a block.
const uint64_t blksize