81 bool *stripify,
Xt_int interval_size[2],
83 int comm_size,
int remote_size,
84 int comm_rank,
int tag_offset_inter)
88 unsigned long long local_vals[2], global_sums[2][2];
91 local_vals[0] = num_indices_src;
95 xt_mpi_call(MPI_Allreduce(local_vals, global_sums[0], 2,
96 MPI_UNSIGNED_LONG_LONG, MPI_SUM, intra_comm),
104 if (comm_rank == 0) {
106 xt_mpi_call(MPI_Sendrecv(global_sums[0], 2, MPI_UNSIGNED_LONG_LONG, 0, tag,
107 global_sums[1], 2, MPI_UNSIGNED_LONG_LONG, 0, tag,
108 inter_comm, MPI_STATUS_IGNORE), inter_comm);
110 xt_mpi_call(MPI_Bcast(global_sums[1], 2, MPI_UNSIGNED_LONG_LONG,
111 0, intra_comm), intra_comm);
112 *stripify = (global_sums[0][1] > 0 || global_sums[1][1] > 0);
114 = (
Xt_int)(((global_sums[0][0] + (
unsigned)comm_size - 1)
115 / (unsigned)comm_size) * (unsigned)comm_size);
117 = (
Xt_int)(((global_sums[1][0] + (
unsigned)remote_size - 1)
118 / (
unsigned)remote_size) * (
unsigned)remote_size);
144 if (local_index_range_lbound <= local_index_range_ubound) {
145 size_t send_buffer_size = 0;
147 size_t stripes_array_size = 0;
150 size_t first_overlapping_bucket = 0;
152 if (local_index_range_lbound >= 0
153 && (local_index_range_ubound < global_interval)) {
154 first_overlapping_bucket
155 = (size_t)(local_index_range_lbound / local_interval);
156 for (
size_t i = 0; i < first_overlapping_bucket; ++i)
160 size_t start_of_non_overlapping_bucket_suffix
161 = (size_t)(((
long long)local_index_range_ubound + local_interval - 1)
162 / local_interval) + 1;
163 if (local_index_range_lbound < 0
164 || start_of_non_overlapping_bucket_suffix > (
size_t)comm_size)
165 start_of_non_overlapping_bucket_suffix = (
size_t)comm_size;
166 size_t max_num_intersect
167 = start_of_non_overlapping_bucket_suffix - first_overlapping_bucket;
172 =
xmalloc(max_num_intersect *
sizeof(*sends));
173 size_t i = first_overlapping_bucket;
174 for (; i < (size_t)start_of_non_overlapping_bucket_suffix; ++i) {
177 &stripes, &stripes_array_size, (
int)i);
182 sends[num_msg].list = isect2send;
183 sends[num_msg].rank = (int)i;
199 for (; i < (size_t)comm_size; ++i)
202 unsigned char *send_buffer
203 = *send_buffer_ =
xrealloc(stripes, send_buffer_size);
205 for (i = 0; i < num_msg; ++i) {
208 (
int)(send_buffer_size-ofs), &position, comm);
209 send_size[sends[i].rank][
SEND_SIZE] = position;
210 ofs += (size_t)position;
216 memset(send_size, 0, (
size_t)comm_size *
sizeof (*send_size));
256 size_t tx_num = 0, size_sum = 0;
257 for (
size_t i = 0; i < (size_t)comm_size; ++i)
260 size_sum += (size_t)tx_size;
262 if (counts) counts[tx_num] = sizes[i][
SEND_NUM];
283 &
bucket_params, idxlist, send_size, send_buffer, comm, comm_size);
288 assert(num_msg == tx_stat[1].num_msg);
291 typedef int (*
tx_fp)(
void *, int, MPI_Datatype, int, int,
296 unsigned char *buffer, MPI_Request *requests,
300 for (
size_t i = 0; i < num_msg; ++i)
304 count, MPI_PACKED, rank, tag, comm, requests + i), comm);
305 ofs += (size_t)count;
312 void *recv_buffer, MPI_Request *requests,
322 void *send_buffer, MPI_Request *requests,
337 size_t num_msg = tx_stat.
num_msg, buf_size = tx_stat.
bytes;
340 +
sizeof (*dist_dir_->entries) * num_msg);
341 dist_dir_->num_entries = (int)num_msg;
343 for (
size_t i = 0; i < num_msg; ++i)
346 dist_dir_->entries[i].rank = rank;
347 dist_dir_->entries[i].list
358 int tag_offset_inter,
int tag_offset_intra,
360 int remote_size,
int comm_size,
367 stripify, interval_size,
368 intra_comm, inter_comm,
369 comm_size, remote_size,
370 comm_rank, tag_offset_inter);
371 void *send_buffer_local, *send_buffer_remote;
373 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size)
374 * 2 *
sizeof(*send_size_local)),
380 send_size_local, src_idxlist, interval_size[0],
381 intra_comm, comm_size);
383 send_size_remote, dst_idxlist, interval_size[1],
384 inter_comm, remote_size);
386 size_t num_req = tx_stat_local[0].
num_msg + tx_stat_remote[0].
num_msg
388 MPI_Request *dir_init_requests
389 =
xmalloc(num_req *
sizeof(*dir_init_requests)
390 + tx_stat_local[0].
bytes + tx_stat_remote[0].
bytes);
391 void *recv_buffer_local = dir_init_requests + num_req,
392 *recv_buffer_remote = ((
unsigned char *)recv_buffer_local
393 + tx_stat_local[0].
bytes);
395 size_t req_ofs = tx_stat_local[0].
num_msg;
398 recv_buffer_local, dir_init_requests,
399 tag_intra, intra_comm);
403 recv_buffer_remote, dir_init_requests + req_ofs,
404 tag_inter, inter_comm);
405 req_ofs += tx_stat_remote[0].
num_msg;
408 send_buffer_local, dir_init_requests + req_ofs,
409 tag_intra, intra_comm);
410 req_ofs += tx_stat_local[1].
num_msg;
413 send_buffer_remote, dir_init_requests + req_ofs,
414 tag_inter, inter_comm);
416 xt_mpi_call(MPI_Waitall((
int)num_req, dir_init_requests,
417 MPI_STATUSES_IGNORE), inter_comm);
418 free(send_buffer_local);
419 free(send_buffer_remote);
422 recv_buffer_local, src_dist_dir, intra_comm);
425 recv_buffer_remote, dst_dist_dir, inter_comm);
426 free(send_size_local);
427 free(dir_init_requests);
433 const struct isect *restrict src_dst_intersections,
438 size_t total_send_size = 0;
439 for (
int i = 0; i < comm_size; ++i)
440 (
void)(send_size_target[i][
SEND_SIZE] = 0),
441 (
void)(send_size_target[i][
SEND_NUM] = 0);
444 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
446 for (
size_t i = 0; i < num_intersections; ++i)
448 int msg_size = rank_pack_size
450 size_t target_rank = (size_t)src_dst_intersections[i].rank[target];
452 ++(send_size_target[target_rank][
SEND_NUM]);
453 total_send_size += (size_t)msg_size;
455 assert(total_send_size <= INT_MAX);
456 return total_send_size;
462 struct isect *restrict src_dst_intersections,
465 bool isect_idxlist_delete,
MPI_Comm comm,
int comm_size) {
467 size_t total_send_size
469 src_dst_intersections,
471 comm, comm_size, send_size);
473 unsigned char *send_buffer = (*send_buffer_)
474 =
xmalloc((
size_t)total_send_size);
475 qsort(src_dst_intersections, num_intersections,
476 sizeof (src_dst_intersections[0]),
482 target, num_intersections, src_dst_intersections,
483 isect_idxlist_delete,
485 send_buffer, total_send_size, &ofs, comm);
492 void *restrict recv_buffer,
493 int *restrict entry_counts,
497 int buf_size = (int)tx_stat.
bytes;
499 size_t num_entries_sent = 0;
500 for (
size_t i = 0; i <
num_msg; ++i)
501 num_entries_sent += (
size_t)entry_counts[i];
503 + (
sizeof (
struct Xt_com_list) * num_entries_sent));
504 (*dist_dir)->num_entries = (int)num_entries_sent;
505 struct Xt_com_list *restrict entries = (*dist_dir)->entries;
506 size_t num_entries = 0;
507 for (
size_t i = 0; i < num_msg; ++i) {
508 size_t num_entries_from_rank = (size_t)entry_counts[i];
509 for (
size_t j = 0; j < num_entries_from_rank; ++j) {
510 xt_mpi_call(MPI_Unpack(recv_buffer, buf_size, &position,
511 &entries[num_entries].
rank,
512 1, MPI_INT, comm), comm);
513 entries[num_entries].list =
518 assert(num_entries == num_entries_sent);
526 struct dist_dir **dst_intersections,
530 int tag_offset_inter,
int tag_offset_intra,
533 int comm_size, remote_size, comm_rank;
534 xt_mpi_call(MPI_Comm_size(inter_comm, &comm_size), inter_comm);
535 xt_mpi_call(MPI_Comm_rank(inter_comm, &comm_rank), inter_comm);
536 xt_mpi_call(MPI_Comm_remote_size(inter_comm, &remote_size), inter_comm);
538 struct dist_dir *src_dist_dir, *dst_dist_dir;
541 src_idxlist, dst_idxlist,
542 tag_offset_inter, tag_offset_intra,
543 inter_comm, intra_comm,
544 remote_size, comm_size,
549 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size)
550 * 2U *
sizeof(*send_size_local)),
557 struct isect *src_dst_intersections;
558 size_t num_intersections
560 &src_dst_intersections);
564 void *send_buffer_local, *send_buffer_remote;
565 size_t num_send_requests_local
567 send_size_local, &send_buffer_local,
569 num_send_requests_remote
571 send_size_remote, &send_buffer_remote,
573 free(src_dst_intersections);
578 intra_comm), intra_comm);
581 inter_comm), inter_comm);
584 int *isect_counts_recv_local
585 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size) *
sizeof (
int)),
586 *isect_counts_recv_remote = isect_counts_recv_local + comm_size;
587 compress_sizes(send_size_local, comm_size, tx_stat_local+1, NULL);
589 isect_counts_recv_local);
590 compress_sizes(send_size_remote, remote_size, tx_stat_remote+1, NULL);
592 isect_counts_recv_remote);
593 assert(tx_stat_local[1].
num_msg == num_send_requests_local
594 && tx_stat_remote[1].
num_msg == num_send_requests_remote);
596 = num_send_requests_local + num_send_requests_remote
598 assert(num_requests <= INT_MAX);
599 MPI_Request *requests
600 =
xmalloc(num_requests *
sizeof(*requests)
601 + tx_stat_local[0].
bytes + tx_stat_remote[0].
bytes);
602 void *recv_buf_local = requests + num_requests,
603 *recv_buf_remote = (
unsigned char *)recv_buf_local + tx_stat_local[0].
bytes;
604 size_t req_ofs = tx_stat_local[0].
num_msg;
608 recv_buf_local, requests, tag_intra, intra_comm);
612 recv_buf_remote, requests+req_ofs, tag_inter, inter_comm);
613 req_ofs += tx_stat_remote[0].
num_msg;
616 send_buffer_local, requests+req_ofs, tag_intra,
618 req_ofs += tx_stat_local[1].
num_msg;
621 send_buffer_remote, requests+req_ofs, tag_inter,
623 xt_mpi_call(MPI_Waitall((
int)num_requests, requests, MPI_STATUSES_IGNORE),
625 free(send_buffer_local);
626 free(send_buffer_remote);
627 free(send_size_local);
630 isect_counts_recv_local, intra_comm);
632 isect_counts_recv_remote, inter_comm);
634 free(isect_counts_recv_local);
643 INSTR_DEF(this_instr,
"xt_xmap_dist_dir_intercomm_new")
649 int tag_offset_inter, tag_offset_intra;
653 struct dist_dir *src_intersections, *dst_intersections;
657 src_idxlist, dst_idxlist,
658 tag_offset_inter, tag_offset_intra,
659 inter_comm, intra_comm);
661 Xt_xmap (*xmap_new)(
int num_src_intersections,
663 int num_dst_intersections,
672 src_idxlist, dst_idxlist, inter_comm);
add versions of standard API functions not returning on error
#define xrealloc(ptr, size)
Xt_int local_index_range_lbound
Xt_int local_index_range_ubound
struct Xt_com_list entries[]
struct Xt_xmap_ * Xt_xmap
int xt_idxlist_get_num_indices(Xt_idxlist idxlist)
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_int xt_idxlist_get_min_index(Xt_idxlist idxlist)
Xt_int xt_idxlist_get_max_index(Xt_idxlist idxlist)
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Xt_idxlist xt_idxlist_get_intersection(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst)
void xt_idxlist_delete(Xt_idxlist idxlist)
Provide non-public declarations common to all index lists.
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
#define xt_mpi_call(call, comm)
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dir(struct dist_dir *dist_dir)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
Xt_idxlist xt_xmap_dist_dir_get_bucket(const struct bucket_params *bucket_params, struct Xt_stripe **stripes_, size_t *stripes_array_size, int dist_dir_rank)
generates the buckets of the distributed directory
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
Utility functions for the creation of distributed directories.
static void exchange_idxlists(struct dist_dir **src_intersections, struct dist_dir **dst_intersections, bool *stripify, Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset_inter, int tag_offset_intra, MPI_Comm inter_comm, MPI_Comm intra_comm)
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static void irecv_intersections(size_t num_msg, const int(*recv_size)[SEND_SIZE_ASIZE], void *recv_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
static void unpack_dist_dir(struct Xt_xmdd_txstat tx_stat, const int(*sizes)[SEND_SIZE_ASIZE], void *buffer, struct dist_dir **dist_dir, MPI_Comm comm)
static void isend_intersections(size_t num_msg, const int(*send_size)[SEND_SIZE_ASIZE], void *send_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
int(* tx_fp)(void *, int, MPI_Datatype, int, int, MPI_Comm, MPI_Request *)
static void get_dist_dir_global_interval_size(Xt_idxlist src, Xt_idxlist dst, bool *stripify, Xt_int interval_size[2], MPI_Comm intra_comm, MPI_Comm inter_comm, int comm_size, int remote_size, int comm_rank, int tag_offset_inter)
static size_t send_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, enum xt_xmdd_direction target, MPI_Comm comm, int comm_size, int(*restrict send_size_target)[SEND_SIZE_ASIZE])
static size_t pack_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, enum xt_xmdd_direction target, bool isect_idxlist_delete, MPI_Comm comm, int comm_size)
static void tx_intersections(size_t num_msg, const int(*sizes)[SEND_SIZE_ASIZE], unsigned char *buffer, MPI_Request *requests, int tag, MPI_Comm comm, tx_fp tx_op)
static void create_intersections(struct Xt_xmdd_txstat tx_stat[2], int recv_size[][SEND_SIZE_ASIZE], void **send_buffer, int send_size[][SEND_SIZE_ASIZE], Xt_idxlist idxlist, Xt_int interval_size, MPI_Comm comm, int comm_size)
Xt_xmap xt_xmap_dist_dir_intercomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm_, MPI_Comm intra_comm_)
static struct bucket_params get_bucket_params(Xt_idxlist idxlist, Xt_int global_interval, int comm_size)
static size_t compute_and_pack_bucket_intersections(struct bucket_params *bucket_params, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int comm_size)
static void unpack_dist_dir_results(struct Xt_xmdd_txstat tx_stat, struct dist_dir **dist_dir, void *restrict recv_buffer, int *restrict entry_counts, MPI_Comm comm)
static void generate_distributed_directories(struct dist_dir **src_dist_dir, struct dist_dir **dst_dist_dir, bool *stripify, Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset_inter, int tag_offset_intra, MPI_Comm inter_comm, MPI_Comm intra_comm, int remote_size, int comm_size, int comm_rank)
static Xt_int get_min_idxlist_index(Xt_idxlist l)
static void compress_sizes(int(*restrict sizes)[SEND_SIZE_ASIZE], int comm_size, struct Xt_xmdd_txstat *tx_stat, int *counts)
static Xt_int get_max_idxlist_index(Xt_idxlist l)
Xt_xmap xt_xmap_intersection_ext_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
Xt_xmap xt_xmap_intersection_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)