#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mpi.h>
#include <yaxt.h>
#include "../src/xt_exchanger.h"
#include "../src/xt_exchanger_irecv_send.h"
#include "../src/xt_exchanger_irecv_isend.h"
#include "../src/xt_exchanger_mix_isend_irecv.h"
#include "../src/xt_exchanger_irecv_isend_packed.h"
#include "../src/xt_exchanger_neigh_alltoall.h"
#include "../src/xt_redist_internal.h"
#include "test_redist_common.h"
#include "tests.h"
struct test_message {
int rank;
const int *pos;
int num_pos;
};
static void
static void
static void
static void
static void
static int test_freq = 3;
int main(int argc, char **argv)
{
}
free(exchangers_new);
MPI_Finalize();
return TEST_EXIT_CODE;
}
{
};
enum {
};
size_t cur_ex = 0;
int opt;
while ((opt = getopt(*argc, *argv, "m:s:")) != -1) {
switch (opt) {
case 'm':
{
if (exchanger_new_id == -1)
{
fprintf(stderr, "Unknown exchanger constructor requested: %s\n",
optarg);
exit(EXIT_FAILURE);
}
#if MPI_VERSION < 3
{
fputs("xt_exchanger_neigh_alltoall_new requires MPI version 3.0 or "
"higher\n", stderr);
continue;
}
#endif
++cur_ex;
exchangers_new =
xrealloc(exchangers_new,
sizeof (*exchangers_new) * (cur_ex + 1));
}
break;
case 's':
{
char *endptr;
errno = 0;
long v = strtol(optarg, &endptr, 0);
if ((errno == ERANGE && (v == LONG_MAX || v == LONG_MIN))
|| (errno != 0 && v == 0)) {
perror("failed to parse argument to -s option");
exit(EXIT_FAILURE);
}
if (endptr == optarg) {
fputs("malformed argument to -s option, no digits were found\n",
stderr);
exit(EXIT_FAILURE);
}
if (v < 1 || v > INT_MAX) {
fprintf(stderr, "value of -s option (%ld) out of range [1,%d]\n",
v, INT_MAX);
exit(EXIT_FAILURE);
}
test_freq = (int)v;
}
break;
}
}
return exchangers_new;
}
static void
{
int my_rank, comm_size;
int incr = comm_size/test_freq + (comm_size < test_freq);
for (int i = 0; i < comm_size; i += incr) {
int nsend = 0;
int nrecv = my_rank != i;
if (my_rank == i) {
nsend = comm_size - 1;
send_msgs =
xmalloc((
size_t)nsend *
sizeof (*send_msgs));
for (size_t j = 0; j < (size_t)i; ++j)
for (size_t j = (size_t)i; j < (size_t)nsend; ++j)
send_msgs[j] = (
struct Xt_redist_msg){.rank=(1+(int)j)%comm_size,
.datatype=MPI_INT};
}
{{.
rank=-1, .datatype=MPI_DATATYPE_NULL},
{.rank=i, .datatype=MPI_INT}};
send_msgs,
recv_msgs + (my_rank != i),
comm, 0);
for (int async = 0; async < 1 + test_async; ++async) {
int src_data[1] = { my_rank == i ? 4711 : -1 };
int dst_data[1] = { my_rank == i ? 4711 : -1 };
if (async) {
int flag;
(void*)(dst_data), &request);
if (!flag) PUT_ERR("invalid flag result\n");
} else {
(void*)(dst_data));
}
if (dst_data[0] != 4711) PUT_ERR("invalid data\n");
}
free(send_msgs);
}
}
static void
{
int my_rank, comm_size;
MPI_Datatype *dt_by_ofs =
xmalloc((
size_t)comm_size *
sizeof (*dt_by_ofs));
dt_by_ofs[0] = MPI_INT;
for (size_t j = 1; j < (size_t)comm_size; ++j)
{
MPI_Type_indexed(1, (int[]){1}, (int[]){(int)j}, MPI_INT, dt_by_ofs + j);
MPI_Type_commit(dt_by_ofs + j);
}
int *dst_data =
xmalloc(((
size_t)comm_size - 1) *
sizeof (*dst_data) * 2);
int incr = comm_size/test_freq + (comm_size < test_freq);
for (int i = 0; i < comm_size; i += incr) {
int nsend = i != my_rank;
size_t nrecv = 0;
if (my_rank == i) {
nrecv = (size_t)comm_size - 1;
recv_msgs =
xmalloc(nrecv *
sizeof (*recv_msgs) * 2);
for (size_t j = 0; j < nrecv; ++j) {
recv_msgs[j].
rank = (i + (int)j + 1)%comm_size;
recv_msgs[j + nrecv].
rank
= (comm_size - (int)j - 1 + i)%comm_size;
recv_msgs[j + nrecv].
datatype = dt_by_ofs[j];
}
}
enum { exchange_fwd, exchange_rev, num_exchanges };
for (size_t j = 0; j < num_exchanges; ++j)
exchanger[j] = exchanger_new(nsend, (int)nrecv,
send_msgs, recv_msgs + j * nrecv, comm, 0);
for (int async = 0; async < 1 + test_async; ++async) {
int src_data[1] = {(my_rank+comm_size-i)%comm_size};
for (size_t j = 0; j < 2 * nrecv; ++j)
dst_data[j] = -1;
for (size_t j = 0; j < num_exchanges; ++j) {
if (async) {
int flag;
dst_data + j * nrecv, &request);
if (!flag) PUT_ERR("invalid flag result\n");
} else {
}
}
bool mismatch = false;
for (size_t j = 0; j < nrecv; ++j)
mismatch |= (((size_t)dst_data[j] != j + 1)
|| (dst_data[nrecv + j] != comm_size - (int)j - 1));
if (mismatch)
PUT_ERR("invalid data\n");
}
free(recv_msgs);
}
free(dst_data);
for (size_t j = 1; j < (size_t)comm_size; ++j)
MPI_Type_free(dt_by_ofs + j);
free(dt_by_ofs);
}
static void
{
int my_rank, comm_size;
int nsend = comm_size - 1;
int nrecv = comm_size - 1;
for (size_t j = 0; j < (size_t)nsend; ++j)
=
xmalloc((
size_t)nrecv *
sizeof (*recv_msgs));
int *dst_data =
xmalloc((
size_t)comm_size *
sizeof (*dst_data));
MPI_Datatype *dt_by_ofs =
xmalloc((
size_t)comm_size *
sizeof (*dt_by_ofs));
dt_by_ofs[0] = my_rank != 0 ? MPI_INT : MPI_DATATYPE_NULL;
for (size_t i = 1; i < (size_t)comm_size; ++i)
if (i != (size_t)my_rank) {
MPI_Type_indexed(1, (int[]){1}, (int[]){(int)i}, MPI_INT, dt_by_ofs + i);
MPI_Type_commit(dt_by_ofs + i);
} else
dt_by_ofs[i] = MPI_DATATYPE_NULL;
size_t comm_size_ = (size_t)comm_size,
incr = (size_t)(comm_size/test_freq + (comm_size < test_freq));
for (size_t i = 0; i < comm_size_ - 1; i += incr) {
for (size_t j = 0; j < (size_t)nsend; ++j) {
int ofs = my_rank + 1 + (int)i + (int)j;
send_msgs[j].
rank = (ofs + (ofs >= comm_size + my_rank))%comm_size;
}
for (size_t j = 0; j < comm_size_ - 1; j += incr) {
for (size_t k = 0; k < (size_t)nrecv; ++k) {
int ofs = ((int)i + (int)j + (int)k)%(comm_size - 1);
ofs += ofs >= my_rank;
}
send_msgs,
recv_msgs,
for (int async = 0; async < 1 + test_async; ++async) {
int src_data[1] = {my_rank};
for (size_t k = 0; k < comm_size_; ++k)
dst_data[k] = my_rank;
if (async) {
int flag;
&request);
if (!flag) PUT_ERR("invalid flag result\n");
} else {
}
bool mismatch = false;
for (int k = 0; k < comm_size; ++k)
mismatch |= (dst_data[k] != k);
if (mismatch)
PUT_ERR("invalid data\n");
}
}
}
for (size_t i = 1; i < comm_size_; ++i)
if (i != (size_t)my_rank)
MPI_Type_free(dt_by_ofs + i);
free(dt_by_ofs);
free(dst_data);
free(recv_msgs);
free(send_msgs);
}
static void
{
int my_rank, comm_size;
int incr = comm_size/test_freq + (comm_size < test_freq);
for (int i = 1; i < comm_size; i += incr) {
enum { nsend = 1, nrecv = 1 };
= {{.
rank=(my_rank + comm_size - i)%comm_size, .
datatype=MPI_INT}};
Xt_exchanger exchanger = exchanger_new(nsend, nrecv, send_msgs,
recv_msgs, comm, 0);
for (int async = 0; async < 1 + test_async; ++async) {
int src_data[1] = {my_rank};
int dst_data[1] = {-1};
if (async) {
int flag;
if (!flag) PUT_ERR("invalid flag result\n");
} else {
}
if (dst_data[0] != (my_rank + comm_size - i)%comm_size)
PUT_ERR("invalid data\n");
}
}
}
static void
{
int my_rank, comm_size;
if (comm_size == 1) return;
int splitRank = (comm_size * 2) / 3;
int group = my_rank >= splitRank;
xt_mpi_call(MPI_Comm_split(comm, group, 0, &intra_group_comm), comm);
xt_mpi_call(MPI_Intercomm_create(intra_group_comm, 0, comm,
group ? 0 : splitRank,
0, &inter_comm), comm);
int intra_rank;
int local_size, remote_size;
xt_mpi_call(MPI_Comm_rank(inter_comm, &intra_rank), comm);
xt_mpi_call(MPI_Comm_size(inter_comm, &local_size), comm);
xt_mpi_call(MPI_Comm_remote_size(inter_comm, &remote_size), comm);
int nsend = remote_size;
int nrecv = remote_size;
xmalloc((
size_t)nsend *
sizeof (*send_msgs));
for (int i = 0; i < nsend; ++i) {
}
xmalloc((
size_t)nrecv *
sizeof (*recv_msgs));
for (int i = 0; i < nrecv; ++i) {
MPI_Type_indexed(
1, (
int[]){1}, (
int[]){i}, MPI_INT, &(recv_msgs[i].
datatype));
MPI_Type_commit(&(recv_msgs[i].
datatype));
}
int *dst_data =
xmalloc((
size_t)nrecv *
sizeof (*dst_data));
send_msgs,
recv_msgs,
inter_comm, 0);
for (int async = 0; async < 1 + test_async; ++async) {
int src_data[1] = {my_rank};
for (int i = 0; i < nrecv; ++i) dst_data[i] = -1;
if (async) {
int flag;
&request);
if (!flag) PUT_ERR("invalid flag result\n");
} else {
}
int dst_data_offset = (my_rank >= splitRank)?0:splitRank;
bool mismatch = false;
for (int i = 0; i < nrecv; ++i)
mismatch |= (dst_data[i] != i + dst_data_offset);
if (mismatch)
PUT_ERR("invalid data\n");
}
for (
int i = 0; i < nrecv; ++i) MPI_Type_free(&(recv_msgs[i].
datatype));
free(dst_data);
free(recv_msgs);
free(send_msgs);
}
add versions of standard API functions not returning on error
#define xrealloc(ptr, size)
static const struct @3 exchanger_table[]
int xt_exchanger_id_by_name(const char *name)
@ xt_exchanger_irecv_isend
@ xt_exchanger_irecv_send
@ xt_exchanger_irecv_isend_packed
@ xt_exchanger_neigh_alltoall
@ xt_exchanger_mix_isend_irecv
void xt_initialize(MPI_Comm default_comm)
void xt_exchanger_s_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data)
void xt_exchanger_a_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data, Xt_request *request)
void xt_exchanger_delete(Xt_exchanger exchanger)
Xt_exchanger(* Xt_exchanger_new)(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
Xt_exchanger xt_exchanger_irecv_isend_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
Xt_exchanger xt_exchanger_irecv_isend_packed_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
Xt_exchanger xt_exchanger_irecv_send_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
Xt_exchanger xt_exchanger_mix_isend_irecv_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
Xt_exchanger xt_exchanger_neigh_alltoall_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset)
#define xt_mpi_call(call, comm)
void xt_request_wait(Xt_request *request)
void xt_request_test(Xt_request *request, int *flag)