71 for (
size_t i = 0; i < num_entries; ++i)
118 size_t *stripes_array_size,
129 Xt_int start = (
Xt_int)(0 + dist_dir_rank * local_interval);
131 long long start_correction
132 = (
long long)local_index_range_lbound - (
long long)start;
134 = (
Xt_int)((start_correction
136 & (
long long)(global_interval - 1)))
137 / (
long long)global_interval);
138 start = (
Xt_int)(start + corr_steps * global_interval);
144 + (((
long long)local_index_range_ubound - (
long long)start)
145 / global_interval) * global_interval);
147 = (
Xt_int)(start + local_interval > local_index_range_lbound);
148 num_stripes = (int)(((
long long)end - (
long long)start)/global_interval)
149 + (
int)use_start_stripe;
156 struct Xt_stripe *restrict stripes = *stripes_;
159 for (
int j = 0; j < num_stripes; ++j) {
161 stripes[j].start = (
Xt_int)(
start + j * global_interval);
162 stripes[j].stride = 1;
163 stripes[j].nstrides = (int)local_interval;
172 void *restrict send_buffer,
173 size_t send_size_asize,
size_t send_size_entry,
174 int tag,
MPI_Comm comm,
int rank_lim,
175 MPI_Request *restrict requests,
176 const int (*send_size)[send_size_asize])
182 for (
size_t rank = 0; rank < (size_t)rank_lim; ++rank)
183 if (send_size[rank][send_size_entry] > 0) {
184 xt_mpi_call(MPI_Isend((
char *)send_buffer + offset,
185 send_size[rank][send_size_entry],
186 MPI_PACKED, (
int)rank, tag,
187 comm, requests + reqOfs),
190 offset += (size_t)send_size[rank][send_size_entry];
192 return (
struct Xt_xmdd_txstat){ .bytes = offset, .num_msg = reqOfs };
197 const struct dist_dir *dst_dist_dir,
198 struct isect **src_dst_intersections)
200 struct isect *src_dst_intersections_ = *src_dst_intersections
203 *
sizeof(**src_dst_intersections));
204 size_t isect_fill = 0;
206 *restrict entries_dst = dst_dist_dir->
entries;
207 size_t num_entries_src = (size_t)src_dist_dir->
num_entries,
208 num_entries_dst = (
size_t)dst_dist_dir->
num_entries;
209 for (
size_t i = 0; i < num_entries_src; ++i)
210 for (
size_t j = 0; j < num_entries_dst; ++j)
215 src_dst_intersections_[isect_fill]
219 .idxlist = intersection };
224 *src_dst_intersections
226 isect_fill *
sizeof (*src_dst_intersections_));
234 size_t num_intersections,
235 const struct isect *restrict src_dst_intersections,
236 bool isect_idxlist_delete,
237 size_t send_size_asize,
size_t send_size_idx,
238 int (*send_size)[send_size_asize],
239 unsigned char *buffer,
size_t buf_size,
size_t *ofs,
MPI_Comm comm)
241 int prev_send_rank = -1;
242 size_t num_send_indices_requests = 0;
243 size_t origin = 1 ^ target, ofs_ = *ofs;
245 for (
size_t i = 0; i < num_intersections; ++i)
248 int send_rank = src_dst_intersections[i].rank[target];
249 num_send_indices_requests += send_rank != prev_send_rank;
252 XT_MPI_SEND_BUF_CONST
int *prank
253 = CAST_MPI_SEND_BUF(src_dst_intersections[i].rank + origin);
254 if (send_rank != prev_send_rank && prev_send_rank != -1) {
255 send_size[prev_send_rank][send_size_idx] = position;
256 ofs_ += (size_t)position;
259 prev_send_rank = send_rank;
260 xt_mpi_call(MPI_Pack(prank, 1, MPI_INT, buffer+ofs_, (
int)(buf_size-ofs_),
261 &position, comm), comm);
264 (
int)(buf_size-ofs_), &position, comm);
266 if (isect_idxlist_delete)
269 if (prev_send_rank != -1)
270 send_size[prev_send_rank][send_size_idx] = position;
272 *ofs = ofs_ + (size_t)position;
273 return num_send_indices_requests;
282 - (((csx)b)->start > ((csx)a)->start);
296 struct Xt_com_list *restrict entries = (*dist_dir_results)->entries;
297 size_t num_isect_agg = 0;
299 size_t i = 0, num_shards = (size_t)(*dist_dir_results)->num_entries;
300 while (i < num_shards) {
301 int rank = entries[i].rank;
306 while (j < num_shards && entries[j].
rank ==
rank);
308 struct Xt_stripe *restrict stripes = NULL;
309 size_t num_stripes = 0;
311 struct Xt_stripe *stripes_of_intersection;
312 int num_stripes_of_intersection;
314 &stripes_of_intersection,
315 &num_stripes_of_intersection);
319 (num_stripes + (
size_t)num_stripes_of_intersection)
320 *
sizeof (*stripes));
321 memcpy(stripes + num_stripes, stripes_of_intersection,
322 (
size_t)num_stripes_of_intersection *
sizeof (*stripes));
323 free(stripes_of_intersection);
325 stripes = stripes_of_intersection;
326 num_stripes += (size_t)num_stripes_of_intersection;
328 qsort(stripes, num_stripes,
sizeof (*stripes),
stripe_cmp);
331 entries[num_isect_agg].rank = rank;
334 (*dist_dir_results)->num_entries = (int)num_isect_agg;
336 + (
size_t)num_isect_agg
344 const struct isect *a = a_, *b = b_;
352 const struct isect *a = a_, *b = b_;
362 return a->
rank - b->rank;
373 xt_mpi_call(MPI_Comm_test_inter(comm, &is_inter), comm);
378 MPI_Comm merge_comm, local_intra_comm;
379 MPI_Group local_group;
380 xt_mpi_call(MPI_Comm_group(comm, &local_group), comm);
381 xt_mpi_call(MPI_Intercomm_merge(comm, 0, &merge_comm), comm);
382 xt_mpi_call(MPI_Comm_create(merge_comm, local_group, &local_intra_comm),
389 comm, local_intra_comm);
391 xt_mpi_call(MPI_Comm_free(&local_intra_comm), local_intra_comm);
#define ENSURE_ARRAY_SIZE(arrayp, curr_array_size, req_size)
add versions of standard API functions not returning on error
#define xrealloc(ptr, size)
Xt_int local_index_range_lbound
Xt_int local_index_range_ubound
struct Xt_com_list entries[]
static long long llsign_mask(long long x)
int xt_idxlist_get_num_indices(Xt_idxlist idxlist)
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
void xt_idxlist_get_index_stripes(Xt_idxlist idxlist, struct Xt_stripe **stripes, int *num_stripes)
Xt_idxlist xt_idxlist_get_intersection(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst)
void xt_idxlist_delete(Xt_idxlist idxlist)
Xt_idxlist xt_idxstripes_prealloc_new(const struct Xt_stripe *stripes, int num_stripes)
Xt_idxlist xt_idxstripes_new(struct Xt_stripe const *stripes, int num_stripes)
#define xt_mpi_call(call, comm)
void xt_mpi_comm_mark_exclusive(MPI_Comm comm)
Xt_xmap xt_xmap_dist_dir_intracomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
struct Xt_xmdd_txstat xt_xmap_dist_dir_send_intersections(void *restrict send_buffer, size_t send_size_asize, size_t send_size_entry, int tag, MPI_Comm comm, int rank_lim, MPI_Request *restrict requests, const int(*send_size)[send_size_asize])
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
Xt_xmap xt_xmap_dist_dir_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
void xt_xmdd_free_dist_dir(struct dist_dir *dist_dir)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
Xt_idxlist xt_xmap_dist_dir_get_bucket(const struct bucket_params *bucket_params, struct Xt_stripe **stripes_, size_t *stripes_array_size, int dist_dir_rank)
generates the buckets of the distributed directory
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
static int stripe_cmp(const void *a, const void *b)
Uitlity functions for creation of distributed directories.
Xt_xmap xt_xmap_dist_dir_intercomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm)