Subscribes to the 'example' node and prints the message bodies received.Can be used with broker.c, direct.c or an external AMQP broker.
#include <stdio.h>
#include <stdlib.h>
typedef struct app_data_t {
const char *host, *port;
const char *amqp_address;
const char *container_id;
int message_count;
int received;
bool finished;
} app_data_t;
static const int BATCH = 1000;
static int exit_code = 0;
exit_code = 1;
}
}
if (!err) {
pn_string_t *s = pn_string(NULL);
printf("%s\n", pn_string_get(s));
pn_free(s);
free(data.start);
} else {
fprintf(stderr,
"decode_message: %s\n",
pn_code(err));
exit_code = 1;
}
}
static bool handle(app_data_t* app,
pn_event_t* event) {
{
pn_link_flow(l, app->message_count ? app->message_count : BATCH);
}
} break;
int recv;
size_t oldsize = m->size;
m->size += size;
m->start = (char*)realloc(m->start, m->size);
printf("Message aborted\n");
m->size = 0;
}
else if (recv < 0 && recv !=
PN_EOS) {
decode_message(*m);
*m = pn_rwbytes_null;
if (app->message_count == 0) {
}
} else if (++app->received >= app->message_count) {
printf("%d messages received\n", app->received);
}
}
}
break;
}
break;
break;
break;
break;
return false;
break;
default:
break;
}
return true;
}
void run(app_data_t *app) {
do {
if (!handle(app, e) || exit_code != 0) {
return;
}
}
} while(true);
}
int main(int argc, char **argv) {
struct app_data_t app = {0};
app.container_id = argv[0];
app.host = (argc > 1) ? argv[1] : "";
app.port = (argc > 2) ? argv[2] : "amqp";
app.amqp_address = (argc > 3) ? argv[3] : "examples";
app.message_count = (argc > 4) ? atoi(argv[4]) : 10;
run(&app);
return exit_code;
}