25#ifndef SRC_XRDEC_XRDECWRTBUFF_HH_
26#define SRC_XRDEC_XRDECWRTBUFF_HH_
39#include <condition_variable>
57 static BufferPool instance;
66 std::unique_lock<std::mutex> lck( mtx );
80 if( currentsize < totalsize )
89 while( pool.empty() ) cv.wait( lck );
100 if( !buffer.GetBuffer() )
return;
101 std::unique_lock<std::mutex> lck( mtx );
102 buffer.SetCursor( 0 );
103 pool.emplace( std::move( buffer ) );
112 BufferPool() : totalsize( 1024 ), currentsize( 0 )
116 BufferPool(
const BufferPool& ) =
delete;
117 BufferPool( BufferPool&& ) =
delete;
118 BufferPool& operator=(
const BufferPool& ) =
delete;
119 BufferPool& operator=( BufferPool&& ) =
delete;
121 const size_t totalsize;
123 std::condition_variable cv;
125 std::queue<XrdCl::Buffer> pool;
143 stripes.reserve( objcfg.nbchunks );
144 memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
150 wrtbuff( std::move( wrtbuff.wrtbuff ) ),
151 stripes( std::move( wrtbuff.stripes ) ),
152 cksums( std::move( wrtbuff.cksums ) )
169 uint32_t
Write( uint32_t size,
const char *buffer )
171 uint64_t bytesAccepted = size;
172 if( wrtbuff.GetCursor() + bytesAccepted > objcfg.datasize )
173 bytesAccepted = objcfg.datasize - wrtbuff.GetCursor();
174 memcpy( wrtbuff.GetBufferAtCursor(), buffer, bytesAccepted );
175 wrtbuff.AdvanceCursor( bytesAccepted );
176 return bytesAccepted;
186 if( wrtbuff.GetSize() != 0 )
188 wrtbuff.AdvanceCursor( size );
192 wrtbuff.Allocate( objcfg.datasize );
193 memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
194 wrtbuff.SetCursor( size );
204 return stripes[strpnb].buffer;
214 if( strp < objcfg.nbdata )
218 uint64_t expsize = ( strp + 1) * objcfg.chunksize;
219 if( expsize <= wrtbuff.GetCursor() )
220 return objcfg.chunksize;
223 uint64_t delta = expsize - wrtbuff.GetCursor();
224 if( delta < objcfg.chunksize )
225 return objcfg.chunksize - delta;
238 return wrtbuff.GetCursor();
245 return wrtbuff.GetCursor() == objcfg.datasize;
252 return ( wrtbuff.GetSize() == 0 || wrtbuff.GetCursor() == 0 );
261 for( i = 0; i < objcfg.nbchunks; ++i )
262 stripes.emplace_back( wrtbuff.GetBuffer( i * objcfg.chunksize ), i < objcfg.nbdata );
266 cksums.reserve( objcfg.nbchunks );
267 for( uint8_t strpnb = 0; strpnb < objcfg.nbchunks; ++strpnb )
271 cksums.emplace_back( std::move( ftr ) );
282 return cksums[strpnb].get();
290 std::vector<std::future<uint32_t>> cksums;
Binary blob representation.
Pool of buffers for caching writes.
void Recycle(XrdCl::Buffer &&buffer)
Give back a buffer to the pool.
static BufferPool & Instance()
Singleton access to the object.
XrdCl::Buffer Create(const ObjCfg &objcfg)
Create a new buffer (or recycle an existing one).
Global configuration for the EC module.
static Config & Instance()
Singleton access.
RedundancyProvider & GetRedundancy(const ObjCfg &objcfg)
Get redundancy provider for given data object configuration.
void compute(stripes_t &stripes)
static ThreadPool & Instance()
Singleton access.
std::future< std::invoke_result_t< FUNC, ARGs... > > Execute(FUNC func, ARGs... args)
Schedule a functional (together with its arguments) for execution.
uint32_t Write(uint32_t size, const char *buffer)
WrtBuff(const ObjCfg &objcfg)
uint32_t GetBlkSize()
Get size of the data in the buffer.
uint32_t GetStrpSize(uint8_t strp)
uint32_t GetCrc32c(size_t strpnb)
void Encode()
Calculate the parity for the data stripes and the crc32cs.
bool Empty()
True if there are no data in the buffer, false otherwise.
bool Complete()
True if the buffer is full, false otherwise.
char * GetStrpBuff(uint8_t strpnb)
WrtBuff(WrtBuff &&wrtbuff)
Move constructor.
std::vector< stripe_t > stripes_t
All stripes in a block.