diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 739fb16..408c074 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -61,3 +61,6 @@ target_link_libraries(file_upload_server ${EXAMPLE_LIBS}) add_executable(static_server static_server.cc) target_link_libraries(static_server ${EXAMPLE_LIBS}) + +add_executable(server_states server_states.cc) +target_link_libraries(server_states ${EXAMPLE_LIBS}) diff --git a/examples/file_upload_server.cc b/examples/file_upload_server.cc index 3e59327..160b297 100644 --- a/examples/file_upload_server.cc +++ b/examples/file_upload_server.cc @@ -53,7 +53,7 @@ int main(int argc, char* argv[]) { server.Route("/upload", std::make_shared(), { "POST" }); - server.Start(); + server.Run(); } catch (const std::exception& e) { std::cerr << e.what() << std::endl; diff --git a/examples/hello_world_server.cc b/examples/hello_world_server.cc index 8db0990..16b29af 100644 --- a/examples/hello_world_server.cc +++ b/examples/hello_world_server.cc @@ -21,7 +21,7 @@ int main() { server.Route("/", std::make_shared()); - server.Start(); + server.Run(); } catch (const std::exception&) { return 1; diff --git a/examples/rest_book_client.cc b/examples/rest_book_client.cc index 44ce711..2083fc5 100644 --- a/examples/rest_book_client.cc +++ b/examples/rest_book_client.cc @@ -19,152 +19,154 @@ // ----------------------------------------------------------------------------- -class BookClientBase { +class BookClient { public: - BookClientBase(webcc::ClientSession& session, const std::string& url) - : session_(session), url_(url) { - } + explicit BookClient(const std::string& url, int timeout = 0); - virtual ~BookClientBase() = default; + ~BookClient() = default; -protected: - // Check HTTP response status. - bool CheckStatus(webcc::ResponsePtr response, int expected_status) { - int status = response->status(); - if (status != expected_status) { - LOG_ERRO("HTTP status error (actual: %d, expected: %d).", - status, expected_status); - return false; - } - return true; - } + bool ListBooks(std::list* books); -protected: - std::string url_; + bool CreateBook(const std::string& title, double price, std::string* id); - webcc::ClientSession& session_; -}; + bool GetBook(const std::string& id, Book* book); -// ----------------------------------------------------------------------------- + bool UpdateBook(const std::string& id, const std::string& title, + double price); -class BookListClient : public BookClientBase { -public: - BookListClient(webcc::ClientSession& session, const std::string& url) - : BookClientBase(session, url) { - } + bool DeleteBook(const std::string& id); - bool ListBooks(std::list* books) { - try { - auto r = session_.Get(url_ + "/books"); - - if (!CheckStatus(r, webcc::Status::kOK)) { - // Response HTTP status error. - return false; - } +private: + // Check HTTP response status. + bool CheckStatus(webcc::ResponsePtr response, int expected_status); - Json::Value rsp_json = StringToJson(r->data()); +private: + std::string url_; + webcc::ClientSession session_; +}; - if (!rsp_json.isArray()) { - return false; // Should be a JSON array of books. - } +// ----------------------------------------------------------------------------- - for (Json::ArrayIndex i = 0; i < rsp_json.size(); ++i) { - books->push_back(JsonToBook(rsp_json[i])); - } +BookClient::BookClient(const std::string& url, int timeout) + : url_(url), session_(timeout) { + // If the request has body, default to this content type. + // Optional. + session_.set_media_type("application/json"); + session_.set_charset("utf-8"); +} - return true; +bool BookClient::ListBooks(std::list* books) { + try { + auto r = session_.Get(url_ + "/books"); - } catch (const webcc::Error& error) { - std::cerr << error << std::endl; + if (!CheckStatus(r, webcc::Status::kOK)) { + // Response HTTP status error. return false; } - } - bool CreateBook(const std::string& title, double price, std::string* id) { - Json::Value req_json; - req_json["title"] = title; - req_json["price"] = price; + Json::Value rsp_json = StringToJson(r->data()); + + if (!rsp_json.isArray()) { + return false; // Should be a JSON array of books. + } - try { - auto r = session_.Post(url_ + "/books", JsonToString(req_json), true); + for (Json::ArrayIndex i = 0; i < rsp_json.size(); ++i) { + books->push_back(JsonToBook(rsp_json[i])); + } - if (!CheckStatus(r, webcc::Status::kCreated)) { - return false; - } + return true; - Json::Value rsp_json = StringToJson(r->data()); - *id = rsp_json["id"].asString(); + } catch (const webcc::Error& error) { + std::cerr << error << std::endl; + return false; + } +} + +bool BookClient::CreateBook(const std::string& title, double price, + std::string* id) { + Json::Value req_json; + req_json["title"] = title; + req_json["price"] = price; - return !id->empty(); + try { + auto r = session_.Post(url_ + "/books", JsonToString(req_json), true); - } catch (const webcc::Error& error) { - std::cerr << error << std::endl; + if (!CheckStatus(r, webcc::Status::kCreated)) { return false; } - } -}; -// ----------------------------------------------------------------------------- + Json::Value rsp_json = StringToJson(r->data()); + *id = rsp_json["id"].asString(); -class BookDetailClient : public BookClientBase { -public: - BookDetailClient(webcc::ClientSession& session, const std::string& url) - : BookClientBase(session, url) { - } + return !id->empty(); - bool GetBook(const std::string& id, Book* book) { - try { - auto r = session_.Get(url_ + "/books/" + id); - - if (!CheckStatus(r, webcc::Status::kOK)) { - return false; - } + } catch (const webcc::Error& error) { + std::cerr << error << std::endl; + return false; + } +} - return JsonStringToBook(r->data(), book); +bool BookClient::GetBook(const std::string& id, Book* book) { + try { + auto r = session_.Get(url_ + "/books/" + id); - } catch (const webcc::Error& error) { - std::cerr << error << std::endl; + if (!CheckStatus(r, webcc::Status::kOK)) { return false; } - } - bool UpdateBook(const std::string& id, const std::string& title, - double price) { - Json::Value json; - json["title"] = title; - json["price"] = price; + return JsonStringToBook(r->data(), book); - try { - auto r = session_.Put(url_ + "/books/" + id, JsonToString(json), true); + } catch (const webcc::Error& error) { + std::cerr << error << std::endl; + return false; + } +} - if (!CheckStatus(r, webcc::Status::kOK)) { - return false; - } +bool BookClient::UpdateBook(const std::string& id, const std::string& title, + double price) { + Json::Value json; + json["title"] = title; + json["price"] = price; - return true; + try { + auto r = session_.Put(url_ + "/books/" + id, JsonToString(json), true); - } catch (const webcc::Error& error) { - std::cerr << error << std::endl; + if (!CheckStatus(r, webcc::Status::kOK)) { return false; } - } - bool DeleteBook(const std::string& id) { - try { - auto r = session_.Delete(url_ + "/books/" + id); + return true; - if (!CheckStatus(r, webcc::Status::kOK)) { - return false; - } + } catch (const webcc::Error& error) { + std::cerr << error << std::endl; + return false; + } +} - return true; +bool BookClient::DeleteBook(const std::string& id) { + try { + auto r = session_.Delete(url_ + "/books/" + id); - } catch (const webcc::Error& error) { - std::cerr << error << std::endl; + if (!CheckStatus(r, webcc::Status::kOK)) { return false; } + + return true; + + } catch (const webcc::Error& error) { + std::cerr << error << std::endl; + return false; } -}; +} + +bool BookClient::CheckStatus(webcc::ResponsePtr response, int expected_status) { + if (response->status() != expected_status) { + LOG_ERRO("HTTP status error (actual: %d, expected: %d).", + response->status(), expected_status); + return false; + } + return true; +} // ----------------------------------------------------------------------------- @@ -208,30 +210,19 @@ int main(int argc, char* argv[]) { WEBCC_LOG_INIT("", webcc::LOG_CONSOLE_FILE_OVERWRITE); - // Share the same session. - webcc::ClientSession session; - - session.set_timeout(timeout); - - // If the request has body, default to this content type. - // Optional. - session.set_media_type("application/json"); - session.set_charset("utf-8"); - - BookListClient list_client(session, url); - BookDetailClient detail_client(session, url); + BookClient client(url, timeout); PrintSeparator(); std::list books; - if (list_client.ListBooks(&books)) { + if (client.ListBooks(&books)) { PrintBookList(books); } PrintSeparator(); std::string id; - if (list_client.CreateBook("1984", 12.3, &id)) { + if (client.CreateBook("1984", 12.3, &id)) { std::cout << "Book ID: " << id << std::endl; } else { id = "1"; @@ -241,35 +232,35 @@ int main(int argc, char* argv[]) { PrintSeparator(); books.clear(); - if (list_client.ListBooks(&books)) { + if (client.ListBooks(&books)) { PrintBookList(books); } PrintSeparator(); Book book; - if (detail_client.GetBook(id, &book)) { + if (client.GetBook(id, &book)) { PrintBook(book); } PrintSeparator(); - detail_client.UpdateBook(id, "1Q84", 32.1); + client.UpdateBook(id, "1Q84", 32.1); PrintSeparator(); - if (detail_client.GetBook(id, &book)) { + if (client.GetBook(id, &book)) { PrintBook(book); } PrintSeparator(); - detail_client.DeleteBook(id); + client.DeleteBook(id); PrintSeparator(); books.clear(); - if (list_client.ListBooks(&books)) { + if (client.ListBooks(&books)) { PrintBookList(books); } diff --git a/examples/rest_book_server.cc b/examples/rest_book_server.cc index 24a4b69..e24acad 100644 --- a/examples/rest_book_server.cc +++ b/examples/rest_book_server.cc @@ -224,8 +224,6 @@ int main(int argc, char* argv[]) { sleep_seconds = std::atoi(argv[2]); } - std::size_t workers = 2; - try { webcc::Server server(port); @@ -237,7 +235,7 @@ int main(int argc, char* argv[]) { std::make_shared(sleep_seconds), { "GET", "PUT", "DELETE" }); - server.Start(workers); + server.Run(2); } catch (const std::exception& e) { std::cerr << e.what() << std::endl; diff --git a/examples/server_states.cc b/examples/server_states.cc new file mode 100644 index 0000000..0105667 --- /dev/null +++ b/examples/server_states.cc @@ -0,0 +1,50 @@ +#include "webcc/logger.h" +#include "webcc/response_builder.h" +#include "webcc/server.h" + +class HelloView : public webcc::View { +public: + webcc::ResponsePtr Handle(webcc::RequestPtr request) override { + if (request->method() == "GET") { + return webcc::ResponseBuilder{}.OK().Body("Hello, World!")(); + } + + return {}; + } +}; + +int main() { + WEBCC_LOG_INIT("", webcc::LOG_CONSOLE); + + try { + webcc::Server server(8080); + + server.Route("/", std::make_shared()); + + // Run the server in a separate thread. + std::thread t([&server]() { server.Run(); }); + + // Let the server run for several seconds. + std::this_thread::sleep_for(std::chrono::seconds(3)); + + // Stop the server. + server.Stop(); + + // Wait for the server to finish. + t.join(); + + // Run the server again. + std::thread t2([&server]() { server.Run(); }); + + // Wait for the server to finish. + t2.join(); + + } catch (const std::exception&) { + // NOTE: + // Catch std::exception instead of webcc::Error. + // webcc::Error is for client only. + return 1; + } + + return 0; +} diff --git a/examples/static_server.cc b/examples/static_server.cc index c50e0dd..df1b5b2 100644 --- a/examples/static_server.cc +++ b/examples/static_server.cc @@ -28,7 +28,7 @@ int main(int argc, char* argv[]) { try { webcc::Server server(port, doc_root); - server.Start(); + server.Run(); } catch (const std::exception& e) { std::cerr << e.what() << std::endl; diff --git a/webcc/connection.cc b/webcc/connection.cc index d378514..ba0e6d8 100644 --- a/webcc/connection.cc +++ b/webcc/connection.cc @@ -6,18 +6,15 @@ #include "webcc/connection_pool.h" #include "webcc/logger.h" -#include "webcc/server.h" using boost::asio::ip::tcp; namespace webcc { Connection::Connection(tcp::socket socket, ConnectionPool* pool, - Server* server) - : socket_(std::move(socket)), - pool_(pool), - buffer_(kBufferSize), - server_(server) { + Queue* queue) + : socket_(std::move(socket)), pool_(pool), queue_(queue), + buffer_(kBufferSize) { } void Connection::Start() { @@ -122,9 +119,9 @@ void Connection::OnRead(boost::system::error_code ec, std::size_t length) { LOG_VERB("HTTP request:\n%s", request_->Dump().c_str()); - // Enqueue this connection. - // Some worker thread will handle it later. - server_->Enqueue(shared_from_this()); + // Enqueue this connection once the request has been read. + // Some worker thread will handle the request later. + queue_->Push(shared_from_this()); } void Connection::DoWrite() { diff --git a/webcc/connection.h b/webcc/connection.h index 647fc30..5a00709 100644 --- a/webcc/connection.h +++ b/webcc/connection.h @@ -8,19 +8,23 @@ #include "boost/asio/ip/tcp.hpp" #include "webcc/globals.h" +#include "webcc/queue.h" #include "webcc/request.h" #include "webcc/request_parser.h" #include "webcc/response.h" namespace webcc { +class Connection; class ConnectionPool; class Server; +using ConnectionPtr = std::shared_ptr; + class Connection : public std::enable_shared_from_this { public: Connection(boost::asio::ip::tcp::socket socket, ConnectionPool* pool, - Server* server); + Queue* queue); ~Connection() = default; @@ -61,15 +65,15 @@ private: // The socket for the connection. boost::asio::ip::tcp::socket socket_; - // The pool for this connection. + // The connection pool. ConnectionPool* pool_; + // The connection queue. + Queue* queue_; + // The buffer for incoming data. std::vector buffer_; - // The server. - Server* server_; - // The incoming request. RequestPtr request_; @@ -80,8 +84,6 @@ private: ResponsePtr response_; }; -using ConnectionPtr = std::shared_ptr; - } // namespace webcc #endif // WEBCC_CONNECTION_H_ diff --git a/webcc/connection_pool.cc b/webcc/connection_pool.cc index 61bd672..e99868c 100644 --- a/webcc/connection_pool.cc +++ b/webcc/connection_pool.cc @@ -6,17 +6,32 @@ namespace webcc { void ConnectionPool::Start(ConnectionPtr c) { LOG_VERB("Starting connection..."); - connections_.insert(c); + + { + // Lock the container only. + std::lock_guard lock(mutex_); + connections_.insert(c); + } + c->Start(); } void ConnectionPool::Close(ConnectionPtr c) { LOG_VERB("Closing connection..."); - connections_.erase(c); + + { + // Lock the container only. + std::lock_guard lock(mutex_); + connections_.erase(c); + } + c->Close(); } -void ConnectionPool::CloseAll() { +void ConnectionPool::Clear() { + // Lock all since we are going to stop anyway. + std::lock_guard lock(mutex_); + if (!connections_.empty()) { LOG_VERB("Closing all (%u) connections...", connections_.size()); for (auto& c : connections_) { diff --git a/webcc/connection_pool.h b/webcc/connection_pool.h index ce3877b..8176498 100644 --- a/webcc/connection_pool.h +++ b/webcc/connection_pool.h @@ -1,6 +1,7 @@ #ifndef WEBCC_CONNECTION_POOL_H_ #define WEBCC_CONNECTION_POOL_H_ +#include #include #include "webcc/connection.h" @@ -14,17 +15,24 @@ public: ConnectionPool(const ConnectionPool&) = delete; ConnectionPool& operator=(const ConnectionPool&) = delete; - // Add a connection to the pool and start it. + // Add the connection and start to read the request from it. + // Called when a new connection has just been accepted. void Start(ConnectionPtr c); - // Close a connection. + // Close the connection. + // Called when the response of the connection has been sent back. void Close(ConnectionPtr c); // Close all pending connections. - void CloseAll(); + // Called when the server is about to stop. + void Clear(); private: std::set connections_; + + // Mutex is necessary if the loop is running in multiple threads. + // See Server::Run(). + std::mutex mutex_; }; } // namespace webcc diff --git a/webcc/globals.h b/webcc/globals.h index a539980..0176aa9 100644 --- a/webcc/globals.h +++ b/webcc/globals.h @@ -178,10 +178,11 @@ enum class ContentEncoding { // ----------------------------------------------------------------------------- -// Error (or exception) for the client. -class Error { - public: +// Error or exception (for client only). +class Error : public std::exception { +public: enum Code { + kUnknownError = -1, kOK = 0, kSyntaxError, kResolveError, @@ -193,11 +194,16 @@ class Error { kDataError, }; - public: +public: Error(Code code = kOK, const std::string& message = "") : code_(code), message_(message), timeout_(false) { } + // Note that `noexcept` is required by GCC. + const char* what() const WEBCC_NOEXCEPT override{ + return message_.c_str(); + } + Code code() const { return code_; } @@ -223,7 +229,7 @@ class Error { return code_ != kOK; } - private: +private: Code code_; std::string message_; bool timeout_; diff --git a/webcc/request_builder.cc b/webcc/request_builder.cc index ea5eda0..d534fe2 100644 --- a/webcc/request_builder.cc +++ b/webcc/request_builder.cc @@ -22,9 +22,9 @@ RequestPtr RequestBuilder::operator()() { } // If no Keep-Alive, explicitly set `Connection` to "Close". - if (!keep_alive_) { + //if (!keep_alive_) { request->SetHeader(headers::kConnection, "Close"); - } + //} if (body_) { request->SetContentType(media_type_, charset_); diff --git a/webcc/response_builder.h b/webcc/response_builder.h index 5ad24c3..320ed05 100644 --- a/webcc/response_builder.h +++ b/webcc/response_builder.h @@ -30,11 +30,34 @@ public: // consistency and simplicity. // Some shortcuts for different status codes: - ResponseBuilder& OK() { return Code(Status::kOK); } - ResponseBuilder& Created() { return Code(Status::kCreated); } - ResponseBuilder& BadRequest() { return Code(Status::kBadRequest); } - ResponseBuilder& NotFound() { return Code(Status::kNotFound); } - ResponseBuilder& NotImplemented() { return Code(Status::kNotImplemented); } + + ResponseBuilder& OK() { + return Code(Status::kOK); + } + + ResponseBuilder& Created() { + return Code(Status::kCreated); + } + + ResponseBuilder& BadRequest() { + return Code(Status::kBadRequest); + } + + ResponseBuilder& NotFound() { + return Code(Status::kNotFound); + } + + ResponseBuilder& InternalServerError() { + return Code(Status::kInternalServerError); + } + + ResponseBuilder& NotImplemented() { + return Code(Status::kNotImplemented); + } + + ResponseBuilder& ServiceUnavailable() { + return Code(Status::kServiceUnavailable); + } ResponseBuilder& Code(Status code) { code_ = code; diff --git a/webcc/server.cc b/webcc/server.cc index 7ee0640..17934ca 100644 --- a/webcc/server.cc +++ b/webcc/server.cc @@ -21,42 +21,9 @@ using tcp = boost::asio::ip::tcp; namespace webcc { Server::Server(std::uint16_t port, const Path& doc_root) - : acceptor_(io_context_), signals_(io_context_), doc_root_(doc_root) { - RegisterSignals(); - - boost::system::error_code ec; - tcp::endpoint endpoint(tcp::v4(), port); - - // Open the acceptor. - acceptor_.open(endpoint.protocol(), ec); - if (ec) { - LOG_ERRO("Acceptor open error (%s).", ec.message().c_str()); - return; - } - - // Set option SO_REUSEADDR on. - // When SO_REUSEADDR is set, multiple servers can listen on the same port. - // This is necessary for restarting the server on the same port. - // More details: - // - https://stackoverflow.com/a/3233022 - // - http://www.andy-pearce.com/blog/posts/2013/Feb/so_reuseaddr-on-windows/ - acceptor_.set_option(tcp::acceptor::reuse_address(true)); - - // Bind to the server address. - acceptor_.bind(endpoint, ec); - if (ec) { - LOG_ERRO("Acceptor bind error (%s).", ec.message().c_str()); - return; - } - - // Start listening for connections. - // After listen, the client is able to connect to the server even the server - // has not started to accept the connection yet. - acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); - if (ec) { - LOG_ERRO("Acceptor listen error (%s).", ec.message().c_str()); - return; - } + : port_(port), doc_root_(doc_root), running_(false), + acceptor_(io_context_), signals_(io_context_) { + AddSignals(); } bool Server::Route(const std::string& url, ViewPtr view, @@ -88,50 +55,73 @@ bool Server::Route(const UrlRegex& regex_url, ViewPtr view, return true; } -void Server::Start(std::size_t workers) { +void Server::Run(std::size_t workers, std::size_t loops) { assert(workers > 0); - assert(worker_threads_.empty()); - if (!acceptor_.is_open()) { - LOG_ERRO("Server is NOT going to run."); - return; - } + { + std::lock_guard lock(state_mutex_); - LOG_INFO("Server is going to run..."); + assert(worker_threads_.empty()); - DoAwaitStop(); + if (IsRunning()) { + LOG_WARN("Server is already running."); + return; + } - DoAccept(); + running_ = true; + io_context_.restart(); - // Create worker threads. - for (std::size_t i = 0; i < workers; ++i) { - worker_threads_.emplace_back(std::bind(&Server::WorkerRoutine, this)); + if (!Listen(port_)) { + LOG_ERRO("Server is NOT going to run."); + return; + } + + LOG_INFO("Server is going to run..."); + + AsyncWaitSignals(); + + AsyncAccept(); + + // Create worker threads. + for (std::size_t i = 0; i < workers; ++i) { + worker_threads_.emplace_back(std::bind(&Server::WorkerRoutine, this)); + } } - // Run the loop. + // Start the event loop. // The io_context::run() call will block until all asynchronous operations // have finished. While the server is running, there is always at least one // asynchronous operation outstanding: the asynchronous accept call waiting // for new incoming connections. - io_context_.run(); + + LOG_INFO("Loop is running in %u thread(s).", loops); + + if (loops == 1) { + // Just run the loop in the current thread. + io_context_.run(); + } else { + std::vector loop_threads; + for (std::size_t i = 0; i < loops; ++i) { + loop_threads.emplace_back(&boost::asio::io_context::run, &io_context_); + } + // Join the threads for blocking. + for (std::size_t i = 0; i < loops; ++i) { + loop_threads[i].join(); + } + } } void Server::Stop() { - // Stop listener. - acceptor_.close(); - - // Stop worker threads. - StopWorkers(); + std::lock_guard lock(state_mutex_); - // Close all pending connections. - pool_.CloseAll(); + DoStop(); } -void Server::Enqueue(ConnectionPtr connection) { - queue_.Push(connection); +bool Server::IsRunning() const { + return running_ && !io_context_.stopped(); } -void Server::RegisterSignals() { +void Server::AddSignals() { signals_.add(SIGINT); // Ctrl+C signals_.add(SIGTERM); @@ -140,7 +130,58 @@ void Server::RegisterSignals() { #endif } -void Server::DoAccept() { +void Server::AsyncWaitSignals() { + signals_.async_wait( + [this](boost::system::error_code, int signo) { + // The server is stopped by canceling all outstanding asynchronous + // operations. Once all operations have finished the io_context::run() + // call will exit. + LOG_INFO("On signal %d, stopping the server...", signo); + + DoStop(); + }); +} + +bool Server::Listen(std::uint16_t port) { + boost::system::error_code ec; + + tcp::endpoint endpoint(tcp::v4(), port); + + // Open the acceptor. + acceptor_.open(endpoint.protocol(), ec); + if (ec) { + LOG_ERRO("Acceptor open error (%s).", ec.message().c_str()); + return false; + } + + // Set option SO_REUSEADDR on. + // When SO_REUSEADDR is set, multiple servers can listen on the same port. + // This is necessary for restarting the server on the same port. + // More details: + // - https://stackoverflow.com/a/3233022 + // - http://www.andy-pearce.com/blog/posts/2013/Feb/so_reuseaddr-on-windows/ + acceptor_.set_option(tcp::acceptor::reuse_address(true)); + + // Bind to the server address. + acceptor_.bind(endpoint, ec); + if (ec) { + LOG_ERRO("Acceptor bind error (%s).", ec.message().c_str()); + return false; + } + + // Start listening for connections. + // After listen, the client is able to connect to the server even the server + // has not started to accept the connection yet. + acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); + if (ec) { + LOG_ERRO("Acceptor listen error (%s).", ec.message().c_str()); + return false; + } + + return true; +} + +void Server::AsyncAccept() { acceptor_.async_accept( [this](boost::system::error_code ec, tcp::socket socket) { // Check whether the server was stopped by a signal before this @@ -153,25 +194,33 @@ void Server::DoAccept() { LOG_INFO("Accepted a connection."); auto connection = std::make_shared( - std::move(socket), &pool_, this); + std::move(socket), &pool_, &queue_); pool_.Start(connection); } - DoAccept(); + AsyncAccept(); }); } -void Server::DoAwaitStop() { - signals_.async_wait( - [this](boost::system::error_code, int signo) { - // The server is stopped by canceling all outstanding asynchronous - // operations. Once all operations have finished the io_context::run() - // call will exit. - LOG_INFO("On signal %d, stopping the server...", signo); +void Server::DoStop() { + // Stop accepting new connections. + acceptor_.close(); - Stop(); - }); + // Stop worker threads. + // This might take some time if the threads are still processing. + StopWorkers(); + + // Close all pending connections. + pool_.Clear(); + + // Finally, stop the event processing loop. + // This function does not block, but instead simply signals the io_context to + // stop. All invocations of its run() or run_one() member functions should + // return as soon as possible. + io_context_.stop(); + + running_ = false; } void Server::WorkerRoutine() { @@ -186,7 +235,7 @@ void Server::WorkerRoutine() { // For stopping next worker. queue_.Push(ConnectionPtr()); - // Stop the worker. + // Stop this worker. break; } @@ -198,19 +247,27 @@ void Server::StopWorkers() { LOG_INFO("Stopping workers..."); // Clear pending connections. - // The connections will be closed later (see Server::DoAwaitStop). + // The connections will be closed later. LOG_INFO("Clear pending connections..."); queue_.Clear(); // Enqueue a null connection to trigger the first worker to stop. queue_.Push(ConnectionPtr()); + // Wait for worker threads to finish. for (auto& t : worker_threads_) { if (t.joinable()) { t.join(); } } + // Cleanup worker threads. + worker_threads_.clear(); + + // Clear the queue because it has a remaining null connection pushed by the + // last worker thread. + queue_.Clear(); + LOG_INFO("All workers have been stopped."); } diff --git a/webcc/server.h b/webcc/server.h index c3b8f5b..9df0735 100644 --- a/webcc/server.h +++ b/webcc/server.h @@ -21,7 +21,7 @@ class Server { public: explicit Server(std::uint16_t port, const Path& doc_root = {}); - virtual ~Server() = default; + ~Server() = default; Server(const Server&) = delete; Server& operator=(const Server&) = delete; @@ -37,24 +37,43 @@ public: bool Route(const UrlRegex& regex_url, ViewPtr view, const Strings& methods = { "GET" }); - // Start the server with a given number of worker threads. - void Start(std::size_t workers = 1); + // Start and run the server. + // This method is blocking so will not return until Stop() is called (from + // another thread) or a signal like SIGINT is caught. + // When the request of a connection has been read, the connection is put into + // a queue waiting for some worker thread to process. Normally, the more + // |workers| you have, the more concurrency you gain (the concurrency also + // depends on the number of CPU cores). The worker thread pops connections + // from the queue one by one, prepares the response by the user provided View, + // then sends it back to the client. + // Meanwhile, the (event) loop, i.e., io_context, is also running in a number + // (|loops|) of threads. Normally, one thread for the loop is good enough, but + // it could be more than that. + void Run(std::size_t workers = 1, std::size_t loops = 1); // Stop the server. + // This should be called from another thread since the Run() is blocking. void Stop(); - // Put the connection into the queue. - void Enqueue(ConnectionPtr connection); + // Is the server running? + bool IsRunning() const; private: - // Register to handle the signals that indicate when the server should exit. - void RegisterSignals(); + // Register signals which indicate when the server should exit. + void AddSignals(); - // Initiate an asynchronous accept operation. - void DoAccept(); + // Wait for a signal to stop the server. + void AsyncWaitSignals(); - // Wait for a request to stop the server. - void DoAwaitStop(); + // Listen on the given port. + bool Listen(std::uint16_t port); + + // Accept connections asynchronously. + void AsyncAccept(); + + // Stop acceptor and worker threads, close all pending connections, and + // finally stop the event loop. + void DoStop(); // Worker thread routine. void WorkerRoutine(); @@ -84,6 +103,18 @@ private: Strings methods; }; + // Port number. + std::uint16_t port_; + + // The directory with the static files to be served. + Path doc_root_; + + // Is the server running? + bool running_; + + // The mutex for guarding the state of the server. + std::mutex state_mutex_; + // The io_context used to perform asynchronous operations. boost::asio::io_context io_context_; @@ -99,9 +130,6 @@ private: // Worker threads. std::vector worker_threads_; - // The directory with the static files to be served. - Path doc_root_; - // The queue with connection waiting for the workers to process. Queue queue_;