#include "options.hpp"
#include <deque>
#include <iostream>
#include <map>
#include <string>
#include <thread>
// Global switch for debug tracing. main() registers it with the option
// parser as the -v / --verbose flag.
bool verbose;
// Executes the statement(s) x only when `verbose` is set. The
// do { ... } while (false) wrapper makes the expansion behave as a single
// statement. Call sites in this file pass a complete statement including its
// own semicolon, e.g. DOUT(std::cerr << "...";);
#define DOUT(x) do {if (verbose) {x};} while (false)
// Forward declarations: Sender holds a Queue* and Queue keeps a map keyed by
// Sender*, so each type must be nameable before the other is defined.
class Queue;
class Sender;
friend class connection_handler;
std::string queue_name_;
Queue* queue_;
int pending_credit_;
public:
sender_(s), work_queue_(s.work_queue()), queue_(0), pending_credit_(0)
{
}
// Schedule f on this sender's work queue. Returns the result reported by
// work_queue_.add().
bool add(proton::work f) {
    const bool queued = work_queue_.add(f);
    return queued;
}
// Recover the Sender object attached to a proton sender as its user data.
// Returns the stored pointer converted to Sender*; it may be null, and the
// callers in this file test the result before using it.
static Sender* get(const proton::sender& s) {
    // static_cast is the conventional conversion from the opaque pointer
    // handed back by user_data(); reinterpret_cast is not required for it.
    return static_cast<Sender*>(s.user_data());
}
void boundQueue(Queue* q, std::string qn);
// Transmit message m over this sender's link.
// Queue::tryToSend schedules this through add() for each message it
// dispatches to this subscriber.
void sendMsg(proton::message m) {
    DOUT(std::cerr << "Sender: " << this << " sending\n";);
    // Hand the message to the link. Without this call the message taken off
    // the queue would be traced and then silently discarded.
    sender_.send(m);
}
void unsubscribed() {
DOUT(std::cerr << "Sender: " << this << " deleting\n";);
delete this;
}
};
class Queue {
proton::work_queue work_queue_;
const std::string name_;
std::deque<proton::message> messages_;
typedef std::map<Sender*, int> subscriptions;
subscriptions subscriptions_;
subscriptions::iterator current_;
void tryToSend() {
DOUT(std::cerr << "Queue: " << this << " tryToSend: " << subscriptions_.size(););
size_t outOfCredit = 0;
while (!messages_.empty() && outOfCredit<subscriptions_.size()) {
if (current_==subscriptions_.end()) {
current_=subscriptions_.begin();
}
DOUT(std::cerr << "(" << current_->second << ") ";);
if (current_->second>0) {
DOUT(std::cerr << current_->first << " ";);
auto msg = messages_.front();
auto sender = current_->first;
sender->add([=]{sender->sendMsg(msg);});
messages_.pop_front();
--current_->second;
++current_;
} else {
++outOfCredit;
}
}
DOUT(std::cerr << "\n";);
}
public:
Queue(proton::container& c, const std::string& n) :
work_queue_(c), name_(n), current_(subscriptions_.end())
{}
bool add(proton::work f) {
return work_queue_.
add(f);
}
void queueMsg(proton::message m) {
DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") queueMsg\n";);
messages_.push_back(m);
tryToSend();
}
void flow(Sender* s, int c) {
DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") flow: " << c << " to " << s << "\n";);
subscriptions_[s] = c;
tryToSend();
}
void subscribe(Sender* s) {
DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") subscribe Sender: " << s << "\n";);
subscriptions_[s] = 0;
}
void unsubscribe(Sender* s) {
DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") unsubscribe Sender: " << s << "\n";);
if (current_ != subscriptions_.end() && current_->first==s) ++current_;
subscriptions_.erase(s);
s->add([=]{s->unsubscribed();});
}
};
if (queue_) {
auto credit = sender.
credit();
queue_->add([=]{queue_->flow(this, credit);});
} else {
pending_credit_ = sender.
credit();
}
}
if (queue_) {
queue_->add([=]{queue_->unsubscribe(this);});
} else {
}
}
// Attach this sender to queue q (named qn), open the link, and subscribe to
// the queue.
//
// Any credit remembered in pending_credit_ is forwarded to the queue right
// after subscribing. The subscribe/flow calls are scheduled through q->add().
void Sender::boundQueue(Queue* q, std::string qn) {
    DOUT(std::cerr << "Sender: " << this << " bound to Queue: " << q <<"(" << qn << ")\n";);
    queue_ = q;
    queue_name_ = qn;

    // Open the link with the queue name as its source address and this object
    // as the handler for its events.
    // NOTE(review): the start of this statement was missing (only
    // ".handler(*this));" remained); it is reconstructed to mirror
    // Receiver::boundQueue - confirm against the intended options.
    sender_.open(proton::sender_options()
        .source((proton::source_options().address(queue_name_)))
        .handler(*this));

    // Copy the credit by value: the lambda below runs later on the queue's
    // work queue.
    auto credit = pending_credit_;
    q->add([=]{
        q->subscribe(this);
        if (credit>0) {
            q->flow(this, credit);
        }
    });
    std::cout << "sending from " << queue_name_ << std::endl;
}
class QueueManager;
friend class connection_handler;
proton::receiver receiver_;
proton::work_queue& work_queue_;
Queue* queue_;
QueueManager& queue_manager_;
std::deque<proton::message> messages_;
// Handle a message arriving on this receiver's link.
//  - Bound to a queue: buffer the message and forward the buffer to it.
//  - Not bound, but the message carries a "to" address: route it to the
//    queue of that name.
//  - Otherwise: reject the delivery, since there is nowhere to route it.
void on_message(proton::delivery &d, proton::message &m)
override {
    auto to_address = m.to();
    if (queue_) {
        messages_.push_back(m);
        queueMsgs();
    } else if (!to_address.empty()) {
        queueMsgToNamedQueue(m, to_address);
    } else {
        // Settle the delivery explicitly instead of dropping the message
        // without telling the peer.
        d.reject();
    }
}
// Drain the local buffer, scheduling each buffered message onto the bound
// queue in arrival order. queue_ is dereferenced only when at least one
// message is buffered.
void queueMsgs() {
    DOUT(std::cerr << "Receiver: " << this << " queueing " << messages_.size() << " msgs to: " << queue_ << "\n";);
    for (; !messages_.empty(); messages_.pop_front()) {
        // Copy the message so the lambda owns it after the pop.
        auto pending = messages_.front();
        queue_->add([=]{queue_->queueMsg(pending);});
    }
}
void queueMsgToNamedQueue(proton::message& m, std::string address);
public:
// Wrap proton receiver r. qm is kept by reference and used to route messages
// to named queues. The receiver starts with no bound queue.
Receiver(proton::receiver r, QueueManager& qm)
    : receiver_(r),
      work_queue_(r.work_queue()),
      queue_(nullptr),
      queue_manager_(qm)
{
}
// Schedule f on this receiver's work queue and report whether
// work_queue_.add() accepted it.
bool add(proton::work f) {
    return work_queue_.add(f);
}
// Bind this receiver to queue q (named qn), open the link with qn as the
// source address and this object as handler, then forward anything that was
// buffered in messages_.
void boundQueue(Queue* q, std::string qn) {
    DOUT(std::cerr << "Receiver: " << this << " bound to Queue: " << q << "(" << qn << ")\n";);
    queue_ = q;

    proton::source_options link_source;
    link_source.address(qn);
    receiver_.open(proton::receiver_options().source(link_source).handler(*this));

    std::cout << "receiving to " << qn << std::endl;
    queueMsgs();
}
};
class QueueManager {
proton::container& container_;
proton::work_queue work_queue_;
typedef std::map<std::string, Queue*> queues;
queues queues_;
int next_id_;
public:
QueueManager(proton::container& c) :
container_(c), work_queue_(c), next_id_(0)
{}
bool add(proton::work f) {
return work_queue_.
add(f);
}
template <class T>
void findQueue(T& connection, std::string& qn) {
if (qn.empty()) {
std::ostringstream os;
os << "_dynamic_" << next_id_++;
qn = os.str();
}
Queue* q = 0;
auto i = queues_.find(qn);
if (i==queues_.end()) {
q = new Queue(container_, qn);
queues_[qn] = q;
} else {
q = i->second;
}
connection.add([=, &connection] {connection.boundQueue(q, qn);});
}
void queueMessage(proton::message m, std::string address) {
Queue* q = 0;
auto i = queues_.find(address);
if (i==queues_.end()) {
q = new Queue(container_, address);
queues_[address] = q;
} else {
q = i->second;
}
q->add([=] {q->queueMsg(m);});
}
void findQueueSender(Sender* s, std::string qn) {
findQueue(*s, qn);
}
void findQueueReceiver(Receiver* r, std::string qn) {
findQueue(*r, qn);
}
};
// Route message m to the queue called `address` via the queue manager.
// The message and address are copied into the scheduled work item, which
// runs on the queue manager's work queue.
void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) {
    if (verbose) {
        std::cerr << "Receiver: " << this << " send msg to Queue: " << address << "\n";
    }
    queue_manager_.add([=]{queue_manager_.queueMessage(m, address);});
}
QueueManager& queue_manager_;
public:
// Build a handler that resolves queues through qm. Only a reference is
// stored, so qm must outlive this handler.
connection_handler(QueueManager& qm) : queue_manager_(qm) {}
c.
open(proton::connection_options{}
.offered_capabilities({"ANONYMOUS-RELAY"}));
}
Sender* s = new Sender(sender);
queue_manager_.add([=]{queue_manager_.findQueueSender(s, qn);});
}
Receiver* r = new Receiver(receiver, queue_manager_);
if (qname.empty()) {
receiver.
open(proton::receiver_options{}
.handler(*r));
} else {
queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
}
}
for (proton::sender_iterator i = session.
senders().begin(); i != session.
senders().end(); ++i) {
Sender* s = Sender::get(*i);
if (s && s->queue_) {
auto q = s->queue_;
s->queue_->add([=]{q->unsubscribe(s);});
}
}
}
// Report an error condition on standard output; no other action is taken.
void on_error(const proton::error_condition& e) override {
    const std::string description = e.what();
    std::cout << "protocol error: " << description << std::endl;
}
Sender* s = Sender::get(*i);
if (s && s->queue_) {
auto q = s->queue_;
s->queue_->add([=]{q->unsubscribe(s);});
}
}
delete this;
}
};
// Ties together the proton container, the queue manager, and the handler for
// incoming connections.
class broker {
public:
    // Set up the container and queue manager and register a listener on addr.
    broker(const std::string addr)
        : container_("broker"), queues_(container_), listener_(queues_)
    {
        container_.listen(addr, listener_);
    }

    // Run the container, passing the reported hardware concurrency as the
    // thread count.
    void run() {
        const unsigned thread_count = std::thread::hardware_concurrency();
        container_.run(thread_count);
    }

private:
    // Listen handler: supplies a fresh connection_handler for every accepted
    // connection and reports listener status.
    struct listener : public proton::listen_handler {
        listener(QueueManager& c) : queues_(c) {}

        // One heap-allocated handler per accepted connection.
        // NOTE(review): nothing here frees it; presumably the handler
        // deletes itself when its connection ends - confirm.
        proton::connection_options on_accept(proton::listener&) override {
            connection_handler* handler = new connection_handler(queues_);
            return proton::connection_options().handler(*handler);
        }

        // Announce the port once the listener is open.
        void on_open(proton::listener& l) override {
            std::cout << "broker listening on " << l.port() << std::endl;
        }

        // Log the listen failure and rethrow it as std::runtime_error.
        void on_error(proton::listener&, const std::string& s) override {
            std::cerr << "listen error: " << s << std::endl;
            throw std::runtime_error(s);
        }

        QueueManager& queues_;
    };

    // Declaration order matters: queues_ is built from container_, and
    // listener_ from queues_.
    proton::container container_;
    QueueManager queues_;
    listener listener_;
};
int main(int argc, char **argv) {
std::string address("0.0.0.0");
example::options opts(argc, argv);
opts.add_flag(verbose, 'v', "verbose", "verbose (debugging) output");
opts.add_value(address, 'a', "address", "listen on URL", "URL");
try {
verbose = false;
opts.parse();
broker(address).run();
return 0;
} catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cout << "broker shutdown: " << e.what() << std::endl;
}
return 1;
}
sender_range senders() const
Return all senders on this connection.
void open()
Open the connection.
void run()
Run the container in the current thread.
listener listen(const std::string &listen_url, listen_handler &handler)
Listen for new connections on listen_url.
void reject()
Settle with REJECTED state.
std::string what() const
Simple printable string for condition.
void close()
Close the endpoint.
void user_data(void *user_data) const
Set user data on this link.
int credit() const
Credit available on the link.
int port()
Unsettled API
An AMQP message.
Definition message.hpp:48
void to(const std::string &)
Set the destination address.
Handler for Proton messaging events.
Definition messaging_handler.hpp:69
virtual void on_error(const error_condition &)
Fallback error handling.
virtual void on_connection_open(connection &)
The remote peer opened the connection: called once on initial open, and again on each successful auto...
virtual void on_message(delivery &, message &)
A message is received.
virtual void on_receiver_open(receiver &)
The remote peer opened the link.
virtual void on_sendable(sender &)
A message can be sent.
virtual void on_transport_close(transport &)
The final event for a connection: there will be no more reconnect attempts and no more event function...
virtual void on_session_close(session &)
The remote peer closed the session.
virtual void on_sender_close(sender &)
The remote peer closed the link.
virtual void on_sender_open(sender &)
The remote peer opened the link.
class target target() const
Get the target node.
void open()
Open the receiver.
Options for creating a sender.
Definition sender_options.hpp:60
A channel for sending messages.
Definition sender.hpp:40
tracker send(const message &m)
Send a message on the sender.
class source source() const
Get the source node.
void open()
Open the sender.
sender_range senders() const
Return the senders on this session.
Options for creating a source node for a sender or receiver.
Definition source_options.hpp:46
std::string address() const
The address of the source.
std::string address() const
The address of the target.
bool dynamic() const
True if the remote node is created dynamically.
class connection connection() const
Get the connection associated with this transport.
Unsettled API - A context for thread-safe execution of work.
Definition work_queue.hpp:327
bool add(work fn)
Unsettled API - Add work fn to the work queue.
A connection to a remote AMQP peer.
Options for creating a connection.
A top-level container of connections, sessions, and links.
Describes an endpoint error state.
Unsettled API - A handler for incoming connections.
A listener for incoming connections.
Handler for Proton messaging events.
T get(const scalar &s)
Get a contained value of type T.
Definition scalar.hpp:60
Options for creating a receiver.
Options for creating a sender.
Options for creating a source node for a sender or receiver.
A destination for messages.
Options for creating a target node for a sender or receiver.
A tracker for a sent message.
A network channel supporting an AMQP connection.
Unsettled API - A context for thread-safe execution of work.