80static const char filename[] =
"xt_xmap_dist_dir.c";
84#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
85#pragma GCC diagnostic push
86#pragma GCC diagnostic ignored "-Wstringop-overread"
87#pragma GCC diagnostic ignored "-Wstringop-overflow"
121 size_t bgd_size = config->xmdd_bucket_gen->gen_state_size;
122 bgd_size = (bgd_size +
sizeof (
void *) - 1)/
sizeof (
void *) *
sizeof (
void *);
125 config->xmdd_bucket_gen->init(
126 &bgd, src_idxlist, dst_idxlist, config,
128 .intra_comm = comm, .tag_offset_intra = tag_offset,
130 config->xmdd_bucket_gen->init_params,
131 config->xmdd_bucket_gen);
144 size_t max_num_intersect
145 = (size_t)config->xmdd_bucket_gen->get_intersect_max_num(
147 size_t send_size_filled = (size_t)-1;
148 if (max_num_intersect) {
149 size_t send_buffer_size = 0;
151 =
xmalloc(2 * max_num_intersect *
sizeof(*sends_dst)),
152 *restrict sends_src = sends_dst + max_num_intersect;
153 size_t num_src_msg = 0, num_dst_msg = 0;
155 while ((bucket = config->xmdd_bucket_gen->next(
161 src_idxlist_sorted, bucket.
list, config);
163 sends_src[num_src_msg].list = send4src;
164 sends_src[num_src_msg].rank = (int)
rank;
175 bucket.
list, dst_idxlist_sorted, config);
177 sends_dst[num_dst_msg].list = send4dst;
178 sends_dst[num_dst_msg].rank = (int)
rank;
187 send_size_filled =
rank;
190 for (
size_t rank = send_size_filled+1;
rank < (size_t)comm_size; ++
rank)
192 unsigned char *send_buffer
193 = *send_buffer_ =
xmalloc(send_buffer_size);
195 for (
size_t i = 0; i < num_src_msg; ++i) {
198 (
int)(send_buffer_size-ofs), &position, comm);
200 ofs += (size_t)position;
204 for (
size_t i = 0; i < num_dst_msg; ++i) {
207 (
int)(send_buffer_size-ofs), &position, comm);
209 ofs += (size_t)position;
213 num_msg = (int)(num_src_msg + num_dst_msg);
216 memset(send_size, 0, (
size_t)comm_size *
sizeof (*send_size));
219 if (dst_idxlist_sorted != dst_idxlist)
221 if (src_idxlist_sorted != src_idxlist)
224 return (
struct capbi_result){ .stripify = stripify, .num_msg = num_msg };
229 int recv_count,
void * recv_buffer,
int tag,
233 int total_recv_size = 0;
235 for (
int i = 0; i < recv_count; ++i)
239 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED, MPI_ANY_SOURCE,
240 tag, comm, &status), comm);
243 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
251 total_recv_size += received_count;
254 if (total_recv_size != recv_size)
255 Xt_abort(comm,
"ERROR: recv_intersections received wrong number of bytes",
262 MPI_Request *dir_init_send_requests,
263 int tag_offset,
MPI_Comm comm,
int comm_size) {
268 src_tag, comm, comm_size,
269 dir_init_send_requests,
275 dst_tag, comm, comm_size,
276 dir_init_send_requests + txstat.
num_msg,
284 struct dist_dir *src_dist_dir, *dst_dist_dir;
306 .dst = dst_dist_dir };
312 const struct isect *restrict src_dst_intersections,
316 size_t total_send_size = 0;
317 for (
int i = 0; i < comm_size; ++i)
324 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
326 for (
size_t i = 0; i < num_intersections; ++i)
328 int msg_size = rank_pack_size
339 total_send_size += 2*(size_t)msg_size;
341 assert(total_send_size <= INT_MAX);
342 return total_send_size;
348 struct isect *restrict src_dst_intersections,
353 size_t total_send_size
355 src_dst_intersections,
356 comm, comm_size, send_size);
358 unsigned char *send_buffer = (*send_buffer_)
359 =
xmalloc((
size_t)total_send_size);
361 if (num_intersections > 1)
362 qsort(src_dst_intersections, num_intersections,
364 size_t num_send_indices_requests
368 send_buffer, total_send_size, &ofs, comm);
370 if (num_intersections > 1)
371 qsort(src_dst_intersections, num_intersections,
373 num_send_indices_requests
377 send_buffer, total_send_size, &ofs, comm);
378 assert(num_send_indices_requests <= INT_MAX);
379 return (
int)num_send_indices_requests;
394 int recv_size[num_sizes],
395 int (*send_size)[num_sizes],
398#if MPI_VERSION > 2 || ( MPI_VERSION == 2 && MPI_SUBVERSION >= 2)
399 xt_mpi_call(MPI_Reduce_scatter_block((
int *)send_size, (
int *)recv_size,
400 num_sizes, MPI_INT, MPI_SUM,
404 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
406 int *recv_count =
xmalloc((
size_t)comm_size *
sizeof(*recv_count));
407 for (
int i = 0; i < comm_size; ++i) recv_count[i] = num_sizes;
409 xt_mpi_call(MPI_Reduce_scatter(send_size, recv_size, recv_count, MPI_INT,
410 MPI_SUM, comm), comm);
428 void *send_buffer = NULL;
431 =
xmalloc((
size_t)comm_size *
sizeof(*send_size));
435 send_size, &send_buffer,
436 comm, tag_offset, comm_size,
444 MPI_Request *dir_init_send_requests
447 dir_init_send_requests, tag_offset, comm, comm_size);
456 MPI_STATUSES_IGNORE), comm);
457 free(dir_init_send_requests);
466 void *restrict recv_buffer,
int tag,
472 while (recv_size > 0) {
476 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED,
477 MPI_ANY_SOURCE, tag, comm, &status), comm);
480 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
482 recv_size -= received_count;
486 while (received_count > position) {
488 xt_mpi_call(MPI_Unpack(recv_buffer, received_count, &position,
490 1, MPI_INT, comm), comm);
502 Xt_abort(comm,
"ERROR: recv_and_unpack_dist_dir_result"
503 " received wrong number of bytes",
filename, __LINE__);
512 int *num_send_indices_requests,
513 MPI_Request *send_indices_requests,
532 recv_buffer, tag_offset
536 enum { ops_completed_auto_size = 16 };
537 int ops_completed_auto[ops_completed_auto_size];
539 = *num_send_indices_requests > ops_completed_auto_size
540 ?
xmalloc((
size_t)*num_send_indices_requests *
sizeof (*ops_completed))
541 : ops_completed_auto;
544 ops_completed, comm);
548 recv_buffer, tag_offset
555 ops_completed, comm);
563 ops_completed, comm);
566 if (ops_completed != ops_completed_auto) free(ops_completed);
567 return dist_dir_results;
580 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
584 tag_offset, comm, comm_size, config);
589 =
xmalloc((
size_t)comm_size *
sizeof(*send_size));
593 struct isect *src_dst_intersections;
594 size_t num_intersections
596 &src_dst_intersections, config);
599 int num_send_indices_requests
601 send_size, &send_buffer, comm, comm_size);
602 free(src_dst_intersections);
608 MPI_Request *send_indices_requests
609 =
xmalloc((
size_t)num_send_indices_requests
610 *
sizeof(*send_indices_requests));
613 send_indices_requests, tag_offset, comm, comm_size);
617 &num_send_indices_requests,
618 send_indices_requests,
621 xt_mpi_call(MPI_Waitall(num_send_indices_requests, send_indices_requests,
622 MPI_STATUSES_IGNORE), comm);
626 free(send_indices_requests);
628 .dist_dirs.src = ddp.
src,
637 INSTR_DEF(this_instr,
"xt_xmap_all2all_new")
652 Xt_xmap (*xmap_new)(
int num_src_intersections,
654 int num_dst_intersections,
666 src_idxlist, dst_idxlist, newcomm, config);
add versions of standard API functions not returning on error
struct dist_dir_pair dist_dirs
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
implementation of configuration object
#define XT_CONFIG_GET_FORCE_NOSORT(config)
#define XT_CONFIG_GET_XMAP_STRIPING(config)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state)
struct Xt_xmap_ * Xt_xmap
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_idxlist xt_idxlist_sorted_copy_custom(Xt_idxlist idxlist, Xt_config config)
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Xt_idxlist xt_idxlist_get_intersection_custom(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst, Xt_config config)
int xt_idxlist_get_sorting(Xt_idxlist idxlist)
void xt_idxlist_delete(Xt_idxlist idxlist)
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
bool xt_mpi_test_some(int *restrict num_req, MPI_Request *restrict req, int *restrict ops_completed, MPI_Comm comm)
#define xt_mpi_call(call, comm)
@ xt_mpi_tag_xmap_dist_dir_dst_send
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
static const char filename[]
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct dist_dir_pair recv_and_unpack_intersections(int recv_size[SEND_SIZE_ASIZE], int tag_offset, MPI_Comm comm)
static void recv_and_unpack_intersection(struct dist_dir *dist_dir, int recv_size, int recv_count, void *recv_buffer, int tag, MPI_Comm comm)
Xt_xmap xt_xmap_dist_dir_intracomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
static struct dd_result exchange_idxlists(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_dist_dir_intracomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, int comm_size, Xt_config config)
static void xt_xmap_dist_dir_reduce_scatter_sizes(int num_sizes, int recv_size[num_sizes], int(*send_size)[num_sizes], MPI_Comm comm)
wrapper for MPI_Reduce_scatter_block if available or MPI_Reduce_scatter if not
static size_t buf_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, MPI_Comm comm, int comm_size, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct capbi_result compute_and_pack_bucket_intersections(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int tag_offset, int comm_size, Xt_config config)
static void send_intersections(void *send_buffer, const int(*send_size)[SEND_SIZE_ASIZE], MPI_Request *dir_init_send_requests, int tag_offset, MPI_Comm comm, int comm_size)
static void recv_and_unpack_dist_dir_result(struct dist_dir *dist_dir, int recv_size, void *restrict recv_buffer, int tag, MPI_Comm comm)
static struct dist_dir_pair recv_and_unpack_dist_dir_results(int recv_size[SEND_SIZE_ASIZE], int *num_send_indices_requests, MPI_Request *send_indices_requests, int tag_offset, MPI_Comm comm)
static int pack_src_dst_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int comm_size)
@ Xt_dist_dir_bucket_gen_type_sendrecv
Default bucket generator for creation of distributed directories.
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
struct Xt_xmdd_txstat xt_xmap_dist_dir_send_intersections(void *restrict send_buffer, size_t send_size_asize, size_t send_size_entry, int tag, MPI_Comm comm, int rank_lim, MPI_Request *restrict requests, const int send_size[rank_lim][send_size_asize])
Utility functions for creation of distributed directories.
static struct mmsg_buf compute_and_pack_bucket_intersections(void *bucket_gen_state, int bucket_type, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], MPI_Comm comm, int comm_size, Xt_config config)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, const struct Xt_xmdd_bucket_gen_comms *comms, int remote_size, int comm_size, Xt_config config)
Xt_xmap xt_xmap_intersection_ext_custom_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_intersection_custom_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)