[Bro-Dev] [Bro-Commits] [git/bro] topic/seth/elasticsearch-merge-tmp: ElasticSearch plugin fixes. (49b19f2)
Slagell, Adam J
slagell at illinois.edu
Fri Jun 29 09:35:03 PDT 2012
I'd like to learn more about Elastic Search sometime Seth. Maybe a blog entry or a talk at the Bro Exchange. :-)
On Jun 29, 2012, at 11:09 AM, Seth Hall wrote:
> Repository : ssh://git@bro-ids.icir.org/bro
>
> On branch : topic/seth/elasticsearch-merge-tmp
> Link : http://tracker.bro-ids.org/bro/changeset/49b19f244fc8284f75469f05d96c4ac16acd2cfe/bro
>
>> ---------------------------------------------------------------
>
> commit 49b19f244fc8284f75469f05d96c4ac16acd2cfe
> Author: Seth Hall <seth at icir.org>
> Date: Fri Jun 29 12:09:34 2012 -0400
>
> ElasticSearch plugin fixes.
>
> - Fixed memory leak.
>
> - Now using Fmt instead of the thread unsafe fmt.
>
>
>> ---------------------------------------------------------------
>
> 49b19f244fc8284f75469f05d96c4ac16acd2cfe
> src/logging/writers/ElasticSearch.cc | 33 +++++++++++++++++----------------
> src/logging/writers/ElasticSearch.h | 3 +++
> 2 files changed, 20 insertions(+), 16 deletions(-)
>
> diff --git a/src/logging/writers/ElasticSearch.cc b/src/logging/writers/ElasticSearch.cc
> index 0e388df..19cd296 100644
> --- a/src/logging/writers/ElasticSearch.cc
> +++ b/src/logging/writers/ElasticSearch.cc
> @@ -32,9 +32,11 @@ ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
>
> index_prefix = string((const char*) BifConst::LogElasticSearch::index_prefix->Bytes(), BifConst::LogElasticSearch::index_prefix->Len());
>
> - es_server = string(fmt("http://%s:%d/", BifConst::LogElasticSearch::server_host->Bytes(),
> - (int) BifConst::LogElasticSearch::server_port));
> + es_server = string(Fmt("http://%s:%d", BifConst::LogElasticSearch::server_host->Bytes(),
> + (int) BifConst::LogElasticSearch::server_port));
> + bulk_url = string(Fmt("%s/_bulk", es_server.c_str()));
>
> + http_headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
> buffer.Clear();
> counter = 0;
> current_index = string();
> @@ -56,13 +58,14 @@ bool ElasticSearch::DoInit(const WriterInfo& info, int num_fields, const threadi
>
> bool ElasticSearch::DoFlush()
> {
> - // Do something here?
> + BatchIndex();
> return true;
> }
>
> bool ElasticSearch::DoFinish()
> {
> BatchIndex();
> + curl_slist_free_all(http_headers);
> curl_easy_cleanup(curl_handle);
> return WriterBackend::DoFinish();
> }
> @@ -70,8 +73,7 @@ bool ElasticSearch::DoFinish()
> bool ElasticSearch::BatchIndex()
> {
> curl_easy_reset(curl_handle);
> - string url = es_server + "_bulk";
> - curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
> + curl_easy_setopt(curl_handle, CURLOPT_URL, bulk_url.c_str());
> curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
> curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)buffer.Len());
> curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
> @@ -264,7 +266,7 @@ bool ElasticSearch::UpdateIndex(double now, double rinterval, double rbase)
> current_index = index_prefix + "-" + buf;
> }
>
> - //printf("%s - prev:%s current:%s\n", Info().path.c_str(), prev_index.c_str(), current_index.c_str());
> + printf("%s - prev:%s current:%s\n", Info().path.c_str(), prev_index.c_str(), current_index.c_str());
> return true;
> }
>
> @@ -281,7 +283,7 @@ bool ElasticSearch::DoRotate(string rotated_path, const RotateInfo& info, bool t
>
> // Compress the previous index
> //curl_easy_reset(curl_handle);
> - //curl_easy_setopt(curl_handle, CURLOPT_URL, fmt("%s%s/_settings", es_server.c_str(), prev_index.c_str()));
> + //curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_settings", es_server.c_str(), prev_index.c_str()));
> //curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, "PUT");
> //curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, "{\"index\":{\"store.compress.stored\":\"true\"}}");
> //curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) 42);
> @@ -290,14 +292,14 @@ bool ElasticSearch::DoRotate(string rotated_path, const RotateInfo& info, bool t
> // Optimize the previous index.
> // TODO: make this into variables.
> //curl_easy_reset(curl_handle);
> - //curl_easy_setopt(curl_handle, CURLOPT_URL, fmt("%s%s/_optimize?max_num_segments=1&wait_for_merge=false", es_server.c_str(), prev_index.c_str()));
> + //curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_optimize?max_num_segments=1&wait_for_merge=false", es_server.c_str(), prev_index.c_str()));
> //HTTPSend(curl_handle);
> }
>
> - if ( ! FinishedRotation(current_index, prev_index, info, terminating) )
> - {
> - Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
> - }
> + //if ( ! FinishedRotation(current_index, prev_index, info, terminating) )
> + // {
> + // Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
> + // }
>
> return true;
> }
> @@ -325,7 +327,7 @@ CURL* ElasticSearch::HTTPSetup()
> CURL* handle = curl_easy_init();
> if ( ! handle )
> {
> - Error(fmt("cURL did not initialize correctly."));
> + Error("cURL did not initialize correctly.");
> return 0;
> }
>
> @@ -340,8 +342,7 @@ bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
>
> bool ElasticSearch::HTTPSend(CURL *handle)
> {
> - struct curl_slist *headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
> - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
> + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, http_headers);
> curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
> // HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed.
> // The best (only?) way to disable that is to just use HTTP 1.0
> @@ -361,7 +362,7 @@ bool ElasticSearch::HTTPSend(CURL *handle)
> uint http_code = 0;
> curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
> if ( http_code != 200 )
> - Error(fmt("Received a non-successful status code back from ElasticSearch server."));
> + Error(Fmt("Received a non-successful status code back from ElasticSearch server."));
>
> return true;
> }
> diff --git a/src/logging/writers/ElasticSearch.h b/src/logging/writers/ElasticSearch.h
> index 71238a9..aa56dba 100644
> --- a/src/logging/writers/ElasticSearch.h
> +++ b/src/logging/writers/ElasticSearch.h
> @@ -59,6 +59,9 @@ private:
> int cluster_name_len;
>
> string es_server;
> + string bulk_url;
> +
> + struct curl_slist *http_headers;
>
> string path;
> string index_prefix;
>
> _______________________________________________
> bro-commits mailing list
> bro-commits at bro-ids.org
> http://mailman.icsi.berkeley.edu/mailman/listinfo/bro-commits
>
------
Adam J. Slagell, CISO, CISSP
Chief Information Security Officer
National Center for Supercomputing Applications
University of Illinois at Urbana-Champaign
www.slagell.info
217.244.8965
"Under the Illinois Freedom of Information Act (FOIA), any written communication to or from University employees regarding University business is a public record and may be subject to public disclosure."
More information about the bro-dev
mailing list