Yet Another eXchange Tool  0.9.0
xt_exchanger_mix_isend_irecv.c
Go to the documentation of this file.
1 
12 /*
13  * Keywords:
14  * Maintainer: Jörg Behrens <behrens@dkrz.de>
15  * Moritz Hanke <hanke@dkrz.de>
16  * Thomas Jahns <jahns@dkrz.de>
17  * URL: https://doc.redmine.dkrz.de/yaxt/html/
18  *
19  * Redistribution and use in source and binary forms, with or without
20  * modification, are permitted provided that the following conditions are
21  * met:
22  *
23  * Redistributions of source code must retain the above copyright notice,
24  * this list of conditions and the following disclaimer.
25  *
26  * Redistributions in binary form must reproduce the above copyright
27  * notice, this list of conditions and the following disclaimer in the
28  * documentation and/or other materials provided with the distribution.
29  *
30  * Neither the name of the DKRZ GmbH nor the names of its contributors
31  * may be used to endorse or promote products derived from this software
32  * without specific prior written permission.
33  *
34  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45  */
46 #ifdef HAVE_CONFIG_H
47 #include <config.h>
48 #endif
49 
50 #include <assert.h>
51 #include <mpi.h>
52 
53 #include "core/core.h"
54 #include "core/ppm_xfuncs.h"
55 #include "xt/xt_mpi.h"
56 #include "xt/xt_request_msgs.h"
57 #include "xt_mpi_internal.h"
58 #include "xt_redist_internal.h"
59 #include "xt_exchanger.h"
61 
62 static Xt_exchanger
64  MPI_Comm newComm, int new_tag_offset);
67  const void * src_data,
68  void * dst_data);
70  Xt_exchanger exchanger, const void * src_data, void * dst_data,
71  Xt_request *request);
72 static int
74  enum xt_msg_direction direction,
75  int *restrict *ranks);
76 
77 static MPI_Datatype
79  int rank,
80  enum xt_msg_direction direction);
81 
89 };
90 
92 
93 struct mix_msg {
94  struct Xt_redist_msg data;
95 #if SIZEOF_MPI_DATATYPE == 2 * SIZEOF_INT
96 # define MSG_DIR(msg) ((enum xt_msg_direction)((msg).data.padding))
97 # define type data.padding
98 #else
100 # define MSG_DIR(msg) ((msg).type)
101 #endif
102 };
103 
105 
106  const struct xt_exchanger_vtable * vtable;
107 
108  int n, tag_offset;
110  struct mix_msg msgs[];
111 };
112 
115 {
117  size_t header_size = sizeof (*exchanger),
118  body_size = sizeof (struct mix_msg) * nmsg;
119  exchanger = xmalloc(header_size + body_size);
120  exchanger->n = (int)nmsg;
122  return exchanger;
123 }
124 
126 xt_exchanger_mix_isend_irecv_new(int nsend, int nrecv,
127  const struct Xt_redist_msg *send_msgs,
128  const struct Xt_redist_msg *recv_msgs,
129  MPI_Comm comm, int tag_offset) {
130 
136  assert((nsend >= 0) & (nrecv >= 0));
137  size_t nmsg = (size_t)nsend + (size_t)nrecv;
140  exchanger->comm = comm;
141  exchanger->tag_offset = tag_offset;
142  struct mix_msg *restrict msgs = exchanger->msgs;
143  xt_redist_msgs_strided_copy((size_t)nsend, send_msgs, sizeof (send_msgs[0]),
144  &(msgs[0].data), sizeof (msgs[0]), comm);
145  for (size_t i = 0; i < (size_t)nsend; ++i)
146  msgs[i].type = SEND;
147  xt_redist_msgs_strided_copy((size_t)nrecv, recv_msgs, sizeof (recv_msgs[0]),
148  &(msgs[nsend].data), sizeof (msgs[0]), comm);
149  for (size_t i = 0; i < (size_t)nrecv; ++i)
150  msgs[i + (size_t)nsend].type = RECV;
151 
152  xt_exchanger_internal_optimize(nmsg, msgs, sizeof(*msgs), comm);
153 
154  for (size_t i = 1; i < nmsg; ++i) {
155 
156  if (msgs[i-1].data.rank == msgs[i].data.rank && MSG_DIR(msgs[i]) == SEND) {
157 
158  struct mix_msg temp = msgs[i-1];
159  msgs[i-1] = msgs[i];
160  msgs[i] = temp;
161  i++;
162  }
163  }
164 
165  return (Xt_exchanger)exchanger;
166 }
167 
168 static Xt_exchanger
170  MPI_Comm new_comm, int new_tag_offset)
171 {
172  Xt_exchanger_mix_isend_irecv exchanger_msr =
173  (Xt_exchanger_mix_isend_irecv)exchanger;
174  size_t nmsg = (size_t)exchanger_msr->n;
175  Xt_exchanger_mix_isend_irecv exchanger_copy
177  exchanger_copy->comm = new_comm;
178  exchanger_copy->tag_offset = new_tag_offset;
179  struct mix_msg *restrict new_msgs = exchanger_copy->msgs,
180  *restrict orig_msgs = exchanger_msr->msgs;
181  xt_redist_msgs_strided_copy(nmsg, &orig_msgs->data, sizeof (*orig_msgs),
182  &new_msgs->data, sizeof (*new_msgs),
183  new_comm);
184  for (size_t i = 0; i < nmsg; ++i)
185  new_msgs[i].type = orig_msgs[i].type;
186  return (Xt_exchanger)exchanger_copy;
187 }
188 
189 enum { max_on_stack_req = 16 };
190 
192 
193  Xt_exchanger_mix_isend_irecv exchanger_msr =
194  (Xt_exchanger_mix_isend_irecv)exchanger;
195 
196  size_t nmsg = (size_t)exchanger_msr->n;
197  struct mix_msg *restrict msgs = exchanger_msr->msgs;
198 
199  xt_redist_msgs_strided_destruct(nmsg, &msgs[0].data, exchanger_msr->comm,
200  sizeof (*msgs));
201  free(exchanger_msr);
202 }
203 
205  const void * src_data,
206  void * dst_data) {
207 
208  Xt_exchanger_mix_isend_irecv exchanger_msr =
209  (Xt_exchanger_mix_isend_irecv)exchanger;
210 
211  if (exchanger_msr->n > 0) {
212  size_t nmsg = (size_t)exchanger_msr->n;
213  MPI_Comm comm = exchanger_msr->comm;
214  struct mix_msg *restrict msgs = exchanger_msr->msgs;
215  int tag_offset = exchanger_msr->tag_offset;
216  MPI_Request req_buf[max_on_stack_req];
217  MPI_Request *requests
218  = nmsg <= max_on_stack_req
219  ? req_buf : xmalloc(nmsg * sizeof (*requests));
220  for (size_t i = 0; i < nmsg; ++i) {
221  typedef int (*ifp)(void *buf, int count, MPI_Datatype datatype, int dest,
222  int tag, MPI_Comm comm, MPI_Request *request);
223  ifp op = MSG_DIR(msgs[i]) == SEND ? (ifp)MPI_Isend : (ifp)MPI_Irecv;
224  void *data = MSG_DIR(msgs[i]) == SEND ? (void *)src_data : dst_data;
225  xt_mpi_call(op(data, 1, msgs[i].data.datatype,
226  msgs[i].data.rank,
227  tag_offset + xt_mpi_tag_exchange_msg,
228  comm, requests+i), comm);
229  }
230  xt_mpi_call(MPI_Waitall((int)nmsg, requests, MPI_STATUSES_IGNORE), comm);
231  if (requests != req_buf)
232  free(requests);
233  }
234 }
235 
237  Xt_exchanger exchanger, const void * src_data, void * dst_data,
238  Xt_request *request) {
239 
240  Xt_exchanger_mix_isend_irecv exchanger_msr =
241  (Xt_exchanger_mix_isend_irecv)exchanger;
242 
243  Xt_request requests = XT_REQUEST_NULL;
244 
245  if (exchanger_msr->n > 0) {
246  size_t nmsg = (size_t)exchanger_msr->n;
247  MPI_Comm comm = exchanger_msr->comm;
248  struct mix_msg *restrict msgs = exchanger_msr->msgs;
249  int tag_offset = exchanger_msr->tag_offset;
250  MPI_Request req_buf[max_on_stack_req];
251  MPI_Request *tmp_requests
252  = nmsg <= max_on_stack_req
253  ? req_buf : xmalloc(nmsg * sizeof (*tmp_requests));
254  for (size_t i = 0; i < nmsg; ++i) {
255  typedef int (*ifp)(void *buf, int count, MPI_Datatype datatype, int dest,
256  int tag, MPI_Comm comm, MPI_Request *request);
257  ifp op = MSG_DIR(msgs[i]) == SEND ? (ifp)MPI_Isend : (ifp)MPI_Irecv;
258  void *data = MSG_DIR(msgs[i]) == SEND ? (void *)src_data : dst_data;
259  xt_mpi_call(op(data, 1, msgs[i].data.datatype,
260  msgs[i].data.rank,
261  tag_offset + xt_mpi_tag_exchange_msg,
262  comm, tmp_requests+i), comm);
263  }
264  requests = xt_request_msgs_new((int)nmsg, tmp_requests, comm);
265  if (tmp_requests != req_buf)
266  free(tmp_requests);
267  }
268 
269  *request = requests;
270 }
271 
272 static int
274  enum xt_msg_direction direction,
275  int *restrict *ranks)
276 {
277  Xt_exchanger_mix_isend_irecv exchanger_msr =
278  (Xt_exchanger_mix_isend_irecv)exchanger;
279  size_t nmsg = 0, nmsg_all = (size_t)exchanger_msr->n;
280  const struct mix_msg *restrict msgs = exchanger_msr->msgs;
281  for (size_t i = 0; i < nmsg_all; ++i)
282  nmsg += MSG_DIR(msgs[i]) == direction;
283  int *restrict ranks_ = *ranks = xmalloc(nmsg * sizeof (*ranks_));
284  for (size_t i = 0, j = (size_t)-1; i < nmsg_all; ++i)
285  if (MSG_DIR(msgs[i]) == direction)
286  ranks_[++j] = msgs[i].data.rank;
287  return (int)nmsg;
288 }
289 
290 static MPI_Datatype
292  int rank,
293  enum xt_msg_direction direction)
294 {
295  Xt_exchanger_mix_isend_irecv exchanger_msr =
296  (Xt_exchanger_mix_isend_irecv)exchanger;
297  size_t nmsg = (size_t)exchanger_msr->n;
298  struct mix_msg *restrict msgs = exchanger_msr->msgs;
299  MPI_Datatype datatype_copy = MPI_DATATYPE_NULL;
300  for (size_t i = 0; i < nmsg; ++i)
301  if (MSG_DIR(msgs[i]) == direction && msgs[i].data.rank == rank) {
302  xt_mpi_call(MPI_Type_dup(msgs[i].data.datatype, &datatype_copy),
303  exchanger_msr->comm);
304  break;
305  }
306  return datatype_copy;
307 }
308 
309 /*
310  * Local Variables:
311  * c-basic-offset: 2
312  * coding: utf-8
313  * indent-tabs-mode: nil
314  * show-trailing-whitespace: t
315  * require-trailing-newline: t
316  * End:
317  */
int MPI_Comm
Definition: core.h:64
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition: ppm_xfuncs.h:70
const struct xt_exchanger_vtable * vtable
MPI_Datatype datatype
Definition: xt_redist.h:69
struct Xt_redist_msg data
Xt_exchanger(* copy)(Xt_exchanger, MPI_Comm, int)
Definition: xt_exchanger.h:66
void xt_exchanger_internal_optimize(size_t n, void *msgs, size_t msg_type_size, MPI_Comm comm)
Definition: xt_exchanger.c:90
exchanging of data based on information provided by redist's
#define MSG_DIR(msg)
Xt_exchanger xt_exchanger_mix_isend_irecv_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
static MPI_Datatype xt_exchanger_mix_isend_irecv_get_MPI_Datatype(Xt_exchanger exchanger, int rank, enum xt_msg_direction direction)
struct Xt_exchanger_mix_isend_irecv_ * Xt_exchanger_mix_isend_irecv
static Xt_exchanger_mix_isend_irecv xt_exchanger_mix_isend_irecv_alloc(size_t nmsg)
static void xt_exchanger_mix_isend_irecv_a_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data, Xt_request *request)
static void xt_exchanger_mix_isend_irecv_delete(Xt_exchanger exchanger)
static int xt_exchanger_mix_isend_irecv_get_msg_ranks(Xt_exchanger exchanger, enum xt_msg_direction direction, int *restrict *ranks)
static const struct xt_exchanger_vtable exchanger_mix_isend_irecv_vtable
static void xt_exchanger_mix_isend_irecv_s_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data)
static Xt_exchanger xt_exchanger_mix_isend_irecv_copy(Xt_exchanger exchanger, MPI_Comm newComm, int new_tag_offset)
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition: xt_mpi.h:68
@ xt_mpi_tag_exchange_msg
void xt_redist_msgs_strided_copy(size_t n, const struct Xt_redist_msg *restrict src, size_t src_stride, struct Xt_redist_msg *restrict dst, size_t dst_stride, MPI_Comm comm)
Definition: xt_redist.c:136
void xt_redist_msgs_strided_destruct(size_t n, struct Xt_redist_msg *msgs, MPI_Comm comm, size_t ofs_stride)
Definition: xt_redist.c:156
redistribution of data, non-public declarations
xt_msg_direction
@ RECV
@ SEND
#define XT_REQUEST_NULL
Definition: xt_request.h:53
Xt_request xt_request_msgs_new(int n, const MPI_Request requests[n], MPI_Comm comm)