XRootD
Loading...
Searching...
No Matches
XrdPfcResourceMonitor.cc
Go to the documentation of this file.
2#include "XrdPfc.hh"
5#include "XrdPfcDirState.hh"
8#include "XrdPfcTrace.hh"
9#include "XrdPfcPurgePin.hh"
10
11#include "XrdOss/XrdOss.hh"
12
13#include <algorithm>
14
// Uncomment the define below to get verbose dprintf() printouts from the
// resource monitor; otherwise every dprintf(...) compiles to a no-op expression.
// NOTE(review): this function-like macro shadows POSIX dprintf(int fd, const char*, ...)
// for the remainder of the file; it is only harmless because it comes after all #includes.
// #define RM_DEBUG
#if defined(RM_DEBUG)
#  define dprintf(...) printf(__VA_ARGS__)
#else
#  define dprintf(...) ((void) 0)
#endif
21
22using namespace XrdPfc;
23
24namespace
25{
26 XrdSysTrace* GetTrace() { return Cache::GetInstance().GetTrace(); }
27 const char *m_traceID = "ResourceMonitor";
28}
29
30//------------------------------------------------------------------------------
31
33 m_fs_state(* new DataFsState),
34 m_oss(oss)
35{}
36
38{
39 delete &m_fs_state;
40}
41
42//------------------------------------------------------------------------------
43// Initial scan
44//------------------------------------------------------------------------------
45
47{
48 m_dir_scan_mutex.Lock();
49 if (m_dir_scan_in_progress) {
50 m_dir_scan_open_requests.push_back({lfn, cond});
51 LfnCondRecord &lcr = m_dir_scan_open_requests.back();
52 cond.Lock();
53 m_dir_scan_mutex.UnLock();
54 while ( ! lcr.f_checked)
55 cond.Wait();
56 cond.UnLock();
57 } else {
58 m_dir_scan_mutex.UnLock();
59 }
60}
61
62void ResourceMonitor::process_inter_dir_scan_open_requests(FsTraversal &fst)
63{
64 m_dir_scan_mutex.Lock();
65 while ( ! m_dir_scan_open_requests.empty())
66 {
67 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
68 m_dir_scan_mutex.UnLock();
69
70 cross_check_or_process_oob_lfn(lcr.f_lfn, fst);
71 lcr.f_cond.Lock();
72 lcr.f_checked = true;
73 lcr.f_cond.Signal();
74 lcr.f_cond.UnLock();
75
76 m_dir_scan_mutex.Lock();
77 m_dir_scan_open_requests.pop_front();
78 }
79 m_dir_scan_mutex.UnLock();
80}
81
82void ResourceMonitor::cross_check_or_process_oob_lfn(const std::string &lfn, FsTraversal &fst)
83{
84 // Check if lfn has already been processed ... or process it now and mark
85 // the DirState accordingly (partially processed oob).
86 static const char *trc_pfx = "cross_check_or_process_oob_lfn() ";
87
88 DirState *last_existing_ds = nullptr;
89 DirState *ds = m_fs_state.find_dirstate_for_lfn(lfn, &last_existing_ds);
90 if (ds->m_scanned)
91 return;
92
93 size_t pos = lfn.find_last_of("/");
94 std::string dir = (pos == std::string::npos) ? "" : lfn.substr(0, pos);
95
96 XrdOssDF *dhp = m_oss.newDir(trc_pfx);
97 if (dhp->Opendir(dir.c_str(), fst.default_env()) == XrdOssOK)
98 {
99 fst.slurp_dir_ll(*dhp, ds->m_depth, dir.c_str(), trc_pfx);
100
101 // XXXX clone of function below .... move somewhere? Esp. removal of non-paired files?
102 DirUsage &here = ds->m_here_usage;
103 for (auto it = fst.m_current_files.begin(); it != fst.m_current_files.end(); ++it)
104 {
105 if (it->second.has_data && it->second.has_cinfo) {
106 here.m_StBlocks += it->second.stat_data.st_blocks;
107 here.m_NFiles += 1;
108 }
109 }
110 }
111 delete dhp;
112 ds->m_scanned = true;
113}
114
116{
117 dprintf("In scan_dir_and_recurse for '%s', size of dir_vec = %d, file_stat_map = %d\n",
118 fst.m_current_path.c_str(),
119 (int)fst.m_current_dirs.size(), (int)fst.m_current_files.size());
120
121 // Breadth first, accumulate into "here", unless it was already scanned via an
122 // OOB open file request.
123 if ( ! fst.m_dir_state->m_scanned)
124 {
125 DirUsage &here = fst.m_dir_state->m_here_usage;
126 for (auto it = fst.m_current_files.begin(); it != fst.m_current_files.end(); ++it)
127 {
128 dprintf("would be doing something with %s ... has_data=%d, has_cinfo=%d\n",
129 it->first.c_str(), it->second.has_data, it->second.has_cinfo);
130
131 // XXX Make some of these optional?
132 // Remove files that do not have both cinfo and data?
133 // Remove empty directories before even descending?
134 // Leave this for some consistency pass?
135 // Note that FsTraversal supports ignored paths ... some details (config, N2N) to be clarified.
136
137 if (it->second.has_data && it->second.has_cinfo) {
138 here.m_StBlocks += it->second.stat_data.st_blocks;
139 here.m_NFiles += 1;
140 }
141 }
142 fst.m_dir_state->m_scanned = true;
143 }
144
145 // Swap-out directories as inter_dir_scan can use the FsTraversal.
146 std::vector<std::string> dirs;
147 dirs.swap(fst.m_current_dirs);
148
149 if (++m_dir_scan_check_counter >= 100)
150 {
151 process_inter_dir_scan_open_requests(fst);
152 m_dir_scan_check_counter = 0;
153 }
154
155 // Descend into sub-dirs, do not accumulate into recursive_subdir_usage yet. This is done
156 // in a separate pass to allow for proper accounting of files being opened during the initial scan.
157 for (auto &dname : dirs)
158 {
159 if (fst.cd_down(dname))
160 {
162 fst.cd_up();
163 }
164 // XXX else try to remove it?
165 }
166}
167
169{
170 // Called after PFC configuration is complete, but before full startup of the daemon.
171 // Base line usages are accumulated as part of the file-system, traversal.
172
174
175 DirState *root_ds = m_fs_state.get_root();
176 FsTraversal fst(m_oss);
177 fst.m_protected_top_dirs.insert("pfc-stats"); // XXXX This should come from config. Also: N2N?
178
179 if ( ! fst.begin_traversal(root_ds, "/"))
180 return false;
181
182 {
183 XrdSysMutexHelper _lock(m_dir_scan_mutex);
184 m_dir_scan_in_progress = true;
185 m_dir_scan_check_counter = 0; // recheck oob file-open requests periodically.
186 }
187
189
190 fst.end_traversal();
191
192 // We have all directories scanned, available in DirState tree, let all remaining files go
193 // and then we shall do the upward propagation of usages.
194 {
195 XrdSysMutexHelper _lock(m_dir_scan_mutex);
196 m_dir_scan_in_progress = false;
197 m_dir_scan_check_counter = 0;
198
199 while ( ! m_dir_scan_open_requests.empty())
200 {
201 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
202 lcr.f_cond.Lock();
203 lcr.f_checked = true;
204 lcr.f_cond.Signal();
205 lcr.f_cond.UnLock();
206
207 m_dir_scan_open_requests.pop_front();
208 }
209 }
210
211 // Do upward propagation of usages.
213 m_current_usage_in_st_blocks = root_ds->m_here_usage.m_StBlocks +
216
217 return true;
218}
219
220//------------------------------------------------------------------------------
221// Processing of queues
222//------------------------------------------------------------------------------
223
225{
226 static const char *trc_pfx = "process_queues() ";
227
228 // Assure that we pick up only entries that are present now.
229 // We really want all open records to be processed before file-stats updates
230 // and all those before the close records.
231 // Purges are sort of tangential as they really just modify bytes / number
232 // of files in a directory and do not deal with any persistent file id tokens.
233
234 int n_records = 0;
235 {
236 XrdSysMutexHelper _lock(&m_queue_mutex);
237 n_records += m_file_open_q.swap_queues();
238 n_records += m_file_update_stats_q.swap_queues();
239 n_records += m_file_close_q.swap_queues();
240 n_records += m_file_purge_q1.swap_queues();
241 n_records += m_file_purge_q2.swap_queues();
242 n_records += m_file_purge_q3.swap_queues();
243 ++m_queue_swap_u1;
244 }
245
246 for (auto &i : m_file_open_q.read_queue())
247 {
248 // i.id: LFN, i.record: OpenRecord
249 AccessToken &at = token(i.id);
250 dprintf("process file open for token %d, time %ld -- %s\n",
251 i.id, i.record.m_open_time, at.m_filename.c_str());
252
253 // Resolve fname into DirState.
254 // We could clear the filename after this ... or keep it, should we need it later on.
255 // For now it is just used for debug printouts.
256 DirState *last_existing_ds = nullptr;
257 DirState *ds = m_fs_state.find_dirstate_for_lfn(at.m_filename, &last_existing_ds);
258 at.m_dir_state = ds;
260
261 // If this is a new file figure out how many new parent dirs got created along the way.
262 if ( ! i.record.m_existing_file) {
264 DirState *pp = ds;
265 while (pp != last_existing_ds) {
266 pp = pp->get_parent();
268 }
269 }
270
271 ds->m_here_usage.m_LastOpenTime = i.record.m_open_time;
272 }
273
274 for (auto &i : m_file_update_stats_q.read_queue())
275 {
276 // i.id: token, i.record: Stats
277 AccessToken &at = token(i.id);
278 // Stats
279 DirState *ds = at.m_dir_state;
280 dprintf("process file update for token %d, %p -- %s\n",
281 i.id, ds, at.m_filename.c_str());
282
283 ds->m_here_stats.AddUp(i.record);
284 m_current_usage_in_st_blocks += i.record.m_StBlocksAdded;
285 }
286
287 for (auto &i : m_file_close_q.read_queue())
288 {
289 // i.id: token, i.record: CloseRecord
290 AccessToken &at = token(i.id);
291 dprintf("process file close for token %d, time %ld -- %s\n",
292 i.id, i.record.m_close_time, at.m_filename.c_str());
293
294 DirState *ds = at.m_dir_state;
296 ds->m_here_usage.m_LastCloseTime = i.record.m_close_time;
297
298 at.clear();
299 }
300 { // Release the AccessToken slots under lock.
301 XrdSysMutexHelper _lock(&m_queue_mutex);
302 for (auto &i : m_file_close_q.read_queue())
303 m_access_tokens_free_slots.push_back(i.id);
304 }
305
306 for (auto &i : m_file_purge_q1.read_queue())
307 {
308 // i.id: DirState*, i.record: PurgeRecord
309 DirState *ds = i.id;
310 ds->m_here_stats.m_StBlocksRemoved += i.record.m_size_in_st_blocks;
311 ds->m_here_stats.m_NFilesRemoved += i.record.m_n_files;
312 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
313 }
314 for (auto &i : m_file_purge_q2.read_queue())
315 {
316 // i.id: directory-path, i.record: PurgeRecord
317 DirState *ds = m_fs_state.get_root()->find_path(i.id, -1, false, false);
318 if ( ! ds) {
319 TRACE(Error, trc_pfx << "DirState not found for directory path '" << i.id << "'.");
320 // find_path can return the last dir found ... but this clearly isn't a valid purge record.
321 continue;
322 }
323 ds->m_here_stats.m_StBlocksRemoved += i.record.m_size_in_st_blocks;
324 ds->m_here_stats.m_NFilesRemoved += i.record.m_n_files;
325 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
326 }
327 for (auto &i : m_file_purge_q3.read_queue())
328 {
329 // i.id: LFN, i.record: size of file in st_blocks
330 DirState *ds = m_fs_state.get_root()->find_path(i.id, -1, true, false);
331 if ( ! ds) {
332 TRACE(Error, trc_pfx << "DirState not found for LFN path '" << i.id << "'.");
333 continue;
334 }
335 ds->m_here_stats.m_StBlocksRemoved += i.record;
337 m_current_usage_in_st_blocks -= i.record;
338 }
339
340 // Read queues / vectors are cleared at swap time.
341 // We might consider reducing their capacity by half if, say, their usage is below 25%.
342
343 return n_records;
344}
345
346//------------------------------------------------------------------------------
347// Heart beat
348//------------------------------------------------------------------------------
349
351{
352 static const char *tpfx = "heart_beat() ";
353
354 const Configuration &conf = Cache::Conf();
355
356 const int s_queue_proc_interval = 10;
357 const int s_sshot_report_interval = conf.m_dirStatsInterval; // 1, 5, 10, 15, 30 or 60 minutes
358 const int s_purge_check_interval = 60;
359 const int s_purge_report_interval = conf.m_purgeInterval;
360 const int s_purge_cold_files_interval = conf.m_purgeInterval * conf.m_purgeAgeBasedPeriod;
361
362 // initial scan performed as part of config
363
364 time_t now = time(0);
365 time_t next_queue_proc_time = now + s_queue_proc_interval;
366 time_t next_sshot_report_time = (now / 60) * 60 + 60; // at next full minute
367 time_t next_purge_check_time = now + s_purge_check_interval;
368 time_t next_purge_report_time = now + s_purge_report_interval;
369 time_t next_purge_cold_files_time = now + s_purge_cold_files_interval;
370
371 while (true)
372 {
373 time_t start = time(0);
374 time_t next_event = std::min({ next_queue_proc_time, next_sshot_report_time,
375 next_purge_check_time, next_purge_report_time, next_purge_cold_files_time });
376
377 if (next_event > start)
378 {
379 unsigned int t_sleep = next_event - start;
380 TRACE(Debug, tpfx << "sleeping for " << t_sleep << " seconds until the next beat.");
381 sleep(t_sleep);
382 }
383
384 // Check if purge has been running and has completed yet.
385 // For now this is only used to prevent removal of empty leaf directories
386 // during stat propagation so we do not need to wait for the condition in
387 // the above sleep.
392 }
393 }
394
395 time_t queue_swap_time = time(0);
396
397 // Always process the queues.
398 int n_processed = process_queues();
399 next_queue_proc_time = queue_swap_time + s_queue_proc_interval;
400 TRACE(Debug, tpfx << "process_queues -- n_records=" << n_processed);
401
402 // Always update basic info on m_fs_state (space, usage, file_usage).
404
405 now = time(0);
406
407 // Make planning for fs_state_update, sshot dump and purge task.
408 // Second two require the first, so figure out what is going to happen.
409 bool do_sshot_report = next_sshot_report_time <= now;
410 bool do_purge_check = next_purge_check_time <= now;
411 bool do_purge_report = next_purge_report_time <= now;
412 bool do_purge_cold_files = next_purge_cold_files_time <= now;
413
414 // Update stats in usages if any secondary activity will happen.
415 if (do_sshot_report || do_purge_check || do_purge_report || do_purge_cold_files)
416 {
417 unlink_func unlink_foo = [&](const std::string &dp)->int {
418 int ret = m_oss.Unlink(dp.c_str());
419 if (ret != 0) {
420 TRACE(Info, tpfx << "Empty dir unlink error: " << ret << " at " << dp);
421 } else {
422 TRACE(Debug, tpfx << "Empty dir unlink success: " << dp);
423 }
424 return ret;
425 };
426
427 // Potentially prune the empty leaf dirs even less frequently, once per hour, maybe?
428 bool purge_leaf_dirs = do_sshot_report && ! m_purge_task_active;
429 m_fs_state.update_stats_and_usages(queue_swap_time, purge_leaf_dirs, unlink_foo);
430
431 // This reporting into log/stdout is to be removed.
432 // Meaning of conf.is_dir_stat_reporting_on() etc is to be clarified / improved.
433 if (do_sshot_report && conf.is_dir_stat_reporting_on())
434 {
435 const int store_depth = conf.m_dirStatsStoreDepth;
436 #ifdef RM_DEBUG
437 const DirState &root_ds = *m_fs_state.get_root();
438 dprintf("Snapshot n_dirs=%d, total n_dirs=%d\n", root_ds.count_dirs_to_level(store_depth),
440 #endif
441 m_fs_state.dump_recursively(store_depth);
442 }
443
444 m_fs_state.reset_stats(queue_swap_time);
445 }
446
447 if (do_sshot_report)
448 {
449 // Sshot reports are equidistant, at "full" reporting interval.
450 next_sshot_report_time = ((now + 1) / s_sshot_report_interval) * s_sshot_report_interval + s_sshot_report_interval;
451
452 // This should dump out binary snapshot into /pfc-stats/, if so configured.
453
454 // json dump to std::out for debug purpose
455 DataFsSnapshot ss(m_fs_state, m_fs_state.m_sshot_stats_reset_time);
456 const DirState &root_ds = *m_fs_state.get_root();
457 const int store_depth = conf.m_dirStatsStoreDepth;
458 const int n_sshot_dirs = root_ds.count_dirs_to_level(store_depth);
459 ss.m_dir_states.reserve(n_sshot_dirs);
460 ss.m_dir_states.emplace_back( DirStateElement(root_ds, -1) );
461 fill_sshot_vec_children(root_ds, 0, ss.m_dir_states, store_depth);
462
463 // This should really be export to a file (preferably binary, but then bin->json command is needed, too).
464 // ss.dump();
465
466 const char* dumpfile = "/pfc-stats/DirStat.json";
467 ss.write_json_file(dumpfile, m_oss, false);
468 m_fs_state.reset_sshot_stats(queue_swap_time);
469 }
470
471 if (do_purge_check || do_purge_report || do_purge_cold_files)
472 {
473 perform_purge_check(do_purge_cold_files, do_purge_report ? TRACE_Info : TRACE_Debug);
474
475 next_purge_check_time = now + s_purge_check_interval;
476 if (do_purge_report) next_purge_report_time = now + s_purge_report_interval;
477 if (do_purge_cold_files) next_purge_cold_files_time = now + s_purge_cold_files_interval;
478 }
479
480 } // end while forever
481}
482
483//------------------------------------------------------------------------------
484// DirState export helpers
485//------------------------------------------------------------------------------
486
488 int parent_idx,
489 std::vector<DirStateElement> &vec,
490 int max_depth)
491{
492 int pos = vec.size();
493 int n_children = parent_ds.m_subdirs.size();
494
495 DirStateElement &parent_dse = vec[parent_idx];
496 parent_dse.m_daughters_begin = pos;
497 parent_dse.m_daughters_end = pos + n_children;
498
499 if (n_children == 0) return;
500
501 for (auto const & [name, child] : parent_ds.m_subdirs)
502 {
503 vec.emplace_back( DirStateElement(child, parent_idx) );
504 }
505
506 if (parent_ds.m_depth < max_depth)
507 {
508 for (auto const & [name, child] : parent_ds.m_subdirs)
509 {
510 fill_sshot_vec_children(child, pos, vec, max_depth);
511 ++pos;
512 }
513 }
514}
515
517 int parent_idx,
518 std::vector<DirPurgeElement> &vec,
519 int max_depth)
520{
521 int pos = vec.size();
522 int n_children = parent_ds.m_subdirs.size();
523
524 DirPurgeElement &parent_dpe = vec[parent_idx];
525 parent_dpe.m_daughters_begin = pos;
526 parent_dpe.m_daughters_end = pos + n_children;
527
528 if (n_children == 0) return;
529
530 for (auto const & [name, child] : parent_ds.m_subdirs)
531 {
532 vec.emplace_back( DirPurgeElement(child, child.m_here_usage, child.m_recursive_subdir_usage, parent_idx) );
533 }
534
535 if (parent_ds.m_depth < max_depth)
536 {
537 for (auto const & [name, child] : parent_ds.m_subdirs)
538 {
539 fill_pshot_vec_children(child, pos, vec, max_depth);
540 ++pos;
541 }
542 }
543}
544
545//------------------------------------------------------------------------------
546// Purge helpers, drivers, etc.
547//------------------------------------------------------------------------------
548
550{
551 static const char *trc_pfx = "update_vs_and_file_usage_info() ";
552
553 const auto &conf = Cache::Conf();
554 XrdOssVSInfo vsi;
555
556 // StatVS error (after it succeeded in config) implies a memory corruption (according to Mr. H).
557 if (m_oss.StatVS(&vsi, conf.m_data_space.c_str(), 1) < 0) {
558 TRACE(Error, trc_pfx << "can't get StatVS for oss space '" << conf.m_data_space << "'. This is a fatal error.");
559 _exit(1);
560 }
561 m_fs_state.m_disk_total = vsi.Total;
562 m_fs_state.m_disk_used = vsi.Total - vsi.Free;
563 m_fs_state.m_file_usage = 512ll * m_current_usage_in_st_blocks;
564 if (m_oss.StatVS(&vsi, conf.m_meta_space.c_str(), 1) < 0) {
565 TRACE(Error, trc_pfx << "can't get StatVS for oss space '" << conf.m_meta_space << "'. This is a fatal error.");
566 _exit(1);
567 }
568 m_fs_state.m_meta_total = vsi.Total;
569 m_fs_state.m_meta_used = vsi.Total - vsi.Free;
570}
571
572long long ResourceMonitor::get_file_usage_bytes_to_remove(const DataFsPurgeshot &ps, long long write_estimate, int tl)
573{
574 // short names from config values
575 const Configuration &conf = Cache::Conf();
576 long long f0 = conf.m_fileUsageBaseline;
577 long long f1 = conf.m_fileUsageNominal;
578 long long f2 = conf.m_fileUsageMax;
579 long long w1 = conf.m_diskUsageLWM;
580 long long w2 = conf.m_diskUsageHWM;
581
582 // get usage from purge snapshot
583 long long T = ps.m_disk_total;
584 long long x = ps.m_file_usage;
585 long long u = ps.m_disk_used;
586
587 // get file usage increase from the previous time interval check
588 long long delta = write_estimate;
589 TRACE_INT(tl, "file usage increased since the previous purge interval in bytes: " << delta );
590
591 long long bytes_to_remove = 0;
592
593 // helper lambda function
594 auto clamp = [&x, &bytes_to_remove](long long lowval, long long highval)
595 {
596 long long val = x;
597 long long newval = val - bytes_to_remove;
598
599 // removed too much
600 if (newval < lowval)
601 {
602 return lowval - val;
603 }
604
605 // removed too little
606 if (newval > highval)
607 {
608 return val - highval;
609 }
610 // keep the original value
611 return bytes_to_remove;
612 };
613
614 // under file quota, nothing to do
615 if (x < f0)
616 return 0;
617
618 // total disk usage exceeds highWatermark
619 if (u >= w2)
620 {
621 TRACE_INT(tl, "Disk usage: " << ps.m_disk_used << " exceed highWatermark " << conf.m_diskUsageHWM);
622 float frac_u = static_cast<float>(u - w2) / (T - w2);
623 float frac_x = static_cast<float>(x - f0) / (f1 - f0);
624
625 if (w2 == T)
626 {
627 bytes_to_remove = u -w1;
628 }
629 else
630 {
631 if (frac_x > frac_u)
632 {
633 // the cache is the reason for going out of w2 range
634 bytes_to_remove = (frac_x - frac_u) * (f1 - f0);
635 bytes_to_remove += delta;
636 bytes_to_remove = clamp(f0, f1);
637 }
638 else
639 {
640 // someone else is filling disk space, go to f1
641 bytes_to_remove = clamp(f0, f2);
642 }
643 return bytes_to_remove;
644 }
645 }
646
647 // file quota and total disk usage is within normal range, check if this space usage is
648 // proportinal to disk usage and correct it
649 if (u > w1 && x > f1)
650 {
651 float frac_u = static_cast<float>(u - w1) / (w2 - w1);
652 float frac_x = static_cast<float>(x - f1) / (f2 - f1);
653 if (frac_x > frac_u)
654 {
655 TRACE_INT(tl, "Disproportional file quota usage comapared to disc usage (frac_x/frac_u) = " << frac_x << "/"<< frac_u);
656 bytes_to_remove = (frac_x - frac_u) * (f2 - f1);
657 bytes_to_remove += delta;
658 }
659
660 // check the new x val will not be below f0
661 bytes_to_remove = clamp(f0, f2);
662 return bytes_to_remove;
663 }
664
665 // final check: disk useage is lower that w1, check if exceed the max file usage f2
666 if (x > f2)
667 {
668 // drop usage to f2
669 // compare with global disk usage in the previous purge cycle (default 300s)
670 // check delta is not overflowing f2, else set numver of bytes to remove according remove to f0
671
672 TRACE_INT(tl, "File usage exceeds maxim file usage. Total disk usage is under lowWatermark. Clearing to low file usage.");
673 long long f2delta = std::max(f2 - delta, f0);
674 bytes_to_remove = clamp(f0, f2delta);
675 return bytes_to_remove;
676 }
677
678 return bytes_to_remove;
679}
680
681void ResourceMonitor::perform_purge_check(bool purge_cold_files, int tl)
682{
683 static const char *trc_pfx = "perform_purge_check() ";
684 const Configuration &conf = Cache::Conf();
685
686 std::unique_ptr<DataFsPurgeshot> psp( new DataFsPurgeshot(m_fs_state) );
687 DataFsPurgeshot &ps = *psp;
688
689 ps.m_file_usage = 512ll * m_current_usage_in_st_blocks;
690 // These are potentially wrong as cache might be writing over preallocated byte ranges.
692 // Can have another estimate based on eiter writes or st-blocks from purge-stats, once we have them.
693
694 TRACE_INT(tl, trc_pfx << "Purge check:");
695
696 ps.m_bytes_to_remove = 0;
697 if (conf.are_file_usage_limits_set())
698 {
699 ps.m_bytes_to_remove = get_file_usage_bytes_to_remove(ps, ps.m_estimated_writes_from_writeq, tl);
700 }
701 else
702 {
703 if (ps.m_disk_used > conf.m_diskUsageHWM)
704 {
705 TRACE_INT(tl, "Disk usage: " << ps.m_disk_used << " exceed highWatermark.");
707 }
708 }
709
711
712 // Purge precheck -- check if age-based purge is required
713 // We ignore uvkeep time, it requires reading of cinfo files and it is enforced in File::Open() anyway.
714
715 if (purge_cold_files && conf.is_age_based_purge_in_effect()) // || conf.is_uvkeep_purge_in_effect())
716 {
717 ps.m_age_based_purge = true;
718 }
719
720 TRACE_INT(tl, "\tbytes_to_remove = " << ps.m_bytes_to_remove << " B");
721 TRACE_INT(tl, "\tspace_based_purge = " << ps.m_space_based_purge);
722 TRACE_INT(tl, "\tage_based_purge = " << ps.m_age_based_purge);
723
724 bool periodic = Cache::GetInstance().GetPurgePin() ?
726
727 if ( ! ps.m_space_based_purge && ! ps.m_age_based_purge && !periodic ) {
728 TRACE(Info, trc_pfx << "purge not required.");
730 return;
731 }
733 TRACE(Warning, trc_pfx << "purge required but previous purge task is still active!");
734 return;
735 }
736
737 TRACE(Info, trc_pfx << "scheduling purge task.");
738
739 // At this point we have all the information: report, decide on action.
740 // There is still some missing infrastructure, especially as regards to purge-plugin:
741 // - at what point do we start bugging the pu-pin to start coughing up purge lists?
742 // - have a new parameter or just do it "one cycle before full"?
743 // - what if it doesn't -- when do we do the old-stlye scan & purge?
744 // - how do we do age-based purge and uvkeep purge?
745 // - they are really quite different -- and could run separately, registering
746 // files into a purge-candidate list. This has to be rechecked before the actual
747 // deletion -- eg, by comparing stat time of cinfo + doing the is-active / is-purge-protected.
748
749 const DirState &root_ds = *m_fs_state.get_root();
750 const int n_calc_dirs = 1 + root_ds.m_here_usage.m_NDirectories + root_ds.m_recursive_subdir_usage.m_NDirectories;
751#ifdef RM_DEBUG
752 const int n_pshot_dirs = root_ds.count_dirs_to_level(9999);
753 dprintf("purge dir count recursive=%d vs from_usage=%d\n", n_pshot_dirs, n_calc_dirs);
754#endif
755 ps.m_dir_vec.reserve(n_calc_dirs);
756 ps.m_dir_vec.emplace_back( DirPurgeElement(root_ds, root_ds.m_here_usage, root_ds.m_recursive_subdir_usage, -1) );
757 fill_pshot_vec_children(root_ds, 0, ps.m_dir_vec, 9999);
758
759 m_purge_task_active = true;
760
761 struct PurgeDriverJob : public XrdJob
762 {
763 DataFsPurgeshot *m_purge_shot_ptr;
764
765 PurgeDriverJob(DataFsPurgeshot *psp) :
766 XrdJob("XrdPfc::ResourceMonitor::PurgeDriver"),
767 m_purge_shot_ptr(psp)
768 {}
769
770 void DoIt() override
771 {
772 Cache::ResMon().perform_purge_task(*m_purge_shot_ptr);
774
775 delete m_purge_shot_ptr;
776 delete this;
777 }
778 };
779
780 Cache::schedP->Schedule( new PurgeDriverJob(psp.release()) );
781}
782
783namespace XrdPfc
784{
786}
787
789{
790 // BEWARE: Runs in a dedicated thread - is only to communicate back to the
791 // hear_beat() / data structs via the purge queues and condition variable.
792
793 // const char *tpfx = "perform_purge_task ";
794
795 {
797 m_purge_task_start = time(0);
798 }
799
800 // For now, fall back to the old purge ... to be improved with:
801 // - new scan, following the DataFsPurgeshot;
802 // - usage of cinfo stat mtime for time of last access (touch already done at output);
803 // - use DirState* to report back purged files.
804 // Already changed to report back purged files --- but using the string / path variant.
805 OldStylePurgeDriver(ps); // In XrdPfcPurge.cc
806}
807
809{
810 // Separated out so the purge_task can exit without post-checks.
811
812 {
814 m_purge_task_end = time(0);
816 m_purge_task_cond.Signal();
817 }
819}
820
821//==============================================================================
822// Main thread function, do initial test, then enter heart_beat().
823//==============================================================================
824
826{
827 // setup for in-scan -- this is called from initial setup.
828 MutexHolder _lck(m_dir_scan_mutex);
829 m_dir_scan_in_progress = true;
830}
831
833{
834 const char *tpfx = "main_thread_function ";
835 {
836 time_t is_start = time(0);
837 m_fs_state.init_stat_reset_times(is_start);
838 TRACE(Info, tpfx << "Stating initial directory scan.");
839
840 if ( ! perform_initial_scan()) {
841 TRACE(Error, tpfx << "Initial directory scan has failed. This is a terminal error, aborting.")
842 _exit(1);
843 }
844 // Reset of m_dir_scan_in_progress is done in perform_initial_scan()
845
846 time_t is_duration = time(0) - is_start;
847 TRACE(Info, tpfx << "Initial directory scan complete, duration=" << is_duration <<"s");
848
849 // run first process queues
850 int n_proc_is = process_queues();
851 TRACE(Info, tpfx << "First process_queues finished, n_records=" << n_proc_is);
852
853 // shrink queues if scan time was longer than 30s.
854 if (is_duration > 30 || n_proc_is > 3000)
855 {
856 m_file_open_q.shrink_read_queue();
857 m_file_update_stats_q.shrink_read_queue();
858 m_file_close_q.shrink_read_queue();
859 m_file_purge_q1.shrink_read_queue();
860 m_file_purge_q2.shrink_read_queue();
861 m_file_purge_q3.shrink_read_queue();
862 }
863 }
864 heart_beat();
865}
866
867//==============================================================================
868// Old prototype from Cache / Purge, now to go into heart_beat() here, above.
869//==============================================================================
870
872{
873 // static const char *trc_pfx = "ResourceMonitorHeartBeat() ";
874
875 // Pause before initial run
876 sleep(1);
877
878 // XXXX Setup initial / constant stats (total RAM, total disk, ???)
879
882
883 S.Lock();
884
886
888
889 S.UnLock();
890
891 // XXXX Schedule initial disk scan, time it!
892 //
893 // TRACE(Info, trc_pfx << "scheduling intial disk scan.");
894 // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") );
895 //
896 // bool scan_and_purge_running = true;
897
898 // XXXX Could we really hold last-usage for all files in memory?
899
900 // XXXX Think how to handle disk-full, scan/purge not finishing:
901 // - start dropping things out of write queue, but only when RAM gets near full;
902 // - monitoring this then becomes a high-priority job, inner loop with sleep of,
903 // say, 5 or 10 seconds.
904
905 while (true)
906 {
907 time_t heartbeat_start = time(0);
908
909 // TRACE(Info, trc_pfx << "HeartBeat starting ...");
910
911 // if sumary monitoring configured, pupulate OucCacheStats:
912 S.Lock();
913
914 // - available / used disk space (files usage calculated elsewhere (maybe))
915
916 // - RAM usage
917 /* XXXX From Cache
918 { XrdSysMutexHelper lck(&m_RAM_mutex);
919 X.MemUsed = m_RAM_used;
920 X.MemWriteQ = m_RAM_write_queue;
921 }
922 */
923
924 // - files opened / closed etc
925
926 // do estimate of available space
927 S.UnLock();
928
929 // if needed, schedule purge in a different thread.
930 // purge is:
931 // - deep scan + gather FSPurgeState
932 // - actual purge
933 //
934 // this thread can continue running and, if needed, stop writing to disk
935 // if purge is taking too long.
936
937 // think how data is passed / synchronized between this and purge thread
938
939 // !!!! think how stat collection is done and propgated upwards;
940 // until now it was done once per purge-interval.
941 // now stats will be added up more often, but purge will be done
942 // only occasionally.
943 // also, do we report cumulative values or deltas? cumulative should
944 // be easier and consistent with summary data.
945 // still, some are state - like disk usage, num of files.
946
947 // Do we take care of directories that need to be newly added into DirState hierarchy?
948 // I.e., when user creates new directories and these are covered by either full
949 // spec or by root + depth declaration.
950
951 int heartbeat_duration = time(0) - heartbeat_start;
952
953 // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration);
954
955 // int sleep_time = m_fs_state..m_purgeInterval - heartbeat_duration;
956 int sleep_time = 60 - heartbeat_duration;
957 if (sleep_time > 0)
958 {
959 sleep(sleep_time);
960 }
961 }
962}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
static void child()
#define TRACE_Debug
#define TRACE_Info
#define XrdOssOK
Definition XrdOss.hh:50
#define dprintf(...)
void Proto_ResourceMonitorHeartBeat()
#define TRACE_INT(act, x)
bool Debug
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Opendir(const char *path, XrdOucEnv &env)
Definition XrdOss.hh:79
long long Total
Definition XrdOssVS.hh:90
long long Free
Definition XrdOssVS.hh:91
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
static const Configuration & Conf()
Definition XrdPfc.cc:134
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:283
static ResourceMonitor & ResMon()
Definition XrdPfc.cc:135
void ClearPurgeProtectedSet()
Definition XrdPfc.cc:684
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
static XrdScheduler * schedP
Definition XrdPfc.hh:290
long long WritesSinceLastCall()
Definition XrdPfc.cc:320
PurgePin * GetPurgePin() const
Definition XrdPfc.hh:272
void AddUp(const DirStats &s)
long long m_StBlocksRemoved
std::vector< std::string > m_current_dirs
bool begin_traversal(DirState *root, const char *root_path)
std::set< std::string > m_protected_top_dirs
bool cd_down(const std::string &dir_name)
std::map< std::string, FilePairStat > m_current_files
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
virtual bool CallPeriodically()
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
void perform_purge_task(DataFsPurgeshot &ps)
void scan_dir_and_recurse(FsTraversal &fst)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
std::function< int(const std::string &)> unlink_func
void OldStylePurgeDriver(DataFsPurgeshot &ps)
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:64
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:109
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition XrdPfc.hh:106
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:93
bool are_file_usage_limits_set() const
Definition XrdPfc.hh:67
long long m_fileUsageNominal
cache purge - files usage nominal
Definition XrdPfc.hh:95
int m_purgeAgeBasedPeriod
peform cold file / uvkeep purge every this many purge cycles
Definition XrdPfc.hh:99
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:92
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition XrdPfc.hh:104
bool is_age_based_purge_in_effect() const
Definition XrdPfc.hh:68
int m_purgeInterval
sleep interval between cache purges
Definition XrdPfc.hh:97
bool is_dir_stat_reporting_on() const
Definition XrdPfc.hh:70
std::vector< DirPurgeElement > m_dir_vec
void write_json_file(const std::string &fname, XrdOss &oss, bool include_preamble)
std::vector< DirStateElement > m_dir_states
DirUsage m_recursive_subdir_usage
int count_dirs_to_level(int max_depth) const
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs, DirState **last_existing_dir=nullptr)
void upward_propagate_initial_scan_usages()
DirState * get_parent()