[Bro-Dev] Broker status update

Matthias Vallentin vallentin at icir.org
Sat May 21 09:01:32 PDT 2016


This is just a brief summary of where the Broker overhaul is currently
at. The new Broker API will come two types of endpoints: blocking and
nonblocking. The former provides a synchronous API to receive messages,
whereas the latter operates fully asynchronous. Performance-wise, the
asynchronous version has a major advantage because it does not introduce
buffering. The callback provided upon endpoint construction will be
immediately executed by the Broker runtime, for each message as it
arrives. For a blocking endpoint, it's the users responsibility to
extract messages, otherwise they queue up.

Endpoints can only be constructed through a broker::context object:

    // Encapsulates global states, such as announced types, thread pool,
    // CAF scheduler, etc.
    context ctx;

    // Creates a synchronous endpoint.
    auto be = ctx.spawn<blocking>();
    auto msg = be.receive();  // Block and wait for next message.
    be.receive(
      [&](const topic&, const message&) {
        // As above, but use a lambda to handle the topic-message pair.
      }
    );  // Block and wait for next message.

    // Creates an asynchronous endpoint.
    auto ne = ctx.spawn<nonblocking>(
      [=](const topic&, const message&) {
        // invoked for every subscribed message
      }
    );

Other than that, both brokers have a publish/subscribe interface, e.g.:

    be.subscribe("foo");
    be.subscribe("bar");
    auto t = topic{"bar"};
    auto msg = make_message(42, "string");
    be.publish(t, msg);

I'll post more updates as the development progresses, but all of the
above examples are now working. 

The TODO list currently includes:

    - Peerings: managing subscriptions across multiple endpoints. Here's
      an example:
      
        // Make two endpoints.
        auto x = ctx.spawn<...>(...);
        auto y = ctx.spawn<...>(...);

      I'm torn between two interfaces. This is the first:

        peer(x, y); // Create a peering between the two endpoints.
        peer(y, x); // Idempotent. Peerings are symmetric.

        // Peer with a remote endpoint. Requires that the remote side
        // called e.listen(addr, port) beforehand.
        auto r = ctx.spawn<remote>("1.2.3.4", 42000);
        peer(y, r);

        // Undo a peering.
        unpeer(x, y);
        unpeer(r, y);

      The second is:

        x.peer(y); // Create a peering between the two endpoints.
        y.peer(x); // Idempotent. Peerings are symmetric.
        x.peer("1.2.3.4", 42000); // Peer with a remote endpoint.

        x.unpeer(y);
        x.unpeer("1.2.3.4", 42000); // Unpeer from a remote endpoint,
                                       must match previous peering data.

      Personally, I'm favoring the first version, as the functional API
      makes the symmetry of the peering relationship clearer. 

    - Data store: integrate the new "main API" with the existing data
      stores. My plan is to use as much as possible from the existing
      data store API.

    - Bindings: For Python, I'm considering switching to pybind11 [1],
      which provides a much more convenient API than SWIG and supports
      modern C++11.

Please chime in if you have questions/comment or see opportunities for
improvement.

    Matthias

[1] http://pybind11.readthedocs.io/en/latest/classes.html


More information about the bro-dev mailing list