XRootD
Loading...
Searching...
No Matches
XrdPfc::File Class Reference

#include <XrdPfcFile.hh>

+ Collaboration diagram for XrdPfc::File:

Public Member Functions

void AddIO (IO *io)
 
void BlockRemovedFromWriteQ (Block *)
 Handle removal of a block from Cache's write queue.
 
void BlocksRemovedFromWriteQ (std::list< Block * > &)
 Handle removal of a set of blocks from Cache's write queue.
 
int dec_ref_cnt ()
 
bool FinalizeSyncBeforeExit ()
 Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
 
int Fstat (struct stat &sbuff)
 
int get_ref_cnt ()
 
size_t GetAccessCnt () const
 
int GetBlockSize () const
 
long long GetFileSize () const
 
const Info::AStat * GetLastAccessStats () const
 
const std::string & GetLocalPath () const
 
XrdSysError * GetLog ()
 
int GetNBlocks () const
 
int GetNDownloadedBlocks () const
 
int GetPrefetchCountOnIO (IO *io)
 
long long GetPrefetchedBytes () const
 
float GetPrefetchScore () const
 
std::string GetRemoteLocations () const
 
XrdSysTrace * GetTrace ()
 
int inc_ref_cnt ()
 
long long initiate_emergency_shutdown ()
 
bool ioActive (IO *io)
 Initiate close. Returns true if the IO is still active. Used in XrdPosixXrootd::Close().
 
void ioUpdated (IO *io)
 Notification from IO that it has been updated (remote open).
 
bool is_in_emergency_shutdown ()
 
const char * lPath () const
 Log path.
 
void Prefetch ()
 
int Read (IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
 Normal read.
 
int ReadV (IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
 Vector read.
 
const Stats & RefStats () const
 
void RemoveIO (IO *io)
 
void RequestSyncOfDetachStats ()
 Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
 
void StopPrefetchingOnIO (IO *io)
 
void Sync ()
 Sync file cache info and output data with disk.
 
void WriteBlockToDisk (Block *b)
 

Static Public Member Functions

static File * FileOpen (const std::string &path, long long offset, long long fileSize)
 Static constructor that also does Open. Returns null ptr if Open fails.
 

Friends

class BlockResponseHandler
 
class Cache
 
class DirectResponseHandler
 

Detailed Description

Definition at line 202 of file XrdPfcFile.hh.

Member Function Documentation

◆ AddIO()

void File::AddIO ( IO * io)

Definition at line 346 of file XrdPfcFile.cc.

347{
348 // Called from Cache::GetFile() when a new IO asks for the file.
349
350 TRACEF(Debug, "AddIO() io = " << (void*)io);
351
352 time_t now = time(0);
353 std::string loc(io->GetLocation());
354
355 m_state_cond.Lock();
356
357 IoSet_i mi = m_io_set.find(io);
358
359 if (mi == m_io_set.end())
360 {
361 m_io_set.insert(io);
362 io->m_attach_time = now;
363 m_delta_stats.IoAttach();
364
365 insert_remote_location(loc);
366
367 if (m_prefetch_state == kStopped)
368 {
369 m_prefetch_state = kOn;
370 cache()->RegisterPrefetchFile(this);
371 }
372 }
373 else
374 {
375 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
376 }
377
378 m_state_cond.UnLock();
379}
#define TRACEF(act, x)
bool Debug
const char * GetLocation()
Definition XrdPfcIO.hh:44

References Debug, Error, XrdPfc::IO::GetLocation(), and TRACEF.

Referenced by XrdPfc::Cache::GetFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ BlockRemovedFromWriteQ()

void File::BlockRemovedFromWriteQ ( Block * b)

Handle removal of a block from Cache's write queue.

Definition at line 208 of file XrdPfcFile.cc.

209{
210 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
211
212 XrdSysCondVarHelper _lck(m_state_cond);
213 dec_ref_count(b);
214}
long long m_offset

References XrdPfc::Block::m_offset, and TRACEF.

◆ BlocksRemovedFromWriteQ()

void File::BlocksRemovedFromWriteQ ( std::list< Block * > & blocks)

Handle removal of a set of blocks from Cache's write queue.

Definition at line 216 of file XrdPfcFile.cc.

217{
218 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
219
220 XrdSysCondVarHelper _lck(m_state_cond);
221
222 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
223 {
224 dec_ref_count(*i);
225 }
226}

References TRACEF.

Referenced by XrdPfc::Cache::RemoveWriteQEntriesFor().

+ Here is the caller graph for this function:

◆ dec_ref_cnt()

int XrdPfc::File::dec_ref_cnt ( )
inline

Definition at line 288 of file XrdPfcFile.hh.

288{ return --m_ref_cnt; }

◆ FileOpen()

File * File::FileOpen ( const std::string & path,
long long offset,
long long fileSize )
static

Static constructor that also does Open. Returns null ptr if Open fails.

Definition at line 138 of file XrdPfcFile.cc.

139{
140 File *file = new File(path, offset, fileSize);
141 if ( ! file->Open())
142 {
143 delete file;
144 file = 0;
145 }
146 return file;
147}
XrdOucString File

References File.

Referenced by XrdPfc::Cache::GetFile().

+ Here is the caller graph for this function:

◆ FinalizeSyncBeforeExit()

bool File::FinalizeSyncBeforeExit ( )

Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.

Definition at line 322 of file XrdPfcFile.cc.

323{
324 // Returns true if sync is required.
325 // This method is called after corresponding IO is detached from PosixCache.
326
327 XrdSysCondVarHelper _lck(m_state_cond);
328 if ( ! m_in_shutdown)
329 {
330 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
331 {
332 report_and_merge_delta_stats();
333 m_cfi.WriteIOStatDetach(m_stats);
334 m_detach_time_logged = true;
335 m_in_sync = true;
336 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
337 return true;
338 }
339 }
340 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
341 return false;
342}

References Debug, and TRACEF.

◆ Fstat()

int File::Fstat ( struct stat & sbuff)

Definition at line 563 of file XrdPfcFile.cc.

564{
565 // Stat on an open file.
566 // Corrects size to actual full size of the file.
567 // Sets atime to 0 if the file is only partially downloaded, in accordance
568 // with pfc.onlyifcached settings.
569 // Called from IO::Fstat() and Cache::Stat() when the file is active.
570 // Returns 0 on success, -errno on error.
571
572 int res;
573
574 if ((res = m_data_file->Fstat(&sbuff))) return res;
575
576 sbuff.st_size = m_file_size;
577
578 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
579 if ( ! is_cached)
580 sbuff.st_atime = 0;
581
582 return 0;
583}

References stat.

Referenced by XrdPfc::Cache::ConsiderCached(), and XrdPfc::Cache::Stat().

+ Here is the caller graph for this function:

◆ get_ref_cnt()

int XrdPfc::File::get_ref_cnt ( )
inline

Definition at line 286 of file XrdPfcFile.hh.

286{ return m_ref_cnt; }

◆ GetAccessCnt()

size_t XrdPfc::File::GetAccessCnt ( ) const
inline

Definition at line 276 of file XrdPfcFile.hh.

276{ return m_cfi.GetAccessCnt(); }

◆ GetBlockSize()

int XrdPfc::File::GetBlockSize ( ) const
inline

Definition at line 277 of file XrdPfcFile.hh.

277{ return m_cfi.GetBufferSize(); }

◆ GetFileSize()

long long XrdPfc::File::GetFileSize ( ) const
inline

Definition at line 267 of file XrdPfcFile.hh.

267{ return m_file_size; }

◆ GetLastAccessStats()

const Info::AStat * XrdPfc::File::GetLastAccessStats ( ) const
inline

Definition at line 275 of file XrdPfcFile.hh.

275{ return m_cfi.GetLastAccessStats(); }

◆ GetLocalPath()

const std::string & XrdPfc::File::GetLocalPath ( ) const
inline

Definition at line 262 of file XrdPfcFile.hh.

262{ return m_filename; }

Referenced by XrdPfc::Cache::AddWriteTask(), and XrdPfc::Cache::ReleaseFile().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError * File::GetLog ( )

Definition at line 1627 of file XrdPfcFile.cc.

1628{
1629 return Cache::GetInstance().GetLog();
1630}
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
XrdSysError * GetLog()
Definition XrdPfc.hh:282

References XrdPfc::Cache::GetInstance(), and XrdPfc::Cache::GetLog().

+ Here is the call graph for this function:

◆ GetNBlocks()

int XrdPfc::File::GetNBlocks ( ) const
inline

Definition at line 278 of file XrdPfcFile.hh.

278{ return m_cfi.GetNBlocks(); }

◆ GetNDownloadedBlocks()

int XrdPfc::File::GetNDownloadedBlocks ( ) const
inline

Definition at line 279 of file XrdPfcFile.hh.

279{ return m_cfi.GetNDownloadedBlocks(); }

◆ GetPrefetchCountOnIO()

int XrdPfc::File::GetPrefetchCountOnIO ( IO * io)

◆ GetPrefetchedBytes()

long long XrdPfc::File::GetPrefetchedBytes ( ) const
inline

Definition at line 280 of file XrdPfcFile.hh.

280{ return m_prefetch_bytes; }

◆ GetPrefetchScore()

float File::GetPrefetchScore ( ) const

Definition at line 1622 of file XrdPfcFile.cc.

1623{
1624 return m_prefetch_score;
1625}

◆ GetRemoteLocations()

std::string File::GetRemoteLocations ( ) const

Definition at line 1646 of file XrdPfcFile.cc.

1647{
1648 std::string s;
1649 if ( ! m_remote_locations.empty())
1650 {
1651 size_t sl = 0;
1652 int nl = 0;
1653 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1654 {
1655 sl += i->size();
1656 }
1657 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1658 s = '[';
1659 int j = 1;
1660 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1661 {
1662 s += '"'; s += *i; s += '"';
1663 if (j < nl) s += ',';
1664 }
1665 s += ']';
1666 }
1667 else
1668 {
1669 s = "[]";
1670 }
1671 return s;
1672}

◆ GetTrace()

XrdSysTrace * File::GetTrace ( )

Definition at line 1632 of file XrdPfcFile.cc.

1633{
1634 return Cache::GetInstance().GetTrace();
1635}
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:283

References XrdPfc::Cache::GetInstance(), and XrdPfc::Cache::GetTrace().

+ Here is the call graph for this function:

◆ inc_ref_cnt()

int XrdPfc::File::inc_ref_cnt ( )
inline

Definition at line 287 of file XrdPfcFile.hh.

287{ return ++m_ref_cnt; }

◆ initiate_emergency_shutdown()

long long File::initiate_emergency_shutdown ( )

Definition at line 151 of file XrdPfcFile.cc.

152{
153 // Called from Cache::Unlink() when the file is currently open.
154 // Cache::Unlink is also called on FSync error and when wrong number of bytes
155 // is received from a remote read.
156 //
157 // From this point onward the file will not be written to, cinfo file will
158 // not be updated, and all new read requests will return -ENOENT.
159 //
160 // File's entry in the Cache's active map is set to nullptr and will be
161 // removed from there shortly, in any case, well before this File object
162 // shuts down. Cache::Unlink() also reports the appropriate purge event.
163
164 XrdSysCondVarHelper _lck(m_state_cond);
165
166 m_in_shutdown = true;
167
168 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
169 {
170 m_prefetch_state = kStopped;
171 cache()->DeRegisterPrefetchFile(this);
172 }
173
174 report_and_merge_delta_stats();
175
176 return m_st_blocks;
177}

Referenced by XrdPfc::Cache::UnlinkFile().

+ Here is the caller graph for this function:

◆ ioActive()

bool File::ioActive ( IO * io)

Initiate close. Returns true if the IO is still active. Used in XrdPosixXrootd::Close().

Definition at line 239 of file XrdPfcFile.cc.

240{
241 // Returns true if delay is needed.
242
243 TRACEF(Debug, "ioActive start for io " << io);
244
245 std::string loc(io->GetLocation());
246
247 {
248 XrdSysCondVarHelper _lck(m_state_cond);
249
250 IoSet_i mi = m_io_set.find(io);
251
252 if (mi != m_io_set.end())
253 {
254 unsigned int n_active_reads = io->m_active_read_reqs;
255
256 TRACE(Info, "ioActive for io " << io <<
257 ", active_reads " << n_active_reads <<
258 ", active_prefetches " << io->m_active_prefetches <<
259 ", allow_prefetching " << io->m_allow_prefetching <<
260 ", ios_in_detach " << m_ios_in_detach);
261 TRACEF(Info,
262 "\tio_map.size() " << m_io_set.size() <<
263 ", block_map.size() " << m_block_map.size() << ", file");
264
265 insert_remote_location(loc);
266
267 io->m_allow_prefetching = false;
268 io->m_in_detach = true;
269
270 // Check if any IO is still available for prefetching. If not, stop it.
271 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
272 {
273 if ( ! select_current_io_or_disable_prefetching(false) )
274 {
275 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
276 }
277 }
278
279 // On last IO, consider write queue blocks. Note, this also contains
280 // blocks being prefetched.
281
282 bool io_active_result;
283
284 if (n_active_reads > 0)
285 {
286 io_active_result = true;
287 }
288 else if (m_io_set.size() - m_ios_in_detach == 1)
289 {
290 io_active_result = ! m_block_map.empty();
291 }
292 else
293 {
294 io_active_result = io->m_active_prefetches > 0;
295 }
296
297 if ( ! io_active_result)
298 {
299 ++m_ios_in_detach;
300 }
301
302 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
303
304 return io_active_result;
305 }
306 else
307 {
308 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
309 return false;
310 }
311 }
312}
#define TRACE(act, x)
Definition XrdTrace.hh:63
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:70

References Debug, Error, XrdPfc::IO::GetLocation(), XrdPfc::IO::m_active_read_reqs, TRACE, and TRACEF.

+ Here is the call graph for this function:

◆ ioUpdated()

void File::ioUpdated ( IO * io)

Notification from IO that it has been updated (remote open).

Definition at line 230 of file XrdPfcFile.cc.

231{
232 std::string loc(io->GetLocation());
233 XrdSysCondVarHelper _lck(m_state_cond);
234 insert_remote_location(loc);
235}

References XrdPfc::IO::GetLocation().

+ Here is the call graph for this function:

◆ is_in_emergency_shutdown()

bool XrdPfc::File::is_in_emergency_shutdown ( )
inline

Definition at line 291 of file XrdPfcFile.hh.

291{ return m_in_shutdown; }

◆ lPath()

const char * File::lPath ( ) const

Log path.

Definition at line 1534 of file XrdPfcFile.cc.

1535{
1536 return m_filename.c_str();
1537}

Referenced by XrdPfc::Cache::ProcessWriteTasks(), and XrdPfc::Cache::RemoveWriteQEntriesFor().

+ Here is the caller graph for this function:

◆ Prefetch()

void File::Prefetch ( )

Definition at line 1549 of file XrdPfcFile.cc.

1550{
1551 // Check that block is not on disk and not in RAM.
1552 // TODO: Could prefetch several blocks at once!
1553 // blks_max could be an argument
1554
1555 BlockList_t blks;
1556
1557 TRACEF(DumpXL, "Prefetch() entering.");
1558 {
1559 XrdSysCondVarHelper _lck(m_state_cond);
1560
1561 if (m_prefetch_state != kOn)
1562 {
1563 return;
1564 }
1565
1566 if ( ! select_current_io_or_disable_prefetching(true) )
1567 {
1568 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1569 return;
1570 }
1571
1572 // Select block(s) to fetch.
1573 for (int f = 0; f < m_num_blocks; ++f)
1574 {
1575 if ( ! m_cfi.TestBitWritten(f))
1576 {
1577 int f_act = f + m_offset / m_block_size;
1578
1579 BlockMap_i bi = m_block_map.find(f_act);
1580 if (bi == m_block_map.end())
1581 {
1582 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1583 if (b)
1584 {
1585 TRACEF(Dump, "Prefetch take block " << f_act);
1586 blks.push_back(b);
1587 // Note: block ref_cnt not increased, it will be when placed into write queue.
1588
1589 inc_prefetch_read_cnt(1);
1590 }
1591 else
1592 {
1593 // This shouldn't happen as prefetching stops when RAM is 70% full.
1594 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1595 }
1596 break;
1597 }
1598 }
1599 }
1600
1601 if (blks.empty())
1602 {
1603 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1604 m_prefetch_state = kComplete;
1605 cache()->DeRegisterPrefetchFile(this);
1606 }
1607 else
1608 {
1609 (*m_current_io)->m_active_prefetches += (int) blks.size();
1610 }
1611 }
1612
1613 if ( ! blks.empty())
1614 {
1615 ProcessBlockRequests(blks);
1616 }
1617}
std::list< Block * > BlockList_t

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::Prefetch().

+ Here is the caller graph for this function:

◆ Read()

int File::Read ( IO * io,
char * buff,
long long offset,
int size,
ReadReqRH * rh )

Normal read.

Definition at line 748 of file XrdPfcFile.cc.

749{
750 // rrc_func is ONLY called from async processing.
751 // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
752 // This streamlines implementation of synchronous IO::Read().
753
754 TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
755
756 m_state_cond.Lock();
757
758 if (m_in_shutdown || io->m_in_detach)
759 {
760 m_state_cond.UnLock();
761 return m_in_shutdown ? -ENOENT : -EBADF;
762 }
763
764 // Shortcut -- file is fully downloaded.
765
766 if (m_cfi.IsComplete())
767 {
768 m_state_cond.UnLock();
769 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
770 if (ret > 0) {
771 XrdSysCondVarHelper _lck(m_state_cond);
772 m_delta_stats.AddBytesHit(ret);
773 check_delta_stats();
774 }
775 return ret;
776 }
777
778 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
779
780 return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
781}
unsigned short m_seq_id
Definition XrdPfcFile.hh:53

References Xrd::hex1, XrdPfc::ReadReqRH::m_seq_id, and TRACEF.

Referenced by XrdPfc::IOFileBlock::Read().

+ Here is the caller graph for this function:

◆ ReadV()

int File::ReadV ( IO * io,
const XrdOucIOVec * readV,
int readVnum,
ReadReqRH * rh )

Vector read.

Definition at line 785 of file XrdPfcFile.cc.

786{
787 TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
788
789 m_state_cond.Lock();
790
791 if (m_in_shutdown || io->m_in_detach)
792 {
793 m_state_cond.UnLock();
794 return m_in_shutdown ? -ENOENT : -EBADF;
795 }
796
797 // Shortcut -- file is fully downloaded.
798
799 if (m_cfi.IsComplete())
800 {
801 m_state_cond.UnLock();
802 int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
803 if (ret > 0) {
804 XrdSysCondVarHelper _lck(m_state_cond);
805 m_delta_stats.AddBytesHit(ret);
806 check_delta_stats();
807 }
808 return ret;
809 }
810
811 return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
812}

References TRACEF.

◆ RefStats()

const Stats & XrdPfc::File::RefStats ( ) const
inline

Definition at line 281 of file XrdPfcFile.hh.

281{ return m_stats; }

◆ RemoveIO()

void File::RemoveIO ( IO * io)

Definition at line 383 of file XrdPfcFile.cc.

384{
385 // Called from Cache::ReleaseFile.
386
387 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
388
389 time_t now = time(0);
390
391 m_state_cond.Lock();
392
393 IoSet_i mi = m_io_set.find(io);
394
395 if (mi != m_io_set.end())
396 {
397 if (mi == m_current_io)
398 {
399 ++m_current_io;
400 }
401
402 m_delta_stats.IoDetach(now - io->m_attach_time);
403 m_io_set.erase(mi);
404 --m_ios_in_detach;
405
406 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
407 {
408 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
409 m_prefetch_state = kStopped;
410 cache()->DeRegisterPrefetchFile(this);
411 }
412 }
413 else
414 {
415 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
416 }
417
418 m_state_cond.UnLock();
419}

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::ReleaseFile().

+ Here is the caller graph for this function:

◆ RequestSyncOfDetachStats()

void File::RequestSyncOfDetachStats ( )

Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.

Definition at line 316 of file XrdPfcFile.cc.

317{
318 XrdSysCondVarHelper _lck(m_state_cond);
319 m_detach_time_logged = false;
320}

◆ StopPrefetchingOnIO()

void XrdPfc::File::StopPrefetchingOnIO ( IO * io)

◆ Sync()

void File::Sync ( )

Sync file cache info and output data with disk.

Definition at line 1148 of file XrdPfcFile.cc.

1149{
1150 TRACEF(Dump, "Sync()");
1151
1152 int ret = m_data_file->Fsync();
1153 bool errorp = false;
1154 if (ret == XrdOssOK)
1155 {
1156 Stats loc_stats;
1157 {
1158 XrdSysCondVarHelper _lck(&m_state_cond);
1159 report_and_merge_delta_stats();
1160 loc_stats = m_stats;
1161 }
1162 m_cfi.WriteIOStat(loc_stats);
1163 m_cfi.Write(m_info_file, m_filename.c_str());
1164 int cret = m_info_file->Fsync();
1165 if (cret != XrdOssOK)
1166 {
1167 TRACEF(Error, "Sync cinfo file sync error " << cret);
1168 errorp = true;
1169 }
1170 }
1171 else
1172 {
1173 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1174 errorp = true;
1175 }
1176
1177 if (errorp)
1178 {
1179 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1180
1181 // Unlink will also call this->initiate_emergency_shutdown()
1182 Cache::GetInstance().UnlinkFile(m_filename, false);
1183
1184 XrdSysCondVarHelper _lck(&m_state_cond);
1185
1186 m_writes_during_sync.clear();
1187 m_in_sync = false;
1188
1189 return;
1190 }
1191
1192 int written_while_in_sync;
1193 bool resync = false;
1194 {
1195 XrdSysCondVarHelper _lck(&m_state_cond);
1196 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1197 {
1198 m_cfi.SetBitSynced(*i);
1199 }
1200 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1201 m_writes_during_sync.clear();
1202
1203 // If there were writes during sync and the file is now complete,
1204 // let us call Sync again without resetting the m_in_sync flag.
1205 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1206 resync = true;
1207 else
1208 m_in_sync = false;
1209 }
1210 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1211
1212 if (resync)
1213 Sync();
1214}
#define XrdOssOK
Definition XrdOss.hh:50
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1187
void Sync()
Sync file cache info and output data with disk.
XrdPosixStats Stats

References Error, XrdPfc::Cache::GetInstance(), Sync(), TRACEF, XrdPfc::Cache::UnlinkFile(), and XrdOssOK.

Referenced by Sync().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ WriteBlockToDisk()

void File::WriteBlockToDisk ( Block * b)

Definition at line 1068 of file XrdPfcFile.cc.

1069{
1070 // write block buffer into disk file
1071 long long offset = b->m_offset - m_offset;
1072 long long size = b->get_size();
1073 ssize_t retval;
1074
1075 if (m_cfi.IsCkSumCache())
1076 if (b->has_cksums())
1077 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1078 else
1079 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1080 else
1081 retval = m_data_file->Write(b->get_buff(), offset, size);
1082
1083 if (retval < size)
1084 {
1085 if (retval < 0) {
1086 TRACEF(Error, "WriteToDisk() write error " << retval);
1087 } else {
1088 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1089 }
1090
1091 XrdSysCondVarHelper _lck(m_state_cond);
1092
1093 dec_ref_count(b);
1094
1095 return;
1096 }
1097
1098 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1099
1100 // Set written bit.
1101 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1102
1103 bool schedule_sync = false;
1104 {
1105 XrdSysCondVarHelper _lck(m_state_cond);
1106
1107 m_cfi.SetBitWritten(blk_idx);
1108
1109 if (b->m_prefetch)
1110 {
1111 m_cfi.SetBitPrefetch(blk_idx);
1112 }
1113 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1114 {
1115 m_cfi.ResetCkSumNet();
1116 }
1117
1118 dec_ref_count(b);
1119
1120 // Set synced bit or stash block index if in actual sync.
1121 // Synced state is only written out to cinfo file when data file is synced.
1122 if (m_in_sync)
1123 {
1124 m_writes_during_sync.push_back(blk_idx);
1125 }
1126 else
1127 {
1128 m_cfi.SetBitSynced(blk_idx);
1129 ++m_non_flushed_cnt;
1130 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1131 ! m_in_shutdown)
1132 {
1133 schedule_sync = true;
1134 m_in_sync = true;
1135 m_non_flushed_cnt = 0;
1136 }
1137 }
1138 }
1139
1140 if (schedule_sync)
1141 {
1142 cache()->ScheduleFileSync(this);
1143 }
1144}
int get_size() const
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:204
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition XrdPfc.hh:116

References Error, XrdPfc::Block::get_buff(), XrdPfc::Block::get_size(), XrdPfc::Cache::GetInstance(), XrdPfc::Block::has_cksums(), XrdPfc::Block::m_offset, XrdPfc::Block::m_prefetch, XrdPfc::Block::ref_cksum_vec(), XrdPfc::Block::req_cksum_net(), and TRACEF.

Referenced by XrdPfc::Cache::ProcessWriteTasks().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Friends And Related Symbol Documentation

◆ BlockResponseHandler

friend class BlockResponseHandler
friend

Definition at line 205 of file XrdPfcFile.hh.

References BlockResponseHandler.

Referenced by BlockResponseHandler.

◆ Cache

friend class Cache
friend

Definition at line 204 of file XrdPfcFile.hh.

References Cache.

Referenced by Cache.

◆ DirectResponseHandler

friend class DirectResponseHandler
friend

Definition at line 206 of file XrdPfcFile.hh.

References DirectResponseHandler.

Referenced by DirectResponseHandler.


The documentation for this class was generated from the following files: