60 const void *src_data,
void *dst_data,
65 enum { AUTO_ALLOC_SIZE = 32, };
66 MPI_Request *requests, requests_auto[AUTO_ALLOC_SIZE];
67 int *buffer_sizes, buffer_sizes_auto[AUTO_ALLOC_SIZE];
69 size_t num_tx = (size_t)nrecv + (
size_t)nsend;
70 if (num_tx <= AUTO_ALLOC_SIZE) {
71 requests = requests_auto;
72 buffer_sizes = buffer_sizes_auto;
74 requests =
xmalloc(num_tx *
sizeof (*requests));
75 buffer_sizes =
xmalloc(num_tx *
sizeof (*buffer_sizes));
78 for (
int i = 0; i < nrecv; ++i)
79 xt_mpi_call(MPI_Pack_size(1, recv_msgs[i].datatype, comm, buffer_sizes+i),
81 for (
int i = 0; i < nsend; ++i)
82 xt_mpi_call(MPI_Pack_size(1, send_msgs[i].datatype, comm,
83 buffer_sizes+nrecv+i), comm);
84 size_t buffer_size = 0;
85 for (
size_t i = 0; i < num_tx; ++i)
86 buffer_size += (
size_t)buffer_sizes[i];
88 unsigned char *buffer =
xmalloc(buffer_size);
91 for (
int i = 0; i < nrecv; ++i) {
92 int recv_size = buffer_sizes[i];
93 xt_mpi_call(MPI_Irecv(buffer + ofs, recv_size, MPI_PACKED,
97 ofs += (size_t)recv_size;
100 for (
int i = 0; i < nsend; ++i) {
102 xt_mpi_call(MPI_Pack(CAST_MPI_SEND_BUF(src_data), 1, send_msgs[i].datatype,
103 buffer + ofs, buffer_sizes[nrecv+i], &position,
105 xt_mpi_call(MPI_Isend(buffer + ofs, position, MPI_PACKED,
108 requests+nrecv+i), comm);
109 ofs += (size_t)position;
112 xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE), comm);
115 for (
int i = 0; i < nrecv; ++i) {
116 int position = 0, recv_size = buffer_sizes[i];
117 xt_mpi_call(MPI_Unpack(buffer + ofs, recv_size, &position, dst_data,
118 1, recv_msgs[i].datatype, comm), comm);
119 ofs += (size_t)recv_size;
123 if (num_tx > AUTO_ALLOC_SIZE) {
131 int nsend,
int nrecv,
137 MPI_Request * tmp_requests =
138 xmalloc((
size_t)(nrecv + nsend) *
sizeof (*tmp_requests));
140 xmalloc((
size_t)(nrecv + nsend) *
sizeof (*buffers));
143 for (
int i = 0; i < nrecv; ++i) {
144 xt_mpi_call(MPI_Pack_size(1, recv_msgs[i].datatype, comm, &buffer_size),
146 buffers[i] =
xmalloc((
size_t)buffer_size);
147 xt_mpi_call(MPI_Irecv(buffers[i], buffer_size, MPI_PACKED,
150 tmp_requests+i), comm);
153 for (
int i = 0; i < nsend; ++i) {
155 xt_mpi_call(MPI_Pack_size(1, send_msgs[i].datatype, comm, &buffer_size),
157 buffers[nrecv + i] =
xmalloc((
size_t)buffer_size);
158 xt_mpi_call(MPI_Pack((
void*)src_data, 1, send_msgs[i].datatype,
159 buffers[nrecv + i], buffer_size, &position,
161 xt_mpi_call(MPI_Isend(buffers[nrecv + i], buffer_size, MPI_PACKED,
164 tmp_requests+nrecv+i), comm);
167 MPI_Datatype * datatypes =
xmalloc((
size_t)nrecv *
sizeof (*datatypes));
168 for (
int i = 0; i < nrecv; ++i) datatypes[i] = recv_msgs[i].datatype;
172 datatypes, buffers, buffers + nrecv, dst_data);
add versions of standard API functions not returning on error
static void xt_exchanger_irecv_isend_packed_a_exchange(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm, Xt_request *request)
Xt_exchanger xt_exchanger_irecv_isend_packed_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
static void xt_exchanger_irecv_isend_packed_s_exchange(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm)
Xt_exchanger xt_exchanger_simple_base_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, xt_simple_s_exchange_func s_func, xt_simple_a_exchange_func a_func)
#define xt_mpi_call(call, comm)
@ xt_mpi_tag_exchange_msg
redistribution of data, non-public declarations
Xt_request xt_request_msgs_packed_new(int n_requests, const MPI_Request requests[n_requests], MPI_Comm comm, int n_packed, int n_tmp_buffers, const MPI_Datatype datatypes[n_packed], void *packed_data[n_packed], void *tmp_buffers[n_tmp_buffers], void *unpacked_data)