Add timeout control to http server (draft)

master
Adam Gu 7 years ago
parent 23090ee369
commit e2bf8aaaf4

@ -65,7 +65,12 @@ find_package(Threads REQUIRED)
# Boost version: 1.66+ # Boost version: 1.66+
set(Boost_USE_STATIC_LIBS ON) set(Boost_USE_STATIC_LIBS ON)
set(Boost_USE_MULTITHREADED ON) set(Boost_USE_MULTITHREADED ON)
find_package(Boost 1.66.0 REQUIRED COMPONENTS system thread) if(WIN32)
# TODO: Fix COMPONENTS on Windows.
find_package(Boost 1.66.0 REQUIRED)
else()
find_package(Boost 1.66.0 REQUIRED COMPONENTS system thread)
endif()
if(Boost_FOUND) if(Boost_FOUND)
include_directories(${Boost_INCLUDE_DIRS}) include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS}) link_directories(${Boost_LIBRARY_DIRS})

@ -1,2 +0,0 @@
#include "book_client.h"

@ -1,6 +0,0 @@
#ifndef BOOK_CLIENT_H_
#define BOOK_CLIENT_H_
#include <string>
#endif // BOOK_CLIENT_H_

@ -1,5 +1,4 @@
#include <iostream> #include <iostream>
#include "boost/lexical_cast.hpp"
#include "webcc/http_client.h" #include "webcc/http_client.h"
#include "webcc/http_request.h" #include "webcc/http_request.h"

@ -1,7 +1,10 @@
#include "book_services.h" #include "book_services.h"
#include <list> #include <list>
#include "boost/lexical_cast.hpp" #include "boost/lexical_cast.hpp"
#include "boost/thread.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -21,6 +24,7 @@ static const Book kNullBook{};
class BookStore { class BookStore {
public: public:
BookStore() { BookStore() {
// Prepare test data.
books_.push_back({ "1", "Title1", 11.1 }); books_.push_back({ "1", "Title1", 11.1 });
books_.push_back({ "2", "Title2", 22.2 }); books_.push_back({ "2", "Title2", 22.2 });
books_.push_back({ "3", "Title3", 33.3 }); books_.push_back({ "3", "Title3", 33.3 });
@ -77,8 +81,8 @@ static BookStore g_book_store;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Naively create JSON object string for a book. // Naively create JSON object for a book.
// You should use real JSON library in your product code. // You should use real JSON library in real product.
static std::string CreateBookJson(const Book& book) { static std::string CreateBookJson(const Book& book) {
std::string json = "{ "; std::string json = "{ ";
json += "\"id\": " + book.id + ", "; json += "\"id\": " + book.id + ", ";
@ -88,17 +92,36 @@ static std::string CreateBookJson(const Book& book) {
return json; return json;
} }
// Naively create JSON array object for a list of books.
// You should use real JSON library in real product.
static std::string CreateBookListJson(const std::list<Book>& books) {
std::string json = "[ ";
for (const Book& book : books) {
json += CreateBookJson(book);
json += ",";
}
// Remove last ','.
if (!books.empty()) {
json[json.size() - 1] = ' ';
}
json += "]";
return json;
}
bool BookListService::Handle(const std::string& http_method, bool BookListService::Handle(const std::string& http_method,
const std::vector<std::string>& url_sub_matches, const std::vector<std::string>& url_sub_matches,
const std::string& request_content, const std::string& request_content,
std::string* response_content) { std::string* response_content) {
if (http_method == webcc::kHttpGet) { if (http_method == webcc::kHttpGet) {
*response_content = "{ "; *response_content = CreateBookListJson(g_book_store.books());
for (const Book& book : g_book_store.books()) {
*response_content += CreateBookJson(book); // Sleep for testing timeout control.
*response_content += ","; boost::this_thread::sleep(boost::posix_time::seconds(2));
}
*response_content += " }";
return true; return true;
} }
@ -126,6 +149,9 @@ bool BookDetailService::Handle(const std::string& http_method,
*response_content = CreateBookJson(book); *response_content = CreateBookJson(book);
// Sleep for testing timeout control.
//boost::this_thread::sleep(boost::posix_time::seconds(2));
return true; return true;
} else if (http_method == webcc::kHttpPost) { } else if (http_method == webcc::kHttpPost) {

@ -27,6 +27,11 @@ int main(int argc, char* argv[]) {
server.RegisterService(std::make_shared<BookDetailService>(), server.RegisterService(std::make_shared<BookDetailService>(),
"/books/(\\d+)"); "/books/(\\d+)");
// For test purpose.
// Timeout like 60s makes more sense in a real product.
// Leave it as default (0) for no timeout control.
server.set_timeout_seconds(1);
server.Run(); server.Run();
} catch (std::exception& e) { } catch (std::exception& e) {

@ -16,7 +16,8 @@ namespace webcc {
HttpServer::HttpServer(unsigned short port, std::size_t workers) HttpServer::HttpServer(unsigned short port, std::size_t workers)
: signals_(io_context_) : signals_(io_context_)
, workers_(workers) { , workers_(workers)
, timeout_seconds_(0) {
// Register to handle the signals that indicate when the server should exit. // Register to handle the signals that indicate when the server should exit.
// It is safe to register for the same signal multiple times in a program, // It is safe to register for the same signal multiple times in a program,
@ -47,7 +48,7 @@ HttpServer::~HttpServer() {
} }
void HttpServer::Run() { void HttpServer::Run() {
assert(request_handler_ != NULL); assert(GetRequestHandler() != NULL);
#if WEBCC_DEBUG_OUTPUT #if WEBCC_DEBUG_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id(); boost::thread::id thread_id = boost::this_thread::get_id();
@ -55,7 +56,7 @@ void HttpServer::Run() {
#endif #endif
// Start worker threads. // Start worker threads.
request_handler_->Start(workers_); GetRequestHandler()->Start(workers_);
// The io_context::run() call will block until all asynchronous operations // The io_context::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one // have finished. While the server is running, there is always at least one
@ -74,10 +75,10 @@ void HttpServer::DoAccept() {
} }
if (!ec) { if (!ec) {
HttpSessionPtr conn{ HttpSessionPtr session{
new HttpSession(std::move(socket), request_handler_) new HttpSession(std::move(socket), GetRequestHandler())
}; };
conn->Start(); session->Start(timeout_seconds_);
} }
DoAccept(); DoAccept();
@ -91,7 +92,7 @@ void HttpServer::DoAwaitStop() {
// operations. Once all operations have finished the io_context::run() // operations. Once all operations have finished the io_context::run()
// call will exit. // call will exit.
acceptor_->close(); acceptor_->close();
request_handler_->Stop(); GetRequestHandler()->Stop();
}); });
} }

@ -28,6 +28,10 @@ public:
virtual ~HttpServer(); virtual ~HttpServer();
void set_timeout_seconds(long seconds) {
timeout_seconds_ = seconds;
}
// Run the server's io_service loop. // Run the server's io_service loop.
void Run(); void Run();
@ -38,10 +42,8 @@ private:
// Wait for a request to stop the server. // Wait for a request to stop the server.
void DoAwaitStop(); void DoAwaitStop();
protected: // Get the handler for incoming requests.
// The handler for all incoming requests. virtual HttpRequestHandler* GetRequestHandler() = 0;
// TODO: Replace with virtual GetRequestHandler()?
HttpRequestHandler* request_handler_;
private: private:
// The number of worker threads. // The number of worker threads.
@ -55,6 +57,10 @@ private:
// Acceptor used to listen for incoming connections. // Acceptor used to listen for incoming connections.
boost::scoped_ptr<boost::asio::ip::tcp::acceptor> acceptor_; boost::scoped_ptr<boost::asio::ip::tcp::acceptor> acceptor_;
// Timeout in seconds for socket connection.
// Default is 0 which means no timeout.
long timeout_seconds_;
}; };
} // namespace webcc } // namespace webcc

@ -6,6 +6,7 @@
#endif #endif
#include "boost/asio/write.hpp" #include "boost/asio/write.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "webcc/http_request_handler.h" #include "webcc/http_request_handler.h"
@ -18,12 +19,30 @@ HttpSession::HttpSession(boost::asio::ip::tcp::socket socket,
, request_parser_(&request_) { , request_parser_(&request_) {
} }
void HttpSession::Start() { HttpSession::~HttpSession() {
}
void HttpSession::Start(long timeout_seconds) {
if (timeout_seconds > 0) {
// Create timer only necessary.
boost::asio::io_context& ioc = socket_.get_executor().context();
timer_.reset(new boost::asio::deadline_timer(ioc));
timer_->expires_from_now(boost::posix_time::seconds(timeout_seconds));
timer_->async_wait(std::bind(&HttpSession::HandleTimer,
shared_from_this(),
std::placeholders::_1));
}
DoRead(); DoRead();
} }
void HttpSession::Stop() { void HttpSession::Stop() {
socket_.close(); CancelTimer();
boost::system::error_code ignored_ec;
socket_.close(ignored_ec);
} }
void HttpSession::SetResponseContent(const std::string& content_type, void HttpSession::SetResponseContent(const std::string& content_type,
@ -59,6 +78,8 @@ void HttpSession::HandleRead(boost::system::error_code ec,
if (ec) { if (ec) {
if (ec != boost::asio::error::operation_aborted) { if (ec != boost::asio::error::operation_aborted) {
Stop(); Stop();
} else {
CancelTimer();
} }
return; return;
} }
@ -89,20 +110,58 @@ void HttpSession::HandleRead(boost::system::error_code ec,
// io_context.run), even though DoWrite() is invoked by worker threads. This is // io_context.run), even though DoWrite() is invoked by worker threads. This is
// ensured by Asio. // ensured by Asio.
void HttpSession::HandleWrite(boost::system::error_code ec, void HttpSession::HandleWrite(boost::system::error_code ec,
size_t length) { std::size_t length) {
#if WEBCC_DEBUG_OUTPUT #if WEBCC_DEBUG_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id(); boost::thread::id thread_id = boost::this_thread::get_id();
std::cout << "Response has been sent back (thread: " << thread_id << ")\n";
#endif #endif
if (!ec) { if (!ec) {
CancelTimer();
// Initiate graceful connection closure. // Initiate graceful connection closure.
boost::system::error_code ignored_ec; boost::system::error_code ec;
socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
#if WEBCC_DEBUG_OUTPUT
std::cout << "Response has been sent back (thread: " << thread_id << ")\n";
#endif
} else {
#if WEBCC_DEBUG_OUTPUT
std::cout << "(thread: " << thread_id << ") Sending response error: "
<< ec.message()
<< std::endl;
#endif
if (ec == boost::asio::error::operation_aborted) {
CancelTimer();
} else {
Stop();
}
} }
}
void HttpSession::HandleTimer(boost::system::error_code ec) {
std::cout << "HandleTimer: ";
if (!ec) {
if (socket_.is_open()) {
std::cout << "socket is open, close it.\n";
socket_.close();
} else {
std::cout << "socket is not open.\n";
}
} else {
if (ec == boost::asio::error::operation_aborted) {
std::cout << "Timer aborted\n";
}
}
}
if (ec != boost::asio::error::operation_aborted) { void HttpSession::CancelTimer() {
Stop(); if (timer_) {
// The wait handler will be invoked with the operation_aborted error code.
boost::system::error_code ec;
timer_->cancel(ec);
} }
} }

@ -5,6 +5,7 @@
#include <memory> #include <memory>
#include "boost/asio/ip/tcp.hpp" // for ip::tcp::socket #include "boost/asio/ip/tcp.hpp" // for ip::tcp::socket
#include "boost/asio/deadline_timer.hpp"
#include "webcc/common.h" #include "webcc/common.h"
#include "webcc/http_request.h" #include "webcc/http_request.h"
@ -17,19 +18,20 @@ class HttpRequestHandler;
class HttpSession : public std::enable_shared_from_this<HttpSession> { class HttpSession : public std::enable_shared_from_this<HttpSession> {
public: public:
friend class HttpRequestHandler;
HttpSession(const HttpSession&) = delete; HttpSession(const HttpSession&) = delete;
HttpSession& operator=(const HttpSession&) = delete; HttpSession& operator=(const HttpSession&) = delete;
HttpSession(boost::asio::ip::tcp::socket socket, HttpSession(boost::asio::ip::tcp::socket socket,
HttpRequestHandler* handler); HttpRequestHandler* handler);
~HttpSession();
const HttpRequest& request() const { const HttpRequest& request() const {
return request_; return request_;
} }
void Start(); // Start the session with an optional timeout.
void Start(long timeout_seconds = 0);
void Stop(); void Stop();
@ -49,16 +51,20 @@ private:
void DoWrite(); void DoWrite();
void HandleRead(boost::system::error_code ec, void HandleRead(boost::system::error_code ec, std::size_t length);
std::size_t length); void HandleWrite(boost::system::error_code ec, std::size_t length);
void HandleWrite(boost::system::error_code ec, void HandleTimer(boost::system::error_code ec);
std::size_t length);
void CancelTimer();
private: private:
// Socket for the connection. // Socket for the connection.
boost::asio::ip::tcp::socket socket_; boost::asio::ip::tcp::socket socket_;
// Timeout timer (optional).
std::unique_ptr<boost::asio::deadline_timer> timer_;
// The handler used to process the incoming request. // The handler used to process the incoming request.
HttpRequestHandler* request_handler_; HttpRequestHandler* request_handler_;

@ -104,18 +104,16 @@ HttpStatus::Enum RestRequestHandler::HandleSession(HttpSessionPtr session) {
RestServer::RestServer(unsigned short port, std::size_t workers) RestServer::RestServer(unsigned short port, std::size_t workers)
: HttpServer(port, workers) : HttpServer(port, workers)
, rest_request_handler_(new RestRequestHandler()) { , request_handler_(new RestRequestHandler()) {
request_handler_ = rest_request_handler_;
} }
RestServer::~RestServer() { RestServer::~RestServer() {
request_handler_ = NULL; delete request_handler_;
delete rest_request_handler_;
} }
bool RestServer::RegisterService(RestServicePtr service, bool RestServer::RegisterService(RestServicePtr service,
const std::string& url) { const std::string& url) {
return rest_request_handler_->RegisterService(service, url); return request_handler_->RegisterService(service, url);
} }
} // namespace webcc } // namespace webcc

@ -97,7 +97,12 @@ public:
bool RegisterService(RestServicePtr service, const std::string& url); bool RegisterService(RestServicePtr service, const std::string& url);
private: private:
RestRequestHandler* rest_request_handler_; HttpRequestHandler* GetRequestHandler() override {
return request_handler_;
}
private:
RestRequestHandler* request_handler_;
}; };
} // namespace webcc } // namespace webcc

@ -72,18 +72,16 @@ SoapServicePtr SoapRequestHandler::GetServiceByUrl(const std::string& url) {
SoapServer::SoapServer(unsigned short port, std::size_t workers) SoapServer::SoapServer(unsigned short port, std::size_t workers)
: HttpServer(port, workers) : HttpServer(port, workers)
, soap_request_handler_(new SoapRequestHandler()) { , request_handler_(new SoapRequestHandler()) {
request_handler_ = soap_request_handler_;
} }
SoapServer::~SoapServer() { SoapServer::~SoapServer() {
request_handler_ = NULL; delete request_handler_;
delete soap_request_handler_;
} }
bool SoapServer::RegisterService(SoapServicePtr service, bool SoapServer::RegisterService(SoapServicePtr service,
const std::string& url) { const std::string& url) {
return soap_request_handler_->RegisterService(service, url); return request_handler_->RegisterService(service, url);
} }
} // namespace webcc } // namespace webcc

@ -44,7 +44,12 @@ public:
bool RegisterService(SoapServicePtr service, const std::string& url); bool RegisterService(SoapServicePtr service, const std::string& url);
private: private:
SoapRequestHandler* soap_request_handler_; HttpRequestHandler* GetRequestHandler() override {
return request_handler_;
}
private:
SoapRequestHandler* request_handler_;
}; };
} // namespace webcc } // namespace webcc

Loading…
Cancel
Save