Yet Another eXchange Tool  0.9.0
xt_exchanger_neigh_alltoall.c
Go to the documentation of this file.
1 
12 /*
13  * Keywords:
14  * Maintainer: Jörg Behrens <behrens@dkrz.de>
15  * Moritz Hanke <hanke@dkrz.de>
16  * Thomas Jahns <jahns@dkrz.de>
17  * URL: https://doc.redmine.dkrz.de/yaxt/html/
18  *
19  * Redistribution and use in source and binary forms, with or without
20  * modification, are permitted provided that the following conditions are
21  * met:
22  *
23  * Redistributions of source code must retain the above copyright notice,
24  * this list of conditions and the following disclaimer.
25  *
26  * Redistributions in binary form must reproduce the above copyright
27  * notice, this list of conditions and the following disclaimer in the
28  * documentation and/or other materials provided with the distribution.
29  *
30  * Neither the name of the DKRZ GmbH nor the names of its contributors
31  * may be used to endorse or promote products derived from this software
32  * without specific prior written permission.
33  *
34  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45  */
46 #ifdef HAVE_CONFIG_H
47 #include <config.h>
48 #endif
49 
50 #include <mpi.h>
51 
53 
54 #include "core/core.h"
55 
56 #if MPI_VERSION >= 3
57 
58 #include <assert.h>
59 #include <string.h>
60 
61 #include "core/ppm_xfuncs.h"
62 #include "xt/xt_mpi.h"
63 #include "xt_mpi_internal.h"
64 #include "xt_redist_internal.h"
65 #include "xt/xt_xmap.h"
66 #include "xt/xt_idxlist.h"
67 #include "xt/xt_request.h"
68 #include "xt/xt_request_msgs.h"
69 #include "xt_exchanger.h"
70 
/* Larger of the two arguments; when they compare equal the first one is
 * produced.  Function-like macro: each argument can be evaluated more
 * than once, so do not pass expressions with side effects. */
#define MAX(a,b) (((a) >= (b)) ? (a) : (b))
72 
73 static Xt_exchanger
74 xt_exchanger_neigh_alltoall_copy(Xt_exchanger exchanger,
75  MPI_Comm newComm, int new_tag_offset);
76 static void xt_exchanger_neigh_alltoall_delete(Xt_exchanger exchanger);
77 static void xt_exchanger_neigh_alltoall_s_exchange(Xt_exchanger exchanger,
78  const void * src_data,
79  void * dst_data);
80 static void xt_exchanger_neigh_alltoall_a_exchange(Xt_exchanger exchanger,
81  const void * src_data,
82  void * dst_data,
83  Xt_request *request);
84 static int
85 xt_exchanger_neigh_alltoall_get_msg_ranks(Xt_exchanger exchanger,
86  enum xt_msg_direction direction,
87  int *restrict *ranks);
88 
89 static MPI_Datatype
90 xt_exchanger_neigh_alltoall_get_MPI_Datatype(Xt_exchanger exchanger,
91  int rank,
92  enum xt_msg_direction direction);
93 
94 
95 static const struct xt_exchanger_vtable exchanger_neigh_alltoall_vtable = {
96  .copy = xt_exchanger_neigh_alltoall_copy,
97  .delete = xt_exchanger_neigh_alltoall_delete,
98  .s_exchange = xt_exchanger_neigh_alltoall_s_exchange,
99  .a_exchange = xt_exchanger_neigh_alltoall_a_exchange,
100  .get_msg_ranks = xt_exchanger_neigh_alltoall_get_msg_ranks,
101  .get_MPI_Datatype = xt_exchanger_neigh_alltoall_get_MPI_Datatype,
102 };
103 
104 typedef struct Xt_exchanger_neigh_alltoall_ * Xt_exchanger_neigh_alltoall;
105 
106 struct Xt_exchanger_neigh_alltoall_ {
107 
108  const struct xt_exchanger_vtable * vtable;
109 
110  int nmsg[2];
111  int tag_offset;
112  MPI_Comm comm;
113  int * ranks;
114  int * one_counts;
115  MPI_Aint * displs;
116  MPI_Datatype * datatypes;
117 };
118 
119 static Xt_exchanger_neigh_alltoall
120 xt_exchanger_neigh_alltoall_alloc(size_t nsend, size_t nrecv)
121 {
122  size_t nmsg = nsend + nrecv;
123  size_t max_msgs = MAX(nsend, nrecv);
124  Xt_exchanger_neigh_alltoall exchanger = xmalloc(1 * sizeof(*exchanger));
125  exchanger->ranks = xmalloc(nmsg * sizeof(*(exchanger->ranks)));
126  exchanger->datatypes = xmalloc(nmsg * sizeof(*(exchanger->datatypes)));
127  exchanger->one_counts = xmalloc(max_msgs * sizeof(*(exchanger->one_counts)));
128  exchanger->displs = xmalloc(max_msgs * sizeof(*(exchanger->displs)));
129  exchanger->vtable = &exchanger_neigh_alltoall_vtable;
130  for (size_t i = 0; i < max_msgs; ++i) {
131  exchanger->one_counts[i] = 1;
132  exchanger->displs[i] = 0;
133  }
134  return exchanger;
135 }
136 
137 static void copy_from_redist_msgs(size_t n,
138  const struct Xt_redist_msg *restrict msgs,
139  int *restrict ranks,
140  MPI_Datatype *restrict datatypes,
141  MPI_Comm comm) {
142 
143  for (size_t i = 0; i < n; ++i) {
144  ranks[i] = msgs[i].rank;
145  xt_mpi_call(MPI_Type_dup(msgs[i].datatype, datatypes + i), comm);
146  }
147 }
148 
150 xt_exchanger_neigh_alltoall_new(int nsend, int nrecv,
151  const struct Xt_redist_msg *send_msgs,
152  const struct Xt_redist_msg *recv_msgs,
153  MPI_Comm comm, int tag_offset) {
154 
160  int flag;
161  xt_mpi_call(MPI_Comm_test_inter(comm, &flag), comm);
162  if (flag)
163  Xt_abort(comm, "ERROR(xt_exchanger_neigh_alltoall_new): "
164  "inter-communicator's are not defined for virtual topologies",
165  __FILE__, __LINE__);
166 
167  assert((nsend >= 0) & (nrecv >= 0));
168  Xt_exchanger_neigh_alltoall exchanger
169  = xt_exchanger_neigh_alltoall_alloc((size_t)nsend, (size_t)nrecv);
170  exchanger->tag_offset = tag_offset;
171  exchanger->nmsg[SEND] = nsend;
172  copy_from_redist_msgs((size_t)nsend, send_msgs, exchanger->ranks,
173  exchanger->datatypes, comm);
174  exchanger->nmsg[RECV] = nrecv;
175  copy_from_redist_msgs((size_t)nrecv, recv_msgs, exchanger->ranks + nsend,
176  exchanger->datatypes + nsend, comm);
177 
178  int reorder = 0; // no reordering of ranks in new comm
179  xt_mpi_call(
180  MPI_Dist_graph_create_adjacent(
181  comm, nrecv, exchanger->ranks + nsend, MPI_UNWEIGHTED, nsend,
182  exchanger->ranks, MPI_UNWEIGHTED, MPI_INFO_NULL, reorder,
183  &(exchanger->comm)), comm);
184 
185  return (Xt_exchanger)exchanger;
186 }
187 
188 static Xt_exchanger
189 xt_exchanger_neigh_alltoall_copy(Xt_exchanger exchanger,
190  MPI_Comm new_comm, int new_tag_offset)
191 {
192  Xt_exchanger_neigh_alltoall exchanger_na =
193  (Xt_exchanger_neigh_alltoall)exchanger;
194  size_t nsend = (size_t)(exchanger_na->nmsg[SEND]),
195  nrecv = (size_t)(exchanger_na->nmsg[RECV]),
196  nmsg = nsend + nrecv;
197 
198  Xt_exchanger_neigh_alltoall
199  exchanger_copy = xt_exchanger_neigh_alltoall_alloc(nsend, nrecv);
200 
201  exchanger_copy->nmsg[SEND] = (int)nsend;
202  exchanger_copy->nmsg[RECV] = (int)nrecv;
203  exchanger_copy->tag_offset = new_tag_offset;
204  exchanger_copy->comm = new_comm;
205  memcpy(exchanger_copy->ranks, exchanger_na->ranks,
206  nmsg * sizeof(*(exchanger_copy->ranks)));
207  for (size_t i = 0; i < nmsg; ++i)
208  xt_mpi_call(MPI_Type_dup(exchanger_na->datatypes[i],
209  exchanger_copy->datatypes + i), new_comm);
210 
211  return (Xt_exchanger)exchanger_copy;
212 }
213 
214 static void xt_exchanger_neigh_alltoall_delete(Xt_exchanger exchanger) {
215 
216  Xt_exchanger_neigh_alltoall exchanger_na =
217  (Xt_exchanger_neigh_alltoall)exchanger;
218 
219  size_t nmsg = (size_t)exchanger_na->nmsg[SEND]
220  + (size_t)exchanger_na->nmsg[RECV];
221  MPI_Comm comm = exchanger_na->comm;
222 
223  free(exchanger_na->ranks);
224  free(exchanger_na->one_counts);
225  free(exchanger_na->displs);
226  for (size_t i = 0; i < nmsg; ++i) {
227  MPI_Datatype *dt = exchanger_na->datatypes + i;
228  if (*dt != MPI_DATATYPE_NULL)
229  xt_mpi_call(MPI_Type_free(dt), comm);
230  }
231  free(exchanger_na->datatypes);
232  xt_mpi_call(MPI_Comm_free(&(exchanger_na->comm)), Xt_default_comm);
233  free(exchanger_na);
234 }
235 
236 static void xt_exchanger_neigh_alltoall_s_exchange(Xt_exchanger exchanger,
237  const void * src_data,
238  void * dst_data) {
239 
240  Xt_exchanger_neigh_alltoall exchanger_na =
241  (Xt_exchanger_neigh_alltoall)exchanger;
242 
243  xt_mpi_call(
244  MPI_Neighbor_alltoallw(src_data, exchanger_na->one_counts,
245  exchanger_na->displs, exchanger_na->datatypes,
246  dst_data, exchanger_na->one_counts,
247  exchanger_na->displs, exchanger_na->datatypes +
248  (size_t)(exchanger_na->nmsg[SEND]),
249  exchanger_na->comm),
250  exchanger_na->comm);
251 }
252 
253 static void xt_exchanger_neigh_alltoall_a_exchange(Xt_exchanger exchanger,
254  const void * src_data,
255  void * dst_data,
256  Xt_request *request) {
257 
258  Xt_exchanger_neigh_alltoall exchanger_na =
259  (Xt_exchanger_neigh_alltoall)exchanger;
260 
261  MPI_Request tmp_request;
262 
263  xt_mpi_call(
264  MPI_Ineighbor_alltoallw(src_data, exchanger_na->one_counts,
265  exchanger_na->displs, exchanger_na->datatypes,
266  dst_data, exchanger_na->one_counts,
267  exchanger_na->displs, exchanger_na->datatypes +
268  (size_t)(exchanger_na->nmsg[SEND]),
269  exchanger_na->comm, &tmp_request),
270  exchanger_na->comm);
271 
272  *request = xt_request_msgs_new(1, &tmp_request, exchanger_na->comm);
273 }
274 
275 static MPI_Datatype
276 xt_exchanger_neigh_alltoall_get_MPI_Datatype(Xt_exchanger exchanger,
277  int rank,
278  enum xt_msg_direction direction)
279 {
280  Xt_exchanger_neigh_alltoall exchanger_na =
281  (Xt_exchanger_neigh_alltoall)exchanger;
282  size_t nsend = (size_t)exchanger_na->nmsg[SEND],
283  nmsg = (size_t)exchanger_na->nmsg[direction],
284  ofs = direction == SEND ? 0 : nsend;
285  int *restrict ranks = exchanger_na->ranks + ofs;
286  MPI_Datatype datatype_copy = MPI_DATATYPE_NULL;
287  for (size_t i = 0; i < nmsg; ++i) {
288  if (ranks[i] == rank) {
289  xt_mpi_call(MPI_Type_dup(exchanger_na->datatypes[i+ofs], &datatype_copy),
290  exchanger_na->comm);
291  break;
292  }
293  }
294  return datatype_copy;
295 }
296 
297 static int
298 xt_exchanger_neigh_alltoall_get_msg_ranks(Xt_exchanger exchanger,
299  enum xt_msg_direction direction,
300  int *restrict *ranks)
301 {
302  Xt_exchanger_neigh_alltoall exchanger_na =
303  (Xt_exchanger_neigh_alltoall)exchanger;
304  size_t nsend = (size_t)exchanger_na->nmsg[SEND],
305  nmsg = (size_t)exchanger_na->nmsg[direction],
306  ofs = direction == SEND ? 0 : nsend;
307  *ranks = xmalloc(nmsg * sizeof(**ranks));
308  memcpy(*ranks, exchanger_na->ranks + ofs, nmsg * sizeof(**ranks));
309  return (int)nmsg;
310 }
311 
312 // #if MPI_VERSION >= 3
313 #else
314 
316 xt_exchanger_neigh_alltoall_new(int nsend, int nrecv,
317  const struct Xt_redist_msg *send_msgs,
318  const struct Xt_redist_msg *recv_msgs,
319  MPI_Comm comm, int tag_offset) {
320 
321  (void)nsend; (void)nrecv; (void)send_msgs; (void)recv_msgs; (void)tag_offset;
322  Xt_abort(comm, "ERROR(xt_exchanger_neigh_alltoall_new): "
323  "exchanger_neigh_alltoall requires MPI version 3.0 or higher",
324  __FILE__, __LINE__);
325 
326  return NULL;
327 }
328 
329 // #if MPI_VERSION >= 3
330 #endif
331 
332 /*
333  * Local Variables:
334  * c-basic-offset: 2
335  * coding: utf-8
336  * indent-tabs-mode: nil
337  * show-trailing-whitespace: t
338  * require-trailing-newline: t
339  * End:
340  */
int MPI_Comm
Definition: core.h:64
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition: ppm_xfuncs.h:70
Xt_exchanger(* copy)(Xt_exchanger, MPI_Comm, int)
Definition: xt_exchanger.h:66
exchanging of data based on information provided by redist's
Xt_exchanger xt_exchanger_neigh_alltoall_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
index list declaration
#define MAX(a, b)
Definition: xt_idxstripes.c:77
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition: xt_mpi.h:68
redistribution of data, non-public declarations
xt_msg_direction
@ RECV
@ SEND
Xt_request xt_request_msgs_new(int n, const MPI_Request requests[n], MPI_Comm comm)
exchange map declarations