diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2019-12-07 03:36:21 +0300 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2019-12-07 03:36:21 +0300 |
commit | 00863566ec4601c65c435b74e575d49546a1c707 (patch) | |
tree | 479a0a6e96aba8191c7a65ea9bee2f4d5e3a4aba /server/main | |
parent | add stress_test.py (diff) | |
download | math-server-00863566ec4601c65c435b74e575d49546a1c707.tar.gz math-server-00863566ec4601c65c435b74e575d49546a1c707.zip |
split server into multiple components
In a vague attempt to make header files more readable, split server/
into a number of components.
Also, refactor the unit tests to use the "Data-driven test cases" of
Boost.Test.
Diffstat (limited to 'server/main')
-rw-r--r-- | server/main/CMakeLists.txt | 13 | ||||
-rw-r--r-- | server/main/main.cpp | 34 | ||||
-rw-r--r-- | server/main/server.cpp | 112 | ||||
-rw-r--r-- | server/main/server.hpp | 34 | ||||
-rw-r--r-- | server/main/session.cpp | 114 | ||||
-rw-r--r-- | server/main/session.hpp | 42 | ||||
-rw-r--r-- | server/main/session_manager.cpp | 38 | ||||
-rw-r--r-- | server/main/session_manager.hpp | 30 | ||||
-rw-r--r-- | server/main/settings.hpp | 87 |
9 files changed, 504 insertions, 0 deletions
diff --git a/server/main/CMakeLists.txt b/server/main/CMakeLists.txt new file mode 100644 index 0000000..b322390 --- /dev/null +++ b/server/main/CMakeLists.txt @@ -0,0 +1,13 @@ +find_package(Boost REQUIRED COMPONENTS filesystem program_options) + +option(DEBUG_ASIO "enable debug output for Boost.Asio" OFF) + +add_executable(server main.cpp server.cpp session.cpp session_manager.cpp) +target_link_libraries(server PRIVATE common parser) + +target_include_directories(server SYSTEM PRIVATE ${Boost_INCLUDE_DIRS}) +target_link_libraries(server PRIVATE ${Boost_LIBRARIES}) + +if(DEBUG_ASIO) + target_compile_definitions(server PRIVATE BOOST_ASIO_ENABLE_HANDLER_TRACKING) +endif() diff --git a/server/main/main.cpp b/server/main/main.cpp new file mode 100644 index 0000000..2cf6d35 --- /dev/null +++ b/server/main/main.cpp @@ -0,0 +1,34 @@ +#include "server.hpp" +#include "settings.hpp" + +#include <boost/program_options.hpp> + +#include <exception> +#include <iostream> + +int main(int argc, char* argv[]) { + try { + math::server::SettingsParser parser{argv[0]}; + + try { + const auto settings = parser.parse(argc, argv); + if (settings.exit_with_usage()) { + parser.usage(); + return 0; + } + + math::server::Server server{settings}; + server.run(); + } catch (const boost::program_options::error& e) { + parser.usage_error(e); + return 1; + } + } catch (const std::exception& e) { + std::cerr << "An error occured: " << e.what() << "\n"; + return 1; + } catch (...) { + std::cerr << "An unknown error occured\n"; + return 1; + } + return 0; +} diff --git a/server/main/server.cpp b/server/main/server.cpp new file mode 100644 index 0000000..3800144 --- /dev/null +++ b/server/main/server.cpp @@ -0,0 +1,112 @@ +#include "server.hpp" +#include "session.hpp" +#include "session_manager.hpp" +#include "settings.hpp" + +#include "../common/error.hpp" +#include "../common/log.hpp" + +#include <boost/asio.hpp> +#include <boost/system/error_code.hpp> +#include <boost/system/system_error.hpp> + +#include <cstddef> + +#include <exception> +#include <thread> +#include <vector> + +namespace math::server { +namespace { + +boost::asio::ip::tcp::endpoint make_endpoint(unsigned port) { + return {boost::asio::ip::tcp::v4(), port}; +} + +void configure_acceptor(boost::asio::ip::tcp::acceptor& acceptor, unsigned port) { + try { + const auto endpoint = make_endpoint(port); + acceptor.open(endpoint.protocol()); + acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + acceptor.bind(endpoint); + acceptor.listen(); + } catch (const boost::system::system_error& e) { + throw Error{e.what()}; + } +} + +} + +Server::Server(const Settings& settings) + : Server{settings.m_port, settings.m_threads} +{ } + +Server::Server(unsigned port, unsigned threads) + : m_numof_threads{threads} + , m_signals{m_io_context} + , m_acceptor{m_io_context} { + + wait_for_signal(); + configure_acceptor(m_acceptor, port); + + accept(); +} + +void Server::run() { + std::vector<std::thread> threads{m_numof_threads}; + for (std::size_t i = 0; i < m_numof_threads; ++i) { + threads[i] = std::thread{[this] () { m_io_context.run(); }}; + } + + for (std::size_t i = 0; i < m_numof_threads; ++i) { + threads[i].join(); + } +} + +void Server::wait_for_signal() { + try { + m_signals.add(SIGINT); + m_signals.add(SIGTERM); + + m_signals.async_wait([this] (const boost::system::error_code& ec, int signo) { + handle_signal(ec, signo); + }); + } catch (const boost::system::system_error& e) { + throw Error{e.what()}; + } +} + +void Server::handle_signal(const boost::system::error_code& ec, int signo) { + if (ec) { + log::error("%1%: %2%", __func__, ec.message()); + } + + log::log("Caught signal %1%", signo); + + try { + m_acceptor.close(); + m_session_mgr.stop_all(); + } catch (const std::exception& e) { + log::error(e.what()); + } +} + +void Server::accept() { + const auto session = m_session_mgr.make_session(m_io_context); + m_acceptor.async_accept(session->socket(), + [session, this] (const boost::system::error_code& ec) { + handle_accept(session, ec); + }); +} + +void Server::handle_accept(SessionPtr session, const boost::system::error_code& ec) { + if (ec) { + log::error("%1%: %2%", __func__, ec.message()); + return; + } + + m_session_mgr.start(session); + accept(); +} + +} diff --git a/server/main/server.hpp b/server/main/server.hpp new file mode 100644 index 0000000..5524f88 --- /dev/null +++ b/server/main/server.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "session_manager.hpp" +#include "settings.hpp" + +#include <boost/asio.hpp> +#include <boost/system/error_code.hpp> + +namespace math::server { + +class Server { +public: + Server(const Settings& settings); + Server(unsigned port, unsigned threads); + + void run(); + +private: + void wait_for_signal(); + void handle_signal(const boost::system::error_code&, int); + + void accept(); + void handle_accept(SessionPtr session, const boost::system::error_code& ec); + + const unsigned m_numof_threads; + + boost::asio::io_context m_io_context; + boost::asio::signal_set m_signals; + boost::asio::ip::tcp::acceptor m_acceptor; + + SessionManager m_session_mgr; +}; + +} diff --git a/server/main/session.cpp b/server/main/session.cpp new file mode 100644 index 0000000..0ee7f75 --- /dev/null +++ b/server/main/session.cpp @@ -0,0 +1,114 @@ +#include "session.hpp" +#include "session_manager.hpp" + +#include "../common/error.hpp" +#include "../common/log.hpp" +#include "../parser/parser.hpp" + +#include <boost/asio.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/system/error_code.hpp> +#include <boost/system/system_error.hpp> + +#include <cstddef> + +#include <exception> +#include <string> +#include <utility> + +namespace math::server { +namespace { + +std::string reply_to_string(double result) { + return boost::lexical_cast<std::string>(result); +} + +std::string calc_reply(const std::string& input) { + std::string reply; + try { + reply = reply_to_string(Parser{input}.exec()); + } catch (const std::exception& e) { + reply = e.what(); + } + return reply; +} + +} + +Session::Session(SessionManager& mgr, boost::asio::io_context& io_context) + : m_session_mgr{mgr}, m_strand{io_context}, m_socket{io_context} +{ } + +boost::asio::ip::tcp::socket& Session::socket() { + return m_socket; +} + +void Session::start() { + read(); +} + +void Session::stop() { + close(); +} + +void Session::close() { + try { + m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both); + m_socket.close(); + } catch (const boost::system::system_error& e) { + throw Error{e.what()}; + } +} + +void Session::read() { + const auto self = shared_from_this(); + + // Stop at LF + boost::asio::async_read_until(m_socket, m_buffer, '\n', boost::asio::bind_executor(m_strand, + [this, self] (const boost::system::error_code& ec, std::size_t bytes) { + handle_read(ec, bytes); + })); +} + +void Session::handle_read(const boost::system::error_code& ec, std::size_t bytes) { + if (ec) { + log::error("%1%: %2%", __func__, ec.message()); + m_session_mgr.stop(shared_from_this()); + return; + } + + write(calc_reply(consume_input(bytes))); +} + +std::string Session::consume_input(std::size_t bytes) { + const auto data = boost::asio::buffer_cast<const char*>(m_buffer.data()); + const std::string input{data, bytes - 1}; + m_buffer.consume(bytes); + return input; +} + +void Session::write(std::string output) { + const auto self = shared_from_this(); + + // Include CR (so that Windows' telnet client works) + output += "\r\n"; + + boost::asio::const_buffer buffer{output.c_str(), output.length()}; + + boost::asio::async_write(m_socket, std::move(buffer), boost::asio::bind_executor(m_strand, + [this, self] (const boost::system::error_code& ec, std::size_t bytes) { + handle_write(ec, bytes); + })); +} + +void Session::handle_write(const boost::system::error_code& ec, std::size_t bytes) { + if (ec) { + log::error("%1%: %2%", __func__, ec.message()); + m_session_mgr.stop(shared_from_this()); + return; + } + + read(); +} + +} diff --git a/server/main/session.hpp b/server/main/session.hpp new file mode 100644 index 0000000..ace3755 --- /dev/null +++ b/server/main/session.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include <boost/asio.hpp> +#include <boost/system/error_code.hpp> + +#include <cstddef> + +#include <memory> +#include <string> + +namespace math::server { + +class SessionManager; + +class Session : public std::enable_shared_from_this<Session> { +public: + Session(SessionManager& mgr, boost::asio::io_context& io_context); + + boost::asio::ip::tcp::socket& socket(); + + void start(); + void stop(); + +private: + void close(); + + void read(); + void write(std::string); + + void handle_read(const boost::system::error_code&, std::size_t); + void handle_write(const boost::system::error_code&, std::size_t); + + std::string consume_input(std::size_t); + + SessionManager& m_session_mgr; + + boost::asio::io_context::strand m_strand; + boost::asio::ip::tcp::socket m_socket; + boost::asio::streambuf m_buffer; +}; + +} diff --git a/server/main/session_manager.cpp b/server/main/session_manager.cpp new file mode 100644 index 0000000..c2aef6d --- /dev/null +++ b/server/main/session_manager.cpp @@ -0,0 +1,38 @@ +#include "session.hpp" +#include "session_manager.hpp" + +#include "../common/log.hpp" + +#include <memory> +#include <mutex> + +namespace math::server { + +SessionPtr SessionManager::make_session(boost::asio::io_context& io_context) { + return std::make_shared<Session>(*this, io_context); +} + +void SessionManager::start(const SessionPtr& session) { + std::lock_guard<std::mutex> lck{m_mtx}; + m_sessions.emplace(session); + session->start(); +} + +void SessionManager::stop(const SessionPtr& session) { + std::lock_guard<std::mutex> lck{m_mtx}; + const auto removed = m_sessions.erase(session) > 0; + if (removed) { + session->stop(); + } +} + +void SessionManager::stop_all() { + std::lock_guard<std::mutex> lck{m_mtx}; + log::log("Closing the remaining %1% session(s)...", m_sessions.size()); + for (const auto& session : m_sessions) { + session->stop(); + } + m_sessions.clear(); +} + +} diff --git a/server/main/session_manager.hpp b/server/main/session_manager.hpp new file mode 100644 index 0000000..f0bec0b --- /dev/null +++ b/server/main/session_manager.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include <boost/asio.hpp> + +#include <mutex> +#include <memory> +#include <unordered_set> + +namespace math::server { + +class Session; +using SessionPtr = std::shared_ptr<Session>; + +class SessionManager { +public: + SessionManager() = default; + + SessionPtr make_session(boost::asio::io_context&); + + void start(const SessionPtr&); + void stop(const SessionPtr&); + + void stop_all(); + +private: + std::mutex m_mtx; + std::unordered_set<SessionPtr> m_sessions; +}; + +} diff --git a/server/main/settings.hpp b/server/main/settings.hpp new file mode 100644 index 0000000..310163f --- /dev/null +++ b/server/main/settings.hpp @@ -0,0 +1,87 @@ +#pragma once + +#include <boost/filesystem.hpp> +#include <boost/program_options.hpp> + +#include <exception> +#include <iostream> +#include <string> +#include <thread> +#include <vector> + +namespace math::server { + +struct Settings { + static constexpr unsigned DEFAULT_PORT = 18000; + + static unsigned default_threads() { return std::thread::hardware_concurrency(); } + + unsigned m_port; + unsigned m_threads; + + bool exit_with_usage() const { return m_vm.count("help"); } + + boost::program_options::variables_map m_vm; +}; + +class SettingsParser { +public: + explicit SettingsParser(const std::string& argv0) + : m_prog_name{extract_filename(argv0)} + { + m_visible.add_options() + ("help,h", + "show this message and exit") + ("port,p", + boost::program_options::value(&m_settings.m_port)->default_value(Settings::DEFAULT_PORT), + "server port number") + ("threads,n", + boost::program_options::value(&m_settings.m_threads)->default_value(Settings::default_threads()), + "number of threads"); + } + + static const char* get_short_description() { + return "[-h|--help] [-p|--port] [-n|--threads]"; + } + + Settings parse(int argc, char* argv[]) { + boost::program_options::store( + boost::program_options::command_line_parser{argc, argv} + .options(m_visible) + .run(), + m_settings.m_vm); + if (m_settings.exit_with_usage()) { + return m_settings; + } + boost::program_options::notify(m_settings.m_vm); + return m_settings; + } + + void usage() const { + std::cout << *this; + } + + void usage_error(const std::exception& e) const { + std::cerr << "usage error: " << e.what() << '\n'; + std::cerr << *this; + } + +private: + static std::string extract_filename(const std::string& path) { + return boost::filesystem::path{path}.filename().string(); + } + + const std::string m_prog_name; + + boost::program_options::options_description m_visible; + + Settings m_settings; + + friend std::ostream& operator<<(std::ostream& os, const SettingsParser& parser) { + os << "usage: " << parser.m_prog_name << ' ' << get_short_description() << '\n'; + os << parser.m_visible; + return os; + } +}; + +} |