Yet Another eXchange Tool  0.9.0
xt_xmap_dist_dir_common.c
Go to the documentation of this file.
1 
15 /*
16  * Keywords:
17  * Maintainer: Jörg Behrens <behrens@dkrz.de>
18  * Moritz Hanke <hanke@dkrz.de>
19  * Thomas Jahns <jahns@dkrz.de>
20  * URL: https://doc.redmine.dkrz.de/yaxt/html/
21  *
22  * Redistribution and use in source and binary forms, with or without
23  * modification, are permitted provided that the following conditions are
24  * met:
25  *
26  * Redistributions of source code must retain the above copyright notice,
27  * this list of conditions and the following disclaimer.
28  *
29  * Redistributions in binary form must reproduce the above copyright
30  * notice, this list of conditions and the following disclaimer in the
31  * documentation and/or other materials provided with the distribution.
32  *
33  * Neither the name of the DKRZ GmbH nor the names of its contributors
34  * may be used to endorse or promote products derived from this software
35  * without specific prior written permission.
36  *
37  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
38  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
39  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
40  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
41  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
45  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
46  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
47  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48  */
49 #ifdef HAVE_CONFIG_H
50 #include <config.h>
51 #endif
52 
53 #include <string.h>
54 #include <assert.h>
55 
56 #include "core/ppm_xfuncs.h"
57 #include "xt/xt_idxstripes.h"
58 #include "xt/xt_mpi.h"
59 #include "xt/xt_xmap_dist_dir.h"
61 #include "xt_arithmetic_util.h"
63 #include "ensure_array_size.h"
64 #include "xt_idxstripes_internal.h"
65 
67  size_t num_entries
68  = dist_dir->num_entries > 0
69  ? (size_t)dist_dir->num_entries : (size_t)0;
70  struct Xt_com_list *entries = dist_dir->entries;
71  for (size_t i = 0; i < num_entries; ++i)
72  xt_idxlist_delete(entries[i].list);
73  free(dist_dir);
74 }
75 
117  struct Xt_stripe **stripes_,
118  size_t *stripes_array_size,
119  int dist_dir_rank)
120 {
121  Xt_int global_interval = bucket_params->global_interval;
122  Xt_int local_interval = bucket_params->local_interval;
123  Xt_int local_index_range_lbound = bucket_params->local_index_range_lbound;
124  Xt_int local_index_range_ubound = bucket_params->local_index_range_ubound;
125  int num_stripes = 0;
126 
127  /* find first index in bucket of dist_dir_rank
128  <= local_index_range_lbound */
129  Xt_int start = (Xt_int)(0 + dist_dir_rank * local_interval);
130  {
131  long long start_correction
132  = (long long)local_index_range_lbound - (long long)start;
133  Xt_int corr_steps
134  = (Xt_int)((start_correction
135  - (llsign_mask(start_correction)
136  & (long long)(global_interval - 1)))
137  / (long long)global_interval);
138  start = (Xt_int)(start + corr_steps * global_interval);
139  }
140  /* next find last stripe in bucket of dist_dir_rank
141  * <= local_index_range_ubound */
142  Xt_int end
143  = (Xt_int)(start
144  + (((long long)local_index_range_ubound - (long long)start)
145  / global_interval) * global_interval);
146  Xt_int use_start_stripe
147  = (Xt_int)(start + local_interval > local_index_range_lbound);
148  num_stripes = (int)(((long long)end - (long long)start)/global_interval)
149  + (int)use_start_stripe;
150  start = (Xt_int)(start
151  + ((Xt_int)((Xt_uint)use_start_stripe - (Xt_uint)1)
152  & global_interval));
153  if (!num_stripes)
154  return NULL;
155 
156  struct Xt_stripe *restrict stripes = *stripes_;
157  ENSURE_ARRAY_SIZE(stripes, *stripes_array_size, (size_t)num_stripes);
158 
159  for (int j = 0; j < num_stripes; ++j) {
160 
161  stripes[j].start = (Xt_int)(start + j * global_interval);
162  stripes[j].stride = 1;
163  stripes[j].nstrides = (int)local_interval;
164  }
165 
166  *stripes_ = stripes;
167  return xt_idxstripes_prealloc_new(stripes, num_stripes);
168 }
169 
170 struct Xt_xmdd_txstat
172  void *restrict send_buffer,
173  size_t send_size_asize, size_t send_size_entry,
174  int tag, MPI_Comm comm, int rank_lim,
175  MPI_Request *restrict requests,
176  const int (*send_size)[send_size_asize])
177 {
178  size_t offset = 0;
179  size_t reqOfs = 0;
180 
181  // pack the intersections into the send buffer
182  for (size_t rank = 0; rank < (size_t)rank_lim; ++rank)
183  if (send_size[rank][send_size_entry] > 0) {
184  xt_mpi_call(MPI_Isend((char *)send_buffer + offset,
185  send_size[rank][send_size_entry],
186  MPI_PACKED, (int)rank, tag,
187  comm, requests + reqOfs),
188  comm);
189  ++reqOfs;
190  offset += (size_t)send_size[rank][send_size_entry];
191  }
192  return (struct Xt_xmdd_txstat){ .bytes = offset, .num_msg = reqOfs };
193 }
194 
195 size_t
196 xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir,
197  const struct dist_dir *dst_dist_dir,
198  struct isect **src_dst_intersections)
199 {
200  struct isect *src_dst_intersections_ = *src_dst_intersections
201  = xmalloc((size_t)src_dist_dir->num_entries
202  * (size_t)dst_dist_dir->num_entries
203  * sizeof(**src_dst_intersections));
204  size_t isect_fill = 0;
205  const struct Xt_com_list *restrict entries_src = src_dist_dir->entries,
206  *restrict entries_dst = dst_dist_dir->entries;
207  size_t num_entries_src = (size_t)src_dist_dir->num_entries,
208  num_entries_dst = (size_t)dst_dist_dir->num_entries;
209  for (size_t i = 0; i < num_entries_src; ++i)
210  for (size_t j = 0; j < num_entries_dst; ++j)
211  {
212  Xt_idxlist intersection
213  = xt_idxlist_get_intersection(entries_src[i].list, entries_dst[j].list);
214  if (xt_idxlist_get_num_indices(intersection) > 0) {
215  src_dst_intersections_[isect_fill]
216  = (struct isect){
217  .rank = { [xt_xmdd_direction_src]=entries_src[i].rank,
218  [xt_xmdd_direction_dst]=entries_dst[j].rank},
219  .idxlist = intersection };
220  ++isect_fill;
221  } else
222  xt_idxlist_delete(intersection);
223  }
224  *src_dst_intersections
225  = xrealloc(src_dst_intersections_,
226  isect_fill * sizeof (*src_dst_intersections_));
227  return isect_fill;
228 }
229 
230 
231 size_t
233  enum xt_xmdd_direction target,
234  size_t num_intersections,
235  const struct isect *restrict src_dst_intersections,
236  bool isect_idxlist_delete,
237  size_t send_size_asize, size_t send_size_idx,
238  int (*send_size)[send_size_asize],
239  unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
240 {
241  int prev_send_rank = -1;
242  size_t num_send_indices_requests = 0;
243  size_t origin = 1 ^ target, ofs_ = *ofs;
244  int position = 0;
245  for (size_t i = 0; i < num_intersections; ++i)
246  {
247  /* see if this generates a new request? */
248  int send_rank = src_dst_intersections[i].rank[target];
249  num_send_indices_requests += send_rank != prev_send_rank;
250 
251  // pack rank
252  XT_MPI_SEND_BUF_CONST int *prank
253  = CAST_MPI_SEND_BUF(src_dst_intersections[i].rank + origin);
254  if (send_rank != prev_send_rank && prev_send_rank != -1) {
255  send_size[prev_send_rank][send_size_idx] = position;
256  ofs_ += (size_t)position;
257  position = 0;
258  }
259  prev_send_rank = send_rank;
260  xt_mpi_call(MPI_Pack(prank, 1, MPI_INT, buffer+ofs_, (int)(buf_size-ofs_),
261  &position, comm), comm);
262  // pack intersection
263  xt_idxlist_pack(src_dst_intersections[i].idxlist, buffer+ofs_,
264  (int)(buf_size-ofs_), &position, comm);
265 
266  if (isect_idxlist_delete)
267  xt_idxlist_delete(src_dst_intersections[i].idxlist);
268  }
269  if (prev_send_rank != -1)
270  send_size[prev_send_rank][send_size_idx] = position;
271 
272  *ofs = ofs_ + (size_t)position;
273  return num_send_indices_requests;
274 }
275 
276 
277 static int
278 stripe_cmp(const void *a, const void *b)
279 {
280  typedef const struct Xt_stripe *csx;
281  return (((csx)a)->start > ((csx)b)->start)
282  - (((csx)b)->start > ((csx)a)->start);
283 }
284 
285 /*
286  * @param dist_dir_results contains the intersections of this ranks
287  * dst or src idxlist with other ranks in bucket-sized chunks, the
288  * chunks belonging to the same communication partner are merged
289  * in-place here, i.e. on return (*dist_dir_results)->num_entries is
290  * less than or equal to the previous count and *dist_dir_results
291  * might point somewhere else
292  */
293 void
294 xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results) {
295 
296  struct Xt_com_list *restrict entries = (*dist_dir_results)->entries;
297  size_t num_isect_agg = 0;
298 
299  size_t i = 0, num_shards = (size_t)(*dist_dir_results)->num_entries;
300  while (i < num_shards) {
301  int rank = entries[i].rank;
302  size_t j = i;
303  /* find all entries matching the currently considered rank */
304  do
305  ++j;
306  while (j < num_shards && entries[j].rank == rank);
307 
308  struct Xt_stripe *restrict stripes = NULL;
309  size_t num_stripes = 0;
310  for (; i < j; ++i) {
311  struct Xt_stripe *stripes_of_intersection;
312  int num_stripes_of_intersection;
313  xt_idxlist_get_index_stripes(entries[i].list,
314  &stripes_of_intersection,
315  &num_stripes_of_intersection);
316  xt_idxlist_delete(entries[i].list);
317  if (stripes) {
318  stripes = xrealloc(stripes,
319  (num_stripes + (size_t)num_stripes_of_intersection)
320  * sizeof (*stripes));
321  memcpy(stripes + num_stripes, stripes_of_intersection,
322  (size_t)num_stripes_of_intersection * sizeof (*stripes));
323  free(stripes_of_intersection);
324  } else
325  stripes = stripes_of_intersection;
326  num_stripes += (size_t)num_stripes_of_intersection;
327  }
328  qsort(stripes, num_stripes, sizeof (*stripes), stripe_cmp);
329  entries[num_isect_agg].list = xt_idxstripes_new(stripes, (int)num_stripes);
330  free(stripes);
331  entries[num_isect_agg].rank = rank;
332  ++num_isect_agg;
333  }
334  (*dist_dir_results)->num_entries = (int)num_isect_agg;
335  *dist_dir_results = xrealloc(*dist_dir_results, sizeof (struct dist_dir)
336  + (size_t)num_isect_agg
337  * sizeof(struct Xt_com_list));
338 }
339 
340 
341 int
342 xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
343 {
344  const struct isect *a = a_, *b = b_;
345  /* this is safe vs. overflow because ranks are in [0..MAX_INT) */
346  return a->rank[xt_xmdd_direction_src] - b->rank[xt_xmdd_direction_src];
347 }
348 
349 int
350 xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
351 {
352  const struct isect *a = a_, *b = b_;
353  /* this is safe vs. overflow because ranks are in [0..MAX_INT) */
354  return a->rank[xt_xmdd_direction_dst] - b->rank[xt_xmdd_direction_dst];
355 }
356 
357 int
358 xt_com_list_rank_cmp(const void *a_, const void *b_)
359 {
360  const struct Xt_com_list *a = a_, *b = b_;
361  /* this is overflow-safe because rank's are non-negative ints */
362  return a->rank - b->rank;
363 }
364 
365 
367  MPI_Comm comm)
368 {
369  // ensure that yaxt is initialized
370  assert(xt_initialized());
371 
372  int is_inter;
373  xt_mpi_call(MPI_Comm_test_inter(comm, &is_inter), comm);
374  Xt_xmap xmap;
375  if (!is_inter)
376  xmap = xt_xmap_dist_dir_intracomm_new(src_idxlist, dst_idxlist, comm);
377  else {
378  MPI_Comm merge_comm, local_intra_comm;
379  MPI_Group local_group;
380  xt_mpi_call(MPI_Comm_group(comm, &local_group), comm);
381  xt_mpi_call(MPI_Intercomm_merge(comm, 0, &merge_comm), comm);
382  xt_mpi_call(MPI_Comm_create(merge_comm, local_group, &local_intra_comm),
383  comm);
384  xt_mpi_call(MPI_Group_free(&local_group), comm);
385  xt_mpi_call(MPI_Comm_free(&merge_comm), comm);
386  xt_mpi_comm_mark_exclusive(local_intra_comm);
387 
388  xmap = xt_xmap_dist_dir_intercomm_new(src_idxlist, dst_idxlist,
389  comm, local_intra_comm);
390 
391  xt_mpi_call(MPI_Comm_free(&local_intra_comm), local_intra_comm);
392  }
393  return xmap;
394 }
395 
396 
397 /*
398  * Local Variables:
399  * c-basic-offset: 2
400  * coding: utf-8
401  * indent-tabs-mode: nil
402  * show-trailing-whitespace: t
403  * require-trailing-newline: t
404  * End:
405  */
int MPI_Comm
Definition: core.h:64
#define ENSURE_ARRAY_SIZE(arrayp, curr_array_size, req_size)
add versions of standard API functions not returning on error
#define xrealloc(ptr, size)
Definition: ppm_xfuncs.h:71
#define xmalloc(size)
Definition: ppm_xfuncs.h:70
Xt_int start
Definition: xt_stripe.h:55
struct Xt_com_list entries[]
static long long llsign_mask(long long x)
int xt_initialized(void)
Definition: xt_init.c:107
XT_INT Xt_int
Definition: xt_core.h:68
unsigned XT_INT Xt_uint
Definition: xt_core.h:70
int xt_idxlist_get_num_indices(Xt_idxlist idxlist)
Definition: xt_idxlist.c:98
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
Definition: xt_idxlist.c:85
void xt_idxlist_get_index_stripes(Xt_idxlist idxlist, struct Xt_stripe **stripes, int *num_stripes)
Definition: xt_idxlist.c:118
Xt_idxlist xt_idxlist_get_intersection(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst)
void xt_idxlist_delete(Xt_idxlist idxlist)
Definition: xt_idxlist.c:74
Xt_idxlist xt_idxstripes_prealloc_new(const struct Xt_stripe *stripes, int num_stripes)
Xt_idxlist xt_idxstripes_new(struct Xt_stripe const *stripes, int num_stripes)
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition: xt_mpi.h:68
void xt_mpi_comm_mark_exclusive(MPI_Comm comm)
Definition: xt_mpi.c:881
Xt_xmap xt_xmap_dist_dir_intracomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
struct Xt_xmdd_txstat xt_xmap_dist_dir_send_intersections(void *restrict send_buffer, size_t send_size_asize, size_t send_size_entry, int tag, MPI_Comm comm, int rank_lim, MPI_Request *restrict requests, const int(*send_size)[send_size_asize])
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
Xt_xmap xt_xmap_dist_dir_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
void xt_xmdd_free_dist_dir(struct dist_dir *dist_dir)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
Xt_idxlist xt_xmap_dist_dir_get_bucket(const struct bucket_params *bucket_params, struct Xt_stripe **stripes_, size_t *stripes_array_size, int dist_dir_rank)
generates the buckets of the distributed directory
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
static int stripe_cmp(const void *a, const void *b)
Uitlity functions for creation of distributed directories.
@ xt_xmdd_direction_dst
@ xt_xmdd_direction_src
Xt_xmap xt_xmap_dist_dir_intercomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm)