80 const void **src_data,
void **dst_data);
84 const void **src_data,
void **dst_data,
89 const void *src_data,
void *dst_data);
93 const void *src_data,
void *dst_data,
106 int *restrict *ranks);
156 int *restrict in_ranks[num_redists],
157 size_t num_ranks[num_redists],
159 MPI_Datatype *component_dt,
162 size_t rank_pos[num_redists];
163 for (
size_t j = 0; j < num_redists; ++j)
167 for (
size_t i = 0; i < nmsgs; ++i) {
168 int min_rank = INT_MAX;
169 for (
size_t j = 0; j < num_redists; ++j)
170 if (rank_pos[j] < num_ranks[j] && in_ranks[j][rank_pos[j]] < min_rank)
171 min_rank = in_ranks[j][rank_pos[j]];
173 for (
size_t j = 0; j < num_redists; ++j)
174 component_dt[i * num_redists + j] =
175 (rank_pos[j] < num_ranks[j] && in_ranks[j][rank_pos[j]] == min_rank)
179 out_ranks[i] = min_rank;
180 for (
size_t j = 0; j < num_redists; ++j)
182 += (rank_pos[j] < num_ranks[j] && in_ranks[j][rank_pos[j]] == min_rank);
185 for (
size_t j = 0; j < num_redists; ++j)
193 unsigned num_redists)
196 size_t num_displ = cache_size * num_redists;
198 for (
size_t i = 0; i < ntx; ++i) msgs[i].
datatype = MPI_DATATYPE_NULL;
200 =
xmalloc(2 * num_displ *
sizeof (*q));
202 for (
size_t i = 0; i < 2 * num_displ; i += num_redists)
209 size_t cache_size,
size_t ntx,
MPI_Comm comm)
211 for (
size_t i = 0; i < cache_size; ++i)
234 unsigned num_redists_ = num_redists >= 0 ? (unsigned)num_redists : 0;
235 size_t num_ranks[2][num_redists_];
236 int *restrict ranks[2][num_redists_];
241 size_t nmsg = (size_t)nmsg_send + nmsg_recv;
242 size_t size_all_component_dt =
sizeof (MPI_Datatype) * num_redists_ * nmsg;
244 =
xmalloc(
sizeof (*redist_coll)
245 + size_all_component_dt + nmsg *
sizeof (
int));
247 redist_coll->
nmsg[
RECV] = nmsg_recv;
248 redist_coll->
nmsg[
SEND] = nmsg_send;
255 Xt_abort(comm,
"ERROR: invalid cache size in xt_redist_collection_new",
267 all_component_dt,
SEND);
270 all_component_dt + nmsg_send * num_redists_,
RECV);
280 unsigned num_messages,
unsigned num_redists,
281 const int ranks[num_messages],
282 const MPI_Datatype *component_dt,
283 const MPI_Aint displacements[num_redists],
287 int block_lengths[num_redists];
289 for (
size_t i = 0; i < num_redists; ++i)
290 block_lengths[i] = 1;
291 for (
size_t i = 0; i < num_messages; ++i) {
292 if (redist_msgs[i].
datatype != MPI_DATATYPE_NULL)
294 redist_msgs[i].datatype
296 component_dt + i * num_redists,
297 block_lengths, comm);
298 redist_msgs[i].rank = ranks[i];
304 MPI_Aint displacements[num_redists])
307 MPI_Aint base_addr, offset;
308 base_addr = (MPI_Aint)(intptr_t)(
const void *)data[0];
309 displacements[0] = 0;
310 for (
size_t i = 1; i < num_redists; ++i) {
311 offset = (MPI_Aint)(intptr_t)(
const void *)data[i];
312 displacements[i] = offset - base_addr;
319 const MPI_Aint src_displacements[num_redists],
320 const MPI_Aint dst_displacements[num_redists],
321 const MPI_Aint (*cached_src_displacements)[num_redists],
322 const MPI_Aint (*cached_dst_displacements)[num_redists],
325 for (
size_t i = 0; i < cache_size &&
326 cached_src_displacements[i][0] == (MPI_Aint)0 &&
327 cached_dst_displacements[i][0] == (MPI_Aint)0; ++i) {
328 bool mismatch =
false;
329 for (
size_t j = 0; j < num_redists; ++j)
330 mismatch |= (src_displacements[j] != cached_src_displacements[i][j]) ||
331 (dst_displacements[j] != cached_dst_displacements[i][j]);
332 if (!mismatch)
return i;
339 const void *
const * src_data,
void *
const * dst_data,
340 unsigned num_redists)
342 MPI_Aint displacements[2][num_redists];
343 unsigned num_send_messages = redist_coll->
nmsg[
SEND],
344 num_recv_messages = redist_coll->
nmsg[
RECV];
346 compute_displ((
const void *
const *)dst_data, num_redists, displacements[1]);
359 (
const MPI_Aint (*)[num_redists])cache->src_displacements,
360 (
const MPI_Aint (*)[num_redists])cache->dst_displacements,
363 if (cache_index == cache_size)
365 cache_index = cache->token;
368 displacements[
SEND], cache->msgs, comm);
371 all_component_dt + num_send_messages * num_redists,
373 cache->msgs + num_send_messages, comm);
374 memcpy(cache->src_displacements + cache_index * num_redists,
375 displacements[0], sizeof (displacements[0]));
376 memcpy(cache->dst_displacements + cache_index * num_redists,
377 displacements[1], sizeof (displacements[1]));
379 if (cache->exchangers[cache_index] != NULL)
382 exchanger = cache->exchangers[cache_index] =
384 (
int)num_recv_messages,
385 cache->msgs, cache->msgs
386 + (
size_t)num_send_messages,
388 cache->token = (cache->token + 1) % cache_size;
391 exchanger = cache->exchangers[cache_index];
395 size_t nmsg = (size_t)num_send_messages + (
size_t)num_recv_messages;
397 for (
size_t i = 0; i < nmsg; ++i)
402 all_component_dt, displacements[0], p, comm);
405 all_component_dt + num_send_messages * num_redists,
406 displacements[1], p + num_send_messages, comm);
409 redist_coll->
exchanger_new((
int)num_send_messages, (
int)num_recv_messages,
410 p, p + (
size_t)num_send_messages, comm,
427 const void **src_data,
void **dst_data) {
432 Xt_abort(redist_coll->
comm,
"ERROR: wrong number of arrays in "
433 "redist_collection_s_exchange", __FILE__, __LINE__);
448 const void **src_data,
void **dst_data,
454 Xt_abort(redist_coll->
comm,
"ERROR: wrong number of arrays in "
455 "redist_collection_a_exchange", __FILE__, __LINE__);
471 const MPI_Datatype *component_dt_orig,
472 MPI_Datatype *component_dt_copy,
475 for (
size_t i = 0; i < num_component_dt; ++i)
477 MPI_Datatype orig_dt = component_dt_orig[i];
478 if (orig_dt != MPI_DATATYPE_NULL)
479 xt_mpi_call(MPI_Type_dup(orig_dt, component_dt_copy + i), comm);
481 component_dt_copy[i] = orig_dt;
490 nmsg_send = redist_coll->
nmsg[
SEND],
491 nmsg_recv = redist_coll->
nmsg[
RECV];
492 size_t nmsg = (size_t)nmsg_recv + nmsg_send,
493 size_all_component_dt =
sizeof (MPI_Datatype) * num_redists * nmsg;
495 =
xmalloc(
sizeof (*redist_copy)
496 + size_all_component_dt + nmsg *
sizeof (
int));
500 redist_copy->
nmsg[
SEND] = nmsg_send;
501 redist_copy->
nmsg[
RECV] = nmsg_recv;
523 for (
size_t i = 0; i < num_dt; ++i)
524 if (all_component_dt[i] != MPI_DATATYPE_NULL)
525 xt_mpi_call(MPI_Type_free(all_component_dt + i), comm);
539 nmsg, redist_coll->
comm);
549 return (
int)(
xrc(redist)->
nmsg[direction]);
558 Xt_abort(redist_coll->
comm,
"ERROR: datatype retrieval is not"
559 " supported for this xt_redist type (Xt_redist_collection)",
562 return MPI_DATATYPE_NULL;
567 const void *src_data,
void *dst_data)
574 Xt_abort(redist_coll->
comm,
"ERROR: s_exchange1 is not implemented for"
575 " this xt_redist type (Xt_redist_collection)", __FILE__, __LINE__);
580 const void *src_data,
void *dst_data,
588 Xt_abort(redist_coll->
comm,
"ERROR: a_exchange1 is not implemented for"
589 " this xt_redist type (Xt_redist_collection)", __FILE__, __LINE__);
595 int *restrict *ranks)
598 unsigned nmsg_direction = redist_coll->
nmsg[direction],
599 nmsg_send = redist_coll->
nmsg[
SEND];
600 size_t nmsg = (size_t)nmsg_direction + redist_coll->
nmsg[!direction];
603 + (((
unsigned)direction-1) & nmsg_send);
604 int *ranks_ = *ranks =
xmalloc(nmsg_direction *
sizeof (*ranks_));
605 memcpy(ranks_, ranks_orig, nmsg_direction *
sizeof (*ranks));
606 return (
int)nmsg_direction;
615 return redist_coll->
comm;
Adds versions of standard API functions that do not return on error.
#define xcalloc(nmemb, size)
Xt_exchanger_new exchanger_new
const struct xt_redist_vtable * vtable
MPI_Datatype all_component_dt[]
struct exchanger_cache cache
Xt_exchanger_new exchanger_new
struct Xt_redist_msg * msgs
Xt_exchanger * exchangers
MPI_Aint * src_displacements
MPI_Aint * dst_displacements
Xt_redist(* copy)(Xt_redist)
struct Xt_config_ xt_default_config
Implementation of the configuration object.
void xt_exchanger_s_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data)
void xt_exchanger_a_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data, Xt_request *request)
void xt_exchanger_delete(Xt_exchanger exchanger)
Exchange of data based on information provided by redists.
Xt_exchanger(* Xt_exchanger_new)(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
#define xt_mpi_call(call, comm)
MPI_Datatype xt_redist_get_MPI_Datatype(Xt_redist redist, int rank, enum xt_msg_direction direction)
unsigned xt_redist_agg_msg_count(size_t num_redists, enum xt_msg_direction direction, const Xt_redist redists[num_redists], size_t num_ranks[num_redists], int *restrict ranks[num_redists])
void xt_redist_check_comms(Xt_redist *redists, int num_redists, MPI_Comm comm)
MPI_Datatype xt_create_compound_datatype(size_t count, const MPI_Aint displacements[count], const MPI_Datatype datatypes[count], const int block_lengths[count], MPI_Comm comm)
static void redist_collection_a_exchange(Xt_redist redist, int num_src_arrays, const void **src_data, void **dst_data, Xt_request *request)
static void copy_component_dt(size_t num_component_dt, const MPI_Datatype *component_dt_orig, MPI_Datatype *component_dt_copy, MPI_Comm comm)
Xt_redist xt_redist_collection_new(Xt_redist *redists, int num_redists, int cache_size, MPI_Comm comm)
static MPI_Comm redist_collection_get_MPI_Comm(Xt_redist redist)
static const struct xt_redist_vtable redist_collection_vtable
struct Xt_redist_collection_ * Xt_redist_collection
@ DEFFAULT_DATATYPE_CACHE_SIZE
static size_t lookup_cache_index(unsigned num_redists, const MPI_Aint src_displacements[num_redists], const MPI_Aint dst_displacements[num_redists], const MPI_Aint(*cached_src_displacements)[num_redists], const MPI_Aint(*cached_dst_displacements)[num_redists], size_t cache_size)
static void compute_displ(const void *const *data, unsigned num_redists, MPI_Aint displacements[num_redists])
static void redist_collection_a_exchange1(Xt_redist redist, const void *src_data, void *dst_data, Xt_request *request)
static void init_cache(struct exchanger_cache *cache, size_t cache_size, size_t ntx, unsigned num_redists)
static Xt_redist redist_collection_copy(Xt_redist redist)
static void destruct_cache(struct exchanger_cache *cache, size_t cache_size, size_t ntx, MPI_Comm comm)
static Xt_exchanger get_exchanger(struct Xt_redist_collection_ *redist_coll, const void *const *src_data, void *const *dst_data, unsigned num_redists)
static void redist_collection_s_exchange(Xt_redist redist, int num_src_arrays, const void **src_data, void **dst_data)
static void redist_collection_s_exchange1(Xt_redist redist, const void *src_data, void *dst_data)
static void redist_collection_delete(Xt_redist redist)
static int redist_collection_get_msg_ranks(Xt_redist redist, enum xt_msg_direction direction, int *restrict *ranks)
static MPI_Datatype redist_collection_get_MPI_Datatype(Xt_redist redist, int rank, enum xt_msg_direction direction)
static int redist_collection_get_num_msg(Xt_redist redist, enum xt_msg_direction direction)
static void free_component_dt(size_t num_dt, MPI_Datatype *all_component_dt, MPI_Comm comm)
static Xt_redist_collection xrc(void *redist)
Xt_redist xt_redist_collection_custom_new(Xt_redist *redists, int num_redists, int cache_size, MPI_Comm comm, Xt_config config)
static void align_component_dt(unsigned num_redists, unsigned nmsgs, const Xt_redist *redists, int *restrict in_ranks[num_redists], size_t num_ranks[num_redists], int *out_ranks, MPI_Datatype *component_dt, enum xt_msg_direction direction)
static void create_all_dt_for_dir(unsigned num_messages, unsigned num_redists, const int ranks[num_messages], const MPI_Datatype *component_dt, const MPI_Aint displacements[num_redists], struct Xt_redist_msg redist_msgs[num_messages], MPI_Comm comm)
Redistribution of data: non-public declarations.
static void xt_redist_msgs_free(size_t n, struct Xt_redist_msg *msgs, MPI_Comm comm)