XRootD
Loading...
Searching...
No Matches
XrdPfcFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19
20#include "XrdPfcFile.hh"
21#include "XrdPfcIO.hh"
22#include "XrdPfcTrace.hh"
23#include <cstdio>
24#include <sstream>
25#include <fcntl.h>
26#include <assert.h>
27#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClFile.hh"
31#include "XrdSys/XrdSysTimer.hh"
32#include "XrdOss/XrdOss.hh"
33#include "XrdOuc/XrdOucEnv.hh"
35#include "XrdPfc.hh"
36
37
38using namespace XrdPfc;
39
40namespace
41{
42
43const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
44
45Cache* cache() { return &Cache::GetInstance(); }
46
47}
48
49const char *File::m_traceID = "File";
50
51//------------------------------------------------------------------------------
52
53File::File(const std::string& path, long long iOffset, long long iFileSize) :
54 m_ref_cnt(0),
55 m_data_file(0),
56 m_info_file(0),
57 m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
58 m_filename(path),
59 m_offset(iOffset),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
62 m_ios_in_detach(0),
63 m_non_flushed_cnt(0),
64 m_in_sync(false),
65 m_detach_time_logged(false),
66 m_in_shutdown(false),
67 m_state_cond(0),
68 m_block_size(0),
69 m_num_blocks(0),
70 m_prefetch_state(kOff),
71 m_prefetch_bytes(0),
72 m_prefetch_read_cnt(0),
73 m_prefetch_hit_cnt(0),
74 m_prefetch_score(0)
75{}
76
78{
79 if (m_info_file)
80 {
81 TRACEF(Debug, "~File() close info ");
82 m_info_file->Close();
83 delete m_info_file;
84 m_info_file = NULL;
85 }
86
87 if (m_data_file)
88 {
89 TRACEF(Debug, "~File() close output ");
90 m_data_file->Close();
91 delete m_data_file;
92 m_data_file = NULL;
93 }
94
95 TRACEF(Debug, "~File() ended, prefetch score = " << m_prefetch_score);
96}
97
98//------------------------------------------------------------------------------
99
100File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
101{
102 File *file = new File(path, offset, fileSize);
103 if ( ! file->Open())
104 {
105 delete file;
106 file = 0;
107 }
108 return file;
109}
110
111//------------------------------------------------------------------------------
112
114{
115 // Called from Cache::Unlink() when the file is currently open.
116 // Cache::Unlink is also called on FSync error and when wrong number of bytes
117 // is received from a remote read.
118 //
119 // From this point onward the file will not be written to, cinfo file will
120 // not be updated, and all new read requests will return -ENOENT.
121 //
122 // File's entry in the Cache's active map is set to nullptr and will be
123 // removed from there shortly, in any case, well before this File object
124 // shuts down. So we do not communicate to Cache about our destruction when
125 // it happens.
126
127 {
128 XrdSysCondVarHelper _lck(m_state_cond);
129
130 m_in_shutdown = true;
131
132 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
133 {
134 m_prefetch_state = kStopped;
135 cache()->DeRegisterPrefetchFile(this);
136 }
137 }
138
139}
140
141//------------------------------------------------------------------------------
142
144{
145 // Not locked, only used from Cache / Purge thread.
146
147 Stats delta = m_last_stats;
148
149 m_last_stats = m_stats.Clone();
150
151 delta.DeltaToReference(m_last_stats);
152
153 return delta;
154}
155
156//------------------------------------------------------------------------------
157
159{
160 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
161
162 XrdSysCondVarHelper _lck(m_state_cond);
163 dec_ref_count(b);
164}
165
166void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
167{
168 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
169
170 XrdSysCondVarHelper _lck(m_state_cond);
171
172 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
173 {
174 dec_ref_count(*i);
175 }
176}
177
178//------------------------------------------------------------------------------
179
181{
182 std::string loc(io->GetLocation());
183 XrdSysCondVarHelper _lck(m_state_cond);
184 insert_remote_location(loc);
185}
186
187//------------------------------------------------------------------------------
188
190{
191 // Returns true if delay is needed.
192
193 TRACEF(Debug, "ioActive start for io " << io);
194
195 std::string loc(io->GetLocation());
196
197 {
198 XrdSysCondVarHelper _lck(m_state_cond);
199
200 IoSet_i mi = m_io_set.find(io);
201
202 if (mi != m_io_set.end())
203 {
204 unsigned int n_active_reads = io->m_active_read_reqs;
205
206 TRACE(Info, "ioActive for io " << io <<
207 ", active_reads " << n_active_reads <<
208 ", active_prefetches " << io->m_active_prefetches <<
209 ", allow_prefetching " << io->m_allow_prefetching <<
210 ", ios_in_detach " << m_ios_in_detach);
211 TRACEF(Info,
212 "\tio_map.size() " << m_io_set.size() <<
213 ", block_map.size() " << m_block_map.size() << ", file");
214
215 insert_remote_location(loc);
216
217 io->m_allow_prefetching = false;
218 io->m_in_detach = true;
219
220 // Check if any IO is still available for prfetching. If not, stop it.
221 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
222 {
223 if ( ! select_current_io_or_disable_prefetching(false) )
224 {
225 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
226 }
227 }
228
229 // On last IO, consider write queue blocks. Note, this also contains
230 // blocks being prefetched.
231
232 bool io_active_result;
233
234 if (n_active_reads > 0)
235 {
236 io_active_result = true;
237 }
238 else if (m_io_set.size() - m_ios_in_detach == 1)
239 {
240 io_active_result = ! m_block_map.empty();
241 }
242 else
243 {
244 io_active_result = io->m_active_prefetches > 0;
245 }
246
247 if ( ! io_active_result)
248 {
249 ++m_ios_in_detach;
250 }
251
252 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
253
254 return io_active_result;
255 }
256 else
257 {
258 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
259 return false;
260 }
261 }
262}
263
264//------------------------------------------------------------------------------
265
267{
268 XrdSysCondVarHelper _lck(m_state_cond);
269 m_detach_time_logged = false;
270}
271
273{
274 // Returns true if sync is required.
275 // This method is called after corresponding IO is detached from PosixCache.
276
277 XrdSysCondVarHelper _lck(m_state_cond);
278 if ( ! m_in_shutdown)
279 {
280 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
281 {
282 Stats loc_stats = m_stats.Clone();
283 m_cfi.WriteIOStatDetach(loc_stats);
284 m_detach_time_logged = true;
285 m_in_sync = true;
286 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
287 return true;
288 }
289 }
290 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
291 return false;
292}
293
294//------------------------------------------------------------------------------
295
297{
298 // Called from Cache::GetFile() when a new IO asks for the file.
299
300 TRACEF(Debug, "AddIO() io = " << (void*)io);
301
302 time_t now = time(0);
303 std::string loc(io->GetLocation());
304
305 m_state_cond.Lock();
306
307 IoSet_i mi = m_io_set.find(io);
308
309 if (mi == m_io_set.end())
310 {
311 m_io_set.insert(io);
312 io->m_attach_time = now;
313 m_stats.IoAttach();
314
315 insert_remote_location(loc);
316
317 if (m_prefetch_state == kStopped)
318 {
319 m_prefetch_state = kOn;
320 cache()->RegisterPrefetchFile(this);
321 }
322 }
323 else
324 {
325 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
326 }
327
328 m_state_cond.UnLock();
329}
330
331//------------------------------------------------------------------------------
332
334{
335 // Called from Cache::ReleaseFile.
336
337 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
338
339 time_t now = time(0);
340
341 m_state_cond.Lock();
342
343 IoSet_i mi = m_io_set.find(io);
344
345 if (mi != m_io_set.end())
346 {
347 if (mi == m_current_io)
348 {
349 ++m_current_io;
350 }
351
352 m_stats.IoDetach(now - io->m_attach_time);
353 m_io_set.erase(mi);
354 --m_ios_in_detach;
355
356 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
357 {
358 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
359 m_prefetch_state = kStopped;
360 cache()->DeRegisterPrefetchFile(this);
361 }
362 }
363 else
364 {
365 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
366 }
367
368 m_state_cond.UnLock();
369}
370
371//------------------------------------------------------------------------------
372
373bool File::Open()
374{
375 // Sets errno accordingly.
376
377 static const char *tpfx = "Open() ";
378
379 TRACEF(Dump, tpfx << "open file for disk cache");
380
382
383 XrdOss &myOss = * Cache::GetInstance().GetOss();
384 const char *myUser = conf.m_username.c_str();
385 XrdOucEnv myEnv;
386 struct stat data_stat, info_stat;
387
388 std::string ifn = m_filename + Info::s_infoExtension;
389
390 bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
391 bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
392
393 // Create the data file itself.
394 char size_str[32]; sprintf(size_str, "%lld", m_file_size);
395 myEnv.Put("oss.asize", size_str);
396 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
397
398 int res;
399
400 if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
401 {
402 TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
403 errno = -res;
404 return false;
405 }
406
407 m_data_file = myOss.newFile(myUser);
408 if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
409 {
410 TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
411 errno = -res;
412 delete m_data_file; m_data_file = 0;
413 return false;
414 }
415
416 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
417 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
418 if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
419 {
420 TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
421 errno = -res;
422 m_data_file->Close(); delete m_data_file; m_data_file = 0;
423 return false;
424 }
425
426 m_info_file = myOss.newFile(myUser);
427 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
428 {
429 TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
430 errno = -res;
431 delete m_info_file; m_info_file = 0;
432 m_data_file->Close(); delete m_data_file; m_data_file = 0;
433 return false;
434 }
435
436 bool initialize_info_file = true;
437
438 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
439 {
440 TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
441 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
442 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");
443
444 // Check if data file exists and is of reasonable size.
445 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
446 {
447 initialize_info_file = false;
448 } else {
449 TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
450 m_cfi.ResetAllAccessStats();
451 m_data_file->Ftruncate(0);
452 }
453 }
454
455 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
456 {
457 if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
458 conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
459 {
460 TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
461 initialize_info_file = true;
462 m_cfi.ResetAllAccessStats();
463 m_data_file->Ftruncate(0);
464 } else {
465 // TODO: If the file is complete, we don't need to reset net cksums.
466 m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
467 }
468 }
469
470 if (initialize_info_file)
471 {
472 m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
473 m_cfi.SetCkSumState(conf.get_cs_Chk());
474 m_cfi.ResetNoCkSumTime();
475 m_cfi.Write(m_info_file, ifn.c_str());
476 m_info_file->Fsync();
477 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
478 TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
479 }
480
481 m_cfi.WriteIOStatAttach();
482 m_state_cond.Lock();
483 m_block_size = m_cfi.GetBufferSize();
484 m_num_blocks = m_cfi.GetNBlocks();
485 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
486 m_state_cond.UnLock();
487
488 return true;
489}
490
491int File::Fstat(struct stat &sbuff)
492{
493 // Stat on an open file.
494 // Corrects size to actual full size of the file.
495 // Sets atime to 0 if the file is only partially downloaded, in accordance
496 // with pfc.onlyifcached settings.
497 // Called from IO::Fstat() and Cache::Stat() when the file is active.
498 // Returns 0 on success, -errno on error.
499
500 int res;
501
502 if ((res = m_data_file->Fstat(&sbuff))) return res;
503
504 sbuff.st_size = m_file_size;
505
506 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
507 if ( ! is_cached)
508 sbuff.st_atime = 0;
509
510 return 0;
511}
512
513//==============================================================================
514// Read and helpers
515//==============================================================================
516
517bool File::overlap(int blk, // block to query
518 long long blk_size, //
519 long long req_off, // offset of user request
520 int req_size, // size of user request
521 // output:
522 long long &off, // offset in user buffer
523 long long &blk_off, // offset in block
524 int &size) // size to copy
525{
526 const long long beg = blk * blk_size;
527 const long long end = beg + blk_size;
528 const long long req_end = req_off + req_size;
529
530 if (req_off < end && req_end > beg)
531 {
532 const long long ovlp_beg = std::max(beg, req_off);
533 const long long ovlp_end = std::min(end, req_end);
534
535 off = ovlp_beg - req_off;
536 blk_off = ovlp_beg - beg;
537 size = (int) (ovlp_end - ovlp_beg);
538
539 assert(size <= blk_size);
540 return true;
541 }
542 else
543 {
544 return false;
545 }
546}
547
548//------------------------------------------------------------------------------
549
550Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
551{
552 // Must be called w/ state_cond locked.
553 // Checks on size etc should be done before.
554 //
555 // Reference count is 0 so increase it in calling function if you want to
556 // catch the block while still in memory.
557
558 const long long off = i * m_block_size;
559 const int last_block = m_num_blocks - 1;
560 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
561
562 int blk_size, req_size;
563 if (i == last_block) {
564 blk_size = req_size = m_file_size - off;
565 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
566 } else {
567 blk_size = req_size = m_block_size;
568 }
569
570 Block *b = 0;
571 char *buf = cache()->RequestRAM(req_size);
572
573 if (buf)
574 {
575 b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
576
577 if (b)
578 {
579 m_block_map[i] = b;
580
581 // Actual Read request is issued in ProcessBlockRequests().
582
583 if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
584 {
585 m_prefetch_state = kHold;
586 cache()->DeRegisterPrefetchFile(this);
587 }
588 }
589 else
590 {
591 TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
592 }
593 }
594
595 return b;
596}
597
598void File::ProcessBlockRequest(Block *b)
599{
600 // This *must not* be called with block_map locked.
601
603
604 if (XRD_TRACE What >= TRACE_Dump) {
605 char buf[256];
606 snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
607 b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
608 TRACEF(Dump, "ProcessBlockRequest() " << buf);
609 }
610
611 if (b->req_cksum_net())
612 {
613 b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
614 b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
615 } else {
616 b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
617 }
618}
619
620void File::ProcessBlockRequests(BlockList_t& blks)
621{
622 // This *must not* be called with block_map locked.
623
624 for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
625 {
626 ProcessBlockRequest(*bi);
627 }
628}
629
630//------------------------------------------------------------------------------
631
632void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
633{
634 int n_chunks = ioVec.size();
635 int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
636
637 TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
638 ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
639
640 DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
641
642 int pos = 0;
643 while (n_chunks > XrdProto::maxRvecsz) {
644 io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
645 pos += XrdProto::maxRvecsz;
646 n_chunks -= XrdProto::maxRvecsz;
647 }
648 io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
649}
650
651//------------------------------------------------------------------------------
652
653int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
654{
655 TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
656
657 long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
658
659 if (rs < 0)
660 {
661 TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
662 return rs;
663 }
664
665 if (rs != expected_size)
666 {
667 TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
668 return -EIO;
669 }
670
671 return (int) rs;
672}
673
674//------------------------------------------------------------------------------
675
676int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
677{
678 // rrc_func is ONLY called from async processing.
679 // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
680 // This streamlines implementation of synchronous IO::Read().
681
682 TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
683
684 m_state_cond.Lock();
685
686 if (m_in_shutdown || io->m_in_detach)
687 {
688 m_state_cond.UnLock();
689 return m_in_shutdown ? -ENOENT : -EBADF;
690 }
691
692 // Shortcut -- file is fully downloaded.
693
694 if (m_cfi.IsComplete())
695 {
696 m_state_cond.UnLock();
697 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
698 if (ret > 0) m_stats.AddBytesHit(ret);
699 return ret;
700 }
701
702 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
703
704 return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
705}
706
707//------------------------------------------------------------------------------
708
709int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
710{
711 TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
712
713 m_state_cond.Lock();
714
715 if (m_in_shutdown || io->m_in_detach)
716 {
717 m_state_cond.UnLock();
718 return m_in_shutdown ? -ENOENT : -EBADF;
719 }
720
721 // Shortcut -- file is fully downloaded.
722
723 if (m_cfi.IsComplete())
724 {
725 m_state_cond.UnLock();
726 int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
727 if (ret > 0) m_stats.AddBytesHit(ret);
728 return ret;
729 }
730
731 return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
732}
733
734//------------------------------------------------------------------------------
735
736int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
737 ReadReqRH *rh, const char *tpfx)
738{
739 // Non-trivial processing for Read and ReadV.
740 // Entered under lock.
741 //
742 // loop over reqired blocks:
743 // - if on disk, ok;
744 // - if in ram or incoming, inc ref-count
745 // - otherwise request and inc ref count (unless RAM full => request direct)
746 // unlock
747
748 int prefetch_cnt = 0;
749
750 ReadRequest *read_req = nullptr;
751 BlockList_t blks_to_request; // blocks we are issuing a new remote request for
752
753 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
754
755 std::vector<XrdOucIOVec> iovec_disk;
756 std::vector<XrdOucIOVec> iovec_direct;
757 int iovec_disk_total = 0;
758 int iovec_direct_total = 0;
759
760 for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
761 {
762 const XrdOucIOVec &iov = readV[iov_idx];
763 long long iUserOff = iov.offset;
764 int iUserSize = iov.size;
765 char *iUserBuff = iov.data;
766
767 const int idx_first = iUserOff / m_block_size;
768 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
769
770 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
771
772 enum LastBlock_e { LB_other, LB_disk, LB_direct };
773
774 LastBlock_e lbe = LB_other;
775
776 for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
777 {
778 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
779 BlockMap_i bi = m_block_map.find(block_idx);
780
781 // overlap and read
782 long long off; // offset in user buffer
783 long long blk_off; // offset in block
784 int size; // size to copy
785
786 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
787
788 // In RAM or incoming?
789 if (bi != m_block_map.end())
790 {
791 inc_ref_count(bi->second);
792 TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
793
794 if (bi->second->is_finished())
795 {
796 // note, blocks with error should not be here !!!
797 // they should be either removed or reissued in ProcessBlockResponse()
798 assert(bi->second->is_ok());
799
800 blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
801
802 if (bi->second->m_prefetch)
803 ++prefetch_cnt;
804 }
805 else
806 {
807 if ( ! read_req)
808 read_req = new ReadRequest(io, rh);
809
810 // We have a lock on state_cond --> as we register the request before releasing the lock,
811 // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
812
813 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
814 ++read_req->m_n_chunk_reqs;
815 }
816
817 lbe = LB_other;
818 }
819 // On disk?
820 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
821 {
822 TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
823
824 if (lbe == LB_disk)
825 iovec_disk.back().size += size;
826 else
827 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
828 iovec_disk_total += size;
829
830 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
831 ++prefetch_cnt;
832
833 lbe = LB_disk;
834 }
835 // Neither ... then we have to go get it ...
836 else
837 {
838 if ( ! read_req)
839 read_req = new ReadRequest(io, rh);
840
841 // Is there room for one more RAM Block?
842 Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
843 if (b)
844 {
845 TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
846 inc_ref_count(b);
847 blks_to_request.push_back(b);
848
849 b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
850 ++read_req->m_n_chunk_reqs;
851
852 lbe = LB_other;
853 }
854 else // Nope ... read this directly without caching.
855 {
856 TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
857
858 iovec_direct_total += size;
859 read_req->m_direct_done = false;
860
861 // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
862 // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
863 // is determined in the RequestBlocksDirect().
864 if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
865 iovec_direct.back().size += size;
866 } else {
867 long long in_offset = block_idx * m_block_size + blk_off;
868 char *out_pos = iUserBuff + off;
869 while (size > XrdProto::maxRVdsz) {
870 iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
871 in_offset += XrdProto::maxRVdsz;
872 out_pos += XrdProto::maxRVdsz;
873 size -= XrdProto::maxRVdsz;
874 }
875 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
876 }
877
878 lbe = LB_direct;
879 }
880 }
881 } // end for over blocks in an IOVec
882 } // end for over readV IOVec
883
884 inc_prefetch_hit_cnt(prefetch_cnt);
885
886 m_state_cond.UnLock();
887
888 // First, send out remote requests for new blocks.
889 if ( ! blks_to_request.empty())
890 {
891 ProcessBlockRequests(blks_to_request);
892 blks_to_request.clear();
893 }
894
895 // Second, send out remote direct read requests.
896 if ( ! iovec_direct.empty())
897 {
898 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
899
900 TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
901 }
902
903 // Begin synchronous part where we process data that is already in RAM or on disk.
904
905 long long bytes_read = 0;
906 int error_cond = 0; // to be set to -errno
907
908 // Third, process blocks that are available in RAM.
909 if ( ! blks_ready.empty())
910 {
911 for (auto &bvi : blks_ready)
912 {
913 for (auto &cr : bvi.second)
914 {
915 TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
916 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
917 bytes_read += cr.m_size;
918 }
919 }
920 }
921
922 // Fourth, read blocks from disk.
923 if ( ! iovec_disk.empty())
924 {
925 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
926 TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
927 if (rc >= 0)
928 {
929 bytes_read += rc;
930 }
931 else
932 {
933 error_cond = rc;
934 TRACEF(Error, tpfx << "failed read from disk");
935 }
936 }
937
938 // End synchronous part -- update with sync stats and determine actual state of this read.
939 // Note: remote reads might have already finished during disk-read!
940
941 m_state_cond.Lock();
942
943 for (auto &bvi : blks_ready)
944 dec_ref_count(bvi.first, (int) bvi.second.size());
945
946 if (read_req)
947 {
948 read_req->m_bytes_read += bytes_read;
949 if (error_cond)
950 read_req->update_error_cond(error_cond);
951 read_req->m_stats.m_BytesHit += bytes_read;
952 read_req->m_sync_done = true;
953
954 if (read_req->is_complete())
955 {
956 // Almost like FinalizeReadRequest(read_req) -- but no callout!
957 m_state_cond.UnLock();
958
959 m_stats.AddReadStats(read_req->m_stats);
960
961 int ret = read_req->return_value();
962 delete read_req;
963 return ret;
964 }
965 else
966 {
967 m_state_cond.UnLock();
968 return -EWOULDBLOCK;
969 }
970 }
971 else
972 {
973 m_stats.m_BytesHit += bytes_read;
974 m_state_cond.UnLock();
975
976 // !!! No callout.
977
978 return error_cond ? error_cond : bytes_read;
979 }
980}
981
982
983//==============================================================================
984// WriteBlock and Sync
985//==============================================================================
986
988{
989 // write block buffer into disk file
990 long long offset = b->m_offset - m_offset;
991 long long size = b->get_size();
992 ssize_t retval;
993
994 if (m_cfi.IsCkSumCache())
995 if (b->has_cksums())
996 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
997 else
998 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
999 else
1000 retval = m_data_file->Write(b->get_buff(), offset, size);
1001
1002 if (retval < size)
1003 {
1004 if (retval < 0)
1005 {
1006 GetLog()->Emsg("WriteToDisk()", -retval, "write block to disk", GetLocalPath().c_str());
1007 }
1008 else
1009 {
1010 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1011 }
1012
1013 XrdSysCondVarHelper _lck(m_state_cond);
1014
1015 dec_ref_count(b);
1016
1017 return;
1018 }
1019
1020 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1021
1022 // Set written bit.
1023 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1024
1025 bool schedule_sync = false;
1026 {
1027 XrdSysCondVarHelper _lck(m_state_cond);
1028
1029 m_cfi.SetBitWritten(blk_idx);
1030
1031 if (b->m_prefetch)
1032 {
1033 m_cfi.SetBitPrefetch(blk_idx);
1034 }
1035 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1036 {
1037 m_cfi.ResetCkSumNet();
1038 }
1039
1040 dec_ref_count(b);
1041
1042 // Set synced bit or stash block index if in actual sync.
1043 // Synced state is only written out to cinfo file when data file is synced.
1044 if (m_in_sync)
1045 {
1046 m_writes_during_sync.push_back(blk_idx);
1047 }
1048 else
1049 {
1050 m_cfi.SetBitSynced(blk_idx);
1051 ++m_non_flushed_cnt;
1052 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1053 ! m_in_shutdown)
1054 {
1055 schedule_sync = true;
1056 m_in_sync = true;
1057 m_non_flushed_cnt = 0;
1058 }
1059 }
1060 }
1061
1062 if (schedule_sync)
1063 {
1064 cache()->ScheduleFileSync(this);
1065 }
1066}
1067
1068//------------------------------------------------------------------------------
1069
1071{
1072 TRACEF(Dump, "Sync()");
1073
1074 int ret = m_data_file->Fsync();
1075 bool errorp = false;
1076 if (ret == XrdOssOK)
1077 {
1078 Stats loc_stats = m_stats.Clone();
1079 m_cfi.WriteIOStat(loc_stats);
1080 m_cfi.Write(m_info_file, m_filename.c_str());
1081 int cret = m_info_file->Fsync();
1082 if (cret != XrdOssOK)
1083 {
1084 TRACEF(Error, "Sync cinfo file sync error " << cret);
1085 errorp = true;
1086 }
1087 }
1088 else
1089 {
1090 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1091 errorp = true;
1092 }
1093
1094 if (errorp)
1095 {
1096 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1097
1098 // Unlink will also call this->initiate_emergency_shutdown()
1099 Cache::GetInstance().UnlinkFile(m_filename, false);
1100
1101 XrdSysCondVarHelper _lck(&m_state_cond);
1102
1103 m_writes_during_sync.clear();
1104 m_in_sync = false;
1105
1106 return;
1107 }
1108
1109 int written_while_in_sync;
1110 bool resync = false;
1111 {
1112 XrdSysCondVarHelper _lck(&m_state_cond);
1113 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1114 {
1115 m_cfi.SetBitSynced(*i);
1116 }
1117 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1118 m_writes_during_sync.clear();
1119
1120 // If there were writes during sync and the file is now complete,
1121 // let us call Sync again without resetting the m_in_sync flag.
1122 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1123 resync = true;
1124 else
1125 m_in_sync = false;
1126 }
1127 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1128
1129 if (resync)
1130 Sync();
1131}
1132
1133
1134//==============================================================================
1135// Block processing
1136//==============================================================================
1137
1138void File::free_block(Block* b)
1139{
1140 // Method always called under lock.
1141 int i = b->m_offset / m_block_size;
1142 TRACEF(Dump, "free_block block " << b << " idx = " << i);
1143 size_t ret = m_block_map.erase(i);
1144 if (ret != 1)
1145 {
1146 // assert might be a better option than a warning
1147 TRACEF(Error, "free_block did not erase " << i << " from map");
1148 }
1149 else
1150 {
1151 cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1152 delete b;
1153 }
1154
1155 if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1156 {
1157 m_prefetch_state = kOn;
1158 cache()->RegisterPrefetchFile(this);
1159 }
1160}
1161
1162//------------------------------------------------------------------------------
1163
1164bool File::select_current_io_or_disable_prefetching(bool skip_current)
1165{
1166 // Method always called under lock. It also expects prefetch to be active.
1167
1168 int io_size = (int) m_io_set.size();
1169 bool io_ok = false;
1170
1171 if (io_size == 1)
1172 {
1173 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1174 if (io_ok)
1175 {
1176 m_current_io = m_io_set.begin();
1177 }
1178 }
1179 else if (io_size > 1)
1180 {
1181 IoSet_i mi = m_current_io;
1182 if (skip_current && mi != m_io_set.end()) ++mi;
1183
1184 for (int i = 0; i < io_size; ++i)
1185 {
1186 if (mi == m_io_set.end()) mi = m_io_set.begin();
1187
1188 if ((*mi)->m_allow_prefetching)
1189 {
1190 m_current_io = mi;
1191 io_ok = true;
1192 break;
1193 }
1194 ++mi;
1195 }
1196 }
1197
1198 if ( ! io_ok)
1199 {
1200 m_current_io = m_io_set.end();
1201 m_prefetch_state = kStopped;
1202 cache()->DeRegisterPrefetchFile(this);
1203 }
1204
1205 return io_ok;
1206}
1207
1208//------------------------------------------------------------------------------
1209
1210void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1211{
1212 // Called from DirectResponseHandler.
1213 // NOT under lock.
1214
1215 if (error_cond)
1216 TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1217
1218 m_state_cond.Lock();
1219
1220 if (error_cond)
1221 rreq->update_error_cond(error_cond);
1222 else {
1223 rreq->m_stats.m_BytesBypassed += bytes_read;
1224 rreq->m_bytes_read += bytes_read;
1225 }
1226
1227 rreq->m_direct_done = true;
1228
1229 bool rreq_complete = rreq->is_complete();
1230
1231 m_state_cond.UnLock();
1232
1233 if (rreq_complete)
1234 FinalizeReadRequest(rreq);
1235}
1236
1237void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1238{
1239 // Called from ProcessBlockResponse().
1240 // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1241 // Does not manage m_read_req.
1242 // Will not complete the request.
1243
1244 TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1245 " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1246
1247 rreq->update_error_cond(b->get_error());
1248 --rreq->m_n_chunk_reqs;
1249
1250 dec_ref_count(b);
1251}
1252
1253void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1254{
1255 // Called from ProcessBlockResponse().
1256 // NOT under lock as it does memcopy ofor exisf block data.
1257 // Acquires lock for block, m_read_req and rreq state update.
1258
1259 ReadRequest *rreq = creq.m_read_req;
1260
1261 TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1262 memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1263
1264 m_state_cond.Lock();
1265
1266 rreq->m_bytes_read += creq.m_size;
1267
1268 if (b->get_req_id() == (void*) rreq)
1269 rreq->m_stats.m_BytesMissed += creq.m_size;
1270 else
1271 rreq->m_stats.m_BytesHit += creq.m_size;
1272
1273 --rreq->m_n_chunk_reqs;
1274
1275 if (b->m_prefetch)
1276 inc_prefetch_hit_cnt(1);
1277
1278 dec_ref_count(b);
1279
1280 bool rreq_complete = rreq->is_complete();
1281
1282 m_state_cond.UnLock();
1283
1284 if (rreq_complete)
1285 FinalizeReadRequest(rreq);
1286}
1287
1288void File::FinalizeReadRequest(ReadRequest *rreq)
1289{
1290 // called from ProcessBlockResponse()
1291 // NOT under lock -- does callout
1292
1293 m_stats.AddReadStats(rreq->m_stats);
1294
1295 rreq->m_rh->Done(rreq->return_value());
1296 delete rreq;
1297}
1298
// Handle completion of a network read for a single block.
// res is the byte count on success (must equal the block size) or a negative
// errno-style code on failure. Depending on outcome this either hands the
// block's data to all waiting chunk requests and queues it for writing to
// disk, or errors-out requests issued on the failing IO and possibly
// reissues the block on another IO that still has subscribers.
// Takes m_state_cond internally; callouts (ProcessBlockSuccess,
// FinalizeReadRequest, ProcessBlockRequest) are made after unlocking.
1299void File::ProcessBlockResponse(Block *b, int res)
1300{
1301 static const char* tpfx = "ProcessBlockResponse ";
1302
1303 TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1304
1305 if (res >= 0 && res != b->get_size())
1306 {
1307 // Incorrect number of bytes received, apparently size of the file on the remote
1308 // is different than what the cache expects it to be.
1309 TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1310 Cache::GetInstance().UnlinkFile(m_filename, false);
1311 }
1312
1313 m_state_cond.Lock();
1314
1315 // Deregister block from IO's prefetch count, if needed.
1316 if (b->m_prefetch)
1317 {
1318 IO *io = b->get_io();
1319 IoSet_i mi = m_io_set.find(io);
1320 if (mi != m_io_set.end())
1321 {
1322 --io->m_active_prefetches;
1323
1324 // If failed and IO is still prefetching -- disable prefetching on this IO.
1325 if (res < 0 && io->m_allow_prefetching)
1326 {
1327 TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1328 io->m_allow_prefetching = false;
1329
1330 // Check if any IO is still available for prefetching. If not, stop it.
1331 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1332 {
1333 if ( ! select_current_io_or_disable_prefetching(false) )
1334 {
1335 TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1336 }
1337 }
1338 }
1339
1340 // If failed with no subscribers -- delete the block and exit.
1341 if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1342 {
1343 free_block(b);
1344 m_state_cond.UnLock();
1345 return;
1346 }
1347 m_prefetch_bytes += b->get_size();
1348 }
1349 else
1350 {
1351 TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1352 }
1353 }
1354
// --- Success path: full block received. ---
1355 if (res == b->get_size())
1356 {
1357 b->set_downloaded();
1358 TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1359 if ( ! m_in_shutdown)
1360 {
1361 // Increase ref-count for the writer.
1362 inc_ref_count(b);
1363 m_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1364 cache()->AddWriteTask(b, true);
1365 }
1366
1367 // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1368 vChunkRequest_t creqs_to_notify;
1369 creqs_to_notify.swap( b->m_chunk_reqs );
1370
1371 m_state_cond.UnLock();
1372
1373 for (auto &creq : creqs_to_notify)
1374 {
1375 ProcessBlockSuccess(b, creq);
1376 }
1377 }
// --- Error path: negative result, or short read mapped to an error below. ---
1378 else
1379 {
1380 if (res < 0) {
1381 bool new_error = b->get_io()->register_block_error(res);
1382 int tlvl = new_error ? TRACE_Error : TRACE_Debug;
1383 TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1384 << ", io=" << b->get_io() << ", error=" << res);
1385 } else {
1386 bool first_p = b->get_io()->register_incomplete_read();
1387 int tlvl = first_p ? TRACE_Error : TRACE_Debug;
1388 TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1389 << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
// Short read: convert to an error code; EREMOTEIO is not available on all platforms.
1390#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1391 res = -EIO;
1392#else
1393 res = -EREMOTEIO;
1394#endif
1395 }
1396 b->set_error(res);
1397
1398 // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1399 // Collect others with a different IO, the first of them will be used to reissue the request.
1400 // This is then done outside of lock.
1401 std::list<ReadRequest*> rreqs_to_complete;
1402 vChunkRequest_t creqs_to_keep;
1403
1404 for(ChunkRequest &creq : b->m_chunk_reqs)
1405 {
1406 ReadRequest *rreq = creq.m_read_req;
1407
1408 if (rreq->m_io == b->get_io())
1409 {
1410 ProcessBlockError(b, rreq);
1411 if (rreq->is_complete())
1412 {
1413 rreqs_to_complete.push_back(rreq);
1414 }
1415 }
1416 else
1417 {
1418 creqs_to_keep.push_back(creq);
1419 }
1420 }
1421
1422 bool reissue = false;
1423 if ( ! creqs_to_keep.empty())
1424 {
1425 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1426
1427 TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1428 b->get_io() << " - reissuing request with my io " << rreq->m_io);
1429
1430 b->reset_error_and_set_io(rreq->m_io, rreq);
1431 b->m_chunk_reqs.swap( creqs_to_keep );
1432 reissue = true;
1433 }
1434
1435 m_state_cond.UnLock();
1436
1437 for (auto rreq : rreqs_to_complete)
1438 FinalizeReadRequest(rreq);
1439
1440 if (reissue)
1441 ProcessBlockRequest(b);
1442 }
1443}
1444
1445//------------------------------------------------------------------------------
1446
1447const char* File::lPath() const
1448{
1449 return m_filename.c_str();
1450}
1451
1452//------------------------------------------------------------------------------
1453
1454int File::offsetIdx(int iIdx) const
1455{
1456 return iIdx - m_offset/m_block_size;
1457}
1458
1459
1460//------------------------------------------------------------------------------
1461
// NOTE(review): the function header line (presumably "void File::Prefetch()")
// was lost in extraction; this is the body. Fetches at most ONE not-yet-written,
// not-in-RAM block per call (see the break below), or marks prefetching
// complete when no such block remains.
1463{
1464 // Check that block is not on disk and not in RAM.
1465 // TODO: Could prefetch several blocks at once!
1466 // blks_max could be an argument
1467
1468 BlockList_t blks;
1469
1470 TRACEF(DumpXL, "Prefetch() entering.");
1471 {
1472 XrdSysCondVarHelper _lck(m_state_cond);
1473
1474 if (m_prefetch_state != kOn)
1475 {
1476 return;
1477 }
1478
1479 if ( ! select_current_io_or_disable_prefetching(true) )
1480 {
1481 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1482 return;
1483 }
1484
1485 // Select block(s) to fetch.
1486 for (int f = 0; f < m_num_blocks; ++f)
1487 {
1488 if ( ! m_cfi.TestBitWritten(f))
1489 {
// f is relative to this file's first block; f_act is the absolute block index.
1490 int f_act = f + m_offset / m_block_size;
1491
1492 BlockMap_i bi = m_block_map.find(f_act);
1493 if (bi == m_block_map.end())
1494 {
1495 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1496 if (b)
1497 {
1498 TRACEF(Dump, "Prefetch take block " << f_act);
1499 blks.push_back(b);
1500 // Note: block ref_cnt not increased, it will be when placed into write queue.
1501
1502 inc_prefetch_read_cnt(1);
1503 }
1504 else
1505 {
1506 // This shouldn't happen as prefetching stops when RAM is 70% full.
1507 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1508 }
// Stop after the first candidate block, whether or not allocation succeeded.
1509 break;
1510 }
1511 }
1512 }
1513
1514 if (blks.empty())
1515 {
// Every block is either written or already in flight -- prefetching is done.
1516 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1517 m_prefetch_state = kComplete;
1518 cache()->DeRegisterPrefetchFile(this);
1519 }
1520 else
1521 {
1522 (*m_current_io)->m_active_prefetches += (int) blks.size();
1523 }
1524 }
1525
// Issue the actual network request(s) outside of the state lock.
1526 if ( ! blks.empty())
1527 {
1528 ProcessBlockRequests(blks);
1529 }
1530}
1531
1532
1533//------------------------------------------------------------------------------
1534
1536{
1537 return m_prefetch_score;
1538}
1539
1541{
1542 return Cache::GetInstance().GetLog();
1543}
1544
1549
1550void File::insert_remote_location(const std::string &loc)
1551{
1552 if ( ! loc.empty())
1553 {
1554 size_t p = loc.find_first_of('@');
1555 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1556 }
1557}
1558
1559std::string File::GetRemoteLocations() const
1560{
1561 std::string s;
1562 if ( ! m_remote_locations.empty())
1563 {
1564 size_t sl = 0;
1565 int nl = 0;
1566 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1567 {
1568 sl += i->size();
1569 }
1570 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1571 s = '[';
1572 int j = 1;
1573 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1574 {
1575 s += '"'; s += *i; s += '"';
1576 if (j < nl) s += ',';
1577 }
1578 s += ']';
1579 }
1580 else
1581 {
1582 s = "[]";
1583 }
1584 return s;
1585}
1586
1587//==============================================================================
1588//======================= RESPONSE HANDLERS ==============================
1589//==============================================================================
1590
1592{
1593 m_block->m_file->ProcessBlockResponse(m_block, res);
1594 delete this;
1595}
1596
1597//------------------------------------------------------------------------------
1598
1600{
1601 m_mutex.Lock();
1602
1603 int n_left = --m_to_wait;
1604
1605 if (res < 0) {
1606 if (m_errno == 0) m_errno = res; // store first reported error
1607 } else {
1608 m_bytes_read += res;
1609 }
1610
1611 m_mutex.UnLock();
1612
1613 if (n_left == 0)
1614 {
1615 m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1616 delete this;
1617 }
1618}
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACE_Error
Definition XrdPfcTrace.hh:7
#define TRACE_Dump
#define TRACEF(act, x)
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
#define stat(a, b)
Definition XrdPosix.hh:96
#define XRD_TRACE
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
long long m_offset
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:319
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:402
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:163
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1179
XrdOss * GetOss() const
Definition XrdPfc.hh:389
XrdSysError * GetLog()
Definition XrdPfc.hh:401
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
XrdSysTrace * GetTrace()
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
XrdSysError * GetLog()
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache inf o and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
~File()
Destructor.
Definition XrdPfcFile.cc:77
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
bool register_incomplete_read()
Definition XrdPfcIO.hh:92
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:30
bool register_block_error(int res)
Definition XrdPfcIO.hh:95
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:72
const char * GetLocation()
Definition XrdPfcIO.hh:46
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static const char * s_infoExtension
Statistics of cache utilisation by a File object.
long long m_BytesBypassed
number of bytes served directly through XrdCl
long long m_BytesHit
number of bytes served from disk
void DeltaToReference(const Stats &ref)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
std::list< Block * > BlockList_t
XrdSysTrace * GetTrace()
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
Definition XProtocol.hh:688
static const int maxRvecsz
Definition XProtocol.hh:686
long long offset
ReadRequest * m_read_req
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:74
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:67
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:76
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
unsigned short m_seq_id
Definition XrdPfcFile.hh:64
void update_error_cond(int ec)
Definition XrdPfcFile.hh:92
bool is_complete() const
Definition XrdPfcFile.hh:94
int return_value() const
Definition XrdPfcFile.hh:95
long long m_bytes_read
Definition XrdPfcFile.hh:79