Yet Another eXchange Tool  0.9.0
xt_xmap_dist_dir_intercomm.c
/*
 * Keywords:
 * Maintainer: Jörg Behrens <behrens@dkrz.de>
 *             Moritz Hanke <hanke@dkrz.de>
 *             Thomas Jahns <jahns@dkrz.de>
 * URL: https://doc.redmine.dkrz.de/yaxt/html/
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution.
 *
 * Neither the name of the DKRZ GmbH nor the names of its contributors
 * may be used to endorse or promote products derived from this software
 * without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <limits.h>

#include <mpi.h>

#include "xt/xt_idxlist.h"
#include "xt/xt_idxvec.h"
#include "xt/xt_idxstripes.h"
#include "xt/xt_idxempty.h"
#include "xt/xt_xmap.h"
#include "xt/xt_xmap_dist_dir.h"
#include "xt/xt_mpi.h"
#include "xt_arithmetic_util.h"
#include "xt_idxstripes_internal.h"
#include "xt_mpi_internal.h"
#include "core/core.h"
#include "core/ppm_xfuncs.h"
#include "ensure_array_size.h"
#include "xt_xmap_dist_dir_common.h" /* struct Xt_xmdd_txstat, xt_xmdd_* helpers */
#include "xt_idxlist_internal.h"
#include "xt_xmap_intersection.h" /* xt_xmap_intersection_new, xt_xmap_intersection_ext_new */
#include "instr.h"

static inline void
get_dist_dir_global_interval_size(Xt_idxlist src, Xt_idxlist dst,
                                  bool *stripify, Xt_int interval_size[2],
                                  MPI_Comm intra_comm, MPI_Comm inter_comm,
                                  int comm_size, int remote_size,
                                  int comm_rank, int tag_offset_inter)
{
  /* global_sums[0] and [1] refer to the local and remote group of
   * intercommunicator inter_comm */
  unsigned long long local_vals[2], global_sums[2][2];

  unsigned num_indices_src = (unsigned)xt_idxlist_get_num_indices(src);
  local_vals[0] = num_indices_src;
  local_vals[1] = (num_indices_src >= CHEAP_VECTOR_SIZE)
    || (xt_idxlist_get_num_indices(dst) >= CHEAP_VECTOR_SIZE);

  xt_mpi_call(MPI_Allreduce(local_vals, global_sums[0], 2,
                            MPI_UNSIGNED_LONG_LONG, MPI_SUM, intra_comm),
              intra_comm);
  /* instead of sendrecv one might use hand-programmed multi-casts
   * sending to each rank in a range from the remote group and
   * receiving from the first rank in that group,
   * the better choice probably depends on the asymmetry of the group
   * sizes, i.e. use bcast from a very small to a very large group
   * and few sends from a large to a small group */
  if (comm_rank == 0) {
    int tag = tag_offset_inter + xt_mpi_tag_xmap_dist_dir_src_send;
    xt_mpi_call(MPI_Sendrecv(global_sums[0], 2, MPI_UNSIGNED_LONG_LONG, 0, tag,
                             global_sums[1], 2, MPI_UNSIGNED_LONG_LONG, 0, tag,
                             inter_comm, MPI_STATUS_IGNORE), inter_comm);
  }
  xt_mpi_call(MPI_Bcast(global_sums[1], 2, MPI_UNSIGNED_LONG_LONG,
                        0, intra_comm), intra_comm);
  *stripify = (global_sums[0][1] > 0 || global_sums[1][1] > 0);
  interval_size[0]
    = (Xt_int)(((global_sums[0][0] + (unsigned)comm_size - 1)
                / (unsigned)comm_size) * (unsigned)comm_size);
  interval_size[1]
    = (Xt_int)(((global_sums[1][0] + (unsigned)remote_size - 1)
                / (unsigned)remote_size) * (unsigned)remote_size);
}

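/* Per destination rank, message metadata is kept as a pair of ints:
 * the packed message size in bytes (SEND_SIZE) and the number of
 * messages/entries (SEND_NUM).  SEND_SIZE_ASIZE is the size of that
 * pair and the per-rank count used in the MPI_Alltoall size exchanges. */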
enum {
  SEND_SIZE = 0,
  SEND_NUM = 1,
  SEND_SIZE_ASIZE = 2,
};

static inline void
rank_no_send(size_t rank, int (*restrict send_size)[SEND_SIZE_ASIZE])
{
  send_size[rank][SEND_SIZE] = 0;
  send_size[rank][SEND_NUM] = 0;
}

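/* Intersect idxlist with the bucket of every rank of comm (skipping
 * rank ranges whose buckets cannot overlap the local index range),
 * pack each non-empty intersection into one contiguous send buffer and
 * record the per-rank sizes/counts in send_size.  Returns the number
 * of non-empty messages. */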
static size_t
compute_and_pack_bucket_intersections(struct bucket_params *bucket_params,
                                      Xt_idxlist idxlist,
                                      int (*send_size)[SEND_SIZE_ASIZE],
                                      void **send_buffer_,
                                      MPI_Comm comm, int comm_size)
{
  size_t num_msg = 0;
  Xt_int local_index_range_lbound = bucket_params->local_index_range_lbound;
  Xt_int local_index_range_ubound = bucket_params->local_index_range_ubound;
  if (local_index_range_lbound <= local_index_range_ubound) {
    size_t send_buffer_size = 0;
    struct Xt_stripe *stripes = NULL;
    size_t stripes_array_size = 0;
    Xt_int global_interval = bucket_params->global_interval;
    Xt_int local_interval = bucket_params->local_interval;
    size_t first_overlapping_bucket = 0;
    /* is it impossible for early buckets to overlap our lists? */
    if (local_index_range_lbound >= 0
        && (local_index_range_ubound < global_interval)) {
      first_overlapping_bucket
        = (size_t)(local_index_range_lbound / local_interval);
      for (size_t i = 0; i < first_overlapping_bucket; ++i)
        rank_no_send(i, send_size);
    }
    /* is it impossible for later ranks to overlap our lists? */
    size_t start_of_non_overlapping_bucket_suffix
      = (size_t)(((long long)local_index_range_ubound + local_interval - 1)
                 / local_interval) + 1;
    if (local_index_range_lbound < 0
        || start_of_non_overlapping_bucket_suffix > (size_t)comm_size)
      start_of_non_overlapping_bucket_suffix = (size_t)comm_size;
    size_t max_num_intersect
      = start_of_non_overlapping_bucket_suffix - first_overlapping_bucket;
    struct {
      Xt_idxlist list;
      int rank;
    } *restrict sends
      = xmalloc(max_num_intersect * sizeof(*sends));
    size_t i = first_overlapping_bucket;
    for (; i < (size_t)start_of_non_overlapping_bucket_suffix; ++i) {
      Xt_idxlist bucket
        = xt_xmap_dist_dir_get_bucket(bucket_params,
                                      &stripes, &stripes_array_size, (int)i);
      if (bucket) {
        Xt_idxlist isect2send = xt_idxlist_get_intersection(idxlist, bucket);
        xt_idxlist_delete(bucket);
        if (xt_idxlist_get_num_indices(isect2send) > 0) {
          sends[num_msg].list = isect2send;
          sends[num_msg].rank = (int)i;
          send_buffer_size += xt_idxlist_get_pack_size(isect2send, comm);
          /* send_size[i][SEND_SIZE] is set below after the actual
           * pack, because MPI_Pack_size only gives an upper bound,
           * not the actually needed size */
          send_size[i][SEND_NUM] = 1;
          ++num_msg;
        } else {
          send_size[i][SEND_SIZE] = 0;
          send_size[i][SEND_NUM] = 0;
          xt_idxlist_delete(isect2send);
        }
      } else
        rank_no_send(i, send_size);

    }
    for (; i < (size_t)comm_size; ++i)
      rank_no_send(i, send_size);

    unsigned char *send_buffer
      = *send_buffer_ = xrealloc(stripes, send_buffer_size);
    size_t ofs = 0;
    for (i = 0; i < num_msg; ++i) {
      int position = 0;
      xt_idxlist_pack(sends[i].list, send_buffer + ofs,
                      (int)(send_buffer_size-ofs), &position, comm);
      send_size[sends[i].rank][SEND_SIZE] = position;
      ofs += (size_t)position;
      xt_idxlist_delete(sends[i].list);
    }

    free(sends);
  } else
    memset(send_size, 0, (size_t)comm_size * sizeof (*send_size));
  return num_msg;
}

static inline Xt_int
get_min_idxlist_index(Xt_idxlist l)
{
  int num_idx = xt_idxlist_get_num_indices(l);
  Xt_int min_index = num_idx ? xt_idxlist_get_min_index(l) : XT_INT_MAX;
  return min_index;
}

static inline Xt_int
get_max_idxlist_index(Xt_idxlist l)
{
  int num_idx = xt_idxlist_get_num_indices(l);
  Xt_int max_index = num_idx ? xt_idxlist_get_max_index(l) : XT_INT_MIN;
  return max_index;
}

static struct bucket_params
get_bucket_params(Xt_idxlist idxlist,
                  Xt_int global_interval, int comm_size)
{
  /* guard vs. comm_size being larger than number of indices */
  Xt_int local_interval = MAX((Xt_int)1, (Xt_int)(global_interval / comm_size));
  Xt_int local_index_range_lbound = get_min_idxlist_index(idxlist);
  Xt_int local_index_range_ubound = get_max_idxlist_index(idxlist);
  return (struct bucket_params){
    .global_interval = global_interval,
    .local_interval = local_interval,
    .local_index_range_lbound = local_index_range_lbound,
    .local_index_range_ubound = local_index_range_ubound,
  };
}

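/* Compact the per-rank {size, count} table in place: entries with a
 * non-zero packed size are moved to the front, the SEND_NUM slot of a
 * compacted entry is reused to store the corresponding rank, message
 * counts are optionally copied to counts[], and the total byte count
 * and number of messages are returned in *tx_stat. */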
static void
compress_sizes(int (*restrict sizes)[SEND_SIZE_ASIZE], int comm_size,
               struct Xt_xmdd_txstat *tx_stat, int *counts)
{
  size_t tx_num = 0, size_sum = 0;
  for (size_t i = 0; i < (size_t)comm_size; ++i)
    if (sizes[i][SEND_SIZE]) {
      int tx_size = sizes[i][SEND_SIZE];
      size_sum += (size_t)tx_size;
      sizes[tx_num][SEND_SIZE] = tx_size;
      if (counts) counts[tx_num] = sizes[i][SEND_NUM];
      sizes[tx_num][SEND_NUM] = (int)i;
      ++tx_num;
    }
  *tx_stat = (struct Xt_xmdd_txstat){ .bytes = size_sum, .num_msg = tx_num };
}

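/* Build this rank's part of a distributed directory over comm:
 * intersect idxlist with every bucket, exchange the resulting message
 * sizes with MPI_Alltoall and compress both size tables into
 * tx_stat[0] (receive side) and tx_stat[1] (send side). */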
static void
create_intersections(struct Xt_xmdd_txstat tx_stat[2],
                     int recv_size[][SEND_SIZE_ASIZE],
                     void **send_buffer, int send_size[][SEND_SIZE_ASIZE],
                     Xt_idxlist idxlist, Xt_int interval_size,
                     MPI_Comm comm, int comm_size)
{
  struct bucket_params bucket_params
    = get_bucket_params(idxlist, interval_size, comm_size);
  *send_buffer = NULL;
#ifndef NDEBUG
  size_t num_msg =
#endif
    compute_and_pack_bucket_intersections(
      &bucket_params, idxlist, send_size, send_buffer, comm, comm_size);
  xt_mpi_call(MPI_Alltoall(send_size, SEND_SIZE_ASIZE, MPI_INT,
                           recv_size, SEND_SIZE_ASIZE, MPI_INT, comm), comm);
  compress_sizes(recv_size, comm_size, tx_stat + 0, NULL);
  compress_sizes(send_size, comm_size, tx_stat + 1, NULL);
  assert(num_msg == tx_stat[1].num_msg);
}

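/* MPI_Isend and MPI_Irecv share this signature (up to const-ness of the
 * buffer argument), so posting the sends and the receives of the packed
 * intersections can share one loop via a function pointer. */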
typedef int (*tx_fp)(void *, int, MPI_Datatype, int, int,
                     MPI_Comm, MPI_Request *);
static void
tx_intersections(size_t num_msg,
                 const int (*sizes)[SEND_SIZE_ASIZE],
                 unsigned char *buffer, MPI_Request *requests,
                 int tag, MPI_Comm comm, tx_fp tx_op)
{
  size_t ofs = 0;
  for (size_t i = 0; i < num_msg; ++i)
  {
    int rank = sizes[i][SEND_NUM], count = sizes[i][SEND_SIZE];
    xt_mpi_call(tx_op(buffer + ofs,
                      count, MPI_PACKED, rank, tag, comm, requests + i), comm);
    ofs += (size_t)count;
  }
}

static void
irecv_intersections(size_t num_msg,
                    const int (*recv_size)[SEND_SIZE_ASIZE],
                    void *recv_buffer, MPI_Request *requests,
                    int tag, MPI_Comm comm)
{
  tx_intersections(num_msg, recv_size, recv_buffer, requests, tag, comm,
                   (tx_fp)MPI_Irecv);
}

static void
isend_intersections(size_t num_msg,
                    const int (*send_size)[SEND_SIZE_ASIZE],
                    void *send_buffer, MPI_Request *requests,
                    int tag, MPI_Comm comm)
{
  tx_intersections(num_msg, send_size, send_buffer, requests, tag, comm,
                   (tx_fp)MPI_Isend);
}


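/* Each message received while acting as directory rank holds exactly
 * one packed index list; unpack them into a dist_dir whose entries pair
 * the sending rank with its intersection. */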
static void
unpack_dist_dir(struct Xt_xmdd_txstat tx_stat,
                const int (*sizes)[SEND_SIZE_ASIZE],
                void *buffer,
                struct dist_dir **dist_dir,
                MPI_Comm comm)
{
  size_t num_msg = tx_stat.num_msg, buf_size = tx_stat.bytes;
  struct dist_dir *restrict dist_dir_
    = *dist_dir = xmalloc(sizeof (*dist_dir_)
                          + sizeof (*dist_dir_->entries) * num_msg);
  dist_dir_->num_entries = (int)num_msg;
  int position = 0;
  for (size_t i = 0; i < num_msg; ++i)
  {
    int rank = sizes[i][SEND_NUM];
    dist_dir_->entries[i].rank = rank;
    dist_dir_->entries[i].list
      = xt_idxlist_unpack(buffer, (int)buf_size, &position, comm);
  }
}

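/* Phase 1 of the exchange: every rank inserts the intersections of its
 * source list with the buckets of its own group (via intra_comm) and of
 * its destination list with the buckets of the remote group (via
 * inter_comm) into the distributed directories src_dist_dir and
 * dst_dist_dir held by the bucket owners. */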
static void
generate_distributed_directories(struct dist_dir **src_dist_dir,
                                 struct dist_dir **dst_dist_dir,
                                 bool *stripify,
                                 Xt_idxlist src_idxlist,
                                 Xt_idxlist dst_idxlist,
                                 int tag_offset_inter, int tag_offset_intra,
                                 MPI_Comm inter_comm, MPI_Comm intra_comm,
                                 int remote_size, int comm_size,
                                 int comm_rank) {

  /* interval_size[0] and interval_size[1] are the global interval
   * size for the local and remote group */
  Xt_int interval_size[2];
  get_dist_dir_global_interval_size(src_idxlist, dst_idxlist,
                                    stripify, interval_size,
                                    intra_comm, inter_comm,
                                    comm_size, remote_size,
                                    comm_rank, tag_offset_inter);
  void *send_buffer_local, *send_buffer_remote;
  int (*send_size_local)[SEND_SIZE_ASIZE]
    = xmalloc(((size_t)comm_size + (size_t)remote_size)
              * 2 * sizeof(*send_size_local)),
    (*send_size_remote)[SEND_SIZE_ASIZE] = send_size_local + comm_size,
    (*recv_size_local)[SEND_SIZE_ASIZE] = send_size_remote + remote_size,
    (*recv_size_remote)[SEND_SIZE_ASIZE] = recv_size_local + comm_size;
  struct Xt_xmdd_txstat tx_stat_local[2], tx_stat_remote[2];
  create_intersections(tx_stat_local, recv_size_local, &send_buffer_local,
                       send_size_local, src_idxlist, interval_size[0],
                       intra_comm, comm_size);
  create_intersections(tx_stat_remote, recv_size_remote, &send_buffer_remote,
                       send_size_remote, dst_idxlist, interval_size[1],
                       inter_comm, remote_size);

  size_t num_req = tx_stat_local[0].num_msg + tx_stat_remote[0].num_msg
    + tx_stat_local[1].num_msg + tx_stat_remote[1].num_msg;
  MPI_Request *dir_init_requests
    = xmalloc(num_req * sizeof(*dir_init_requests)
              + tx_stat_local[0].bytes + tx_stat_remote[0].bytes);
  void *recv_buffer_local = dir_init_requests + num_req,
    *recv_buffer_remote = ((unsigned char *)recv_buffer_local
                           + tx_stat_local[0].bytes);
  int tag_intra = tag_offset_intra + xt_mpi_tag_xmap_dist_dir_src_send;
  size_t req_ofs = tx_stat_local[0].num_msg;
  irecv_intersections(tx_stat_local[0].num_msg,
                      (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
                      recv_buffer_local, dir_init_requests,
                      tag_intra, intra_comm);
  int tag_inter = tag_offset_inter + xt_mpi_tag_xmap_dist_dir_src_send;
  irecv_intersections(tx_stat_remote[0].num_msg,
                      (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
                      recv_buffer_remote, dir_init_requests + req_ofs,
                      tag_inter, inter_comm);
  req_ofs += tx_stat_remote[0].num_msg;
  isend_intersections(tx_stat_local[1].num_msg,
                      (const int (*)[SEND_SIZE_ASIZE])send_size_local,
                      send_buffer_local, dir_init_requests + req_ofs,
                      tag_intra, intra_comm);
  req_ofs += tx_stat_local[1].num_msg;
  isend_intersections(tx_stat_remote[1].num_msg,
                      (const int (*)[SEND_SIZE_ASIZE])send_size_remote,
                      send_buffer_remote, dir_init_requests + req_ofs,
                      tag_inter, inter_comm);
  // wait for data transfers to complete
  xt_mpi_call(MPI_Waitall((int)num_req, dir_init_requests,
                          MPI_STATUSES_IGNORE), inter_comm);
  free(send_buffer_local);
  free(send_buffer_remote);
  unpack_dist_dir(tx_stat_local[0],
                  (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
                  recv_buffer_local, src_dist_dir, intra_comm);
  unpack_dist_dir(tx_stat_remote[0],
                  (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
                  recv_buffer_remote, dst_dist_dir, inter_comm);
  free(send_size_local);
  free(dir_init_requests);
}


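/* For one target side (src or dst), count how many directory entries go
 * to each rank and how many bytes they need in total; each entry is
 * packed as the peer rank (one MPI_INT) followed by the packed
 * intersection index list. */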
static size_t
send_size_from_intersections(size_t num_intersections,
                             const struct isect *restrict src_dst_intersections,
                             enum xt_xmdd_direction target,
                             MPI_Comm comm, int comm_size,
                             int (*restrict send_size_target)[SEND_SIZE_ASIZE])
{
  size_t total_send_size = 0;
  for (int i = 0; i < comm_size; ++i)
    (void)(send_size_target[i][SEND_SIZE] = 0),
      (void)(send_size_target[i][SEND_NUM] = 0);

  int rank_pack_size;
  xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);

  for (size_t i = 0; i < num_intersections; ++i)
  {
    int msg_size = rank_pack_size
      + (int)xt_idxlist_get_pack_size(src_dst_intersections[i].idxlist, comm);
    size_t target_rank = (size_t)src_dst_intersections[i].rank[target];
    /* send_size_target[target_rank][SEND_SIZE] += msg_size; */
    ++(send_size_target[target_rank][SEND_NUM]);
    total_send_size += (size_t)msg_size;
  }
  assert(total_send_size <= INT_MAX);
  return total_send_size;
}


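/* Sort the matched intersections by the rank of the chosen target side,
 * then pack them into one send buffer via
 * xt_xmap_dist_dir_pack_intersections; returns the number of ranks that
 * will receive a message. */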
static size_t
pack_dist_dirs(size_t num_intersections,
               struct isect *restrict src_dst_intersections,
               int (*send_size)[SEND_SIZE_ASIZE],
               void **send_buffer_, enum xt_xmdd_direction target,
               bool isect_idxlist_delete, MPI_Comm comm, int comm_size) {

  size_t total_send_size
    = send_size_from_intersections(num_intersections,
                                   src_dst_intersections,
                                   target,
                                   comm, comm_size, send_size);

  unsigned char *send_buffer = (*send_buffer_)
    = xmalloc((size_t)total_send_size);
  qsort(src_dst_intersections, num_intersections,
        sizeof (src_dst_intersections[0]),
        target == xt_xmdd_direction_src
        ? xt_xmdd_cmp_isect_src_rank : xt_xmdd_cmp_isect_dst_rank);
  size_t ofs = 0;
  size_t num_requests
    = xt_xmap_dist_dir_pack_intersections(
      target, num_intersections, src_dst_intersections,
      isect_idxlist_delete,
      SEND_SIZE_ASIZE, SEND_SIZE, send_size,
      send_buffer, total_send_size, &ofs, comm);
  return num_requests;
}

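/* Unpack the (rank, index list) pairs received back from the directory
 * ranks into a dist_dir, using entry_counts[i] pairs per message, and
 * sort the result by rank so entries for the same peer end up adjacent. */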
static void
unpack_dist_dir_results(struct Xt_xmdd_txstat tx_stat,
                        struct dist_dir **dist_dir,
                        void *restrict recv_buffer,
                        int *restrict entry_counts,
                        MPI_Comm comm)
{
  size_t num_msg = tx_stat.num_msg;
  int buf_size = (int)tx_stat.bytes;
  int position = 0;
  size_t num_entries_sent = 0;
  for (size_t i = 0; i < num_msg; ++i)
    num_entries_sent += (size_t)entry_counts[i];
  *dist_dir = xmalloc(sizeof (struct dist_dir)
                      + (sizeof (struct Xt_com_list) * num_entries_sent));
  (*dist_dir)->num_entries = (int)num_entries_sent;
  struct Xt_com_list *restrict entries = (*dist_dir)->entries;
  size_t num_entries = 0;
  for (size_t i = 0; i < num_msg; ++i) {
    size_t num_entries_from_rank = (size_t)entry_counts[i];
    for (size_t j = 0; j < num_entries_from_rank; ++j) {
      xt_mpi_call(MPI_Unpack(recv_buffer, buf_size, &position,
                             &entries[num_entries].rank,
                             1, MPI_INT, comm), comm);
      entries[num_entries].list =
        xt_idxlist_unpack(recv_buffer, buf_size, &position, comm);
      ++num_entries;
    }
  }
  assert(num_entries == num_entries_sent);
  qsort(entries, num_entries_sent, sizeof(*entries), xt_com_list_rank_cmp);
  xt_xmap_dist_dir_same_rank_merge(dist_dir);
}


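/* Phase 2 of the exchange: match the source and destination entries in
 * the local part of the distributed directory, then return each
 * intersection to the owner of the source indices (over intra_comm) and
 * to the owner of the destination indices (over inter_comm). */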
static void
exchange_idxlists(struct dist_dir **src_intersections,
                  struct dist_dir **dst_intersections,
                  bool *stripify,
                  Xt_idxlist src_idxlist,
                  Xt_idxlist dst_idxlist,
                  int tag_offset_inter, int tag_offset_intra,
                  MPI_Comm inter_comm, MPI_Comm intra_comm) {

  int comm_size, remote_size, comm_rank;
  xt_mpi_call(MPI_Comm_size(inter_comm, &comm_size), inter_comm);
  xt_mpi_call(MPI_Comm_rank(inter_comm, &comm_rank), inter_comm);
  xt_mpi_call(MPI_Comm_remote_size(inter_comm, &remote_size), inter_comm);

  struct dist_dir *src_dist_dir, *dst_dist_dir;

  generate_distributed_directories(&src_dist_dir, &dst_dist_dir, stripify,
                                   src_idxlist, dst_idxlist,
                                   tag_offset_inter, tag_offset_intra,
                                   inter_comm, intra_comm,
                                   remote_size, comm_size,
                                   comm_rank);


  int (*send_size_local)[SEND_SIZE_ASIZE]
    = xmalloc(((size_t)comm_size + (size_t)remote_size)
              * 2U * sizeof(*send_size_local)),
    (*recv_size_local)[SEND_SIZE_ASIZE] = send_size_local + comm_size,
    (*send_size_remote)[SEND_SIZE_ASIZE] = recv_size_local + comm_size,
    (*recv_size_remote)[SEND_SIZE_ASIZE] = send_size_remote + remote_size;

  /* match the source and destination entries in the local distributed
   * directories... */
  struct isect *src_dst_intersections;
  size_t num_intersections
    = xt_xmap_dist_dir_match_src_dst(src_dist_dir, dst_dist_dir,
                                     &src_dst_intersections);
  xt_xmdd_free_dist_dir(src_dist_dir);
  xt_xmdd_free_dist_dir(dst_dist_dir);
  /* ... and pack the results into a sendable format */
  void *send_buffer_local, *send_buffer_remote;
  size_t num_send_requests_local
    = pack_dist_dirs(num_intersections, src_dst_intersections,
                     send_size_local, &send_buffer_local,
                     xt_xmdd_direction_src, false, intra_comm, comm_size),
    num_send_requests_remote
    = pack_dist_dirs(num_intersections, src_dst_intersections,
                     send_size_remote, &send_buffer_remote,
                     xt_xmdd_direction_dst, true, inter_comm, remote_size);
  free(src_dst_intersections);

  // get the data size the local process will receive from other processes
  xt_mpi_call(MPI_Alltoall(send_size_local, SEND_SIZE_ASIZE, MPI_INT,
                           recv_size_local, SEND_SIZE_ASIZE, MPI_INT,
                           intra_comm), intra_comm);
  xt_mpi_call(MPI_Alltoall(send_size_remote, SEND_SIZE_ASIZE, MPI_INT,
                           recv_size_remote, SEND_SIZE_ASIZE, MPI_INT,
                           inter_comm), inter_comm);

  struct Xt_xmdd_txstat tx_stat_local[2], tx_stat_remote[2];
  int *isect_counts_recv_local
    = xmalloc(((size_t)comm_size + (size_t)remote_size) * sizeof (int)),
    *isect_counts_recv_remote = isect_counts_recv_local + comm_size;
  compress_sizes(send_size_local, comm_size, tx_stat_local+1, NULL);
  compress_sizes(recv_size_local, comm_size, tx_stat_local+0,
                 isect_counts_recv_local);
  compress_sizes(send_size_remote, remote_size, tx_stat_remote+1, NULL);
  compress_sizes(recv_size_remote, remote_size, tx_stat_remote+0,
                 isect_counts_recv_remote);
  assert(tx_stat_local[1].num_msg == num_send_requests_local
         && tx_stat_remote[1].num_msg == num_send_requests_remote);
  size_t num_requests
    = num_send_requests_local + num_send_requests_remote
    + tx_stat_local[0].num_msg + tx_stat_remote[0].num_msg;
  assert(num_requests <= INT_MAX);
  MPI_Request *requests
    = xmalloc(num_requests * sizeof(*requests)
              + tx_stat_local[0].bytes + tx_stat_remote[0].bytes);
  void *recv_buf_local = requests + num_requests,
    *recv_buf_remote = (unsigned char *)recv_buf_local + tx_stat_local[0].bytes;
  size_t req_ofs = tx_stat_local[0].num_msg;
  int tag_intra = tag_offset_intra + xt_mpi_tag_xmap_dist_dir_src_send;
  irecv_intersections(tx_stat_local[0].num_msg,
                      (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
                      recv_buf_local, requests, tag_intra, intra_comm);
  int tag_inter = tag_offset_inter + xt_mpi_tag_xmap_dist_dir_src_send;
  irecv_intersections(tx_stat_remote[0].num_msg,
                      (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
                      recv_buf_remote, requests+req_ofs, tag_inter, inter_comm);
  req_ofs += tx_stat_remote[0].num_msg;
  isend_intersections(tx_stat_local[1].num_msg,
                      (const int (*)[SEND_SIZE_ASIZE])send_size_local,
                      send_buffer_local, requests+req_ofs, tag_intra,
                      intra_comm);
  req_ofs += tx_stat_local[1].num_msg;
  isend_intersections(tx_stat_remote[1].num_msg,
                      (const int (*)[SEND_SIZE_ASIZE])send_size_remote,
                      send_buffer_remote, requests+req_ofs, tag_inter,
                      inter_comm);
  xt_mpi_call(MPI_Waitall((int)num_requests, requests, MPI_STATUSES_IGNORE),
              inter_comm);
  free(send_buffer_local);
  free(send_buffer_remote);
  free(send_size_local);

  unpack_dist_dir_results(tx_stat_local[0], src_intersections, recv_buf_local,
                          isect_counts_recv_local, intra_comm);
  unpack_dist_dir_results(tx_stat_remote[0], dst_intersections, recv_buf_remote,
                          isect_counts_recv_remote, inter_comm);
  free(requests);
  free(isect_counts_recv_local);
}



Xt_xmap
xt_xmap_dist_dir_intercomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
                               MPI_Comm inter_comm_, MPI_Comm intra_comm_)
{
  INSTR_DEF(this_instr,"xt_xmap_dist_dir_intercomm_new")
  INSTR_START(this_instr);

  // ensure that yaxt is initialized
  assert(xt_initialized());

  int tag_offset_inter, tag_offset_intra;
  MPI_Comm inter_comm = xt_mpi_comm_smart_dup(inter_comm_, &tag_offset_inter),
    intra_comm = xt_mpi_comm_smart_dup(intra_comm_, &tag_offset_intra);

  struct dist_dir *src_intersections, *dst_intersections;

  bool stripify;
  exchange_idxlists(&src_intersections, &dst_intersections, &stripify,
                    src_idxlist, dst_idxlist,
                    tag_offset_inter, tag_offset_intra,
                    inter_comm, intra_comm);

  Xt_xmap (*xmap_new)(int num_src_intersections,
                      const struct Xt_com_list *src_com,
                      int num_dst_intersections,
                      const struct Xt_com_list *dst_com,
                      Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
                      MPI_Comm comm)
    = stripify ? xt_xmap_intersection_ext_new : xt_xmap_intersection_new;

  Xt_xmap xmap
    = xmap_new(src_intersections->num_entries, src_intersections->entries,
               dst_intersections->num_entries, dst_intersections->entries,
               src_idxlist, dst_idxlist, inter_comm);

  xt_mpi_comm_smart_dedup(&inter_comm, tag_offset_inter);
  xt_mpi_comm_smart_dedup(&intra_comm, tag_offset_intra);

  xt_xmdd_free_dist_dir(src_intersections);
  xt_xmdd_free_dist_dir(dst_intersections);
  INSTR_STOP(this_instr);
  return xmap;
}



/*
 * Local Variables:
 * c-basic-offset: 2
 * coding: utf-8
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */
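A minimal usage sketch (not part of the file above) of how xt_xmap_dist_dir_intercomm_new might be called from an application: MPI_COMM_WORLD is split into two groups connected by an inter-communicator, and every rank passes the indices it provides (src) and the indices it requires (dst). The splitting scheme and the four-indices-per-rank decomposition are made up for illustration; the entry points xt_initialize, xt_finalize, xt_idxvec_new, xt_idxlist_delete and xt_xmap_delete come from the installed public header yaxt.h, not from this file, and error handling is omitted.

#include <mpi.h>
#include <yaxt.h>

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);
  xt_initialize(MPI_COMM_WORLD);

  int world_rank, world_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);

  /* split the world into two groups (assumes at least 2 processes) */
  int group = world_rank < world_size / 2 ? 0 : 1;
  MPI_Comm intra_comm;
  MPI_Comm_split(MPI_COMM_WORLD, group, world_rank, &intra_comm);

  /* connect the two groups; the local leader is rank 0 of intra_comm,
   * the remote leader is the first world rank of the other group */
  int remote_leader = group == 0 ? world_size / 2 : 0;
  MPI_Comm inter_comm;
  MPI_Intercomm_create(intra_comm, 0, MPI_COMM_WORLD, remote_leader,
                       0, &inter_comm);

  /* toy decomposition: each rank provides and requires four
   * consecutive global indices */
  Xt_int indices[4];
  for (int i = 0; i < 4; ++i)
    indices[i] = (Xt_int)(4 * world_rank + i);
  Xt_idxlist src_idxlist = xt_idxvec_new(indices, 4),
    dst_idxlist = xt_idxvec_new(indices, 4);

  Xt_xmap xmap = xt_xmap_dist_dir_intercomm_new(src_idxlist, dst_idxlist,
                                                inter_comm, intra_comm);

  /* ... build an Xt_redist from xmap and exchange data ... */

  xt_xmap_delete(xmap);
  xt_idxlist_delete(src_idxlist);
  xt_idxlist_delete(dst_idxlist);
  MPI_Comm_free(&inter_comm);
  MPI_Comm_free(&intra_comm);
  xt_finalize();
  MPI_Finalize();
  return 0;
}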