XRootD
Loading...
Searching...
No Matches
CephIOAdapterAIORaw.cc
Go to the documentation of this file.
2#include "../XrdCephPosix.hh"
3#include "XrdOuc/XrdOucEnv.hh"
4
5#include <iostream>
6#include <chrono>
7#include <ratio>
8#include <functional>
9#include <memory>
10#include <thread>
11#include <chrono>
12
13using namespace XrdCephBuffer;
14
15using myclock = std::chrono::steady_clock;
16//using myseconds = std::chrono::duration<float,
17
18namespace
19{
20 static void aioReadCallback(XrdSfsAio *aiop, size_t rc)
21 {
22 // as in XrdCephOssFile
23 aiop->Result = rc;
24 aiop->doneRead();
25 }
26 static void aioWriteCallback(XrdSfsAio *aiop, size_t rc)
27 {
28 aiop->Result = rc;
29 aiop->doneWrite();
30 }
31
32} // anonymous namespace
33
37
39{
40 //BUFLOG("DoneRead");
41 m_dataOpDone = true;
42 m_lock.unlock();
43 m_condVar.notify_all();
44}
45
47{
48 //BUFLOG("DoneWrite");
49 m_dataOpDone = true;
50 m_lock.unlock();
51 m_condVar.notify_all();
52}
53
54CephIOAdapterAIORaw::CephIOAdapterAIORaw(IXrdCephBufferData *bufferdata, int fd) : m_bufferdata(bufferdata), m_fd(fd)
55{
56}
57
59{
60 // nothing to specifically to do; just print out some stats
61 float read_speed{0}, write_speed{0};
62 if (m_stats_read_req.load() > 0) {
63 read_speed = m_stats_read_bytes.load() / m_stats_read_timer.load() * 1e-3;
64 }
65 if (m_stats_write_req.load() > 0) {
66 write_speed = m_stats_write_bytes.load() / m_stats_write_timer.load() * 1e-3;
67 }
68 BUFLOG("CephIOAdapterAIORaw::Summary fd:" << m_fd
69 << " nwrite:" << m_stats_write_req << " byteswritten:" << m_stats_write_bytes << " write_s:"
70 << m_stats_write_timer * 1e-3 << " writemax_s" << m_stats_write_longest * 1e-3
71 << " write_MBs:" << write_speed
72 << " nread:" << m_stats_read_req << " bytesread:" << m_stats_read_bytes << " read_s:"
73 << m_stats_read_timer * 1e-3 << " readmax_s:" << m_stats_read_longest * 1e-3
74 << " read_MBs:" << read_speed );
75}
76
77ssize_t CephIOAdapterAIORaw::write(off64_t offset, size_t count)
78{
79 void *buf = m_bufferdata->raw();
80 if (!buf) {
81 BUFLOG("CephIOAdapterAIORaw::write null buffer was provided.")
82 return -EINVAL;
83 }
84 //BUFLOG("Make aio");
85 std::unique_ptr<XrdSfsAio> aiop = std::unique_ptr<XrdSfsAio>(new CephBufSfsAio());
86 aiocb &sfsAio = aiop->sfsAio;
87 // set the necessary parameters for the read, e.g. buffer pointer, offset and length
88 sfsAio.aio_buf = buf;
89 sfsAio.aio_nbytes = count;
90 sfsAio.aio_offset = offset;
91 // need the concrete object for the blocking / wait
92 CephBufSfsAio *ceph_aiop = dynamic_cast<CephBufSfsAio *>(aiop.get());
93
94 long dt_ns{0};
95 ssize_t rc{0};
96 { // brace is for timer RAII
97 XrdCephBuffer::Timer_ns timer(dt_ns);
98 rc = ceph_aio_write(m_fd, aiop.get(), aioWriteCallback);
99
100 if (rc < 0) {
101 BUFLOG("CephIOAdapterAIORaw::write ceph_aio_write returned rc:" << rc)
102 return rc;
103 }
104
105 while (!ceph_aiop->isDone())
106 {
107 ceph_aiop->m_condVar.wait(ceph_aiop->m_lock, std::bind(&CephBufSfsAio::isDone, ceph_aiop));
108 }
109 } // timer brace
110
111 // cleanup
112 rc = ceph_aiop->Result;
113 if (rc < 0) {
114 BUFLOG("CephIOAdapterAIORaw::write ceph_aiop->Result returned rc:" << rc)
115 }
116
117 // BUFLOG("CephIOAdapterAIORaw::write fd:" << m_fd << " off:"
118 // << offset << " len:" << count << " rc:" << rc << " ms:" << dt_ns / 1000000);
119
120 m_stats_write_longest = std::max(m_stats_write_longest, dt_ns / 1000000);
121 m_stats_write_timer.fetch_add(dt_ns / 1000000);
122 m_stats_write_bytes.fetch_add(rc);
123 ++m_stats_write_req;
124 return rc;
125}
126
127ssize_t CephIOAdapterAIORaw::read(off64_t offset, size_t count)
128{
129 void *buf = m_bufferdata->raw();
130 if (!buf)
131 {
132 BUFLOG("CephIOAdapterAIORaw::read null buffer was provided.")
133 return -EINVAL;
134 }
135
136 std::unique_ptr<XrdSfsAio> aiop = std::unique_ptr<XrdSfsAio>(new CephBufSfsAio());
137 aiocb &sfsAio = aiop->sfsAio;
138 // set the necessary parameters for the read, e.g. buffer pointer, offset and length
139 sfsAio.aio_buf = buf;
140 sfsAio.aio_nbytes = count;
141 sfsAio.aio_offset = offset;
142 // need the concrete object for the blocking / wait
143 CephBufSfsAio *ceph_aiop = dynamic_cast<CephBufSfsAio *>(aiop.get());
144
145 long dt_ns{0};
146 ssize_t rc{0};
147 { // timer brace RAII
148 XrdCephBuffer::Timer_ns timer(dt_ns);
149 // no check is made whether the buffer has sufficient capacity
150 // rc = ceph_posix_pread(m_fd,buf,count,offset);
151 //BUFLOG("Submit aio read: ");
152 rc = ceph_aio_read(m_fd, aiop.get(), aioReadCallback);
153
154 if (rc < 0)
155 return rc;
156
157 // now block until the read is done
158 // take the lock on the aio object
159 // while(!ceph_aiop->isDone()) { ceph_aiop->m_condVar.wait(lock,std::bind(&CephBufSfsAio::isDone,ceph_aiop) ); }
160 while (!ceph_aiop->isDone())
161 {
162 ceph_aiop->m_condVar.wait(ceph_aiop->m_lock, std::bind(&CephBufSfsAio::isDone, ceph_aiop));
163 }
164 } // timer brace
165
166 // cleanup
167 rc = ceph_aiop->Result;
168
169 m_stats_read_longest = std::max(m_stats_read_longest, dt_ns / 1000000);
170 m_stats_read_timer.fetch_add(dt_ns * 1e-6);
171 m_stats_read_bytes.fetch_add(rc);
172 ++m_stats_read_req;
173
174 // BUFLOG("CephIOAdapterAIORaw::read fd:" << m_fd << " " << offset
175 // << " " << count << " " << rc << " " << dt_ns * 1e-6);
176
177 if (rc >= 0)
178 {
179 m_bufferdata->setLength(rc);
180 m_bufferdata->setStartingOffset(offset);
181 m_bufferdata->setValid(true);
182 }
183 return rc;
184}
#define BUFLOG(x)
std::chrono::steady_clock myclock
static void aioReadCallback(XrdSfsAio *aiop, size_t rc)
static void aioWriteCallback(XrdSfsAio *aiop, size_t rc)
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb)
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb)
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
virtual void doneWrite() override
std::unique_lock< std::mutex > m_lock
std::condition_variable m_condVar
virtual void doneRead() override
virtual ssize_t read(off64_t offset, size_t count) override
Issue a ceph_posix_pread to read to the buffer data from file offset and len count....
virtual ssize_t write(off64_t offset, size_t count) override
Take the data in the buffer and write to ceph at given offset Issues a ceph_posix_pwrite for data in ...
CephIOAdapterAIORaw(IXrdCephBufferData *bufferdata, int fd)
Interface to the Buffer's physical representation. Allow an interface to encapsulate the requirements...
ssize_t Result
Definition XrdSfsAio.hh:65
virtual void doneRead()=0
virtual void doneWrite()=0
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...