Yet Another eXchange Tool 0.11.3
Loading...
Searching...
No Matches
xt_exchanger_irecv_isend_packed.c
Go to the documentation of this file.
1
12/*
13 * Keywords:
14 * Maintainer: Jörg Behrens <behrens@dkrz.de>
15 * Moritz Hanke <hanke@dkrz.de>
16 * Thomas Jahns <jahns@dkrz.de>
17 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
18 *
19 * Redistribution and use in source and binary forms, with or without
20 * modification, are permitted provided that the following conditions are
21 * met:
22 *
23 * Redistributions of source code must retain the above copyright notice,
24 * this list of conditions and the following disclaimer.
25 *
26 * Redistributions in binary form must reproduce the above copyright
27 * notice, this list of conditions and the following disclaimer in the
28 * documentation and/or other materials provided with the distribution.
29 *
30 * Neither the name of the DKRZ GmbH nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 */
46#ifdef HAVE_CONFIG_H
47#include <config.h>
48#endif
49
50#include <mpi.h>
51#ifdef _OPENMP
52#include <omp.h>
53#endif
54
55#include "core/ppm_xfuncs.h"
56#include "xt/xt_mpi.h"
59#include "xt_config_internal.h"
60#include "xt_mpi_internal.h"
61#include "xt_redist_internal.h"
64
65/* unfortunately GCC 11 to 13 cannot handle the literal constants used for
66 * MPI_STATUSES_IGNORE by MPICH */
67#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
68#pragma GCC diagnostic push
69#pragma GCC diagnostic ignored "-Wstringop-overread"
70#pragma GCC diagnostic ignored "-Wstringop-overflow"
71#endif
72
73static size_t
74get_buffer_offsets(size_t *restrict buf_ofs,
75 int nsend, int nrecv,
76 const struct Xt_redist_msg *send_msgs,
77 const struct Xt_redist_msg *recv_msgs,
78 MPI_Comm comm)
79{
80 int buf_size;
81 size_t accum = 0;
82 for (int i = 0; i < nrecv; ++i) {
83 buf_ofs[i] = accum;
84 xt_mpi_call(MPI_Pack_size(1, recv_msgs[i].datatype, comm, &buf_size),
85 comm);
86 accum += (size_t)buf_size;
87 }
88 for (int i = 0; i < nsend; ++i) {
89 buf_ofs[nrecv+i] = accum;
90 xt_mpi_call(MPI_Pack_size(1, send_msgs[i].datatype, comm,
91 &buf_size), comm);
92 accum += (size_t)buf_size;
93 }
94 buf_ofs[nsend+nrecv] = accum;
95 return accum;
96}
97
98static size_t
99get_buffer_size(int nsend, int nrecv,
100 const struct Xt_redist_msg *send_msgs,
101 const struct Xt_redist_msg *recv_msgs,
102 MPI_Comm comm)
103{
104 int buf_size;
105 size_t accum = 0;
106 for (int i = 0; i < nrecv; ++i) {
107 xt_mpi_call(MPI_Pack_size(1, recv_msgs[i].datatype, comm, &buf_size),
108 comm);
109 accum += (size_t)buf_size;
110 }
111 for (int i = 0; i < nsend; ++i) {
112 xt_mpi_call(MPI_Pack_size(1, send_msgs[i].datatype, comm,
113 &buf_size), comm);
114 accum += (size_t)buf_size;
115 }
116
117 return accum;
118}
119
120static void
121start_packed_transfer(unsigned char *buffer,
122 int send_start,
123 const size_t *buf_ofs,
124 const void *src_data, int nsend, int nrecv,
125 const struct Xt_redist_msg *send_msgs,
126 const struct Xt_redist_msg *recv_msgs,
127 MPI_Comm comm, int tag_offset,
128 MPI_Request *requests)
129{
130 for (int i = 0; i < nrecv; ++i) {
131 int recv_size = (int)(buf_ofs[i+1] - buf_ofs[i]);
132 xt_mpi_call(MPI_Irecv(buffer + buf_ofs[i], recv_size, MPI_PACKED,
133 recv_msgs[i].rank,
134 tag_offset + xt_mpi_tag_exchange_msg, comm,
135 requests+i), comm);
136 }
137
138 for (int i = 0; i < nsend; ++i) {
139 int position = 0;
140 int buf_size = (int)(buf_ofs[send_start+i+1] - buf_ofs[send_start+i]);
141 xt_mpi_call(MPI_Pack(CAST_MPI_SEND_BUF(src_data), 1, send_msgs[i].datatype,
142 buffer + buf_ofs[send_start+i], buf_size, &position,
143 comm), comm);
144 xt_mpi_call(MPI_Isend(buffer + buf_ofs[send_start+i], position, MPI_PACKED,
145 send_msgs[i].rank,
146 tag_offset + xt_mpi_tag_exchange_msg, comm,
147 requests+nrecv+i), comm);
148 }
149}
150
151enum { AUTO_ALLOC_SIZE = 32, };
152
153static void
155 const void *src_data, void *dst_data,
156 int nsend, int nrecv,
157 const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs,
158 int tag_offset, MPI_Comm comm)
159{
160 MPI_Request *requests, requests_auto[AUTO_ALLOC_SIZE];
161 size_t *buf_ofs, buf_ofs_auto[AUTO_ALLOC_SIZE+1];
162
163 size_t num_tx = (size_t)nrecv + (size_t)nsend;
164 if (num_tx <= AUTO_ALLOC_SIZE) {
165 requests = requests_auto;
166 buf_ofs = buf_ofs_auto;
167 } else {
168 requests = xmalloc((num_tx+(num_tx&1)) * sizeof (*requests)
169 + (num_tx+1) * sizeof (*buf_ofs));
170 buf_ofs = (void *)(requests + (num_tx+(num_tx&1)));
171 }
172
173 size_t buffer_size = get_buffer_offsets(buf_ofs, nsend, nrecv,
174 send_msgs, recv_msgs, comm);
175 unsigned char *buffer = xmalloc(buffer_size);
176
177 start_packed_transfer(buffer, nrecv, buf_ofs,
178 src_data, nsend, nrecv,
179 send_msgs, recv_msgs,
180 comm, tag_offset,
181 requests);
182
183 xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE), comm);
184
185 for (int i = 0; i < nrecv; ++i) {
186 int position = 0, recv_size = (int)(buf_ofs[i+1]-buf_ofs[i]);
187 xt_mpi_call(MPI_Unpack(buffer + buf_ofs[i], recv_size, &position, dst_data,
188 1, recv_msgs[i].datatype, comm), comm);
189 }
190
191 free(buffer);
192 if (num_tx > AUTO_ALLOC_SIZE)
193 free(requests);
194}
195
196#ifdef _OPENMP
197static void
198xt_exchanger_irecv_isend_packed_s_exchange_omp(
199 const void *src_data, void *dst_data,
200 int nsend, int nrecv,
201 const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs,
202 int tag_offset, MPI_Comm comm)
203{
204 MPI_Request *requests, requests_auto[AUTO_ALLOC_SIZE];
205 size_t *buf_ofs, buf_ofs_auto[AUTO_ALLOC_SIZE+1];
206
207 size_t num_tx = (size_t)nrecv + (size_t)nsend;
208 if (num_tx <= AUTO_ALLOC_SIZE) {
209 requests = requests_auto;
210 buf_ofs = buf_ofs_auto;
211 } else {
212 requests = xmalloc((num_tx+(num_tx&1)) * sizeof (*requests) + (num_tx+1) * sizeof (*buf_ofs));
213 buf_ofs = (size_t *)(requests + (num_tx+(num_tx&1)));
214 }
215
216 size_t buffer_size = get_buffer_offsets(buf_ofs, nsend, nrecv,
217 send_msgs, recv_msgs, comm);
218 unsigned char *buffer = xmalloc(buffer_size);
219
220#pragma omp parallel
221 {
222 int num_threads = omp_get_num_threads(),
223 tid = omp_get_thread_num();
224 int start_send = (nsend * tid) / num_threads,
225 nsend_ = (nsend * (tid+1)) / num_threads - start_send,
226 start_recv = (nrecv * tid) / num_threads,
227 end_recv = (nrecv * (tid+1)) / num_threads,
228 nrecv_ = end_recv - start_recv,
229 nreq = nrecv_+nsend_,
230 start_req = start_send+start_recv;
231 start_packed_transfer(buffer, start_send+nrecv-start_recv,
232 buf_ofs+start_recv,
233 src_data, nsend_, nrecv_,
234 send_msgs+start_send, recv_msgs+start_recv,
235 comm, tag_offset,
236 requests+start_req);
237
238 xt_mpi_call(MPI_Waitall(nreq, requests+start_req, MPI_STATUSES_IGNORE),
239 comm);
240 for (int i = start_recv; i < end_recv; ++i) {
241 int position = 0, recv_size = (int)(buf_ofs[i+1]-buf_ofs[i]);
242 xt_mpi_call(MPI_Unpack(buffer + buf_ofs[i], recv_size, &position,
243 dst_data, 1, recv_msgs[i].datatype, comm), comm);
244 }
245 }
246
247 free(buffer);
248 if (num_tx > AUTO_ALLOC_SIZE)
249 free(requests);
250}
251#endif
252
254{
256 void *dst_data;
257};
258
259/*
260 * layout of inventory buffer created by
261 * xt_exchanger_irecv_isend_packed_a_exchange:
262 * struct inventory_header header;
263 * size_t buf_ofs[nrecv+1];
264 * MPI_Datatype dt[nrecv];
265 * unsigned char packed_data_buf[sum_of_pack_and_unpack_sizes];
266 */
267
268static void
270{
271 struct inventory_header *header = buf;
272 int nsend = header->nsend,
273 nrecv = header->nrecv;
274 (void)nsend;
276 MPI_Datatype *datatypes = (MPI_Datatype *)
277 (void *)((unsigned char *)(header+1) + sizeof (size_t) * ((size_t)nrecv+1));
278 void *dst_data = header->dst_data;
279 size_t *buf_ofs = (void *)(header+1);
280 for (int i = 0; i < nrecv; ++i) {
281 int position = 0, buffer_size = (int)(buf_ofs[i+1]-buf_ofs[i]);
282 xt_mpi_call(MPI_Unpack((unsigned char *)buf + buf_ofs[i], buffer_size,
283 &position, dst_data,
284 1, datatypes[i], comm), comm);
285 }
286 for (int i = 0; i < nrecv; ++i)
287 xt_mpi_call(MPI_Type_free(datatypes+i), comm);
288}
289
290static void
292 int nsend, int nrecv,
293 const struct Xt_redist_msg * send_msgs,
294 const struct Xt_redist_msg * recv_msgs,
295 int tag_offset, MPI_Comm comm,
296 Xt_request *request)
297{
298 size_t *buf_ofs, buf_ofs_auto[AUTO_ALLOC_SIZE+1];
299
300 size_t num_tx = (size_t)nrecv + (size_t)nsend;
301 if (num_tx <= AUTO_ALLOC_SIZE)
302 buf_ofs = buf_ofs_auto;
303 else
304 buf_ofs = xmalloc((num_tx+1) * sizeof (*buf_ofs));
305
306 size_t buffer_size = get_buffer_offsets(buf_ofs, nsend, nrecv,
307 send_msgs, recv_msgs, comm);
308 size_t inventory_size
309 = sizeof (struct inventory_header)
310 + sizeof (size_t) * ((size_t)nrecv+1)
311 + sizeof (MPI_Datatype) * (size_t)nrecv;
312 struct Xt_config_ conf = xt_default_config;
314 Xt_request requests
315 = xt_request_msgs_ebuf_alloc(nrecv + nsend, comm,
316 inventory_size + buffer_size, &conf);
318
319 MPI_Request *tmp_requests
321
322 unsigned char *buffer
324
325 start_packed_transfer(buffer+inventory_size, nrecv, buf_ofs,
326 src_data, nsend, nrecv,
327 send_msgs, recv_msgs,
328 comm, tag_offset,
329 tmp_requests);
330
331 size_t *buf_ofs_ = (void *)(buffer + sizeof (struct inventory_header));
332 for (int i = 0; i <= nrecv; ++i) {
333 buf_ofs_[i] = buf_ofs[i]+inventory_size;
334 }
335
336 struct inventory_header *header = (void *)buffer;
337 header->nsend = nsend;
338 header->nrecv = nrecv;
339 header->dst_data = dst_data;
340
341 MPI_Datatype *datatypes = (void *)(buffer + sizeof (struct inventory_header)
342 + sizeof (size_t) * ((size_t)nrecv+1));
343 for (int i = 0; i < nrecv; ++i)
344 xt_mpi_call(MPI_Type_dup(recv_msgs[i].datatype, datatypes+i), comm);
345 *request = requests;
346
347 if (num_tx > AUTO_ALLOC_SIZE)
348 free(buf_ofs);
349}
350
351#ifdef _OPENMP
352static void
353finalize_packed_a_exchange_mt(Xt_request request, void *buf)
354{
355 struct inventory_header *header = buf;
356 int nsend = header->nsend,
357 nrecv = header->nrecv;
358 (void)nsend;
360 MPI_Datatype *datatypes = (MPI_Datatype *)
361 ((unsigned char *)(header+1) + sizeof (size_t) * ((size_t)nrecv+1));
362 void *dst_data = header->dst_data;
363 size_t *buf_ofs = (void *)(header+1);
364 int num_threads = omp_get_num_threads(),
365 tid = omp_get_thread_num();
366 int start_recv = (nrecv * tid) / num_threads,
367 end_recv = (nrecv * (tid+1)) / num_threads;
368 for (int i = start_recv; i < end_recv; ++i) {
369 int position = 0, buffer_size = (int)(buf_ofs[i+1]-buf_ofs[i]);
370 xt_mpi_call(MPI_Unpack((unsigned char *)buf + buf_ofs[i], buffer_size,
371 &position, dst_data,
372 1, datatypes[i], comm), comm);
373 }
374 for (int i = start_recv; i < end_recv; ++i)
375 xt_mpi_call(MPI_Type_free(datatypes+i), comm);
376}
377
378static void
379xt_exchanger_irecv_isend_packed_a_exchange_omp(const void *src_data, void *dst_data,
380 int nsend, int nrecv,
381 const struct Xt_redist_msg * send_msgs,
382 const struct Xt_redist_msg * recv_msgs,
383 int tag_offset, MPI_Comm comm,
384 Xt_request *request)
385{
386 size_t *buf_ofs, buf_ofs_auto[AUTO_ALLOC_SIZE+1];
387
388 size_t num_tx = (size_t)nrecv + (size_t)nsend;
389 if (num_tx <= AUTO_ALLOC_SIZE)
390 buf_ofs = buf_ofs_auto;
391 else
392 buf_ofs = xmalloc((num_tx+1) * sizeof (*buf_ofs));
393
394 size_t buffer_size = get_buffer_offsets(buf_ofs, nsend, nrecv,
395 send_msgs, recv_msgs, comm);
396 size_t inventory_size
397 = sizeof (struct inventory_header)
398 + sizeof (size_t) * ((size_t)nrecv+1)
399 + sizeof (MPI_Datatype) * (size_t)nrecv;
400 struct Xt_config_ conf = xt_default_config;
402 Xt_request requests
403 = xt_request_msgs_ebuf_alloc(nrecv + nsend, comm,
404 inventory_size + buffer_size, &conf);
406 requests, finalize_packed_a_exchange_mt);
407#pragma omp parallel firstprivate(requests, inventory_size)
408 {
409 MPI_Request *prequests
411
412 unsigned char *buffer
414
415 int num_threads = omp_get_num_threads(),
416 tid = omp_get_thread_num();
417 int start_send = (nsend * tid) / num_threads,
418 nsend_ = (nsend * (tid+1)) / num_threads - start_send,
419 start_recv = (nrecv * tid) / num_threads,
420 end_recv = (nrecv * (tid+1)) / num_threads,
421 nrecv_ = end_recv - start_recv,
422 start_req = start_send+start_recv;
423
424 start_packed_transfer(buffer+inventory_size,
425 start_send+nrecv-start_recv,
426 buf_ofs+start_recv,
427 src_data, nsend_, nrecv_,
428 send_msgs+start_send, recv_msgs+start_recv,
429 comm, tag_offset,
430 prequests+start_req);
431
432 size_t *buf_ofs_ = (void *)(buffer + sizeof (struct inventory_header));
433 for (int i = start_recv; i < end_recv+(end_recv==nrecv); ++i) {
434 buf_ofs_[i] = buf_ofs[i]+inventory_size;
435 }
436#pragma omp master
437 {
438 struct inventory_header *header = (void *)buffer;
439 header->nsend = nsend;
440 header->nrecv = nrecv;
441 header->dst_data = dst_data;
442 }
443 MPI_Datatype *datatypes = (void *)(buffer + sizeof (struct inventory_header)
444 + sizeof (size_t) * ((size_t)nrecv+1));
445 for (int i = start_recv; i < end_recv; ++i)
446 xt_mpi_call(MPI_Type_dup(recv_msgs[i].datatype, datatypes+i), comm);
447 }
448 *request = requests;
449
450 if (num_tx > AUTO_ALLOC_SIZE)
451 free(buf_ofs);
452}
453#endif
454
457 int nsend, int nrecv,
458 const struct Xt_redist_msg *send_msgs,
459 const struct Xt_redist_msg *recv_msgs,
460 MPI_Comm comm)
461{
462 size_t buf_size = get_buffer_size(nsend, nrecv, send_msgs, recv_msgs, comm);
463 size_t inventory_size
464 = sizeof (struct inventory_header)
465 + sizeof (size_t) * ((size_t)nrecv+1)
466 + sizeof (MPI_Datatype) * (size_t)nrecv;
467 struct Xt_config_ conf = xt_default_config;
469 Xt_request shared_req
470 = xt_request_msgs_ebuf_alloc(nsend+nrecv, comm,
471 inventory_size + buf_size, &conf);
472 return (Xt_exchanger_omp_share)shared_req;
473}
474
475
478 const struct Xt_redist_msg *send_msgs,
479 const struct Xt_redist_msg *recv_msgs,
480 MPI_Comm comm, int tag_offset,
481 Xt_config config)
482{
487 static const xt_simple_s_exchange_func
488 s_exch_by_mthread_mode[] = {
490#ifdef _OPENMP
491 xt_exchanger_irecv_isend_packed_s_exchange_omp,
492#else
494#endif
495 };
496 static const xt_simple_a_exchange_func
497 a_exch_by_mthread_mode[] = {
499#ifdef _OPENMP
500 xt_exchanger_irecv_isend_packed_a_exchange_omp,
501#else
503#endif
504 };
505 int mthread_mode = xt_config_get_redist_mthread_mode(config);
507 nsend, nrecv, send_msgs, recv_msgs,
508 comm, tag_offset,
509 s_exch_by_mthread_mode[mthread_mode],
510 a_exch_by_mthread_mode[mthread_mode],
512 config);
513}
514
515/*
516 * Local Variables:
517 * c-basic-offset: 2
518 * coding: utf-8
519 * indent-tabs-mode: nil
520 * show-trailing-whitespace: t
521 * require-trailing-newline: t
522 * End:
523 */
int MPI_Comm
Definition core.h:64
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition ppm_xfuncs.h:70
int MPI_Type_free(MPI_Datatype *datatype)
int MPI_Type_dup(MPI_Datatype oldtype, MPI_Datatype *newtype)
struct Xt_config_ xt_default_config
Definition xt_config.c:204
void xt_config_set_redist_mthread_mode(Xt_config config, int mode)
Definition xt_config.c:347
@ XT_MT_OPENMP
Definition xt_config.h:142
@ XT_MT_NONE
Definition xt_config.h:140
int xt_config_get_redist_mthread_mode(Xt_config config)
Definition xt_config.c:340
implementation of configuration object
struct Xt_exchanger_omp_share_ * Xt_exchanger_omp_share
static void xt_exchanger_irecv_isend_packed_a_exchange(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm, Xt_request *request)
static size_t get_buffer_offsets(size_t *restrict buf_ofs, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm)
static size_t get_buffer_size(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm)
static void start_packed_transfer(unsigned char *buffer, int send_start, const size_t *buf_ofs, const void *src_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, MPI_Request *requests)
static Xt_exchanger_omp_share xt_exchanger_irecv_isend_packed_create_omp_share(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm)
static void finalize_packed_a_exchange(Xt_request request, void *buf)
static void xt_exchanger_irecv_isend_packed_s_exchange(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm)
Xt_exchanger xt_exchanger_irecv_isend_packed_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, Xt_config config)
Xt_exchanger xt_exchanger_simple_base_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, xt_simple_s_exchange_func s_func, xt_simple_a_exchange_func a_func, xt_simple_create_omp_share_func create_omp_share_func, Xt_config config)
void(* xt_simple_s_exchange_func)(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm)
void(* xt_simple_a_exchange_func)(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm, Xt_request *request)
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
@ xt_mpi_tag_exchange_msg
redistribution of data, non-public declarations
void xt_request_msgs_ebuf_set_finalizer(Xt_request request, Xt_request_msgs_ebuf_finalizer finalizer)
MPI_Comm xt_request_msgs_ebuf_get_comm(Xt_request request)
Xt_request xt_request_msgs_ebuf_alloc(int n_requests, MPI_Comm comm, size_t extra_buf_size, Xt_config config)
MPI_Request * xt_request_msgs_ebuf_get_req_ptr(Xt_request request)
void * xt_request_msgs_ebuf_get_extra_buf(Xt_request request)
functions to create collection of request handles augmented with user-defined buffer
internal interfaces for xt_request_msgs_ebuf