#include #include #include #include #include #include "caf/message_builder.hpp" using std::cout; using std::cerr; using std::endl; using std::vector; using std::chrono::seconds; void server(uint16_t port, size_t chunk_size) { auto sock = socket(AF_INET , SOCK_STREAM , 0); if (sock == -1) { cerr << "unable to open socket" << endl; return; } sockaddr_in srv_addr; srv_addr.sin_addr.s_addr = INADDR_ANY; srv_addr.sin_family = AF_INET; srv_addr.sin_port = htons(port); if (bind(sock, (sockaddr*) &srv_addr, sizeof(srv_addr)) < 0) { cerr << "bind failed" << endl; return; } listen(sock, 3); auto c = static_cast(sizeof(struct sockaddr_in)); sockaddr_in cl_addr; auto csock = accept(sock, (sockaddr*) &cl_addr, &c); if (csock < 0) { cerr << "accept failed" << endl; return; } auto now = [] { return std::chrono::high_resolution_clock::now(); }; auto t0 = now(); auto t1 = t0 + seconds(1); size_t read = 0; std::vector chunk; chunk.resize(chunk_size); while (recv(csock, chunk.data(), chunk.size(), 0) > 0) { read += chunk_size; auto tn = now(); if (tn >= t1) { t0 = tn; t1 = tn + seconds(1); cout << read << "/s" << endl; read = 0; } } } void sender(uint16_t port, size_t chunk_size, size_t max_pending) { cout << "run in sender mode, port = " << port << ", chunk size = " << chunk_size << endl; auto sock = socket(AF_INET , SOCK_STREAM , 0); if (sock == -1) { cerr << "unable to open socket" << endl; return; } sockaddr_in srv_addr; srv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); srv_addr.sin_family = AF_INET; srv_addr.sin_port = htons(port); if (connect(sock, (sockaddr*) &srv_addr, sizeof(srv_addr)) < 0) { cerr << "connection failed" << endl; return; } vector buf; buf.resize(max_pending); auto now = [] { return std::chrono::high_resolution_clock::now(); }; auto t0 = now(); auto t1 = t0 + seconds(1); size_t written = 0; //fcntl(sock, F_SETFL, O_NONBLOCK); int sres; for (;;) { if ((sres = send(sock, buf.data(), buf.size(), 0)) < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) continue; cerr << "send failed" << endl; return; } written += sres; auto tn = now(); if (tn >= t1) { t0 = tn; t1 = tn + seconds(1); cout << written << "/s" << endl; written = 0; } } } int main(int argc, char** argv) { uint16_t port = 4242; size_t chunk_size = 128; size_t max_pending = 0; auto res = caf::message_builder{argv + 1, argv + argc}.extract_opts({ {"port,p", "set port (default: 4242)", port}, {"sender,s", "run in sender mode"}, {"chunk-size,c", "set chunk size (default: 128)", chunk_size}, {"max-pending,m", "max pending buffer size", max_pending} }); if (! res.error.empty() || res.opts.count("help") > 0 ) return cout << res.error << endl << res.helptext << endl, 0; if (max_pending == 0) max_pending = chunk_size * 10; cout << "opts: " << caf::deep_to_string(res.opts) << endl; if (res.opts.count("sender") > 0) sender(port, chunk_size, max_pending); else server(port, chunk_size); }