91 pipelines( std::move( obj.pipelines ) ),
92 policy( std::move( obj.policy ) )
103 template<
class Container>
106 static_assert( !HasHndl,
"Constructor is available only operation without handler");
108 pipelines.reserve( container.size() );
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter( pipelines ) );
124 std::ostringstream oss;
126 for(
size_t i = 0; i < pipelines.size(); i++ )
128 oss << pipelines[i]->ToString();
129 if( i + 1 != pipelines.size() )
146 policy.reset(
new AllPolicy() );
147 return std::move( *
this );
158 policy.reset(
new AnyPolicy( pipelines.size() ) );
159 return std::move( *
this );
170 policy.reset(
new SomePolicy( pipelines.size(), threshold ) );
171 return std::move( *
this );
183 policy.reset(
new AtLeastPolicy( pipelines.size(), threshold ) );
184 return std::move( *
this );
201 if( status.
IsOK() )
return false;
206 XRootDStatus Result()
220 struct AnyPolicy :
public PolicyExecutor
222 AnyPolicy(
size_t size) : cnt( size )
226 bool Examine(
const XrdCl::XRootDStatus &status )
231 size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
233 if( status.
IsOK() )
return true;
235 if( nb == 1 )
return true;
240 XRootDStatus Result()
246 std::atomic<size_t> cnt;
256 struct SomePolicy : PolicyExecutor
258 SomePolicy(
size_t size,
size_t threshold ) : failed( 0 ), succeeded( 0 ),
259 threshold( threshold ), size( size )
263 bool Examine(
const XrdCl::XRootDStatus &status )
269 size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
270 if( s + 1 == threshold )
return true;
274 size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
276 if( f == size - threshold )
return true;
281 XRootDStatus Result()
287 std::atomic<size_t> failed;
288 std::atomic<size_t> succeeded;
289 const size_t threshold;
301 struct AtLeastPolicy : PolicyExecutor
303 AtLeastPolicy(
size_t size,
size_t threshold ) : pending_cnt( size ),
305 failed_threshold( size - threshold )
309 bool Examine(
const XrdCl::XRootDStatus &status )
312 size_t pending = pending_cnt.fetch_sub( 1, std::memory_order_relaxed ) - 1;
314 if( status.
IsOK() )
return ( pending == 0 );
315 size_t nb = failed_cnt.fetch_add( 1, std::memory_order_relaxed );
316 if( nb == failed_threshold ) res = status;
319 return ( pending == 0 );
322 XRootDStatus Result()
328 std::atomic<size_t> pending_cnt;
329 std::atomic<size_t> failed_cnt;
330 const size_t failed_threshold;
339 barrier_t() : on( true ) { }
343 std::unique_lock<std::mutex> lck( mtx );
344 if( on ) cv.wait( lck );
349 std::unique_lock<std::mutex> lck( mtx );
355 std::condition_variable cv;
373 Ctx(
PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
383 Handle( XRootDStatus() );
392 inline void Examine(
const XRootDStatus &st )
394 if( policy->Examine( st ) )
395 Handle( policy->Result() );
404 inline void Handle(
const XRootDStatus &st )
406 PipelineHandler* hdlr = handler.exchange(
nullptr, std::memory_order_relaxed );
410 hdlr->HandleResponse(
new XRootDStatus( st ),
nullptr );
417 std::atomic<PipelineHandler*> handler;
422 std::unique_ptr<PolicyExecutor> policy;
434 struct PipelineEnd :
public Job
439 PipelineEnd( std::shared_ptr<Ctx> &ctx,
440 const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
454 std::shared_ptr<Ctx> ctx;
455 XrdCl::XRootDStatus st;
462 void Schedule( std::shared_ptr<Ctx> &ctx,
const XrdCl::XRootDStatus &st)
465 PipelineEnd *end =
new PipelineEnd( ctx, st );
479 if( !policy ) policy.reset(
new AllPolicy() );
481 std::shared_ptr<Ctx> ctx =
482 std::make_shared<Ctx>(
handler, policy.release() );
485 pipelineTimeout : this->
timeout;
487 for(
size_t i = 0; i < pipelines.size(); ++i )
489 if( !pipelines[i] )
continue;
491 [ctx](
const XRootDStatus &st )
mutable { Schedule( ctx, st ); } );
495 return XRootDStatus();
498 std::vector<Pipeline> pipelines;
499 std::unique_ptr<PolicyExecutor> policy;
JobManager * GetJobManager()
Get the job manager object user by the post master.