From 6c5dedb807a1ec97e3433e012ce94d4378c11608 Mon Sep 17 00:00:00 2001 From: Chunting Gu Date: Mon, 25 Mar 2019 18:16:58 +0800 Subject: [PATCH] Support persistent connection for server. --- webcc/CMakeLists.txt | 2 ++ webcc/http_client.cc | 31 +++++++++++++++---------------- webcc/http_client.h | 2 +- webcc/http_client_session.h | 3 --- webcc/http_connection.cc | 33 +++++++++++++++++++++++---------- webcc/http_connection.h | 10 +++++++--- webcc/http_connection_pool.cc | 30 ++++++++++++++++++++++++++++++ webcc/http_connection_pool.h | 33 +++++++++++++++++++++++++++++++++ webcc/http_message.h | 4 ++++ webcc/http_parser.cc | 18 ++++++++++++++++++ webcc/http_parser.h | 5 +++++ webcc/http_request.h | 2 ++ webcc/http_request_handler.cc | 9 ++++----- webcc/http_request_handler.h | 2 +- webcc/http_request_parser.cc | 5 +++++ webcc/http_request_parser.h | 6 ++++-- webcc/http_response_parser.cc | 7 ++++++- webcc/http_response_parser.h | 6 ++++-- webcc/http_server.cc | 12 ++++++++++-- webcc/http_server.h | 4 ++++ webcc/queue.h | 5 +++++ webcc/rest_request_handler.cc | 9 +++++---- webcc/soap_request_handler.cc | 7 +++++-- webcc/utility.h | 20 -------------------- 24 files changed, 193 insertions(+), 72 deletions(-) create mode 100644 webcc/http_connection_pool.cc create mode 100644 webcc/http_connection_pool.h diff --git a/webcc/CMakeLists.txt b/webcc/CMakeLists.txt index 85ca38b..2f4cfa9 100644 --- a/webcc/CMakeLists.txt +++ b/webcc/CMakeLists.txt @@ -19,6 +19,7 @@ set(HEADERS http_client_pool.h http_client_session.h http_connection.h + http_connection_pool.h http_message.h http_parser.h http_request.h @@ -46,6 +47,7 @@ set(SOURCES http_client_pool.cc http_client_session.cc http_connection.cc + http_connection_pool.cc http_message.cc http_parser.cc http_request.cc diff --git a/webcc/http_client.cc b/webcc/http_client.cc index ac75ea2..fa12624 100644 --- a/webcc/http_client.cc +++ b/webcc/http_client.cc @@ -22,8 +22,8 @@ HttpClient::HttpClient() bool HttpClient::Request(HttpRequestPtr request, bool connect) { io_context_.restart(); - response_.reset(new HttpResponse()); - response_parser_.reset(new HttpResponseParser(response_.get())); + response_.reset(new HttpResponse{}); + response_parser_.Init(response_.get()); closed_ = false; timer_canceled_ = false; @@ -166,7 +166,7 @@ void HttpClient::DoReadResponse(Error* error) { LOG_VERB("Socket async read handler."); - // Stop the deadline timer once the read has started. + // Stop the deadline timer once the read has started (or failed). CancelTimer(); if (ec || length == 0) { @@ -179,20 +179,17 @@ void HttpClient::DoReadResponse(Error* error) { LOG_INFO("Read data, length: %u.", length); // Parse the response piece just read. - if (!response_parser_->Parse(buffer_.data(), length)) { - //CancelTimer(); + if (!response_parser_.Parse(buffer_.data(), length)) { Close(); *error = kHttpError; LOG_ERRO("Failed to parse HTTP response."); return; } - if (response_parser_->finished()) { + if (response_parser_.finished()) { // Stop trying to read once all content has been received, because // some servers will block extra call to read_some(). - //CancelTimer(); - if (response_->IsConnectionKeepAlive()) { // Close the timer but keep the socket connection. LOG_INFO("Keep the socket connection alive."); @@ -220,22 +217,24 @@ void HttpClient::DoReadResponse(Error* error) { } void HttpClient::DoWaitTimer() { + LOG_VERB("Wait timer asynchronously."); timer_.async_wait( std::bind(&HttpClient::OnTimer, this, std::placeholders::_1)); } void HttpClient::OnTimer(boost::system::error_code ec) { - if (closed_) { + LOG_VERB("On deadline timer."); + + // timer_.cancel() was called. + if (ec == boost::asio::error::operation_aborted) { + LOG_VERB("Deadline timer canceled."); return; } - LOG_VERB("On deadline timer."); - - // NOTE: Can't check this: - // if (ec == boost::asio::error::operation_aborted) { - // LOG_VERB("Deadline timer canceled."); - // return; - // } + if (closed_) { + LOG_VERB("Socket has been closed."); + return; + } if (timer_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) { // The deadline has passed. diff --git a/webcc/http_client.h b/webcc/http_client.h index a16dc39..cd52282 100644 --- a/webcc/http_client.h +++ b/webcc/http_client.h @@ -83,7 +83,7 @@ private: std::unique_ptr socket_; HttpResponsePtr response_; - std::unique_ptr response_parser_; + HttpResponseParser response_parser_; // Timer for the timeout control. boost::asio::deadline_timer timer_; diff --git a/webcc/http_client_session.h b/webcc/http_client_session.h index 5180146..a53413c 100644 --- a/webcc/http_client_session.h +++ b/webcc/http_client_session.h @@ -1,12 +1,9 @@ #ifndef WEBCC_HTTP_CLIENT_SESSION_H_ #define WEBCC_HTTP_CLIENT_SESSION_H_ -#include #include #include -#include "boost/optional.hpp" - #include "webcc/http_client_pool.h" #include "webcc/http_request_builder.h" #include "webcc/http_response.h" diff --git a/webcc/http_connection.cc b/webcc/http_connection.cc index 9d89ecc..ef924ad 100644 --- a/webcc/http_connection.cc +++ b/webcc/http_connection.cc @@ -4,6 +4,7 @@ #include "boost/asio/write.hpp" +#include "webcc/http_connection_pool.h" #include "webcc/http_request_handler.h" #include "webcc/logger.h" @@ -11,14 +12,17 @@ using boost::asio::ip::tcp; namespace webcc { -HttpConnection::HttpConnection(tcp::socket socket, HttpRequestHandler* handler) +HttpConnection::HttpConnection(tcp::socket socket, HttpConnectionPool* pool, + HttpRequestHandler* handler) : socket_(std::move(socket)), + pool_(pool), buffer_(kBufferSize), - request_handler_(handler), - request_parser_(&request_) { + request_handler_(handler) { } void HttpConnection::Start() { + request_.reset(new HttpRequest{}); + request_parser_.Init(request_.get()); DoRead(); } @@ -37,8 +41,11 @@ void HttpConnection::SendResponse(HttpResponsePtr response) { response_ = response; - // TODO: Support keep-alive. - response_->SetHeader(http::headers::kConnection, "Close"); + if (request_->IsConnectionKeepAlive()) { + response_->SetHeader(http::headers::kConnection, "Keep-Alive"); + } else { + response_->SetHeader(http::headers::kConnection, "Close"); + } response_->Prepare(); @@ -60,7 +67,7 @@ void HttpConnection::OnRead(boost::system::error_code ec, std::size_t length) { if (ec) { LOG_ERRO("Socket read error (%s).", ec.message().c_str()); if (ec != boost::asio::error::operation_aborted) { - Close(); + pool_->Close(shared_from_this()); } return; } @@ -79,7 +86,7 @@ void HttpConnection::OnRead(boost::system::error_code ec, std::size_t length) { return; } - LOG_VERB("HTTP request:\n%s", request_.Dump(4, "> ").c_str()); + LOG_VERB("HTTP request:\n%s", request_->Dump(4, "> ").c_str()); // Enqueue this connection. // Some worker thread will handle it later. @@ -104,13 +111,19 @@ void HttpConnection::OnWrite(boost::system::error_code ec, std::size_t length) { LOG_ERRO("Socket write error (%s).", ec.message().c_str()); if (ec != boost::asio::error::operation_aborted) { - Close(); + pool_->Close(shared_from_this()); } } else { LOG_INFO("Response has been sent back, length: %u.", length); - Shutdown(); - Close(); // Necessary even after shutdown! + if (request_->IsConnectionKeepAlive()) { + LOG_INFO("The client asked for keep-alive connection."); + LOG_INFO("Continue to read next request..."); + Start(); + } else { + Shutdown(); + pool_->Close(shared_from_this()); + } } } diff --git a/webcc/http_connection.h b/webcc/http_connection.h index fb38cb3..0d4cdb3 100644 --- a/webcc/http_connection.h +++ b/webcc/http_connection.h @@ -15,13 +15,14 @@ namespace webcc { class HttpConnection; +class HttpConnectionPool; class HttpRequestHandler; typedef std::shared_ptr HttpConnectionPtr; class HttpConnection : public std::enable_shared_from_this { public: - HttpConnection(boost::asio::ip::tcp::socket socket, + HttpConnection(boost::asio::ip::tcp::socket socket, HttpConnectionPool* pool, HttpRequestHandler* handler); ~HttpConnection() = default; @@ -29,7 +30,7 @@ public: HttpConnection(const HttpConnection&) = delete; HttpConnection& operator=(const HttpConnection&) = delete; - const HttpRequest& request() const { + HttpRequestPtr request() const { return request_; } @@ -57,6 +58,9 @@ private: // Socket for the connection. boost::asio::ip::tcp::socket socket_; + // The pool for this connection. + HttpConnectionPool* pool_; + // Buffer for incoming data. std::vector buffer_; @@ -64,7 +68,7 @@ private: HttpRequestHandler* request_handler_; // The incoming request. - HttpRequest request_; + HttpRequestPtr request_; // The parser for the incoming request. HttpRequestParser request_parser_; diff --git a/webcc/http_connection_pool.cc b/webcc/http_connection_pool.cc new file mode 100644 index 0000000..e19bca9 --- /dev/null +++ b/webcc/http_connection_pool.cc @@ -0,0 +1,30 @@ +#include "webcc/http_connection_pool.h" + +#include "webcc/logger.h" + +namespace webcc { + +HttpConnectionPool::HttpConnectionPool() { +} + +void HttpConnectionPool::Start(HttpConnectionPtr c) { + LOG_VERB("Starting connection..."); + connections_.insert(c); + c->Start(); +} + +void HttpConnectionPool::Close(HttpConnectionPtr c) { + LOG_VERB("Closing connection..."); + connections_.erase(c); + c->Close(); +} + +void HttpConnectionPool::CloseAll() { + LOG_VERB("Closing all (%u) connections...", connections_.size()); + for (auto& c : connections_) { + c->Close(); + } + connections_.clear(); +} + +} // namespace webcc diff --git a/webcc/http_connection_pool.h b/webcc/http_connection_pool.h new file mode 100644 index 0000000..5f89e66 --- /dev/null +++ b/webcc/http_connection_pool.h @@ -0,0 +1,33 @@ +#ifndef WEBCC_HTTP_CONNECTION_POOL_H_ +#define WEBCC_HTTP_CONNECTION_POOL_H_ + +#include + +#include "webcc/http_connection.h" + +namespace webcc { + +class HttpConnectionPool { +public: + HttpConnectionPool(const HttpConnectionPool&) = delete; + HttpConnectionPool& operator=(const HttpConnectionPool&) = delete; + + HttpConnectionPool(); + + // Add a connection to the pool and start it. + void Start(HttpConnectionPtr c); + + // Close a connection. + void Close(HttpConnectionPtr c); + + // Close all connections. + void CloseAll(); + +private: + /// The managed connections. + std::set connections_; +}; + +} // namespace webcc + +#endif // WEBCC_HTTP_CONNECTION_POOL_H_ diff --git a/webcc/http_message.h b/webcc/http_message.h index 54e83c0..f4b7643 100644 --- a/webcc/http_message.h +++ b/webcc/http_message.h @@ -48,6 +48,10 @@ public: // optional |existed| parameter will be set to false. const std::string& Get(const std::string& key, bool* existed = nullptr) const; + void Clear() { + headers_.clear(); + } + private: std::vector::iterator Find(const std::string& key); diff --git a/webcc/http_parser.cc b/webcc/http_parser.cc index e0d3b1b..520e0f4 100644 --- a/webcc/http_parser.cc +++ b/webcc/http_parser.cc @@ -33,6 +33,11 @@ HttpParser::HttpParser(HttpMessage* message) finished_(false) { } +void HttpParser::Init(HttpMessage* message) { + Reset(); + message_ = message; +} + bool HttpParser::Parse(const char* data, std::size_t length) { // Append the new data to the pending data. pending_data_.append(data, length); @@ -63,6 +68,19 @@ bool HttpParser::Parse(const char* data, std::size_t length) { } } +void HttpParser::Reset() { + pending_data_.clear(); + content_.clear(); + + content_length_ = kInvalidLength; + start_line_parsed_ = false; + content_length_parsed_ = false; + header_ended_ = false; + chunked_ = false; + chunk_size_ = kInvalidLength; + finished_ = false; +} + bool HttpParser::ParseHeaders() { std::size_t off = 0; diff --git a/webcc/http_parser.h b/webcc/http_parser.h index e705a25..d4e8bdb 100644 --- a/webcc/http_parser.h +++ b/webcc/http_parser.h @@ -19,6 +19,8 @@ public: HttpParser(const HttpParser&) = delete; HttpParser& operator=(const HttpParser&) = delete; + void Init(HttpMessage* message); + bool finished() const { return finished_; } std::size_t content_length() const { return content_length_; } @@ -26,6 +28,9 @@ public: bool Parse(const char* data, std::size_t length); protected: + // Reset for next parse. + void Reset(); + // Parse headers from pending data. // Return false only on syntax errors. bool ParseHeaders(); diff --git a/webcc/http_request.h b/webcc/http_request.h index b1c5635..cac713b 100644 --- a/webcc/http_request.h +++ b/webcc/http_request.h @@ -64,10 +64,12 @@ public: return port().empty() ? default_port : port(); } + // TODO: Remove std::size_t buffer_size() const { return buffer_size_; } + // TODO: Remove bool ssl_verify() const { return ssl_verify_; } diff --git a/webcc/http_request_handler.cc b/webcc/http_request_handler.cc index 6d3cf7e..2a6640f 100644 --- a/webcc/http_request_handler.cc +++ b/webcc/http_request_handler.cc @@ -24,11 +24,10 @@ void HttpRequestHandler::Start(std::size_t count) { void HttpRequestHandler::Stop() { LOG_INFO("Stopping workers..."); - // Close pending connections. - for (HttpConnectionPtr s = queue_.Pop(); s; s = queue_.Pop()) { - LOG_INFO("Closing pending connection..."); - s->Close(); - } + // Clear pending connections. + // The connections will be closed later (see HttpServer::DoAwaitStop). + LOG_INFO("Clear pending connections..."); + queue_.Clear(); // Enqueue a null connection to trigger the first worker to stop. queue_.Push(HttpConnectionPtr()); diff --git a/webcc/http_request_handler.h b/webcc/http_request_handler.h index 7a13f0d..037d799 100644 --- a/webcc/http_request_handler.h +++ b/webcc/http_request_handler.h @@ -29,7 +29,7 @@ public: // Start worker threads. void Start(std::size_t count); - // Close pending connections and stop worker threads. + // Clear pending connections from the queue and stop worker threads. void Stop(); private: diff --git a/webcc/http_request_parser.cc b/webcc/http_request_parser.cc index 75de792..57cc9bd 100644 --- a/webcc/http_request_parser.cc +++ b/webcc/http_request_parser.cc @@ -11,6 +11,11 @@ HttpRequestParser::HttpRequestParser(HttpRequest* request) : HttpParser(request), request_(request) { } +void HttpRequestParser::Init(HttpRequest* request) { + HttpParser::Init(request); + request_ = request; +} + bool HttpRequestParser::ParseStartLine(const std::string& line) { std::vector strs; boost::split(strs, line, boost::is_any_of(" "), boost::token_compress_on); diff --git a/webcc/http_request_parser.h b/webcc/http_request_parser.h index f584c14..163d1e5 100644 --- a/webcc/http_request_parser.h +++ b/webcc/http_request_parser.h @@ -11,12 +11,14 @@ class HttpRequest; class HttpRequestParser : public HttpParser { public: - explicit HttpRequestParser(HttpRequest* request); + explicit HttpRequestParser(HttpRequest* request = nullptr); ~HttpRequestParser() override = default; + void Init(HttpRequest* request); + private: - bool ParseStartLine(const std::string& line) override; + bool ParseStartLine(const std::string& line) final; HttpRequest* request_; }; diff --git a/webcc/http_response_parser.cc b/webcc/http_response_parser.cc index 6d6a2c8..c9f94fd 100644 --- a/webcc/http_response_parser.cc +++ b/webcc/http_response_parser.cc @@ -2,8 +2,8 @@ #include "boost/algorithm/string.hpp" -#include "webcc/logger.h" #include "webcc/http_response.h" +#include "webcc/logger.h" namespace webcc { @@ -11,6 +11,11 @@ HttpResponseParser::HttpResponseParser(HttpResponse* response) : HttpParser(response), response_(response) { } +void HttpResponseParser::Init(HttpResponse* response) { + HttpParser::Init(response); + response_ = response; +} + bool HttpResponseParser::ParseStartLine(const std::string& line) { std::vector parts; boost::split(parts, line, boost::is_any_of(" "), boost::token_compress_on); diff --git a/webcc/http_response_parser.h b/webcc/http_response_parser.h index 6d7be71..d03eec0 100644 --- a/webcc/http_response_parser.h +++ b/webcc/http_response_parser.h @@ -11,13 +11,15 @@ class HttpResponse; class HttpResponseParser : public HttpParser { public: - explicit HttpResponseParser(HttpResponse* response); + explicit HttpResponseParser(HttpResponse* response = nullptr); ~HttpResponseParser() override = default; + void Init(HttpResponse* response); + private: // Parse HTTP start line; E.g., "HTTP/1.1 200 OK". - bool ParseStartLine(const std::string& line) override; + bool ParseStartLine(const std::string& line) final; // The result response message. HttpResponse* response_; diff --git a/webcc/http_server.cc b/webcc/http_server.cc index 039430c..a2b5ff8 100644 --- a/webcc/http_server.cc +++ b/webcc/http_server.cc @@ -96,8 +96,10 @@ void HttpServer::DoAccept() { if (!ec) { LOG_INFO("Accepted a connection."); - std::make_shared(std::move(socket), - GetRequestHandler())->Start(); + auto connection = std::make_shared( + std::move(socket), &pool_, GetRequestHandler()); + + pool_.Start(connection); } DoAccept(); @@ -111,8 +113,14 @@ void HttpServer::DoAwaitStop() { // operations. Once all operations have finished the io_context::run() // call will exit. LOG_INFO("On signal %d, stopping the server...", signo); + acceptor_.close(); + + // Stop worker threads. GetRequestHandler()->Stop(); + + // Close all connections. + pool_.CloseAll(); }); } diff --git a/webcc/http_server.h b/webcc/http_server.h index 1956991..48580d5 100644 --- a/webcc/http_server.h +++ b/webcc/http_server.h @@ -9,6 +9,7 @@ #include "webcc/globals.h" #include "webcc/http_connection.h" +#include "webcc/http_connection_pool.h" namespace webcc { @@ -47,6 +48,9 @@ private: // Acceptor used to listen for incoming connections. boost::asio::ip::tcp::acceptor acceptor_; + // The connection pool which owns all live connections. + HttpConnectionPool pool_; + // The signal_set is used to register for process termination notifications. boost::asio::signal_set signals_; diff --git a/webcc/queue.h b/webcc/queue.h index 3e51174..dfc19a5 100644 --- a/webcc/queue.h +++ b/webcc/queue.h @@ -41,6 +41,11 @@ public: return message; } + void Clear() { + std::lock_guard lock(mutex_); + message_list_.clear(); + } + void Push(const T& message) { { std::lock_guard lock(mutex_); diff --git a/webcc/rest_request_handler.cc b/webcc/rest_request_handler.cc index b41ce38..5847d82 100644 --- a/webcc/rest_request_handler.cc +++ b/webcc/rest_request_handler.cc @@ -15,12 +15,13 @@ bool RestRequestHandler::Bind(RestServicePtr service, const std::string& url, } void RestRequestHandler::HandleConnection(HttpConnectionPtr connection) { - const HttpRequest& http_request = connection->request(); + HttpRequestPtr http_request = connection->request(); + assert(http_request); - const Url& url = http_request.url(); + const Url& url = http_request->url(); RestRequest rest_request{ - http_request.method(), http_request.content(), url.query() + http_request->method(), http_request->content(), url.query() }; // Get service by URL path. @@ -46,7 +47,7 @@ void RestRequestHandler::HandleConnection(HttpConnectionPtr connection) { // Only support gzip for response compression. if (rest_response.content.size() > kGzipThreshold && - http_request.AcceptEncodingGzip()) { + http_request->AcceptEncodingGzip()) { std::string compressed; if (Compress(rest_response.content, &compressed)) { http_response->SetHeader(http::headers::kContentEncoding, "gzip"); diff --git a/webcc/soap_request_handler.cc b/webcc/soap_request_handler.cc index 4568cf3..46fbccb 100644 --- a/webcc/soap_request_handler.cc +++ b/webcc/soap_request_handler.cc @@ -16,12 +16,15 @@ bool SoapRequestHandler::Bind(SoapServicePtr service, const std::string& url) { } void SoapRequestHandler::HandleConnection(HttpConnectionPtr connection) { + HttpRequestPtr http_request = connection->request(); + assert(http_request); + auto http_response = std::make_shared(); // TODO: Support keep-alive. http_response->SetHeader(http::headers::kConnection, "Close"); - std::string path = "/" + connection->request().url().path(); + std::string path = "/" + http_request->url().path(); SoapServicePtr service = GetServiceByUrl(path); if (!service) { http_response->set_status(http::Status::kBadRequest); @@ -31,7 +34,7 @@ void SoapRequestHandler::HandleConnection(HttpConnectionPtr connection) { // Parse the SOAP request XML. SoapRequest soap_request; - if (!soap_request.FromXml(connection->request().content())) { + if (!soap_request.FromXml(http_request->content())) { http_response->set_status(http::Status::kBadRequest); connection->SendResponse(http_response); return; diff --git a/webcc/utility.h b/webcc/utility.h index c34ec9f..68e5656 100644 --- a/webcc/utility.h +++ b/webcc/utility.h @@ -3,7 +3,6 @@ #include #include -#include #include "boost/asio/ip/tcp.hpp" @@ -23,25 +22,6 @@ std::string EndpointToString(const TcpEndpoint& endpoint); // See: https://tools.ietf.org/html/rfc7231#section-7.1.1.2 std::string GetHttpDateTimestamp(); -// Resize a buffer in ctor and restore its original size in dtor. -struct BufferResizer { - BufferResizer(std::vector* buffer, std::size_t new_size) - : buffer_(buffer), old_size_(buffer->size()) { - if (new_size != 0 && new_size != old_size_) { - buffer_->resize(new_size); - } - } - - ~BufferResizer() { - if (buffer_->size() != old_size_) { - buffer_->resize(old_size_); - } - } - - std::vector* buffer_; - std::size_t old_size_; -}; - } // namespace webcc #endif // WEBCC_UTILITY_H_