Yet Another eXchange Tool 0.11.3
Loading...
Searching...
No Matches
xt_xmap_dist_dir.c
Go to the documentation of this file.
1
12/*
13 * Keywords:
14 * Maintainer: Jörg Behrens <behrens@dkrz.de>
15 * Moritz Hanke <hanke@dkrz.de>
16 * Thomas Jahns <jahns@dkrz.de>
17 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
18 *
19 * Redistribution and use in source and binary forms, with or without
20 * modification, are permitted provided that the following conditions are
21 * met:
22 *
23 * Redistributions of source code must retain the above copyright notice,
24 * this list of conditions and the following disclaimer.
25 *
26 * Redistributions in binary form must reproduce the above copyright
27 * notice, this list of conditions and the following disclaimer in the
28 * documentation and/or other materials provided with the distribution.
29 *
30 * Neither the name of the DKRZ GmbH nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 */
46#ifdef HAVE_CONFIG_H
47#include <config.h>
48#endif
49
50#include <stdbool.h>
51#include <stdlib.h>
52#include <stdio.h>
53#include <string.h>
54#include <assert.h>
55#include <limits.h>
56
57#include <mpi.h>
58
59#include "xt/xt_idxlist.h"
60#include "xt_idxlist_internal.h"
62#include "xt/xt_idxvec.h"
63#include "xt/xt_idxstripes.h"
64#include "xt/xt_idxempty.h"
65#include "xt/xt_xmap.h"
66#include "xt/xt_xmap_dist_dir.h"
67#include "xt/xt_mpi.h"
68#include "xt_mpi_internal.h"
69#include "core/core.h"
70#include "core/ppm_xfuncs.h"
72#include "xt_config_internal.h"
73#include "instr.h"
75#include "xt/xt_sort.h"
77
78
79
80static const char filename[] = "xt_xmap_dist_dir.c";
81
82/* unfortunately GCC versions 11 to 13 cannot handle the literal constants used
83 * for MPI_STATUSES_IGNORE by MPICH (spurious -Wstringop-* warnings) */
84#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
85#pragma GCC diagnostic push
86#pragma GCC diagnostic ignored "-Wstringop-overread"
87#pragma GCC diagnostic ignored "-Wstringop-overflow"
88#endif
89
90enum {
96};
97
98static inline void
99rank_no_send(size_t rank, int (*restrict send_size)[SEND_SIZE_ASIZE])
100{
101 send_size[rank][SEND_SIZE_SRC] = 0;
102 send_size[rank][SEND_NUM_SRC] = 0;
103 send_size[rank][SEND_SIZE_DST] = 0;
104 send_size[rank][SEND_NUM_DST] = 0;
105}
106
107
110};
111
112static struct capbi_result
114 Xt_idxlist dst_idxlist,
115 int (*send_size)[SEND_SIZE_ASIZE],
116 void **send_buffer_,
117 MPI_Comm comm, int tag_offset,
118 int comm_size,
119 Xt_config config)
120{
121 size_t bgd_size = config->xmdd_bucket_gen->gen_state_size;
122 bgd_size = (bgd_size + sizeof (void *) - 1)/sizeof (void *) * sizeof (void *);
123 void *bgd[bgd_size];
124 int stripify =
125 config->xmdd_bucket_gen->init(
126 &bgd, src_idxlist, dst_idxlist, config,
127 &(struct Xt_xmdd_bucket_gen_comms){
128 .intra_comm = comm, .tag_offset_intra = tag_offset,
129 .inter_comm = MPI_COMM_NULL, .tag_offset_inter = INT_MIN },
130 config->xmdd_bucket_gen->init_params,
131 config->xmdd_bucket_gen);
132
133 int nosort_forced = XT_CONFIG_GET_FORCE_NOSORT(config);
134 Xt_idxlist src_idxlist_sorted
135 = nosort_forced || xt_idxlist_get_sorting(src_idxlist) == 1
136 ? src_idxlist
137 : xt_idxlist_sorted_copy_custom(src_idxlist, config);
138 Xt_idxlist dst_idxlist_sorted
139 = nosort_forced || xt_idxlist_get_sorting(dst_idxlist) == 1
140 ? dst_idxlist
141 : xt_idxlist_sorted_copy_custom(dst_idxlist, config);
142
143 int num_msg = 0;
144 size_t max_num_intersect
145 = (size_t)config->xmdd_bucket_gen->get_intersect_max_num(
147 size_t send_size_filled = (size_t)-1;
148 if (max_num_intersect) {
149 size_t send_buffer_size = 0;
150 struct Xt_com_list *restrict sends_dst
151 = xmalloc(2 * max_num_intersect * sizeof(*sends_dst)),
152 *restrict sends_src = sends_dst + max_num_intersect;
153 size_t num_src_msg = 0, num_dst_msg = 0;
154 struct Xt_com_list bucket;
155 while ((bucket = config->xmdd_bucket_gen->next(
157 size_t rank;
158 for (rank = send_size_filled + 1; rank < (size_t)bucket.rank; ++rank)
159 rank_no_send(rank, send_size);
161 src_idxlist_sorted, bucket.list, config);
162 if (xt_idxlist_get_num_indices(send4src) > 0) {
163 sends_src[num_src_msg].list = send4src;
164 sends_src[num_src_msg].rank = (int)rank;
165 send_buffer_size += xt_idxlist_get_pack_size(send4src, comm);
166 send_size[rank][SEND_NUM_SRC] = 1;
167 ++num_src_msg;
168 } else {
169 send_size[rank][SEND_SIZE_SRC] = 0;
170 send_size[rank][SEND_NUM_SRC] = 0;
171 xt_idxlist_delete(send4src);
172 }
173
175 bucket.list, dst_idxlist_sorted, config);
176 if (xt_idxlist_get_num_indices(send4dst) > 0) {
177 sends_dst[num_dst_msg].list = send4dst;
178 sends_dst[num_dst_msg].rank = (int)rank;
179 send_buffer_size += xt_idxlist_get_pack_size(send4dst, comm);
180 send_size[rank][SEND_NUM_DST] = 1;
181 ++num_dst_msg;
182 } else {
183 send_size[rank][SEND_SIZE_DST] = 0;
184 send_size[rank][SEND_NUM_DST] = 0;
185 xt_idxlist_delete(send4dst);
186 }
187 send_size_filled = rank;
188 }
189 XT_CONFIG_BUCKET_DESTROY(config, &bgd);
190 for (size_t rank = send_size_filled+1; rank < (size_t)comm_size; ++rank)
191 rank_no_send(rank, send_size);
192 unsigned char *send_buffer
193 = *send_buffer_ = xmalloc(send_buffer_size);
194 size_t ofs = 0;
195 for (size_t i = 0; i < num_src_msg; ++i) {
196 int position = 0;
197 xt_idxlist_pack(sends_src[i].list, send_buffer+ofs,
198 (int)(send_buffer_size-ofs), &position, comm);
199 send_size[sends_src[i].rank][SEND_SIZE_SRC] = position;
200 ofs += (size_t)position;
201 xt_idxlist_delete(sends_src[i].list);
202 }
203
204 for (size_t i = 0; i < num_dst_msg; ++i) {
205 int position = 0;
206 xt_idxlist_pack(sends_dst[i].list, send_buffer+ofs,
207 (int)(send_buffer_size-ofs), &position, comm);
208 send_size[sends_dst[i].rank][SEND_SIZE_DST] = position;
209 ofs += (size_t)position;
210 xt_idxlist_delete(sends_dst[i].list);
211 }
212 free(sends_dst);
213 num_msg = (int)(num_src_msg + num_dst_msg);
214 } else {
215 XT_CONFIG_BUCKET_DESTROY(config, &bgd);
216 memset(send_size, 0, (size_t)comm_size * sizeof (*send_size));
217 }
218
219 if (dst_idxlist_sorted != dst_idxlist)
220 xt_idxlist_delete(dst_idxlist_sorted);
221 if (src_idxlist_sorted != src_idxlist)
222 xt_idxlist_delete(src_idxlist_sorted);
223
224 return (struct capbi_result){ .stripify = stripify, .num_msg = num_msg };
225}
226
227static void
229 int recv_count, void * recv_buffer, int tag,
230 MPI_Comm comm) {
231
232 // initialize distributed directories
233 int total_recv_size = 0;
234
235 for (int i = 0; i < recv_count; ++i)
236 {
237 MPI_Status status;
238
239 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED, MPI_ANY_SOURCE,
240 tag, comm, &status), comm);
241
242 int received_count;
243 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
244
245 int position = 0;
246
247 dist_dir->entries[i].rank = status.MPI_SOURCE;
248 dist_dir->entries[i].list =
249 xt_idxlist_unpack(recv_buffer, received_count, &position, comm);
250
251 total_recv_size += received_count;
252 }
253
254 if (total_recv_size != recv_size)
255 Xt_abort(comm, "ERROR: recv_intersections received wrong number of bytes",
256 filename, __LINE__);
257 dist_dir->num_entries = recv_count;
258}
259
260static void send_intersections(void *send_buffer,
261 const int (*send_size)[SEND_SIZE_ASIZE],
262 MPI_Request *dir_init_send_requests,
263 int tag_offset, MPI_Comm comm, int comm_size) {
264 int src_tag = tag_offset + xt_mpi_tag_xmap_dist_dir_src_send;
265 struct Xt_xmdd_txstat txstat
268 src_tag, comm, comm_size,
269 dir_init_send_requests,
270 send_size);
271 int dst_tag = tag_offset + xt_mpi_tag_xmap_dist_dir_dst_send;
272 xt_xmap_dist_dir_send_intersections((unsigned char *)send_buffer
273 + txstat.bytes,
275 dst_tag, comm, comm_size,
276 dir_init_send_requests + txstat.num_msg,
277 send_size);
278}
279
280static struct dist_dir_pair
282 int tag_offset, MPI_Comm comm) {
283
284 struct dist_dir *src_dist_dir, *dst_dist_dir;
285 src_dist_dir = xmalloc(sizeof (struct dist_dir)
286 + (sizeof (struct Xt_com_list)
287 * (size_t)recv_size[SEND_NUM_SRC]));
288 dst_dist_dir = xmalloc(sizeof (struct dist_dir)
289 + (sizeof (struct Xt_com_list)
290 * (size_t)recv_size[SEND_NUM_DST]));
291
292 void * recv_buffer = xmalloc((size_t)MAX(recv_size[SEND_SIZE_SRC],
293 recv_size[SEND_SIZE_DST]));
294
295 recv_and_unpack_intersection(src_dist_dir, recv_size[SEND_SIZE_SRC],
296 recv_size[SEND_NUM_SRC], recv_buffer,
298 comm);
299 recv_and_unpack_intersection(dst_dist_dir, recv_size[SEND_SIZE_DST],
300 recv_size[SEND_NUM_DST], recv_buffer,
302 comm);
303
304 free(recv_buffer);
305 return (struct dist_dir_pair){ .src = src_dist_dir,
306 .dst = dst_dist_dir };
307}
308
309
310static size_t
311buf_size_from_intersections(size_t num_intersections,
312 const struct isect *restrict src_dst_intersections,
313 MPI_Comm comm, int comm_size,
314 int (*restrict send_size)[SEND_SIZE_ASIZE])
315{
316 size_t total_send_size = 0;
317 for (int i = 0; i < comm_size; ++i)
318 (void)(send_size[i][SEND_SIZE_SRC] = 0),
319 (void)(send_size[i][SEND_SIZE_DST] = 0),
320 (void)(send_size[i][SEND_NUM_SRC] = 0),
321 (void)(send_size[i][SEND_NUM_DST] = 0);
322
323 int rank_pack_size;
324 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
325
326 for (size_t i = 0; i < num_intersections; ++i)
327 {
328 int msg_size = rank_pack_size
329 + (int)xt_idxlist_get_pack_size(src_dst_intersections[i].idxlist,
330 comm);
331 size_t src_rank
332 = (size_t)src_dst_intersections[i].rank[xt_xmdd_direction_src],
333 dst_rank = (size_t)src_dst_intersections[i].rank[xt_xmdd_direction_dst];
334 /* send_size[i][SEND_SIZE_(SRC|DST)] are set when actually
335 * packing, because that provides a potentially tighter bound,
336 * see xt_xmap_dist_dir_pack_intersections */
337 ++(send_size[src_rank][SEND_NUM_SRC]);
338 ++(send_size[dst_rank][SEND_NUM_DST]);
339 total_send_size += 2*(size_t)msg_size;
340 }
341 assert(total_send_size <= INT_MAX);
342 return total_send_size;
343}
344
345
346static int
347pack_src_dst_dist_dirs(size_t num_intersections,
348 struct isect *restrict src_dst_intersections,
349 int (*send_size)[SEND_SIZE_ASIZE],
350 void **send_buffer_,
351 MPI_Comm comm, int comm_size) {
352
353 size_t total_send_size
354 = buf_size_from_intersections(num_intersections,
355 src_dst_intersections,
356 comm, comm_size, send_size);
357
358 unsigned char *send_buffer = (*send_buffer_)
359 = xmalloc((size_t)total_send_size);
360 size_t ofs = 0;
361 if (num_intersections > 1)
362 qsort(src_dst_intersections, num_intersections,
363 sizeof (src_dst_intersections[0]), xt_xmdd_cmp_isect_src_rank);
364 size_t num_send_indices_requests
366 xt_xmdd_direction_src, num_intersections, src_dst_intersections, false,
367 SEND_SIZE_ASIZE, SEND_SIZE_SRC, send_size,
368 send_buffer, total_send_size, &ofs, comm);
369
370 if (num_intersections > 1)
371 qsort(src_dst_intersections, num_intersections,
372 sizeof (src_dst_intersections[0]), xt_xmdd_cmp_isect_dst_rank);
373 num_send_indices_requests
375 xt_xmdd_direction_dst, num_intersections, src_dst_intersections, true,
376 SEND_SIZE_ASIZE, SEND_SIZE_DST, send_size,
377 send_buffer, total_send_size, &ofs, comm);
378 assert(num_send_indices_requests <= INT_MAX);
379 return (int)num_send_indices_requests;
380}
381
392static void
394 int recv_size[num_sizes],
395 int (*send_size)[num_sizes],
396 MPI_Comm comm) {
397
398#if MPI_VERSION > 2 || ( MPI_VERSION == 2 && MPI_SUBVERSION >= 2)
399 xt_mpi_call(MPI_Reduce_scatter_block((int *)send_size, (int *)recv_size,
400 num_sizes, MPI_INT, MPI_SUM,
401 comm), comm);
402#else
403 int comm_size;
404 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
405
406 int *recv_count = xmalloc((size_t)comm_size * sizeof(*recv_count));
407 for (int i = 0; i < comm_size; ++i) recv_count[i] = num_sizes;
408
409 xt_mpi_call(MPI_Reduce_scatter(send_size, recv_size, recv_count, MPI_INT,
410 MPI_SUM, comm), comm);
411
412 free(recv_count);
413#endif
414}
415
420
421static struct dd_result
423 Xt_idxlist dst_idxlist,
424 int tag_offset,
425 MPI_Comm comm, int comm_size,
426 Xt_config config) {
427
428 void *send_buffer = NULL;
429
430 int (*send_size)[SEND_SIZE_ASIZE]
431 = xmalloc((size_t)comm_size * sizeof(*send_size));
432
433 struct capbi_result nms
434 = compute_and_pack_bucket_intersections(src_idxlist, dst_idxlist,
435 send_size, &send_buffer,
436 comm, tag_offset, comm_size,
437 config);
438
439 int recv_size[SEND_SIZE_ASIZE]; // for src and dst
440
441 /* get packed intersection sizes to be sent from other ranks */
442 xt_xmap_dist_dir_reduce_scatter_sizes(SEND_SIZE_ASIZE, recv_size, send_size, comm);
443
444 MPI_Request *dir_init_send_requests
445 = xmalloc((size_t)nms.num_msg * sizeof(*dir_init_send_requests));
446 send_intersections(send_buffer, (const int (*)[SEND_SIZE_ASIZE])send_size,
447 dir_init_send_requests, tag_offset, comm, comm_size);
448
449 free(send_size);
450
452 = recv_and_unpack_intersections(recv_size, tag_offset, comm);
453
454 // wait for the sends to be completed
455 xt_mpi_call(MPI_Waitall(nms.num_msg, dir_init_send_requests,
456 MPI_STATUSES_IGNORE), comm);
457 free(dir_init_send_requests);
458 free(send_buffer);
459 return (struct dd_result){ .dist_dirs.dst = dist_dirs.dst,
460 .dist_dirs.src = dist_dirs.src,
461 .stripify = nms.stripify };
462}
463
464static void
466 void *restrict recv_buffer, int tag,
467 MPI_Comm comm)
468{
469
470 // initiate distributed directories
471 int num_entries = 0;
472 while (recv_size > 0) {
473
474 MPI_Status status;
475
476 xt_mpi_call(MPI_Recv(recv_buffer, recv_size, MPI_PACKED,
477 MPI_ANY_SOURCE, tag, comm, &status), comm);
478
479 int received_count;
480 xt_mpi_call(MPI_Get_count(&status, MPI_PACKED, &received_count), comm);
481
482 recv_size -= received_count;
483
484 int position = 0;
485
486 while (received_count > position) {
487
488 xt_mpi_call(MPI_Unpack(recv_buffer, received_count, &position,
489 &dist_dir->entries[num_entries].rank,
490 1, MPI_INT, comm), comm);
491
492 dist_dir->entries[num_entries].list =
493 xt_idxlist_unpack(recv_buffer, received_count, &position, comm);
494
495 ++num_entries;
496 }
497 }
498 qsort(dist_dir->entries, (size_t)num_entries, sizeof(*dist_dir->entries),
500
501 if (0 != recv_size)
502 Xt_abort(comm, "ERROR: recv_and_unpack_dist_dir_result"
503 " received wrong number of bytes", filename, __LINE__);
504
505 dist_dir->num_entries = num_entries;
506
507}
508
509
510static struct dist_dir_pair
512 int *num_send_indices_requests,
513 MPI_Request *send_indices_requests,
514 int tag_offset,
515 MPI_Comm comm) {
516
517 struct dist_dir_pair dist_dir_results;
518 dist_dir_results.src
519 = xmalloc(sizeof (struct dist_dir)
520 + (sizeof (struct Xt_com_list)
521 * (size_t)recv_size[SEND_NUM_SRC]));
522 dist_dir_results.dst
523 = xmalloc(sizeof (struct dist_dir)
524 + (sizeof (struct Xt_com_list)
525 * (size_t)recv_size[SEND_NUM_DST]));
526
527 void *recv_buffer = xmalloc((size_t)MAX(recv_size[SEND_SIZE_SRC],
528 recv_size[SEND_SIZE_DST]));
529
530 recv_and_unpack_dist_dir_result(dist_dir_results.src,
531 recv_size[SEND_SIZE_SRC],
532 recv_buffer, tag_offset
534 assert(dist_dir_results.src->num_entries == recv_size[SEND_NUM_SRC]);
535
536 enum { ops_completed_auto_size = 16 };
537 int ops_completed_auto[ops_completed_auto_size];
538 int *ops_completed
539 = *num_send_indices_requests > ops_completed_auto_size
540 ? xmalloc((size_t)*num_send_indices_requests * sizeof (*ops_completed))
541 : ops_completed_auto;
542 bool all_sends_done
543 = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
544 ops_completed, comm);
545
546 recv_and_unpack_dist_dir_result(dist_dir_results.dst,
547 recv_size[SEND_SIZE_DST],
548 recv_buffer, tag_offset
550 assert(dist_dir_results.dst->num_entries == recv_size[SEND_NUM_DST]);
551
552 if (!all_sends_done)
553 all_sends_done
554 = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
555 ops_completed, comm);
556 free(recv_buffer);
557
558 xt_xmap_dist_dir_same_rank_merge(&dist_dir_results.src);
559
560 if (!all_sends_done)
561 all_sends_done
562 = xt_mpi_test_some(num_send_indices_requests, send_indices_requests,
563 ops_completed, comm);
564
565 xt_xmap_dist_dir_same_rank_merge(&dist_dir_results.dst);
566 if (ops_completed != ops_completed_auto) free(ops_completed);
567 return dist_dir_results;
568}
569
570static struct dd_result
572 Xt_idxlist dst_idxlist,
573 int tag_offset,
574 MPI_Comm comm,
575 Xt_config config)
576{
577
578 int comm_size;
579
580 xt_mpi_call(MPI_Comm_size(comm, &comm_size), comm);
581
582 struct dd_result dds
583 = generate_distributed_directories(src_idxlist, dst_idxlist,
584 tag_offset, comm, comm_size, config);
585
586 void * send_buffer;
587
588 int recv_size[SEND_SIZE_ASIZE], (*send_size)[SEND_SIZE_ASIZE]
589 = xmalloc((size_t)comm_size * sizeof(*send_size));
590
591 /* match the source and destination entries in the local distributed
592 * directories... */
593 struct isect *src_dst_intersections;
594 size_t num_intersections
596 &src_dst_intersections, config);
598 /* ... and pack the results into a sendable format */
599 int num_send_indices_requests
600 = pack_src_dst_dist_dirs(num_intersections, src_dst_intersections,
601 send_size, &send_buffer, comm, comm_size);
602 free(src_dst_intersections);
603
604 // get the data size the local process will receive from other processes
606 send_size, comm);
607
608 MPI_Request *send_indices_requests
609 = xmalloc((size_t)num_send_indices_requests
610 * sizeof(*send_indices_requests));
611
612 send_intersections(send_buffer, (const int (*)[SEND_SIZE_ASIZE])send_size,
613 send_indices_requests, tag_offset, comm, comm_size);
614
615 struct dist_dir_pair ddp
617 &num_send_indices_requests,
618 send_indices_requests,
619 tag_offset, comm);
620
621 xt_mpi_call(MPI_Waitall(num_send_indices_requests, send_indices_requests,
622 MPI_STATUSES_IGNORE), comm);
623
624 free(send_buffer);
625 free(send_size);
626 free(send_indices_requests);
627 return (struct dd_result){ .dist_dirs.dst = ddp.dst,
628 .dist_dirs.src = ddp.src,
629 .stripify = dds.stripify };
630}
631
634 Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
635 MPI_Comm comm, Xt_config config)
636{
637 INSTR_DEF(this_instr,"xt_xmap_all2all_new")
638 INSTR_START(this_instr);
639
640 // ensure that yaxt is initialized
641 assert(xt_initialized());
642
643 int tag_offset;
644 MPI_Comm newcomm = xt_mpi_comm_smart_dup(comm, &tag_offset);
645
646 struct dd_result ddr
647 = exchange_idxlists(src_idxlist, dst_idxlist, tag_offset, newcomm, config);
648
650 if (stripify == 2)
651 stripify = ddr.stripify;
652 Xt_xmap (*xmap_new)(int num_src_intersections,
653 const struct Xt_com_list *src_com,
654 int num_dst_intersections,
655 const struct Xt_com_list *dst_com,
656 Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
657 MPI_Comm comm, Xt_config config)
658 = stripify
661
662
663 Xt_xmap xmap
664 = xmap_new(ddr.dist_dirs.src->num_entries, ddr.dist_dirs.src->entries,
666 src_idxlist, dst_idxlist, newcomm, config);
667
668 xt_mpi_comm_smart_dedup(&newcomm, tag_offset);
669
671 INSTR_STOP(this_instr);
672 return xmap;
673}
674
677 MPI_Comm comm)
678{
679 return xt_xmap_dist_dir_intracomm_custom_new(src_idxlist, dst_idxlist, comm,
681}
682
683/*
684 * Local Variables:
685 * c-basic-offset: 2
686 * coding: utf-8
687 * indent-tabs-mode: nil
688 * show-trailing-whitespace: t
689 * require-trailing-newline: t
690 * End:
691 */
@ MPI_COMM_NULL
Definition core.h:74
int MPI_Comm
Definition core.h:64
#define INSTR_STOP(T)
Definition instr.h:69
#define INSTR_DEF(T, S)
Definition instr.h:66
#define INSTR_START(T)
Definition instr.h:68
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition ppm_xfuncs.h:70
Xt_idxlist list
Definition xt_core.h:154
struct dist_dir_pair dist_dirs
struct dist_dir * dst
struct dist_dir * src
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
Definition xt_config.c:204
implementation of configuration object
#define XT_CONFIG_GET_FORCE_NOSORT(config)
#define XT_CONFIG_GET_XMAP_STRIPING(config)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state)
int xt_initialized(void)
struct Xt_xmap_ * Xt_xmap
Definition xt_core.h:85
#define MAX(a, b)
Definition xt_cuda.c:68
index list declaration
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_idxlist xt_idxlist_sorted_copy_custom(Xt_idxlist idxlist, Xt_config config)
Definition xt_idxlist.c:104
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
Definition xt_idxlist.c:86
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Definition xt_idxlist.c:80
Xt_idxlist xt_idxlist_get_intersection_custom(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst, Xt_config config)
int xt_idxlist_get_sorting(Xt_idxlist idxlist)
Definition xt_idxlist.c:359
void xt_idxlist_delete(Xt_idxlist idxlist)
Definition xt_idxlist.c:75
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
Definition xt_mpi.c:333
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
Definition xt_mpi.c:386
bool xt_mpi_test_some(int *restrict num_req, MPI_Request *restrict req, int *restrict ops_completed, MPI_Comm comm)
Definition xt_mpi.c:415
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
@ xt_mpi_tag_xmap_dist_dir_dst_send
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
static const char filename[]
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
@ SEND_SIZE_SRC
@ SEND_NUM_DST
@ SEND_NUM_SRC
@ SEND_SIZE_ASIZE
@ SEND_SIZE_DST
static struct dist_dir_pair recv_and_unpack_intersections(int recv_size[SEND_SIZE_ASIZE], int tag_offset, MPI_Comm comm)
static void recv_and_unpack_intersection(struct dist_dir *dist_dir, int recv_size, int recv_count, void *recv_buffer, int tag, MPI_Comm comm)
Xt_xmap xt_xmap_dist_dir_intracomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
static struct dd_result exchange_idxlists(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_dist_dir_intracomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, int comm_size, Xt_config config)
static void xt_xmap_dist_dir_reduce_scatter_sizes(int num_sizes, int recv_size[num_sizes], int(*send_size)[num_sizes], MPI_Comm comm)
wrapper for MPI_Reduce_scatter_block if available or MPI_Reduce_scatter if not
static size_t buf_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, MPI_Comm comm, int comm_size, int(*restrict send_size)[SEND_SIZE_ASIZE])
static struct capbi_result compute_and_pack_bucket_intersections(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int tag_offset, int comm_size, Xt_config config)
static void send_intersections(void *send_buffer, const int(*send_size)[SEND_SIZE_ASIZE], MPI_Request *dir_init_send_requests, int tag_offset, MPI_Comm comm, int comm_size)
static void recv_and_unpack_dist_dir_result(struct dist_dir *dist_dir, int recv_size, void *restrict recv_buffer, int tag, MPI_Comm comm)
static struct dist_dir_pair recv_and_unpack_dist_dir_results(int recv_size[SEND_SIZE_ASIZE], int *num_send_indices_requests, MPI_Request *send_indices_requests, int tag_offset, MPI_Comm comm)
static int pack_src_dst_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int comm_size)
@ Xt_dist_dir_bucket_gen_type_sendrecv
Default bucket generator for creation of distributed directories.
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
struct Xt_xmdd_txstat xt_xmap_dist_dir_send_intersections(void *restrict send_buffer, size_t send_size_asize, size_t send_size_entry, int tag, MPI_Comm comm, int rank_lim, MPI_Request *restrict requests, const int send_size[rank_lim][send_size_asize])
Utility functions for creation of distributed directories.
@ xt_xmdd_direction_dst
@ xt_xmdd_direction_src
#define MAX(a, b)
static struct mmsg_buf compute_and_pack_bucket_intersections(void *bucket_gen_state, int bucket_type, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], MPI_Comm comm, int comm_size, Xt_config config)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, const struct Xt_xmdd_bucket_gen_comms *comms, int remote_size, int comm_size, Xt_config config)
Xt_xmap xt_xmap_intersection_ext_custom_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)
Xt_xmap xt_xmap_intersection_custom_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm, Xt_config config)