From dfadd85a6967bd903bb236736049f2e90f939b26 Mon Sep 17 00:00:00 2001 From: Adam Gu Date: Fri, 26 Jan 2018 15:12:54 +0800 Subject: [PATCH] Remove ConnectionManager, refine message queue. --- src/csoap/CMakeLists.txt | 7 ++- src/csoap/common.cc | 1 + src/csoap/connection.cc | 11 ++-- src/csoap/connection.h | 5 -- src/csoap/connection_manager.cc | 25 --------- src/csoap/connection_manager.h | 35 ------------- src/csoap/http_client.cc | 10 ++-- src/csoap/http_request_handler.cc | 83 +++++++++++++----------------- src/csoap/http_request_handler.h | 15 +++--- src/csoap/http_server.cc | 24 +++++---- src/csoap/http_server.h | 9 ++-- src/csoap/queue.h | 19 +++++-- src/demo/calculator_server/main.cc | 4 +- 13 files changed, 92 insertions(+), 156 deletions(-) delete mode 100644 src/csoap/connection_manager.cc delete mode 100644 src/csoap/connection_manager.h diff --git a/src/csoap/CMakeLists.txt b/src/csoap/CMakeLists.txt index 0b1d78e..e75e180 100644 --- a/src/csoap/CMakeLists.txt +++ b/src/csoap/CMakeLists.txt @@ -2,10 +2,10 @@ if(UNIX) add_definitions(-D_GLIBCXX_USE_WCHAR_T -std=c++11) endif() -option(CSOAP_ENABLE_OUTPUT "Enable output for request & response?" OFF) +option(CSOAP_DEBUG_OUTPUT "Enable debug output?" OFF) -if(CSOAP_ENABLE_OUTPUT) - add_definitions(-DCSOAP_ENABLE_OUTPUT) +if(CSOAP_DEBUG_OUTPUT) + add_definitions(-DCSOAP_DEBUG_OUTPUT) endif() # Don't use any deprecated definitions (e.g., io_service). @@ -14,4 +14,3 @@ add_definitions(-DBOOST_ASIO_NO_DEPRECATED) file(GLOB SRCS *.cc *.h) add_library(csoap ${SRCS}) - diff --git a/src/csoap/common.cc b/src/csoap/common.cc index 110938c..46caeb0 100644 --- a/src/csoap/common.cc +++ b/src/csoap/common.cc @@ -82,6 +82,7 @@ Parameter::Parameter(const std::string& key, int value) Parameter::Parameter(const std::string& key, double value) : key_(key) { + // TODO char buf[32]; sprintf(buf, "%f", value); value_ = buf; diff --git a/src/csoap/connection.cc b/src/csoap/connection.cc index f93c1c8..e9e7724 100644 --- a/src/csoap/connection.cc +++ b/src/csoap/connection.cc @@ -1,22 +1,19 @@ #include "csoap/connection.h" #include -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT #include #endif #include "boost/asio/write.hpp" -#include "csoap/connection_manager.h" #include "csoap/http_request_handler.h" namespace csoap { Connection::Connection(boost::asio::ip::tcp::socket socket, - ConnectionManager& manager, HttpRequestHandler& handler) : socket_(std::move(socket)) - , connection_manager_(manager) , request_handler_(handler) , request_parser_(&request_) { } @@ -49,7 +46,7 @@ void Connection::HandleRead(boost::system::error_code ec, std::size_t length) { if (ec) { if (ec != boost::asio::error::operation_aborted) { - connection_manager_.Stop(shared_from_this()); + Stop(); } return; } @@ -81,7 +78,7 @@ void Connection::HandleRead(boost::system::error_code ec, // ensured by Asio. void Connection::HandleWrite(boost::system::error_code ec, size_t length) { -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT boost::thread::id thread_id = boost::this_thread::get_id(); std::cout << "Response has been sent back (thread: " << thread_id << ")\n"; #endif @@ -94,7 +91,7 @@ void Connection::HandleWrite(boost::system::error_code ec, } if (ec != boost::asio::error::operation_aborted) { - connection_manager_.Stop(shared_from_this()); + Stop(); } } diff --git a/src/csoap/connection.h b/src/csoap/connection.h index ec68b91..5ada15c 100644 --- a/src/csoap/connection.h +++ b/src/csoap/connection.h @@ -13,7 +13,6 @@ namespace csoap { -class ConnectionManager; class HttpRequestHandler; // Represents a single connection from a client. @@ -26,7 +25,6 @@ public: // Construct a connection with the given io_service. Connection(boost::asio::ip::tcp::socket socket, - ConnectionManager& manager, HttpRequestHandler& handler); void Start(); @@ -48,9 +46,6 @@ private: // Socket for the connection. boost::asio::ip::tcp::socket socket_; - // The manager for this connection. - ConnectionManager& connection_manager_; - // The handler used to process the incoming request. HttpRequestHandler& request_handler_; diff --git a/src/csoap/connection_manager.cc b/src/csoap/connection_manager.cc deleted file mode 100644 index ddaf1c2..0000000 --- a/src/csoap/connection_manager.cc +++ /dev/null @@ -1,25 +0,0 @@ -#include "csoap/connection_manager.h" - -namespace csoap { - -ConnectionManager::ConnectionManager() { -} - -void ConnectionManager::Start(ConnectionPtr conn) { - connections_.insert(conn); - conn->Start(); -} - -void ConnectionManager::Stop(ConnectionPtr conn) { - connections_.erase(conn); - conn->Stop(); -} - -void ConnectionManager::StopAll() { - for (const ConnectionPtr& conn : connections_) { - conn->Stop(); - } - connections_.clear(); -} - -} // namespace csoap diff --git a/src/csoap/connection_manager.h b/src/csoap/connection_manager.h deleted file mode 100644 index 33e014b..0000000 --- a/src/csoap/connection_manager.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef CSOAP_CONNECTION_MANAGER_H_ -#define CSOAP_CONNECTION_MANAGER_H_ - -#include -#include "csoap/connection.h" - -namespace csoap { - -// ConnectionManager manages open connections so that they may be cleanly -// stopped when the server needs to shut down. -class ConnectionManager { -public: - ConnectionManager(const ConnectionManager&) = delete; - ConnectionManager& operator=(const ConnectionManager&) = delete; - - // Construct a connection manager. - ConnectionManager(); - - // Add the specified connection to the manager and start it. - void Start(ConnectionPtr conn); - - // Stop the specified connection. - void Stop(ConnectionPtr conn); - - // Stop all connections. - void StopAll(); - -private: - // The managed connections. - std::set connections_; -}; - -} // namespace csoap - -#endif // CSOAP_CONNECTION_MANAGER_H_ diff --git a/src/csoap/http_client.cc b/src/csoap/http_client.cc index 704c52b..7f67538 100644 --- a/src/csoap/http_client.cc +++ b/src/csoap/http_client.cc @@ -1,6 +1,6 @@ #include "csoap/http_client.h" -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT #include #endif @@ -82,7 +82,7 @@ Error HttpClient::SendRequest(const HttpRequest& request, // Send HTTP request. -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT std::cout << "# REQUEST" << std::endl << request << std::endl; #endif @@ -92,7 +92,7 @@ Error HttpClient::SendRequest(const HttpRequest& request, return kSocketWriteError; } -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT std::cout << "# RESPONSE" << std::endl; #endif @@ -114,7 +114,7 @@ Error HttpClient::SendRequest(const HttpRequest& request, } } -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT // NOTE: the content XML might not be well formated. std::cout.write(buffer_.data(), length); #endif @@ -129,7 +129,7 @@ Error HttpClient::SendRequest(const HttpRequest& request, } } -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT std::cout << std::endl; std::cout << "# RESPONSE (PARSED)" << std::endl; std::cout << *response << std::endl; diff --git a/src/csoap/http_request_handler.cc b/src/csoap/http_request_handler.cc index 1874233..115761b 100644 --- a/src/csoap/http_request_handler.cc +++ b/src/csoap/http_request_handler.cc @@ -1,7 +1,7 @@ #include "csoap/http_request_handler.h" #include -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT #include #endif @@ -13,13 +13,6 @@ namespace csoap { -HttpRequestHandler::HttpRequestHandler() { - // Create worker threads. - for (std::size_t i = 0; i < 2; ++i) { - workers_.create_thread(std::bind(&HttpRequestHandler::WorkerRoutine, this)); - } -} - bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) { assert(soap_service); @@ -32,65 +25,63 @@ bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) { return true; } -#if 0 -void HttpRequestHandler::HandleRequest(const HttpRequest& http_request, - HttpResponse* http_response) { - // Parse the SOAP request XML. - SoapRequest soap_request; - if (!soap_request.FromXml(http_request.content())) { - http_response->set_status(HttpStatus::BAD_REQUEST); - return; - } - - SoapResponse soap_response; - - // TODO: Get service by URL. - - for (SoapServicePtr& service : soap_services_) { - service->Handle(soap_request, &soap_response); - } +void HttpRequestHandler::Enqueue(ConnectionPtr conn) { + queue_.Push(conn); +} - std::string content; - soap_response.ToXml(&content); +void HttpRequestHandler::Start(std::size_t count) { + assert(count > 0 && workers_.size() == 0); - http_response->set_status(HttpStatus::OK); - http_response->SetContentType(kTextXmlUtf8); - http_response->SetContentLength(content.length()); - http_response->set_content(std::move(content)); -} + for (std::size_t i = 0; i < count; ++i) { +#if CSOAP_DEBUG_OUTPUT + boost::thread* worker = #endif + workers_.create_thread(std::bind(&HttpRequestHandler::WorkerRoutine, this)); -void HttpRequestHandler::Enqueue(ConnectionPtr conn) { - queue_.Put(conn); +#if CSOAP_DEBUG_OUTPUT + std::cout << "Worker is running (thread: " << worker->get_id() << ")\n"; +#endif + } } -void HttpRequestHandler::StopWorkers() { -#if CSOAP_ENABLE_OUTPUT +void HttpRequestHandler::Stop() { +#if CSOAP_DEBUG_OUTPUT std::cout << "Stopping workers...\n"; #endif - // Enqueue an null connection to trigger the first worker to stop. - queue_.Put(ConnectionPtr()); + // Close pending connections. + for (ConnectionPtr conn = queue_.Pop(); conn; conn = queue_.Pop()) { +#if CSOAP_DEBUG_OUTPUT + std::cout << "Closing pending connection...\n"; +#endif + conn->Stop(); + } + + // Enqueue a null connection to trigger the first worker to stop. + queue_.Push(ConnectionPtr()); workers_.join_all(); -#if CSOAP_ENABLE_OUTPUT - std::cout << "Workers have been stopped.\n"; +#if CSOAP_DEBUG_OUTPUT + std::cout << "All workers have been stopped.\n"; #endif } -// TODO: How and when to exit? void HttpRequestHandler::WorkerRoutine() { +#if CSOAP_DEBUG_OUTPUT + boost::thread::id thread_id = boost::this_thread::get_id(); +#endif + for (;;) { - ConnectionPtr conn = queue_.Get(); + ConnectionPtr conn = queue_.PopOrWait(); if (!conn) { -#if CSOAP_ENABLE_OUTPUT - boost::thread::id thread_id = boost::this_thread::get_id(); +#if CSOAP_DEBUG_OUTPUT std::cout << "Worker is going to stop (thread: " << thread_id << ")\n"; #endif // For stopping next worker. - queue_.Put(ConnectionPtr()); + queue_.Push(ConnectionPtr()); + // Stop the worker. break; } @@ -99,7 +90,7 @@ void HttpRequestHandler::WorkerRoutine() { SoapRequest soap_request; if (!soap_request.FromXml(conn->request_.content())) { conn->response_.set_status(HttpStatus::BAD_REQUEST); - conn->DoWrite(); // TODO + conn->DoWrite(); continue; } diff --git a/src/csoap/http_request_handler.h b/src/csoap/http_request_handler.h index 238c579..0650e35 100644 --- a/src/csoap/http_request_handler.h +++ b/src/csoap/http_request_handler.h @@ -21,21 +21,18 @@ public: HttpRequestHandler(const HttpRequestHandler&) = delete; HttpRequestHandler& operator=(const HttpRequestHandler&) = delete; - HttpRequestHandler(); + HttpRequestHandler() = default; bool RegisterService(SoapServicePtr soap_service); - // Handle a request and produce a response. -#if 0 - void HandleRequest(const HttpRequest& http_request, - HttpResponse* http_response); -#endif - // Put the connection into the queue. void Enqueue(ConnectionPtr conn); - // Stop all worker threads. - void StopWorkers(); + // Start worker threads. + void Start(std::size_t count); + + // Close pending connections, stop worker threads. + void Stop(); private: void WorkerRoutine(); diff --git a/src/csoap/http_server.cc b/src/csoap/http_server.cc index 034e64d..aab300f 100644 --- a/src/csoap/http_server.cc +++ b/src/csoap/http_server.cc @@ -2,7 +2,7 @@ #include -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT #include #endif @@ -13,9 +13,9 @@ using tcp = boost::asio::ip::tcp; namespace csoap { -HttpServer::HttpServer(unsigned short port) - : io_context_(1) - , signals_(io_context_) { +HttpServer::HttpServer(unsigned short port, std::size_t workers) + : signals_(io_context_) + , workers_(workers) { // Register to handle the signals that indicate when the server should exit. // It is safe to register for the same signal multiple times in a program, @@ -47,11 +47,14 @@ bool HttpServer::RegisterService(SoapServicePtr soap_service) { } void HttpServer::Run() { -#if CSOAP_ENABLE_OUTPUT +#if CSOAP_DEBUG_OUTPUT boost::thread::id thread_id = boost::this_thread::get_id(); std::cout << "Server main thread: " << thread_id << std::endl; #endif + // Start worker threads. + request_handler_.Start(workers_); + // The io_context::run() call will block until all asynchronous operations // have finished. While the server is running, there is always at least one // asynchronous operation outstanding: the asynchronous accept call waiting @@ -69,10 +72,10 @@ void HttpServer::DoAccept() { } if (!ec) { - connection_manager_.Start( - std::make_shared(std::move(socket), - connection_manager_, - request_handler_)); + ConnectionPtr conn{ + new Connection(std::move(socket), request_handler_) + }; + conn->Start(); } DoAccept(); @@ -86,8 +89,7 @@ void HttpServer::DoAwaitStop() { // operations. Once all operations have finished the io_context::run() // call will exit. acceptor_->close(); - request_handler_.StopWorkers(); - connection_manager_.StopAll(); + request_handler_.Stop(); }); } diff --git a/src/csoap/http_server.h b/src/csoap/http_server.h index 46917b7..167ea7e 100644 --- a/src/csoap/http_server.h +++ b/src/csoap/http_server.h @@ -12,7 +12,6 @@ #include "boost/asio/ip/tcp.hpp" #include "csoap/connection.h" -#include "csoap/connection_manager.h" #include "csoap/http_request_handler.h" namespace csoap { @@ -24,7 +23,7 @@ public: HttpServer(const HttpServer&) = delete; HttpServer& operator=(const HttpServer&) = delete; - HttpServer(unsigned short port); + HttpServer(unsigned short port, std::size_t workers); bool RegisterService(SoapServicePtr soap_service); @@ -39,6 +38,9 @@ private: void DoAwaitStop(); private: + // The number of worker threads. + std::size_t workers_; + // The io_context used to perform asynchronous operations. boost::asio::io_context io_context_; @@ -48,9 +50,6 @@ private: // Acceptor used to listen for incoming connections. boost::scoped_ptr acceptor_; - // The connection manager which owns all live connections. - ConnectionManager connection_manager_; - // The handler for all incoming requests. HttpRequestHandler request_handler_; }; diff --git a/src/csoap/queue.h b/src/csoap/queue.h index ef29bea..8018369 100644 --- a/src/csoap/queue.h +++ b/src/csoap/queue.h @@ -4,6 +4,7 @@ // A general message queue. #include +#include #include "boost/thread/condition_variable.hpp" #include "boost/thread/locks.hpp" @@ -19,18 +20,30 @@ public: Queue() = default; - T Get() { + T PopOrWait() { boost::unique_lock lock(mutex_); // Wait for a message. - not_empty_cv_.wait(lock, [=] { return !message_list_.empty(); }); + not_empty_cv_.wait(lock, [this] { return !message_list_.empty(); }); T message = message_list_.front(); message_list_.pop_front(); return message; } - void Put(const T& message) { + T Pop() { + boost::lock_guard lock(mutex_); + + if (message_list_.empty()) { + return T(); + } + + T message = message_list_.front(); + message_list_.pop_front(); + return message; + } + + void Push(const T& message) { { boost::lock_guard lock(mutex_); message_list_.push_back(message); diff --git a/src/demo/calculator_server/main.cc b/src/demo/calculator_server/main.cc index 1a0b76c..7981a32 100644 --- a/src/demo/calculator_server/main.cc +++ b/src/demo/calculator_server/main.cc @@ -16,8 +16,10 @@ int main(int argc, char* argv[]) { unsigned short port = std::atoi(argv[1]); + std::size_t workers = 2; + try { - csoap::HttpServer server(port); + csoap::HttpServer server(port, workers); csoap::SoapServicePtr service(new CalculatorService); server.RegisterService(service);