[Zeek] Cannot send logs to their individual Kafka topics

Zeolla@GMail.com zeolla at gmail.com
Thu Apr 4 03:00:05 PDT 2019


To run a local proof of concept and see a working config, apply the patch
below to master and then run `./run_end_to_end.sh --kafka-topic=dns` from
the docker/ folder (this only requires docker and bash > 4).  The issue is
that, as Seth said earlier, you need to configure metadata.broker.list in
Kafka::kafka_conf, not in the logging filter's $config table (although we
could likely add that option pretty easily - feel free to open a ticket at
https://issues.apache.org/jira/browse/METRON-2060?filter=-4&jql=project%20%3D%20METRON%20order%20by%20created%20DESC
).

If you're going to run the PoC and have recently built the plugin's bro
docker container on your computer, you can add `--skip-docker-build` to
speed things up, but it will need to be built at least the first time
around.  If you want to poke around in the container running bro after
things are up, you can run `./scripts/docker_execute_shell.sh` from the
docker/ folder and it will drop you into a shell.  Also, don't forget to
run `./finish_end_to_end.sh` from docker/ when you're done to clean
everything up.  Our docker testing environment is currently limited to
testing one kafka topic at a time, but this same approach should work if
you configure multiple filters with different topics specified.  I'm doing
exactly this in one of my bro clusters using master of the plugin.

```
diff --git a/docker/in_docker_scripts/configure_bro_plugin.sh b/docker/in_docker_scripts/configure_bro_plugin.sh
index c292504..afdd0ad 100755
--- a/docker/in_docker_scripts/configure_bro_plugin.sh
+++ b/docker/in_docker_scripts/configure_bro_plugin.sh
@@ -28,13 +28,22 @@ shopt -s nocasematch
 echo "Configuring kafka plugin"
 {
   echo "@load packages"
-  echo "redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG, Conn::LOG, DPD::LOG, FTP::LOG, Files::LOG, Known::CERTS_LOG, SMTP::LOG, SSL::LOG, Weird::LOG, Notice::LOG, DHCP::LOG, SSH::LOG, Software::LOG, RADIUS::LOG, X509::LOG, Known::DEVICES_LOG, RFB::LOG, Stats::LOG, CaptureLoss::LOG, SIP::LOG);"
-  echo "redef Kafka::topic_name = \"bro\";"
+  echo "redef Kafka::topic_name = \"\";"
   echo "redef Kafka::tag_json = T;"
   echo "redef Kafka::kafka_conf = table([\"metadata.broker.list\"] = \"kafka:9092\");"
-  echo "redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);"
   echo "redef Known::cert_tracking = ALL_HOSTS;"
   echo "redef Software::asset_tracking = ALL_HOSTS;"
+  echo 'event bro_init() &priority=-10
+{
+# handles DNS
+local dns_filter: Log::Filter = [
+$name = "kafka-dns",
+$writer = Log::WRITER_KAFKAWRITER,
+$config = table(["topic_name"] = "dns"),
+$path = "dns"
+];
+Log::add_filter(DNS::LOG, dns_filter);
+}'
 } >> /usr/local/bro/share/bro/site/local.bro

 # Load "known-devices-and-hostnames.bro" which is necessary in bro 2.5.5 to
```
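
For reference, here is roughly what the same approach could look like
directly in local.bro with two filters, each going to its own topic.  This
is a minimal, untested sketch: it assumes the plugin is loaded via
`@load packages` as in the patch above, XX.XX.XX.XX:9092 is a placeholder
for your broker, and the filter and topic names are only illustrative (the
DHCP ones are borrowed from the script you pasted).  Note the comma after
the $path line in the DHCP filter; it is missing in your script, and that
looks like what `broctl check` is complaining about, since without it bro
tries to read "shew_bro_dhcp"$config as a record field access.

```
@load packages

# Leave the global topic empty so each filter names its own topic.
redef Kafka::topic_name = "";
redef Kafka::tag_json = T;

# The broker list goes here, not in a filter's $config table.
# XX.XX.XX.XX:9092 is a placeholder for the real broker address.
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "XX.XX.XX.XX:9092"
);

event bro_init() &priority=-10
{
    # Send DNS logs to the "dns" topic.
    local dns_filter: Log::Filter = [
        $name = "kafka-dns",
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(["topic_name"] = "dns"),
        $path = "dns"
    ];
    Log::add_filter(DNS::LOG, dns_filter);

    # Send DHCP logs to the "shew_bro_dhcp" topic.
    local dhcp_filter: Log::Filter = [
        $name = "kafka-dhcp-shew",
        $writer = Log::WRITER_KAFKAWRITER,
        $path = "shew_bro_dhcp",   # this comma is the one missing below
        $config = table(["topic_name"] = "shew_bro_dhcp")
    ];
    Log::add_filter(DHCP::LOG, dhcp_filter);
}
```

Running `broctl check` again after a change like this should at least
confirm that the script parses before you deploy it.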

Let me know if that works for you or if you have any other questions.

- Jon Zeolla
Zeolla at GMail.Com


On Wed, Apr 3, 2019 at 11:41 AM Arda Savran <asavran at layerxtech.com> wrote:

> Hello again:
>
> I tried the script on the web site and it still fails the check:
>
> ##! Local site policy. Customize as appropriate.
> ##!
> ##! This file will not be overwritten when upgrading or reinstalling!
>
> #@load packages
>
> #@load
> /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
> #redef Kafka::send_all_active_logs = T;
> #redef Kafka::tag_json = T;
> #redef Kafka::kafka_conf = table(["metadata.broker.list"] = "
> 13.88.224.129:9092");
>
> ###########
> ###########
>
> @load
> /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
> redef Kafka::logs_to_send = set(DHCP::LOG);
> redef Kafka::topic_name = "bro";
> redef Kafka::kafka_conf = table(
>     ["metadata.broker.list"] = "XX.XX.XX.XX:9092"
> );
> redef Kafka::tag_json = T;
>
> event bro_init() &priority=-10
> {
>     # Send DHCP to the shew_bro_dhcp topic
>     local shew_dhcp_filter: Log::Filter = [
>         $name = "kafka-dhcp-shew",
>         $writer = Log::WRITER_KAFKAWRITER,
>         $path = "shew_bro_dhcp"
>         $config = table(["topic_name"] = "shew_bro_dhcp")
>     ];
>     Log::add_filter(DHCP::LOG, shew_dhcp_filter);
> }
>
> ###########
> ###########
>
> [root at localhost site]# broctl check
> bro scripts failed.
> error in /usr/local/bro/share/bro/site/local.bro, lines 29-30: not a
> record (shew_bro_dhcp$config)
> error in /usr/local/bro/share/bro/site/local.bro, lines 26-31 and error:
> type clash for field "path" ((coerce [$name=kafka-dhcp-shew,
> $writer=Log::WRITER_KAFKAWRITER, $path=shew_bro_dhcp$<error> =
> table(topic_name = shew_bro_dhcp)] to Log::Filter) and error)
>
> Am I doing something wrong?
>
> Thanks,
>
>
>
> On Wed, Apr 3, 2019 at 9:52 AM Arda Savran <asavran at layerxtech.com> wrote:
>
>> I used the master.
>>
>> I changed the beginning of my local.bro as follows and did a "broctl
>> check" and "broctl deploy":
>>
>> #@load packages
>>
>> #@load
>> /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>> #redef Kafka::send_all_active_logs = T;
>> #redef Kafka::tag_json = T;
>> #redef Kafka::kafka_conf = table(["metadata.broker.list"] =
>> "XX.XX.XX.XX:9092");
>>
>> ###########
>> ###########
>>
>> @load
>> /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>> redef Kafka::topic_name = "";
>> redef Kafka::tag_json = T;
>> redef Kafka::debug = "all";
>>
>> event bro_init() &priority=-10
>> {
>> # handles DNS
>> local dns_filter: Log::Filter = [
>> $name = "kafka-dns",
>> $writer = Log::WRITER_KAFKAWRITER,
>> $config = table(["metadata.broker.list"] = " XX.XX.XX.XX:9092"),
>> *$config = table(["topic_name"] = "bro_dns"),*
>> $path = "dns"
>> ];
>> Log::add_filter(DNS::LOG, dns_filter);
>> }
>>
>> Still having no luck:
>>
>> [root at localhost current]# tail -f stderr.log
>> %7|1554299460.116|CONNECT|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to
>> ipv4#127.0.0.1:9092 (plaintext) with socket 34
>> %7|1554299460.116|STATE|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>> state DOWN -> CONNECT
>> %7|1554299460.116|BROADCAST|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>> %7|1554299460.116|BROKERFAIL|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>> Local: Broker transport failure: (errno: Connection refused)
>> %7|1554299460.116|FAIL|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>> 127.0.0.1:9092 failed: Connection refused
>> %7|1554299460.116|STATE|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>> state CONNECT -> DOWN
>> %7|1554299460.116|BROADCAST|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>> %7|1554299460.116|BUFQ|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>> with 0 buffers
>> %7|1554299460.116|BUFQ|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>> buffers on connection reset
>> %7|1554299460.116|RECONNECT|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>> reconnect by 435ms
>> %7|1554299460.394|NOINFO|rdkafka#producer-1| [thrd:main]: Topic bro_dns
>> partition count is zero: should refresh metadata
>> %7|1554299460.394|METADATA|rdkafka#producer-1| [thrd:main]: Skipping
>> metadata refresh of 1 topic(s): no usable brokers
>> %7|1554299460.552|RECONNECT|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>> reconnect by 276ms
>> %7|1554299460.827|CONNECT|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state
>> DOWN connecting
>> %7|1554299460.827|CONNECT|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to
>> ipv4#127.0.0.1:9092 (plaintext) with socket 34
>> %7|1554299460.827|STATE|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>> state DOWN -> CONNECT
>> %7|1554299460.827|BROADCAST|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>> %7|1554299460.827|BROKERFAIL|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>> Local: Broker transport failure: (errno: Connection refused)
>> %7|1554299460.827|FAIL|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>> 127.0.0.1:9092 failed: Connection refused
>> %7|1554299460.827|STATE|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>> state CONNECT -> DOWN
>> %7|1554299460.827|BROADCAST|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>> %7|1554299460.827|BUFQ|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>> with 0 buffers
>> %7|1554299460.827|BUFQ|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>> buffers on connection reset
>> %7|1554299461.394|NOINFO|rdkafka#producer-1| [thrd:main]: Topic bro_dns
>> partition count is zero: should refresh metadata
>> %7|1554299461.394|METADATA|rdkafka#producer-1| [thrd:main]: Skipping
>> metadata refresh of 1 topic(s): no usable brokers
>> %7|1554299461.827|CONNECT|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state
>> DOWN connecting
>> %7|1554299461.828|CONNECT|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to
>> ipv4#127.0.0.1:9092 (plaintext) with socket 34
>> %7|1554299461.828|STATE|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>> state DOWN -> CONNECT
>> %7|1554299461.828|BROADCAST|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>> %7|1554299461.828|BROKERFAIL|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>> Local: Broker transport failure: (errno: Connection refused)
>> %7|1554299461.828|FAIL|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>> 127.0.0.1:9092 failed: Connection refused
>> %7|1554299461.828|STATE|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>> state CONNECT -> DOWN
>> %7|1554299461.828|BROADCAST|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>> %7|1554299461.828|BUFQ|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>> with 0 buffers
>> %7|1554299461.829|BUFQ|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>> buffers on connection reset
>> %7|1554299461.829|RECONNECT|rdkafka#producer-1|
>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>> reconnect by 715ms
>>
>>
>> Do you have any other suggestions for me?
>>
>> Thanks
>>
>>
>> On Wed, Apr 3, 2019 at 8:38 AM Zeolla at GMail.com <zeolla at gmail.com> wrote:
>>
>>> Are you using master?  The easiest way to fix this is likely to add a
>>> key of "topic_name" and a value of "dns" to your $config table, similar
>>> to what is shown here
>>> <https://github.com/apache/metron-bro-plugin-kafka#example-6---sending-a-log-to-multiple-topics>.
>>> Please let me know if that works for you.
>>>
>>> There is a known issue in master where the plugin is not falling back to
>>> use $path as the destination topic name, and I have a PR open
>>> <https://github.com/apache/metron-bro-plugin-kafka/pull/26> for it but
>>> unfortunately haven't had a lot of time to finish (it is just pending some
>>> btests - functionally it is done) and get that merged.
>>>
>>> - Jon Zeolla
>>> Zeolla at GMail.Com
>>>
>>>
>>> On Tue, Apr 2, 2019 at 11:37 AM Arda Savran <asavran at layerxtech.com>
>>> wrote:
>>>
>>>> Hello folks:
>>>>
>>>> I have successfully been able to send everything to a remote single
>>>> Kafka Topic from a local Bro machine and following is my local.bro file to
>>>> make that happen:
>>>>
>>>> *##! Local site policy. Customize as appropriate.*
>>>> *##!*
>>>> *##! This file will not be overwritten when upgrading or reinstalling!*
>>>>
>>>> *#@load packages*
>>>>
>>>> *@load
>>>> /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka*
>>>> *redef Kafka::send_all_active_logs = T;*
>>>> *redef Kafka::tag_json = T;*
>>>> *redef Kafka::kafka_conf = table(["metadata.broker.list"] =
>>>> "XX.XX.XX.XX:9092");*
>>>>
>>>> However, when I change that to write logs to their individual Kafka
>>>> topics I get an error message under stderr.log. Following is my updated
>>>> local.bro:
>>>>
>>>> *##! Local site policy. Customize as appropriate.*
>>>> *##!*
>>>> *##! This file will not be overwritten when upgrading or reinstalling!*
>>>>
>>>> *#@load packages*
>>>>
>>>> *#@load
>>>> /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka*
>>>> *#redef Kafka::send_all_active_logs = T;*
>>>> *#redef Kafka::tag_json = T;*
>>>> *#redef Kafka::kafka_conf = table(["metadata.broker.list"] =
>>>> "XX.XX.XX.XX:9092");*
>>>>
>>>> *###########*
>>>> *###########*
>>>>
>>>> *@load
>>>> /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka*
>>>> *redef Kafka::topic_name = "";*
>>>> *redef Kafka::tag_json = T;*
>>>> *redef Kafka::debug = "all";*
>>>>
>>>> *event bro_init() &priority=-10*
>>>> *{*
>>>> *# handles DNS*
>>>> *local dns_filter: Log::Filter = [*
>>>> *$name = "kafka-dns",*
>>>> *$writer = Log::WRITER_KAFKAWRITER,*
>>>> *$config = table(["metadata.broker.list"] = "XX.XX.XX.XX:9092"),*
>>>> *$path = "dns"*
>>>> *];*
>>>> *Log::add_filter(DNS::LOG, dns_filter);*
>>>> *}*
>>>>
>>>> *###########*
>>>> *###########*
>>>>
>>>> I enter "broctl check" and "broctl deploy" after that; but get the
>>>> following:
>>>>
>>>> [root at localhost current]# tail -f stderr.log
>>>> %7|1554218121.957|STATE|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>>> state DOWN -> CONNECT
>>>> %7|1554218121.957|BROADCAST|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>>> %7|1554218121.957|BROKERFAIL|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>>>> Local: Broker transport failure: (errno: Connection refused)
>>>> %7|1554218121.957|FAIL|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>>>> 127.0.0.1:9092 failed: Connection refused
>>>> %7|1554218121.957|STATE|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>>> state CONNECT -> DOWN
>>>> %7|1554218121.957|BROADCAST|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>>> %7|1554218121.957|BUFQ|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>>>> with 0 buffers
>>>> %7|1554218121.957|BUFQ|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>>>> buffers on connection reset
>>>> %7|1554218122.309|NOINFO|rdkafka#producer-1| [thrd:main]: Topic
>>>> partition count is zero: should refresh metadata
>>>> %7|1554218122.309|METADATA|rdkafka#producer-1| [thrd:main]: Skipping
>>>> metadata refresh of 1 topic(s): no usable brokers
>>>> %7|1554218122.957|CONNECT|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state
>>>> DOWN connecting
>>>> %7|1554218122.958|CONNECT|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to
>>>> ipv4#127.0.0.1:9092 (plaintext) with socket 29
>>>> %7|1554218122.958|STATE|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>>> state DOWN -> CONNECT
>>>> %7|1554218122.958|BROADCAST|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>>> %7|1554218122.958|BROKERFAIL|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>>>> Local: Broker transport failure: (errno: Connection refused)
>>>> %7|1554218122.958|FAIL|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>>>> 127.0.0.1:9092 failed: Connection refused
>>>> %7|1554218122.958|STATE|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>>> state CONNECT -> DOWN
>>>> %7|1554218122.958|BROADCAST|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>>> %7|1554218122.958|BUFQ|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>>>> with 0 buffers
>>>> %7|1554218122.958|BUFQ|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>>>> buffers on connection reset
>>>> %7|1554218122.958|RECONNECT|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>>>> reconnect by 301ms
>>>> %7|1554218123.259|RECONNECT|rdkafka#producer-1|
>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>>>> reconnect by 53ms
>>>> %7|1554218123.309|NOINFO|rdkafka#producer-1| [thrd:main]: Topic
>>>> partition count is zero: should refresh metadata
>>>>
>>>> Yes, I have iptables enabled on the local bro machine, but it works with
>>>> the first configuration file. How come bro thinks that the kafka
>>>> broker is local? It is supposed to send the messages to XX.XX.XX.XX.
>>>>
>>>> Thanks in advance.
>>>>
>>>> _______________________________________________
>>>> Zeek mailing list
>>>> zeek at zeek.org
>>>> http://mailman.ICSI.Berkeley.EDU/mailman/listinfo/zeek
>>>
>>>