[Bro-Dev] [Bro-Commits] [git/bro] topic/actor-system: First-pass broker-enabled Cluster scripting API + misc. (07ad06b)

Robin Sommer robin at icir.org
Tue Oct 31 11:16:07 PDT 2017


This is coming together quite nicely. Not sure if it's stable yet, but
I'll just go ahead with some feedback from looking over the new
cluster API:

    - One thing I can't quite tell is if this is still aiming to
      maintain compatibility with the old communication system, like
      by keeping the proxies and also the *_events patterns. Looking
      at setup-connections, it seems so. I'd say just go ahead and
      remove all legacy pieces. Maintaining two schemes in parallel is
      cumbersome, and I think it's fine to just force everything over
      to Broker.

    - Is the idea for the "*_topic" constants that one just picks the
      appropriate one when sending events? Like if I want to publish
      something to all workers, I'd publish to Cluster::worker_topic?
      I think that's fine, though I'm wondering if we could compress
      the API there somehow so that Cluster doesn't need to export all
      those constants individually. One idea would be a function that
      returns a topic based on node type?

    - I like the Pools! How about moving Pool with its functions out
      of main.bro, just for clarity?

    - Looks like the hello/bye events are broadcast by all nodes. Is
      that on purpose, or should that be limited to just one, like
      just the master sending them out? Or does it not matter and this
      provides for more redundancy?

    - create_store() vs "stores": Is the idea that I'd normally use
      create_store() and that populates the table, but I could also
      redef it myself instead of using create_store() to create more
      custom entries? If so, maybe make that a bit more explicit in
      the comments that there're two ways to configure that table.
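
To make the "*_topic" idea a bit more concrete, here's a rough sketch
of what I have in mind. The name node_type_topic() is made up and not
part of the commit; the topic constants and NodeType values are the
ones from main.bro in the diff below. Falling back to the broadcast
topic for types without their own topic is just one possible choice.

```bro
module Cluster;

export {
	## Hypothetical helper (not in the commit): returns the topic that
	## nodes of the given type subscribe to.
	global node_type_topic: function(t: NodeType): string;
}

function node_type_topic(t: NodeType): string
	{
	switch ( t ) {
	case LOGGER:
		return logger_topic;
	case MANAGER:
		return manager_topic;
	case PROXY:
		return proxy_topic;
	case WORKER:
		return worker_topic;
	case TIME_MACHINE:
		return time_machine_topic;
	default:
		# NONE and CONTROL have no dedicated topic in
		# setup-connections; this fallback is an assumption.
		return broadcast_topic;
	}
	}
```

Publishing to all workers would then be something like
Broker::publish(Cluster::node_type_topic(Cluster::WORKER), e), with e
coming from Broker::make_event() as in the hello handler. The
individual constants could stay redef'able but would no longer need to
be the primary interface.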
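
On create_store() vs "stores": if I'm reading main.bro right, the two
ways would look roughly like the following. This is only my
interpretation of the diff, and the store names are made up.

```bro
@load base/frameworks/cluster

# Way 2 (custom): pre-populate the table entry via redef.  Per the
# diff, create_store() starts from stores[name], so it should pick up
# these settings instead of the defaults.
redef Cluster::stores += {
	["example_custom"] = Cluster::StoreInfo($backend=Broker::SQLITE),
};

global plain: Cluster::StoreInfo;
global custom: Cluster::StoreInfo;

event bro_init()
	{
	# Way 1 (default): no redef; uses Cluster::default_master_node
	# and Cluster::default_backend.
	plain = Cluster::create_store("example_plain");

	# Still needs the create_store() call; the redef alone only
	# configures the entry.
	custom = Cluster::create_store("example_custom");
	}
```

If that matches the intent, a sentence in the comments for "stores"
saying that the redef only customizes an entry and create_store() is
still what creates the store would help.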

Robin


On Fri, Oct 27, 2017 at 12:44 -0500, Jonathan Siwek wrote:

> Repository : ssh://git@bro-ids.icir.org/bro
> On branch  : topic/actor-system
> Link       : https://github.com/bro/bro/commit/07ad06b083d16f9cf1c86041cf7287335a74ebbb
> 
> >---------------------------------------------------------------
> 
> commit 07ad06b083d16f9cf1c86041cf7287335a74ebbb
> Author: Jon Siwek <jsiwek at illinois.edu>
> Date:   Fri Oct 27 12:44:54 2017 -0500
> 
>     First-pass broker-enabled Cluster scripting API + misc.
>     
>     - Remove Broker::Options, Broker::configure().  This was only
>       half implemented (e.g. the "routable" flag wasn't used), and using
>       a function to set these options was awkward: the only way to
>       override defaults was via calling configure() in a bro_init with
>       *lower* priority such that the last call "wins".  Also doesn't
>       really make sense for it to be a function since the underlying
>       broker library can't adapt to changes in these configuration
>       values dynamically at runtime, so instead there's just now
>       two options you can redef: "Broker::forward_messages" and
>       "Broker::log_topic".
>     
>     - Add Broker::node_id() to get a unique identifier for the Bro instance's
>       broker endpoint.  This is used by the Cluster API to map node name
>       (e.g. "manager") to broker endpoint so that one can track which nodes
>       are still alive.
>     
>     - Fix how broker-based communication interacts with --pseudo-realtime
>       and reading pcaps: bro now terminates at end of reading pcap when
>       broker is active (this should now be equivalent to how RemoteSerializer
>       worked).
>     
>     - New broker-enabled Cluster framework API
>       - Still uses Cluster::nodes as the means of setting up cluster network
>       - See Cluster::stores, Cluster::StoreInfo, and Cluster::create_store
>         for how broker data stores are integrated into cluster operation
>     
>     - Update several unit tests to new Cluster API.  Failing tests at
>       the moment are mostly from scripts/frameworks that haven't been
>       ported to the new Cluster API.
> 
> 
> >---------------------------------------------------------------
> 
> 07ad06b083d16f9cf1c86041cf7287335a74ebbb
>  aux/broker                                         |   2 +-
>  scripts/base/frameworks/broker/main.bro            |  47 ++--
>  scripts/base/frameworks/broker/store.bro           |  14 +-
>  scripts/base/frameworks/cluster/__load__.bro       |   7 -
>  scripts/base/frameworks/cluster/main.bro           | 279 ++++++++++++++++++++-
>  .../base/frameworks/cluster/setup-connections.bro  | 150 +++++++++++
>  scripts/base/frameworks/control/main.bro           |   1 +
>  src/broker/Manager.cc                              |  20 +-
>  src/broker/Manager.h                               |  15 +-
>  src/broker/comm.bif                                |  21 +-
>  src/iosource/PktSrc.cc                             |  20 +-
>  testing/btest/Baseline/plugins.hooks/output        |  18 +-
>  .../manager-1..stdout                              |   4 +
>  .../manager-1.reporter.log                         |  11 +-
>  testing/btest/broker/remote_log_types.bro          |   2 +-
>  .../base/frameworks/cluster/start-it-up-logger.bro |  47 +++-
>  .../base/frameworks/cluster/start-it-up.bro        |  48 +++-
>  .../frameworks/control/configuration_update.bro    |   2 +-
>  .../scripts/base/frameworks/control/id_value.bro   |   2 +-
>  .../scripts/base/frameworks/control/shutdown.bro   |   2 +-
>  .../logging/field-extension-cluster-error.bro      |   7 +-
>  .../frameworks/logging/field-extension-cluster.bro |   7 +-
>  22 files changed, 601 insertions(+), 125 deletions(-)
> 
> diff --git a/aux/broker b/aux/broker
> index 76375d0..e1d637c 160000
> --- a/aux/broker
> +++ b/aux/broker
> @@ -1 +1 @@
> -Subproject commit 76375d07f5bf1ffc9711e064644bf865eda7a828
> +Subproject commit e1d637c816955a451079b419f438307960109346
> diff --git a/scripts/base/frameworks/broker/main.bro b/scripts/base/frameworks/broker/main.bro
> index 47533a4..3ac25ed 100644
> --- a/scripts/base/frameworks/broker/main.bro
> +++ b/scripts/base/frameworks/broker/main.bro
> @@ -20,6 +20,11 @@ export {
>  	## use already.
>  	const default_listen_retry = 30sec &redef;
>  
> +	## Default address on which to listen.
> +	##
> +	## .. bro:see:: Broker::listen
> +	const default_listen_address = "" &redef;
> +
>  	## Default interval to retry connecting to a peer if it cannot be made to work
>  	## initially, or if it ever becomes disconnected.
>  	const default_connect_retry = 30sec &redef;
> @@ -55,14 +60,12 @@ export {
>  	## all peers.
>  	const ssl_keyfile = "" &redef;
>  
> -	## The available configuration options when enabling Broker.
> -	type Options: record {
> -		## Whether this Broker instance relays messages not destined to itself.
> -		## By default, routing is disabled.
> -		routable: bool &default = F;
> -		## The topic prefix where to publish logs.
> -		log_topic: string &default = "bro/logs/";
> -	};
> +	## Forward all received messages to subscribing peers.
> +	const forward_messages = F &redef;
> +
> +	## The topic prefix where logs will be published.  The log's stream id
> +	## is appended when writing to a particular stream.
> +	const log_topic = "bro/logs/" &redef;
>  
>  	type ErrorCode: enum {
>  		## The unspecified default error code.
> @@ -153,13 +156,6 @@ export {
>  		val: Broker::Data;
>  	};
>  
> -	## Configures the local endpoint.
> -	##
> -	## options: Configures the local Broker endpoint behavior.
> -	##
> -	## Returns: true if configuration was successfully performed..
> -	global configure: function(options: Options &default = Options()): bool;
> -
>  	## Listen for remote connections.
>  	##
>  	## a: an address string on which to accept connections, e.g.
> @@ -174,7 +170,8 @@ export {
>  	## Returns: the bound port or 0/? on failure.
>  	##
>  	## .. bro:see:: Broker::status
> -	global listen: function(a: string &default = "", p: port &default=default_port,
> +	global listen: function(a: string &default = default_listen_address,
> +	                        p: port &default = default_port,
>  	                        retry: interval &default = default_listen_retry): port;
>  	## Initiate a remote connection.
>  	##
> @@ -213,6 +210,9 @@ export {
>  	## Returns: a list of all peer connections.
>  	global peers: function(): vector of PeerInfo;
>  
> +	## Returns: a unique identifier for the local broker endpoint.
> +	global node_id: function(): string;
> +
>  	## Publishes an event at a given topic.
>  	##
>  	## topic: a topic associated with the event message.
> @@ -278,16 +278,6 @@ export {
>  
>  module Broker;
>  
> -event bro_init() &priority=-10
> -	{
> -	configure(); # Configure with defaults.
> -	}
> -
> -function configure(options: Options &default = Options()): bool
> -	{
> -	return __configure(options);
> -	}
> -
>  event retry_listen(a: string, p: port, retry: interval)
>  	{
>  	listen(a, p, retry);
> @@ -318,6 +308,11 @@ function peers(): vector of PeerInfo
>  	return __peers();
>  	}
>  
> +function node_id(): string
> +	{
> +	return __node_id();
> +	}
> +
>  function publish(topic: string, ev: Event): bool
>  	{
>  	return __publish(topic, ev);
> diff --git a/scripts/base/frameworks/broker/store.bro b/scripts/base/frameworks/broker/store.bro
> index ed735e0..6b22f5d 100644
> --- a/scripts/base/frameworks/broker/store.bro
> +++ b/scripts/base/frameworks/broker/store.bro
> @@ -61,7 +61,7 @@ export {
>  	                               options: BackendOptions &default = BackendOptions()): opaque of Broker::Store;
>  
>  	## Create a clone of a master data store which may live with a remote peer.
> -	## A clone automatically synchronizes to the master by automatically
> +	## A clone automatically synchronizes to the master by
>  	## receiving modifications and applying them locally.  Direct modifications
>  	## are not possible, they must be sent through the master store, which then
>  	## automatically broadcasts the changes out to clones.  But queries may be
> @@ -70,18 +70,6 @@ export {
>  	##
>  	## name: the unique name which identifies the master data store.
>  	##
> -	## b: the storage backend to use.
> -	##
> -	## options: tunes how some storage backends operate.
> -	##
> -	## resync: the interval at which to re-attempt synchronizing with the master
> -	##         store should the connection be lost.  If the clone has not yet
> -	##         synchronized for the first time, updates and queries queue up
> -	##         until the synchronization completes.  After, if the connection
> -	##         to the master store is lost, queries continue to use the clone's
> -	##         version, but updates will be lost until the master is once again
> -	##         available.
> -	##
>  	## Returns: a handle to the data store.
>  	global create_clone: function(name: string): opaque of Broker::Store;
>  
> diff --git a/scripts/base/frameworks/cluster/__load__.bro b/scripts/base/frameworks/cluster/__load__.bro
> index 1717b83..4f193c0 100644
> --- a/scripts/base/frameworks/cluster/__load__.bro
> +++ b/scripts/base/frameworks/cluster/__load__.bro
> @@ -19,13 +19,6 @@ redef peer_description = Cluster::node;
>  
>  @load ./setup-connections
>  
> -# Don't load the listening script until we're a bit more sure that the
> -# cluster framework is actually being enabled.
> -@load frameworks/communication/listen
> -
> -## Set the port that this node is supposed to listen on.
> -redef Communication::listen_port = Cluster::nodes[Cluster::node]$p;
> -
>  @if ( Cluster::local_node_type() == Cluster::MANAGER )
>  @load ./nodes/manager
>  # If no logger is defined, then the manager receives logs.
> diff --git a/scripts/base/frameworks/cluster/main.bro b/scripts/base/frameworks/cluster/main.bro
> index 261f3f1..c94ed28 100644
> --- a/scripts/base/frameworks/cluster/main.bro
> +++ b/scripts/base/frameworks/cluster/main.bro
> @@ -7,10 +7,96 @@
>  ##! ``@load base/frameworks/cluster``.
>  
>  @load base/frameworks/control
> +@load base/frameworks/broker
>  
>  module Cluster;
>  
>  export {
> +	## Whether the cluster framework uses broker to perform remote communication.
> +	const use_broker = T &redef;
> +
> +	## The topic name used for exchanging general messages that are relevant to
> +	## any node in a cluster.  Used with broker-enabled cluster communication.
> +	const broadcast_topic = "bro/cluster/broadcast" &redef;
> +
> +	## The topic name used for exchanging messages that are relevant to
> +	## logger nodes in a cluster.  Used with broker-enabled cluster communication.
> +	const logger_topic = "bro/cluster/logger" &redef;
> +
> +	## The topic name used for exchanging messages that are relevant to
> +	## manager nodes in a cluster.  Used with broker-enabled cluster communication.
> +	const manager_topic = "bro/cluster/manager" &redef;
> +
> +	## The topic name used for exchanging messages that are relevant to
> +	## proxy nodes in a cluster.  Used with broker-enabled cluster communication.
> +	const proxy_topic = "bro/cluster/proxy" &redef;
> +
> +	## The topic name used for exchanging messages that are relevant to
> +	## worker nodes in a cluster.  Used with broker-enabled cluster communication.
> +	const worker_topic = "bro/cluster/worker" &redef;
> +
> +	## The topic name used for exchanging messages that are relevant to
> +	## time machine nodes in a cluster.  Used with broker-enabled cluster communication.
> +	const time_machine_topic = "bro/cluster/time_machine" &redef;
> +
> +	## The topic prefix used for exchanging messages that are relevant to
> +	## a named node in a cluster.  Used with broker-enabled cluster communication.
> +	const node_topic_prefix = "bro/cluster/node/" &redef;
> +
> +	## Name of the node on which master data stores will be created if no other
> +	## has already been specified by the user in :bro:see:`Cluster::stores`.
> +	const default_master_node = "manager" &redef;
> +
> +	## The type of data store backend that will be used for all data stores if
> +	## no other has already been specified by the user in :bro:see:`Cluster::stores`.
> +	const default_backend = Broker::MEMORY &redef;
> +
> +	## The type of persistent data store backend that will be used for all data
> +	## stores if no other has already been specified by the user in
> +	## :bro:see:`Cluster::stores`.  This will be used when script authors call
> +	## :bro:see:`Cluster::create_store` with the *persistent* argument set true.
> +	const default_persistent_backend = Broker::SQLITE &redef;
> +
> +	## Setting a default dir will, for persistent backends that have not
> +	## been given an explicit file path via :bro:see:`Cluster::stores`,
> +	## automatically create a path within this dir that is based on the name of
> +	## the data store.
> +	const default_store_dir = "" &redef;
> +
> +	## Information regarding a cluster-enabled data store.
> +	type StoreInfo: record {
> +		## The name of the data store.
> +		name: string &optional;
> +		## The store handle.
> +		store: opaque of Broker::Store &optional;
> +		## The name of the cluster node on which the master version of the data
> +		## store resides.
> +		master_node: string &default=default_master_node;
> +		## Whether the data store is the master version or a clone.
> +		master: bool &default=F;
> +		## The type of backend used for storing data.
> +		backend: Broker::BackendType &default=default_backend;
> +		## Parameters used for configuring the backend.
> +		options: Broker::BackendOptions &default=Broker::BackendOptions();
> +	};
> +
> +	## A table of cluster-enabled data stores that have been created, indexed
> +	## by their name.  To customize a particular data store, you may redef this,
> +	## defining the :bro:see:`StoreInfo` to associate with the store's name.
> +	global stores: table[string] of StoreInfo &default=StoreInfo() &redef;
> +
> +	## Sets up a cluster-enabled data store.  They will also still properly
> +	## function for uses that are not operating a cluster.
> +	##
> +	## name: the name of the data store to create.
> +	##
> +	## persistent: whether the data store must be persistent.
> +	##
> +	## Returns: the store's information.  For master stores, the store will be
> +	##          ready to use immediately.  For clones, the store field will not
> +	##          be set until the node containing the master store has connected.
> +	global create_store: function(name: string, persistent: bool &default=F): StoreInfo;
> +
>  	## The cluster logging stream identifier.
>  	redef enum Log::ID += { LOG };
>  
> @@ -18,6 +104,8 @@ export {
>  	type Info: record {
>  		## The time at which a cluster message was generated.
>  		ts:       time;
> +		## The name of the node that is creating the log record.
> +		node: string;
>  		## A message indicating information about the cluster's operation.
>  		message:  string;
>  	} &log;
> @@ -92,8 +180,7 @@ export {
>  		## If the *ip* field is a non-global IPv6 address, this field
>  		## can specify a particular :rfc:`4007` ``zone_id``.
>  		zone_id:      string      &default="";
> -		## The port to which this local node can connect when
> -		## establishing communication.
> +		## The port that this node will listen on for peer connections.
>  		p:            port;
>  		## Identifier for the interface a worker is sniffing.
>  		interface:    string      &optional;
> @@ -108,6 +195,8 @@ export {
>  		workers:      set[string] &optional;
>  		## Name of a time machine node with which this node connects.
>  		time_machine: string      &optional;
> +		## A unique identifier assigned to the node by the broker framework.
> +		id: string                &optional;
>  	};
>  
>  	## This function can be called at any time to determine if the cluster
> @@ -134,6 +223,8 @@ export {
>  	## named cluster-layout.bro somewhere in the BROPATH.  It will be
>  	## automatically loaded if the CLUSTER_NODE environment variable is set.
>  	## Note that BroControl handles all of this automatically.
> +	## The table is typically indexed by node names/labels (e.g. "manager"
> +	## or "worker-1").
>  	const nodes: table[string] of Node = {} &redef;
>  
>  	## Indicates whether or not the manager will act as the logger and receive
> @@ -148,6 +239,15 @@ export {
>  
>  	## Interval for retrying failed connections between cluster nodes.
>  	const retry_interval = 1min &redef;
> +
> +	## When using broker-enabled cluster framework, nodes use this event to
> +	## exchange their user-defined name along with a string that uniquely
> +	## identifies it for the duration of its lifetime (this string may change if
> +	## the node dies and has to reconnect later).
> +	global hello: event(name: string, id: string);
> +
> +	## Write a message to the cluster logging stream.
> +	global log: function(msg: string);
>  }
>  
>  function is_enabled(): bool
> @@ -163,13 +263,112 @@ function local_node_type(): NodeType
>  event remote_connection_handshake_done(p: event_peer) &priority=5
>  	{
>  	if ( p$descr in nodes && nodes[p$descr]$node_type == WORKER )
> -		++worker_count;
> +		{
> +		if ( use_broker )
> +			Reporter::error(fmt("broker-enabled cluster using old comms: '%s' ", node));
> +		else
> +			++worker_count;
> +		}
>  	}
>  
>  event remote_connection_closed(p: event_peer) &priority=5
>  	{
>  	if ( p$descr in nodes && nodes[p$descr]$node_type == WORKER )
> -		--worker_count;
> +		{
> +		if ( use_broker )
> +			Reporter::error(fmt("broker-enabled cluster using old comms: '%s' ", node));
> +		else
> +			--worker_count;
> +		}
> +	}
> +
> +event Cluster::hello(name: string, id: string) &priority=10
> +	{
> +	if ( name !in nodes )
> +		{
> +		Reporter::error(fmt("Got Cluster::hello msg from unexpected node: %s", name));
> +		return;
> +		}
> +
> +	local n = nodes[name];
> +
> +	if ( n?$id && n$id != id )
> +		Reporter::error(fmt("Got Cluster::hello msg from duplicate node: %s", name));
> +
> +	n$id = id;
> +	Cluster::log(fmt("got hello from %s (%s)", name, id));
> +
> +	if ( n$node_type == WORKER )
> +		++worker_count;
> +
> +	for ( store_name in stores )
> +		{
> +		local info = stores[store_name];
> +
> +		if ( info?$store )
> +			next;
> +
> +		if ( info$master )
> +			next;
> +
> +		if ( info$master_node == name )
> +			{
> +			info$store = Broker::create_clone(info$name);
> +			Cluster::log(fmt("created clone store: %s", info$name));
> +			}
> +		}
> +	}
> +
> +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10
> +	{
> +	if ( ! use_broker )
> +		return;
> +
> +	if ( ! Cluster::is_enabled() )
> +		return;
> +
> +	local e = Broker::make_event(Cluster::hello, node, Broker::node_id());
> +	Broker::publish(Cluster::broadcast_topic, e);
> +	}
> +
> +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10
> +	{
> +	if ( ! use_broker )
> +		return;
> +
> +	for ( node_name in nodes )
> +		{
> +		local n = nodes[node_name];
> +
> +		if ( n?$id && n$id == endpoint$id )
> +			{
> +			Cluster::log(fmt("node down: %s", node_name));
> +			delete n$id;
> +
> +			if ( n$node_type == WORKER )
> +				--worker_count;
> +
> +			for ( store_name in stores )
> +				{
> +				local info = stores[store_name];
> +
> +				if ( ! info?$store )
> +					next;
> +
> +				if ( info$master )
> +					next;
> +
> +				if ( info$master_node == node_name )
> +					{
> +					Broker::close(info$store);
> +					delete info$store;
> +					Cluster::log(fmt("clone store closed: %s", info$name));
> +					}
> +				}
> +
> +			break;
> +			}
> +		}
>  	}
>  
>  event bro_init() &priority=5
> @@ -183,3 +382,75 @@ event bro_init() &priority=5
>  
>  	Log::create_stream(Cluster::LOG, [$columns=Info, $path="cluster"]);
>  	}
> +
> +function create_store(name: string, persistent: bool &default=F): Cluster::StoreInfo
> +	{
> +	local info = stores[name];
> +	info$name = name;
> +
> +	if ( Cluster::default_store_dir != "" )
> +		{
> +		local default_options = Broker::BackendOptions();
> +		local path = Cluster::default_store_dir + "/" + name;
> +
> +		if ( info$options$sqlite$path == default_options$sqlite$path )
> +			info$options$sqlite$path = path + ".sqlite";
> +
> +		if ( info$options$rocksdb$path == default_options$rocksdb$path )
> +			info$options$rocksdb$path = path + ".rocksdb";
> +		}
> +
> +	if ( persistent )
> +		{
> +		switch ( info$backend ) {
> +		case Broker::MEMORY:
> +			info$backend = Cluster::default_persistent_backend;
> +			break;
> +		case Broker::SQLITE:
> +			fallthrough;
> +		case Broker::ROCKSDB:
> +			# no-op: user already asked for a specific persistent backend.
> +			break;
> +		default:
> +			Reporter::error(fmt("unhandled data store type: %s", info$backend));
> +			break;
> +		}
> +		}
> +
> +	if ( ! Cluster::is_enabled() )
> +		{
> +		if ( info?$store )
> +			{
> +			Reporter::warning(fmt("duplicate cluster store creation for %s", name));
> +			return info;
> +			}
> +
> +		info$store = Broker::create_master(name, info$backend, info$options);
> +		info$master = T;
> +		stores[name] = info;
> +		Cluster::log(fmt("created master store: %s", name));
> +		return info;
> +		}
> +
> +	if ( info$master_node !in Cluster::nodes )
> +		Reporter::fatal(fmt("master node '%s' for cluster store '%s' does not exist",
> +		                    info$master_node, name));
> +
> +	if ( Cluster::node == info$master_node )
> +		{
> +		info$store = Broker::create_master(name, info$backend, info$options);
> +		info$master = T;
> +		stores[name] = info;
> +		return info;
> +		}
> +
> +	info$master = F;
> +	stores[name] = info;
> +	Cluster::log(fmt("pending clone store creation: %s", name));
> +	return info;
> +	}
> +
> +function log(msg: string)
> +	{
> +	Log::write(Cluster::LOG, [$ts = network_time(), $node = node, $message = msg]);
> +	}
> diff --git a/scripts/base/frameworks/cluster/setup-connections.bro b/scripts/base/frameworks/cluster/setup-connections.bro
> index 971a55d..2e775da 100644
> --- a/scripts/base/frameworks/cluster/setup-connections.bro
> +++ b/scripts/base/frameworks/cluster/setup-connections.bro
> @@ -3,13 +3,163 @@
>  
>  @load ./main
>  @load base/frameworks/communication
> +@load base/frameworks/broker
>  
>  @if ( Cluster::node in Cluster::nodes )
>  
>  module Cluster;
>  
> +type NamedNode: record {
> +	name: string;
> +	node: Node;
> +};
> +
> +function nodes_with_type(node_type: NodeType): vector of NamedNode
> +	{
> +	local rval: vector of NamedNode = vector();
> +
> +	for ( name in Cluster::nodes )
> +		{
> +		local n = Cluster::nodes[name];
> +
> +		if ( n$node_type != node_type )
> +			next;
> +
> +		rval[|rval|] = NamedNode($name=name, $node=n);
> +		}
> +
> +	return rval;
> +	}
> +
> +function connect_peer(node_type: NodeType, node_name: string): bool
> +	{
> +	local nn = nodes_with_type(node_type);
> +
> +	for ( i in nn )
> +		{
> +		local n = nn[i];
> +
> +		if ( n$name != node_name )
> +			next;
> +
> +		Cluster::log(fmt("initiate peering with %s:%s, retry=%s",
> +		                 n$node$ip, n$node$p, Cluster::retry_interval));
> +		return Broker::peer(cat(n$node$ip), n$node$p, Cluster::retry_interval);
> +		}
> +
> +	return F;
> +	}
> +
>  event bro_init() &priority=9
>  	{
> +	if ( ! use_broker )
> +		return;
> +
> +	local self = nodes[node];
> +
> +	switch ( self$node_type ) {
> +	case NONE:
> +		return;
> +	case CONTROL:
> +		break;
> +	case LOGGER:
> +		Broker::subscribe(Cluster::logger_topic);
> +		Broker::subscribe(Broker::log_topic);
> +		break;
> +	case MANAGER:
> +		Broker::subscribe(Cluster::manager_topic);
> +
> +		if ( Cluster::manager_is_logger )
> +			Broker::subscribe(Broker::log_topic);
> +
> +		break;
> +	case PROXY:
> +		Broker::subscribe(Cluster::proxy_topic);
> +		break;
> +	case WORKER:
> +		Broker::subscribe(Cluster::worker_topic);
> +		break;
> +	case TIME_MACHINE:
> +		Broker::subscribe(Cluster::time_machine_topic);
> +		break;
> +	default:
> +		Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type));
> +		return;
> +	}
> +
> +	Broker::subscribe(Cluster::broadcast_topic);
> +	Broker::subscribe(Cluster::node_topic_prefix + node);
> +
> +	Broker::listen(Broker::default_listen_address,
> +	               self$p,
> +	               Broker::default_listen_retry);
> +
> +	Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p));
> +
> +	switch ( self$node_type ) {
> +	case MANAGER:
> +		if ( self?$logger )
> +			connect_peer(LOGGER, self$logger);
> +
> +		if ( self?$time_machine )
> +			connect_peer(TIME_MACHINE, self$time_machine);
> +
> +		break;
> +	case PROXY:
> +		if ( self?$logger )
> +			connect_peer(LOGGER, self$logger);
> +
> +		if ( self?$manager )
> +			connect_peer(MANAGER, self$manager);
> +
> +		local proxies = nodes_with_type(PROXY);
> +
> +		for ( i in proxies )
> +			{
> +			local proxy = proxies[i];
> +
> +			if ( proxy$node?$proxy )
> +				Broker::peer(cat(proxy$node$ip), proxy$node$p, Cluster::retry_interval);
> +			}
> +
> +		break;
> +	case WORKER:
> +		if ( self?$logger )
> +			connect_peer(LOGGER, self$logger);
> +
> +		if ( self?$manager )
> +			connect_peer(MANAGER, self$manager);
> +
> +		if ( self?$proxy )
> +			connect_peer(PROXY, self$proxy);
> +
> +		if ( self?$time_machine )
> +			connect_peer(TIME_MACHINE, self$time_machine);
> +
> +		break;
> +	}
> +	}
> +
> +event bro_init() &priority=-10
> +	{
> +	if ( use_broker )
> +		return;
> +
> +	local lp = Cluster::nodes[Cluster::node]$p;
> +	enable_communication();
> +	listen(Communication::listen_interface,
> +	       lp,
> +	       Communication::listen_ssl,
> +	       Communication::listen_ipv6,
> +	       Communication::listen_ipv6_zone_id,
> +	       Communication::listen_retry);
> +	}
> +
> +event bro_init() &priority=9
> +	{
> +	if ( use_broker )
> +		return;
> +
>  	local me = nodes[node];
>  
>  	for ( i in Cluster::nodes )
> diff --git a/scripts/base/frameworks/control/main.bro b/scripts/base/frameworks/control/main.bro
> index 5c68c47..e3b58ef 100644
> --- a/scripts/base/frameworks/control/main.bro
> +++ b/scripts/base/frameworks/control/main.bro
> @@ -5,6 +5,7 @@
>  module Control;
>  
>  export {
> +	## Whether the control framework uses broker to perform remote communication.
>  	const use_broker = T &redef;
>  
>  	## The address of the host that will be controlled.
> diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc
> index e9d6280..b2cbe87 100644
> --- a/src/broker/Manager.cc
> +++ b/src/broker/Manager.cc
> @@ -113,7 +113,6 @@ Manager::BrokerState::BrokerState(broker::broker_options options)
>  
>  Manager::Manager()
>  	{
> -	routable = false;
>  	bound_port = 0;
>  
>  	next_timestamp = 1;
> @@ -128,6 +127,7 @@ void Manager::InitPostScript()
>  	{
>  	DBG_LOG(DBG_BROKER, "Initializing");
>  
> +	log_topic = get_option("Broker::log_topic")->AsString()->CheckString();
>  	log_id_type = internal_type("Log::ID")->AsEnumType();
>  	writer_id_type = internal_type("Log::Writer")->AsEnumType();
>  
> @@ -144,6 +144,7 @@ void Manager::InitPostScript()
>  
>  	broker::broker_options options;
>  	options.disable_ssl = get_option("Broker::disable_ssl")->AsBool();
> +	options.forward = get_option("Broker::forward_messages")->AsBool();
>  
>  	bstate = std::make_shared<BrokerState>(options);
>  	}
> @@ -176,18 +177,6 @@ bool Manager::Active()
>  	return bound_port > 0 || bstate->endpoint.peers().size();
>  	}
>  
> -bool Manager::Configure(bool arg_routable, std::string arg_log_topic)
> -	{
> -	DBG_LOG(DBG_BROKER, "Configuring endpoint: routable=%s log_topic=%s",
> -		(routable ? "yes" : "no"), arg_log_topic.c_str());
> -
> -	routable = arg_routable;
> -	log_topic = arg_log_topic;
> -
> -	// TODO: process routable flag
> -	return true;
> -	}
> -
>  uint16_t Manager::Listen(const string& addr, uint16_t port)
>  	{
>  	bound_port = bstate->endpoint.listen(addr, port);
> @@ -233,6 +222,11 @@ std::vector<broker::peer_info> Manager::Peers() const
>  	return bstate->endpoint.peers();
>  	}
>  
> +std::string Manager::NodeID() const
> +	{
> +	return to_string(bstate->endpoint.node_id());
> +	}
> +
>  bool Manager::PublishEvent(string topic, std::string name, broker::vector args)
>  	{
>  	if ( ! bstate->endpoint.peers().size() )
> diff --git a/src/broker/Manager.h b/src/broker/Manager.h
> index a430c57..5af23e4 100644
> --- a/src/broker/Manager.h
> +++ b/src/broker/Manager.h
> @@ -74,15 +74,6 @@ public:
>  	bool Active();
>  
>  	/**
> -	 * Configure the local Broker endpoint.
> -	 * @param routable Whether the context of this endpoint routes messages not
> -	 * @param log_topic The topic prefix for logs we this endpoint published.
> -	 * destined to itself. By default endpoints do not route.
> -	 * @return true if configuration was successful.
> -	 */
> -	bool Configure(bool routable = false, std::string log_topic="");
> -
> -	/**
>  	 * Listen for remote connections.
>  	 * @param port the TCP port to listen on.
>  	 * @param addr an address string on which to accept connections, e.g.
> @@ -115,6 +106,11 @@ public:
>  	std::vector<broker::peer_info> Peers() const;
>  
>  	/**
> +	 * @return a unique identifier for this broker endpoint.
> +	 */
> +	std::string NodeID() const;
> +
> +	/**
>  	 * Send an event to any interested peers.
>  	 * @param topic a topic string associated with the message.
>  	 * Peers advertise interest by registering a subscription to some prefix
> @@ -296,7 +292,6 @@ private:
>  	broker::endpoint& Endpoint()
>  		{ assert(bstate); return bstate->endpoint; }
>  
> -	bool routable;
>  	std::string log_topic;
>  	uint16_t bound_port;
>  
> diff --git a/src/broker/comm.bif b/src/broker/comm.bif
> index 411e3d4..c7c94d4 100644
> --- a/src/broker/comm.bif
> +++ b/src/broker/comm.bif
> @@ -7,8 +7,6 @@
>  
>  module Broker;
>  
> -type Broker::Options: record;
> -
>  ## Generated when something changes in the Broker sub-system.
>  event Broker::status%(endpoint: EndpointInfo, msg: string%);
>  
> @@ -51,20 +49,6 @@ enum PeerStatus %{
>  	RECONNECTING,
>  %}
>  
> -function Broker::__configure%(options: Broker::Options%): bool
> -	%{
> -	auto routable = false;
> -	auto log_topic = "";
> -
> -	if ( auto routable_val = options->AsRecordVal()->Lookup(0) )
> -		routable = routable_val->AsBool();
> -
> -	if ( auto log_topic_val = options->AsRecordVal()->Lookup(1) )
> -		log_topic = log_topic_val->AsString()->CheckString();
> -
> -	return new Val(broker_mgr->Configure(routable, log_topic), TYPE_BOOL);
> -	%}
> -
>  function Broker::__listen%(a: string, p: port%): port
>  	%{
>  	if ( ! p->IsTCP() )
> @@ -140,3 +124,8 @@ function Broker::__peers%(%): PeerInfos
>  
>  	return rval;
>  	%}
> +
> +function Broker::__node_id%(%): string
> +	%{
> +	return new StringVal(broker_mgr->NodeID());
> +	%}
> diff --git a/src/iosource/PktSrc.cc b/src/iosource/PktSrc.cc
> index a9362a0..343801a 100644
> --- a/src/iosource/PktSrc.cc
> +++ b/src/iosource/PktSrc.cc
> @@ -10,6 +10,8 @@
>  #include "Hash.h"
>  #include "Net.h"
>  #include "Sessions.h"
> +#include "broker/Manager.h"
> +#include "iosource/Manager.h"
>  
>  #include "pcap/pcap.bif.h"
>  
> @@ -304,13 +306,19 @@ bool PktSrc::ExtractNextPacketInternal()
>  		return 1;
>  		}
>  
> -	if ( pseudo_realtime && using_communication && ! IsOpen() )
> +	if ( pseudo_realtime && ! IsOpen() )
>  		{
> -		// Source has gone dry, we're done.
> -		if ( remote_trace_sync_interval )
> -			remote_serializer->SendFinalSyncPoint();
> -		else
> -			remote_serializer->Terminate();
> +		if ( using_communication )
> +			{
> +			// Source has gone dry, we're done.
> +			if ( remote_trace_sync_interval )
> +				remote_serializer->SendFinalSyncPoint();
> +			else
> +				remote_serializer->Terminate();
> +			}
> +
> +		if ( broker_mgr->Active() )
> +			iosource_mgr->Terminate();
>  		}
>  
>  	SetIdle(true);
> diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output
> index f7c61c0..5eefc64 100644
> --- a/testing/btest/Baseline/plugins.hooks/output
> +++ b/testing/btest/Baseline/plugins.hooks/output
> @@ -148,8 +148,6 @@
>  0.000000   MetaHookPost  CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_SYSLOG, {514/udp})) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_TEREDO, {3544/udp})) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp})) -> <no result>
> -0.000000   MetaHookPost  CallFunction(Broker::__configure, <frame>, ([routable=F, log_topic=bro<...>/])) -> <no result>
> -0.000000   MetaHookPost  CallFunction(Broker::configure, <frame>, ([routable=F, log_topic=bro<...>/])) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Cluster::is_enabled, <frame>, ()) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Cluster::is_enabled, <null>, ()) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Files::register_analyzer_add_callback, <frame>, (Files::ANALYZER_EXTRACT, FileExtract::on_add{ if (!FileExtract::args?$extract_filename) FileExtract::args$extract_filename = cat(extract-, FileExtract::f$last_active, -, FileExtract::f$source, -, FileExtract::f$id)FileExtract::f$info$extracted = FileExtract::args$extract_filenameFileExtract::args$extract_filename = build_path_compressed(FileExtract::prefix, FileExtract::args$extract_filename)FileExtract::f$info$extracted_cutoff = Fmkdir(FileExtract::prefix)})) -> <no result>
> @@ -251,7 +249,7 @@
>  0.000000   MetaHookPost  CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result>
> -0.000000   MetaHookPost  CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1502745368.796663, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
> +0.000000   MetaHookPost  CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1509124227.694371, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Log::add_default_filter, <frame>, (Broker::LOG)) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG)) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Log::add_default_filter, <frame>, (Communication::LOG)) -> <no result>
> @@ -384,7 +382,7 @@
>  0.000000   MetaHookPost  CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result>
> -0.000000   MetaHookPost  CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1502745368.796663, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
> +0.000000   MetaHookPost  CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1509124227.694371, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
>  0.000000   MetaHookPost  CallFunction(NetControl::check_plugins, <frame>, ()) -> <no result>
>  0.000000   MetaHookPost  CallFunction(NetControl::init, <null>, ()) -> <no result>
>  0.000000   MetaHookPost  CallFunction(Notice::want_pp, <frame>, ()) -> <no result>
> @@ -872,8 +870,6 @@
>  0.000000   MetaHookPre   CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_SYSLOG, {514/udp}))
>  0.000000   MetaHookPre   CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_TEREDO, {3544/udp}))
>  0.000000   MetaHookPre   CallFunction(Analyzer::register_for_ports, <frame>, (Analyzer::ANALYZER_XMPP, {5222<...>/tcp}))
> -0.000000   MetaHookPre   CallFunction(Broker::__configure, <frame>, ([routable=F, log_topic=bro<...>/]))
> -0.000000   MetaHookPre   CallFunction(Broker::configure, <frame>, ([routable=F, log_topic=bro<...>/]))
>  0.000000   MetaHookPre   CallFunction(Cluster::is_enabled, <frame>, ())
>  0.000000   MetaHookPre   CallFunction(Cluster::is_enabled, <null>, ())
>  0.000000   MetaHookPre   CallFunction(Files::register_analyzer_add_callback, <frame>, (Files::ANALYZER_EXTRACT, FileExtract::on_add{ if (!FileExtract::args?$extract_filename) FileExtract::args$extract_filename = cat(extract-, FileExtract::f$last_active, -, FileExtract::f$source, -, FileExtract::f$id)FileExtract::f$info$extracted = FileExtract::args$extract_filenameFileExtract::args$extract_filename = build_path_compressed(FileExtract::prefix, FileExtract::args$extract_filename)FileExtract::f$info$extracted_cutoff = Fmkdir(FileExtract::prefix)}))
> @@ -975,7 +971,7 @@
>  0.000000   MetaHookPre   CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]))
>  0.000000   MetaHookPre   CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]))
>  0.000000   MetaHookPre   CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]))
> -0.000000   MetaHookPre   CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1502745368.796663, node=bro, filter=ip or not ip, init=T, success=T]))
> +0.000000   MetaHookPre   CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1509124227.694371, node=bro, filter=ip or not ip, init=T, success=T]))
>  0.000000   MetaHookPre   CallFunction(Log::add_default_filter, <frame>, (Broker::LOG))
>  0.000000   MetaHookPre   CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG))
>  0.000000   MetaHookPre   CallFunction(Log::add_default_filter, <frame>, (Communication::LOG))
> @@ -1108,7 +1104,7 @@
>  0.000000   MetaHookPre   CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]))
>  0.000000   MetaHookPre   CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]))
>  0.000000   MetaHookPre   CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]))
> -0.000000   MetaHookPre   CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1502745368.796663, node=bro, filter=ip or not ip, init=T, success=T]))
> +0.000000   MetaHookPre   CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1509124227.694371, node=bro, filter=ip or not ip, init=T, success=T]))
>  0.000000   MetaHookPre   CallFunction(NetControl::check_plugins, <frame>, ())
>  0.000000   MetaHookPre   CallFunction(NetControl::init, <null>, ())
>  0.000000   MetaHookPre   CallFunction(Notice::want_pp, <frame>, ())
> @@ -1596,8 +1592,6 @@
>  0.000000 | HookCallFunction Analyzer::register_for_ports(Analyzer::ANALYZER_SYSLOG, {514/udp})
>  0.000000 | HookCallFunction Analyzer::register_for_ports(Analyzer::ANALYZER_TEREDO, {3544/udp})
>  0.000000 | HookCallFunction Analyzer::register_for_ports(Analyzer::ANALYZER_XMPP, {5222<...>/tcp})
> -0.000000 | HookCallFunction Broker::__configure([routable=F, log_topic=bro<...>/])
> -0.000000 | HookCallFunction Broker::configure([routable=F, log_topic=bro<...>/])
>  0.000000 | HookCallFunction Cluster::is_enabled()
>  0.000000 | HookCallFunction Files::register_analyzer_add_callback(Files::ANALYZER_EXTRACT, FileExtract::on_add{ if (!FileExtract::args?$extract_filename) FileExtract::args$extract_filename = cat(extract-, FileExtract::f$last_active, -, FileExtract::f$source, -, FileExtract::f$id)FileExtract::f$info$extracted = FileExtract::args$extract_filenameFileExtract::args$extract_filename = build_path_compressed(FileExtract::prefix, FileExtract::args$extract_filename)FileExtract::f$info$extracted_cutoff = Fmkdir(FileExtract::prefix)})
>  0.000000 | HookCallFunction Files::register_for_mime_type(Files::ANALYZER_PE, application/x-dosexec)
> @@ -1698,7 +1692,7 @@
>  0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])
>  0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])
>  0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])
> -0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1502745368.796663, node=bro, filter=ip or not ip, init=T, success=T])
> +0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1509124227.694371, node=bro, filter=ip or not ip, init=T, success=T])
>  0.000000 | HookCallFunction Log::add_default_filter(Broker::LOG)
>  0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG)
>  0.000000 | HookCallFunction Log::add_default_filter(Communication::LOG)
> @@ -1831,7 +1825,7 @@
>  0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])
>  0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])
>  0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])
> -0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1502745368.796663, node=bro, filter=ip or not ip, init=T, success=T])
> +0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1509124227.694371, node=bro, filter=ip or not ip, init=T, success=T])
>  0.000000 | HookCallFunction NetControl::check_plugins()
>  0.000000 | HookCallFunction NetControl::init()
>  0.000000 | HookCallFunction Notice::want_pp()
> diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.start-it-up/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.start-it-up/manager-1..stdout
> index 7c8eb5e..5b10602 100644
> --- a/testing/btest/Baseline/scripts.base.frameworks.cluster.start-it-up/manager-1..stdout
> +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.start-it-up/manager-1..stdout
> @@ -1,4 +1,8 @@
>  Connected to a peer
>  Connected to a peer
>  Connected to a peer
> +Got fully_connected event
> +Got fully_connected event
>  Connected to a peer
> +Got fully_connected event
> +Got fully_connected event
> diff --git a/testing/btest/Baseline/scripts.base.frameworks.logging.field-extension-cluster-error/manager-1.reporter.log b/testing/btest/Baseline/scripts.base.frameworks.logging.field-extension-cluster-error/manager-1.reporter.log
> index b7d8c11..e7972c2 100644
> --- a/testing/btest/Baseline/scripts.base.frameworks.logging.field-extension-cluster-error/manager-1.reporter.log
> +++ b/testing/btest/Baseline/scripts.base.frameworks.logging.field-extension-cluster-error/manager-1.reporter.log
> @@ -3,11 +3,10 @@
>  #empty_field	(empty)
>  #unset_field	-
>  #path	reporter
> -#open	2016-09-22-23-31-34
> +#open	2017-10-26-19-18-59
>  #fields	_write_ts	_stream	_system_name	ts	level	message	location
>  #types	time	string	string	time	enum	string	string
> -1474587094.261799	reporter	manager-1	0.000000	Reporter::WARNING	WriterFrontend communication/Log::WRITER_ASCII expected 11 fields in write, got 8. Skipping line.	(empty)
> -1474587094.261799	reporter	manager-1	0.000000	Reporter::WARNING	WriterFrontend communication/Log::WRITER_ASCII expected 11 fields in write, got 8. Skipping line.	(empty)
> -1474587094.261799	reporter	manager-1	0.000000	Reporter::WARNING	WriterFrontend communication/Log::WRITER_ASCII expected 11 fields in write, got 8. Skipping line.	(empty)
> -1474587099.984660	reporter	manager-1	0.000000	Reporter::INFO	received termination signal	(empty)
> -#close	2016-09-22-23-31-40
> +1509045539.693078	reporter	manager-1	0.000000	Reporter::WARNING	Write using filter 'default' on path 'broker' changed to use new path 'broker-2' to avoid conflict with filter ''	(empty)
> +1509045539.699623	reporter	manager-1	0.000000	Reporter::WARNING	WriterFrontend cluster/Log::WRITER_ASCII expected 6 fields in write, got 3. Skipping line.	(empty)
> +1509045547.196521	reporter	manager-1	0.000000	Reporter::INFO	received termination signal	(empty)
> +#close	2017-10-26-19-19-07
> diff --git a/testing/btest/broker/remote_log_types.bro b/testing/btest/broker/remote_log_types.bro
> index aeaf1b9..56d63eb 100644
> --- a/testing/btest/broker/remote_log_types.bro
> +++ b/testing/btest/broker/remote_log_types.bro
> @@ -1,4 +1,4 @@
> - @TEST-SERIALIZE: brokercomm
> +# @TEST-SERIALIZE: brokercomm
>  
>  # @TEST-EXEC: btest-bg-run recv "bro -b ../recv.bro >recv.out"
>  # @TEST-EXEC: btest-bg-run send "bro -b ../send.bro >send.out"
> diff --git a/testing/btest/scripts/base/frameworks/cluster/start-it-up-logger.bro b/testing/btest/scripts/base/frameworks/cluster/start-it-up-logger.bro
> index 97f3698..72251c0 100644
> --- a/testing/btest/scripts/base/frameworks/cluster/start-it-up-logger.bro
> +++ b/testing/btest/scripts/base/frameworks/cluster/start-it-up-logger.bro
> @@ -1,4 +1,4 @@
> -# @TEST-SERIALIZE: comm
> +# @TEST-SERIALIZE: brokercomm
>  #
>  # @TEST-EXEC: btest-bg-run logger-1  CLUSTER_NODE=logger-1 BROPATH=$BROPATH:.. bro %INPUT
>  # @TEST-EXEC: sleep 1
> @@ -38,10 +38,16 @@ global fully_connected_nodes = 0;
>  event fully_connected()
>  	{
>  	++fully_connected_nodes;
> +
>  	if ( Cluster::node == "logger-1" )
>  		{
>  		if ( peer_count == 5 && fully_connected_nodes == 5 )
> -			terminate_communication();
> +			{
> +			if ( Cluster::use_broker )
> +				terminate();
> +			else
> +				terminate_communication();
> +			}
>  		}
>  	}
>  
> @@ -49,6 +55,43 @@ redef Cluster::worker2logger_events += /fully_connected/;
>  redef Cluster::proxy2logger_events += /fully_connected/;
>  redef Cluster::manager2logger_events += /fully_connected/;
>  
> +event bro_init()
> +	{
> +	Broker::auto_publish(Cluster::logger_topic, fully_connected);
> +	}
> +
> +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
> +	{
> +	print "Connected to a peer";
> +	++peer_count;
> +
> +	if ( Cluster::node == "logger-1" )
> +		{
> +		if ( peer_count == 5 && fully_connected_nodes == 5 )
> +			{
> +			if ( Cluster::use_broker )
> +				terminate();
> +			else
> +				terminate_communication();
> +			}
> +		}
> +	else if ( Cluster::node == "manager-1" )
> +		{
> +		if ( peer_count == 5 )
> +			event fully_connected();
> +		}
> +	else
> +		{
> +		if ( peer_count == 3 )
> +			event fully_connected();
> +		}
> +	}
> +
> +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
> +	{
> +	terminate();
> +	}
> +
>  event remote_connection_handshake_done(p: event_peer)
>  	{
>  	print "Connected to a peer";
> diff --git a/testing/btest/scripts/base/frameworks/cluster/start-it-up.bro b/testing/btest/scripts/base/frameworks/cluster/start-it-up.bro
> index acb9c36..b0fcc69 100644
> --- a/testing/btest/scripts/base/frameworks/cluster/start-it-up.bro
> +++ b/testing/btest/scripts/base/frameworks/cluster/start-it-up.bro
> @@ -1,4 +1,4 @@
> -# @TEST-SERIALIZE: comm
> +# @TEST-SERIALIZE: brokercomm
>  #
>  # @TEST-EXEC: btest-bg-run manager-1 BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro %INPUT
>  # @TEST-EXEC: sleep 1
> @@ -8,7 +8,7 @@
>  # @TEST-EXEC: btest-bg-run worker-1  BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro %INPUT
>  # @TEST-EXEC: btest-bg-run worker-2  BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro %INPUT
>  # @TEST-EXEC: btest-bg-wait 30
> -# @TEST-EXEC: btest-diff manager-1/.stdout
> +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff manager-1/.stdout
>  # @TEST-EXEC: btest-diff proxy-1/.stdout
>  # @TEST-EXEC: btest-diff proxy-2/.stdout
>  # @TEST-EXEC: btest-diff worker-1/.stdout
> @@ -32,17 +32,59 @@ global fully_connected_nodes = 0;
>  
>  event fully_connected()
>  	{
> +	if ( ! is_remote_event() )
> +		return;
> +
> +	print "Got fully_connected event";
>  	fully_connected_nodes = fully_connected_nodes + 1;
> +
>  	if ( Cluster::node == "manager-1" )
>  		{
>  		if ( peer_count == 4 && fully_connected_nodes == 4 )
> -			terminate_communication();
> +			{
> +			if ( Cluster::use_broker )
> +				terminate();
> +			else
> +				terminate_communication();
> +			}
>  		}
>  	}
>  
>  redef Cluster::worker2manager_events += /fully_connected/;
>  redef Cluster::proxy2manager_events += /fully_connected/;
>  
> +event bro_init()
> +	{
> +	Broker::auto_publish(Cluster::manager_topic, fully_connected);
> +	}
> +
> +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
> +	{
> +	print "Connected to a peer";
> +	peer_count = peer_count + 1;
> +
> +	if ( Cluster::node == "manager-1" )
> +		{
> +		if ( peer_count == 4 && fully_connected_nodes == 4 )
> +			{
> +			if ( Cluster::use_broker )
> +				terminate();
> +			else
> +				terminate_communication();
> +			}
> +		}
> +	else
> +		{
> +		if ( peer_count == 2 )
> +			event fully_connected();
> +		}
> +	}
> +
> +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
> +	{
> +	terminate();
> +	}
> +
>  event remote_connection_handshake_done(p: event_peer)
>  	{
>  	print "Connected to a peer";
> diff --git a/testing/btest/scripts/base/frameworks/control/configuration_update.bro b/testing/btest/scripts/base/frameworks/control/configuration_update.bro
> index 535a357..8d8fb0a 100644
> --- a/testing/btest/scripts/base/frameworks/control/configuration_update.bro
> +++ b/testing/btest/scripts/base/frameworks/control/configuration_update.bro
> @@ -1,4 +1,4 @@
> -# @TEST-SERIALIZE: comm
> +# @TEST-SERIALIZE: brokercomm
>  #
>  # @TEST-EXEC: btest-bg-run controllee  BROPATH=$BROPATH:.. bro %INPUT frameworks/control/controllee Communication::listen_port=65531/tcp Broker::default_port=65531/tcp
>  # @TEST-EXEC: sleep 5
> diff --git a/testing/btest/scripts/base/frameworks/control/id_value.bro b/testing/btest/scripts/base/frameworks/control/id_value.bro
> index bee37a2..1b5e354 100644
> --- a/testing/btest/scripts/base/frameworks/control/id_value.bro
> +++ b/testing/btest/scripts/base/frameworks/control/id_value.bro
> @@ -1,4 +1,4 @@
> -# @TEST-SERIALIZE: comm
> +# @TEST-SERIALIZE: brokercomm
>  #
>  # @TEST-EXEC: btest-bg-run controllee  BROPATH=$BROPATH:.. bro %INPUT only-for-controllee frameworks/control/controllee Communication::listen_port=65532/tcp Broker::default_port=65532/tcp
>  # @TEST-EXEC: btest-bg-run controller  BROPATH=$BROPATH:.. bro %INPUT frameworks/control/controller Control::host=127.0.0.1 Control::host_port=65532/tcp Control::cmd=id_value Control::arg=test_var
> diff --git a/testing/btest/scripts/base/frameworks/control/shutdown.bro b/testing/btest/scripts/base/frameworks/control/shutdown.bro
> index 9c0c104..2869c3f 100644
> --- a/testing/btest/scripts/base/frameworks/control/shutdown.bro
> +++ b/testing/btest/scripts/base/frameworks/control/shutdown.bro
> @@ -1,4 +1,4 @@
> -# @TEST-SERIALIZE: comm
> +# @TEST-SERIALIZE: brokercomm
>  #
>  # @TEST-EXEC: btest-bg-run controllee BROPATH=$BROPATH:.. bro %INPUT frameworks/control/controllee Communication::listen_port=65530/tcp Broker::default_port=65530/tcp
>  # @TEST-EXEC: btest-bg-run controller BROPATH=$BROPATH:.. bro %INPUT frameworks/control/controller Control::host=127.0.0.1 Control::host_port=65530/tcp Control::cmd=shutdown
> diff --git a/testing/btest/scripts/base/frameworks/logging/field-extension-cluster-error.bro b/testing/btest/scripts/base/frameworks/logging/field-extension-cluster-error.bro
> index 6ac7a5e..c0ab10b 100644
> --- a/testing/btest/scripts/base/frameworks/logging/field-extension-cluster-error.bro
> +++ b/testing/btest/scripts/base/frameworks/logging/field-extension-cluster-error.bro
> @@ -1,4 +1,4 @@
> -# @TEST-SERIALIZE: comm
> +# @TEST-SERIALIZE: brokercomm
>  #
>  # @TEST-EXEC: btest-bg-run manager-1 "cp ../cluster-layout.bro . && CLUSTER_NODE=manager-1 bro %INPUT"
>  # @TEST-EXEC: sleep 1
> @@ -43,6 +43,11 @@ event terminate_me() {
>  	terminate();
>  }
>  
> +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
> +	{
> +	schedule 1sec { terminate_me() };
> +	}
> +
>  event remote_connection_closed(p: event_peer) {
>    schedule 1sec { terminate_me() };
>  }
> diff --git a/testing/btest/scripts/base/frameworks/logging/field-extension-cluster.bro b/testing/btest/scripts/base/frameworks/logging/field-extension-cluster.bro
> index fb51251..6740743 100644
> --- a/testing/btest/scripts/base/frameworks/logging/field-extension-cluster.bro
> +++ b/testing/btest/scripts/base/frameworks/logging/field-extension-cluster.bro
> @@ -1,4 +1,4 @@
> -# @TEST-SERIALIZE: comm
> +# @TEST-SERIALIZE: brokercomm
>  #
>  # @TEST-EXEC: btest-bg-run manager-1 "cp ../cluster-layout.bro . && CLUSTER_NODE=manager-1 bro %INPUT"
>  # @TEST-EXEC: sleep 1
> @@ -39,6 +39,11 @@ event terminate_me() {
>  	terminate();
>  }
>  
> +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
> +	{
> +	schedule 1sec { terminate_me() };
> +	}
> +
>  event remote_connection_closed(p: event_peer) {
>    schedule 1sec { terminate_me() };
>  }


-- 
Robin Sommer * ICSI/LBNL * robin at icir.org * www.icir.org/robin

