diff --git a/examples/hello_world_server.cc b/examples/hello_world_server.cc index 351a46f..5b3939f 100644 --- a/examples/hello_world_server.cc +++ b/examples/hello_world_server.cc @@ -27,17 +27,20 @@ int main(int argc, const char* argv[]) { WEBCC_LOG_INIT("", webcc::LOG_CONSOLE); int workers = 1; + int loops = 1; int sleep_seconds = 0; if (argc > 1) { workers = std::stoi(argv[1]); if (argc > 2) { - sleep_seconds = std::stoi(argv[2]); + loops = std::stoi(argv[2]); + if (argc > 3) { + sleep_seconds = std::stoi(argv[3]); + } } } - LOG_USER("Workers: %d", workers); - LOG_USER("Sleep seconds: %d", sleep_seconds); + LOG_USER("Workers: %d, loops: %d, sleep: %ds", workers, loops, sleep_seconds); try { webcc::Server server{ boost::asio::ip::tcp::v4(), 8080 }; @@ -46,7 +49,7 @@ int main(int argc, const char* argv[]) { server.Route("/", view); server.Route("/hello", view); - server.Run(workers); + server.Run(workers, loops); } catch (const std::exception&) { return 1; diff --git a/webcc/connection.cc b/webcc/connection.cc index b9cba51..8cb1812 100644 --- a/webcc/connection.cc +++ b/webcc/connection.cc @@ -28,7 +28,7 @@ void Connection::Start() { } request_parser_.Init(request_.get(), view_matcher_); - DoRead(); + AsyncRead(); } void Connection::Close() { @@ -68,7 +68,7 @@ void Connection::SendResponse(ResponsePtr response, bool no_keep_alive) { response_->Prepare(); - DoWrite(); + AsyncWrite(); } void Connection::SendResponse(Status status, bool no_keep_alive) { @@ -82,7 +82,11 @@ void Connection::SendResponse(Status status, bool no_keep_alive) { SendResponse(response, no_keep_alive); } -void Connection::DoRead() { +void Connection::AsyncRead() { +#if WEBCC_STUDY_SERVER_THREADING + LOG_USER("[%u] AsyncRead()", (unsigned int)this); +#endif + socket_.async_read_some(boost::asio::buffer(buffer_), std::bind(&Connection::OnRead, shared_from_this(), std::placeholders::_1, @@ -90,6 +94,10 @@ void Connection::DoRead() { } void Connection::OnRead(boost::system::error_code ec, std::size_t length) { +#if WEBCC_STUDY_SERVER_THREADING + LOG_USER("[%u] OnRead()", (unsigned int)this); +#endif + if (ec) { if (ec == boost::asio::error::eof) { LOG_INFO("Socket read EOF (%s)", ec.message().c_str()); @@ -121,7 +129,7 @@ void Connection::OnRead(boost::system::error_code ec, std::size_t length) { if (!request_parser_.finished()) { // Continue to read the request. - DoRead(); + AsyncRead(); return; } @@ -132,7 +140,11 @@ void Connection::OnRead(boost::system::error_code ec, std::size_t length) { queue_->Push(shared_from_this()); } -void Connection::DoWrite() { +void Connection::AsyncWrite() { +#if WEBCC_STUDY_SERVER_THREADING + LOG_USER("[%u] AsyncWrite()", (unsigned int)this); +#endif + LOG_VERB("Response:\n%s", response_->Dump().c_str()); // Firstly, write the headers. @@ -144,16 +156,20 @@ void Connection::DoWrite() { void Connection::OnWriteHeaders(boost::system::error_code ec, std::size_t length) { +#if WEBCC_STUDY_SERVER_THREADING + LOG_USER("[%u] OnWriteHeaders()", (unsigned int)this); +#endif + if (ec) { - OnWriteError(ec); + HandleWriteError(ec); } else { // Write the body payload by payload. response_->body()->InitPayload(); - DoWriteBody(); + AsyncWriteBody(); } } -void Connection::DoWriteBody() { +void Connection::AsyncWriteBody() { auto payload = response_->body()->NextPayload(); if (!payload.empty()) { @@ -164,19 +180,23 @@ void Connection::DoWriteBody() { std::placeholders::_2)); } else { // No more body payload left, we're done. - OnWriteOK(); + HandleWriteOK(); } } void Connection::OnWriteBody(boost::system::error_code ec, std::size_t length) { +#if WEBCC_STUDY_SERVER_THREADING + LOG_USER("[%u] OnWriteBody()", (unsigned int)this); +#endif + if (ec) { - OnWriteError(ec); + HandleWriteError(ec); } else { - DoWriteBody(); + AsyncWriteBody(); } } -void Connection::OnWriteOK() { +void Connection::HandleWriteOK() { LOG_INFO("Response has been sent back"); if (request_->IsConnectionKeepAlive()) { @@ -188,7 +208,7 @@ void Connection::OnWriteOK() { } } -void Connection::OnWriteError(boost::system::error_code ec) { +void Connection::HandleWriteError(boost::system::error_code ec) { LOG_ERRO("Socket write error (%s)", ec.message().c_str()); if (ec != boost::asio::error::operation_aborted) { diff --git a/webcc/connection.h b/webcc/connection.h index 7259922..a4b383f 100644 --- a/webcc/connection.h +++ b/webcc/connection.h @@ -13,6 +13,11 @@ #include "webcc/request_parser.h" #include "webcc/response.h" +// Set 1 to enable the log for the study of server thread model. +// Need to use multiple workers and loops for Server::Run(). +// Suggest to configure the log level to USER. +#define WEBCC_STUDY_SERVER_THREADING 0 + namespace webcc { class Connection; @@ -27,11 +32,11 @@ public: Queue* queue, ViewMatcher&& view_matcher, std::size_t buffer_size); - ~Connection() = default; - Connection(const Connection&) = delete; Connection& operator=(const Connection&) = delete; + ~Connection() = default; + RequestPtr request() const { return request_; } @@ -53,16 +58,19 @@ public: void SendResponse(Status status, bool no_keep_alive = false); private: - void DoRead(); + void AsyncRead(); void OnRead(boost::system::error_code ec, std::size_t length); - void DoWrite(); + void AsyncWrite(); void OnWriteHeaders(boost::system::error_code ec, std::size_t length); - void DoWriteBody(); + + void AsyncWriteBody(); void OnWriteBody(boost::system::error_code ec, std::size_t length); - void OnWriteOK(); - void OnWriteError(boost::system::error_code ec); + void HandleWriteOK(); + void HandleWriteError(boost::system::error_code ec); + +private: // The socket for the connection. boost::asio::ip::tcp::socket socket_; diff --git a/webcc/server.cc b/webcc/server.cc index e00f1f3..5859c1f 100644 --- a/webcc/server.cc +++ b/webcc/server.cc @@ -10,10 +10,21 @@ #include "webcc/response.h" #include "webcc/utility.h" +using namespace std::placeholders; using tcp = boost::asio::ip::tcp; namespace webcc { +// NOTE: +// Using `asio::strand` is possible but not neccessary: +// Define a memeber variable: +// asio::strand strand_; +// Initialize the strand with io_context: +// strand_(asio::make_strand(io_context_)), +// Initialize the acceptor with strand: +// acceptor_(strand_) +// The same applies to the sockets. + Server::Server(boost::asio::ip::tcp protocol, std::uint16_t port, const fs::path& doc_root) : protocol_(protocol), @@ -27,6 +38,10 @@ Server::Server(boost::asio::ip::tcp protocol, std::uint16_t port, void Server::Run(std::size_t workers, std::size_t loops) { assert(workers > 0); +#if WEBCC_STUDY_SERVER_THREADING + LOG_USER("Run(workers:%u, loops:%u)", workers, loops); +#endif + { std::lock_guard lock{ state_mutex_ }; @@ -151,8 +166,16 @@ bool Server::Listen(std::uint16_t port) { } void Server::AsyncAccept() { +#if WEBCC_STUDY_SERVER_THREADING + LOG_USER("AsyncAccept"); +#endif + acceptor_.async_accept( [this](boost::system::error_code ec, tcp::socket socket) { +#if WEBCC_STUDY_SERVER_THREADING + LOG_USER("Accept handler"); +#endif + // Check whether the server was stopped by a signal before this // completion handler had a chance to run. if (!acceptor_.is_open()) { @@ -162,8 +185,6 @@ void Server::AsyncAccept() { if (!ec) { LOG_INFO("Accepted a connection"); - using namespace std::placeholders; - auto view_matcher = std::bind(&Server::MatchViewOrStatic, this, _1, _2, _3);