Yet Another eXchange Tool 0.11.4
Loading...
Searching...
No Matches
xt_xmap_dist_dir_intercomm.c
Go to the documentation of this file.
1
12/*
13 * Keywords:
14 * Maintainer: Jörg Behrens <behrens@dkrz.de>
15 * Moritz Hanke <hanke@dkrz.de>
16 * Thomas Jahns <jahns@dkrz.de>
17 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
18 *
19 * Redistribution and use in source and binary forms, with or without
20 * modification, are permitted provided that the following conditions are
21 * met:
22 *
23 * Redistributions of source code must retain the above copyright notice,
24 * this list of conditions and the following disclaimer.
25 *
26 * Redistributions in binary form must reproduce the above copyright
27 * notice, this list of conditions and the following disclaimer in the
28 * documentation and/or other materials provided with the distribution.
29 *
30 * Neither the name of the DKRZ GmbH nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 */
46#ifdef HAVE_CONFIG_H
47#include <config.h>
48#endif
49
50#include <stdbool.h>
51#include <stdlib.h>
52#include <stdio.h>
53#include <string.h>
54#include <assert.h>
55#include <limits.h>
56
57#include <mpi.h>
58
59#include "xt/xt_idxlist.h"
61#include "xt/xt_idxvec.h"
62#include "xt/xt_idxstripes.h"
63#include "xt/xt_idxempty.h"
64#include "xt/xt_xmap.h"
65#include "xt/xt_xmap_dist_dir.h"
67#include "xt/xt_mpi.h"
68#include "xt_arithmetic_util.h"
70#include "xt_mpi_internal.h"
71#include "core/core.h"
72#include "core/ppm_xfuncs.h"
74#include "xt_idxlist_internal.h"
76#include "xt_config_internal.h"
77#include "instr.h"
78#include "xt/xt_sort.h"
80
/* NOTE(review): the enumerators of this anonymous enum are not visible
 * here.  The code in this file uses SEND_SIZE and SEND_NUM as column
 * indices and SEND_SIZE_ASIZE as the row width of the per-rank size
 * tables -- TODO confirm names and values against the original source. */
81enum {
85};
86
87static inline void
88rank_no_send(size_t rank, int (*restrict send_size)[SEND_SIZE_ASIZE])
89{
90 send_size[rank][SEND_SIZE] = 0;
91 send_size[rank][SEND_NUM] = 0;
92}
93
/* Several packed messages stored back to back in one buffer. */
struct mmsg_buf
{
  size_t num_msg; /* number of messages stored in buffer */
  void *buffer;   /* packed message data, NULL when there is none */
};
99
100
101static struct mmsg_buf
103 int bucket_type,
104 Xt_idxlist idxlist,
105 int (*send_size)[SEND_SIZE_ASIZE],
106 MPI_Comm comm, int comm_size,
107 Xt_config config)
108{
109 int nosort_forced = XT_CONFIG_GET_FORCE_NOSORT(config);
110 size_t send_size_filled = (size_t)-1;
111 size_t max_num_intersect
112 = (size_t)config->xmdd_bucket_gen->get_intersect_max_num(
113 bucket_gen_state, bucket_type);
114 struct mmsg_buf result = { 0, 0 };
115 if (max_num_intersect) {
116 Xt_idxlist idxlist_sorted
117 = nosort_forced || xt_idxlist_get_sorting(idxlist) == 1
118 ? idxlist
119 : xt_idxlist_sorted_copy_custom(idxlist, config);
120
121 size_t num_msg = 0;
122 size_t send_buffer_size = 0;
123 struct Xt_com_list *restrict sends
124 = xmalloc(max_num_intersect * sizeof(*sends));
125 struct Xt_com_list bucket;
126 while ((bucket = config->xmdd_bucket_gen->next(
127 bucket_gen_state, bucket_type)).list) {
128 size_t rank;
129 for (rank = send_size_filled + 1; rank < (size_t)bucket.rank; ++rank)
130 rank_no_send(rank, send_size);
131
132 Xt_idxlist isect2send
133 = xt_idxlist_get_intersection(idxlist_sorted, bucket.list);
134 if (xt_idxlist_get_num_indices(isect2send) > 0) {
135 sends[num_msg].list = isect2send;
136 sends[num_msg].rank = (int)rank;
137 send_buffer_size += xt_idxlist_get_pack_size(isect2send, comm);
138 /* send_size[rank][SEND_SIZE] is set below after the actual
139 * pack, because MPI_Pack_size only gives an upper bound,
140 * not the actually needed size */
141 send_size[rank][SEND_NUM] = 1;
142 ++num_msg;
143 } else {
144 rank_no_send(rank, send_size);
145 xt_idxlist_delete(isect2send);
146 }
147 }
148 if (idxlist_sorted != idxlist)
149 xt_idxlist_delete(idxlist_sorted);
150 for (size_t rank = send_size_filled+1; rank < (size_t)comm_size; ++rank)
151 rank_no_send(rank, send_size);
152
153 unsigned char *send_buffer = xmalloc(send_buffer_size);
154 size_t ofs = 0;
155 for (size_t i = 0; i < num_msg; ++i) {
156 int position = 0;
157 xt_idxlist_pack(sends[i].list, send_buffer + ofs,
158 (int)(send_buffer_size-ofs), &position, comm);
159 send_size[sends[i].rank][SEND_SIZE] = position;
160 ofs += (size_t)position;
161 xt_idxlist_delete(sends[i].list);
162 }
163
164 free(sends);
165 result.num_msg = num_msg;
166 result.buffer = send_buffer;
167 } else {
168 memset(send_size, 0, (size_t)comm_size * sizeof (*send_size));
169 result.num_msg = 0;
170 result.buffer = NULL;
171 }
172
173 return result;
174}
175
176
177static void
178compress_sizes(int (*restrict sizes)[SEND_SIZE_ASIZE], int comm_size,
179 struct Xt_xmdd_txstat *tx_stat, int *counts)
180{
181 size_t tx_num = 0, size_sum = 0;
182 for (size_t i = 0; i < (size_t)comm_size; ++i)
183 if (sizes[i][SEND_SIZE]) {
184 int tx_size = sizes[i][SEND_SIZE];
185 size_sum += (size_t)tx_size;
186 sizes[tx_num][SEND_SIZE] = tx_size;
187 if (counts) counts[tx_num] = sizes[i][SEND_NUM];
188 sizes[tx_num][SEND_NUM] = (int)i;
189 ++tx_num;
190 }
191 *tx_stat = (struct Xt_xmdd_txstat){ .bytes = size_sum, .num_msg = tx_num };
192}
193
194static void *
195create_intersections(void *bucket_gen_state,
196 int bucket_type,
197 struct Xt_xmdd_txstat tx_stat[2],
198 int recv_size[][SEND_SIZE_ASIZE],
199 int send_size[][SEND_SIZE_ASIZE],
200 Xt_idxlist idxlist,
201 MPI_Comm comm, int comm_size, Xt_config config)
202{
203 struct mmsg_buf ddr
205 bucket_gen_state, bucket_type, idxlist,
206 send_size, comm, comm_size, config);
207 xt_mpi_call(MPI_Alltoall((int *)send_size, SEND_SIZE_ASIZE, MPI_INT,
208 (int *)recv_size, SEND_SIZE_ASIZE, MPI_INT, comm),
209 comm);
210 compress_sizes(recv_size, comm_size, tx_stat + 0, NULL);
211 compress_sizes(send_size, comm_size, tx_stat + 1, NULL);
212 assert(ddr.num_msg == tx_stat[1].num_msg);
213 return ddr.buffer;
214}
215
216typedef int (*tx_fp)(void *, int, MPI_Datatype, int, int,
217 MPI_Comm, MPI_Request *);
218static void
220 const int (*sizes)[SEND_SIZE_ASIZE],
221 unsigned char *buffer, MPI_Request *requests,
222 int tag, MPI_Comm comm, tx_fp tx_op)
223{
224 size_t ofs = 0;
225 for (size_t i = 0; i < num_msg; ++i)
226 {
227 int rank = sizes[i][SEND_NUM], count = sizes[i][SEND_SIZE];
228 xt_mpi_call(tx_op(buffer + ofs,
229 count, MPI_PACKED, rank, tag, comm, requests + i), comm);
230 ofs += (size_t)count;
231 }
232}
233
234static void
236 const int (*recv_size)[SEND_SIZE_ASIZE],
237 void *recv_buffer, MPI_Request *requests,
238 int tag, MPI_Comm comm)
239{
240 tx_intersections(num_msg, recv_size, recv_buffer, requests, tag, comm,
241 (tx_fp)MPI_Irecv);
242}
243
244static void
246 const int (*send_size)[SEND_SIZE_ASIZE],
247 void *send_buffer, MPI_Request *requests,
248 int tag, MPI_Comm comm)
249{
250 tx_intersections(num_msg, send_size, send_buffer, requests, tag, comm,
251 (tx_fp)MPI_Isend);
252}
253
254
255static struct dist_dir *
257 const int (*sizes)[SEND_SIZE_ASIZE],
258 void *buffer,
259 MPI_Comm comm)
260{
261 size_t num_msg = tx_stat.num_msg, buf_size = tx_stat.bytes;
262 struct dist_dir *restrict dist_dir
263 = xmalloc(sizeof (*dist_dir) + sizeof (*dist_dir->entries) * num_msg);
264 dist_dir->num_entries = (int)num_msg;
265 int position = 0;
266 for (size_t i = 0; i < num_msg; ++i)
267 {
268 int rank = sizes[i][SEND_NUM];
269 dist_dir->entries[i].rank = rank;
271 = xt_idxlist_unpack(buffer, (int)buf_size, &position, comm);
272 }
273 return dist_dir;
274}
275
276struct dd_result {
278 int stripify;
279};
280
281/* unfortunately GCC 11 cannot handle the literal constants used for
282 * MPI_STATUSES_IGNORE by MPICH */
283#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
284#pragma GCC diagnostic push
285#pragma GCC diagnostic ignored "-Wstringop-overread"
286#pragma GCC diagnostic ignored "-Wstringop-overflow"
287#endif
288
/*
 * Build the source and destination distributed directories.
 *
 * The bucket generator from config is initialised, packed index lists
 * for src_idxlist (over comms->intra_comm, comm_size peers) and
 * dst_idxlist (over comms->inter_comm, remote_size peers) are created,
 * exchanged with non-blocking sends/receives, and the received data is
 * unpacked into results.dist_dirs.src (intra_comm data) and
 * results.dist_dirs.dst (inter_comm data).  results.stripify holds the
 * value returned by the generator's init callback.
 *
 * NOTE(review): several lines of this function (its name line and the
 * callee/tag expressions after some '=' signs) are not visible here, so
 * the description above is limited to what the remaining lines show.
 */
289static struct dd_result
291 Xt_idxlist src_idxlist,
292 Xt_idxlist dst_idxlist,
293 const struct Xt_xmdd_bucket_gen_comms *comms,
294 int remote_size, int comm_size,
295 Xt_config config) {
296
/* Scratch space for the generator state: gen_state_size rounded up to
 * a multiple of sizeof (void *).
 * NOTE(review): bgd_size is a byte count but is used as the element
 * count of a void * array, so the array is sizeof (void *) times
 * larger than the rounded size -- confirm whether
 * bgd_size / sizeof (void *) elements were intended. */
297 size_t bgd_size = config->xmdd_bucket_gen->gen_state_size;
298 bgd_size = (bgd_size + sizeof (void *) - 1)/sizeof (void *) * sizeof (void *);
299 void *bgd[bgd_size];
300 struct dd_result results;
301 results.stripify
302 = config->xmdd_bucket_gen->init(
303 &bgd, src_idxlist, dst_idxlist, config, comms, NULL,
304 config->xmdd_bucket_gen);
/* One allocation carries four size tables, laid out as
 * send_local[comm_size], send_remote[remote_size],
 * recv_local[comm_size], recv_remote[remote_size]. */
305 int (*send_size_local)[SEND_SIZE_ASIZE]
306 = xmalloc(((size_t)comm_size + (size_t)remote_size)
307 * 2 * sizeof(*send_size_local)),
308 (*send_size_remote)[SEND_SIZE_ASIZE] = send_size_local + comm_size,
309 (*recv_size_local)[SEND_SIZE_ASIZE] = send_size_remote + remote_size,
310 (*recv_size_remote)[SEND_SIZE_ASIZE] = recv_size_local + comm_size;
/* index 0 of each tx_stat pair is used for receives, index 1 for sends */
311 struct Xt_xmdd_txstat tx_stat_local[2], tx_stat_remote[2];
312 void *send_buffer_local
314 tx_stat_local, recv_size_local,
315 send_size_local, src_idxlist,
316 comms->intra_comm, comm_size, config);
317 void *send_buffer_remote
319 tx_stat_remote, recv_size_remote,
320 send_size_remote, dst_idxlist,
321 comms->inter_comm, remote_size, config);
322 XT_CONFIG_BUCKET_DESTROY(config, &bgd);
323 size_t num_req = tx_stat_local[0].num_msg + tx_stat_remote[0].num_msg
324 + tx_stat_local[1].num_msg + tx_stat_remote[1].num_msg;
/* the request array and both receive buffers share one allocation:
 * requests first, then the local and the remote receive bytes */
325 MPI_Request *dir_init_requests
326 = xmalloc(num_req * sizeof(*dir_init_requests)
327 + tx_stat_local[0].bytes + tx_stat_remote[0].bytes);
328 void *recv_buffer_local = dir_init_requests + num_req,
329 *recv_buffer_remote = ((unsigned char *)recv_buffer_local
330 + tx_stat_local[0].bytes);
331 int tag_intra = comms->tag_offset_intra
/* requests are filled in the order: local recvs, remote recvs,
 * local sends, remote sends; req_ofs tracks the next free slot */
333 size_t req_ofs = tx_stat_local[0].num_msg;
334 irecv_intersections(tx_stat_local[0].num_msg,
335 (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
336 recv_buffer_local, dir_init_requests,
337 tag_intra, comms->intra_comm);
338 int tag_inter = comms->tag_offset_inter
340 irecv_intersections(tx_stat_remote[0].num_msg,
341 (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
342 recv_buffer_remote, dir_init_requests + req_ofs,
343 tag_inter, comms->inter_comm);
344 req_ofs += tx_stat_remote[0].num_msg;
345 isend_intersections(tx_stat_local[1].num_msg,
346 (const int (*)[SEND_SIZE_ASIZE])send_size_local,
347 send_buffer_local, dir_init_requests + req_ofs,
348 tag_intra, comms->intra_comm);
349 req_ofs += tx_stat_local[1].num_msg;
350 isend_intersections(tx_stat_remote[1].num_msg,
351 (const int (*)[SEND_SIZE_ASIZE])send_size_remote,
352 send_buffer_remote, dir_init_requests + req_ofs,
353 tag_inter, comms->inter_comm);
354 // wait for data transfers to complete
/* NOTE(review): num_req is narrowed to int without a range check here */
355 xt_mpi_call(MPI_Waitall((int)num_req, dir_init_requests,
356 MPI_STATUSES_IGNORE), comms->inter_comm);
357 free(send_buffer_local);
358 free(send_buffer_remote);
359 results.dist_dirs.src
360 = unpack_dist_dir(tx_stat_local[0],
361 (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
362 recv_buffer_local, comms->intra_comm);
363 results.dist_dirs.dst
364 = unpack_dist_dir(tx_stat_remote[0],
365 (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
366 recv_buffer_remote, comms->inter_comm);
/* frees all four size tables and, with the requests, both receive
 * buffers */
367 free(send_size_local);
368 free(dir_init_requests);
369 return results;
370}
371
372
373static size_t
374send_size_from_intersections(size_t num_intersections,
375 const struct isect *restrict src_dst_intersections,
376 enum xt_xmdd_direction target,
377 MPI_Comm comm, int comm_size,
378 int (*restrict send_size_target)[SEND_SIZE_ASIZE])
379{
380 size_t total_send_size = 0;
381 for (int i = 0; i < comm_size; ++i)
382 (void)(send_size_target[i][SEND_SIZE] = 0),
383 (void)(send_size_target[i][SEND_NUM] = 0);
384
385 int rank_pack_size;
386 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
387
388 for (size_t i = 0; i < num_intersections; ++i)
389 {
390 size_t msg_size = (size_t)rank_pack_size
391 + xt_idxlist_get_pack_size(src_dst_intersections[i].idxlist, comm);
392 size_t target_rank = (size_t)src_dst_intersections[i].rank[target];
393 /* send_size_target[target_rank][SEND_SIZE] += msg_size; */
394 ++(send_size_target[target_rank][SEND_NUM]);
395 total_send_size += msg_size;
396 }
397 assert(total_send_size <= INT_MAX);
398 return total_send_size;
399}
400
401static struct mmsg_buf
402pack_dist_dirs(size_t num_intersections,
403 struct isect *restrict src_dst_intersections,
404 int (*send_size)[SEND_SIZE_ASIZE],
405 enum xt_xmdd_direction target,
406 bool isect_idxlist_delete, MPI_Comm comm, int comm_size) {
407
408 size_t total_send_size
409 = send_size_from_intersections(num_intersections,
410 src_dst_intersections,
411 target,
412 comm, comm_size, send_size);
413
414 unsigned char *send_buffer = xmalloc(total_send_size);
415 qsort(src_dst_intersections, num_intersections,
416 sizeof (src_dst_intersections[0]),
417 target == xt_xmdd_direction_src
419 size_t ofs = 0;
420 size_t num_requests
422 target, num_intersections, src_dst_intersections,
423 isect_idxlist_delete,
424 SEND_SIZE_ASIZE, SEND_SIZE, send_size,
425 send_buffer, total_send_size, &ofs, comm);
426 return (struct mmsg_buf){ .num_msg = num_requests,
427 .buffer = send_buffer };
428}
429
430static struct dist_dir *
432 void *restrict recv_buffer,
433 int *restrict entry_counts,
434 MPI_Comm comm)
435{
436 size_t num_msg = tx_stat.num_msg;
437 int buf_size = (int)tx_stat.bytes;
438 int position = 0;
439 size_t num_entries_sent = 0;
440 for (size_t i = 0; i < num_msg; ++i)
441 num_entries_sent += (size_t)entry_counts[i];
442 struct dist_dir *dist_dir
443 = xmalloc(sizeof (struct dist_dir)
444 + (sizeof (struct Xt_com_list) * num_entries_sent));
445 dist_dir->num_entries = (int)num_entries_sent;
446 struct Xt_com_list *restrict entries = dist_dir->entries;
447 size_t num_entries = 0;
448 for (size_t i = 0; i < num_msg; ++i) {
449 size_t num_entries_from_rank = (size_t)entry_counts[i];
450 for (size_t j = 0; j < num_entries_from_rank; ++j) {
451 xt_mpi_call(MPI_Unpack(recv_buffer, buf_size, &position,
452 &entries[num_entries].rank,
453 1, MPI_INT, comm), comm);
454 entries[num_entries].list =
455 xt_idxlist_unpack(recv_buffer, buf_size, &position, comm);
456 ++num_entries;
457 }
458 }
459 assert(num_entries == num_entries_sent);
460 qsort(entries, num_entries_sent, sizeof(*entries), xt_com_list_rank_cmp);
462 return dist_dir;
463}
464
465
466static struct dd_result
468 Xt_idxlist dst_idxlist,
469 const struct Xt_xmdd_bucket_gen_comms *comms,
470 Xt_config config) {
471
472 int comm_size, remote_size;
473 xt_mpi_call(MPI_Comm_size(comms->inter_comm, &comm_size),
474 comms->inter_comm);
475 xt_mpi_call(MPI_Comm_remote_size(comms->inter_comm, &remote_size),
476 comms->inter_comm);
477
478 struct dd_result bucket_isects
479 = generate_distributed_directories(src_idxlist, dst_idxlist, comms,
480 remote_size, comm_size,
481 config);
482
483
484 int (*send_size_local)[SEND_SIZE_ASIZE]
485 = xmalloc(((size_t)comm_size + (size_t)remote_size)
486 * 2U * sizeof(*send_size_local)),
487 (*recv_size_local)[SEND_SIZE_ASIZE] = send_size_local + comm_size,
488 (*send_size_remote)[SEND_SIZE_ASIZE] = recv_size_local + comm_size,
489 (*recv_size_remote)[SEND_SIZE_ASIZE] = send_size_remote + remote_size;
490
491 /* match the source and destination entries in the local distributed
492 * directories... */
493 struct isect *src_dst_intersections;
494 size_t num_intersections
496 bucket_isects.dist_dirs.dst,
497 &src_dst_intersections, config);
498 xt_xmdd_free_dist_dirs(bucket_isects.dist_dirs);
499 /* ... and pack the results into a sendable format */
500 struct mmsg_buf dd_local, dd_remote;
501 dd_local
502 = pack_dist_dirs(num_intersections, src_dst_intersections, send_size_local,
503 xt_xmdd_direction_src, false, comms->intra_comm,
504 comm_size);
505 dd_remote
506 = pack_dist_dirs(num_intersections, src_dst_intersections, send_size_remote,
507 xt_xmdd_direction_dst, true, comms->inter_comm,
508 remote_size);
509 free(src_dst_intersections);
510
511 // get the data size the local process will receive from other processes
512 xt_mpi_call(MPI_Alltoall((int *)send_size_local, SEND_SIZE_ASIZE, MPI_INT,
513 (int *)recv_size_local, SEND_SIZE_ASIZE, MPI_INT,
514 comms->intra_comm), comms->intra_comm);
515 xt_mpi_call(MPI_Alltoall((int *)send_size_remote, SEND_SIZE_ASIZE, MPI_INT,
516 (int *)recv_size_remote, SEND_SIZE_ASIZE, MPI_INT,
517 comms->inter_comm), comms->inter_comm);
518
519 struct Xt_xmdd_txstat tx_stat_local[2], tx_stat_remote[2];
520 int *isect_counts_recv_local
521 = xmalloc(((size_t)comm_size + (size_t)remote_size) * sizeof (int)),
522 *isect_counts_recv_remote = isect_counts_recv_local + comm_size;
523 compress_sizes(send_size_local, comm_size, tx_stat_local+1, NULL);
524 compress_sizes(recv_size_local, comm_size, tx_stat_local+0,
525 isect_counts_recv_local);
526 compress_sizes(send_size_remote, remote_size, tx_stat_remote+1, NULL);
527 compress_sizes(recv_size_remote, remote_size, tx_stat_remote+0,
528 isect_counts_recv_remote);
529 assert(tx_stat_local[1].num_msg == dd_local.num_msg
530 && tx_stat_remote[1].num_msg == dd_remote.num_msg);
531 size_t num_requests
532 = dd_local.num_msg + dd_remote.num_msg
533 + tx_stat_local[0].num_msg + tx_stat_remote[0].num_msg;
534 assert(num_requests <= INT_MAX);
535 MPI_Request *requests
536 = xmalloc(num_requests * sizeof(*requests)
537 + tx_stat_local[0].bytes + tx_stat_remote[0].bytes);
538 void *recv_buf_local = requests + num_requests,
539 *recv_buf_remote = (unsigned char *)recv_buf_local + tx_stat_local[0].bytes;
540 size_t req_ofs = tx_stat_local[0].num_msg;
541 int tag_intra = comms->tag_offset_intra + xt_mpi_tag_xmap_dist_dir_src_send;
542 irecv_intersections(tx_stat_local[0].num_msg,
543 (const int (*)[SEND_SIZE_ASIZE])recv_size_local,
544 recv_buf_local, requests, tag_intra, comms->intra_comm);
545 int tag_inter = comms->tag_offset_inter + xt_mpi_tag_xmap_dist_dir_src_send;
546 irecv_intersections(tx_stat_remote[0].num_msg,
547 (const int (*)[SEND_SIZE_ASIZE])recv_size_remote,
548 recv_buf_remote, requests+req_ofs, tag_inter,
549 comms->inter_comm);
550 req_ofs += tx_stat_remote[0].num_msg;
551 isend_intersections(tx_stat_local[1].num_msg,
552 (const int (*)[SEND_SIZE_ASIZE])send_size_local,
553 dd_local.buffer, requests+req_ofs, tag_intra,
554 comms->intra_comm);
555 req_ofs += tx_stat_local[1].num_msg;
556 isend_intersections(tx_stat_remote[1].num_msg,
557 (const int (*)[SEND_SIZE_ASIZE])send_size_remote,
558 dd_remote.buffer, requests+req_ofs, tag_inter,
559 comms->inter_comm);
560 xt_mpi_call(MPI_Waitall((int)num_requests, requests, MPI_STATUSES_IGNORE),
561 comms->inter_comm);
562 free(dd_local.buffer);
563 free(dd_remote.buffer);
564 free(send_size_local);
565
566 struct dd_result results;
567 results.stripify = bucket_isects.stripify;
568 results.dist_dirs.src
569 = unpack_dist_dir_results(tx_stat_local[0], recv_buf_local,
570 isect_counts_recv_local, comms->intra_comm);
571 results.dist_dirs.dst
572 = unpack_dist_dir_results(tx_stat_remote[0], recv_buf_remote,
573 isect_counts_recv_remote, comms->inter_comm);
574 free(requests);
575 free(isect_counts_recv_local);
576 return results;
577}
578
579
580
/*
 * Tail of the parameter list and body of the configurable
 * inter-communicator xmap constructor.
 *
 * Visible behaviour: asserts that the library is initialised, fills a
 * struct Xt_xmdd_bucket_gen_comms, obtains the exchange directories via
 * exchange_idxlists and constructs the xmap from them with either
 * xt_xmap_intersection_ext_new or xt_xmap_intersection_new, depending
 * on stripify.
 *
 * NOTE(review): the lines carrying the function name, the setup of
 * comms.inter_comm / comms.intra_comm, the initialisation of stripify
 * and the cleanup before INSTR_STOP are not visible here -- confirm
 * against the original source before relying on this description.
 */
583 Xt_idxlist dst_idxlist,
584 MPI_Comm inter_comm,
585 MPI_Comm intra_comm,
586 Xt_config config)
587{
588 INSTR_DEF(this_instr,"xt_xmap_dist_dir_intercomm_new")
589 INSTR_START(this_instr);
590
591 // ensure that yaxt is initialized
592 assert(xt_initialized());
593
594 struct Xt_xmdd_bucket_gen_comms comms;
/* the two fragments below store the tag offsets for the intra- and the
 * inter-communicator into comms */
596 &comms.tag_offset_intra);
598 &comms.tag_offset_inter);
599 struct dd_result results
600 = exchange_idxlists(src_idxlist, dst_idxlist,
601 &comms, config);
602
/* a stripify value of 2 defers the decision to the value that came
 * back with the exchange results */
604 if (stripify == 2)
605 stripify = results.stripify;
/* select the xmap constructor: the _ext_ variant when stripify is
 * non-zero, the plain one otherwise */
606 Xt_xmap (*xmap_new)(int num_src_intersections,
607 const struct Xt_com_list *src_com,
608 int num_dst_intersections,
609 const struct Xt_com_list *dst_com,
610 Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist,
611 MPI_Comm comm)
612 = stripify ? xt_xmap_intersection_ext_new : xt_xmap_intersection_new;
613
614 Xt_xmap xmap
615 = xmap_new(results.dist_dirs.src->num_entries,
616 results.dist_dirs.src->entries,
617 results.dist_dirs.dst->num_entries,
618 results.dist_dirs.dst->entries,
619 src_idxlist, dst_idxlist, comms.inter_comm);
620
623
625 INSTR_STOP(this_instr);
626 return xmap;
627}
628
631 MPI_Comm inter_comm, MPI_Comm intra_comm)
632{
634 src_idxlist, dst_idxlist, inter_comm, intra_comm, &xt_default_config);
635}
636
637/*
638 * Local Variables:
639 * c-basic-offset: 2
640 * coding: utf-8
641 * indent-tabs-mode: nil
642 * show-trailing-whitespace: t
643 * require-trailing-newline: t
644 * End:
645 */
int MPI_Comm
Definition core.h:64
#define INSTR_STOP(T)
Definition instr.h:69
#define INSTR_DEF(T, S)
Definition instr.h:66
#define INSTR_START(T)
Definition instr.h:68
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition ppm_xfuncs.h:70
Xt_idxlist list
Definition xt_core.h:154
struct dist_dir_pair dist_dirs
struct dist_dir * dst
struct dist_dir * src
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
Definition xt_config.c:204
implementation of configuration object
#define XT_CONFIG_GET_FORCE_NOSORT(config)
#define XT_CONFIG_GET_XMAP_STRIPING(config)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state)
int xt_initialized(void)
struct Xt_xmap_ * Xt_xmap
Definition xt_core.h:85
index list declaration
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_idxlist xt_idxlist_sorted_copy_custom(Xt_idxlist idxlist, Xt_config config)
Definition xt_idxlist.c:104
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
Definition xt_idxlist.c:86
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Definition xt_idxlist.c:80
Xt_idxlist xt_idxlist_get_intersection(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst)
int xt_idxlist_get_sorting(Xt_idxlist idxlist)
Definition xt_idxlist.c:359
void xt_idxlist_delete(Xt_idxlist idxlist)
Definition xt_idxlist.c:75
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
Definition xt_mpi.c:333
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
Definition xt_mpi.c:386
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
@ Xt_dist_dir_bucket_gen_type_send
@ Xt_dist_dir_bucket_gen_type_recv
Default bucket generator for creation of distributed directories.
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
Utility functions for creation of distributed directories.
@ xt_xmdd_direction_dst
@ xt_xmdd_direction_src
static void * create_intersections(void *bucket_gen_state, int bucket_type, struct Xt_xmdd_txstat tx_stat[2], int recv_size[][SEND_SIZE_ASIZE], int send_size[][SEND_SIZE_ASIZE], Xt_idxlist idxlist, MPI_Comm comm, int comm_size, Xt_config config)
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static void irecv_intersections(size_t num_msg, const int(*recv_size)[SEND_SIZE_ASIZE], void *recv_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
static struct dist_dir * unpack_dist_dir_results(struct Xt_xmdd_txstat tx_stat, void *restrict recv_buffer, int *restrict entry_counts, MPI_Comm comm)
static void isend_intersections(size_t num_msg, const int(*send_size)[SEND_SIZE_ASIZE], void *send_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
int(* tx_fp)(void *, int, MPI_Datatype, int, int, MPI_Comm, MPI_Request *)
static struct dd_result exchange_idxlists(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, const struct Xt_xmdd_bucket_gen_comms *comms, Xt_config config)
static struct mmsg_buf compute_and_pack_bucket_intersections(void *bucket_gen_state, int bucket_type, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], MPI_Comm comm, int comm_size, Xt_config config)
static size_t send_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, enum xt_xmdd_direction target, MPI_Comm comm, int comm_size, int(*restrict send_size_target)[SEND_SIZE_ASIZE])
Xt_xmap xt_xmap_dist_dir_intercomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm)
static void tx_intersections(size_t num_msg, const int(*sizes)[SEND_SIZE_ASIZE], unsigned char *buffer, MPI_Request *requests, int tag, MPI_Comm comm, tx_fp tx_op)
Xt_xmap xt_xmap_dist_dir_intercomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm, Xt_config config)
static struct dist_dir * unpack_dist_dir(struct Xt_xmdd_txstat tx_stat, const int(*sizes)[SEND_SIZE_ASIZE], void *buffer, MPI_Comm comm)
static struct mmsg_buf pack_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], enum xt_xmdd_direction target, bool isect_idxlist_delete, MPI_Comm comm, int comm_size)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, const struct Xt_xmdd_bucket_gen_comms *comms, int remote_size, int comm_size, Xt_config config)
static void compress_sizes(int(*restrict sizes)[SEND_SIZE_ASIZE], int comm_size, struct Xt_xmdd_txstat *tx_stat, int *counts)
Xt_xmap xt_xmap_intersection_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)