XRootD
Loading...
Searching...
No Matches
XrdCl::ActionExecutor Class Reference

Executes an action registered in the csv file. More...

+ Collaboration diagram for XrdCl::ActionExecutor:

Public Member Functions

 ActionExecutor (File &file, const std::string &action, const std::string &args, const std::string &orgststr, const std::string &resp, const double &duration)
 
void Execute (std::shared_ptr< barrier_t > &ending, std::shared_ptr< barrier_t > &closing, ActionMetrics &metric, bool simulate)
 
std::string Name () const
 Get action name.
 
double NominalDuration () const
 Get nominal duration variable.
 

Detailed Description

Executes an action registered in the csv file.

Definition at line 314 of file XrdClReplay.cc.

Constructor & Destructor Documentation

◆ ActionExecutor()

XrdCl::ActionExecutor::ActionExecutor ( File & file,
const std::string & action,
const std::string & args,
const std::string & orgststr,
const std::string & resp,
const double & duration )
inline

Constructor

Parameters
file: the file that should be the context of the action
action: the action to be executed
args: arguments for the action
orgststr: original status
resp: original response
duration: nominal duration of this action

Definition at line 328 of file XrdClReplay.cc.

334 : file(file)
335 , action(action)
336 , args(args)
337 , orgststr(orgststr)
338 , nominalduration(duration)
339 {
340 }

Member Function Documentation

◆ Execute()

void XrdCl::ActionExecutor::Execute ( std::shared_ptr< barrier_t > & ending,
std::shared_ptr< barrier_t > & closing,
ActionMetrics & metric,
bool simulate )
inline

Execute the action

Parameters
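ending: synchronization object for ending the execution
closing: synchronization object that the Close action waits on before the file is closed
metric: metrics object that receives the per-action counters and the measured durations
simulate: if true, the request is not issued; only the counters are updated and the synchronization objects are released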

Definition at line 346 of file XrdClReplay.cc.

350 {
351 if (action == "Open") // open action
352 {
353 std::string url;
354 OpenFlags::Flags flags;
355 Access::Mode mode;
356 uint16_t timeout;
357 std::tie(url, flags, mode, timeout) = GetOpenArgs();
358
359 std::string lmetric;
360 if ((flags & OpenFlags::Update) || (flags & OpenFlags::Write))
361 {
362 metric.ios["OpenW::n"]++;
363 }
364 else
365 {
366 metric.ios["OpenR::n"]++;
367 }
368
369 metric.ios["Open::n"]++;
370
371 mytimer_t timer;
372
373 if (!simulate)
374 WaitFor(Open(file, url, flags, mode, timeout) >>
375 [orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
376 {
377 metric.addIos("Open", "e", HandleStatus(s, orgststr, "Open"));
378 metric.addDelays("Open", "tmeas", timer.elapsed());
379 ending.reset();
380 closing.reset();
381 });
382 else
383 {
384 ending.reset();
385 closing.reset();
386 }
387 }
388 else if (action == "Close") // close action
389 {
390 uint16_t timeout = GetCloseArgs();
391 mytimer_t timer;
392
393 if (closing)
394 {
395 auto& sem = closing->get();
396 closing.reset();
397 sem.Wait();
398 }
399
400 metric.ios["Close::n"]++;
401
402 if (!simulate)
403 Async(Close(file, timeout) >>
404 [orgststr{ orgststr }, ending, timer, &metric](XRootDStatus& s) mutable
405 {
406 metric.addIos("Close", "e", HandleStatus(s, orgststr, "Close"));
407 metric.addDelays("Close", "tmeas", timer.elapsed());
408 ending.reset();
409 });
410 else
411 {
412 ending.reset();
413 }
414 }
415 else if (action == "Stat") // stat action
416 {
417 bool force;
418 uint16_t timeout;
419 std::tie(force, timeout) = GetStatArgs();
420 metric.ios["Stat::n"]++;
421 mytimer_t timer;
422
423 if (!simulate)
424 Async(Stat(file, force, timeout) >>
425 [orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s, StatInfo& r) mutable
426 {
427 metric.addIos("Stat", "e", HandleStatus(s, orgststr, "Stat"));
428 metric.addDelays("Stat", "tmeas", timer.elapsed());
429 ending.reset();
430 closing.reset();
431 });
432 else
433 {
434 ending.reset();
435 closing.reset();
436 }
437 }
438 else if (action == "Read") // read action
439 {
440 uint64_t offset;
441 buffer_t buffer;
442 uint16_t timeout;
443 std::tie(offset, buffer, timeout) = GetReadArgs();
444 metric.ios["Read::n"]++;
445 metric.ios["Read::b"] += buffer->size();
446 if ((offset + buffer->size()) > metric.ios["Read::o"])
447 metric.ios["Read::o"] = offset + buffer->size();
448
449 mytimer_t timer;
450 if (!simulate)
451 Async(Read(file, offset, buffer->size(), buffer->data(), timeout) >>
452 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
453 ChunkInfo& r) mutable
454 {
455 metric.addIos("Read", "e", HandleStatus(s, orgststr, "Read"));
456 metric.addDelays("Read", "tmeas", timer.elapsed());
457 buffer.reset();
458 ending.reset();
459 closing.reset();
460 });
461 else
462 {
463 buffer.reset();
464 ending.reset();
465 closing.reset();
466 }
467 }
468 else if (action == "PgRead") // pgread action
469 {
470 uint64_t offset;
471 buffer_t buffer;
472 uint16_t timeout;
473 std::tie(offset, buffer, timeout) = GetPgReadArgs();
474 metric.ios["PgRead::n"]++;
475 metric.ios["PgRead::b"] += buffer->size();
476 if ((offset + buffer->size()) > metric.ios["Read::o"])
477 metric.ios["Read::o"] = offset + buffer->size();
478 mytimer_t timer;
479 if (!simulate)
480 Async(PgRead(file, offset, buffer->size(), buffer->data(), timeout) >>
481 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
482 PageInfo& r) mutable
483 {
484 metric.addIos("PgRead", "e", HandleStatus(s, orgststr, "PgRead"));
485 metric.addDelays("PgRead", "tmeas", timer.elapsed());
486 buffer.reset();
487 ending.reset();
488 closing.reset();
489 });
490 else
491 {
492 buffer.reset();
493 ending.reset();
494 closing.reset();
495 }
496 }
497 else if (action == "Write") // write action
498 {
499 uint64_t offset;
500 buffer_t buffer;
501 uint16_t timeout;
502 std::tie(offset, buffer, timeout) = GetWriteArgs();
503 metric.ios["Write::n"]++;
504 metric.ios["Write::b"] += buffer->size();
505 if ((offset + buffer->size()) > metric.ios["Write::o"])
506 metric.ios["Write::o"] = offset + buffer->size();
507 mytimer_t timer;
508
509 if (!simulate)
510 Async(
511 Write(file, offset, buffer->size(), buffer->data(), timeout) >>
512 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
513 {
514 metric.addIos("Write", "e", HandleStatus(s, orgststr, "Write"));
515 metric.addDelays("Write", "tmeas", timer.elapsed());
516 buffer.reset();
517 ending.reset();
518 closing.reset();
519 });
520 else
521 {
522 buffer.reset();
523 ending.reset();
524 closing.reset();
525 }
526 }
527 else if (action == "PgWrite") // pgwrite action
528 {
529 uint64_t offset;
530 buffer_t buffer;
531 uint16_t timeout;
532 std::tie(offset, buffer, timeout) = GetPgWriteArgs();
533 metric.ios["PgWrite::n"]++;
534 metric.ios["PgWrite::b"] += buffer->size();
535 if ((offset + buffer->size()) > metric.ios["Write::o"])
536 metric.ios["Write::o"] = offset + buffer->size();
537 mytimer_t timer;
538 if (!simulate)
539 Async(
540 PgWrite(file, offset, buffer->size(), buffer->data(), timeout) >>
541 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
542 {
543 metric.addIos("PgWrite", "e", HandleStatus(s, orgststr, "PgWrite"));
544 metric.addDelays("PgWrite", "tmeas", timer.elapsed());
545 buffer.reset();
546 ending.reset();
547 closing.reset();
548 });
549 else
550 {
551 buffer.reset();
552 ending.reset();
553 closing.reset();
554 }
555 }
556 else if (action == "Sync") // sync action
557 {
558 uint16_t timeout = GetSyncArgs();
559 metric.ios["Sync::n"]++;
560 mytimer_t timer;
561 if (!simulate)
562 Async(Sync(file, timeout) >>
563 [orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
564 {
565 metric.addIos("Sync", "e", HandleStatus(s, orgststr, "Sync"));
566 metric.addDelays("Sync", "tmeas", timer.elapsed());
567 ending.reset();
568 closing.reset();
569 });
570 else
571 {
572 ending.reset();
573 closing.reset();
574 }
575 }
576 else if (action == "Truncate") // truncate action
577 {
578 uint64_t size;
579 uint16_t timeout;
580 std::tie(size, timeout) = GetTruncateArgs();
581 metric.ios["Truncate::n"]++;
582 if (size > metric.ios["Truncate::o"])
583 metric.ios["Truncate::o"] = size;
584
585 mytimer_t timer;
586 if (!simulate)
587 Async(Truncate(file, size, timeout) >>
588 [orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
589 {
590 metric.addIos("Truncate", "e", HandleStatus(s, orgststr, "Truncate"));
591 metric.addDelays("Truncate", "tmeas", timer.elapsed());
592 ending.reset();
593 closing.reset();
594 });
595 else
596 {
597 ending.reset();
598 closing.reset();
599 }
600 }
601 else if (action == "VectorRead") // vector read action
602 {
603 ChunkList chunks;
604 uint16_t timeout;
605 std::vector<buffer_t> buffers;
606 std::tie(chunks, timeout, buffers) = GetVectorReadArgs();
607 metric.ios["VectorRead::n"]++;
608 for (auto& ch : chunks)
609 {
610 metric.ios["VectorRead::b"] += ch.GetLength();
611 if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Read::o"])
612 metric.ios["Read::o"] = ch.GetOffset() + ch.GetLength();
613 }
614
615 mytimer_t timer;
616 if (!simulate)
617 Async(
618 VectorRead(file, chunks, timeout) >>
619 [orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s, VectorReadInfo& r) mutable
620 {
621 metric.addIos("VectorRead", "e", HandleStatus(s, orgststr, "VectorRead"));
622 metric.addDelays("VectorRead", "tmeas", timer.elapsed());
623 buffers.clear();
624 ending.reset();
625 closing.reset();
626 });
627 else
628 {
629 buffers.clear();
630 ending.reset();
631 closing.reset();
632 }
633 }
634 else if (action == "VectorWrite") // vector write
635 {
636 ChunkList chunks;
637 uint16_t timeout;
638 std::vector<buffer_t> buffers;
639 std::tie(chunks, timeout, buffers) = GetVectorWriteArgs();
640 metric.ios["VectorWrite::n"]++;
641 for (auto& ch : chunks)
642 {
643 metric.ios["VectorWrite::b"] += ch.GetLength();
644 if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Write::o"])
645 metric.ios["Write::o"] = ch.GetOffset() + ch.GetLength();
646 }
647 mytimer_t timer;
648 if (!simulate)
649 Async(VectorWrite(file, chunks, timeout) >>
650 [orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s) mutable
651 {
652 metric.addIos("VectorWrite", "e", HandleStatus(s, orgststr, "VectorWrite"));
653 metric.addDelays("VectorWrite", "tmeas", timer.elapsed());
654 buffers.clear();
655 ending.reset();
656 closing.reset();
657 });
658 else
659 {
660 buffers.clear();
661 ending.reset();
662 closing.reset();
663 }
664 }
665 else
666 {
667 DefaultEnv::GetLog()->Warning(AppMsg, "Cannot replyt %s action.", action.c_str());
668 }
669 }
struct stat Stat
Definition XrdCks.cc:49
static Log * GetLog()
Get default log.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
VectorWriteImpl< false > VectorWrite(Ctx< File > file, Arg< ChunkList > chunks, uint16_t timeout=0)
Factory for creating VectorWriteImpl objects.
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint64_t AppMsg
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
TruncateImpl< false > Truncate(Ctx< File > file, Arg< uint64_t > size, uint16_t timeout)
PgReadImpl< false > PgRead(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating PgReadImpl objects.
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
PgWriteImpl< false > PgWrite(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, Arg< std::vector< uint32_t > > cksums, uint16_t timeout=0)
Factory for creating PgWriteImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating OpenImpl objects.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
Flags
Open flags, may be or'd when appropriate.
@ Write
Open only for writing.
@ Update
Open for reading and writing.

References XrdCl::ActionMetrics::addDelays(), XrdCl::ActionMetrics::addIos(), XrdCl::AppMsg, XrdCl::Async(), XrdCl::Close(), XrdCl::mytimer_t::elapsed(), XrdCl::DefaultEnv::GetLog(), XrdCl::ActionMetrics::ios, XrdCl::Open(), XrdCl::PgRead(), XrdCl::PgWrite(), XrdCl::Read(), Stat, XrdCl::Sync(), XrdCl::Truncate(), XrdCl::OpenFlags::Update, XrdCl::VectorRead(), XrdCl::VectorWrite(), XrdCl::WaitFor(), XrdCl::Log::Warning(), XrdCl::OpenFlags::Write, and XrdCl::Write().

+ Here is the call graph for this function:

◆ Name()

std::string XrdCl::ActionExecutor::Name ( ) const
inline

Get action name.

Definition at line 679 of file XrdClReplay.cc.

679{ return action; }

◆ NominalDuration()

double XrdCl::ActionExecutor::NominalDuration ( ) const
inline

Get nominal duration variable.

Definition at line 674 of file XrdClReplay.cc.

674{ return nominalduration; }

The documentation for this class was generated from the following file: