[Bro-Dev] Broker status update
Matthias Vallentin
vallentin at icir.org
Sat May 21 09:01:32 PDT 2016
This is just a brief summary of where the Broker overhaul currently
stands. The new Broker API will come with two types of endpoints:
blocking and nonblocking. The former provides a synchronous API to
receive messages, whereas the latter operates fully asynchronously.
Performance-wise, the asynchronous version has a major advantage because
it does not introduce buffering: the Broker runtime executes the
callback provided upon endpoint construction immediately for each
message as it arrives. For a blocking endpoint, it's the user's
responsibility to extract messages; otherwise they queue up.
Endpoints can only be constructed through a broker::context object:
// Encapsulates global states, such as announced types, thread pool,
// CAF scheduler, etc.
context ctx;
// Creates a synchronous endpoint.
auto be = ctx.spawn<blocking>();
auto msg = be.receive(); // Block and wait for next message.
be.receive(
  [&](const topic&, const message&) {
    // As above, but use a lambda to handle the topic-message pair.
  }
); // Block and wait for next message.
// Creates an asynchronous endpoint.
auto ne = ctx.spawn<nonblocking>(
  [=](const topic&, const message&) {
    // Invoked for every subscribed message.
  }
);
Other than that, both endpoint types have a publish/subscribe interface, e.g.:
be.subscribe("foo");
be.subscribe("bar");
auto t = topic{"bar"};
auto msg = make_message(42, "string");
be.publish(t, msg);
I'll post more updates as the development progresses, but all of the
above examples are now working.
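To make the difference between the two delivery models concrete, here is
a minimal standalone sketch. It does not use Broker at all: toy_message,
toy_blocking, toy_nonblocking, and demo_delivery are hypothetical names
invented for illustration, and the single-threaded queue only stands in
for whatever the real runtime does.

```cpp
// Toy model of the two delivery styles; this is NOT Broker code.
// All names here are hypothetical and exist only for illustration.
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using toy_message = std::pair<std::string, int>; // (topic, payload)

// Blocking style: messages are buffered until the user extracts them.
class toy_blocking {
public:
  void deliver(toy_message msg) { queue_.push_back(std::move(msg)); }

  // A real blocking endpoint would wait here while the queue is empty;
  // this single-threaded toy requires a pending message instead.
  toy_message receive() {
    assert(!queue_.empty());
    toy_message msg = std::move(queue_.front());
    queue_.pop_front();
    return msg;
  }

  std::size_t pending() const { return queue_.size(); }

private:
  std::deque<toy_message> queue_;
};

// Nonblocking style: the callback runs for each message on arrival,
// so nothing is buffered on the endpoint.
class toy_nonblocking {
public:
  using callback = std::function<void(const toy_message&)>;

  explicit toy_nonblocking(callback f) : f_(std::move(f)) {}

  void deliver(const toy_message& msg) { f_(msg); }

private:
  callback f_;
};

// Delivers three messages to each endpoint and reports how many are
// still queued in the blocking one after a single receive().
std::size_t demo_delivery() {
  toy_blocking be;
  std::vector<int> seen;
  toy_nonblocking ne{
    [&](const toy_message& m) { seen.push_back(m.second); }};
  for (int i = 0; i < 3; ++i) {
    be.deliver({"foo", i});
    ne.deliver({"foo", i});
  }
  assert(seen.size() == 3); // the callback already ran three times
  be.receive();             // the user extracts one message
  return be.pending();      // the other two remain queued
}
```

Calling demo_delivery() shows the queue-up behavior: the callback has
handled all three messages by the time deliver() returns, while the
blocking toy still holds every message the caller has not extracted.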
The TODO list currently includes:
- Peerings: managing subscriptions across multiple endpoints. Here's
an example:
// Make two endpoints.
auto x = ctx.spawn<...>(...);
auto y = ctx.spawn<...>(...);
I'm torn between two interfaces. This is the first:
peer(x, y); // Create a peering between the two endpoints.
peer(y, x); // Idempotent. Peerings are symmetric.
// Peer with a remote endpoint. Requires that the remote side
// called e.listen(addr, port) beforehand.
auto r = ctx.spawn<remote>("1.2.3.4", 42000);
peer(y, r);
// Undo a peering.
unpeer(x, y);
unpeer(r, y);
The second is:
x.peer(y); // Create a peering between the two endpoints.
y.peer(x); // Idempotent. Peerings are symmetric.
x.peer("1.2.3.4", 42000); // Peer with a remote endpoint.
x.unpeer(y);
// Unpeer from a remote endpoint; must match previous peering data.
x.unpeer("1.2.3.4", 42000);
Personally, I'm favoring the first version, as the functional API
makes the symmetry of the peering relationship clearer.
- Data store: integrate the new "main API" with the existing data
stores. My plan is to use as much as possible from the existing
data store API.
- Bindings: For Python, I'm considering switching to pybind11 [1],
which provides a much more convenient API than SWIG and supports
modern C++11.
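Regarding the peering item above, the "symmetric and idempotent"
semantics can be sketched independently of either interface. The
following is only a toy under stated assumptions: toy_peerings and
demo_peerings are hypothetical names, endpoints are identified by plain
strings, and nothing here reflects how Broker will actually store
peerings.

```cpp
// Toy registry for symmetric, idempotent peerings; NOT Broker code.
// Endpoints are identified by plain strings purely for illustration.
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <utility>

class toy_peerings {
public:
  // Returns true if a new peering was created, false if it already
  // existed (in either direction).
  bool peer(const std::string& a, const std::string& b) {
    return links_.insert(normalize(a, b)).second;
  }

  // Returns true if a peering was removed.
  bool unpeer(const std::string& a, const std::string& b) {
    return links_.erase(normalize(a, b)) > 0;
  }

  bool peered(const std::string& a, const std::string& b) const {
    return links_.count(normalize(a, b)) > 0;
  }

  std::size_t size() const { return links_.size(); }

private:
  using link = std::pair<std::string, std::string>;

  // Orders the pair so that (a, b) and (b, a) map to the same key.
  static link normalize(const std::string& a, const std::string& b) {
    return a < b ? std::make_pair(a, b) : std::make_pair(b, a);
  }

  std::set<link> links_;
};

// Mirrors the sequence from the first interface proposal and returns
// the number of peerings left at the end.
std::size_t demo_peerings() {
  toy_peerings p;
  p.peer("x", "y");
  p.peer("y", "x");           // no effect: same peering, other order
  p.peer("y", "r");
  assert(p.peered("y", "x")); // symmetric lookup
  p.unpeer("x", "y");
  return p.size();            // only the y-r peering remains
}
```

Storing each peering under an ordered key is one simple way to get both
properties at once; it applies equally to peer(x, y) and x.peer(y), so
it does not favor either of the two interfaces.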
Please chime in if you have questions or comments, or see opportunities
for improvement.
Matthias
[1] http://pybind11.readthedocs.io/en/latest/classes.html