Yet Another eXchange Tool  0.9.0
xt_xmap_intersection.c
Go to the documentation of this file.
1 
12 /*
13  * Keywords:
14  * Maintainer: Jörg Behrens <behrens@dkrz.de>
15  * Moritz Hanke <hanke@dkrz.de>
16  * Thomas Jahns <jahns@dkrz.de>
17  * URL: https://doc.redmine.dkrz.de/yaxt/html/
18  *
19  * Redistribution and use in source and binary forms, with or without
20  * modification, are permitted provided that the following conditions are
21  * met:
22  *
23  * Redistributions of source code must retain the above copyright notice,
24  * this list of conditions and the following disclaimer.
25  *
26  * Redistributions in binary form must reproduce the above copyright
27  * notice, this list of conditions and the following disclaimer in the
28  * documentation and/or other materials provided with the distribution.
29  *
30  * Neither the name of the DKRZ GmbH nor the names of its contributors
31  * may be used to endorse or promote products derived from this software
32  * without specific prior written permission.
33  *
34  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45  */
46 #ifdef HAVE_CONFIG_H
47 #include <config.h>
48 #endif
49 
50 #include <stdlib.h>
51 #include <stdio.h>
52 #include <string.h>
53 #include <assert.h>
54 #include <limits.h>
55 
56 #include <mpi.h>
57 
58 #include "xt/xt_idxlist.h"
59 #include "xt/xt_idxvec.h"
60 #include "xt/xt_xmap.h"
61 #include "xt_xmap_internal.h"
62 #include "xt/xt_mpi.h"
63 #include "xt_mpi_internal.h"
64 #include "core/core.h"
65 #include "core/ppm_xfuncs.h"
68 #include "ensure_array_size.h"
69 #include "xt_arithmetic_util.h"
70 #include "xt/quicksort.h"
71 
75 static void
77 static void
82 static void xmap_intersection_delete(Xt_xmap xmap);
85 static int const *
87 static int
89 static const struct Xt_pos_ext *
91 static int
96 static Xt_xmap
98 static Xt_xmap
100  const int * src_positions,
101  const int * dst_positions);
102 static Xt_xmap
103 xmap_intersection_spread(Xt_xmap xmap, int num_repetitions,
104  const int src_displacements[num_repetitions],
105  const int dst_displacements[num_repetitions]);
106 
107 
108 static const struct Xt_xmap_iter_vtable
113  .get_num_transfer_pos = xmap_intersection_iterator_get_num_transfer_pos,
114  .get_transfer_pos_ext = xmap_intersection_iterator_get_transfer_pos_ext,
115  .get_num_transfer_pos_ext
118 
120 
122 
123  const struct Xt_xmap_iter_vtable * vtable;
124 
125  struct exchange_data * msg;
127 };
128 
129 static inline Xt_xmap_iter_intersection
130 xmii(void *iter)
131 {
132  return (Xt_xmap_iter_intersection)iter;
133 }
134 
135 
136 static const struct Xt_xmap_vtable xmap_intersection_vtable = {
138  .get_num_destinations = xmap_intersection_get_num_destinations,
139  .get_num_sources = xmap_intersection_get_num_sources,
140  .get_destination_ranks = xmap_intersection_get_destination_ranks,
141  .get_source_ranks = xmap_intersection_get_source_ranks,
142  .get_out_iterator = xmap_intersection_get_out_iterator,
143  .get_in_iterator = xmap_intersection_get_in_iterator,
144  .copy = xmap_intersection_copy,
145  .delete = xmap_intersection_delete,
146  .get_max_src_pos = xmap_intersection_get_max_src_pos,
147  .get_max_dst_pos = xmap_intersection_get_max_dst_pos,
148  .reorder = xmap_intersection_reorder,
150  .spread = xmap_intersection_spread};
151 
153  // list of relative positions in memory to send or receive
157  int rank;
158 };
159 
161 
162  const struct Xt_xmap_vtable * vtable;
163 
164  int n_in, n_out;
165 
166  // we need the max position in order to enable quick range-checks
167  // for xmap-users like redist
168  int max_src_pos; // max possible pos over all src transfer_pos (always >= 0)
169  int max_dst_pos; // same for dst
170  int tag_offset; /* add to make tags on same communicator non-overlapping */
171 
173  struct exchange_data msg[];
174 };
175 
177 
178 static inline Xt_xmap_intersection
179 xmi(void *xmap)
180 {
181  return (Xt_xmap_intersection)xmap;
182 }
183 
185 
186  Xt_xmap_intersection xmap_intersection = xmi(xmap);
187 
188  return xmap_intersection->comm;
189 }
190 
192 
193  Xt_xmap_intersection xmap_intersection = xmi(xmap);
194 
195  // the number of destinations equals the number of source messages
196  return xmap_intersection->n_out;
197 }
198 
200 
201  Xt_xmap_intersection xmap_intersection = xmi(xmap);
202 
203  // the number of sources equals the number of destination messages
204  return xmap_intersection->n_in;
205 }
206 
207 static void xmap_intersection_get_destination_ranks(Xt_xmap xmap, int * ranks) {
208 
209  Xt_xmap_intersection xmap_intersection = xmi(xmap);
210  struct exchange_data *restrict out_msg
211  = xmap_intersection->msg + xmap_intersection->n_in;
212  for (int i = 0; i < xmap_intersection->n_out; ++i)
213  ranks[i] = out_msg[i].rank;
214 }
215 
216 static void xmap_intersection_get_source_ranks(Xt_xmap xmap, int * ranks) {
217 
218  Xt_xmap_intersection xmap_intersection = xmi(xmap);
219  struct exchange_data *restrict in_msg = xmap_intersection->msg;
220  for (int i = 0; i < xmap_intersection->n_in; ++i)
221  ranks[i] = in_msg[i].rank;
222 }
223 
224 enum {
225  bitsPerCoverageElement = sizeof (unsigned long) * CHAR_BIT,
226 };
227 
228 struct tpd_result {
230  int resCount;
232 };
233 
234 /* compute list positions for recv direction */
235 static struct tpd_result
237  int num_intersections,
238  const struct Xt_com_list intersections[num_intersections],
239  Xt_idxlist mypart_idxlist,
240  struct exchange_data *restrict resSets,
241  int *restrict num_indices_to_remove_per_intersection)
242 {
243  int mypart_num_indices = xt_idxlist_get_num_indices(mypart_idxlist);
244  size_t coverage_size = (size_t)mypart_num_indices;
245  coverage_size = (coverage_size + bitsPerCoverageElement - 1)
247  unsigned long *restrict coverage = xcalloc(coverage_size, sizeof(*coverage));
248  /* set uncovered top-most bits to ease later comparison */
249  if (mypart_num_indices%bitsPerCoverageElement)
250  coverage[coverage_size-1]
251  = ~((1UL << (mypart_num_indices%bitsPerCoverageElement)) - 1UL);
252 
253  int new_num_intersections = 0;
254  size_t total_num_indices_to_remove = 0;
255  size_t curr_indices_to_remove_size = 0;
256  Xt_int *restrict indices_to_remove = NULL;
257  int *restrict intersection_pos = NULL;
258 
259  for (int i = 0; i < num_intersections; ++i) {
260 
261  const Xt_int *restrict intersection_idxvec
262  = xt_idxlist_get_indices_const(intersections[i].list);
263  int max_intersection_size
264  = xt_idxlist_get_num_indices(intersections[i].list);
265  intersection_pos
266  = xrealloc(intersection_pos,
267  (size_t)max_intersection_size * sizeof(*intersection_pos));
268 
269 #ifndef NDEBUG
270  int retval =
271 #endif
273  mypart_idxlist, intersection_idxvec, max_intersection_size,
274  intersection_pos, 1);
275  assert(retval == 0);
276 
277  // we have to enforce single_match_only not only within a single
278  // intersection, but also between all intersections
279 
280  int intersection_size = 0;
281  int num_indices_to_remove_isect = 0;
282 
283  /* at most max_intersection_size many indices need to be removed */
284  ENSURE_ARRAY_SIZE(indices_to_remove, curr_indices_to_remove_size,
285  total_num_indices_to_remove
286  + (size_t)max_intersection_size);
287 
288  for (int j = 0; j < max_intersection_size; ++j) {
289 
290  int pos = intersection_pos[j];
291  /* the predicate effectively conditionalizes adding of either
292  * the position to intersection_pos
293  * if the current value was NOT already in another intersection
294  * or
295  * the index to indices_to_remove_
296  * if the current value was already in another intersection
297  */
298  unsigned long mask = 1UL << (pos % bitsPerCoverageElement);
299  int is_duplicate = (coverage[pos/bitsPerCoverageElement] & mask) != 0UL;
300  intersection_pos[intersection_size] = pos;
301  indices_to_remove[total_num_indices_to_remove
302  + (size_t)num_indices_to_remove_isect]
303  = intersection_idxvec[j];
304  intersection_size += is_duplicate ^ 1;
305  num_indices_to_remove_isect += is_duplicate;
306  coverage[pos/bitsPerCoverageElement] |= mask;
307  }
308 
309  total_num_indices_to_remove += (size_t)num_indices_to_remove_isect;
310  num_indices_to_remove_per_intersection[i] = num_indices_to_remove_isect;
311 
312  if (intersection_size > 0) {
313  resSets[new_num_intersections].transfer_pos = intersection_pos;
314  resSets[new_num_intersections].num_transfer_pos = intersection_size;
315  resSets[new_num_intersections].transfer_pos_ext_cache = NULL;
316  resSets[new_num_intersections].num_transfer_pos_ext
317  = (int)count_pos_ext((size_t)intersection_size, intersection_pos);
318  resSets[new_num_intersections].rank = intersections[i].rank;
319  new_num_intersections++;
320  intersection_pos = NULL;
321  }
322  }
323  free(intersection_pos);
324 
326  = xrealloc(indices_to_remove, total_num_indices_to_remove
327  * sizeof (*indices_to_remove));
328 
329  // check resulting bit map
330  unsigned long all_bits_set = ~0UL;
331  for (size_t i = 0; i < coverage_size; ++i)
332  all_bits_set &= coverage[i];
333 
334  free(coverage);
335  return (struct tpd_result){
336  .indices_to_remove = indices_to_remove,
337  .resCount = new_num_intersections,
338  .all_dst_covered = all_bits_set == ~0UL };
339 }
340 
341 struct tps_result {
342  int resCount;
343  int max_pos;
344 };
345 
346 /* compute list positions for send direction */
347 static struct tps_result
348 generate_dir_transfer_pos_src(int num_intersections,
349  const struct Xt_com_list
350  intersections[num_intersections],
351  Xt_idxlist mypart_idxlist,
352  struct exchange_data *restrict resSets,
353  const Xt_int *indices_to_remove,
354  const int *num_indices_to_remove_per_intersection)
355 {
356  int new_num_intersections = 0;
357  int offset = 0;
358 
359  Xt_int * new_intersection_idxvec = NULL;
360  size_t curr_new_intersection_idxvec_size = 0;
361  int *restrict intersection_pos = NULL;
362  int max_pos_ = -1;
363 
364  for (int i = 0; i < num_intersections; ++i) {
365 
366  const Xt_int *restrict intersection_idxvec
367  = xt_idxlist_get_indices_const(intersections[i].list);
368  int intersection_size
369  = xt_idxlist_get_num_indices(intersections[i].list);
370  intersection_pos = xrealloc(intersection_pos,
371  (size_t)intersection_size
372  * sizeof(*intersection_pos));
373 
374  int num_indices_to_remove = num_indices_to_remove_per_intersection[i];
375  if (num_indices_to_remove > 0) {
376 
378  new_intersection_idxvec, curr_new_intersection_idxvec_size,
379  intersection_size - num_indices_to_remove + 1);
380  int new_intersection_size = 0;
381 
382  for (int j = 0; j < intersection_size; ++j) {
383 
384  int discard = 0;
385 
386  Xt_int idx = intersection_idxvec[j];
387  /* could be improved with BLOOM-filter if
388  * num_indices_to_remove was sufficiently large */
389  for (int k = 0; k < num_indices_to_remove; ++k)
390  discard |= (idx == indices_to_remove[offset + k]);
391 
392  new_intersection_idxvec[new_intersection_size] = idx;
393  new_intersection_size += !discard;
394  }
395 
396  intersection_idxvec = new_intersection_idxvec;
397  intersection_size = new_intersection_size;
398  offset = offset + num_indices_to_remove;
399  }
400 
401 #ifndef NDEBUG
402  int retval =
403 #endif
405  mypart_idxlist, intersection_idxvec, intersection_size,
406  intersection_pos, 0);
407  assert(retval == 0);
408 
409  if (intersection_size > 0) {
410  resSets[new_num_intersections].transfer_pos = intersection_pos;
411  resSets[new_num_intersections].num_transfer_pos = intersection_size;
412  for (int j = 0; j < intersection_size; ++j)
413  if (intersection_pos[j] > max_pos_) max_pos_ = intersection_pos[j];
414  resSets[new_num_intersections].transfer_pos_ext_cache = NULL;
415  resSets[new_num_intersections].num_transfer_pos_ext
416  = (int)count_pos_ext((size_t)intersection_size, intersection_pos);
417  resSets[new_num_intersections].rank = intersections[i].rank;
418  new_num_intersections++;
419  intersection_pos = NULL;
420  }
421  }
422 
423  free(new_intersection_idxvec);
424  free(intersection_pos);
425 
426  return (struct tps_result){ .max_pos = max_pos_,
427  .resCount = new_num_intersections };
428 }
429 
430 static Xt_int *
431 exchange_points_to_remove(int num_src_intersections,
432  const struct Xt_com_list
433  src_com[num_src_intersections],
434  int num_dst_intersections,
435  const struct Xt_com_list
436  dst_com[num_dst_intersections],
437  int *restrict num_src_indices_to_remove_per_intersection,
438  Xt_int *dst_indices_to_remove,
439  const int *restrict
440  num_dst_indices_to_remove_per_intersection,
441  int tag_offset,
442  MPI_Comm comm) {
443 
444  MPI_Request * requests
445  = xmalloc((size_t)(num_src_intersections + 2 * num_dst_intersections) *
446  sizeof(*requests));
447  MPI_Request *send_header_requests = requests,
448  *recv_requests = requests + num_dst_intersections,
449  *send_data_requests = recv_requests + num_src_intersections;
450 
451  // set up receives for indices that need to be removed from the send messages
452  for (int i = 0; i < num_src_intersections; ++i)
453  xt_mpi_call(MPI_Irecv(
454  num_src_indices_to_remove_per_intersection + i, 1, MPI_INT,
455  src_com[i].rank,
457  comm, recv_requests+i), comm);
458 
459  // send indices that need to be removed on the target side due to duplicated
460  // receives
461  int offset = 0;
462  unsigned num_nonempty_dst_intersections = 0;
463  for (int i = 0; i < num_dst_intersections; ++i) {
464  xt_mpi_call(MPI_Isend(
465  CAST_MPI_SEND_BUF(
466  num_dst_indices_to_remove_per_intersection + i), 1, MPI_INT,
467  dst_com[i].rank,
469  comm, send_header_requests + i), comm);
470 
471  if (num_dst_indices_to_remove_per_intersection[i] > 0) {
472 
473  xt_mpi_call(MPI_Isend(
474  dst_indices_to_remove + offset,
475  num_dst_indices_to_remove_per_intersection[i], Xt_int_dt,
476  dst_com[i].rank,
478  comm, send_data_requests + num_nonempty_dst_intersections),
479  comm);
480  offset += num_dst_indices_to_remove_per_intersection[i];
481  ++num_nonempty_dst_intersections;
482  }
483  }
484 
485  // wait for the receiving of headers to complete
486  xt_mpi_call(MPI_Waitall(num_src_intersections + num_dst_intersections,
487  send_header_requests, MPI_STATUSES_IGNORE), comm);
488 
489  size_t total_num_src_indices_to_recv = 0;
490 
491  for (int i = 0; i < num_src_intersections; ++i)
492  total_num_src_indices_to_recv
493  += (size_t)num_src_indices_to_remove_per_intersection[i];
494 
495  unsigned num_nonempty_src_intersections = 0;
496  Xt_int *src_indices_to_remove;
497  if (total_num_src_indices_to_recv > 0) {
498 
499  src_indices_to_remove = xmalloc(total_num_src_indices_to_recv
500  * sizeof(*src_indices_to_remove));
501 
502  // set up receive for indices that need to be removed
503  offset = 0;
504  for (int i = 0; i < num_src_intersections; ++i)
505  if (num_src_indices_to_remove_per_intersection[i] > 0) {
506  ++num_nonempty_src_intersections;
507  xt_mpi_call(MPI_Irecv(
508  src_indices_to_remove + offset,
509  num_src_indices_to_remove_per_intersection[i],
510  Xt_int_dt, src_com[i].rank,
512  comm,
513  send_data_requests - num_nonempty_src_intersections),
514  comm);
515 
516  offset += num_src_indices_to_remove_per_intersection[i];
517  }
518 
519  } else {
520  src_indices_to_remove = NULL;
521  }
522 
523  // wait until all communication is completed
524  xt_mpi_call(MPI_Waitall((int)num_nonempty_src_intersections
525  + (int)num_nonempty_dst_intersections,
526  send_data_requests-num_nonempty_src_intersections,
527  MPI_STATUSES_IGNORE), comm);
528  free(requests);
529  return src_indices_to_remove;
530 }
531 
532 static int
534  int num_src_intersections,
535  const struct Xt_com_list src_com[num_src_intersections],
536  int num_dst_intersections,
537  const struct Xt_com_list dst_com[num_dst_intersections],
538  Xt_idxlist src_idxlist_local,
539  Xt_idxlist dst_idxlist_local,
540  MPI_Comm comm) {
541 
542  int *num_src_indices_to_remove_per_intersection =
543  xmalloc(((size_t)num_src_intersections + (size_t)num_dst_intersections)
544  * sizeof(int)),
545  *num_dst_indices_to_remove_per_intersection =
546  num_src_indices_to_remove_per_intersection + num_src_intersections;
547 
548  struct tpd_result tpdr
550  num_dst_intersections, dst_com, dst_idxlist_local, xmap->msg,
551  num_dst_indices_to_remove_per_intersection);
552  int all_dst_covered = tpdr.all_dst_covered;
553  xmap->n_in = tpdr.resCount;
554  Xt_int *dst_indices_to_remove = tpdr.indices_to_remove;
555  // exchange the points that need to be removed
556  Xt_int *src_indices_to_remove
558  num_src_intersections, src_com, num_dst_intersections, dst_com,
559  num_src_indices_to_remove_per_intersection,
560  dst_indices_to_remove, num_dst_indices_to_remove_per_intersection,
561  xmap->tag_offset, comm);
562 
563  free(dst_indices_to_remove);
564  num_src_indices_to_remove_per_intersection
565  = xrealloc(num_src_indices_to_remove_per_intersection,
566  (size_t)num_src_intersections * sizeof(int));
567 
568  struct tps_result tpsr
570  num_src_intersections, src_com, src_idxlist_local, xmap->msg + xmap->n_in,
571  src_indices_to_remove, num_src_indices_to_remove_per_intersection);
572  xmap->max_src_pos = tpsr.max_pos;
573  xmap->n_out = tpsr.resCount;
574 
575  free(src_indices_to_remove);
576  free(num_src_indices_to_remove_per_intersection);
577  return all_dst_covered;
578 }
579 
580 Xt_xmap
581 xt_xmap_intersection_new(int num_src_intersections,
582  const struct Xt_com_list
583  src_com[num_src_intersections],
584  int num_dst_intersections,
585  const struct Xt_com_list
586  dst_com[num_dst_intersections],
587  Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
588  MPI_Comm comm) {
589 
590  // ensure that yaxt is initialized
591  assert(xt_initialized());
592 
593  size_t num_isect
594  = (size_t)num_dst_intersections + (size_t)num_src_intersections;
596  = xmalloc(sizeof (*xmap) + num_isect * sizeof (struct exchange_data));
597 
599 
600  xmap->comm = comm = xt_mpi_comm_smart_dup(comm, &xmap->tag_offset);
601 
602  // generate exchange lists
603  if (!generate_transfer_pos(xmap,
604  num_src_intersections, src_com,
605  num_dst_intersections, dst_com,
606  src_idxlist, dst_idxlist, comm)) {
607 
608  int num_dst_indices = xt_idxlist_get_num_indices(dst_idxlist);
609  const Xt_int *dst_indices = xt_idxlist_get_indices_const(dst_idxlist);
610  int * found_index_mask = xcalloc((size_t)num_dst_indices,
611  sizeof(*found_index_mask));
612  int * index_positions = xmalloc((size_t)num_dst_indices
613  * sizeof(*index_positions));
614 
615  for (size_t i = 0; i < (size_t)num_dst_intersections; ++i) {
616  xt_idxlist_get_positions_of_indices(dst_com[i].list, dst_indices,
617  num_dst_indices, index_positions, 0);
618  for (size_t j = 0; j < (size_t)num_dst_indices; ++j)
619  found_index_mask[j] |= index_positions[j] != -1;
620  }
621 
622  int first_missing_pos = 0;
623  while ((first_missing_pos < (num_dst_indices - 1)) &&
624  (found_index_mask[first_missing_pos])) ++first_missing_pos;
625  free(found_index_mask);
626  free(index_positions);
628  print_miss_msg(dst_idxlist, first_missing_pos, comm, __FILE__, __LINE__);
629  }
630 
631  int n_in = xmap->n_in, n_out = xmap->n_out;
632  size_t new_num_isect = (size_t)n_in + (size_t)n_out;
633  if (new_num_isect != num_isect)
634  xmap = xrealloc(xmap, sizeof (*xmap) + (new_num_isect
635  * sizeof(struct exchange_data)));
636 
637  xmap->max_dst_pos = xt_idxlist_get_num_indices(dst_idxlist) - 1;
638 
639  return (Xt_xmap)xmap;
640 }
641 
643  return xmi(xmap)->max_src_pos;
644 }
645 
647  return xmi(xmap)->max_dst_pos;
648 }
649 
650 
651 typedef int (*Xt_pos_copy)(size_t num_pos,
652  int *pos, const int *orig_pos,
653  void *state);
654 
655 static int
656 pos_copy_verbatim(size_t num_pos,
657  int *pos, const int *orig_pos, void *state)
658 {
659  (void)state;
660  memcpy(pos, orig_pos, sizeof (*pos) * num_pos);
661  return -1;
662 }
663 
664 typedef void (*Xt_pos_ncopy)(size_t num_pos, int *pos, const int *orig_pos,
665  void *state, int num_repetitions,
666  const int displacements[num_repetitions]);
667 
668 static void
670  const struct exchange_data *restrict msg,
671  int *nmsg_copy,
672  struct exchange_data *restrict msg_copy,
673  int *max_pos_, int num_repetitions,
674  Xt_pos_copy pos_copy, void *pos_copy_state) {
675  *nmsg_copy = (int)nmsg;
676  int max_pos = 0;
677  for (size_t i = 0; i < nmsg; ++i) {
678  size_t num_transfer_pos
679  = (size_t)(msg_copy[i].num_transfer_pos
680  = num_repetitions * msg[i].num_transfer_pos);
681  msg_copy[i].rank = msg[i].rank;
682  msg_copy[i].transfer_pos_ext_cache = NULL;
683  size_t size_transfer_pos
684  = num_transfer_pos * sizeof (*(msg[i].transfer_pos));
685  msg_copy[i].transfer_pos = xmalloc(size_transfer_pos);
686  int new_max_pos
687  = pos_copy(num_transfer_pos,
688  msg_copy[i].transfer_pos, msg[i].transfer_pos,
689  pos_copy_state);
690  if (new_max_pos > max_pos)
691  max_pos = new_max_pos;
692  if (pos_copy == pos_copy_verbatim)
693  msg_copy[i].num_transfer_pos_ext = msg[i].num_transfer_pos_ext;
694  else
695  msg_copy[i].num_transfer_pos_ext =
696  (int)(count_pos_ext(num_transfer_pos, msg_copy[i].transfer_pos));
697  }
698  if (pos_copy != pos_copy_verbatim)
699  *max_pos_ = max_pos;
700 }
701 
702 static Xt_xmap
704  int num_repetitions,
705  Xt_pos_copy pos_copy_in, void *pci_state,
706  Xt_pos_copy pos_copy_out, void *pco_state)
707 {
708  Xt_xmap_intersection xmap_intersection = xmi(xmap), xmap_intersection_new;
709  size_t n_in = (size_t)xmap_intersection->n_in,
710  n_out = (size_t)xmap_intersection->n_out,
711  num_isect = n_in + n_out;
712  xmap_intersection_new
713  = xmalloc(sizeof (*xmap_intersection_new)
714  + num_isect * sizeof (struct exchange_data));
715  xmap_intersection_new->vtable = xmap_intersection->vtable;
716  xmap_intersection_new->n_in = (int)n_in;
717  xmap_intersection_new->n_out = (int)n_out;
718  xmap_intersection_new->max_src_pos = xmap_intersection->max_src_pos;
719  xmap_intersection_new->max_dst_pos = xmap_intersection->max_dst_pos;
720  xmap_intersection_msg_copy(n_in, xmap_intersection->msg,
721  &xmap_intersection_new->n_in,
722  xmap_intersection_new->msg,
723  &xmap_intersection_new->max_dst_pos,
724  num_repetitions,
725  pos_copy_in, pci_state);
726  xmap_intersection_msg_copy(n_out, xmap_intersection->msg + n_in,
727  &xmap_intersection_new->n_out,
728  xmap_intersection_new->msg+n_in,
729  &xmap_intersection_new->max_src_pos,
730  num_repetitions,
731  pos_copy_out, pco_state);
732  xmap_intersection_new->comm
733  = xt_mpi_comm_smart_dup(xmap_intersection->comm,
734  &xmap_intersection_new->tag_offset);
735  return (Xt_xmap)xmap_intersection_new;
736 }
737 
738 static Xt_xmap
740 {
741  return xmap_intersection_copy_(xmap, 1,
742  pos_copy_verbatim, NULL,
743  pos_copy_verbatim, NULL);
744 }
745 
746 static void
748  for (int i = 0; i < nmsg; ++i) {
749  free(msg[i].transfer_pos_ext_cache);
750  free(msg[i].transfer_pos);
751  }
752 }
753 
755 
756  Xt_xmap_intersection xmap_intersection = xmi(xmap);
757 
758  int num_isect = xmap_intersection->n_in + xmap_intersection->n_out;
759  xmap_intersection_msg_delete(num_isect, xmap_intersection->msg);
760  xt_mpi_comm_smart_dedup(&xmap_intersection->comm,
761  xmap_intersection->tag_offset);
762  free(xmap_intersection);
763 }
764 
766 
767  Xt_xmap_intersection xmap_intersection = xmi(xmap);
768 
769  if (xmap_intersection->n_in == 0)
770  return NULL;
771 
772  Xt_xmap_iter_intersection iter = xmalloc(sizeof (*iter));
773 
775  iter->msg = xmap_intersection->msg;
776  iter->msgs_left = xmap_intersection->n_in - 1;
777 
778  return (Xt_xmap_iter)iter;
779 }
780 
782 
783  Xt_xmap_intersection xmap_intersection = xmi(xmap);
784 
785  if (xmap_intersection->n_out == 0)
786  return NULL;
787 
788  Xt_xmap_iter_intersection iter = xmalloc(sizeof (*iter));
789 
791  iter->msg = xmap_intersection->msg + xmap_intersection->n_in;
792  iter->msgs_left = xmap_intersection->n_out - 1;
793 
794  return (Xt_xmap_iter)iter;
795 }
796 
797 struct a2abuf
798 {
799  int *buffer, *counts, *displs;
800 };
801 
802 static inline struct a2abuf
803 setup_buffer(int comm_size, int n, const struct exchange_data *msgs)
804 {
805  size_t buffer_size = 0;
806  for (int i = 0; i < n; ++i)
807  buffer_size += (size_t)msgs[i].num_transfer_pos;
808 
809  int *restrict counts = xmalloc((2 * (size_t)comm_size + buffer_size)
810  * sizeof(*counts)),
811  *restrict displs = counts + comm_size,
812  *restrict buffer = counts + 2 * comm_size;
813  for (int i = 0; i < comm_size; ++i)
814  counts[i] = 0;
815 
816  for (int i = 0; i < n; ++i)
817  counts[msgs[i].rank] = msgs[i].num_transfer_pos;
818 
819  for (int i = 0, accu = 0; i < comm_size; ++i) {
820  displs[i] = accu;
821  accu += counts[i];
822  }
823  return (struct a2abuf){ .buffer=buffer, .counts=counts, .displs=displs };
824 }
825 
827  int n_out, int n_in, struct exchange_data * out_msg,
828  struct exchange_data * in_msg, MPI_Comm comm) {
829 
830  int comm_size;
831  xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
832 
833  struct a2abuf
834  out = setup_buffer(comm_size, n_out, out_msg),
835  in = setup_buffer(comm_size, n_in, in_msg);
836 
837  for (int i = 0; i < n_out; ++i) {
838  const struct exchange_data * curr_msg = out_msg + i;
839  int count = curr_msg->num_transfer_pos;
840  if (count > 0) {
841  // pack local out transfer_pos into out buffer
842  memcpy(out.buffer + out.displs[curr_msg->rank],
843  curr_msg->transfer_pos, (size_t)count * sizeof(*out.buffer));
844  // sort local out transfer_pos
845  xt_quicksort_int(curr_msg->transfer_pos, (size_t)count);
846  }
847  }
848 
849  // exchange local out transfer_pos
850  xt_mpi_call(MPI_Alltoallv(out.buffer, out.counts, out.displs, MPI_INT,
851  in.buffer, in.counts, in.displs, MPI_INT,
852  comm), comm);
853 
854  for (int i = 0; i < n_in; ++i) {
855  const struct exchange_data * curr_msg = in_msg + i;
856  int count = curr_msg->num_transfer_pos;
857  if (count > 0)
858  // reorder local receive transfer_pos (curr_msg->transfer_pos) based
859  // on remote out transfer_pos (in_buffer + in_displs[curr_msg->rank])
861  in.buffer + in.displs[curr_msg->rank], (size_t)count,
862  curr_msg->transfer_pos);
863  }
864 
865  free(out.counts);
866  free(in.counts);
867 
868  free(out_msg->transfer_pos_ext_cache);
869  free(in_msg->transfer_pos_ext_cache);
870  out_msg->transfer_pos_ext_cache = NULL;
871  in_msg->transfer_pos_ext_cache = NULL;
872  out_msg->num_transfer_pos_ext =
873  (int)count_pos_ext(
874  (size_t)out_msg->num_transfer_pos, out_msg->transfer_pos);
875  in_msg->num_transfer_pos_ext =
876  (int)count_pos_ext(
877  (size_t)in_msg->num_transfer_pos, in_msg->transfer_pos);
878 }
879 
880 static Xt_xmap
882 
883  Xt_xmap_intersection xmap_intersection_new =
885 
886  int n_out = xmap_intersection_new->n_out;
887  int n_in = xmap_intersection_new->n_in;
888  struct exchange_data *in_msg = xmap_intersection_new->msg,
889  *out_msg = in_msg+n_in;
890  MPI_Comm comm = xmap_intersection_new->comm;
891 
892  switch ((int)type) {
893  case (XT_REORDER_NONE):
894  break;
895  case (XT_REORDER_SEND_UP):
896  reorder_transfer_pos(n_out, n_in, out_msg, in_msg, comm);
897  break;
898  case (XT_REORDER_RECV_UP):
899  reorder_transfer_pos(n_in, n_out, in_msg, out_msg, comm);
900  break;
901  default:
902  Xt_abort(comm, "ERROR(xmap_intersection_reorder):invalid reorder type",
903  __FILE__, __LINE__);
904  };
905 
906  return (Xt_xmap)xmap_intersection_new;
907 }
908 
909 static int
910 subst_positions(size_t num_pos,
911  int *restrict pos, const int *restrict orig_pos, void *new_pos_)
912 {
913  const int *restrict new_pos = new_pos_;
914  int max_pos = 0;
915  for (size_t i = 0; i < num_pos; ++i) {
916  int np = new_pos[orig_pos[i]];
917  pos[i] = np;
918  if (np > max_pos)
919  max_pos = np;
920  }
921  return max_pos;
922 }
923 
924 static Xt_xmap
926  const int *src_positions,
927  const int *dst_positions) {
928 
929  return xmap_intersection_copy_(xmap, 1,
930  subst_positions, (void *)dst_positions,
931  subst_positions, (void *)src_positions);
932 }
933 
935 {
937  const int *restrict displacements;
938 };
939 
940 static int
941 pos_copy_spread(size_t num_pos, int *restrict pos,
942  const int *restrict orig_pos, void *state)
943 {
944  struct spread_state *sp = state;
945  int num_repetitions = sp->num_repetitions;
946  const int *restrict displacements = sp->displacements;
947  num_pos = num_pos / (size_t)num_repetitions;
948  int max_pos = -1;
949  for (int i = 0, k = 0; i < num_repetitions; ++i) {
950  int curr_disp = displacements[i];
951  for (size_t j = 0; j < num_pos; ++j, ++k) {
952  int np = orig_pos[j] + curr_disp;
953  pos[k] = np;
954  if (np > max_pos)
955  max_pos = np;
956  }
957  }
958  return max_pos;
959 }
960 
961 
962 static Xt_xmap
964  const int src_displacements[num_repetitions],
965  const int dst_displacements[num_repetitions])
966 {
969  &(struct spread_state){
970  .num_repetitions = num_repetitions,
971  .displacements = src_displacements },
973  &(struct spread_state){
974  .num_repetitions = num_repetitions,
975  .displacements = dst_displacements });
976 }
977 
979  int count, struct exchange_data *restrict msgs,
980  const struct Xt_com_pos *restrict com, int *max_pos) {
981 
982  int max_pos_ = -1;
983  for (int i = 0; i < count; ++i) {
984  int num_transfer_pos = com[i].num_transfer_pos;
985  int *restrict transfer_pos =
986  xmalloc((size_t)num_transfer_pos * sizeof(*transfer_pos));
987  int rank = com[i].rank;
988  const int *restrict com_transfer_pos = com[i].transfer_pos;
989  for (int j = 0; j < num_transfer_pos; ++j)
990  if (com_transfer_pos[j] > max_pos_) max_pos_ = com_transfer_pos[j];
991  msgs[i].transfer_pos = transfer_pos;
992  msgs[i].transfer_pos_ext_cache = NULL;
993  msgs[i].num_transfer_pos = num_transfer_pos;
994  msgs[i].num_transfer_pos_ext =
995  (int)(count_pos_ext((size_t)num_transfer_pos, transfer_pos));
996  msgs[i].rank = rank;
997  memcpy(transfer_pos, com_transfer_pos,
998  (size_t)num_transfer_pos * sizeof(*transfer_pos));
999  }
1000  *max_pos = max_pos_;
1001 }
1002 
1003 Xt_xmap
1005  int num_src_msg, const struct Xt_com_pos src_com[num_src_msg],
1006  int num_dst_msg, const struct Xt_com_pos dst_com[num_dst_msg],
1007  MPI_Comm comm) {
1008 
1009  // ensure that yaxt is initialized
1010  assert(xt_initialized());
1011 
1012  size_t num_msg = (size_t)num_src_msg + (size_t)num_dst_msg;
1014  = xmalloc(sizeof (*xmap) + num_msg * sizeof (struct exchange_data));
1015 
1017  xmap->n_in = num_dst_msg;
1018  xmap->n_out = num_src_msg;
1019  xmap->comm = comm = xt_mpi_comm_smart_dup(comm, &xmap->tag_offset);
1021  num_dst_msg, xmap->msg, dst_com, &xmap->max_dst_pos);
1023  num_src_msg, xmap->msg + num_dst_msg, src_com, &xmap->max_src_pos);
1024 
1025  return (Xt_xmap)xmap;
1026 }
1027 
1029 
1030  Xt_xmap_iter_intersection iter_intersection = xmii(iter);
1031 
1032  if (iter_intersection == NULL || iter_intersection->msgs_left == 0)
1033  return 0;
1034 
1035  iter_intersection->msg++;
1036  iter_intersection->msgs_left--;
1037 
1038  return 1;
1039 }
1040 
1042 
1043  assert(iter != NULL);
1044  return xmii(iter)->msg->rank;
1045 }
1046 
1047 static int const *
1049 
1050  assert(iter != NULL);
1051  return xmii(iter)->msg->transfer_pos;
1052 }
1053 
1054 static const struct Xt_pos_ext *
1056 
1057  assert(iter != NULL);
1058  if (!xmii(iter)->msg->transfer_pos_ext_cache) {
1059  size_t num_transfer_pos_ext = (size_t)((size_t)xmii(iter)->msg->num_transfer_pos_ext);
1060  struct Xt_pos_ext * transfer_pos_ext =
1061  (xmii(iter)->msg->transfer_pos_ext_cache =
1062  xmalloc(num_transfer_pos_ext * sizeof(*transfer_pos_ext)));
1063  generate_pos_ext((size_t)xmii(iter)->msg->num_transfer_pos,
1064  xmii(iter)->msg->transfer_pos,
1065  num_transfer_pos_ext, transfer_pos_ext);
1066  }
1067  return xmii(iter)->msg->transfer_pos_ext_cache;
1068 }
1069 
1070 static int
1072  assert(iter != NULL);
1073  return xmii(iter)->msg->num_transfer_pos_ext;
1074 }
1075 
1076 static int
1078 
1079  assert(iter != NULL);
1080  return xmii(iter)->msg->num_transfer_pos;
1081 }
1082 
1084 
1085  free(iter);
1086 }
1087 
1088 /*
1089  * Local Variables:
1090  * c-basic-offset: 2
1091  * coding: utf-8
1092  * indent-tabs-mode: nil
1093  * show-trailing-whitespace: t
1094  * require-trailing-newline: t
1095  * End:
1096  */
int MPI_Comm
Definition: core.h:64
#define ENSURE_ARRAY_SIZE(arrayp, curr_array_size, req_size)
integer, parameter, public sp
add versions of standard API functions not returning on error
#define xrealloc(ptr, size)
Definition: ppm_xfuncs.h:71
#define xcalloc(nmemb, size)
Definition: ppm_xfuncs.h:68
#define xmalloc(size)
Definition: ppm_xfuncs.h:70
quicksort declaration
void xt_quicksort_int(int a[], size_t n)
void xt_quicksort_int_permutation(int a[], size_t n, int permutation[])
const struct Xt_xmap_vtable * vtable
struct exchange_data msg[]
const struct Xt_xmap_iter_vtable * vtable
struct exchange_data * msg
int(* next)(Xt_xmap_iter iter)
MPI_Comm(* get_communicator)(Xt_xmap)
struct Xt_pos_ext * transfer_pos_ext_cache
const int *restrict displacements
Xt_int * indices_to_remove
int xt_initialized(void)
Definition: xt_init.c:107
#define Xt_int_dt
Definition: xt_core.h:69
XT_INT Xt_int
Definition: xt_core.h:68
index list declaration
int xt_idxlist_get_num_indices(Xt_idxlist idxlist)
Definition: xt_idxlist.c:98
int xt_idxlist_get_positions_of_indices(Xt_idxlist idxlist, const Xt_int *indices, int num_indices, int *positions, int single_match_only)
Definition: xt_idxlist.c:203
const Xt_int * xt_idxlist_get_indices_const(Xt_idxlist idxlist)
Definition: xt_idxlist.c:108
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
Definition: xt_mpi.c:813
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
Definition: xt_mpi.c:864
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition: xt_mpi.h:68
@ xt_mpi_tag_xmap_intersection_data_exchange
@ xt_mpi_tag_xmap_intersection_header_exchange
exchange map declarations
xt_reorder_type
Definition: xt_xmap.h:239
@ XT_REORDER_RECV_UP
optimise data access on receiver side
Definition: xt_xmap.h:242
@ XT_REORDER_NONE
no reordering
Definition: xt_xmap.h:240
@ XT_REORDER_SEND_UP
optimise data access on sender side
Definition: xt_xmap.h:241
contains declaration for the exchange map data structure
static const struct Xt_xmap_vtable xmap_intersection_vtable
@ bitsPerCoverageElement
static Xt_xmap xmap_intersection_update_positions(Xt_xmap xmap, const int *src_positions, const int *dst_positions)
static Xt_xmap_iter xmap_intersection_get_in_iterator(Xt_xmap xmap)
struct Xt_xmap_iter_intersection_ * Xt_xmap_iter_intersection
static Xt_xmap xmap_intersection_spread(Xt_xmap xmap, int num_repetitions, const int src_displacements[num_repetitions], const int dst_displacements[num_repetitions])
static const struct Xt_xmap_iter_vtable xmap_iterator_intersection_vtable
void(* Xt_pos_ncopy)(size_t num_pos, int *pos, const int *orig_pos, void *state, int num_repetitions, const int displacements[num_repetitions])
static int xmap_intersection_iterator_get_rank(Xt_xmap_iter iter)
static struct a2abuf setup_buffer(int comm_size, int n, const struct exchange_data *msgs)
static Xt_xmap_intersection xmi(void *xmap)
int(* Xt_pos_copy)(size_t num_pos, int *pos, const int *orig_pos, void *state)
static void xmap_intersection_msg_delete(int nmsg, struct exchange_data *msg)
static Xt_xmap xmap_intersection_copy_(Xt_xmap xmap, int num_repetitions, Xt_pos_copy pos_copy_in, void *pci_state, Xt_pos_copy pos_copy_out, void *pco_state)
struct Xt_xmap_intersection_ * Xt_xmap_intersection
static void xmap_intersection_get_source_ranks(Xt_xmap xmap, int *ranks)
static void init_exchange_data_from_com_pos(int count, struct exchange_data *restrict msgs, const struct Xt_com_pos *restrict com, int *max_pos)
static int xmap_intersection_get_max_dst_pos(Xt_xmap xmap)
static void reorder_transfer_pos(int n_out, int n_in, struct exchange_data *out_msg, struct exchange_data *in_msg, MPI_Comm comm)
static int subst_positions(size_t num_pos, int *restrict pos, const int *restrict orig_pos, void *new_pos_)
static void xmap_intersection_get_destination_ranks(Xt_xmap xmap, int *ranks)
static void xmap_intersection_msg_copy(size_t nmsg, const struct exchange_data *restrict msg, int *nmsg_copy, struct exchange_data *restrict msg_copy, int *max_pos_, int num_repetitions, Xt_pos_copy pos_copy, void *pos_copy_state)
Xt_xmap xt_xmap_intersection_pos_new(int num_src_msg, const struct Xt_com_pos src_com[num_src_msg], int num_dst_msg, const struct Xt_com_pos dst_com[num_dst_msg], MPI_Comm comm)
static int pos_copy_spread(size_t num_pos, int *restrict pos, const int *restrict orig_pos, void *state)
static int xmap_intersection_get_num_sources(Xt_xmap xmap)
Xt_xmap xt_xmap_intersection_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
static Xt_xmap xmap_intersection_copy(Xt_xmap xmap)
static struct tps_result generate_dir_transfer_pos_src(int num_intersections, const struct Xt_com_list intersections[num_intersections], Xt_idxlist mypart_idxlist, struct exchange_data *restrict resSets, const Xt_int *indices_to_remove, const int *num_indices_to_remove_per_intersection)
static MPI_Comm xmap_intersection_get_communicator(Xt_xmap xmap)
static void xmap_intersection_delete(Xt_xmap xmap)
static void xmap_intersection_iterator_delete(Xt_xmap_iter iter)
static Xt_xmap_iter_intersection xmii(void *iter)
static int xmap_intersection_iterator_next(Xt_xmap_iter iter)
static Xt_xmap_iter xmap_intersection_get_out_iterator(Xt_xmap xmap)
static int pos_copy_verbatim(size_t num_pos, int *pos, const int *orig_pos, void *state)
static int const * xmap_intersection_iterator_get_transfer_pos(Xt_xmap_iter iter)
static int xmap_intersection_iterator_get_num_transfer_pos_ext(Xt_xmap_iter iter)
static int xmap_intersection_get_num_destinations(Xt_xmap xmap)
static const struct Xt_pos_ext * xmap_intersection_iterator_get_transfer_pos_ext(Xt_xmap_iter iter)
static int xmap_intersection_iterator_get_num_transfer_pos(Xt_xmap_iter iter)
static int xmap_intersection_get_max_src_pos(Xt_xmap xmap)
static struct tpd_result generate_dir_transfer_pos_dst(int num_intersections, const struct Xt_com_list intersections[num_intersections], Xt_idxlist mypart_idxlist, struct exchange_data *restrict resSets, int *restrict num_indices_to_remove_per_intersection)
static int generate_transfer_pos(struct Xt_xmap_intersection_ *xmap, int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist_local, Xt_idxlist dst_idxlist_local, MPI_Comm comm)
static Xt_xmap xmap_intersection_reorder(Xt_xmap xmap, enum xt_reorder_type type)
static Xt_int * exchange_points_to_remove(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], int *restrict num_src_indices_to_remove_per_intersection, Xt_int *dst_indices_to_remove, const int *restrict num_dst_indices_to_remove_per_intersection, int tag_offset, MPI_Comm comm)
Utility functions shared by xt_xmap_intersection and xt_xmap_intersection_ext.
static void print_miss_msg(Xt_idxlist dst_idxlist, int missing_pos, MPI_Comm comm, const char *source, int line) __attribute__((noreturn))
static size_t count_pos_ext(size_t num_pos, const int *restrict pos)
static void generate_pos_ext(size_t num_pos, const int *restrict pos, size_t num_pos_ext, struct Xt_pos_ext *restrict pos_ext)