[Bro-Dev] [Bro-Commits] [git/bro] topic/seth/elasticsearch-merge-tmp: ElasticSearch plugin fixes. (49b19f2)

Vlad Grigorescu vladg at cmu.edu
Fri Jun 29 09:47:47 PDT 2012


I'll be presenting at the Bro Exchange on ElasticSearch, and specifically
on a web interface I've been working on for accessing Bro logs in
ElasticSearch.

If people are interested in playing around and learning more about
ElasticSearch before then, just let me know - I'd be happy to help.
Personally, I think it's really cool (or as their website describes it,
"bonsai cool.")

  --Vlad

On 6/29/12 12:35 PM, "Slagell, Adam J" <slagell at illinois.edu> wrote:

I'd like to learn more about ElasticSearch sometime, Seth. Maybe a blog
entry or a talk at the Bro Exchange. :-)


On Jun 29, 2012, at 11:09 AM, Seth Hall wrote:

> Repository : ssh://git@bro-ids.icir.org/bro
> 
> On branch  : topic/seth/elasticsearch-merge-tmp
> Link       : 
>http://tracker.bro-ids.org/bro/changeset/49b19f244fc8284f75469f05d96c4ac16
>acd2cfe/bro
> 
>> ---------------------------------------------------------------
> 
> commit 49b19f244fc8284f75469f05d96c4ac16acd2cfe
> Author: Seth Hall <seth at icir.org>
> Date:   Fri Jun 29 12:09:34 2012 -0400
> 
>    ElasticSearch plugin fixes.
> 
>    - Fixed memory leak.
> 
>    - Now using Fmt instead of the thread unsafe fmt.
> 
> 
>> ---------------------------------------------------------------
> 
> 49b19f244fc8284f75469f05d96c4ac16acd2cfe
> src/logging/writers/ElasticSearch.cc |   33
>+++++++++++++++++----------------
> src/logging/writers/ElasticSearch.h  |    3 +++
> 2 files changed, 20 insertions(+), 16 deletions(-)
> 
> diff --git a/src/logging/writers/ElasticSearch.cc
>b/src/logging/writers/ElasticSearch.cc
> index 0e388df..19cd296 100644
> --- a/src/logging/writers/ElasticSearch.cc
> +++ b/src/logging/writers/ElasticSearch.cc
> @@ -32,9 +32,11 @@ ElasticSearch::ElasticSearch(WriterFrontend*
>frontend) : WriterBackend(frontend)
> 	
> 	index_prefix = string((const char*)
>BifConst::LogElasticSearch::index_prefix->Bytes(),
>BifConst::LogElasticSearch::index_prefix->Len());
> 	
> -	es_server = string(fmt("http://%s:%d/",
>BifConst::LogElasticSearch::server_host->Bytes(),
> -	                                        (int)
>BifConst::LogElasticSearch::server_port));
> +	es_server = string(Fmt("http://%s:%d",
>BifConst::LogElasticSearch::server_host->Bytes(),
> +	                                       (int)
>BifConst::LogElasticSearch::server_port));
> +	bulk_url = string(Fmt("%s/_bulk", es_server.c_str()));
> 	
> +	http_headers = curl_slist_append(NULL, "Content-Type: text/json;
>charset=utf-8");
> 	buffer.Clear();
> 	counter = 0;
> 	current_index = string();
> @@ -56,13 +58,14 @@ bool ElasticSearch::DoInit(const WriterInfo& info,
>int num_fields, const threadi
> 
> bool ElasticSearch::DoFlush()
> 	{
> -	// Do something here?
> +	BatchIndex();
> 	return true;
> 	}
> 
> bool ElasticSearch::DoFinish()
> 	{
> 	BatchIndex();
> +	curl_slist_free_all(http_headers);
> 	curl_easy_cleanup(curl_handle);
> 	return WriterBackend::DoFinish();
> 	}
> @@ -70,8 +73,7 @@ bool ElasticSearch::DoFinish()
> bool ElasticSearch::BatchIndex()
> 	{
> 	curl_easy_reset(curl_handle);
> -	string url = es_server + "_bulk";
> -	curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
> +	curl_easy_setopt(curl_handle, CURLOPT_URL, bulk_url.c_str());
> 	curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
> 	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE,
>(curl_off_t)buffer.Len());
> 	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
> @@ -264,7 +266,7 @@ bool ElasticSearch::UpdateIndex(double now, double
>rinterval, double rbase)
> 		current_index = index_prefix + "-" + buf;
> 		}
> 	
> -	//printf("%s - prev:%s current:%s\n", Info().path.c_str(),
>prev_index.c_str(), current_index.c_str());
> +		printf("%s - prev:%s current:%s\n", Info().path.c_str(),
>prev_index.c_str(), current_index.c_str());
> 	return true;
> 	}
> 	
> @@ -281,7 +283,7 @@ bool ElasticSearch::DoRotate(string rotated_path,
>const RotateInfo& info, bool t
> 		
> 		// Compress the previous index
> 		//curl_easy_reset(curl_handle);
> -		//curl_easy_setopt(curl_handle, CURLOPT_URL, fmt("%s%s/_settings",
>es_server.c_str(), prev_index.c_str()));
> +		//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_settings",
>es_server.c_str(), prev_index.c_str()));
> 		//curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, "PUT");
> 		//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS,
>"{\"index\":{\"store.compress.stored\":\"true\"}}");
> 		//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE,
>(curl_off_t) 42);
> @@ -290,14 +292,14 @@ bool ElasticSearch::DoRotate(string rotated_path,
>const RotateInfo& info, bool t
> 		// Optimize the previous index.
> 		// TODO: make this into variables.
> 		//curl_easy_reset(curl_handle);
> -		//curl_easy_setopt(curl_handle, CURLOPT_URL,
>fmt("%s%s/_optimize?max_num_segments=1&wait_for_merge=false",
>es_server.c_str(), prev_index.c_str()));
> +		//curl_easy_setopt(curl_handle, CURLOPT_URL,
>Fmt("%s/%s/_optimize?max_num_segments=1&wait_for_merge=false",
>es_server.c_str(), prev_index.c_str()));
> 		//HTTPSend(curl_handle);
> 		}
> 	
> -	if ( ! FinishedRotation(current_index, prev_index, info, terminating) )
> -		{
> -		Error(Fmt("error rotating %s to %s", prev_index.c_str(),
>current_index.c_str()));
> -		}
> +	//if ( ! FinishedRotation(current_index, prev_index, info,
>terminating) )
> +	//	{
> +	//	Error(Fmt("error rotating %s to %s", prev_index.c_str(),
>current_index.c_str()));
> +	//	}
> 	
> 	return true;
> 	}
> @@ -325,7 +327,7 @@ CURL* ElasticSearch::HTTPSetup()
> 	CURL* handle = curl_easy_init();
> 	if ( ! handle )
> 		{
> -		Error(fmt("cURL did not initialize correctly."));
> +		Error("cURL did not initialize correctly.");
> 		return 0;
> 		}
> 	
> @@ -340,8 +342,7 @@ bool ElasticSearch::HTTPReceive(void* ptr, int size,
>int nmemb, void* userdata)
> 
> bool ElasticSearch::HTTPSend(CURL *handle)
> 	{
> -	struct curl_slist *headers = curl_slist_append(NULL, "Content-Type:
>text/json; charset=utf-8");
> -	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
> +	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, http_headers);
> 	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION,
>&logging::writer::ElasticSearch::HTTPReceive); // This gets called with
>the result.
> 	// HTTP 1.1 likes to use chunked encoded transfers, which aren't good
>for speed. 
> 	// The best (only?) way to disable that is to just use HTTP 1.0
> @@ -361,7 +362,7 @@ bool ElasticSearch::HTTPSend(CURL *handle)
> 			uint http_code = 0;
> 			curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
> 			if ( http_code != 200 )
> -				Error(fmt("Received a non-successful status code back from
>ElasticSearch server."));
> +				Error(Fmt("Received a non-successful status code back from
>ElasticSearch server."));
> 			
> 			return true;
> 			}
> diff --git a/src/logging/writers/ElasticSearch.h
>b/src/logging/writers/ElasticSearch.h
> index 71238a9..aa56dba 100644
> --- a/src/logging/writers/ElasticSearch.h
> +++ b/src/logging/writers/ElasticSearch.h
> @@ -59,6 +59,9 @@ private:
> 	int cluster_name_len;
> 	
> 	string es_server;
> +	string bulk_url;
> +	
> +	struct curl_slist *http_headers;
> 	
> 	string path;
> 	string index_prefix;
> 
> _______________________________________________
> bro-commits mailing list
> bro-commits at bro-ids.org
> http://mailman.icsi.berkeley.edu/mailman/listinfo/bro-commits
> 

------

Adam J. Slagell, CISO, CISSP
Chief Information Security Officer
National Center for Supercomputing Applications
University of Illinois at Urbana-Champaign
www.slagell.info
217.244.8965

"Under the Illinois Freedom of Information Act (FOIA), any written
communication to or from University employees regarding University
business is a public record and may be subject to public disclosure."








_______________________________________________
bro-dev mailing list
bro-dev at bro-ids.org
http://mailman.icsi.berkeley.edu/mailman/listinfo/bro-dev




More information about the bro-dev mailing list