XRootD
Loading...
Searching...
No Matches
XrdCephOssBufferedFile.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
3// Author: Sebastien Ponce <sebastien.ponce@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef __XRD_CEPH_OSS_BUFFERED_FILE_HH__
26#define __XRD_CEPH_OSS_BUFFERED_FILE_HH__
27
28#include "XrdOss/XrdOss.hh"
29#include "XrdCeph/XrdCephOss.hh"
31
35
36#include <memory>
37#include <chrono>
38#include <atomic>
39#include <map>
40#include <mutex>
41
42//------------------------------------------------------------------------------
46//------------------------------------------------------------------------------
47
48class XrdCephOssBufferedFile : virtual public XrdCephOssFile { // XrdOssDF
49
50public:
51 XrdCephOssBufferedFile(XrdCephOss *cephoss,XrdCephOssFile *cephossDF, size_t buffersize,
52 const std::string& bufferIOmode,
53 size_t maxNumberSimulBuffers);
54 //explicit XrdCephOssBufferedFile(size_t buffersize);
56 virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env);
57 virtual int Close(long long *retsz=0);
58 virtual ssize_t Read(off_t offset, size_t blen);
59 virtual ssize_t Read(void *buff, off_t offset, size_t blen);
60 virtual int Read(XrdSfsAio *aoip);
61 virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt);
62 virtual ssize_t ReadRaw(void *, off_t, size_t);
63 virtual int Fstat(struct stat *buff);
64 virtual ssize_t Write(const void *buff, off_t offset, size_t blen);
65 virtual int Write(XrdSfsAio *aiop);
66 virtual int Fsync(void);
67 virtual int Ftruncate(unsigned long long);
68
69protected:
70 std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> createBuffer();
71
73 XrdCephOssFile * m_xrdOssDF = nullptr; // holder of the XrdCephOssFile instance
74 std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> m_bufferAlg;
75 std::map<size_t, std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> > m_bufferReadAlgs;
76 std::mutex m_buf_mutex;
78
79
82
83 int m_flags = 0;
84 size_t m_bufsize = 16*1024*1024L; // default 16MiB size
85 std::string m_bufferIOmode;
86 std::string m_path;
87 std::chrono::time_point<std::chrono::system_clock> m_timestart;
88 std::atomic<size_t> m_bytesRead = {0};
89 std::atomic<size_t> m_bytesReadV = {0};
90 std::atomic<size_t> m_bytesReadAIO = {0};
91 std::atomic<size_t> m_bytesWrite = {0};
92 std::atomic<size_t> m_bytesWriteAIO= {0};
93};
94
95#endif /* __XRD_CEPH_OSS_BUFFERED_FILE_HH__ */
#define stat(a, b)
Definition XrdPosix.hh:101
std::map< size_t, std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > > m_bufferReadAlgs
std::atomic< size_t > m_bytesRead
virtual int Ftruncate(unsigned long long)
std::chrono::time_point< std::chrono::system_clock > m_timestart
int m_maxBufferRetrySleepTime_ms
How many times to retry a ready from a buffer with EBUSY errors.
int m_maxBufferRetries
set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads)
virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env)
virtual int Fstat(struct stat *buff)
std::atomic< size_t > m_bytesReadV
number of bytes read or written
std::atomic< size_t > m_bytesWrite
number of bytes read or written
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > m_bufferAlg
XrdCephOss * m_cephoss
create a new instance of the buffer
virtual ssize_t Write(const void *buff, off_t offset, size_t blen)
std::atomic< size_t > m_bytesReadAIO
number of bytes read or written
std::atomic< size_t > m_bytesWriteAIO
number of bytes read or written
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Read(off_t offset, size_t blen)
virtual int Close(long long *retsz=0)
int m_flags
number of ms to sleep if a retry is requested
size_t m_maxCountReadBuffers
any data access method on the buffer will use this
virtual ssize_t ReadRaw(void *, off_t, size_t)
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > createBuffer()
XrdCephOssBufferedFile(XrdCephOss *cephoss, XrdCephOssFile *cephossDF, size_t buffersize, const std::string &bufferIOmode, size_t maxNumberSimulBuffers)
XrdCephOssFile(XrdCephOss *cephoss)