[Bro] Trouble passing a message between two local endpoints.

Siwek, Jon jsiwek at illinois.edu
Mon Aug 10 08:10:15 PDT 2015


> On Aug 10, 2015, at 8:22 AM, David Banas <capn.freako at gmail.com> wrote:
> 
> Does anyone see what I’m doing wrong?

There are a few race conditions to be aware of:

1) For the sample code you made, you want to establish/create the message queues attached to an endpoint before initiating peerings/connections with other endpoints.  The point here is to allow topic advertisements/subscriptions to be established before actually sending any messages.

2) If you request a message to be sent before the connection w/ a peer is actually established, it may just get dropped because the endpoint sees that no one is interested in that message yet.  In your example, you can check the outgoing/incoming connection status queues to wait for the connection to establish.  Using the blocking “need” version of the pop function works fine as a convenience in this case, but I’d think it more typical in real code to use the non-blocking “want” version and to integrate the queues into an event loop (e.g. select(), poll(), etc.).

3) Using the non-blocking “want” version of popping the message queue doesn’t give any time for the message to actually be sent and arrive at the peer endpoint.  Either integrate into an event loop or just use the blocking “need” version to wait for the message to arrive.
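
As a rough sketch of the event-loop integration mentioned in 2) and 3): the helper below waits, with a timeout, for a file descriptor to become readable using poll().  The name wait_readable() is made up for illustration and is not part of broker.  It assumes you can get a readable descriptor for the queue; I believe broker.h has an accessor along the lines of broker_message_queue_fd(), but treat that name as an assumption and check the header.

```c
#include <poll.h>

/* Hypothetical helper, not part of broker: wait until `fd` is readable
 * or `timeout_ms` milliseconds elapse.
 * Returns 1 if readable, 0 on timeout, -1 on error or hang-up.
 * Assumption: `fd` is a descriptor that the queue marks readable when
 * it has something to pop (check broker.h for the actual accessor). */
static int wait_readable(int fd, int timeout_ms) {
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    int rc = poll(&pfd, 1, timeout_ms);
    if (rc < 0)
        return -1;              /* poll() itself failed (e.g. EINTR) */
    if (rc == 0)
        return 0;               /* timed out, nothing to read yet */
    return (pfd.revents & POLLIN) ? 1 : -1;
}
```

Once this returns 1 for the queue's descriptor, the non-blocking “want” pop should have something to return; on 0 the loop can go service its other descriptors and try again later.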

Here’s an example of revising your code w/ those 3 suggestions:

#include <stdio.h>
#include "broker.h"

int main (int argc, char* argv[]) {
    broker_init(0);
    broker_endpoint* ep1 = broker_endpoint_create_with_flags("ep1", 3);
    broker_endpoint* ep2 = broker_endpoint_create_with_flags("ep2", 3);
    broker_string*       bs = broker_string_create("");
    broker_message_queue* q = broker_message_queue_create(bs, ep2);
    
    broker_peering*    p = broker_endpoint_peer_locally(ep2, ep1);

    const broker_outgoing_connection_status_queue* ocsq =
            broker_endpoint_outgoing_connection_status(ep2);
    broker_deque_of_outgoing_connection_status_delete(
        broker_outgoing_connection_status_queue_need_pop(ocsq));

    broker_string*  msg_str = broker_string_create("Hello, World!\n");
    broker_string*    topic = broker_string_create("test");
    broker_data*        msg = broker_data_from_string(msg_str);
    
    broker_vector* vec = broker_vector_create();
    int            res = broker_vector_insert(vec, msg, 0L);
                   res = broker_endpoint_send(ep1, topic, vec);
    
    broker_deque_of_message* msg_list = broker_message_queue_need_pop(q);
    size_t                   num_msgs = broker_deque_of_message_size(msg_list);
    
    printf("There are %zu messages.\n", num_msgs);
}

Hope that helps.

- Jon

