XRootD
Loading...
Searching...
No Matches
XrdEcRedundancyProvider.cc
Go to the documentation of this file.
1/************************************************************************
2 * KineticIo - a file io interface library to kinetic devices. *
3 * *
4 * This Source Code Form is subject to the terms of the Mozilla *
5 * Public License, v. 2.0. If a copy of the MPL was not *
6 * distributed with this file, You can obtain one at *
7 * https://mozilla.org/MP:/2.0/. *
8 * *
9 * This program is distributed in the hope that it will be useful, *
10 * but is provided AS-IS, WITHOUT ANY WARRANTY; including without *
11 * the implied warranty of MERCHANTABILITY, NON-INFRINGEMENT or *
12 * FITNESS FOR A PARTICULAR PURPOSE. See the Mozilla Public *
13 * License for more details. *
14 ************************************************************************/
15
17
18#include <isa-l.h>
19#include <cstring>
20#include <sstream>
21#include <algorithm>
22
23namespace XrdEc
24{
25
26//--------------------------------------------------------------------------
30//--------------------------------------------------------------------------
31class Convert{
32 public:
33 //--------------------------------------------------------------------------
38 //--------------------------------------------------------------------------
39 template<typename...Args>
40 static std::string toString(Args&&...args){
41 std::stringstream s;
42 argsToStream(s, std::forward<Args>(args)...);
43 return s.str();
44 }
45 private:
46 //--------------------------------------------------------------------------
50 //--------------------------------------------------------------------------
51 template<typename Last>
52 static void argsToStream(std::stringstream& stream, Last&& last) {
53 stream << last;
54 }
55
56 //--------------------------------------------------------------------------
61 //--------------------------------------------------------------------------
62 template<typename First, typename...Rest >
63 static void argsToStream(std::stringstream& stream, First&& first, Rest&&...rest) {
64 stream << first;
65 argsToStream(stream, std::forward<Rest>(rest)...);
66 }
67};
68
69
70
71/* This function is (almost) completely ripped from the erasure_code_test.cc file
72 distributed with the isa-l library. */
74 unsigned char* encode_matrix, // in: encode matrix
75 unsigned char* decode_matrix, // in: buffer, out: generated decode matrix
76 unsigned int* decode_index, // out: order of healthy blocks used for decoding [data#1, data#3, ..., parity#1... ]
77 unsigned char* src_err_list, // in: array of #nerrs size [index error #1, index error #2, ... ]
78 unsigned char* src_in_err, // in: array of #data size > [1,0,0,0,1,0...] -> 0 == no error, 1 == error
79 unsigned int nerrs, // #total errors
80 unsigned int nsrcerrs, // #data errors
81 unsigned int k, // #data
82 unsigned int m // #data+parity
83)
84{
85 unsigned i, j, p;
86 unsigned int r;
87 unsigned char* invert_matrix, * backup, * b, s;
88 int incr = 0;
89
90 size_t mk = (size_t)m * (size_t)k;
91 std::vector<unsigned char> memory(3 * mk);
92 b = &memory[0];
93 backup = &memory[mk];
94 invert_matrix = &memory[2 * mk];
95
96 // Construct matrix b by removing error rows
97 for (i = 0, r = 0; i < k; i++, r++) {
98 while (src_in_err[r]) {
99 r++;
100 }
101 for (j = 0; j < k; j++) {
102 b[k * i + j] = encode_matrix[k * r + j];
103 backup[k * i + j] = encode_matrix[k * r + j];
104 }
105 decode_index[i] = r;
106 }
107 incr = 0;
108 while (gf_invert_matrix(b, invert_matrix, k) < 0) {
109 if (nerrs == (m - k)) {
110 return -1;
111 }
112 incr++;
113 memcpy(b, backup, mk);
114 for (i = nsrcerrs; i < nerrs - nsrcerrs; i++) {
115 if (src_err_list[i] == (decode_index[k - 1] + incr)) {
116 // skip the erased parity line
117 incr++;
118 continue;
119 }
120 }
121 if (decode_index[k - 1] + incr >= m) {
122 return -1;
123 }
124 decode_index[k - 1] += incr;
125 for (j = 0; j < k; j++) {
126 b[k * (k - 1) + j] = encode_matrix[k * decode_index[k - 1] + j];
127 }
128
129 };
130
131 for (i = 0; i < nsrcerrs; i++) {
132 for (j = 0; j < k; j++) {
133 decode_matrix[k * i + j] = invert_matrix[k * src_err_list[i] + j];
134 }
135 }
136 /* src_err_list from encode_matrix * invert of b for parity decoding */
137 for (p = nsrcerrs; p < nerrs; p++) {
138 for (i = 0; i < k; i++) {
139 s = 0;
140 for (j = 0; j < k; j++) {
141 s ^= gf_mul(invert_matrix[j * k + i],
142 encode_matrix[k * src_err_list[p] + j]);
143 }
144
145 decode_matrix[k * p + i] = s;
146 }
147 }
148 return 0;
149}
150
152 objcfg( objcfg ),
153 encode_matrix( objcfg.nbchunks * objcfg.nbdata )
154{
155 // k = data
156 // m = data + parity
157 gf_gen_cauchy1_matrix( encode_matrix.data(), static_cast<int>( objcfg.nbchunks ), static_cast<int>( objcfg.nbdata ) );
158}
159
160
161std::string RedundancyProvider::getErrorPattern( stripes_t &stripes ) const
162{
163 std::string pattern( objcfg.nbchunks, 0 );
164 for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
165 if( !stripes[i].valid ) pattern[i] = '\1';
166
167 return pattern;
168}
169
170
171RedundancyProvider::CodingTable& RedundancyProvider::getCodingTable( const std::string& pattern )
172{
173 std::lock_guard<std::mutex> lock(mutex);
174
175 /* If decode matrix is not already cached we have to construct it. */
176 if( !cache.count(pattern) )
177 {
178 /* Expand pattern */
179 int nerrs = 0, nsrcerrs = 0;
180 unsigned char err_indx_list[objcfg.nbparity];
181 /* Avoid narrowing cast warning, size is always < 256 */
182 uint8_t n = static_cast<uint8_t>(pattern.size() & 0xff);
183 for (uint8_t i = 0; i < n; i++) {
184 if (pattern[i]) {
185 err_indx_list[nerrs++] = i;
186 if (i < objcfg.nbdata) { nsrcerrs++; }
187 }
188 }
189
190 /* Allocate Decode Object. */
191 CodingTable dd;
192 dd.nErrors = nerrs;
193 dd.blockIndices.resize( objcfg.nbdata );
194 dd.table.resize( objcfg.nbdata * objcfg.nbparity * 32);
195
196 /* Compute decode matrix. */
197 std::vector<unsigned char> decode_matrix(objcfg.nbchunks * objcfg.nbdata);
198
199 if (gf_gen_decode_matrix( encode_matrix.data(), decode_matrix.data(), dd.blockIndices.data(),
200 err_indx_list, (unsigned char*) pattern.c_str(), nerrs, nsrcerrs,
201 static_cast<int>( objcfg.nbdata ), static_cast<int>( objcfg.nbchunks ) ) )
202 throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, errno, "Failed computing decode matrix" ) );
203
204 /* Compute Tables. */
205 ec_init_tables( static_cast<int>( objcfg.nbdata ), nerrs, decode_matrix.data(), dd.table.data() );
206 cache.insert( std::make_pair(pattern, dd) );
207 }
208 return cache.at(pattern);
209}
210
211void RedundancyProvider::replication( stripes_t &stripes )
212{
213 // get index of a valid block
214 void *healthy = nullptr;
215 for( auto itr = stripes.begin(); itr != stripes.end(); ++itr )
216 {
217 if( itr->valid )
218 healthy = itr->buffer;
219 }
220
221 if( !healthy ) throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ) );
222
223 // now replicate, by now 'buffers' should contain all chunks
224 for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
225 {
226 if( !stripes[i].valid )
227 memcpy( stripes[i].buffer, healthy, objcfg.chunksize );
228 }
229}
230
232{
233 /* throws if stripe is not recoverable */
234 std::string pattern = getErrorPattern( stripes );
235
236 /* nothing to do if there are no parity blocks. */
237 if ( !objcfg.nbparity ) return;
238
239 /* in case of a single data block use replication */
240 if ( objcfg.nbdata == 1 )
241 return replication( stripes );
242
243 /* normal operation: erasure coding */
244 CodingTable& dd = getCodingTable(pattern);
245
246 unsigned char* inbuf[objcfg.nbdata];
247 for( uint8_t i = 0; i < objcfg.nbdata; i++ )
248 inbuf[i] = reinterpret_cast<unsigned char*>( stripes[dd.blockIndices[i]].buffer );
249
250 std::vector<unsigned char> memory( dd.nErrors * objcfg.chunksize );
251
252 unsigned char* outbuf[dd.nErrors];
253 for (int i = 0; i < dd.nErrors; i++)
254 {
255 outbuf[i] = &memory[i * objcfg.chunksize];
256 }
257
258 ec_encode_data(
259 static_cast<int>( objcfg.chunksize ), // Length of each block of data (vector) of source or destination data.
260 static_cast<int>( objcfg.nbdata ), // The number of vector sources in the generator matrix for coding.
261 dd.nErrors, // The number of output vectors to concurrently encode/decode.
262 dd.table.data(), // Pointer to array of input tables
263 inbuf, // Array of pointers to source input buffers
264 outbuf // Array of pointers to coded output buffers
265 );
266
267 int e = 0;
268 for (size_t i = 0; i < objcfg.nbchunks; i++)
269 {
270 if( pattern[i] )
271 {
272 memcpy( stripes[i].buffer, outbuf[e], objcfg.chunksize );
273 e++;
274 }
275 }
276}
277
278
279};
Class for computing parities and recovering data.
static std::string toString(Args &&...args)
RedundancyProvider(const ObjCfg &objcfg)
void compute(stripes_t &stripes)
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
std::vector< stripe_t > stripes_t
All stripes in a block.
static int gf_gen_decode_matrix(unsigned char *encode_matrix, unsigned char *decode_matrix, unsigned int *decode_index, unsigned char *src_err_list, unsigned char *src_in_err, unsigned int nerrs, unsigned int nsrcerrs, unsigned int k, unsigned int m)
const uint8_t nbdata
const uint8_t nbchunks
const uint8_t nbparity