77 bool *stripify,
MPI_Comm comm,
int comm_size)
79 unsigned long long local_vals[2], global_sums[2];
82 local_vals[0] = num_indices_src;
86 xt_mpi_call(MPI_Allreduce(local_vals, global_sums, 2,
87 MPI_UNSIGNED_LONG_LONG, MPI_SUM, comm), comm);
89 *stripify = global_sums[1] > 0;
90 return (
Xt_int)(
MAX(((global_sums[0] + (
unsigned)comm_size - 1)
91 / (
unsigned)comm_size), 1) * (unsigned)comm_size);
100 min_index = (
Xt_int)
MIN(min_index_a, min_index_b);
110 max_index =
MAX(max_index_a, max_index_b);
116 bool *stripify,
MPI_Comm comm,
int comm_size)
166 size_t send_buffer_size = 0;
168 size_t stripes_array_size = 0;
171 size_t first_overlapping_bucket = 0;
173 if (local_index_range_lbound >= 0
174 && (local_index_range_ubound < global_interval)) {
175 first_overlapping_bucket
176 = (size_t)(local_index_range_lbound / local_interval);
177 for (
size_t i = 0; i < first_overlapping_bucket; ++i)
181 size_t start_of_non_overlapping_bucket_suffix
182 = (size_t)(((
long long)local_index_range_ubound + local_interval - 1)
183 / local_interval) + 1;
184 if (local_index_range_lbound < 0
185 || start_of_non_overlapping_bucket_suffix > (
size_t)comm_size)
186 start_of_non_overlapping_bucket_suffix = (
size_t)comm_size;
187 size_t max_num_intersect
188 = start_of_non_overlapping_bucket_suffix - first_overlapping_bucket;
192 } *restrict sends_dst
193 =
xmalloc(2 * max_num_intersect *
sizeof(*sends_dst)),
194 *restrict sends_src = sends_dst + max_num_intersect;
195 size_t i = first_overlapping_bucket;
196 size_t num_src_msg = 0, num_dst_msg = 0;
197 for (; i < (size_t)start_of_non_overlapping_bucket_suffix; ++i) {
200 &stripes, &stripes_array_size, (
int)i);
204 sends_src[num_src_msg].list = send4src;
205 sends_src[num_src_msg].rank = (int)i;
218 sends_dst[num_dst_msg].list = send4dst;
219 sends_dst[num_dst_msg].rank = (int)i;
231 for (; i < (size_t)comm_size; ++i)
234 unsigned char *send_buffer
235 = *send_buffer_ =
xrealloc(stripes, send_buffer_size);
237 for (i = 0; i < num_src_msg; ++i) {
240 (
int)(send_buffer_size-ofs), &position, comm);
242 ofs += (size_t)position;
246 for (i = 0; i < num_dst_msg; ++i) {
249 (
int)(send_buffer_size-ofs), &position, comm);
251 ofs += (size_t)position;
255 num_msg = (int)(num_src_msg + num_dst_msg);
257 memset(send_size, 0, (
size_t)comm_size *
sizeof (*send_size));
263 int recv_count,
void * recv_buffer,
int tag,
267 int total_recv_size = 0;
269 for (
int i = 0; i < recv_count; ++i)
273 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED, MPI_ANY_SOURCE,
274 tag, comm, &status), comm);
277 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
285 total_recv_size += received_count;
288 if (total_recv_size != recv_size)
289 Xt_abort(comm,
"ERROR: recv_intersections received wrong number of bytes",
296 MPI_Request *dir_init_send_requests,
297 int tag_offset,
MPI_Comm comm,
int comm_size) {
302 src_tag, comm, comm_size,
303 dir_init_send_requests,
309 dst_tag, comm, comm_size,
310 dir_init_send_requests + txstat.
num_msg,
345 const struct isect *restrict src_dst_intersections,
349 size_t total_send_size = 0;
350 for (
int i = 0; i < comm_size; ++i)
357 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
359 for (
size_t i = 0; i < num_intersections; ++i)
361 int msg_size = rank_pack_size
372 total_send_size += 2*(size_t)msg_size;
374 assert(total_send_size <= INT_MAX);
375 return total_send_size;
381 struct isect *restrict src_dst_intersections,
386 size_t total_send_size
388 src_dst_intersections,
389 comm, comm_size, send_size);
391 unsigned char *send_buffer = (*send_buffer_)
392 =
xmalloc((
size_t)total_send_size);
394 if (num_intersections > 1)
395 qsort(src_dst_intersections, num_intersections,
397 size_t num_send_indices_requests
401 send_buffer, total_send_size, &ofs, comm);
403 if (num_intersections > 1)
404 qsort(src_dst_intersections, num_intersections,
406 num_send_indices_requests
410 send_buffer, total_send_size, &ofs, comm);
411 assert(num_send_indices_requests <= INT_MAX);
412 return (
int)num_send_indices_requests;
427 int recv_size[num_sizes],
428 int (*send_size)[num_sizes],
431 #if MPI_VERSION > 2 || ( MPI_VERSION == 2 && MPI_SUBVERSION >= 2)
432 xt_mpi_call(MPI_Reduce_scatter_block((
int *)send_size, (
int *)recv_size,
433 num_sizes, MPI_INT, MPI_SUM,
437 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
439 int *recv_count =
xmalloc((
size_t)comm_size *
sizeof(*recv_count));
440 for (
int i = 0; i < comm_size; ++i) recv_count[i] = num_sizes;
442 xt_mpi_call(MPI_Reduce_scatter(send_size, recv_size, recv_count, MPI_INT,
443 MPI_SUM, comm), comm);
460 void *send_buffer = NULL;
463 =
xmalloc((
size_t)comm_size *
sizeof(*send_size));
467 src_idxlist, dst_idxlist,
468 send_size, &send_buffer,
476 MPI_Request *dir_init_send_requests
477 =
xmalloc((
size_t)num_msg *
sizeof(*dir_init_send_requests));
479 dir_init_send_requests, tag_offset, comm, comm_size);
487 xt_mpi_call(MPI_Waitall(num_msg, dir_init_send_requests,
488 MPI_STATUSES_IGNORE), comm);
489 free(dir_init_send_requests);
495 void *restrict recv_buffer,
int tag,
501 while (recv_size > 0) {
505 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED,
506 MPI_ANY_SOURCE, tag, comm, &status), comm);
509 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
511 recv_size -= received_count;
515 while (received_count > position) {
517 xt_mpi_call(MPI_Unpack(recv_buffer, received_count, &position,
519 1, MPI_INT, comm), comm);
531 Xt_abort(comm,
"ERROR: recv_and_unpack_dist_dir_result"
532 " received wrong number of bytes", __FILE__, __LINE__);
541 struct dist_dir **src_intersections,
542 struct dist_dir **dst_intersections,
543 int *num_send_indices_requests,
544 MPI_Request *send_indices_requests,
548 struct dist_dir *src_dist_dir_results
552 *dst_dist_dir_results
562 recv_buffer, tag_offset
567 enum { ops_completed_auto_size = 16 };
568 int ops_completed_auto[ops_completed_auto_size];
570 = *num_send_indices_requests > ops_completed_auto_size
571 ?
xmalloc((
size_t)*num_send_indices_requests *
sizeof (*ops_completed))
572 : ops_completed_auto;
575 ops_completed, comm);
579 recv_buffer, tag_offset
581 assert(dst_dist_dir_results->num_entries
587 ops_completed, comm);
591 *src_intersections = src_dist_dir_results;
596 ops_completed, comm);
599 *dst_intersections = dst_dist_dir_results;
600 if (ops_completed != ops_completed_auto) free(ops_completed);
604 struct dist_dir **dst_intersections,
613 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
615 struct dist_dir *src_dist_dir, *dst_dist_dir;
618 src_idxlist, dst_idxlist,
619 tag_offset, comm, comm_size);
624 =
xmalloc((
size_t)comm_size *
sizeof(*send_size));
628 struct isect *src_dst_intersections;
629 size_t num_intersections
631 &src_dst_intersections);
635 int num_send_indices_requests
637 send_size, &send_buffer, comm, comm_size);
638 free(src_dst_intersections);
644 MPI_Request *send_indices_requests
645 =
xmalloc((
size_t)num_send_indices_requests
646 *
sizeof(*send_indices_requests));
649 send_indices_requests, tag_offset, comm, comm_size);
652 src_intersections, dst_intersections,
653 &num_send_indices_requests,
654 send_indices_requests,
657 xt_mpi_call(MPI_Waitall(num_send_indices_requests, send_indices_requests,
658 MPI_STATUSES_IGNORE), comm);
662 free(send_indices_requests);
669 INSTR_DEF(this_instr,
"xt_xmap_all2all_new")
678 struct dist_dir *src_intersections, *dst_intersections;
682 src_idxlist, dst_idxlist, tag_offset, newcomm);
684 Xt_xmap (*xmap_new)(
int num_src_intersections,
686 int num_dst_intersections,
695 src_idxlist, dst_idxlist, newcomm);
add versions of standard API functions not returning on error
#define xrealloc(ptr, size)
Xt_int local_index_range_lbound
Xt_int local_index_range_ubound
struct Xt_com_list entries[]
struct Xt_xmap_ * Xt_xmap
int xt_idxlist_get_num_indices(Xt_idxlist idxlist)
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_int xt_idxlist_get_min_index(Xt_idxlist idxlist)
Xt_int xt_idxlist_get_max_index(Xt_idxlist idxlist)
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Xt_idxlist xt_idxlist_get_intersection(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst)
void xt_idxlist_delete(Xt_idxlist idxlist)
Provide non-public declarations common to all index lists.
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
bool xt_mpi_test_some(int *restrict num_req, MPI_Request *restrict req, int *restrict ops_completed, MPI_Comm comm)
#define xt_mpi_call(call, comm)
@ xt_mpi_tag_xmap_dist_dir_dst_send
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
static void recv_and_unpack_intersections(int recv_size[SEND_SIZE_ASIZE], struct dist_dir **src_dist_dir, struct dist_dir **dst_dist_dir, int tag_offset, MPI_Comm comm)
static void generate_distributed_directories(struct dist_dir **src_dist_dir, struct dist_dir **dst_dist_dir, bool *stripify, Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, int comm_size)
static Xt_int get_dist_dir_global_interval_size(Xt_idxlist src, Xt_idxlist dst, bool *stripify, MPI_Comm comm, int comm_size)
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static void recv_and_unpack_intersection(struct dist_dir *dist_dir, int recv_size, int recv_count, void *recv_buffer, int tag, MPI_Comm comm)
static void exchange_idxlists(struct dist_dir **src_intersections, struct dist_dir **dst_intersections, bool *stripify, Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm)
static int compute_and_pack_bucket_intersections(struct bucket_params *bucket_params, Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int comm_size)
Xt_xmap xt_xmap_dist_dir_intracomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
static void recv_and_unpack_dist_dir_results(int recv_size[SEND_SIZE_ASIZE], struct dist_dir **src_intersections, struct dist_dir **dst_intersections, int *num_send_indices_requests, MPI_Request *send_indices_requests, int tag_offset, MPI_Comm comm)
static Xt_int get_max_idxlist_index(Xt_idxlist a, Xt_idxlist b)
static Xt_int get_min_idxlist_index(Xt_idxlist a, Xt_idxlist b)
static void xt_xmap_dist_dir_reduce_scatter_sizes(int num_sizes, int recv_size[num_sizes], int(*send_size)[num_sizes], MPI_Comm comm)
wrapper for MPI_Reduce_scatter_block if available or MPI_Reduce_scatter if not
static size_t buf_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, MPI_Comm comm, int comm_size, int(*restrict send_size)[SEND_SIZE_ASIZE])
static void send_intersections(void *send_buffer, const int(*send_size)[SEND_SIZE_ASIZE], MPI_Request *dir_init_send_requests, int tag_offset, MPI_Comm comm, int comm_size)
static struct bucket_params get_bucket_params(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, bool *stripify, MPI_Comm comm, int comm_size)
static void recv_and_unpack_dist_dir_result(struct dist_dir *dist_dir, int recv_size, void *restrict recv_buffer, int tag, MPI_Comm comm)
static int pack_src_dst_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int comm_size)
struct Xt_xmdd_txstat xt_xmap_dist_dir_send_intersections(void *restrict send_buffer, size_t send_size_asize, size_t send_size_entry, int tag, MPI_Comm comm, int rank_lim, MPI_Request *restrict requests, const int(*send_size)[send_size_asize])
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dir(struct dist_dir *dist_dir)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
Xt_idxlist xt_xmap_dist_dir_get_bucket(const struct bucket_params *bucket_params, struct Xt_stripe **stripes_, size_t *stripes_array_size, int dist_dir_rank)
generates the buckets of the distributed directory
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
Uitlity functions for creation of distributed directories.
Xt_xmap xt_xmap_intersection_ext_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
Xt_xmap xt_xmap_intersection_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)