Yet Another eXchange Tool  0.9.0
xt_exchanger_irecv_isend_packed.c
Go to the documentation of this file.
1 
12 /*
13  * Keywords:
14  * Maintainer: Jörg Behrens <behrens@dkrz.de>
15  * Moritz Hanke <hanke@dkrz.de>
16  * Thomas Jahns <jahns@dkrz.de>
17  * URL: https://doc.redmine.dkrz.de/yaxt/html/
18  *
19  * Redistribution and use in source and binary forms, with or without
20  * modification, are permitted provided that the following conditions are
21  * met:
22  *
23  * Redistributions of source code must retain the above copyright notice,
24  * this list of conditions and the following disclaimer.
25  *
26  * Redistributions in binary form must reproduce the above copyright
27  * notice, this list of conditions and the following disclaimer in the
28  * documentation and/or other materials provided with the distribution.
29  *
30  * Neither the name of the DKRZ GmbH nor the names of its contributors
31  * may be used to endorse or promote products derived from this software
32  * without specific prior written permission.
33  *
34  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45  */
46 #ifdef HAVE_CONFIG_H
47 #include <config.h>
48 #endif
49 
50 #include "core/ppm_xfuncs.h"
51 #include "xt/xt_mpi.h"
53 #include "xt_mpi_internal.h"
54 #include "xt_redist_internal.h"
57 
58 static void
60  const void *src_data, void *dst_data,
61  int nsend, int nrecv,
62  const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs,
63  int tag_offset, MPI_Comm comm) {
64 
65  enum { AUTO_ALLOC_SIZE = 32, };
66  MPI_Request *requests, requests_auto[AUTO_ALLOC_SIZE];
67  int *buffer_sizes, buffer_sizes_auto[AUTO_ALLOC_SIZE];
68 
69  size_t num_tx = (size_t)nrecv + (size_t)nsend;
70  if (num_tx <= AUTO_ALLOC_SIZE) {
71  requests = requests_auto;
72  buffer_sizes = buffer_sizes_auto;
73  } else {
74  requests = xmalloc(num_tx * sizeof (*requests));
75  buffer_sizes = xmalloc(num_tx * sizeof (*buffer_sizes));
76  }
77 
78  for (int i = 0; i < nrecv; ++i)
79  xt_mpi_call(MPI_Pack_size(1, recv_msgs[i].datatype, comm, buffer_sizes+i),
80  comm);
81  for (int i = 0; i < nsend; ++i)
82  xt_mpi_call(MPI_Pack_size(1, send_msgs[i].datatype, comm,
83  buffer_sizes+nrecv+i), comm);
84  size_t buffer_size = 0;
85  for (size_t i = 0; i < num_tx; ++i)
86  buffer_size += (size_t)buffer_sizes[i];
87 
88  unsigned char *buffer = xmalloc(buffer_size);
89 
90  size_t ofs = 0;
91  for (int i = 0; i < nrecv; ++i) {
92  int recv_size = buffer_sizes[i];
93  xt_mpi_call(MPI_Irecv(buffer + ofs, recv_size, MPI_PACKED,
94  recv_msgs[i].rank,
95  tag_offset + xt_mpi_tag_exchange_msg, comm,
96  requests+i), comm);
97  ofs += (size_t)recv_size;
98  }
99 
100  for (int i = 0; i < nsend; ++i) {
101  int position = 0;
102  xt_mpi_call(MPI_Pack(CAST_MPI_SEND_BUF(src_data), 1, send_msgs[i].datatype,
103  buffer + ofs, buffer_sizes[nrecv+i], &position,
104  comm), comm);
105  xt_mpi_call(MPI_Isend(buffer + ofs, position, MPI_PACKED,
106  send_msgs[i].rank,
107  tag_offset + xt_mpi_tag_exchange_msg, comm,
108  requests+nrecv+i), comm);
109  ofs += (size_t)position;
110  }
111 
112  xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE), comm);
113 
114  ofs = 0;
115  for (int i = 0; i < nrecv; ++i) {
116  int position = 0, recv_size = buffer_sizes[i];
117  xt_mpi_call(MPI_Unpack(buffer + ofs, recv_size, &position, dst_data,
118  1, recv_msgs[i].datatype, comm), comm);
119  ofs += (size_t)recv_size;
120  }
121 
122  free(buffer);
123  if (num_tx > AUTO_ALLOC_SIZE) {
124  free(buffer_sizes);
125  free(requests);
126  }
127 }
128 
129 static void
130 xt_exchanger_irecv_isend_packed_a_exchange(const void *src_data, void *dst_data,
131  int nsend, int nrecv,
132  const struct Xt_redist_msg * send_msgs,
133  const struct Xt_redist_msg * recv_msgs,
134  int tag_offset, MPI_Comm comm,
135  Xt_request *request) {
136 
137  MPI_Request * tmp_requests =
138  xmalloc((size_t)(nrecv + nsend) * sizeof (*tmp_requests));
139  void ** buffers =
140  xmalloc((size_t)(nrecv + nsend) * sizeof (*buffers));
141 
142  int buffer_size;
143  for (int i = 0; i < nrecv; ++i) {
144  xt_mpi_call(MPI_Pack_size(1, recv_msgs[i].datatype, comm, &buffer_size),
145  comm);
146  buffers[i] = xmalloc((size_t)buffer_size);
147  xt_mpi_call(MPI_Irecv(buffers[i], buffer_size, MPI_PACKED,
148  recv_msgs[i].rank,
149  tag_offset + xt_mpi_tag_exchange_msg, comm,
150  tmp_requests+i), comm);
151  }
152 
153  for (int i = 0; i < nsend; ++i) {
154  int position = 0;
155  xt_mpi_call(MPI_Pack_size(1, send_msgs[i].datatype, comm, &buffer_size),
156  comm);
157  buffers[nrecv + i] = xmalloc((size_t)buffer_size);
158  xt_mpi_call(MPI_Pack((void*)src_data, 1, send_msgs[i].datatype,
159  buffers[nrecv + i], buffer_size, &position,
160  comm), comm);
161  xt_mpi_call(MPI_Isend(buffers[nrecv + i], buffer_size, MPI_PACKED,
162  send_msgs[i].rank,
163  tag_offset + xt_mpi_tag_exchange_msg, comm,
164  tmp_requests+nrecv+i), comm);
165  }
166 
167  MPI_Datatype * datatypes = xmalloc((size_t)nrecv * sizeof (*datatypes));
168  for (int i = 0; i < nrecv; ++i) datatypes[i] = recv_msgs[i].datatype;
169 
170  Xt_request requests =
171  xt_request_msgs_packed_new(nrecv + nsend, tmp_requests, comm, nrecv, nsend,
172  datatypes, buffers, buffers + nrecv, dst_data);
173 
174  free(datatypes);
175  free(buffers);
176  free(tmp_requests);
177 
178  *request = requests;
179 }
180 
183  const struct Xt_redist_msg *send_msgs,
184  const struct Xt_redist_msg *recv_msgs,
185  MPI_Comm comm, int tag_offset) {
186 
191  return xt_exchanger_simple_base_new(nsend, nrecv, send_msgs, recv_msgs,
192  comm, tag_offset,
195 }
196 
197 /*
198  * Local Variables:
199  * c-basic-offset: 2
200  * coding: utf-8
201  * indent-tabs-mode: nil
202  * show-trailing-whitespace: t
203  * require-trailing-newline: t
204  * End:
205  */
int MPI_Comm
Definition: core.h:64
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition: ppm_xfuncs.h:70
static void xt_exchanger_irecv_isend_packed_a_exchange(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm, Xt_request *request)
Xt_exchanger xt_exchanger_irecv_isend_packed_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
static void xt_exchanger_irecv_isend_packed_s_exchange(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm)
Xt_exchanger xt_exchanger_simple_base_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, xt_simple_s_exchange_func s_func, xt_simple_a_exchange_func a_func)
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition: xt_mpi.h:68
@ xt_mpi_tag_exchange_msg
redistribution of data, non-public declarations
Xt_request xt_request_msgs_packed_new(int n_requests, const MPI_Request requests[n_requests], MPI_Comm comm, int n_packed, int n_tmp_buffers, const MPI_Datatype datatypes[n_packed], void *packed_data[n_packed], void *tmp_buffers[n_tmp_buffers], void *unpacked_data)