[Bro-Dev] [Bro-Commits] [git/bro] topic/jsiwek/improve_comm_loop: Remove timeouts from remote communication loop. (675fba3)

Siwek, Jon jsiwek at illinois.edu
Thu Aug 28 11:45:30 PDT 2014


Feedback and/or help with testing is welcome.

- Jon

On Aug 28, 2014, at 1:36 PM, Jonathan Siwek <jsiwek at ncsa.illinois.edu> wrote:

> Repository : ssh://git@bro-ids.icir.org/bro
> 
> On branch  : topic/jsiwek/improve_comm_loop
> Link       : https://github.com/bro/bro/commit/675fba3fdee0a391cfb6fc52d07b08caaca96c76
> 
>> ---------------------------------------------------------------
> 
> commit 675fba3fdee0a391cfb6fc52d07b08caaca96c76
> Author: Jon Siwek <jsiwek at illinois.edu>
> Date:   Thu Aug 28 13:13:30 2014 -0500
> 
>    Remove timeouts from remote communication loop.
> 
>    The select() now blocks until there's work to do instead of relying on a
>    small timeout value, which can cause unproductive use of CPU cycles.
> 
> 
>> ---------------------------------------------------------------
> 
> 675fba3fdee0a391cfb6fc52d07b08caaca96c76
> src/CMakeLists.txt       |  2 ++
> src/ChunkedIO.cc         | 46 +++++++++++++++++++++++++++-
> src/ChunkedIO.h          | 16 +++++++++-
> src/DNS_Mgr.cc           |  5 +--
> src/DNS_Mgr.h            |  3 +-
> src/Flare.cc             | 29 ++++++++++++++++++
> src/Flare.h              | 45 +++++++++++++++++++++++++++
> src/FlowSrc.cc           |  5 +--
> src/FlowSrc.h            |  3 +-
> src/IOSource.cc          | 47 +++++++++++++++++++++-------
> src/IOSource.h           | 13 +++++---
> src/Pipe.cc              | 79 ++++++++++++++++++++++++++++++++++++++++++++++++
> src/Pipe.h               | 57 ++++++++++++++++++++++++++++++++++
> src/PktSrc.cc            |  5 +--
> src/PktSrc.h             |  3 +-
> src/RemoteSerializer.cc  | 47 +++++++++++++---------------
> src/RemoteSerializer.h   |  3 +-
> src/Serializer.cc        |  5 +--
> src/Serializer.h         |  3 +-
> src/threading/Manager.cc |  3 +-
> src/threading/Manager.h  |  3 +-
> 21 files changed, 364 insertions(+), 58 deletions(-)
> 
> diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
> index 04867b7..3764533 100644
> --- a/src/CMakeLists.txt
> +++ b/src/CMakeLists.txt
> @@ -279,6 +279,7 @@ set(bro_SRCS
>     EventRegistry.cc
>     Expr.cc
>     File.cc
> +    Flare.cc
>     FlowSrc.cc
>     Frag.cc
>     Frame.cc
> @@ -299,6 +300,7 @@ set(bro_SRCS
>     OSFinger.cc
>     PacketFilter.cc
>     PersistenceSerializer.cc
> +    Pipe.cc
>     PktSrc.cc
>     PolicyFile.cc
>     PrefixTable.cc
> diff --git a/src/ChunkedIO.cc b/src/ChunkedIO.cc
> index 54e2e59..a94eb98 100644
> --- a/src/ChunkedIO.cc
> +++ b/src/ChunkedIO.cc
> @@ -210,6 +210,7 @@ bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial)
> 	else
> 		pending_head = pending_tail = q;
> 
> +	write_flare.Fire();
> 	return Flush();
> 	}
> 
> @@ -232,6 +233,7 @@ bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk)
> 	write_len += len;
> 
> 	delete chunk;
> +	write_flare.Fire();
> 
> 	if ( network_time - last_flush > 0.005 )
> 		FlushWriteBuffer();
> @@ -269,6 +271,10 @@ bool ChunkedIOFd::FlushWriteBuffer()
> 		if ( unsigned(written) == len )
> 			{
> 			write_pos = write_len = 0;
> +
> +			if ( ! pending_head )
> +				write_flare.Extinguish();
> +
> 			return true;
> 			}
> 
> @@ -318,7 +324,12 @@ bool ChunkedIOFd::Flush()
> 			}
> 		}
> 
> -	return FlushWriteBuffer();
> +	bool rval = FlushWriteBuffer();
> +
> +	if ( ! pending_head && write_len == 0 )
> +		write_flare.Extinguish();
> +
> +	return rval;
> 	}
> 
> uint32 ChunkedIOFd::ChunkAvailable()
> @@ -394,6 +405,9 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
> #ifdef DEBUG_COMMUNICATION
> 		AddToBuffer("<false:read-chunk>", true);
> #endif
> +		if ( ! ChunkAvailable() )
> +			read_flare.Extinguish();
> +
> 		return false;
> 		}
> 
> @@ -402,9 +416,15 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
> #ifdef DEBUG_COMMUNICATION
> 		AddToBuffer("<null:no-data>", true);
> #endif
> +		read_flare.Extinguish();
> 		return true;
> 		}
> 
> +	if ( ChunkAvailable() )
> +		read_flare.Fire();
> +	else
> +		read_flare.Extinguish();
> +
> #ifdef DEBUG
> 	if ( *chunk )
> 		DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]",
> @@ -481,6 +501,9 @@ bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block)
> 	read_pos = 0;
> 	read_len = bytes_left;
> 
> +	if ( ! ChunkAvailable() )
> +		read_flare.Extinguish();
> +
> 	// If allowed, wait a bit for something to read.
> 	if ( may_block )
> 		{
> @@ -607,6 +630,14 @@ bool ChunkedIOFd::IsFillingUp()
> 	return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
> 	}
> 
> +std::vector<int> ChunkedIOFd::FdSupplements() const
> +	{
> +	std::vector<int> rval;
> +	rval.push_back(write_flare.FD());
> +	rval.push_back(read_flare.FD());
> +	return rval;
> +	}
> +
> void ChunkedIOFd::Clear()
> 	{
> 	while ( pending_head )
> @@ -618,6 +649,9 @@ void ChunkedIOFd::Clear()
> 		}
> 
> 	pending_head = pending_tail = 0;
> +
> +	if ( write_len == 0 )
> +		write_flare.Extinguish();
> 	}
> 
> const char* ChunkedIOFd::Error()
> @@ -830,6 +864,7 @@ bool ChunkedIOSSL::Write(Chunk* chunk)
> 	else
> 		write_head = write_tail = q;
> 
> +	write_flare.Fire();
> 	Flush();
> 	return true;
> 	}
> @@ -935,6 +970,7 @@ bool ChunkedIOSSL::Flush()
> 		write_state = LEN;
> 		}
> 
> +	write_flare.Extinguish();
> 	return true;
> 	}
> 
> @@ -1104,6 +1140,13 @@ bool ChunkedIOSSL::IsFillingUp()
> 	return false;
> 	}
> 
> +std::vector<int> ChunkedIOSSL::FdSupplements() const
> +	{
> +	std::vector<int> rval;
> +	rval.push_back(write_flare.FD());
> +	return rval;
> +	}
> +
> void ChunkedIOSSL::Clear()
> 	{
> 	while ( write_head )
> @@ -1114,6 +1157,7 @@ void ChunkedIOSSL::Clear()
> 		write_head = next;
> 		}
> 	write_head = write_tail = 0;
> +	write_flare.Extinguish();
> 	}
> 
> const char* ChunkedIOSSL::Error()
> diff --git a/src/ChunkedIO.h b/src/ChunkedIO.h
> index a9865e4..c640e52 100644
> --- a/src/ChunkedIO.h
> +++ b/src/ChunkedIO.h
> @@ -6,8 +6,9 @@
> #include "config.h"
> #include "List.h"
> #include "util.h"
> -
> +#include "Flare.h"
> #include <list>
> +#include <vector>
> 
> #ifdef NEED_KRB5_H
> # include <krb5.h>
> @@ -95,6 +96,11 @@ public:
> 	// Returns underlying fd if available, -1 otherwise.
> 	virtual int Fd()	{ return -1; }
> 
> +	// Returns supplementary file descriptors that become read-ready in order
> +	// to signal that there is some work that can be performed.
> +	virtual std::vector<int> FdSupplements() const
> +		{ return std::vector<int>(); }
> +
> 	// Makes sure that no additional protocol data is written into
> 	// the output stream.  If this is activated, the output cannot
> 	// be read again by any of these classes!
> @@ -177,6 +183,7 @@ public:
> 	virtual void Clear();
> 	virtual bool Eof()	{ return eof; }
> 	virtual int Fd()	{ return fd; }
> +	virtual std::vector<int> FdSupplements() const;
> 	virtual void Stats(char* buffer, int length);
> 
> private:
> @@ -240,6 +247,8 @@ private:
> 	ChunkQueue* pending_tail;
> 
> 	pid_t pid;
> +	bro::Flare write_flare;
> +	bro::Flare read_flare;
> };
> 
> // Chunked I/O using an SSL connection.
> @@ -262,6 +271,7 @@ public:
> 	virtual void Clear();
> 	virtual bool Eof()	{ return eof; }
> 	virtual int Fd()	{ return socket; }
> +	virtual std::vector<int> FdSupplements() const;
> 	virtual void Stats(char* buffer, int length);
> 
> private:
> @@ -303,6 +313,8 @@ private:
> 
> 	// One SSL for all connections.
> 	static SSL_CTX* ctx;
> +
> +	bro::Flare write_flare;
> };
> 
> #include <zlib.h>
> @@ -328,6 +340,8 @@ public:
> 
> 	virtual bool Eof()	{ return io->Eof(); }
> 	virtual int Fd()	{ return io->Fd(); }
> +	virtual std::vector<int> FdSupplements() const
> +		{ return io->FdSupplements(); }
> 	virtual void Stats(char* buffer, int length);
> 
> 	void EnableCompression(int level)
> diff --git a/src/DNS_Mgr.cc b/src/DNS_Mgr.cc
> index 9188d61..9fb5c8b 100644
> --- a/src/DNS_Mgr.cc
> +++ b/src/DNS_Mgr.cc
> @@ -1217,9 +1217,10 @@ void DNS_Mgr::IssueAsyncRequests()
> 		}
> 	}
> 
> -void  DNS_Mgr::GetFds(int* read, int* write, int* except)
> +void  DNS_Mgr::GetFds(std::vector<int>* read, std::vector<int>* write,
> +                      std::vector<int>* except)
> 	{
> -	*read = nb_dns_fd(nb_dns);
> +	read->push_back(nb_dns_fd(nb_dns));
> 	}
> 
> double DNS_Mgr::NextTimestamp(double* network_time)
> diff --git a/src/DNS_Mgr.h b/src/DNS_Mgr.h
> index 7864505..fa19914 100644
> --- a/src/DNS_Mgr.h
> +++ b/src/DNS_Mgr.h
> @@ -132,7 +132,8 @@ protected:
> 	void DoProcess(bool flush);
> 
> 	// IOSource interface.
> -	virtual void GetFds(int* read, int* write, int* except);
> +	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
> +	                    std::vector<int>* except);
> 	virtual double NextTimestamp(double* network_time);
> 	virtual void Process();
> 	virtual const char* Tag()	{ return "DNS_Mgr"; }
> diff --git a/src/Flare.cc b/src/Flare.cc
> new file mode 100644
> index 0000000..8a0418f
> --- /dev/null
> +++ b/src/Flare.cc
> @@ -0,0 +1,29 @@
> +// See the file "COPYING" in the main distribution directory for copyright.
> +
> +#include "Flare.h"
> +#include "util.h"
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <errno.h>
> +
> +using namespace bro;
> +
> +Flare::Flare()
> +	: pipe(FD_CLOEXEC, FD_CLOEXEC, O_NONBLOCK, O_NONBLOCK)
> +	{
> +	}
> +
> +void Flare::Fire()
> +	{
> +	char tmp;
> +	safe_write(pipe.WriteFD(), &tmp, 1);
> +	}
> +
> +void Flare::Extinguish()
> +	{
> +	char tmp[256];
> +
> +	for ( ; ; )
> +		if ( read(pipe.ReadFD(), &tmp, sizeof(tmp)) == -1 && errno == EAGAIN )
> +			break;
> +	}
> diff --git a/src/Flare.h b/src/Flare.h
> new file mode 100644
> index 0000000..4e63788
> --- /dev/null
> +++ b/src/Flare.h
> @@ -0,0 +1,45 @@
> +// See the file "COPYING" in the main distribution directory for copyright.
> +
> +#ifndef BRO_FLARE_H
> +#define BRO_FLARE_H
> +
> +#include "Pipe.h"
> +
> +namespace bro {
> +
> +class Flare {
> +public:
> +
> +	/**
> +	 * Create a flare object that can be used to signal a "ready" status via
> +	 * a file descriptor that may be integrated with select(), poll(), etc.
> +	 * Not thread-safe, but that should only require Fire()/Extinguish() calls
> +	 * to be made mutually exclusive (across all copies of a Flare).
> +	 */
> +	Flare();
> +
> +	/**
> +	 * @return a file descriptor that will become ready if the flare has been
> +	 *         Fire()'d and not yet Extinguished()'d.
> +	 */
> +	int FD() const
> +		{ return pipe.ReadFD(); }
> +
> +	/**
> +	 * Put the object in the "ready" state.
> +	 */
> +	void Fire();
> +
> +	/**
> +	 * Take the object out of the "ready" state.
> +	 */
> +	void Extinguish();
> +
> +private:
> +
> +	Pipe pipe;
> +};
> +
> +} // namespace bro
> +
> +#endif // BRO_FLARE_H
> diff --git a/src/FlowSrc.cc b/src/FlowSrc.cc
> index 8eed94f..4999d9c 100644
> --- a/src/FlowSrc.cc
> +++ b/src/FlowSrc.cc
> @@ -28,10 +28,11 @@ FlowSrc::~FlowSrc()
> 	delete netflow_analyzer;
> 	}
> 
> -void FlowSrc::GetFds(int* read, int* write, int* except)
> +void FlowSrc::GetFds(std::vector<int>* read, std::vector<int>* write,
> +                     std::vector<int>* except)
> 	{
> 	if ( selectable_fd >= 0 )
> -		*read = selectable_fd;
> +		read->push_back(selectable_fd);
> 	}
> 
> double FlowSrc::NextTimestamp(double* network_time)
> diff --git a/src/FlowSrc.h b/src/FlowSrc.h
> index 03dda27..ee92760 100644
> --- a/src/FlowSrc.h
> +++ b/src/FlowSrc.h
> @@ -34,7 +34,8 @@ public:
> 
> 	// IOSource interface:
> 	bool IsReady();
> -	void GetFds(int* read, int* write, int* except);
> +	void GetFds(std::vector<int>* read, std::vector<int>* write,
> +	            std::vector<int>* except);
> 	double NextTimestamp(double* network_time);
> 	void Process();
> 
> diff --git a/src/IOSource.cc b/src/IOSource.cc
> index d47007c..540b797 100644
> --- a/src/IOSource.cc
> +++ b/src/IOSource.cc
> @@ -24,6 +24,15 @@ void IOSourceRegistry::RemoveAll()
> 	dont_counts = sources.size();
> 	}
> 
> +static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max)
> +	{
> +	for ( size_t i = 0; i < fds.size(); ++i )
> +		{
> +		FD_SET(fds[i], set);
> +		*max = ::max(fds[i], *max);
> +		}
> +	}
> +
> IOSource* IOSourceRegistry::FindSoonest(double* ts)
> 	{
> 	// Remove sources which have gone dry. For simplicity, we only
> @@ -94,16 +103,14 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts)
> 			// be ready.
> 			continue;
> 
> -		src->fd_read = src->fd_write = src->fd_except = 0;
> +		src->fd_read.clear();
> +		src->fd_write.clear();
> +		src->fd_except.clear();
> 		src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except);
> 
> -		FD_SET(src->fd_read, &fd_read);
> -		FD_SET(src->fd_write, &fd_write);
> -		FD_SET(src->fd_except, &fd_except);
> -
> -		maxx = max(src->fd_read, maxx);
> -		maxx = max(src->fd_write, maxx);
> -		maxx = max(src->fd_except, maxx);
> +		fd_vector_set(src->fd_read, &fd_read, &maxx);
> +		fd_vector_set(src->fd_write, &fd_write, &maxx);
> +		fd_vector_set(src->fd_except, &fd_except, &maxx);
> 		}
> 
> 	// We can't block indefinitely even when all sources are dry:
> @@ -143,9 +150,7 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts)
> 			if ( ! src->src->IsIdle() )
> 				continue;
> 
> -			if ( FD_ISSET(src->fd_read, &fd_read) ||
> -			     FD_ISSET(src->fd_write, &fd_write) ||
> -			     FD_ISSET(src->fd_except, &fd_except) )
> +			if ( src->Ready(&fd_read, &fd_write, &fd_except) )
> 				{
> 				double local_network_time = 0;
> 				double ts = src->src->NextTimestamp(&local_network_time);
> @@ -174,3 +179,23 @@ void IOSourceRegistry::Register(IOSource* src, bool dont_count)
> 		++dont_counts;
> 	return sources.push_back(s);
> 	}
> +
> +static bool fd_vector_ready(const std::vector<int>& fds, fd_set* set)
> +	{
> +	for ( size_t i = 0; i < fds.size(); ++i )
> +		if ( FD_ISSET(fds[i], set) )
> +			return true;
> +
> +	return false;
> +	}
> +
> +bool IOSourceRegistry::Source::Ready(fd_set* read, fd_set* write,
> +                                     fd_set* except) const
> +	{
> +	if ( fd_vector_ready(fd_read, read) ||
> +	     fd_vector_ready(fd_write, write) ||
> +	     fd_vector_ready(fd_except, except) )
> +		return true;
> +
> +	return false;
> +	}
> diff --git a/src/IOSource.h b/src/IOSource.h
> index db50bbd..3da70af 100644
> --- a/src/IOSource.h
> +++ b/src/IOSource.h
> @@ -4,6 +4,8 @@
> #define iosource_h
> 
> #include <list>
> +#include <vector>
> +#include <sys/select.h>
> #include "Timer.h"
> 
> using namespace std;
> @@ -22,7 +24,8 @@ public:
> 
> 	// Returns select'able fds (leaves args untouched if we don't have
> 	// selectable fds).
> -	virtual void GetFds(int* read, int* write, int* except) = 0;
> +	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
> +	                    std::vector<int>* except) = 0;
> 
> 	// The following two methods are only called when either IsIdle()
> 	// returns false or select() on one of the fds indicates that there's
> @@ -89,9 +92,11 @@ protected:
> 
> 	struct Source {
> 		IOSource* src;
> -		int fd_read;
> -		int fd_write;
> -		int fd_except;
> +		std::vector<int> fd_read;
> +		std::vector<int> fd_write;
> +		std::vector<int> fd_except;
> +
> +		bool Ready(fd_set* read, fd_set* write, fd_set* except) const;
> 	};
> 
> 	typedef list<Source*> SourceList;
> diff --git a/src/Pipe.cc b/src/Pipe.cc
> new file mode 100644
> index 0000000..51298d0
> --- /dev/null
> +++ b/src/Pipe.cc
> @@ -0,0 +1,79 @@
> +// See the file "COPYING" in the main distribution directory for copyright.
> +
> +#include "Pipe.h"
> +#include "Reporter.h"
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <errno.h>
> +#include <cstdio>
> +
> +using namespace bro;
> +
> +static void pipe_fail(int eno)
> +	{
> +	char tmp[256];
> +	strerror_r(eno, tmp, sizeof(tmp));
> +	reporter->FatalError("Pipe failure: %s", tmp);
> +	}
> +
> +static void set_flags(int fd, int flags)
> +	{
> +	if ( flags )
> +		fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | flags);
> +	}
> +
> +static void set_status_flags(int fd, int flags)
> +	{
> +	if ( flags )
> +		fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
> +	}
> +
> +static int dup_or_fail(int fd, int flags)
> +	{
> +	int rval = dup(fd);
> +
> +	if ( rval < 0 )
> +		pipe_fail(errno);
> +
> +	set_flags(fd, flags);
> +	return rval;
> +	}
> +
> +Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1)
> +	{
> +	// pipe2 can set flags atomically, but not yet available everywhere.
> +	if ( ::pipe(fds) )
> +		pipe_fail(errno);
> +
> +	flags[0] = flags0;
> +	flags[1] = flags1;
> +
> +	set_flags(fds[0], flags[0]);
> +	set_flags(fds[1], flags[1]);
> +	set_status_flags(fds[0], status_flags0);
> +	set_status_flags(fds[1], status_flags1);
> +	}
> +
> +Pipe::~Pipe()
> +	{
> +	close(fds[0]);
> +	close(fds[1]);
> +	}
> +
> +Pipe::Pipe(const Pipe& other)
> +	{
> +	fds[0] = dup_or_fail(other.fds[0], other.flags[0]);
> +	fds[1] = dup_or_fail(other.fds[1], other.flags[1]);
> +	}
> +
> +Pipe& Pipe::operator=(const Pipe& other)
> +	{
> +	if ( this == &other )
> +		return *this;
> +
> +	close(fds[0]);
> +	close(fds[1]);
> +	fds[0] = dup_or_fail(other.fds[0], other.flags[0]);
> +	fds[1] = dup_or_fail(other.fds[1], other.flags[1]);
> +	return *this;
> +	}
> diff --git a/src/Pipe.h b/src/Pipe.h
> new file mode 100644
> index 0000000..493169e
> --- /dev/null
> +++ b/src/Pipe.h
> @@ -0,0 +1,57 @@
> +// See the file "COPYING" in the main distribution directory for copyright.
> +
> +#ifndef BRO_PIPE_H
> +#define BRO_PIPE_H
> +
> +namespace bro {
> +
> +class Pipe {
> +public:
> +
> +	/**
> +	 * Create a pair of file descriptors via pipe(), or aborts if it cannot.
> +	 * @param flags0 file descriptor flags to set on read end of pipe.
> +	 * @param flags1 file descriptor flags to set on write end of pipe.
> +	 * @param status_flags0 descriptor status flags to set on read end of pipe.
> +	 * @param status_flags1 descriptor status flags to set on write end of pipe.
> +	 */
> +	Pipe(int flags0 = 0, int flags1 = 0, int status_flags0 = 0,
> +	     int status_flags1 = 0);
> +
> +	/**
> +	  * Close the pair of file descriptors owned by the object.
> +	  */
> +	~Pipe();
> +
> +	/**
> +	 * Make a copy of another Pipe object (file descriptors are dup'd).
> +	 */
> +	Pipe(const Pipe& other);
> +
> +	/**
> +	 * Assign a Pipe object by closing file descriptors and duping those of
> +	 * the other.
> +	 */
> +	Pipe& operator=(const Pipe& other);
> +
> +	/**
> +	 * @return the file descriptor associated with the read-end of the pipe.
> +	 */
> +	int ReadFD() const
> +		{ return fds[0]; }
> +
> +	/**
> +	 * @return the file descriptor associated with the write-end of the pipe.
> +	 */
> +	int WriteFD() const
> +		{ return fds[1]; }
> +
> +private:
> +
> +	int fds[2];
> +	int flags[2];
> +};
> +
> +} // namespace bro
> +
> +#endif // BRO_PIPE_H
> diff --git a/src/PktSrc.cc b/src/PktSrc.cc
> index b5ac3a5..04b7b7d 100644
> --- a/src/PktSrc.cc
> +++ b/src/PktSrc.cc
> @@ -51,7 +51,8 @@ PktSrc::~PktSrc()
> 	delete [] readfile;
> 	}
> 
> -void PktSrc::GetFds(int* read, int* write, int* except)
> +void PktSrc::GetFds(std::vector<int>* read, std::vector<int>* write,
> +                    std::vector<int>* except)
> 	{
> 	if ( pseudo_realtime )
> 		{
> @@ -62,7 +63,7 @@ void PktSrc::GetFds(int* read, int* write, int* except)
> 		}
> 
> 	if ( selectable_fd >= 0 )
> -		*read = selectable_fd;
> +		read->push_back(selectable_fd);
> 	}
> 
> int PktSrc::ExtractNextPacket()
> diff --git a/src/PktSrc.h b/src/PktSrc.h
> index 70eef4d..0d4be12 100644
> --- a/src/PktSrc.h
> +++ b/src/PktSrc.h
> @@ -98,7 +98,8 @@ public:
> 
> 	// IOSource interface
> 	bool IsReady();
> -	void GetFds(int* read, int* write, int* except);
> +	void GetFds(std::vector<int>* read, std::vector<int>* write,
> +	            std::vector<int>* except);
> 	double NextTimestamp(double* local_network_time);
> 	void Process();
> 	const char* Tag()	{ return "PktSrc"; }
> diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc
> index 3e46c5a..34c5f1a 100644
> --- a/src/RemoteSerializer.cc
> +++ b/src/RemoteSerializer.cc
> @@ -1368,12 +1368,17 @@ void RemoteSerializer::Unregister(ID* id)
> 			}
> 	}
> 
> -void RemoteSerializer::GetFds(int* read, int* write, int* except)
> +void RemoteSerializer::GetFds(std::vector<int>* read, std::vector<int>* write,
> +                              std::vector<int>* except)
> 	{
> -	*read = io->Fd();
> +	read->push_back(io->Fd());
> +	std::vector<int> supp = io->FdSupplements();
> +
> +	for ( size_t i = 0; i < supp.size(); ++i )
> +		read->push_back(supp[i]);
> 
> 	if ( io->CanWrite() )
> -		*write = io->Fd();
> +		write->push_back(io->Fd());
> 	}
> 
> double RemoteSerializer::NextTimestamp(double* local_network_time)
> @@ -3356,6 +3361,15 @@ SocketComm::~SocketComm()
> 
> static unsigned int first_rtime = 0;
> 
> +static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max)
> +	{
> +	for ( size_t i = 0; i < fds.size(); ++i )
> +		{
> +		FD_SET(fds[i], set);
> +		*max = ::max(fds[i], *max);
> +		}
> +	}
> +
> void SocketComm::Run()
> 	{
> 	first_rtime = (unsigned int) current_time(true);
> @@ -3381,6 +3395,7 @@ void SocketComm::Run()
> 
> 		FD_SET(io->Fd(), &fd_read);
> 		max_fd = io->Fd();
> +		fd_vector_set(io->FdSupplements(), &fd_read, &max_fd);
> 
> 		loop_over_list(peers, i)
> 			{
> @@ -3389,6 +3404,7 @@ void SocketComm::Run()
> 				FD_SET(peers[i]->io->Fd(), &fd_read);
> 				if ( peers[i]->io->Fd() > max_fd )
> 					max_fd = peers[i]->io->Fd();
> +				fd_vector_set(peers[i]->io->FdSupplements(), &fd_read, &max_fd);
> 				}
> 			else
> 				{
> @@ -3439,38 +3455,17 @@ void SocketComm::Run()
> 		if ( ! io->IsFillingUp() && shutting_conns_down )
> 			shutting_conns_down = false;
> 
> -		// We cannot rely solely on select() as the there may
> -		// be some data left in our input/output queues. So, we use
> -		// a small timeout for select and check for data
> -		// manually afterwards.
> -
> 		static long selects = 0;
> 		static long canwrites = 0;
> -		static long timeouts = 0;
> 
> 		++selects;
> 		if ( io->CanWrite() )
> 			++canwrites;
> 
> -		// FIXME: Fine-tune this (timeouts, flush, etc.)
> -		struct timeval small_timeout;
> -		small_timeout.tv_sec = 0;
> -		small_timeout.tv_usec =
> -			io->CanWrite() || io->CanRead() ? 1 : 10;
> -
> -#if 0
> -		if ( ! io->CanWrite() )
> -			usleep(10);
> -#endif
> -
> -		int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except,
> -				&small_timeout);
> -
> -		if ( a == 0 )
> -			++timeouts;
> +		int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, 0);
> 
> 		if ( selects % 100000 == 0 )
> -			Log(fmt("selects=%ld canwrites=%ld timeouts=%ld", selects, canwrites, timeouts));
> +			Log(fmt("selects=%ld canwrites=%ld", selects, canwrites));
> 
> 		if ( a < 0 )
> 			// Ignore errors for now.
> diff --git a/src/RemoteSerializer.h b/src/RemoteSerializer.h
> index 9dbfbd9..3aa4f91 100644
> --- a/src/RemoteSerializer.h
> +++ b/src/RemoteSerializer.h
> @@ -140,7 +140,8 @@ public:
> 	void Finish();
> 
> 	// Overidden from IOSource:
> -	virtual void GetFds(int* read, int* write, int* except);
> +	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
> +	                    std::vector<int>* except);
> 	virtual double NextTimestamp(double* local_network_time);
> 	virtual void Process();
> 	virtual TimerMgr::Tag* GetCurrentTag();
> diff --git a/src/Serializer.cc b/src/Serializer.cc
> index 36b1c74..0ea79cf 100644
> --- a/src/Serializer.cc
> +++ b/src/Serializer.cc
> @@ -1067,9 +1067,10 @@ void EventPlayer::GotFunctionCall(const char* name, double time,
> 	// We don't replay function calls.
> 	}
> 
> -void EventPlayer::GetFds(int* read, int* write, int* except)
> +void EventPlayer::GetFds(std::vector<int>* read, std::vector<int>* write,
> +                         std::vector<int>* except)
> 	{
> -	*read = fd;
> +	read->push_back(fd);
> 	}
> 
> double EventPlayer::NextTimestamp(double* local_network_time)
> diff --git a/src/Serializer.h b/src/Serializer.h
> index 543797a..0524906 100644
> --- a/src/Serializer.h
> +++ b/src/Serializer.h
> @@ -355,7 +355,8 @@ public:
> 	EventPlayer(const char* file);
> 	virtual ~EventPlayer();
> 
> -	virtual void GetFds(int* read, int* write, int* except);
> +	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
> +	                    std::vector<int>* except);
> 	virtual double NextTimestamp(double* local_network_time);
> 	virtual void Process();
> 	virtual const char* Tag()	{ return "EventPlayer"; }
> diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc
> index 4491cd4..c16b9f4 100644
> --- a/src/threading/Manager.cc
> +++ b/src/threading/Manager.cc
> @@ -65,7 +65,8 @@ void Manager::AddMsgThread(MsgThread* thread)
> 	msg_threads.push_back(thread);
> 	}
> 
> -void Manager::GetFds(int* read, int* write, int* except)
> +void Manager::GetFds(std::vector<int>* read, std::vector<int>* write,
> +                     std::vector<int>* except)
> 	{
> 	}
> 
> diff --git a/src/threading/Manager.h b/src/threading/Manager.h
> index e839749..4f0e539 100644
> --- a/src/threading/Manager.h
> +++ b/src/threading/Manager.h
> @@ -103,7 +103,8 @@ protected:
> 	/**
> 	 * Part of the IOSource interface.
> 	 */
> -	virtual void GetFds(int* read, int* write, int* except);
> +	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
> +	                    std::vector<int>* except);
> 
> 	/**
> 	 * Part of the IOSource interface.
> 
> _______________________________________________
> bro-commits mailing list
> bro-commits at bro.org
> http://mailman.icsi.berkeley.edu/mailman/listinfo/bro-commits
> 




More information about the bro-dev mailing list