XRootD
Loading...
Searching...
No Matches
XrdEcStrmWriter.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef SRC_XRDEC_XRDECSTRMWRITER_HH_
26#define SRC_XRDEC_XRDECSTRMWRITER_HH_
27
28#include "XrdEc/XrdEcWrtBuff.hh"
30
34
35#include <random>
36#include <chrono>
37#include <future>
38#include <atomic>
39#include <memory>
40#include <vector>
41#include <thread>
42#include <iterator>
43
44#include <sys/stat.h>
45
46namespace XrdEc
47{
48 //---------------------------------------------------------------------------
51 //---------------------------------------------------------------------------
53 {
54 //-------------------------------------------------------------------------
55 // Type for queue of buffers to be written
56 //-------------------------------------------------------------------------
57 typedef sync_queue<std::future<WrtBuff*>> buff_queue;
58
59 public:
60
61 //-----------------------------------------------------------------------
63 //-----------------------------------------------------------------------
64 StrmWriter( const ObjCfg &objcfg ) : objcfg( objcfg ),
65 writer_thread_stop( false ),
66 writer_thread( writer_routine, this ),
67 next_blknb( 0 ),
68 global_status( this )
69 {
70 }
71
72 //-----------------------------------------------------------------------
74 //-----------------------------------------------------------------------
75 virtual ~StrmWriter()
76 {
77 writer_thread_stop = true;
78 buffers.interrupt();
79 writer_thread.join();
80 }
81
82 //-----------------------------------------------------------------------
   //! Open the writer (declaration only, the implementation is not part of
   //! this header).
   //!
   //! @param handler : response handler supplied by the caller; presumably
   //!                  notified once the open completes -- TODO confirm
   //! @param timeout : timeout for the operation, defaults to 0 (units and
   //!                  the meaning of 0 are not visible here)
86 //-----------------------------------------------------------------------
87 void Open( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
88
89 //-----------------------------------------------------------------------
   //! Write data through the writer (declaration only, the implementation
   //! is not part of this header).
   //!
   //! @param size    : presumably the number of bytes available in buff
   //! @param buff    : pointer to the data to be written
   //! @param handler : response handler supplied by the caller; presumably
   //!                  notified once the write completes -- TODO confirm
95 //-----------------------------------------------------------------------
96 void Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler );
97
98 //-----------------------------------------------------------------------
   //! Close the writer (declaration only, the implementation is not part
   //! of this header).
   //!
   //! @param handler : response handler supplied by the caller; presumably
   //!                  notified once the close completes -- TODO confirm
   //! @param timeout : timeout for the operation, defaults to 0 (units and
   //!                  the meaning of 0 are not visible here)
102 //-----------------------------------------------------------------------
103 void Close( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
104
105 //-----------------------------------------------------------------------
107 //-----------------------------------------------------------------------
108 uint64_t GetSize()
109 {
110 return global_status.get_btswritten();
111 }
112
113 private:
114
115 //-----------------------------------------------------------------------
116 // Global status of the StrmWriter
117 //-----------------------------------------------------------------------
118 struct global_status_t
119 {
120 //---------------------------------------------------------------------
121 // Constructor
122 //---------------------------------------------------------------------
123 global_status_t( StrmWriter *writer ) : writer( writer ),
124 btsleft( 0 ),
125 btswritten( 0 ),
126 stopped_writing( false ),
127 closeHandler( 0 )
128 {
129 }
130
131 //---------------------------------------------------------------------
132 // Report status of write operation
133 //---------------------------------------------------------------------
134 void report_wrt( const XrdCl::XRootDStatus &st, uint64_t wrtsize )
135 {
136 std::unique_lock<std::recursive_mutex> lck( mtx );
137 //-------------------------------------------------------------------
138 // Update the global status
139 //-------------------------------------------------------------------
140 btsleft -= wrtsize;
141 if( !st.IsOK() ) status = st;
142 else btswritten += wrtsize;
143
144 //-------------------------------------------------------------------
145 // check if we are done, and if yes call the close implementation
146 //-------------------------------------------------------------------
147 if( btsleft == 0 && stopped_writing )
148 {
149 lck.unlock();
150 writer->CloseImpl( closeHandler );
151 }
152 }
153
154 //---------------------------------------------------------------------
155 // Report status of open operation
156 //---------------------------------------------------------------------
157 inline void report_open( const XrdCl::XRootDStatus &st )
158 {
159 report_wrt( st, 0 );
160 }
161
162 //---------------------------------------------------------------------
163 // Indicate that the user issued close
164 //---------------------------------------------------------------------
165 void issue_close( XrdCl::ResponseHandler *handler, uint16_t timeout )
166 {
167 std::unique_lock<std::recursive_mutex> lck( mtx );
168 //-------------------------------------------------------------------
169 // There will be no more new write requests
170 //-------------------------------------------------------------------
171 stopped_writing = true;
172 //-------------------------------------------------------------------
173 // If there are no outstanding writes, we can simply call the close
174 // routine
175 //-------------------------------------------------------------------
176 if( btsleft == 0 ) return writer->CloseImpl( handler, timeout );
177 //-------------------------------------------------------------------
178 // Otherwise we save the handler for later
179 //-------------------------------------------------------------------
180 closeHandler = handler;
181 }
182
183 //---------------------------------------------------------------------
184 // get the global status value
185 //---------------------------------------------------------------------
186 inline const XrdCl::XRootDStatus& get() const
187 {
188 std::unique_lock<std::recursive_mutex> lck( mtx );
189 return status;
190 }
191
192 inline void issue_write( uint64_t wrtsize )
193 {
194 std::unique_lock<std::recursive_mutex> lck( mtx );
195 btsleft += wrtsize;
196 }
197
198 inline uint64_t get_btswritten()
199 {
200 return btswritten;
201 }
202
203 private:
204 mutable std::recursive_mutex mtx;
205 StrmWriter *writer; //> pointer to the StrmWriter
206 uint64_t btsleft; //> bytes left to be written
207 uint64_t btswritten; //> total number of bytes written
208 bool stopped_writing; //> true, if user called close
209 XrdCl::XRootDStatus status; //> the global status
210 XrdCl::ResponseHandler *closeHandler; //> user close handler
211 };
212
213 //-----------------------------------------------------------------------
217 //-----------------------------------------------------------------------
218 inline void EnqueueBuff( std::unique_ptr<WrtBuff> wrtbuff )
219 {
220 // the routine to be called in the thread-pool
221 // - does erasure coding
222 // - calculates crc32cs
223 static auto prepare_buff = []( WrtBuff *wrtbuff )
224 {
225 std::unique_ptr<WrtBuff> ptr( wrtbuff );
226 ptr->Encode();
227 return ptr.release();
228 };
229 buffers.enqueue( ThreadPool::Instance().Execute( prepare_buff, wrtbuff.release() ) );
230 }
231
232 //-----------------------------------------------------------------------
236 //-----------------------------------------------------------------------
237 inline std::unique_ptr<WrtBuff> DequeueBuff()
238 {
239 std::future<WrtBuff*> ftr = buffers.dequeue();
240 std::unique_ptr<WrtBuff> result( ftr.get() );
241 return result;
242 }
243
244 //-----------------------------------------------------------------------
248 //-----------------------------------------------------------------------
249 static void writer_routine( StrmWriter *me )
250 {
251 try
252 {
253 while( !me->writer_thread_stop )
254 {
255 std::unique_ptr<WrtBuff> wrtbuff( me->DequeueBuff() );
256 if( !wrtbuff ) continue;
257 me->WriteBuff( std::move( wrtbuff ) );
258 }
259 }
260 catch( const buff_queue::wait_interrupted& ){ }
261 }
262
263 //-----------------------------------------------------------------------
    //! Process a buffer taken from the queue (declaration only, the
    //! implementation is not part of this header). Called from the writer
    //! routine with every non-null buffer it dequeues.
    //!
    //! @param buff : the buffer, ownership is passed to this method
267 //-----------------------------------------------------------------------
268 void WriteBuff( std::unique_ptr<WrtBuff> buff );
269
270 //-----------------------------------------------------------------------
    //! Build a buffer with metadata (declaration only, the implementation
    //! is not part of this header, so the exact layout of the buffer
    //! cannot be told from here).
    //!
    //! @return : the metadata as a byte buffer
274 //-----------------------------------------------------------------------
275 std::vector<char> GetMetadataBuffer();
276
277 //-----------------------------------------------------------------------
    //! Implementation of the close operation (declaration only, the
    //! implementation is not part of this header). Invoked by
    //! global_status_t once the user has issued a close and no bytes are
    //! left to be written.
    //!
    //! @param handler : user close handler
    //! @param timeout : timeout for the operation, defaults to 0 (units and
    //!                  the meaning of 0 are not visible here)
281 //-----------------------------------------------------------------------
282 void CloseImpl( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
283
    // NOTE: members are initialised in the order they are declared below,
    // hence writer_thread is constructed before next_blknb and global_status.
284 const ObjCfg &objcfg; //< object configuration (held by reference, not owned)
285 std::unique_ptr<WrtBuff> wrtbuff; //< current write buffer
286 std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs; //< ZIP archives with data
287 std::vector<std::shared_ptr<XrdCl::File>> metadataarchs; //< files holding the metadata archives
288 std::vector<std::vector<char>> cdbuffs; //< buffers with CDs (presumably ZIP central directories)
289 buff_queue buffers; //< queue of buffers for writing
290 //< (waiting to be erasure coded)
291 std::atomic<bool> writer_thread_stop; //< true if the writer thread should be stopped,
292 //< false otherwise
293 std::thread writer_thread; //< handle to the writer thread
294 size_t next_blknb; //< number of the next block to be created
295 global_status_t global_status; //< global status of the writer
296 };
297
298}
299
300#endif /* SRC_XRDEC_XRDECSTRMWRITER_HH_ */
Handle an async response.
StrmWriter(const ObjCfg &objcfg)
Constructor.
void Open(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
virtual ~StrmWriter()
Destructor.
void Write(uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
void Close(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
static ThreadPool & Instance()
Singleton access.
bool IsOK() const
We're fine.