Yet Another eXchange Tool  0.9.0
xt_xmap_dist_dir.c
/*
 * Keywords:
 * Maintainer: Jörg Behrens <behrens@dkrz.de>
 *             Moritz Hanke <hanke@dkrz.de>
 *             Thomas Jahns <jahns@dkrz.de>
 * URL: https://doc.redmine.dkrz.de/yaxt/html/
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution.
 *
 * Neither the name of the DKRZ GmbH nor the names of its contributors
 * may be used to endorse or promote products derived from this software
 * without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <limits.h>

#include <mpi.h>

#include "xt/xt_idxlist.h"
#include "xt/xt_idxvec.h"
#include "xt/xt_idxstripes.h"
#include "xt/xt_idxempty.h"
#include "xt/xt_xmap.h"
#include "xt/xt_xmap_dist_dir.h"
#include "xt/xt_mpi.h"
#include "xt_mpi_internal.h"
#include "core/core.h"
#include "core/ppm_xfuncs.h"
#include "xt_xmap_intersection.h"
#include "xt_idxlist_internal.h"
#include "instr.h"
#include "xt_xmap_dist_dir_common.h"

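/* Determine the size of the global index interval covered by the
 * distributed directory: the sum of all local source list sizes over the
 * communicator, rounded up to a multiple of comm_size (at least comm_size).
 * Additionally decides, via *stripify, whether any rank's index lists are
 * large enough to warrant a stripe-based representation. */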
static inline Xt_int
get_dist_dir_global_interval_size(Xt_idxlist src, Xt_idxlist dst,
                                  bool *stripify, MPI_Comm comm, int comm_size)
{
  unsigned long long local_vals[2], global_sums[2];

  unsigned num_indices_src = (unsigned)xt_idxlist_get_num_indices(src);
  local_vals[0] = num_indices_src;
  local_vals[1] = (num_indices_src >= CHEAP_VECTOR_SIZE)
    || ((unsigned)xt_idxlist_get_num_indices(dst) >= CHEAP_VECTOR_SIZE);

  xt_mpi_call(MPI_Allreduce(local_vals, global_sums, 2,
                            MPI_UNSIGNED_LONG_LONG, MPI_SUM, comm), comm);

  *stripify = global_sums[1] > 0;
  return (Xt_int)(MAX(((global_sums[0] + (unsigned)comm_size - 1)
                       / (unsigned)comm_size), 1) * (unsigned)comm_size);
}

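/* smallest index occurring in either list; XT_INT_MAX if both are empty */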
static Xt_int get_min_idxlist_index(Xt_idxlist a, Xt_idxlist b) {

  int num_a = xt_idxlist_get_num_indices(a),
    num_b = xt_idxlist_get_num_indices(b);
  Xt_int min_index_a = num_a ? xt_idxlist_get_min_index(a) : XT_INT_MAX,
    min_index_b = num_b ? xt_idxlist_get_min_index(b) : XT_INT_MAX,
    min_index = (Xt_int)MIN(min_index_a, min_index_b);
  return min_index;
}

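/* largest index occurring in either list; XT_INT_MIN if both are empty */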
static Xt_int get_max_idxlist_index(Xt_idxlist a, Xt_idxlist b) {

  int num_a = xt_idxlist_get_num_indices(a),
    num_b = xt_idxlist_get_num_indices(b);
  Xt_int max_index_a = num_a ? xt_idxlist_get_max_index(a) : XT_INT_MIN,
    max_index_b = num_b ? xt_idxlist_get_max_index(b) : XT_INT_MIN,
    max_index = MAX(max_index_a, max_index_b);
  return max_index;
}

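/* Derive the bucket layout of the distributed directory: the global index
 * interval, the per-rank (local) interval, and the bounds of the indices
 * actually occurring in the local source and destination lists. */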
static struct bucket_params
get_bucket_params(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
                  bool *stripify, MPI_Comm comm, int comm_size)
{
  /* global_interval is a multiple of comm_size and has a size of at least
     comm_size */
  Xt_int global_interval
    = get_dist_dir_global_interval_size(src_idxlist, dst_idxlist, stripify,
                                        comm, comm_size);
  Xt_int local_interval = (Xt_int)(global_interval / comm_size);
  Xt_int local_index_range_lbound
    = get_min_idxlist_index(src_idxlist, dst_idxlist);
  Xt_int local_index_range_ubound
    = get_max_idxlist_index(src_idxlist, dst_idxlist);
  return (struct bucket_params){
    .global_interval = global_interval,
    .local_interval = local_interval,
    .local_index_range_lbound = local_index_range_lbound,
    .local_index_range_ubound = local_index_range_ubound,
  };
}

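/* indices into the per-rank send accounting arrays: packed byte counts and
 * message counts for the source and destination directions;
 * SEND_SIZE_ASIZE is the number of entries per rank */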
enum {
  SEND_SIZE_SRC,
  SEND_NUM_SRC,
  SEND_SIZE_DST,
  SEND_NUM_DST,
  SEND_SIZE_ASIZE,
};

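/* mark a rank as receiving neither source nor destination data */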
static inline void
rank_no_send(size_t rank, int (*restrict send_size)[SEND_SIZE_ASIZE])
{
  send_size[rank][SEND_SIZE_SRC] = 0;
  send_size[rank][SEND_NUM_SRC] = 0;
  send_size[rank][SEND_SIZE_DST] = 0;
  send_size[rank][SEND_NUM_DST] = 0;
}

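/* Intersect the local source and destination index lists with every rank's
 * bucket and pack all non-empty intersections into a single send buffer.
 * Ranks whose buckets cannot overlap the local index range are skipped.
 * send_size[rank][...] receives the packed byte counts and message counts
 * per rank; the return value is the total number of messages to send. */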
static int
compute_and_pack_bucket_intersections(struct bucket_params *bucket_params,
                                      Xt_idxlist src_idxlist,
                                      Xt_idxlist dst_idxlist,
                                      int (*send_size)[SEND_SIZE_ASIZE],
                                      void **send_buffer_,
                                      MPI_Comm comm, int comm_size) {

  int num_msg = 0;
  Xt_int local_index_range_lbound = bucket_params->local_index_range_lbound,
    local_index_range_ubound = bucket_params->local_index_range_ubound;
  if (local_index_range_lbound <= local_index_range_ubound) {
    size_t send_buffer_size = 0;
    struct Xt_stripe *stripes = NULL;
    size_t stripes_array_size = 0;
    Xt_int global_interval = bucket_params->global_interval;
    Xt_int local_interval = bucket_params->local_interval;
    size_t first_overlapping_bucket = 0;
    /* is it impossible for early buckets to overlap our lists? */
    if (local_index_range_lbound >= 0
        && (local_index_range_ubound < global_interval)) {
      first_overlapping_bucket
        = (size_t)(local_index_range_lbound / local_interval);
      for (size_t i = 0; i < first_overlapping_bucket; ++i)
        rank_no_send(i, send_size);
    }
    /* is it impossible for later ranks to overlap our lists? */
    size_t start_of_non_overlapping_bucket_suffix
      = (size_t)(((long long)local_index_range_ubound + local_interval - 1)
                 / local_interval) + 1;
    if (local_index_range_lbound < 0
        || start_of_non_overlapping_bucket_suffix > (size_t)comm_size)
      start_of_non_overlapping_bucket_suffix = (size_t)comm_size;
    size_t max_num_intersect
      = start_of_non_overlapping_bucket_suffix - first_overlapping_bucket;
    struct {
      Xt_idxlist list;
      int rank;
    } *restrict sends_dst
        = xmalloc(2 * max_num_intersect * sizeof(*sends_dst)),
      *restrict sends_src = sends_dst + max_num_intersect;
    size_t i = first_overlapping_bucket;
    size_t num_src_msg = 0, num_dst_msg = 0;
    for (; i < (size_t)start_of_non_overlapping_bucket_suffix; ++i) {
      Xt_idxlist bucket
        = xt_xmap_dist_dir_get_bucket(bucket_params,
                                      &stripes, &stripes_array_size, (int)i);
      if (bucket) {
        Xt_idxlist send4src = xt_idxlist_get_intersection(src_idxlist, bucket);
        if (xt_idxlist_get_num_indices(send4src) > 0) {
          sends_src[num_src_msg].list = send4src;
          sends_src[num_src_msg].rank = (int)i;
          send_buffer_size += xt_idxlist_get_pack_size(send4src, comm);
          send_size[i][SEND_NUM_SRC] = 1;
          ++num_src_msg;
        } else {
          send_size[i][SEND_SIZE_SRC] = 0;
          send_size[i][SEND_NUM_SRC] = 0;
          xt_idxlist_delete(send4src);
        }

        Xt_idxlist send4dst = xt_idxlist_get_intersection(bucket, dst_idxlist);
        xt_idxlist_delete(bucket);
        if (xt_idxlist_get_num_indices(send4dst) > 0) {
          sends_dst[num_dst_msg].list = send4dst;
          sends_dst[num_dst_msg].rank = (int)i;
          send_buffer_size += xt_idxlist_get_pack_size(send4dst, comm);
          send_size[i][SEND_NUM_DST] = 1;
          ++num_dst_msg;
        } else {
          send_size[i][SEND_SIZE_DST] = 0;
          send_size[i][SEND_NUM_DST] = 0;
          xt_idxlist_delete(send4dst);
        }
      } else
        rank_no_send(i, send_size);
    }
    for (; i < (size_t)comm_size; ++i)
      rank_no_send(i, send_size);

    unsigned char *send_buffer
      = *send_buffer_ = xrealloc(stripes, send_buffer_size);
    size_t ofs = 0;
    for (i = 0; i < num_src_msg; ++i) {
      int position = 0;
      xt_idxlist_pack(sends_src[i].list, send_buffer+ofs,
                      (int)(send_buffer_size-ofs), &position, comm);
      send_size[sends_src[i].rank][SEND_SIZE_SRC] = position;
      ofs += (size_t)position;
      xt_idxlist_delete(sends_src[i].list);
    }

    for (i = 0; i < num_dst_msg; ++i) {
      int position = 0;
      xt_idxlist_pack(sends_dst[i].list, send_buffer+ofs,
                      (int)(send_buffer_size-ofs), &position, comm);
      send_size[sends_dst[i].rank][SEND_SIZE_DST] = position;
      ofs += (size_t)position;
      xt_idxlist_delete(sends_dst[i].list);
    }
    free(sends_dst);
    num_msg = (int)(num_src_msg + num_dst_msg);
  } else
    memset(send_size, 0, (size_t)comm_size * sizeof (*send_size));
  return num_msg;
}

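/* Receive recv_count packed intersection messages totalling recv_size bytes
 * and unpack them into the entries of the given directory. */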
static void
recv_and_unpack_intersection(struct dist_dir *dist_dir, int recv_size,
                             int recv_count, void * recv_buffer, int tag,
                             MPI_Comm comm) {

  // initialize distributed directories
  int total_recv_size = 0;

  for (int i = 0; i < recv_count; ++i)
  {
    MPI_Status status;

    xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED, MPI_ANY_SOURCE,
                         tag, comm, &status), comm);

    int received_count;
    xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);

    int position = 0;

    dist_dir->entries[i].rank = status.MPI_SOURCE;
    dist_dir->entries[i].list =
      xt_idxlist_unpack(recv_buffer, received_count, &position, comm);

    total_recv_size += received_count;
  }

  if (total_recv_size != recv_size)
    Xt_abort(comm, "ERROR: recv_intersections received wrong number of bytes",
             __FILE__, __LINE__);
  dist_dir->num_entries = recv_count;
}

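/* Start the non-blocking sends of the packed source and destination
 * intersections to the owning directory ranks. */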
static void send_intersections(void *send_buffer,
                               const int (*send_size)[SEND_SIZE_ASIZE],
                               MPI_Request *dir_init_send_requests,
                               int tag_offset, MPI_Comm comm, int comm_size) {
  int src_tag = tag_offset + xt_mpi_tag_xmap_dist_dir_src_send;
  struct Xt_xmdd_txstat txstat
    = xt_xmap_dist_dir_send_intersections(send_buffer, SEND_SIZE_ASIZE,
                                          SEND_SIZE_SRC,
                                          src_tag, comm, comm_size,
                                          dir_init_send_requests,
                                          send_size);
  int dst_tag = tag_offset + xt_mpi_tag_xmap_dist_dir_dst_send;
  xt_xmap_dist_dir_send_intersections((unsigned char *)send_buffer
                                      + txstat.bytes,
                                      SEND_SIZE_ASIZE, SEND_SIZE_DST,
                                      dst_tag, comm, comm_size,
                                      dir_init_send_requests + txstat.num_msg,
                                      send_size);
}

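/* Receive the packed intersections addressed to this rank and build the
 * local source and destination parts of the distributed directory. */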
static void
recv_and_unpack_intersections(int recv_size[SEND_SIZE_ASIZE],
                              struct dist_dir **src_dist_dir,
                              struct dist_dir **dst_dist_dir,
                              int tag_offset, MPI_Comm comm) {

  *src_dist_dir = xmalloc(sizeof (struct dist_dir)
                          + (sizeof (struct Xt_com_list)
                             * (size_t)recv_size[SEND_NUM_SRC]));
  *dst_dist_dir = xmalloc(sizeof (struct dist_dir)
                          + (sizeof (struct Xt_com_list)
                             * (size_t)recv_size[SEND_NUM_DST]));

  void * recv_buffer = xmalloc((size_t)MAX(recv_size[SEND_SIZE_SRC],
                                           recv_size[SEND_SIZE_DST]));

  recv_and_unpack_intersection(*src_dist_dir, recv_size[SEND_SIZE_SRC],
                               recv_size[SEND_NUM_SRC], recv_buffer,
                               tag_offset + xt_mpi_tag_xmap_dist_dir_src_send,
                               comm);
  recv_and_unpack_intersection(*dst_dist_dir, recv_size[SEND_SIZE_DST],
                               recv_size[SEND_NUM_DST], recv_buffer,
                               tag_offset + xt_mpi_tag_xmap_dist_dir_dst_send,
                               comm);

  free(recv_buffer);
}

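/* Compute an upper bound for the pack buffer needed for all matched
 * intersections and initialize the per-rank size and message counters. */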
static size_t
buf_size_from_intersections(size_t num_intersections,
                            const struct isect *restrict src_dst_intersections,
                            MPI_Comm comm, int comm_size,
                            int (*restrict send_size)[SEND_SIZE_ASIZE])
{
  size_t total_send_size = 0;
  for (int i = 0; i < comm_size; ++i)
    (void)(send_size[i][SEND_SIZE_SRC] = 0),
      (void)(send_size[i][SEND_SIZE_DST] = 0),
      (void)(send_size[i][SEND_NUM_SRC] = 0),
      (void)(send_size[i][SEND_NUM_DST] = 0);

  int rank_pack_size;
  xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);

  for (size_t i = 0; i < num_intersections; ++i)
  {
    int msg_size = rank_pack_size
      + (int)xt_idxlist_get_pack_size(src_dst_intersections[i].idxlist,
                                      comm);
    size_t src_rank
      = (size_t)src_dst_intersections[i].rank[xt_xmdd_direction_src],
      dst_rank = (size_t)src_dst_intersections[i].rank[xt_xmdd_direction_dst];
    /* send_size[i][SEND_SIZE_(SRC|DST)] are set when actually
     * packing, because that provides a potentially tighter bound,
     * see xt_xmap_dist_dir_pack_intersections */
    ++(send_size[src_rank][SEND_NUM_SRC]);
    ++(send_size[dst_rank][SEND_NUM_DST]);
    total_send_size += 2*(size_t)msg_size;
  }
  assert(total_send_size <= INT_MAX);
  return total_send_size;
}

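/* Sort the matched intersections by source rank and by destination rank and
 * pack them into a single send buffer; returns the number of send requests
 * that will be needed to deliver them. */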
static int
pack_src_dst_dist_dirs(size_t num_intersections,
                       struct isect *restrict src_dst_intersections,
                       int (*send_size)[SEND_SIZE_ASIZE],
                       void **send_buffer_,
                       MPI_Comm comm, int comm_size) {

  size_t total_send_size
    = buf_size_from_intersections(num_intersections,
                                  src_dst_intersections,
                                  comm, comm_size, send_size);

  unsigned char *send_buffer = (*send_buffer_)
    = xmalloc((size_t)total_send_size);
  size_t ofs = 0;
  if (num_intersections > 1)
    qsort(src_dst_intersections, num_intersections,
          sizeof (src_dst_intersections[0]), xt_xmdd_cmp_isect_src_rank);
  size_t num_send_indices_requests
    = xt_xmap_dist_dir_pack_intersections(
      xt_xmdd_direction_src, num_intersections, src_dst_intersections, false,
      SEND_SIZE_ASIZE, SEND_SIZE_SRC, send_size,
      send_buffer, total_send_size, &ofs, comm);

  if (num_intersections > 1)
    qsort(src_dst_intersections, num_intersections,
          sizeof (src_dst_intersections[0]), xt_xmdd_cmp_isect_dst_rank);
  num_send_indices_requests
    += xt_xmap_dist_dir_pack_intersections(
      xt_xmdd_direction_dst, num_intersections, src_dst_intersections, true,
      SEND_SIZE_ASIZE, SEND_SIZE_DST, send_size,
      send_buffer, total_send_size, &ofs, comm);
  assert(num_send_indices_requests <= INT_MAX);
  return (int)num_send_indices_requests;
}

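/**
 * wrapper for MPI_Reduce_scatter_block if available or
 * MPI_Reduce_scatter if not
 */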
static void
xt_xmap_dist_dir_reduce_scatter_sizes(int num_sizes,
                                      int recv_size[num_sizes],
                                      int (*send_size)[num_sizes],
                                      MPI_Comm comm) {

#if MPI_VERSION > 2 || ( MPI_VERSION == 2 && MPI_SUBVERSION >= 2)
  xt_mpi_call(MPI_Reduce_scatter_block((int *)send_size, (int *)recv_size,
                                       num_sizes, MPI_INT, MPI_SUM,
                                       comm), comm);
#else
  int comm_size;
  xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);

  int *recv_count = xmalloc((size_t)comm_size * sizeof(*recv_count));
  for (int i = 0; i < comm_size; ++i) recv_count[i] = num_sizes;

  xt_mpi_call(MPI_Reduce_scatter(send_size, recv_size, recv_count, MPI_INT,
                                 MPI_SUM, comm), comm);

  free(recv_count);
#endif
}

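/* Build the distributed directory: intersect the local lists with all
 * buckets, exchange the packed intersections with the bucket owners, and
 * assemble the local source and destination parts of the directory. */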
static void generate_distributed_directories(struct dist_dir **src_dist_dir,
                                             struct dist_dir **dst_dist_dir,
                                             bool *stripify,
                                             Xt_idxlist src_idxlist,
                                             Xt_idxlist dst_idxlist,
                                             int tag_offset,
                                             MPI_Comm comm, int comm_size) {

  struct bucket_params bucket_params
    = get_bucket_params(src_idxlist, dst_idxlist, stripify, comm, comm_size);

  void *send_buffer = NULL;

  int (*send_size)[SEND_SIZE_ASIZE]
    = xmalloc((size_t)comm_size * sizeof(*send_size));

  int num_msg
    = compute_and_pack_bucket_intersections(&bucket_params,
                                            src_idxlist, dst_idxlist,
                                            send_size, &send_buffer,
                                            comm, comm_size);

  int recv_size[SEND_SIZE_ASIZE]; // for src and dst

  /* get packed intersection sizes to be sent from other ranks */
  xt_xmap_dist_dir_reduce_scatter_sizes(SEND_SIZE_ASIZE, recv_size,
                                        send_size, comm);

  MPI_Request *dir_init_send_requests
    = xmalloc((size_t)num_msg * sizeof(*dir_init_send_requests));
  send_intersections(send_buffer, (const int (*)[SEND_SIZE_ASIZE])send_size,
                     dir_init_send_requests, tag_offset, comm, comm_size);

  free(send_size);

  recv_and_unpack_intersections(recv_size, src_dist_dir, dst_dist_dir,
                                tag_offset, comm);

  // wait for the sends to be completed
  xt_mpi_call(MPI_Waitall(num_msg, dir_init_send_requests,
                          MPI_STATUSES_IGNORE), comm);
  free(dir_init_send_requests);
  free(send_buffer);
}

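/* Receive recv_size bytes of packed (rank, index list) answers from the
 * directory ranks, unpack them and sort the entries by rank. */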
static void
recv_and_unpack_dist_dir_result(struct dist_dir *dist_dir, int recv_size,
                                void *restrict recv_buffer, int tag,
                                MPI_Comm comm)
{

  // initiate distributed directories
  int num_entries = 0;
  while (recv_size > 0) {

    MPI_Status status;

    xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED,
                         MPI_ANY_SOURCE, tag, comm, &status), comm);

    int received_count;
    xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);

    recv_size -= received_count;

    int position = 0;

    while (received_count > position) {

      xt_mpi_call(MPI_Unpack(recv_buffer, received_count, &position,
                             &dist_dir->entries[num_entries].rank,
                             1, MPI_INT, comm), comm);

      dist_dir->entries[num_entries].list =
        xt_idxlist_unpack(recv_buffer, received_count, &position, comm);

      ++num_entries;
    }
  }
  qsort(dist_dir->entries, (size_t)num_entries, sizeof(*dist_dir->entries),
        xt_com_list_rank_cmp);

  if (0 != recv_size)
    Xt_abort(comm, "ERROR: recv_and_unpack_dist_dir_result"
             " received wrong number of bytes", __FILE__, __LINE__);

  dist_dir->num_entries = num_entries;

}

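/* Receive the matched intersections for both directions and merge entries
 * originating from the same rank; outstanding sends are progressed in
 * between via xt_mpi_test_some. */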
static void
recv_and_unpack_dist_dir_results(int recv_size[SEND_SIZE_ASIZE],
                                 struct dist_dir **src_intersections,
                                 struct dist_dir **dst_intersections,
                                 int *num_send_indices_requests,
                                 MPI_Request *send_indices_requests,
                                 int tag_offset,
                                 MPI_Comm comm) {

  struct dist_dir *src_dist_dir_results
    = xmalloc(sizeof (struct dist_dir)
              + (sizeof (struct Xt_com_list)
                 * (size_t)recv_size[SEND_NUM_SRC])),
    *dst_dist_dir_results
    = xmalloc(sizeof (struct dist_dir)
              + (sizeof (struct Xt_com_list)
                 * (size_t)recv_size[SEND_NUM_DST]));

  void *recv_buffer = xmalloc((size_t)MAX(recv_size[SEND_SIZE_SRC],
                                          recv_size[SEND_SIZE_DST]));

  recv_and_unpack_dist_dir_result(src_dist_dir_results,
                                  recv_size[SEND_SIZE_SRC],
                                  recv_buffer, tag_offset
                                  + xt_mpi_tag_xmap_dist_dir_src_send, comm);
  assert(src_dist_dir_results->num_entries
         == recv_size[SEND_NUM_SRC]);

  enum { ops_completed_auto_size = 16 };
  int ops_completed_auto[ops_completed_auto_size];
  int *ops_completed
    = *num_send_indices_requests > ops_completed_auto_size
    ? xmalloc((size_t)*num_send_indices_requests * sizeof (*ops_completed))
    : ops_completed_auto;
  bool all_sends_done
    = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
                       ops_completed, comm);

  recv_and_unpack_dist_dir_result(dst_dist_dir_results,
                                  recv_size[SEND_SIZE_DST],
                                  recv_buffer, tag_offset
                                  + xt_mpi_tag_xmap_dist_dir_dst_send, comm);
  assert(dst_dist_dir_results->num_entries
         == recv_size[SEND_NUM_DST]);

  if (!all_sends_done)
    all_sends_done
      = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
                         ops_completed, comm);
  free(recv_buffer);

  xt_xmap_dist_dir_same_rank_merge(&src_dist_dir_results);
  *src_intersections = src_dist_dir_results;

  if (!all_sends_done)
    all_sends_done
      = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
                         ops_completed, comm);

  xt_xmap_dist_dir_same_rank_merge(&dst_dist_dir_results);
  *dst_intersections = dst_dist_dir_results;
  if (ops_completed != ops_completed_auto) free(ops_completed);
}

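/* Full exchange: set up the distributed directory, match the source entries
 * against the destination entries on the directory ranks, send the matches
 * back, and return the per-rank intersections needed for the exchange map. */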
static void exchange_idxlists(struct dist_dir **src_intersections,
                              struct dist_dir **dst_intersections,
                              bool *stripify,
                              Xt_idxlist src_idxlist,
                              Xt_idxlist dst_idxlist,
                              int tag_offset,
                              MPI_Comm comm) {

  int comm_size;

  xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);

  struct dist_dir *src_dist_dir, *dst_dist_dir;

  generate_distributed_directories(&src_dist_dir, &dst_dist_dir, stripify,
                                   src_idxlist, dst_idxlist,
                                   tag_offset, comm, comm_size);

  void * send_buffer;

  int recv_size[SEND_SIZE_ASIZE], (*send_size)[SEND_SIZE_ASIZE]
    = xmalloc((size_t)comm_size * sizeof(*send_size));

  /* match the source and destination entries in the local distributed
   * directories... */
  struct isect *src_dst_intersections;
  size_t num_intersections
    = xt_xmap_dist_dir_match_src_dst(src_dist_dir, dst_dist_dir,
                                     &src_dst_intersections);
  xt_xmdd_free_dist_dir(src_dist_dir);
  xt_xmdd_free_dist_dir(dst_dist_dir);
  /* ... and pack the results into a sendable format */
  int num_send_indices_requests
    = pack_src_dst_dist_dirs(num_intersections, src_dst_intersections,
                             send_size, &send_buffer, comm, comm_size);
  free(src_dst_intersections);

  // get the data size the local process will receive from other processes
  xt_xmap_dist_dir_reduce_scatter_sizes(SEND_SIZE_ASIZE, recv_size,
                                        send_size, comm);

  MPI_Request *send_indices_requests
    = xmalloc((size_t)num_send_indices_requests
              * sizeof(*send_indices_requests));

  send_intersections(send_buffer, (const int (*)[SEND_SIZE_ASIZE])send_size,
                     send_indices_requests, tag_offset, comm, comm_size);

  recv_and_unpack_dist_dir_results(recv_size,
                                   src_intersections, dst_intersections,
                                   &num_send_indices_requests,
                                   send_indices_requests,
                                   tag_offset, comm);

  xt_mpi_call(MPI_Waitall(num_send_indices_requests, send_indices_requests,
                          MPI_STATUSES_IGNORE), comm);

  free(send_buffer);
  free(send_size);
  free(send_indices_requests);
}

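/* Create an exchange map on an intra-communicator using the
 * distributed-directory algorithm; depending on the stripify decision the
 * result is backed by the stripe-based or the plain intersection xmap
 * implementation. */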
Xt_xmap
xt_xmap_dist_dir_intracomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
                               MPI_Comm comm)
{
  INSTR_DEF(this_instr,"xt_xmap_all2all_new")
  INSTR_START(this_instr);

  // ensure that yaxt is initialized
  assert(xt_initialized());

  int tag_offset;
  MPI_Comm newcomm = xt_mpi_comm_smart_dup(comm, &tag_offset);

  struct dist_dir *src_intersections, *dst_intersections;

  bool stripify;
  exchange_idxlists(&src_intersections, &dst_intersections, &stripify,
                    src_idxlist, dst_idxlist, tag_offset, newcomm);

  Xt_xmap (*xmap_new)(int num_src_intersections,
                      const struct Xt_com_list *src_com,
                      int num_dst_intersections,
                      const struct Xt_com_list *dst_com,
                      Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
                      MPI_Comm comm)
    = stripify ? xt_xmap_intersection_ext_new : xt_xmap_intersection_new;

  Xt_xmap xmap
    = xmap_new(src_intersections->num_entries, src_intersections->entries,
               dst_intersections->num_entries, dst_intersections->entries,
               src_idxlist, dst_idxlist, newcomm);

  xt_mpi_comm_smart_dedup(&newcomm, tag_offset);

  xt_xmdd_free_dist_dir(src_intersections);
  xt_xmdd_free_dist_dir(dst_intersections);
  INSTR_STOP(this_instr);
  return xmap;
}

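/*
 * Illustrative usage sketch (not part of the library sources): after
 * xt_initialize(), each rank builds a source and a destination index list
 * and obtains an exchange map from the routine defined above.  The index
 * values, the use of xt_idxvec_new and the 2-indices-per-rank layout are
 * assumptions chosen only for this example.
 *
 *   Xt_int src_ids[2] = {(Xt_int)(2 * rank), (Xt_int)(2 * rank + 1)};
 *   Xt_int dst_ids[2] = {(Xt_int)(2 * rank + 2), (Xt_int)(2 * rank + 3)};
 *   Xt_idxlist src_list = xt_idxvec_new(src_ids, 2),
 *              dst_list = xt_idxvec_new(dst_ids, 2);
 *   Xt_xmap xmap = xt_xmap_dist_dir_intracomm_new(src_list, dst_list,
 *                                                 MPI_COMM_WORLD);
 *   ... use xmap, e.g. to construct a redistribution ...
 *   xt_xmap_delete(xmap);
 *   xt_idxlist_delete(src_list);
 *   xt_idxlist_delete(dst_list);
 */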
/*
 * Local Variables:
 * c-basic-offset: 2
 * coding: utf-8
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */