From f41b625f51df2d2ed26aa9b37b8c218ef2547eae Mon Sep 17 00:00:00 2001 From: Adam Gu Date: Thu, 25 Jan 2018 19:30:33 +0800 Subject: [PATCH] Add message queue and worker threads. --- src/csoap/connection.cc | 29 +++-- src/csoap/connection.h | 12 +- src/csoap/http_request_handler.cc | 117 ++++++++++-------- src/csoap/http_request_handler.h | 20 +++ src/csoap/http_server.cc | 44 ++++--- src/csoap/http_server.h | 2 + src/csoap/queue.cc | 0 src/csoap/queue.h | 49 ++++++++ .../calculator_client/calculator_client.cc | 2 +- src/demo/calculator_server/main.cc | 2 +- 10 files changed, 192 insertions(+), 85 deletions(-) create mode 100644 src/csoap/queue.cc create mode 100644 src/csoap/queue.h diff --git a/src/csoap/connection.cc b/src/csoap/connection.cc index dabdfe7..f93c1c8 100644 --- a/src/csoap/connection.cc +++ b/src/csoap/connection.cc @@ -1,5 +1,9 @@ #include "csoap/connection.h" + #include +#if CSOAP_ENABLE_OUTPUT +#include +#endif #include "boost/asio/write.hpp" @@ -42,7 +46,7 @@ void Connection::DoWrite() { } void Connection::HandleRead(boost::system::error_code ec, - std::size_t bytes_transferred) { + std::size_t length) { if (ec) { if (ec != boost::asio::error::operation_aborted) { connection_manager_.Stop(shared_from_this()); @@ -50,7 +54,7 @@ void Connection::HandleRead(boost::system::error_code ec, return; } - Error error = request_parser_.Parse(buffer_.data(), bytes_transferred); + Error error = request_parser_.Parse(buffer_.data(), length); if (error != kNoError) { // Bad request. @@ -65,16 +69,23 @@ void Connection::HandleRead(boost::system::error_code ec, return; } - // Handle request. - // TODO: Time consuming - request_handler_.HandleRequest(request_, &response_); - - // Send back the response. - DoWrite(); + // Enqueue this connection. + // Some worker thread will handle it later. + // And DoWrite() will be called in the worker thread. + request_handler_.Enqueue(shared_from_this()); } +// NOTE: +// This write handler will be called from main thread (the thread calling +// io_context.run), even though DoWrite() is invoked by worker threads. This is +// ensured by Asio. void Connection::HandleWrite(boost::system::error_code ec, - size_t bytes_transferred) { + size_t length) { +#if CSOAP_ENABLE_OUTPUT + boost::thread::id thread_id = boost::this_thread::get_id(); + std::cout << "Response has been sent back (thread: " << thread_id << ")\n"; +#endif + if (!ec) { // Initiate graceful connection closure. boost::system::error_code ignored_ec; diff --git a/src/csoap/connection.h b/src/csoap/connection.h index 0943123..ec68b91 100644 --- a/src/csoap/connection.h +++ b/src/csoap/connection.h @@ -22,15 +22,15 @@ public: Connection(const Connection&) = delete; Connection& operator=(const Connection&) = delete; + friend class HttpRequestHandler; + // Construct a connection with the given io_service. Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager, HttpRequestHandler& handler); - // Start the first asynchronous operation for the connection. void Start(); - // Stop all asynchronous operations associated with the connection. void Stop(); private: @@ -38,13 +38,11 @@ private: void DoWrite(); - // Handle completion of a read operation. void HandleRead(boost::system::error_code ec, - std::size_t bytes_transferred); + std::size_t length); - // Handle completion of a write operation. void HandleWrite(boost::system::error_code ec, - size_t bytes_transferred); + std::size_t length); private: // Socket for the connection. @@ -65,7 +63,7 @@ private: // The parser for the incoming request. HttpRequestParser request_parser_; - // The reply to be sent back to the client. + // The response to be sent back to the client. HttpResponse response_; }; diff --git a/src/csoap/http_request_handler.cc b/src/csoap/http_request_handler.cc index 0fbc120..1874233 100644 --- a/src/csoap/http_request_handler.cc +++ b/src/csoap/http_request_handler.cc @@ -1,48 +1,23 @@ #include "csoap/http_request_handler.h" #include +#if CSOAP_ENABLE_OUTPUT +#include +#endif #include "csoap/common.h" #include "csoap/http_request.h" #include "csoap/http_response.h" #include "csoap/soap_request.h" #include "csoap/soap_response.h" -#include "csoap/soap_service.h" namespace csoap { -#if 0 -// Perform URL-decoding on a string. Returns false if the encoding was invalid. -static bool UrlDecode(const std::string& in, std::string& out) { - out.clear(); - out.reserve(in.size()); - - for (std::size_t i = 0; i < in.size(); ++i) { - if (in[i] == '%') { - if (i + 3 <= in.size()) { - int value = 0; - std::istringstream is(in.substr(i + 1, 2)); - if (is >> std::hex >> value) { - out += static_cast(value); - i += 2; - } else { - return false; - } - } else { - return false; - } - } else if (in[i] == '+') { - out += ' '; - } else { - out += in[i]; - } - } - - return true; -} -#endif - HttpRequestHandler::HttpRequestHandler() { + // Create worker threads. + for (std::size_t i = 0; i < 2; ++i) { + workers_.create_thread(std::bind(&HttpRequestHandler::WorkerRoutine, this)); + } } bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) { @@ -57,6 +32,7 @@ bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) { return true; } +#if 0 void HttpRequestHandler::HandleRequest(const HttpRequest& http_request, HttpResponse* http_response) { // Parse the SOAP request XML. @@ -81,28 +57,69 @@ void HttpRequestHandler::HandleRequest(const HttpRequest& http_request, http_response->SetContentType(kTextXmlUtf8); http_response->SetContentLength(content.length()); http_response->set_content(std::move(content)); +} +#endif -#if 0 - // Decode URL to path. - std::string request_path; - if (!UrlDecode(request.uri, request_path)) { - reply = HttpReply::StockReply(HttpReply::BAD_REQUEST); - return; - } +void HttpRequestHandler::Enqueue(ConnectionPtr conn) { + queue_.Put(conn); +} - // Request path must be absolute and not contain "..". - if (request_path.empty() || - request_path[0] != '/' || - request_path.find("..") != std::string::npos) { - reply = HttpReply::StockReply(HttpReply::BAD_REQUEST); - return; - } +void HttpRequestHandler::StopWorkers() { +#if CSOAP_ENABLE_OUTPUT + std::cout << "Stopping workers...\n"; +#endif - // If path ends in slash (i.e. is a directory) then add "index.html". - if (request_path[request_path.size() - 1] == '/') { - request_path += "index.html"; - } + // Enqueue an null connection to trigger the first worker to stop. + queue_.Put(ConnectionPtr()); + + workers_.join_all(); + +#if CSOAP_ENABLE_OUTPUT + std::cout << "Workers have been stopped.\n"; +#endif +} + +// TODO: How and when to exit? +void HttpRequestHandler::WorkerRoutine() { + for (;;) { + ConnectionPtr conn = queue_.Get(); + + if (!conn) { +#if CSOAP_ENABLE_OUTPUT + boost::thread::id thread_id = boost::this_thread::get_id(); + std::cout << "Worker is going to stop (thread: " << thread_id << ")\n"; #endif + // For stopping next worker. + queue_.Put(ConnectionPtr()); + // Stop the worker. + break; + } + + // Parse the SOAP request XML. + SoapRequest soap_request; + if (!soap_request.FromXml(conn->request_.content())) { + conn->response_.set_status(HttpStatus::BAD_REQUEST); + conn->DoWrite(); // TODO + continue; + } + + SoapResponse soap_response; + + // TODO: Get service by URL. + for (SoapServicePtr& service : soap_services_) { + service->Handle(soap_request, &soap_response); + } + + std::string content; + soap_response.ToXml(&content); + + conn->response_.set_status(HttpStatus::OK); + conn->response_.SetContentType(kTextXmlUtf8); + conn->response_.SetContentLength(content.length()); + conn->response_.set_content(std::move(content)); + + conn->DoWrite(); + } } } // namespace csoap diff --git a/src/csoap/http_request_handler.h b/src/csoap/http_request_handler.h index 01fa088..238c579 100644 --- a/src/csoap/http_request_handler.h +++ b/src/csoap/http_request_handler.h @@ -1,7 +1,13 @@ #ifndef CSOAP_HTTP_REQUEST_HANDLER_H_ #define CSOAP_HTTP_REQUEST_HANDLER_H_ +#include #include + +#include "boost/thread/thread.hpp" + +#include "csoap/connection.h" +#include "csoap/queue.h" #include "csoap/soap_service.h" namespace csoap { @@ -20,11 +26,25 @@ public: bool RegisterService(SoapServicePtr soap_service); // Handle a request and produce a response. +#if 0 void HandleRequest(const HttpRequest& http_request, HttpResponse* http_response); +#endif + + // Put the connection into the queue. + void Enqueue(ConnectionPtr conn); + + // Stop all worker threads. + void StopWorkers(); + +private: + void WorkerRoutine(); private: std::vector soap_services_; + + Queue queue_; + boost::thread_group workers_; }; } // namespace csoap diff --git a/src/csoap/http_server.cc b/src/csoap/http_server.cc index 79e6550..034e64d 100644 --- a/src/csoap/http_server.cc +++ b/src/csoap/http_server.cc @@ -2,6 +2,10 @@ #include +#if CSOAP_ENABLE_OUTPUT +#include +#endif + #include "csoap/soap_service.h" #include "csoap/utility.h" @@ -10,7 +14,7 @@ using tcp = boost::asio::ip::tcp; namespace csoap { HttpServer::HttpServer(unsigned short port) - : io_context_(1) // TODO: concurrency_hint (threads) + : io_context_(1) , signals_(io_context_) { // Register to handle the signals that indicate when the server should exit. @@ -43,6 +47,11 @@ bool HttpServer::RegisterService(SoapServicePtr soap_service) { } void HttpServer::Run() { +#if CSOAP_ENABLE_OUTPUT + boost::thread::id thread_id = boost::this_thread::get_id(); + std::cout << "Server main thread: " << thread_id << std::endl; +#endif + // The io_context::run() call will block until all asynchronous operations // have finished. While the server is running, there is always at least one // asynchronous operation outstanding: the asynchronous accept call waiting @@ -53,30 +62,31 @@ void HttpServer::Run() { void HttpServer::DoAccept() { acceptor_->async_accept( [this](boost::system::error_code ec, tcp::socket socket) { - // Check whether the server was stopped by a signal before this - // completion handler had a chance to run. - if (!acceptor_->is_open()) { - return; - } - - if (!ec) { - connection_manager_.Start( - std::make_shared(std::move(socket), - connection_manager_, - request_handler_)); - } - - DoAccept(); - }); + // Check whether the server was stopped by a signal before this + // completion handler had a chance to run. + if (!acceptor_->is_open()) { + return; + } + + if (!ec) { + connection_manager_.Start( + std::make_shared(std::move(socket), + connection_manager_, + request_handler_)); + } + + DoAccept(); + }); } void HttpServer::DoAwaitStop() { signals_.async_wait( [this](boost::system::error_code /*ec*/, int /*signo*/) { - // The server is stopped by cancelling all outstanding asynchronous + // The server is stopped by canceling all outstanding asynchronous // operations. Once all operations have finished the io_context::run() // call will exit. acceptor_->close(); + request_handler_.StopWorkers(); connection_manager_.StopAll(); }); } diff --git a/src/csoap/http_server.h b/src/csoap/http_server.h index 2ef183d..46917b7 100644 --- a/src/csoap/http_server.h +++ b/src/csoap/http_server.h @@ -5,6 +5,8 @@ #include #include "boost/scoped_ptr.hpp" +#include "boost/thread/thread.hpp" + #include "boost/asio/io_context.hpp" #include "boost/asio/signal_set.hpp" #include "boost/asio/ip/tcp.hpp" diff --git a/src/csoap/queue.cc b/src/csoap/queue.cc new file mode 100644 index 0000000..e69de29 diff --git a/src/csoap/queue.h b/src/csoap/queue.h new file mode 100644 index 0000000..ef29bea --- /dev/null +++ b/src/csoap/queue.h @@ -0,0 +1,49 @@ +#ifndef CSOAP_QUEUE_H_ +#define CSOAP_QUEUE_H_ + +// A general message queue. + +#include + +#include "boost/thread/condition_variable.hpp" +#include "boost/thread/locks.hpp" +#include "boost/thread/mutex.hpp" + +namespace csoap { + +template +class Queue { +public: + Queue(const Queue& rhs) = delete; + Queue& operator=(const Queue& rhs) = delete; + + Queue() = default; + + T Get() { + boost::unique_lock lock(mutex_); + + // Wait for a message. + not_empty_cv_.wait(lock, [=] { return !message_list_.empty(); }); + + T message = message_list_.front(); + message_list_.pop_front(); + return message; + } + + void Put(const T& message) { + { + boost::lock_guard lock(mutex_); + message_list_.push_back(message); + } + not_empty_cv_.notify_one(); + } + +private: + std::list message_list_; + boost::mutex mutex_; + boost::condition_variable not_empty_cv_; +}; + +} // namespace csoap + +#endif // CSOAP_QUEUE_H_ diff --git a/src/demo/calculator_client/calculator_client.cc b/src/demo/calculator_client/calculator_client.cc index b545784..5070e44 100644 --- a/src/demo/calculator_client/calculator_client.cc +++ b/src/demo/calculator_client/calculator_client.cc @@ -24,7 +24,7 @@ bool CalculatorClient::Divide(double x, double y, double* result) { } // Set to 0 to test our own calculator server created with csoap. -#define ACCESS_PARASOFT 1 +#define ACCESS_PARASOFT 0 void CalculatorClient::Init() { #if ACCESS_PARASOFT diff --git a/src/demo/calculator_server/main.cc b/src/demo/calculator_server/main.cc index bd2ad25..1a0b76c 100644 --- a/src/demo/calculator_server/main.cc +++ b/src/demo/calculator_server/main.cc @@ -25,7 +25,7 @@ int main(int argc, char* argv[]) { server.Run(); } catch (std::exception& e) { - std::cerr << "exception: " << e.what() << "\n"; + std::cerr << "Exception: " << e.what() << std::endl; return 1; }