Yet Another eXchange Tool 0.11.3
Loading...
Searching...
No Matches
xt_exchanger_irecv_isend_ddt_packed.c
Go to the documentation of this file.
1
12/*
13 * Keywords:
14 * Maintainer: Jörg Behrens <behrens@dkrz.de>
15 * Moritz Hanke <hanke@dkrz.de>
16 * Thomas Jahns <jahns@dkrz.de>
17 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
18 *
19 * Redistribution and use in source and binary forms, with or without
20 * modification, are permitted provided that the following conditions are
21 * met:
22 *
23 * Redistributions of source code must retain the above copyright notice,
24 * this list of conditions and the following disclaimer.
25 *
26 * Redistributions in binary form must reproduce the above copyright
27 * notice, this list of conditions and the following disclaimer in the
28 * documentation and/or other materials provided with the distribution.
29 *
30 * Neither the name of the DKRZ GmbH nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 */
46#ifdef HAVE_CONFIG_H
47#include <config.h>
48#endif
49
50#include "core/ppm_xfuncs.h"
51#include "xt/xt_mpi.h"
53#include "xt_mpi_internal.h"
54#include "xt_redist_internal.h"
57#include "xt_ddt_internal.h"
58
59/* unfortunately GCC 11 through 13 cannot handle the literal constants used
60 * for MPI_STATUSES_IGNORE by MPICH */
61#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
62#pragma GCC diagnostic push
63#pragma GCC diagnostic ignored "-Wstringop-overread"
64#pragma GCC diagnostic ignored "-Wstringop-overflow"
65#endif
66
67static void
69 const void *src_data, void *dst_data,
70 int nsend, int nrecv,
71 const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs,
72 int tag_offset, MPI_Comm comm) {
73
75
76 enum { AUTO_ALLOC_SIZE = 32, };
77 MPI_Request *requests, requests_auto[AUTO_ALLOC_SIZE];
78 size_t *buffer_sizes, buffer_sizes_auto[AUTO_ALLOC_SIZE];
79 Xt_ddt *ddts, ddts_auto[AUTO_ALLOC_SIZE];
80
81 size_t num_tx = (size_t)nrecv + (size_t)nsend;
82 if (num_tx <= AUTO_ALLOC_SIZE) {
83 requests = requests_auto;
84 buffer_sizes = buffer_sizes_auto;
85 ddts = ddts_auto;
86 } else {
87 requests = xmalloc(num_tx * sizeof (*requests));
88 buffer_sizes = xmalloc(num_tx * sizeof (*buffer_sizes));
89 ddts = xmalloc(num_tx * sizeof(*ddts));
90 }
91
92 size_t recv_buffer_size = 0;
93 size_t send_buffer_size = 0;
94 for (int i = 0; i < nrecv; ++i)
95 recv_buffer_size +=
96 ((buffer_sizes[i] =
98 ((ddts[i] = xt_ddt_from_mpi_ddt(recv_msgs[i].datatype))))));
99 for (int i = 0; i < nsend; ++i)
100 send_buffer_size +=
101 ((buffer_sizes[nrecv+i] =
103 ((ddts[nrecv+i] = xt_ddt_from_mpi_ddt(send_msgs[i].datatype))))));
104
105 enum xt_memtype src_data_memtype = xt_gpu_get_memtype(src_data);
106 enum xt_memtype dst_data_memtype = xt_gpu_get_memtype(dst_data);
107 unsigned char * send_buffer =
108 xt_gpu_malloc(send_buffer_size, src_data_memtype);
109 unsigned char * recv_buffer =
110 xt_gpu_malloc(recv_buffer_size, dst_data_memtype);
111
112 size_t ofs = 0;
113 for (int i = 0; i < nrecv; ++i) {
114 int recv_size = (int)buffer_sizes[i];
115 xt_mpi_call(MPI_Irecv(recv_buffer + ofs, recv_size, MPI_BYTE,
116 recv_msgs[i].rank,
117 tag_offset + xt_mpi_tag_exchange_msg, comm,
118 requests+i), comm);
119 ofs += (size_t)recv_size;
120 }
121
122 ofs = 0;
123 for (int i = 0; i < nsend; ++i) {
124 size_t send_size = buffer_sizes[nrecv+i];
126 ddts[nrecv+i], CAST_MPI_SEND_BUF(src_data), send_buffer + ofs,
127 src_data_memtype);
128 xt_mpi_call(MPI_Isend(send_buffer + ofs, (int)send_size, MPI_BYTE,
129 send_msgs[i].rank,
130 tag_offset + xt_mpi_tag_exchange_msg, comm,
131 requests+nrecv+i), comm);
132 ofs += send_size;
133 }
134
135 xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE), comm);
136
137 ofs = 0;
138 for (int i = 0; i < nrecv; ++i) {
139 size_t recv_size = buffer_sizes[i];
141 ddts[i], recv_buffer + ofs, dst_data, dst_data_memtype);
142 ofs += recv_size;
143 }
144
145 xt_gpu_free(recv_buffer, dst_data_memtype);
146 xt_gpu_free(send_buffer, src_data_memtype);
147 if (num_tx > AUTO_ALLOC_SIZE) {
148 free(ddts);
149 free(buffer_sizes);
150 free(requests);
151 }
152 XT_GPU_INSTR_POP; // xt_exchanger_irecv_isend_ddt_packed_s_exchange
153}
154
155static void
156xt_exchanger_irecv_isend_ddt_packed_a_exchange(const void *src_data, void *dst_data,
157 int nsend, int nrecv,
158 const struct Xt_redist_msg * send_msgs,
159 const struct Xt_redist_msg * recv_msgs,
160 int tag_offset, MPI_Comm comm,
161 Xt_request *request) {
162
164
165 MPI_Request * tmp_requests =
166 xmalloc((size_t)(nrecv + nsend) * sizeof (*tmp_requests));
167 void ** buffers =
168 xmalloc((size_t)(nrecv + nsend) * sizeof (*buffers));
169
170 enum xt_memtype src_data_memtype = xt_gpu_get_memtype(src_data);
171 enum xt_memtype dst_data_memtype = xt_gpu_get_memtype(dst_data);
172
173 Xt_ddt * recv_ddts = xmalloc((size_t)nrecv * sizeof(*recv_ddts));
174 for (int i = 0; i < nrecv; ++i) {
175 recv_ddts[i] = xt_ddt_from_mpi_ddt(recv_msgs[i].datatype);
176 size_t buffer_size = xt_ddt_get_pack_size_internal(recv_ddts[i]);
177 buffers[i] = xt_gpu_malloc(buffer_size, dst_data_memtype);
178 xt_mpi_call(MPI_Irecv(buffers[i], (int)buffer_size, MPI_BYTE,
179 recv_msgs[i].rank,
180 tag_offset + xt_mpi_tag_exchange_msg, comm,
181 tmp_requests+i), comm);
182 }
183
184 for (int i = 0; i < nsend; ++i) {
185 Xt_ddt send_ddt = xt_ddt_from_mpi_ddt(send_msgs[i].datatype);
186 size_t buffer_size = xt_ddt_get_pack_size_internal(send_ddt);
187 buffers[nrecv + i] = xt_gpu_malloc(buffer_size, src_data_memtype);
191 send_ddt, src_data, buffers[nrecv + i], src_data_memtype);
192 xt_mpi_call(MPI_Isend(buffers[nrecv + i], (int)buffer_size, MPI_BYTE,
193 send_msgs[i].rank,
194 tag_offset + xt_mpi_tag_exchange_msg, comm,
195 tmp_requests+nrecv+i), comm);
196 }
197
198 Xt_request requests =
200 nrecv + nsend, tmp_requests, comm, nrecv, nsend,
201 recv_ddts, buffers, buffers + nrecv, dst_data,
202 src_data_memtype, dst_data_memtype);
203
204 free(recv_ddts);
205 free(buffers);
206 free(tmp_requests);
207
208 *request = requests;
209
210 XT_GPU_INSTR_POP; // xt_exchanger_irecv_isend_ddt_packed_a_exchange
211}
212
215 const struct Xt_redist_msg *send_msgs,
216 const struct Xt_redist_msg *recv_msgs,
217 MPI_Comm comm, int tag_offset,
218 Xt_config config)
219{
224 return
225 xt_exchanger_simple_base_new(nsend, nrecv, send_msgs, recv_msgs,
226 comm, tag_offset,
230 config);
231}
232
233/*
234 * Local Variables:
235 * c-basic-offset: 2
236 * coding: utf-8
237 * indent-tabs-mode: nil
238 * show-trailing-whitespace: t
239 * require-trailing-newline: t
240 * End:
241 */
int MPI_Comm
Definition core.h:64
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition ppm_xfuncs.h:70
void xt_ddt_unpack_internal(Xt_ddt ddt, const void *src, void *dst, enum xt_memtype memtype)
Definition xt_ddt.c:553
size_t xt_ddt_get_pack_size_internal(Xt_ddt ddt)
Definition xt_ddt.c:166
void xt_ddt_pack_internal(Xt_ddt ddt, const void *src, void *dst, enum xt_memtype memtype)
Definition xt_ddt.c:341
Xt_ddt xt_ddt_from_mpi_ddt(MPI_Datatype mpi_ddt)
internal utility routines for manual handling of MPI DDT's
static void xt_exchanger_irecv_isend_ddt_packed_a_exchange(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm, Xt_request *request)
Xt_exchanger xt_exchanger_irecv_isend_ddt_packed_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, Xt_config config)
static void xt_exchanger_irecv_isend_ddt_packed_s_exchange(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm)
Xt_exchanger xt_exchanger_simple_base_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, xt_simple_s_exchange_func s_func, xt_simple_a_exchange_func a_func, xt_simple_create_omp_share_func create_omp_share_func, Xt_config config)
Xt_exchanger_omp_share(* xt_simple_create_omp_share_func)(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm)
void * xt_gpu_malloc(size_t alloc_size, enum xt_memtype memtype)
Definition xt_gpu.c:183
enum xt_memtype xt_gpu_get_memtype(const void *ptr)
Definition xt_gpu.c:197
void xt_gpu_free(void *ptr, enum xt_memtype memtype)
Definition xt_gpu.c:187
#define XT_GPU_INSTR_POP
Definition xt_gpu.h:60
xt_memtype
Definition xt_gpu.h:68
#define XT_GPU_INSTR_PUSH(arg)
Definition xt_gpu.h:59
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
@ xt_mpi_tag_exchange_msg
redistribution of data, non-public declarations
Xt_request xt_request_msgs_ddt_packed_new(int n_requests, const MPI_Request requests[n_requests], MPI_Comm comm, int n_packed, int n_tmp_buffers, Xt_ddt packed_ddts[n_packed], void *packed_data[n_packed], void *tmp_buffers[n_tmp_buffers], void *dst_data, enum xt_memtype packed_memtype, enum xt_memtype tmp_memtype)