[Zeek] Cannot send logs to their individual Kafka topics

Zeolla@GMail.com zeolla at gmail.com
Thu Apr 4 03:13:08 PDT 2019


Sorry, I was in a rush to send that prior email out. I should have
mentioned that there are actually two issues with your original config, and
the example in my previous email (quoted below) fixes both of them.  One is
the bug I mentioned earlier (the plugin does not fall back to $path as the
topic name), and the other is the issue Seth mentioned (metadata.broker.list
has to be set in Kafka::kafka_conf).
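
To spell out the two fixes against a local.bro like yours, here is a minimal
sketch rather than a tested config. It assumes the plugin's scripts are
already loaded as in your file; the broker address is your own placeholder
and the "dns" topic name is only an example.

```
redef Kafka::topic_name = "";
redef Kafka::tag_json = T;

# Seth's point: the broker list belongs in Kafka::kafka_conf,
# not in the logging filter's $config table.
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "XX.XX.XX.XX:9092"
);

event bro_init() &priority=-10
{
    local dns_filter: Log::Filter = [
        $name = "kafka-dns",
        $writer = Log::WRITER_KAFKAWRITER,
        # Works around the $path fallback bug by naming the topic explicitly.
        $config = table(["topic_name"] = "dns"),
        $path = "dns"
    ];
    Log::add_filter(DNS::LOG, dns_filter);
}
```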

Jon Zeolla

On Thu, Apr 4, 2019, 6:00 AM Zeolla at GMail.com <zeolla at gmail.com> wrote:

> To run a local proof of concept and see a working config, apply the patch
> below to master and then run `./run_end_to_end.sh --kafka-topic=dns` from
> the docker/ folder (it just requires docker and bash > 4).  The issue is,
> as Seth said earlier, that you need to configure metadata.broker.list in
> Kafka::kafka_conf, not in the logging filter's $config table (although we
> could likely add that option pretty easily - feel free to open a ticket at
> https://issues.apache.org/jira/browse/METRON-2060?filter=-4&jql=project%20%3D%20METRON%20order%20by%20created%20DESC
> ).
>
> If you're going to run the PoC and have recently built the plugin's bro
> docker container on your computer, you can add `--skip-docker-build` to
> speed things up, but it needs to be built at least the first time
> around.  If you want to poke around in the container running bro after
> things are up, you can run `./scripts/docker_execute_shell.sh` from the
> docker/ folder and it will drop you into a shell.  Also, don't forget to
> run `./finish_end_to_end.sh` from docker/ when you're done to clean
> everything up.  Our docker testing environment is currently limited to
> testing one kafka topic at a time, but this same approach should work if
> you configure multiple filters with different topics specified.  I'm doing
> exactly this in one of my bro clusters using master of the plugin.
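>
> As a rough sketch of what multiple filters could look like (untested in
> the docker environment; the "http" topic and the filter names are only
> examples, and the broker list is assumed to be set via Kafka::kafka_conf
> as in the patch):
>
> ```
> event bro_init() &priority=-10
> {
>     # One filter per log stream, each naming its own topic.
>     local dns_filter: Log::Filter = [
>         $name = "kafka-dns",
>         $writer = Log::WRITER_KAFKAWRITER,
>         $config = table(["topic_name"] = "dns"),
>         $path = "dns"
>     ];
>     Log::add_filter(DNS::LOG, dns_filter);
>
>     local http_filter: Log::Filter = [
>         $name = "kafka-http",
>         $writer = Log::WRITER_KAFKAWRITER,
>         $config = table(["topic_name"] = "http"),
>         $path = "http"
>     ];
>     Log::add_filter(HTTP::LOG, http_filter);
> }
> ```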
>
> ```
> diff --git a/docker/in_docker_scripts/configure_bro_plugin.sh b/docker/in_docker_scripts/configure_bro_plugin.sh
> index c292504..afdd0ad 100755
> --- a/docker/in_docker_scripts/configure_bro_plugin.sh
> +++ b/docker/in_docker_scripts/configure_bro_plugin.sh
> @@ -28,13 +28,22 @@ shopt -s nocasematch
>  echo "Configuring kafka plugin"
>  {
>    echo "@load packages"
> -  echo "redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG, Conn::LOG, DPD::LOG, FTP::LOG, Files::LOG, Known::CERTS_LOG, SMTP::LOG, SSL::LOG, Weird::LOG, Notice::LOG, DHCP::LOG, SSH::LOG, Software::LOG, RADIUS::LOG, X509::LOG, Known::DEVICES_LOG, RFB::LOG, Stats::LOG, CaptureLoss::LOG, SIP::LOG);"
> -  echo "redef Kafka::topic_name = \"bro\";"
> +  echo "redef Kafka::topic_name = \"\";"
>    echo "redef Kafka::tag_json = T;"
>    echo "redef Kafka::kafka_conf = table([\"metadata.broker.list\"] = \"kafka:9092\");"
> -  echo "redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);"
>    echo "redef Known::cert_tracking = ALL_HOSTS;"
>    echo "redef Software::asset_tracking = ALL_HOSTS;"
> +  echo 'event bro_init() &priority=-10
> +{
> +# handles DNS
> +local dns_filter: Log::Filter = [
> +$name = "kafka-dns",
> +$writer = Log::WRITER_KAFKAWRITER,
> +$config = table(["topic_name"] = "dns"),
> +$path = "dns"
> +];
> +Log::add_filter(DNS::LOG, dns_filter);
> +}'
>  } >> /usr/local/bro/share/bro/site/local.bro
>
>  # Load "known-devices-and-hostnames.bro" which is necessary in bro 2.5.5 to
> ```
>
> Let me know if that works for you or if you have any other questions
>
> - Jon Zeolla
> Zeolla at GMail.Com
>
>
> On Wed, Apr 3, 2019 at 11:41 AM Arda Savran <asavran at layerxtech.com>
> wrote:
>
>> Hello again:
>>
>> I tried the script on the web site and it still fails the check:
>>
>> ##! Local site policy. Customize as appropriate.
>> ##!
>> ##! This file will not be overwritten when upgrading or reinstalling!
>>
>> #@load packages
>>
>> #@load /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>> #redef Kafka::send_all_active_logs = T;
>> #redef Kafka::tag_json = T;
>> #redef Kafka::kafka_conf = table(["metadata.broker.list"] = "13.88.224.129:9092");
>>
>> ###########
>> ###########
>>
>> @load /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>> redef Kafka::logs_to_send = set(DHCP::LOG);
>> redef Kafka::topic_name = "bro";
>> redef Kafka::kafka_conf = table(
>>     ["metadata.broker.list"] = "XX.XX.XX.XX:9092"
>> );
>> redef Kafka::tag_json = T;
>>
>> event bro_init() &priority=-10
>> {
>>     # Send DHCP to the shew_bro_dhcp topic
>>     local shew_dhcp_filter: Log::Filter = [
>>         $name = "kafka-dhcp-shew",
>>         $writer = Log::WRITER_KAFKAWRITER,
>>         $path = "shew_bro_dhcp"
>>         $config = table(["topic_name"] = "shew_bro_dhcp")
>>     ];
>>     Log::add_filter(DHCP::LOG, shew_dhcp_filter);
>> }
>>
>> ###########
>> ###########
>>
>> [root at localhost site]# broctl check
>> bro scripts failed.
>> error in /usr/local/bro/share/bro/site/local.bro, lines 29-30: not a
>> record (shew_bro_dhcp$config)
>> error in /usr/local/bro/share/bro/site/local.bro, lines 26-31 and error:
>> type clash for field "path" ((coerce [$name=kafka-dhcp-shew,
>> $writer=Log::WRITER_KAFKAWRITER, $path=shew_bro_dhcp$<error> =
>> table(topic_name = shew_bro_dhcp)] to Log::Filter) and error)
>>
>> Am I doing something wrong?
>>
>> Thanks,
>>
>>
>>
>> On Wed, Apr 3, 2019 at 9:52 AM Arda Savran <asavran at layerxtech.com>
>> wrote:
>>
>>> I used the master.
>>>
>>> I changed the beginning of my local.bro as follows and did a "broctl
>>> check" and "broctl deploy":
>>>
>>> #@load packages
>>>
>>> #@load /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>>> #redef Kafka::send_all_active_logs = T;
>>> #redef Kafka::tag_json = T;
>>> #redef Kafka::kafka_conf = table(["metadata.broker.list"] = "XX.XX.XX.XX:9092");
>>>
>>> ###########
>>> ###########
>>>
>>> @load /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>>> redef Kafka::topic_name = "";
>>> redef Kafka::tag_json = T;
>>> redef Kafka::debug = "all";
>>>
>>> event bro_init() &priority=-10
>>> {
>>> # handles DNS
>>> local dns_filter: Log::Filter = [
>>> $name = "kafka-dns",
>>> $writer = Log::WRITER_KAFKAWRITER,
>>> $config = table(["metadata.broker.list"] = " XX.XX.XX.XX:9092"),
>>> *$config = table(["topic_name"] = "bro_dns"),*
>>> $path = "dns"
>>> ];
>>> Log::add_filter(DNS::LOG, dns_filter);
>>> }
>>>
>>> Still having no luck:
>>>
>>> [root at localhost current]# tail -f stderr.log
>>> %7|1554299460.116|CONNECT|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to
>>> ipv4#127.0.0.1:9092 (plaintext) with socket 34
>>> %7|1554299460.116|STATE|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>> state DOWN -> CONNECT
>>> %7|1554299460.116|BROADCAST|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>> %7|1554299460.116|BROKERFAIL|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>>> Local: Broker transport failure: (errno: Connection refused)
>>> %7|1554299460.116|FAIL|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>>> 127.0.0.1:9092 failed: Connection refused
>>> %7|1554299460.116|STATE|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>> state CONNECT -> DOWN
>>> %7|1554299460.116|BROADCAST|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>> %7|1554299460.116|BUFQ|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>>> with 0 buffers
>>> %7|1554299460.116|BUFQ|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>>> buffers on connection reset
>>> %7|1554299460.116|RECONNECT|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>>> reconnect by 435ms
>>> %7|1554299460.394|NOINFO|rdkafka#producer-1| [thrd:main]: Topic bro_dns
>>> partition count is zero: should refresh metadata
>>> %7|1554299460.394|METADATA|rdkafka#producer-1| [thrd:main]: Skipping
>>> metadata refresh of 1 topic(s): no usable brokers
>>> %7|1554299460.552|RECONNECT|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>>> reconnect by 276ms
>>> %7|1554299460.827|CONNECT|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state
>>> DOWN connecting
>>> %7|1554299460.827|CONNECT|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to
>>> ipv4#127.0.0.1:9092 (plaintext) with socket 34
>>> %7|1554299460.827|STATE|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>> state DOWN -> CONNECT
>>> %7|1554299460.827|BROADCAST|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>> %7|1554299460.827|BROKERFAIL|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>>> Local: Broker transport failure: (errno: Connection refused)
>>> %7|1554299460.827|FAIL|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>>> 127.0.0.1:9092 failed: Connection refused
>>> %7|1554299460.827|STATE|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>> state CONNECT -> DOWN
>>> %7|1554299460.827|BROADCAST|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>> %7|1554299460.827|BUFQ|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>>> with 0 buffers
>>> %7|1554299460.827|BUFQ|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>>> buffers on connection reset
>>> %7|1554299461.394|NOINFO|rdkafka#producer-1| [thrd:main]: Topic bro_dns
>>> partition count is zero: should refresh metadata
>>> %7|1554299461.394|METADATA|rdkafka#producer-1| [thrd:main]: Skipping
>>> metadata refresh of 1 topic(s): no usable brokers
>>> %7|1554299461.827|CONNECT|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state
>>> DOWN connecting
>>> %7|1554299461.828|CONNECT|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to
>>> ipv4#127.0.0.1:9092 (plaintext) with socket 34
>>> %7|1554299461.828|STATE|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>> state DOWN -> CONNECT
>>> %7|1554299461.828|BROADCAST|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>> %7|1554299461.828|BROKERFAIL|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>>> Local: Broker transport failure: (errno: Connection refused)
>>> %7|1554299461.828|FAIL|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>>> 127.0.0.1:9092 failed: Connection refused
>>> %7|1554299461.828|STATE|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>> state CONNECT -> DOWN
>>> %7|1554299461.828|BROADCAST|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>> %7|1554299461.828|BUFQ|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>>> with 0 buffers
>>> %7|1554299461.829|BUFQ|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>>> buffers on connection reset
>>> %7|1554299461.829|RECONNECT|rdkafka#producer-1|
>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>>> reconnect by 715ms
>>>
>>>
>>> Do you have any other suggestions for me?
>>>
>>> Thanks
>>>
>>>
>>> On Wed, Apr 3, 2019 at 8:38 AM Zeolla at GMail.com <zeolla at gmail.com>
>>> wrote:
>>>
>>>> Are you using master?  The easiest way to fix this is likely to add a
>>>> key of "topic_name" and a value of "dns" to your $config table, similar to
>>>> as shown here
>>>> <https://github.com/apache/metron-bro-plugin-kafka#example-6---sending-a-log-to-multiple-topics>.
>>>> Please let me know if that works for you.
>>>>
>>>> There is a known issue in master where the plugin is not falling back
>>>> to use $path as the destination topic name, and I have a PR open
>>>> <https://github.com/apache/metron-bro-plugin-kafka/pull/26> for it but
>>>> unfortunately haven't had a lot of time to finish (it is just pending some
>>>> btests - functionally it is done) and get that merged.
>>>>
>>>> - Jon Zeolla
>>>> Zeolla at GMail.Com
>>>>
>>>>
>>>> On Tue, Apr 2, 2019 at 11:37 AM Arda Savran <asavran at layerxtech.com>
>>>> wrote:
>>>>
>>>>> Hello folks:
>>>>>
>>>>> I have successfully been able to send everything to a remote single
>>>>> Kafka Topic from a local Bro machine and following is my local.bro file to
>>>>> make that happen:
>>>>>
>>>>> ##! Local site policy. Customize as appropriate.
>>>>> ##!
>>>>> ##! This file will not be overwritten when upgrading or reinstalling!
>>>>>
>>>>> #@load packages
>>>>>
>>>>> @load /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>>>>> redef Kafka::send_all_active_logs = T;
>>>>> redef Kafka::tag_json = T;
>>>>> redef Kafka::kafka_conf = table(["metadata.broker.list"] = "XX.XX.XX.XX:9092");
>>>>>
>>>>> However, when I change that to write logs to their individual Kafka
>>>>> topics I get an error message under stderr.log. Following is my updated
>>>>> local.bro:
>>>>>
>>>>> ##! Local site policy. Customize as appropriate.
>>>>> ##!
>>>>> ##! This file will not be overwritten when upgrading or reinstalling!
>>>>>
>>>>> #@load packages
>>>>>
>>>>> #@load /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>>>>> #redef Kafka::send_all_active_logs = T;
>>>>> #redef Kafka::tag_json = T;
>>>>> #redef Kafka::kafka_conf = table(["metadata.broker.list"] = "XX.XX.XX.XX:9092");
>>>>>
>>>>> ###########
>>>>> ###########
>>>>>
>>>>> @load /usr/local/bro/lib/bro/plugins/packages/metron-bro-plugin-kafka/scripts/Apache/Kafka
>>>>> redef Kafka::topic_name = "";
>>>>> redef Kafka::tag_json = T;
>>>>> redef Kafka::debug = "all";
>>>>>
>>>>> event bro_init() &priority=-10
>>>>> {
>>>>> # handles DNS
>>>>> local dns_filter: Log::Filter = [
>>>>> $name = "kafka-dns",
>>>>> $writer = Log::WRITER_KAFKAWRITER,
>>>>> $config = table(["metadata.broker.list"] = "XX.XX.XX.XX:9092"),
>>>>> $path = "dns"
>>>>> ];
>>>>> Log::add_filter(DNS::LOG, dns_filter);
>>>>> }
>>>>>
>>>>> ###########
>>>>> ###########
>>>>>
>>>>> I enter "broctl check" and "broctl deploy" after that; but get the
>>>>> following:
>>>>>
>>>>> [root at localhost current]# tail -f stderr.log
>>>>> %7|1554218121.957|STATE|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>>>> state DOWN -> CONNECT
>>>>> %7|1554218121.957|BROADCAST|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>>>> %7|1554218121.957|BROKERFAIL|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>>>>> Local: Broker transport failure: (errno: Connection refused)
>>>>> %7|1554218121.957|FAIL|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>>>>> 127.0.0.1:9092 failed: Connection refused
>>>>> %7|1554218121.957|STATE|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>>>> state CONNECT -> DOWN
>>>>> %7|1554218121.957|BROADCAST|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>>>> %7|1554218121.957|BUFQ|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>>>>> with 0 buffers
>>>>> %7|1554218121.957|BUFQ|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>>>>> buffers on connection reset
>>>>> %7|1554218122.309|NOINFO|rdkafka#producer-1| [thrd:main]: Topic
>>>>> partition count is zero: should refresh metadata
>>>>> %7|1554218122.309|METADATA|rdkafka#producer-1| [thrd:main]: Skipping
>>>>> metadata refresh of 1 topic(s): no usable brokers
>>>>> %7|1554218122.957|CONNECT|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state
>>>>> DOWN connecting
>>>>> %7|1554218122.958|CONNECT|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to
>>>>> ipv4#127.0.0.1:9092 (plaintext) with socket 29
>>>>> %7|1554218122.958|STATE|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>>>> state DOWN -> CONNECT
>>>>> %7|1554218122.958|BROADCAST|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>>>> %7|1554218122.958|BROKERFAIL|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: failed: err:
>>>>> Local: Broker transport failure: (errno: Connection refused)
>>>>> %7|1554218122.958|FAIL|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#
>>>>> 127.0.0.1:9092 failed: Connection refused
>>>>> %7|1554218122.958|STATE|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed
>>>>> state CONNECT -> DOWN
>>>>> %7|1554218122.958|BROADCAST|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: Broadcasting state change
>>>>> %7|1554218122.958|BUFQ|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq
>>>>> with 0 buffers
>>>>> %7|1554218122.958|BUFQ|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0
>>>>> buffers on connection reset
>>>>> %7|1554218122.958|RECONNECT|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>>>>> reconnect by 301ms
>>>>> %7|1554218123.259|RECONNECT|rdkafka#producer-1|
>>>>> [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Delaying next
>>>>> reconnect by 53ms
>>>>> %7|1554218123.309|NOINFO|rdkafka#producer-1| [thrd:main]: Topic
>>>>> partition count is zero: should refresh metadata
>>>>>
>>>>> Yes, I have iptables enabled on the local bro machine but it works
>>>>> with the first configuration option file. How come bro thinks that the
>>>>> kafka broker is local. It is supposed to send the messages to XX.XX.XX.XX.
>>>>>
>>>>> Thanks in advance.
>>>>>