#include #include #include "caf/all.hpp" #include "caf/io/all.hpp" using std::cout; using std::endl; using std::chrono::seconds; using namespace caf; using namespace caf::io; using tick_atom = atom_constant; struct state { // accumulated bytes over the last second size_t num_bytes = 0; // remaining capacity when sending size_t capacity = 0; // dummy data for sending std::vector payload; }; behavior receiver(stateful_broker* self, size_t chunk_size) { return { [=](const new_data_msg& msg) { self->state.num_bytes += msg.buf.size(); }, [=](tick_atom) { self->delayed_send(self, seconds(1), tick_atom::value); aout(self) << self->state.num_bytes << "/s" << endl; self->state.num_bytes = 0; }, [=](const new_connection_msg& msg) { aout(self) << "client has connected" << endl; self->configure_read(msg.handle, receive_policy::exactly(chunk_size)); self->delayed_send(self, seconds(1), tick_atom::value); self->close(msg.source); } }; } behavior sender(stateful_broker* self, connection_handle hdl, size_t chunk_size, size_t max_pending) { self->state.payload.resize(chunk_size); for (size_t i = 0; i < max_pending / chunk_size; ++i) self->write(hdl, self->state.payload.size(), self->state.payload.data()); self->flush(hdl); self->ack_writes(hdl, true); self->delayed_send(self, seconds(1), tick_atom::value); return { [=](const data_transferred_msg& msg) { self->state.num_bytes += msg.written; self->state.capacity += msg.written; while (self->state.capacity >= chunk_size) { self->state.capacity -= chunk_size; self->write(hdl, self->state.payload.size(), self->state.payload.data()); } self->flush(hdl); }, [=](tick_atom) { self->delayed_send(self, seconds(1), tick_atom::value); aout(self) << self->state.num_bytes << "/s" << endl; self->state.num_bytes = 0; } }; } int main(int argc, char** argv) { uint16_t port = 4242; size_t chunk_size = 128; size_t max_pending = 0; actor_system_config cfg{argc, argv}; cfg.load(); actor_system system{cfg}; auto res = cfg.args_remainder.extract_opts({ {"port,p", "set port (default: 4242)", port}, {"sender,s", "run in sender mode"}, {"chunk-size,c", "set chunk size (default: 128)", chunk_size}, {"max-pending,m", "max pending buffer size", max_pending} }); if (! res.error.empty() || res.opts.count("help") > 0 ) return cout << res.error << endl << res.helptext << endl, 0; if (max_pending == 0) max_pending = chunk_size * 10; if (res.opts.count("sender") > 0) system.middleman().spawn_client(sender, "127.0.0.1", port, chunk_size, max_pending); else system.middleman().spawn_server(receiver, port, chunk_size); }