XRootD
Loading...
Searching...
No Matches
XrdPfc.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include <fcntl.h>
20#include <sstream>
21#include <algorithm>
22#include <sys/statvfs.h>
23
25#include "XrdCl/XrdClURL.hh"
26
27#include "XrdOuc/XrdOucEnv.hh"
28#include "XrdOuc/XrdOucUtils.hh"
30
32#include "XrdSys/XrdSysTimer.hh"
33#include "XrdSys/XrdSysTrace.hh"
34#include "XrdSys/XrdSysXAttr.hh"
35
37
38#include "XrdOss/XrdOss.hh"
39
40#include "XrdPfc.hh"
41#include "XrdPfcTrace.hh"
42#include "XrdPfcFSctl.hh"
43#include "XrdPfcInfo.hh"
44#include "XrdPfcIOFile.hh"
45#include "XrdPfcIOFileBlock.hh"
46
48
49using namespace XrdPfc;
50
51Cache * Cache::m_instance = 0;
52
54
55
61
62void *PurgeThread(void*)
63{
65 return 0;
66}
67
69{
71 return 0;
72}
73
74void *PrefetchThread(void*)
75{
77 return 0;
78}
79
80//==============================================================================
81
82extern "C"
83{
85 const char *config_filename,
86 const char *parameters,
87 XrdOucEnv *env)
88{
89 XrdSysError err(logger, "");
90 err.Say("++++++ Proxy file cache initialization started.");
91
92 if ( ! env ||
93 ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
94 {
96 XrdPfc::Cache::schedP->Start();
97 }
98
99 Cache &instance = Cache::CreateInstance(logger, env);
100
101 if (! instance.Config(config_filename, parameters))
102 {
103 err.Say("Config Proxy file cache initialization failed.");
104 return 0;
105 }
106 err.Say("------ Proxy file cache initialization completed.");
107
108 {
109 pthread_t tid;
110
111 for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
112 {
113 XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
114 }
115
116 if (instance.RefConfiguration().m_prefetch_max_blocks > 0)
117 {
118 XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
119 }
120
121 XrdSysThread::Run(&tid, ResourceMonitorHeartBeatThread, 0, 0, "XrdPfc ResourceMonitorHeartBeat");
122
123 XrdSysThread::Run(&tid, PurgeThread, 0, 0, "XrdPfc Purge");
124 }
125
126 XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
127 env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
128
129 return &instance;
130}
131}
132
133//==============================================================================
134
135void Configuration::calculate_fractional_usages(long long du, long long fu,
136 double &frac_du, double &frac_fu)
137{
138 // Calculate fractional disk / file usage and clamp them to [0, 1].
139
140 // Fractional total usage above LWM:
141 // - can be > 1 if usage is above HWM;
142 // - can be < 0 if triggered via age-based-purging.
143 frac_du = (double) (du - m_diskUsageLWM) / (m_diskUsageHWM - m_diskUsageLWM);
144
145 // Fractional file usage above baseline.
146 // - can be > 1 if file usage is above max;
147 // - can be < 0 if file usage is below baseline.
148 frac_fu = (double) (fu - m_fileUsageBaseline) / (m_fileUsageMax - m_fileUsageBaseline);
149
150 frac_du = std::min( std::max( frac_du, 0.0), 1.0 );
151 frac_fu = std::min( std::max( frac_fu, 0.0), 1.0 );
152}
153
154//==============================================================================
155
157{
158 assert (m_instance == 0);
159 m_instance = new Cache(logger, env);
160 return *m_instance;
161}
162
163 Cache& Cache::GetInstance() { return *m_instance; }
164const Cache& Cache::TheOne() { return *m_instance; }
165const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
166
168{
169 if (! m_decisionpoints.empty())
170 {
171 XrdCl::URL url(io->Path());
172 std::string filename = url.GetPath();
173 std::vector<Decision*>::const_iterator it;
174 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
175 {
176 XrdPfc::Decision *d = *it;
177 if (! d) continue;
178 if (! d->Decide(filename, *m_oss))
179 {
180 return false;
181 }
182 }
183 }
184
185 return true;
186}
187
189 XrdOucCache("pfc"),
190 m_env(env),
191 m_log(logger, "XrdPfc_"),
192 m_trace(new XrdSysTrace("XrdPfc", logger)),
193 m_traceID("Cache"),
194 m_oss(0),
195 m_gstream(0),
196 m_prefetch_condVar(0),
197 m_prefetch_enabled(false),
198 m_RAM_used(0),
199 m_RAM_write_queue(0),
200 m_RAM_std_size(0),
201 m_isClient(false),
202 m_in_purge(false),
203 m_active_cond(0),
204 m_stats_n_purge_cond(0),
205 m_fs_state(0),
206 m_last_scan_duration(0),
207 m_last_purge_duration(0),
208 m_spt_state(SPTS_Idle)
209{
210 // Default log level is Warning.
211 m_trace->What = 2;
212}
213
215{
216 const char* tpfx = "Attach() ";
217
218 if (Cache::GetInstance().Decide(io))
219 {
220 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
221
222 IO *cio;
223
224 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
225 {
226 cio = new IOFileBlock(io, *this);
227 }
228 else
229 {
230 IOFile *iof = new IOFile(io, *this);
231
232 if ( ! iof->HasFile())
233 {
234 delete iof;
235 // TODO - redirect instead. But this is kind of an awkward place for it.
236 // errno is set during IOFile construction.
237 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
238 return io;
239 }
240
241 cio = iof;
242 }
243
244 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
245 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
246
247 return cio;
248 }
249 else
250 {
251 TRACE(Info, tpfx << "decision decline " << io->Path());
252 }
253 return io;
254}
255
256void Cache::AddWriteTask(Block* b, bool fromRead)
257{
258 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
259
260 {
261 XrdSysMutexHelper lock(&m_RAM_mutex);
262 m_RAM_write_queue += b->get_size();
263 }
264
265 m_writeQ.condVar.Lock();
266 if (fromRead)
267 m_writeQ.queue.push_back(b);
268 else
269 m_writeQ.queue.push_front(b);
270 m_writeQ.size++;
271 m_writeQ.condVar.Signal();
272 m_writeQ.condVar.UnLock();
273}
274
276{
277 std::list<Block*> removed_blocks;
278 long long sum_size = 0;
279
280 m_writeQ.condVar.Lock();
281 std::list<Block*>::iterator i = m_writeQ.queue.begin();
282 while (i != m_writeQ.queue.end())
283 {
284 if ((*i)->m_file == file)
285 {
286 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
287 std::list<Block*>::iterator j = i++;
288 removed_blocks.push_back(*j);
289 sum_size += (*j)->get_size();
290 m_writeQ.queue.erase(j);
291 --m_writeQ.size;
292 }
293 else
294 {
295 ++i;
296 }
297 }
298 m_writeQ.condVar.UnLock();
299
300 {
301 XrdSysMutexHelper lock(&m_RAM_mutex);
302 m_RAM_write_queue -= sum_size;
303 }
304
305 file->BlocksRemovedFromWriteQ(removed_blocks);
306}
307
309{
310 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
311
312 while (true)
313 {
314 m_writeQ.condVar.Lock();
315 while (m_writeQ.size == 0)
316 {
317 m_writeQ.condVar.Wait();
318 }
319
320 // MT -- optimize to pop several blocks if they are available (or swap the list).
321 // This makes sense especially for smallish block sizes.
322
323 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
324 long long sum_size = 0;
325
326 for (int bi = 0; bi < n_pushed; ++bi)
327 {
328 Block* block = m_writeQ.queue.front();
329 m_writeQ.queue.pop_front();
330 m_writeQ.writes_between_purges += block->get_size();
331 sum_size += block->get_size();
332
333 blks_to_write[bi] = block;
334
335 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
336 }
337 m_writeQ.size -= n_pushed;
338
339 m_writeQ.condVar.UnLock();
340
341 {
342 XrdSysMutexHelper lock(&m_RAM_mutex);
343 m_RAM_write_queue -= sum_size;
344 }
345
346 for (int bi = 0; bi < n_pushed; ++bi)
347 {
348 Block* block = blks_to_write[bi];
349
350 block->m_file->WriteBlockToDisk(block);
351 }
352 }
353}
354
355//==============================================================================
356
357char* Cache::RequestRAM(long long size)
358{
359 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
360
361 bool std_size = (size == m_configuration.m_bufferSize);
362
363 m_RAM_mutex.Lock();
364
365 long long total = m_RAM_used + size;
366
367 if (total <= m_configuration.m_RamAbsAvailable)
368 {
369 m_RAM_used = total;
370 if (std_size && m_RAM_std_size > 0)
371 {
372 char *buf = m_RAM_std_blocks.back();
373 m_RAM_std_blocks.pop_back();
374 --m_RAM_std_size;
375
376 m_RAM_mutex.UnLock();
377
378 return buf;
379 }
380 else
381 {
382 m_RAM_mutex.UnLock();
383 char *buf;
384 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
385 {
386 // Report out of mem? Probably should report it at least the first time,
387 // then periodically.
388 return 0;
389 }
390 return buf;
391 }
392 }
393 m_RAM_mutex.UnLock();
394 return 0;
395}
396
397void Cache::ReleaseRAM(char* buf, long long size)
398{
399 bool std_size = (size == m_configuration.m_bufferSize);
400 {
401 XrdSysMutexHelper lock(&m_RAM_mutex);
402
403 m_RAM_used -= size;
404
405 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
406 {
407 m_RAM_std_blocks.push_back(buf);
408 ++m_RAM_std_size;
409 return;
410 }
411 }
412 free(buf);
413}
414
415File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
416{
417 // Called from virtual IOFile constructor.
418
419 TRACE(Debug, "GetFile " << path << ", io " << io);
420
421 ActiveMap_i it;
422
423 {
424 XrdSysCondVarHelper lock(&m_active_cond);
425
426 while (true)
427 {
428 it = m_active.find(path);
429
430 // File is not open or being opened. Mark it as being opened and
431 // proceed to opening it outside of while loop.
432 if (it == m_active.end())
433 {
434 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
435 break;
436 }
437
438 if (it->second != 0)
439 {
440 it->second->AddIO(io);
441 inc_ref_cnt(it->second, false, true);
442
443 return it->second;
444 }
445 else
446 {
447 // Wait for some change in m_active, then recheck.
448 m_active_cond.Wait();
449 }
450 }
451 }
452
453 // This is always true, now that IOFileBlock is unsupported.
454 if (filesize == 0)
455 {
456 struct stat st;
457 int res = io->Fstat(st);
458 if (res < 0) {
459 errno = res;
460 TRACE(Error, "GetFile, could not get valid stat");
461 } else if (res > 0) {
462 errno = ENOTSUP;
463 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
464 } else {
465 filesize = st.st_size;
466 }
467 }
468
469 File *file = 0;
470
471 if (filesize >= 0)
472 {
473 file = File::FileOpen(path, off, filesize);
474 }
475
476 {
477 XrdSysCondVarHelper lock(&m_active_cond);
478
479 if (file)
480 {
481 inc_ref_cnt(file, false, true);
482 it->second = file;
483
484 file->AddIO(io);
485 }
486 else
487 {
488 m_active.erase(it);
489 }
490
491 m_active_cond.Broadcast();
492 }
493
494 return file;
495}
496
498{
499 // Called from virtual IO::DetachFinalize.
500
501 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
502
503 {
504 XrdSysCondVarHelper lock(&m_active_cond);
505
506 f->RemoveIO(io);
507 }
508 dec_ref_cnt(f, true);
509}
510
511
512namespace
513{
514
515class DiskSyncer : public XrdJob
516{
517private:
518 File *m_file;
519 bool m_high_debug;
520
521public:
522 DiskSyncer(File *f, bool high_debug, const char *desc = "") :
523 XrdJob(desc),
524 m_file(f),
525 m_high_debug(high_debug)
526 {}
527
528 void DoIt()
529 {
530 m_file->Sync();
531 Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
532 delete this;
533 }
534};
535
536
537class CommandExecutor : public XrdJob
538{
539private:
540 std::string m_command_url;
541
542public:
543 CommandExecutor(const std::string& command, const char *desc = "") :
544 XrdJob(desc),
545 m_command_url(command)
546 {}
547
548 void DoIt()
549 {
550 Cache::GetInstance().ExecuteCommandUrl(m_command_url);
551 delete this;
552 }
553};
554
555}
556
557//==============================================================================
558
559void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
560{
561 DiskSyncer* ds = new DiskSyncer(f, high_debug);
562
563 if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
564
565 schedP->Schedule(ds);
566}
567
568void Cache::FileSyncDone(File* f, bool high_debug)
569{
570 dec_ref_cnt(f, high_debug);
571}
572
573void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
574{
575 // called from GetFile() or SheduleFileSync();
576
577 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
578
579 if (lock) m_active_cond.Lock();
580 int rc = f->inc_ref_cnt();
581 if (lock) m_active_cond.UnLock();
582
583 TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
584}
585
586void Cache::dec_ref_cnt(File* f, bool high_debug)
587{
588 // NOT under active lock.
589 // Called from ReleaseFile(), DiskSync callback and stat-like functions.
590
591 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
592 int cnt;
593
594 {
595 XrdSysCondVarHelper lock(&m_active_cond);
596
597 cnt = f->get_ref_cnt();
598 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
599
601 {
602 // In this case file has been already removed from m_active map and
603 // does not need to be synced.
604
605 if (cnt == 1)
606 {
607 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
608 << " -- deleting File object without further ado");
609 delete f;
610 }
611 else
612 {
613 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
614 << " -- waiting");
615 }
616 return;
617 }
618 if (cnt > 1)
619 {
620 f->dec_ref_cnt();
621 return;
622 }
623 }
624
625 if (cnt == 1)
626 {
627 if (f->FinalizeSyncBeforeExit())
628 {
629 // Note, here we "reuse" the existing reference count for the
630 // final sync.
631
632 TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
633 schedule_file_sync(f, true, true);
634 return;
635 }
636 }
637
638 {
639 XrdSysCondVarHelper lock(&m_active_cond);
640
641 cnt = f->dec_ref_cnt();
642 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
643 if (cnt == 0)
644 {
645 ActiveMap_i it = m_active.find(f->GetLocalPath());
646 m_active.erase(it);
647
648 m_closed_files_stats.insert(std::make_pair(f->GetLocalPath(), f->DeltaStatsFromLastCall()));
649
650 if (m_gstream)
651 {
652 const Stats &st = f->RefStats();
653 const Info::AStat *as = f->GetLastAccessStats();
654
655 char buf[4096];
656 int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
657 "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
658 "\"access_cnt\":%zu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
659 "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,"
660 "\"b_todisk\":%lld,\"b_prefetch\":%lld,\"n_cks_errs\":%d}",
661 f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
663 f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
664 f->GetRemoteLocations().c_str(),
665 as->BytesHit, as->BytesMissed, as->BytesBypassed,
667 );
668 bool suc = false;
669 if (len < 4096)
670 {
671 suc = m_gstream->Insert(buf, len + 1);
672 }
673 if ( ! suc)
674 {
675 TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
676 }
677 }
678
679 delete f;
680 }
681 }
682}
683
684bool Cache::IsFileActiveOrPurgeProtected(const std::string& path)
685{
686 XrdSysCondVarHelper lock(&m_active_cond);
687
688 return m_active.find(path) != m_active.end() ||
689 m_purge_delay_set.find(path) != m_purge_delay_set.end();
690}
691
692
693//==============================================================================
694//=== PREFETCH
695//==============================================================================
696
698{
699 // Can be called with other locks held.
700
701 if ( ! m_prefetch_enabled)
702 {
703 return;
704 }
705
706 m_prefetch_condVar.Lock();
707 m_prefetchList.push_back(file);
708 m_prefetch_condVar.Signal();
709 m_prefetch_condVar.UnLock();
710}
711
712
714{
715 // Can be called with other locks held.
716
717 if ( ! m_prefetch_enabled)
718 {
719 return;
720 }
721
722 m_prefetch_condVar.Lock();
723 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
724 {
725 if (*it == file)
726 {
727 m_prefetchList.erase(it);
728 break;
729 }
730 }
731 m_prefetch_condVar.UnLock();
732}
733
734
736{
737 m_prefetch_condVar.Lock();
738 while (m_prefetchList.empty())
739 {
740 m_prefetch_condVar.Wait();
741 }
742
743 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
744
745 size_t l = m_prefetchList.size();
746 int idx = rand() % l;
747 File* f = m_prefetchList[idx];
748
749 m_prefetch_condVar.UnLock();
750 return f;
751}
752
753
755{
756 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
757
758 while (true)
759 {
760 m_RAM_mutex.Lock();
761 bool doPrefetch = (m_RAM_used < limit_RAM);
762 m_RAM_mutex.UnLock();
763
764 if (doPrefetch)
765 {
767 f->Prefetch();
768 }
769 else
770 {
772 }
773 }
774}
775
776
777//==============================================================================
778//=== Virtuals from XrdOucCache
779//==============================================================================
780
781//------------------------------------------------------------------------------
795
796int Cache::LocalFilePath(const char *curl, char *buff, int blen,
797 LFP_Reason why, bool forall)
798{
799 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
800 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
801 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
802
803 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
804
805 if (buff && blen > 0) buff[0] = 0;
806
807 XrdCl::URL url(curl);
808 std::string f_name = url.GetPath();
809 std::string i_name = f_name + Info::s_infoExtension;
810
811 if (why == ForPath)
812 {
813 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
814 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
815 return ret;
816 }
817
818 {
819 XrdSysCondVarHelper lock(&m_active_cond);
820 m_purge_delay_set.insert(f_name);
821 }
822
823 struct stat sbuff, sbuff2;
824 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
825 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
826 {
827 if (S_ISDIR(sbuff.st_mode))
828 {
829 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
830 return -EISDIR;
831 }
832 else
833 {
834 bool read_ok = false;
835 bool is_complete = false;
836
837 // Lock and check if the file is active. If NOT, keep the lock
838 // and add dummy access after successful reading of info file.
839 // If it IS active, just release the lock, this ongoing access will
840 // assure the file continues to exist.
841
842 // XXXX How can I just loop over the cinfo file when active?
843 // Can I not get is_complete from the existing file?
844 // Do I still want to inject access record?
845 // Oh, it writes only if not active .... still let's try to use existing File.
846
847 m_active_cond.Lock();
848
849 bool is_active = m_active.find(f_name) != m_active.end();
850
851 if (is_active) m_active_cond.UnLock();
852
853 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
854 XrdOucEnv myEnv;
855 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
856 if (res >= 0)
857 {
858 Info info(m_trace, 0);
859 if (info.Read(infoFile, i_name.c_str()))
860 {
861 read_ok = true;
862
863 is_complete = info.IsComplete();
864
865 // Add full-size access if reason is for access.
866 if ( ! is_active && is_complete && why == ForAccess)
867 {
868 info.WriteIOStatSingle(info.GetFileSize());
869 info.Write(infoFile, i_name.c_str());
870 }
871 }
872 infoFile->Close();
873 }
874 delete infoFile;
875
876 if ( ! is_active) m_active_cond.UnLock();
877
878 if (read_ok)
879 {
880 if ((is_complete || why == ForInfo) && buff != 0)
881 {
882 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
883 if (res2 < 0)
884 return res2;
885
886 // Normally, files are owned by us but when direct cache access
887 // is wanted and possible, make sure the file is world readable.
888 if (why == ForAccess)
889 {mode_t mode = (forall ? worldReadable : groupReadable);
890 if (((sbuff.st_mode & worldReadable) != mode)
891 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
892 {is_complete = false;
893 *buff = 0;
894 }
895 }
896 }
897
898 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
899 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
900
901 return is_complete ? 0 : -EREMOTE;
902 }
903 }
904 }
905
906 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
907 return -ENOENT;
908}
909
910//______________________________________________________________________________
911// If supported, write file_size as xattr to cinfo file.
912//------------------------------------------------------------------------------
913void Cache::WriteFileSizeXAttr(int cinfo_fd, long long file_size)
914{
915 if (m_metaXattr) {
916 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
917 if (res != 0) {
918 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
919 }
920 }
921}
922
923//______________________________________________________________________________
924// Determine full size of the data file from the corresponding cinfo-file name.
925// Attempts to read xattr first and falls back to reading of the cinfo file.
926// Returns -error on failure.
927//------------------------------------------------------------------------------
928long long Cache::DetermineFullFileSize(const std::string &cinfo_fname)
929{
930 if (m_metaXattr) {
931 char pfn[4096];
932 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
933 long long fsize = -1ll;
934 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
935 if (res == sizeof(long long))
936 {
937 return fsize;
938 }
939 else
940 {
941 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
942 }
943 }
944
945 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
946 XrdOucEnv env;
947 long long ret;
948 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
949 if (res < 0) {
950 ret = res;
951 } else {
952 Info info(m_trace, 0);
953 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
954 ret = -EBADF;
955 } else {
956 ret = info.GetFileSize();
957 }
958 infoFile->Close();
959 }
960 delete infoFile;
961 return ret;
962}
963
964//______________________________________________________________________________
965// Calculate if the file is to be considered cached for the purposes of
966// only-if-cached and setting of atime of the Stat() calls.
967// Returns true if the file is to be conidered cached.
968//------------------------------------------------------------------------------
969bool Cache::DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
970{
971 if (file_size == 0 || bytes_on_disk >= file_size)
972 return true;
973
974 double frac_on_disk = (double) bytes_on_disk / file_size;
975
976 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
977 {
978 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
979 return true;
980 }
981 else
982 {
983 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
984 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
985 return true;
986 }
987 return false;
988}
989
990//______________________________________________________________________________
991// Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
992// pfc configuration parameters. The logic of accessing the Info file is the same
993// as in Cache::LocalFilePath.
1001//------------------------------------------------------------------------------
1002int Cache::ConsiderCached(const char *curl)
1003{
1004 static const char* tpfx = "ConsiderCached ";
1005
1006 TRACE(Debug, tpfx << curl);
1007
1008 XrdCl::URL url(curl);
1009 std::string f_name = url.GetPath();
1010
1011 File *file = nullptr;
1012 {
1013 XrdSysCondVarHelper lock(&m_active_cond);
1014 auto it = m_active.find(f_name);
1015 if (it != m_active.end()) {
1016 file = it->second;
1017 inc_ref_cnt(file, false, false);
1018 }
1019 }
1020 if (file) {
1021 struct stat sbuff;
1022 int res = file->Fstat(sbuff);
1023 dec_ref_cnt(file, false);
1024 if (res)
1025 return res;
1026 // DecideIfConsideredCached() already called in File::Fstat().
1027 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1028 }
1029
1030 struct stat sbuff;
1031 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1032 if (res != XrdOssOK) {
1033 TRACE(Debug, tpfx << curl << " -> " << res);
1034 return res;
1035 }
1036 if (S_ISDIR(sbuff.st_mode))
1037 {
1038 TRACE(Debug, tpfx << curl << " -> EISDIR");
1039 return -EISDIR;
1040 }
1041
1042 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1043 if (file_size < 0) {
1044 TRACE(Debug, tpfx << curl << " -> " << file_size);
1045 return (int) file_size;
1046 }
1047 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1048
1049 return is_cached ? 0 : -EREMOTE;
1050}
1051
1052//______________________________________________________________________________
1060//------------------------------------------------------------------------------
1061
1062int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1063{
1064 XrdCl::URL url(curl);
1065 std::string f_name = url.GetPath();
1066 std::string i_name = f_name + Info::s_infoExtension;
1067
1068 // Do not allow write access.
1069 if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1070 {
1071 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1072 return -EROFS;
1073 }
1074
1075 // Intercept xrdpfc_command requests.
1076 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1077 {
1078 // Schedule a job to process command request.
1079 {
1080 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1081
1082 schedP->Schedule(ce);
1083 }
1084
1085 return -EAGAIN;
1086 }
1087
1088 {
1089 XrdSysCondVarHelper lock(&m_active_cond);
1090 m_purge_delay_set.insert(f_name);
1091 }
1092
1093 struct stat sbuff;
1094 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1095 {
1096 TRACE(Dump, "Prepare defer open " << f_name);
1097 return 1;
1098 }
1099 else
1100 {
1101 return 0;
1102 }
1103}
1104
1105//______________________________________________________________________________
1106// virtual method of XrdOucCache.
1111//------------------------------------------------------------------------------
1112
1113int Cache::Stat(const char *curl, struct stat &sbuff)
1114{
1115 const char *tpfx = "Stat ";
1116
1117 XrdCl::URL url(curl);
1118 std::string f_name = url.GetPath();
1119
1120 File *file = nullptr;
1121 {
1122 XrdSysCondVarHelper lock(&m_active_cond);
1123 auto it = m_active.find(f_name);
1124 if (it != m_active.end()) {
1125 file = it->second;
1126 inc_ref_cnt(file, false, false);
1127 }
1128 }
1129 if (file) {
1130 int res = file->Fstat(sbuff);
1131 dec_ref_cnt(file, false);
1132 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1133 return res;
1134 }
1135
1136 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1137 if (res != XrdOssOK) {
1138 TRACE(Debug, tpfx << curl << " -> " << res);
1139 return 1; // res; -- for only-if-cached
1140 }
1141 if (S_ISDIR(sbuff.st_mode))
1142 {
1143 TRACE(Debug, tpfx << curl << " -> EISDIR");
1144 return -EISDIR;
1145 }
1146
1147 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1148 if (file_size < 0) {
1149 TRACE(Debug, tpfx << curl << " -> " << file_size);
1150 return 1; // (int) file_size; -- for only-if-cached
1151 }
1152 sbuff.st_size = file_size;
1153 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1154 if ( ! is_cached)
1155 sbuff.st_atime = 0;
1156
1157 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1158
1159 return 0;
1160}
1161
1162//______________________________________________________________________________
1163// virtual method of XrdOucCache.
1167//------------------------------------------------------------------------------
1168
1169int Cache::Unlink(const char *curl)
1170{
1171 XrdCl::URL url(curl);
1172 std::string f_name = url.GetPath();
1173
1174 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1175
1176 return UnlinkFile(f_name, false);
1177}
1178
1179int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1180{
1181 ActiveMap_i it;
1182 File *file = 0;
1183 {
1184 XrdSysCondVarHelper lock(&m_active_cond);
1185
1186 it = m_active.find(f_name);
1187
1188 if (it != m_active.end())
1189 {
1190 if (fail_if_open)
1191 {
1192 TRACE(Info, "UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
1193 return -EBUSY;
1194 }
1195
1196 // Null File* in m_active map means an operation is ongoing, probably
1197 // Attach() with possible File::Open(). Ask for retry.
1198 if (it->second == 0)
1199 {
1200 TRACE(Info, "UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
1201 return -EAGAIN;
1202 }
1203
1204 file = it->second;
1206 it->second = 0;
1207 }
1208 else
1209 {
1210 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1211 }
1212 }
1213
1214 if (file)
1215 {
1217 }
1218
1219 std::string i_name = f_name + Info::s_infoExtension;
1220
1221 // Unlink file & cinfo
1222 int f_ret = m_oss->Unlink(f_name.c_str());
1223 int i_ret = m_oss->Unlink(i_name.c_str());
1224
1225 TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1226
1227 {
1228 XrdSysCondVarHelper lock(&m_active_cond);
1229
1230 m_active.erase(it);
1231 }
1232
1233 return std::min(f_ret, i_ret);
1234}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
std::string obfuscateAuth(const std::string &input)
#define TRACE_Dump
#define TRACE_PC(act, pre_code, x)
#define TRACE_INT(act, x)
void * ProcessWriteTaskThread(void *)
Definition XrdPfc.cc:68
void * ResourceMonitorHeartBeatThread(void *)
Definition XrdPfc.cc:56
XrdSysXAttr * XrdSysXAttrActive
void * PrefetchThread(void *)
Definition XrdPfc.cc:74
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:84
void * PurgeThread(void *)
Definition XrdPfc.cc:62
#define stat(a, b)
Definition XrdPosix.hh:96
bool Debug
#define TRACE(act, x)
Definition XrdTrace.hh:63
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Fstat(struct stat &sbuff)
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
XrdOucCache(const char *ctype)
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
void PutPtr(const char *varname, void *value)
Definition XrdOucEnv.cc:298
int get_size() const
long long m_offset
File * get_file() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:928
void FileSyncDone(File *, bool high_debug)
Definition XrdPfc.cc:568
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:415
static const Configuration & Conf()
Definition XrdPfc.cc:165
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition XrdPfc.cc:796
virtual int Stat(const char *url, struct stat &sbuff)
Definition XrdPfc.cc:1113
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:319
void Purge()
Thread function invoked to scan and purge files from disk when needed.
void ReleaseRAM(char *buf, long long size)
Definition XrdPfc.cc:397
virtual int ConsiderCached(const char *url)
Definition XrdPfc.cc:1002
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:163
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
void DeRegisterPrefetchFile(File *)
Definition XrdPfc.cc:713
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition XrdPfc.cc:697
void WriteFileSizeXAttr(int cinfo_fd, long long file_size)
Definition XrdPfc.cc:913
void Prefetch()
Definition XrdPfc.cc:754
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:497
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Definition XrdPfc.cc:256
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:188
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:167
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1179
static XrdScheduler * schedP
Definition XrdPfc.hh:408
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition XrdPfc.cc:684
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:735
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition XrdPfc.cc:308
virtual int Unlink(const char *url)
Definition XrdPfc.cc:1169
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:275
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *, int Options=0)
Definition XrdPfc.cc:214
static const Cache & TheOne()
Definition XrdPfc.cc:164
char * RequestRAM(long long size)
Definition XrdPfc.cc:357
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition XrdPfc.cc:1062
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:969
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition XrdPfc.cc:156
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
int GetNBlocks() const
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
long long GetPrefetchedBytes() const
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
long long GetFileSize()
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
void initiate_emergency_shutdown()
int inc_ref_cnt()
const Stats & RefStats() const
void Sync()
Sync file cache inf o and output data with disk.
int dec_ref_cnt()
int get_ref_cnt()
void RemoveIO(IO *io)
Stats DeltaStatsFromLastCall()
bool is_in_emergency_shutdown()
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static const char * s_infoExtension
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
bool IsComplete() const
Get complete status.
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
int m_NCksumErrors
number of checksum errors while getting data from remote
long long m_BytesWritten
number of bytes written to disk
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
XrdPosixStats Stats
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:88
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:87
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition XrdPfc.cc:135
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:86
long long BytesHit
read from cache
Definition XrdPfcInfo.hh:68
long long BytesBypassed
read from remote and dropped
Definition XrdPfcInfo.hh:70
time_t DetachTime
close time
Definition XrdPfcInfo.hh:63
long long BytesMissed
read from remote and cached
Definition XrdPfcInfo.hh:69
time_t AttachTime
open time
Definition XrdPfcInfo.hh:62