Yet Another eXchange Tool  0.9.0
xt_exchanger_simple_base.c
Go to the documentation of this file.
1 
12 /*
13  * Keywords:
14  * Maintainer: Jörg Behrens <behrens@dkrz.de>
15  * Moritz Hanke <hanke@dkrz.de>
16  * Thomas Jahns <jahns@dkrz.de>
17  * URL: https://doc.redmine.dkrz.de/yaxt/html/
18  *
19  * Redistribution and use in source and binary forms, with or without
20  * modification, are permitted provided that the following conditions are
21  * met:
22  *
23  * Redistributions of source code must retain the above copyright notice,
24  * this list of conditions and the following disclaimer.
25  *
26  * Redistributions in binary form must reproduce the above copyright
27  * notice, this list of conditions and the following disclaimer in the
28  * documentation and/or other materials provided with the distribution.
29  *
30  * Neither the name of the DKRZ GmbH nor the names of its contributors
31  * may be used to endorse or promote products derived from this software
32  * without specific prior written permission.
33  *
34  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45  */
46 #ifdef HAVE_CONFIG_H
47 #include <config.h>
48 #endif
49 
50 #include <assert.h>
51 #include <mpi.h>
52 
53 #include "core/core.h"
54 #include "core/ppm_xfuncs.h"
55 #include "xt/xt_mpi.h"
56 #include "xt_mpi_internal.h"
57 #include "xt_redist_internal.h"
58 #include "xt_exchanger.h"
60 
61 static Xt_exchanger
63  MPI_Comm newComm, int new_tag_offset);
64 static void xt_exchanger_simple_base_delete(Xt_exchanger exchanger);
66  const void * src_data,
67  void * dst_data);
69  const void * src_data,
70  void * dst_data,
71  Xt_request *request);
72 static int
74  enum xt_msg_direction direction,
75  int *restrict *ranks);
76 
77 static MPI_Datatype
79  int rank,
80  enum xt_msg_direction direction);
81 
82 
90 };
91 
93 
95 
96  const struct xt_exchanger_vtable * vtable;
97 
98  int nmsg[2];
103  struct Xt_redist_msg msgs[];
104 };
105 
108 {
109  Xt_exchanger_simple_base exchanger;
110  size_t header_size = sizeof(*exchanger),
111  body_size = nmsg * sizeof (exchanger->msgs[0]);
112  exchanger = xmalloc(header_size + body_size);
113  exchanger->vtable = &exchanger_simple_base_vtable;
114  return exchanger;
115 }
116 
118 xt_exchanger_simple_base_new(int nsend, int nrecv,
119  const struct Xt_redist_msg *send_msgs,
120  const struct Xt_redist_msg *recv_msgs,
121  MPI_Comm comm, int tag_offset,
123  xt_simple_a_exchange_func a_func) {
124 
130  if (s_func == NULL)
131  Xt_abort(comm, "ERROR(xt_exchanger_simple_base_new): invalid synchronous "
132  "exchange function pointer", __FILE__, __LINE__);
133 
134  assert((nsend >= 0) & (nrecv >= 0));
135  size_t nmsg = (size_t)nsend + (size_t)nrecv;
136  Xt_exchanger_simple_base exchanger
138  exchanger->comm = comm;
139  exchanger->tag_offset = tag_offset;
140  exchanger->nmsg[SEND] = nsend;
141  exchanger->nmsg[RECV] = nrecv;
142  xt_redist_msgs_strided_copy((size_t)nsend, send_msgs, sizeof (send_msgs[0]),
143  exchanger->msgs, sizeof (exchanger->msgs[0]),
144  comm);
145  xt_redist_msgs_strided_copy((size_t)nrecv, recv_msgs, sizeof (recv_msgs[0]),
146  exchanger->msgs + nsend,
147  sizeof (exchanger->msgs[0]),
148  comm);
149  exchanger->s_func = s_func;
150  exchanger->a_func = a_func;
151 
152  xt_exchanger_internal_optimize((size_t)nsend, exchanger->msgs,
153  sizeof(exchanger->msgs[0]),
154  comm);
155 
156  xt_exchanger_internal_optimize((size_t)nrecv, exchanger->msgs + nsend,
157  sizeof(exchanger->msgs[0]),
158  comm);
159 
160  return (Xt_exchanger)exchanger;
161 }
162 
163 static Xt_exchanger
165  MPI_Comm new_comm, int new_tag_offset)
166 {
167  Xt_exchanger_simple_base exchanger_sb =
168  (Xt_exchanger_simple_base)exchanger;
169  int nsend = exchanger_sb->nmsg[SEND],
170  nrecv = exchanger_sb->nmsg[RECV];
171  size_t nmsg = (size_t)nsend + (size_t)nrecv;
172  Xt_exchanger_simple_base exchanger_copy
174  exchanger_copy->nmsg[SEND] = nsend;
175  exchanger_copy->nmsg[RECV] = nrecv;
176  exchanger_copy->s_func = exchanger_sb->s_func;
177  exchanger_copy->a_func = exchanger_sb->a_func;
178  struct Xt_redist_msg *restrict new_msgs = exchanger_copy->msgs,
179  *restrict orig_msgs = exchanger_sb->msgs;
180  xt_redist_msgs_strided_copy(nmsg, orig_msgs, sizeof (*orig_msgs),
181  new_msgs, sizeof (*new_msgs),
182  new_comm);
183  exchanger_copy->comm = new_comm;
184  exchanger_copy->tag_offset = new_tag_offset;
185  return (Xt_exchanger)exchanger_copy;
186 }
187 
188 
190 
191  Xt_exchanger_simple_base exchanger_sb =
192  (Xt_exchanger_simple_base)exchanger;
193 
194  size_t nmsg = (size_t)exchanger_sb->nmsg[SEND] + (size_t)exchanger_sb->nmsg[RECV];
195  struct Xt_redist_msg *restrict msgs = exchanger_sb->msgs;
196  xt_redist_msgs_strided_destruct(nmsg, msgs, exchanger_sb->comm,
197  sizeof (msgs[0]));
198  free(exchanger_sb);
199 }
200 
202  const void * src_data,
203  void * dst_data) {
204 
205  Xt_exchanger_simple_base exchanger_sb =
206  (Xt_exchanger_simple_base)exchanger;
207 
208  int nsend = exchanger_sb->nmsg[SEND];
209  exchanger_sb->s_func(src_data, dst_data, nsend,
210  exchanger_sb->nmsg[RECV], exchanger_sb->msgs,
211  exchanger_sb->msgs + (size_t)nsend,
212  exchanger_sb->tag_offset, exchanger_sb->comm);
213 }
214 
216  const void * src_data,
217  void * dst_data,
218  Xt_request *request) {
219 
220  Xt_exchanger_simple_base exchanger_sb =
221  (Xt_exchanger_simple_base)exchanger;
222 
223  if (exchanger_sb->a_func == NULL)
224  Xt_abort(exchanger_sb->comm, "ERROR(xt_exchanger_simple_base_a_exchange): "
225  "asynchronous exchange function not defined for current exchanger",
226  __FILE__, __LINE__);
227 
228  int nsend = exchanger_sb->nmsg[SEND];
229  exchanger_sb->a_func(src_data, dst_data, nsend,
230  exchanger_sb->nmsg[RECV], exchanger_sb->msgs,
231  exchanger_sb->msgs + (size_t)nsend,
232  exchanger_sb->tag_offset, exchanger_sb->comm, request);
233 }
234 
235 static MPI_Datatype
237  int rank,
238  enum xt_msg_direction direction)
239 {
240  Xt_exchanger_simple_base exchanger_sb =
241  (Xt_exchanger_simple_base)exchanger;
242  size_t nsend = (size_t)exchanger_sb->nmsg[SEND],
243  nmsg = (size_t)exchanger_sb->nmsg[direction],
244  ofs = direction == SEND ? 0 : nsend;
245  struct Xt_redist_msg *restrict msgs = exchanger_sb->msgs + ofs;
246  MPI_Datatype datatype_copy = MPI_DATATYPE_NULL;
247  for (size_t i = 0; i < nmsg; ++i)
248  if (msgs[i].rank == rank) {
249  xt_mpi_call(MPI_Type_dup(msgs[i].datatype, &datatype_copy),
250  exchanger_sb->comm);
251  break;
252  }
253  return datatype_copy;
254 }
255 
256 static int
258  enum xt_msg_direction direction,
259  int *restrict *ranks)
260 {
261  Xt_exchanger_simple_base exchanger_sb = (Xt_exchanger_simple_base)exchanger;
262  size_t nmsg = (size_t)exchanger_sb->nmsg[direction];
263  struct Xt_redist_msg *restrict msgs = exchanger_sb->msgs
264  + (direction == RECV ? (size_t)exchanger_sb->nmsg[SEND] : 0);
265  int *restrict ranks_ = *ranks = xmalloc(nmsg * sizeof (*ranks_));
266  for (size_t i = 0; i < nmsg; ++i)
267  ranks_[i] = msgs[i].rank;
268  return (int)nmsg;
269 }
270 
271 /*
272  * Local Variables:
273  * c-basic-offset: 2
274  * coding: utf-8
275  * indent-tabs-mode: nil
276  * show-trailing-whitespace: t
277  * require-trailing-newline: t
278  * End:
279  */
int MPI_Comm
Definition: core.h:64
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition: ppm_xfuncs.h:70
xt_simple_a_exchange_func a_func
xt_simple_s_exchange_func s_func
const struct xt_exchanger_vtable * vtable
MPI_Datatype datatype
Definition: xt_redist.h:69
Xt_exchanger(* copy)(Xt_exchanger, MPI_Comm, int)
Definition: xt_exchanger.h:66
void xt_exchanger_internal_optimize(size_t n, void *msgs, size_t msg_type_size, MPI_Comm comm)
Definition: xt_exchanger.c:90
exchanging of data based on information provided by redist's
static void xt_exchanger_simple_base_a_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data, Xt_request *request)
static MPI_Datatype xt_exchanger_simple_base_get_MPI_Datatype(Xt_exchanger exchanger, int rank, enum xt_msg_direction direction)
static Xt_exchanger_simple_base xt_exchanger_simple_base_alloc(size_t nmsg)
static int xt_exchanger_simple_base_get_msg_ranks(Xt_exchanger exchanger, enum xt_msg_direction direction, int *restrict *ranks)
static void xt_exchanger_simple_base_delete(Xt_exchanger exchanger)
static Xt_exchanger xt_exchanger_simple_base_copy(Xt_exchanger exchanger, MPI_Comm newComm, int new_tag_offset)
static void xt_exchanger_simple_base_s_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data)
struct Xt_exchanger_simple_base_ * Xt_exchanger_simple_base
Xt_exchanger xt_exchanger_simple_base_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, xt_simple_s_exchange_func s_func, xt_simple_a_exchange_func a_func)
static const struct xt_exchanger_vtable exchanger_simple_base_vtable
void(* xt_simple_s_exchange_func)(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm)
void(* xt_simple_a_exchange_func)(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm, Xt_request *request)
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition: xt_mpi.h:68
void xt_redist_msgs_strided_copy(size_t n, const struct Xt_redist_msg *restrict src, size_t src_stride, struct Xt_redist_msg *restrict dst, size_t dst_stride, MPI_Comm comm)
Definition: xt_redist.c:136
void xt_redist_msgs_strided_destruct(size_t n, struct Xt_redist_msg *msgs, MPI_Comm comm, size_t ofs_stride)
Definition: xt_redist.c:156
redistribution of data, non-public declarations
xt_msg_direction
@ RECV
@ SEND