43const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49const char *File::m_traceID =
"File";
53File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
57 m_cfi(
Cache::GetInstance().
GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
65 m_detach_time_logged(false),
70 m_prefetch_state(kOff),
72 m_prefetch_read_cnt(0),
73 m_prefetch_hit_cnt(0),
95 TRACEF(
Debug,
"~File() ended, prefetch score = " << m_prefetch_score);
100File*
File::FileOpen(
const std::string &path,
long long offset,
long long fileSize)
102 File *file =
new File(path, offset, fileSize);
130 m_in_shutdown =
true;
132 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
134 m_prefetch_state = kStopped;
135 cache()->DeRegisterPrefetchFile(
this);
147 Stats delta = m_last_stats;
149 m_last_stats = m_stats.
Clone();
160 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
168 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
172 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
184 insert_remote_location(loc);
200 IoSet_i mi = m_io_set.find(io);
202 if (mi != m_io_set.end())
207 ", active_reads " << n_active_reads <<
208 ", active_prefetches " << io->m_active_prefetches <<
209 ", allow_prefetching " << io->m_allow_prefetching <<
210 ", ios_in_detach " << m_ios_in_detach);
212 "\tio_map.size() " << m_io_set.size() <<
213 ", block_map.size() " << m_block_map.size() <<
", file");
215 insert_remote_location(loc);
217 io->m_allow_prefetching =
false;
218 io->m_in_detach =
true;
221 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
223 if ( ! select_current_io_or_disable_prefetching(
false) )
225 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
232 bool io_active_result;
234 if (n_active_reads > 0)
236 io_active_result =
true;
238 else if (m_io_set.size() - m_ios_in_detach == 1)
240 io_active_result = ! m_block_map.empty();
244 io_active_result = io->m_active_prefetches > 0;
247 if ( ! io_active_result)
252 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
254 return io_active_result;
258 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
269 m_detach_time_logged =
false;
278 if ( ! m_in_shutdown)
280 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
283 m_cfi.WriteIOStatDetach(loc_stats);
284 m_detach_time_logged =
true;
286 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
290 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
302 time_t now = time(0);
307 IoSet_i mi = m_io_set.find(io);
309 if (mi == m_io_set.end())
312 io->m_attach_time = now;
315 insert_remote_location(loc);
317 if (m_prefetch_state == kStopped)
319 m_prefetch_state = kOn;
320 cache()->RegisterPrefetchFile(
this);
325 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
328 m_state_cond.UnLock();
339 time_t now = time(0);
343 IoSet_i mi = m_io_set.find(io);
345 if (mi != m_io_set.end())
347 if (mi == m_current_io)
352 m_stats.IoDetach(now - io->m_attach_time);
356 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
358 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
359 m_prefetch_state = kStopped;
360 cache()->DeRegisterPrefetchFile(
this);
365 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
368 m_state_cond.UnLock();
377 static const char *tpfx =
"Open() ";
379 TRACEF(Dump, tpfx <<
"open file for disk cache");
386 struct stat data_stat, info_stat;
390 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
391 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
394 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
395 myEnv.
Put(
"oss.asize", size_str);
407 m_data_file = myOss.
newFile(myUser);
408 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
412 delete m_data_file; m_data_file = 0;
416 myEnv.
Put(
"oss.asize",
"64k");
422 m_data_file->Close();
delete m_data_file; m_data_file = 0;
426 m_info_file = myOss.
newFile(myUser);
427 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
431 delete m_info_file; m_info_file = 0;
432 m_data_file->Close();
delete m_data_file; m_data_file = 0;
436 bool initialize_info_file =
true;
438 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
440 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
441 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
442 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
")");
445 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
447 initialize_info_file =
false;
449 TRACEF(Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
450 m_cfi.ResetAllAccessStats();
451 m_data_file->Ftruncate(0);
455 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.
get_cs_Chk())
460 TRACEF(Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
461 initialize_info_file =
true;
462 m_cfi.ResetAllAccessStats();
463 m_data_file->Ftruncate(0);
470 if (initialize_info_file)
472 m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.
m_bufferSize, m_file_size);
474 m_cfi.ResetNoCkSumTime();
475 m_cfi.Write(m_info_file, ifn.c_str());
476 m_info_file->Fsync();
477 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
478 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.GetNBlocks());
481 m_cfi.WriteIOStatAttach();
483 m_block_size = m_cfi.GetBufferSize();
484 m_num_blocks = m_cfi.GetNBlocks();
485 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped;
486 m_state_cond.UnLock();
502 if ((res = m_data_file->Fstat(&sbuff)))
return res;
504 sbuff.st_size = m_file_size;
506 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
517bool File::overlap(
int blk,
526 const long long beg = blk * blk_size;
527 const long long end = beg + blk_size;
528 const long long req_end = req_off + req_size;
530 if (req_off < end && req_end > beg)
532 const long long ovlp_beg = std::max(beg, req_off);
533 const long long ovlp_end = std::min(end, req_end);
535 off = ovlp_beg - req_off;
536 blk_off = ovlp_beg - beg;
537 size = (int) (ovlp_end - ovlp_beg);
539 assert(size <= blk_size);
550Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
558 const long long off = i * m_block_size;
559 const int last_block = m_num_blocks - 1;
560 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
562 int blk_size, req_size;
563 if (i == last_block) {
564 blk_size = req_size = m_file_size - off;
565 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
567 blk_size = req_size = m_block_size;
571 char *buf = cache()->RequestRAM(req_size);
575 b =
new (std::nothrow) Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
585 m_prefetch_state = kHold;
586 cache()->DeRegisterPrefetchFile(
this);
591 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
598void File::ProcessBlockRequest(
Block *b)
606 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
608 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
624 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
626 ProcessBlockRequest(*bi);
632void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
634 int n_chunks = ioVec.size();
637 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
638 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
648 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
653int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
655 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
657 long long rs = m_data_file->ReadV(ioVec.data(), (
int) ioVec.size());
661 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
665 if (rs != expected_size)
667 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
686 if (m_in_shutdown || io->m_in_detach)
688 m_state_cond.UnLock();
689 return m_in_shutdown ? -ENOENT : -EBADF;
694 if (m_cfi.IsComplete())
696 m_state_cond.UnLock();
697 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
698 if (ret > 0) m_stats.AddBytesHit(ret);
702 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
704 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
711 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
715 if (m_in_shutdown || io->m_in_detach)
717 m_state_cond.UnLock();
718 return m_in_shutdown ? -ENOENT : -EBADF;
723 if (m_cfi.IsComplete())
725 m_state_cond.UnLock();
726 int ret = m_data_file->ReadV(
const_cast<XrdOucIOVec*
>(readV), readVnum);
727 if (ret > 0) m_stats.AddBytesHit(ret);
731 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
736int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
748 int prefetch_cnt = 0;
753 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
755 std::vector<XrdOucIOVec> iovec_disk;
756 std::vector<XrdOucIOVec> iovec_direct;
757 int iovec_disk_total = 0;
758 int iovec_direct_total = 0;
760 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
767 const int idx_first = iUserOff / m_block_size;
768 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
770 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
772 enum LastBlock_e { LB_other, LB_disk, LB_direct };
774 LastBlock_e lbe = LB_other;
776 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
779 BlockMap_i bi = m_block_map.find(block_idx);
786 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
789 if (bi != m_block_map.end())
791 inc_ref_count(bi->second);
792 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
794 if (bi->second->is_finished())
798 assert(bi->second->is_ok());
800 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
802 if (bi->second->m_prefetch)
808 read_req =
new ReadRequest(io, rh);
813 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
820 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
822 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
825 iovec_disk.back().size += size;
827 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
828 iovec_disk_total += size;
830 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
839 read_req =
new ReadRequest(io, rh);
842 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
845 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
847 blks_to_request.push_back(b);
849 b->
m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
856 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
858 iovec_direct_total += size;
865 iovec_direct.back().size += size;
867 long long in_offset = block_idx * m_block_size + blk_off;
868 char *out_pos = iUserBuff + off;
875 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
884 inc_prefetch_hit_cnt(prefetch_cnt);
886 m_state_cond.UnLock();
889 if ( ! blks_to_request.empty())
891 ProcessBlockRequests(blks_to_request);
892 blks_to_request.clear();
896 if ( ! iovec_direct.empty())
898 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
900 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
905 long long bytes_read = 0;
909 if ( ! blks_ready.empty())
911 for (
auto &bvi : blks_ready)
913 for (
auto &cr : bvi.second)
915 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
916 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
917 bytes_read += cr.m_size;
923 if ( ! iovec_disk.empty())
925 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
926 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
943 for (
auto &bvi : blks_ready)
944 dec_ref_count(bvi.first, (
int) bvi.second.size());
957 m_state_cond.UnLock();
959 m_stats.AddReadStats(read_req->
m_stats);
967 m_state_cond.UnLock();
973 m_stats.m_BytesHit += bytes_read;
974 m_state_cond.UnLock();
978 return error_cond ? error_cond : bytes_read;
990 long long offset = b->
m_offset - m_offset;
994 if (m_cfi.IsCkSumCache())
998 retval = m_data_file->pgWrite(b->
get_buff(), offset, size, 0, 0);
1000 retval = m_data_file->Write(b->
get_buff(), offset, size);
1010 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1020 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1023 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1025 bool schedule_sync =
false;
1029 m_cfi.SetBitWritten(blk_idx);
1033 m_cfi.SetBitPrefetch(blk_idx);
1037 m_cfi.ResetCkSumNet();
1046 m_writes_during_sync.push_back(blk_idx);
1050 m_cfi.SetBitSynced(blk_idx);
1051 ++m_non_flushed_cnt;
1052 if ((m_cfi.IsComplete() || m_non_flushed_cnt >=
Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1055 schedule_sync =
true;
1057 m_non_flushed_cnt = 0;
1064 cache()->ScheduleFileSync(
this);
1074 int ret = m_data_file->Fsync();
1075 bool errorp =
false;
1079 m_cfi.WriteIOStat(loc_stats);
1080 m_cfi.Write(m_info_file, m_filename.c_str());
1081 int cret = m_info_file->Fsync();
1084 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1090 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1096 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1103 m_writes_during_sync.clear();
1109 int written_while_in_sync;
1110 bool resync =
false;
1113 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1115 m_cfi.SetBitSynced(*i);
1117 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1118 m_writes_during_sync.clear();
1122 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1127 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1138void File::free_block(
Block* b)
1141 int i = b->
m_offset / m_block_size;
1142 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1143 size_t ret = m_block_map.erase(i);
1147 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1157 m_prefetch_state = kOn;
1158 cache()->RegisterPrefetchFile(
this);
1164bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1168 int io_size = (int) m_io_set.size();
1173 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1176 m_current_io = m_io_set.begin();
1179 else if (io_size > 1)
1181 IoSet_i mi = m_current_io;
1182 if (skip_current && mi != m_io_set.end()) ++mi;
1184 for (
int i = 0; i < io_size; ++i)
1186 if (mi == m_io_set.end()) mi = m_io_set.begin();
1188 if ((*mi)->m_allow_prefetching)
1200 m_current_io = m_io_set.end();
1201 m_prefetch_state = kStopped;
1202 cache()->DeRegisterPrefetchFile(
this);
1210void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1216 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1218 m_state_cond.Lock();
1231 m_state_cond.UnLock();
1234 FinalizeReadRequest(rreq);
1261 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1264 m_state_cond.Lock();
1269 rreq->m_stats.m_BytesMissed += creq.
m_size;
1271 rreq->m_stats.m_BytesHit += creq.
m_size;
1273 --rreq->m_n_chunk_reqs;
1276 inc_prefetch_hit_cnt(1);
1280 bool rreq_complete = rreq->is_complete();
1282 m_state_cond.UnLock();
1285 FinalizeReadRequest(rreq);
1293 m_stats.AddReadStats(rreq->
m_stats);
1299void File::ProcessBlockResponse(
Block *b,
int res)
1301 static const char* tpfx =
"ProcessBlockResponse ";
1303 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1305 if (res >= 0 && res != b->
get_size())
1309 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1313 m_state_cond.Lock();
1319 IoSet_i mi = m_io_set.find(io);
1320 if (mi != m_io_set.end())
1322 --io->m_active_prefetches;
1325 if (res < 0 && io->m_allow_prefetching)
1327 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1328 io->m_allow_prefetching =
false;
1331 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1333 if ( ! select_current_io_or_disable_prefetching(
false) )
1335 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1341 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1344 m_state_cond.UnLock();
1358 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1359 if ( ! m_in_shutdown)
1364 cache()->AddWriteTask(b,
true);
1371 m_state_cond.UnLock();
1373 for (
auto &creq : creqs_to_notify)
1375 ProcessBlockSuccess(b, creq);
1384 <<
", io=" << b->
get_io() <<
", error=" << res);
1389 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1390#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1401 std::list<ReadRequest*> rreqs_to_complete;
1410 ProcessBlockError(b, rreq);
1413 rreqs_to_complete.push_back(rreq);
1418 creqs_to_keep.push_back(creq);
1422 bool reissue =
false;
1423 if ( ! creqs_to_keep.empty())
1425 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1427 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1428 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1435 m_state_cond.UnLock();
1437 for (
auto rreq : rreqs_to_complete)
1438 FinalizeReadRequest(rreq);
1441 ProcessBlockRequest(b);
1449 return m_filename.c_str();
1454int File::offsetIdx(
int iIdx)
const
1456 return iIdx - m_offset/m_block_size;
1470 TRACEF(DumpXL,
"Prefetch() entering.");
1474 if (m_prefetch_state != kOn)
1479 if ( ! select_current_io_or_disable_prefetching(
true) )
1481 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1486 for (
int f = 0; f < m_num_blocks; ++f)
1488 if ( ! m_cfi.TestBitWritten(f))
1490 int f_act = f + m_offset / m_block_size;
1492 BlockMap_i bi = m_block_map.find(f_act);
1493 if (bi == m_block_map.end())
1495 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1498 TRACEF(Dump,
"Prefetch take block " << f_act);
1502 inc_prefetch_read_cnt(1);
1507 TRACEF(Warning,
"Prefetch allocation failed for block " << f_act);
1516 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1517 m_prefetch_state = kComplete;
1518 cache()->DeRegisterPrefetchFile(
this);
1522 (*m_current_io)->m_active_prefetches += (int) blks.size();
1526 if ( ! blks.empty())
1528 ProcessBlockRequests(blks);
1537 return m_prefetch_score;
1550void File::insert_remote_location(
const std::string &loc)
1554 size_t p = loc.find_first_of(
'@');
1555 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1562 if ( ! m_remote_locations.empty())
1566 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1570 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1573 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1575 s +=
'"'; s += *i; s +=
'"';
1576 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
const char * XrdSysE2T(int errcode)
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void * get_req_id() const
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt when the reference count reaches zero.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache info and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool ioActive(IO *io)
Initiate close. Returns true if the IO is still active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
Statistics of cache utilisation by a File object.
long long m_BytesBypassed
number of bytes served directly through XrdCl
long long m_BytesHit
number of bytes served from disk
void DeltaToReference(const Stats &ref)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)