Allow to stop server and restart server; allow to run loop in multiple threads.

master
Chunting Gu 6 years ago
parent 0f0f6fdf8e
commit 673a98cffb

@ -61,3 +61,6 @@ target_link_libraries(file_upload_server ${EXAMPLE_LIBS})
add_executable(static_server static_server.cc) add_executable(static_server static_server.cc)
target_link_libraries(static_server ${EXAMPLE_LIBS}) target_link_libraries(static_server ${EXAMPLE_LIBS})
add_executable(server_states server_states.cc)
target_link_libraries(server_states ${EXAMPLE_LIBS})

@ -53,7 +53,7 @@ int main(int argc, char* argv[]) {
server.Route("/upload", std::make_shared<FileUploadView>(), { "POST" }); server.Route("/upload", std::make_shared<FileUploadView>(), { "POST" });
server.Start(); server.Run();
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::cerr << e.what() << std::endl; std::cerr << e.what() << std::endl;

@ -21,7 +21,7 @@ int main() {
server.Route("/", std::make_shared<HelloView>()); server.Route("/", std::make_shared<HelloView>());
server.Start(); server.Run();
} catch (const std::exception&) { } catch (const std::exception&) {
return 1; return 1;

@ -19,152 +19,154 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class BookClientBase { class BookClient {
public: public:
BookClientBase(webcc::ClientSession& session, const std::string& url) explicit BookClient(const std::string& url, int timeout = 0);
: session_(session), url_(url) {
}
virtual ~BookClientBase() = default; ~BookClient() = default;
protected: bool ListBooks(std::list<Book>* books);
// Check HTTP response status.
bool CheckStatus(webcc::ResponsePtr response, int expected_status) {
int status = response->status();
if (status != expected_status) {
LOG_ERRO("HTTP status error (actual: %d, expected: %d).",
status, expected_status);
return false;
}
return true;
}
protected: bool CreateBook(const std::string& title, double price, std::string* id);
std::string url_;
webcc::ClientSession& session_; bool GetBook(const std::string& id, Book* book);
};
// ----------------------------------------------------------------------------- bool UpdateBook(const std::string& id, const std::string& title,
double price);
class BookListClient : public BookClientBase {
public:
BookListClient(webcc::ClientSession& session, const std::string& url)
: BookClientBase(session, url) {
}
bool ListBooks(std::list<Book>* books) { bool DeleteBook(const std::string& id);
try {
auto r = session_.Get(url_ + "/books");
if (!CheckStatus(r, webcc::Status::kOK)) { private:
// Response HTTP status error. // Check HTTP response status.
return false; bool CheckStatus(webcc::ResponsePtr response, int expected_status);
}
Json::Value rsp_json = StringToJson(r->data()); private:
std::string url_;
webcc::ClientSession session_;
};
if (!rsp_json.isArray()) { // -----------------------------------------------------------------------------
return false; // Should be a JSON array of books.
}
for (Json::ArrayIndex i = 0; i < rsp_json.size(); ++i) { BookClient::BookClient(const std::string& url, int timeout)
books->push_back(JsonToBook(rsp_json[i])); : url_(url), session_(timeout) {
} // If the request has body, default to this content type.
// Optional.
session_.set_media_type("application/json");
session_.set_charset("utf-8");
}
return true; bool BookClient::ListBooks(std::list<Book>* books) {
try {
auto r = session_.Get(url_ + "/books");
} catch (const webcc::Error& error) { if (!CheckStatus(r, webcc::Status::kOK)) {
std::cerr << error << std::endl; // Response HTTP status error.
return false; return false;
} }
}
bool CreateBook(const std::string& title, double price, std::string* id) { Json::Value rsp_json = StringToJson(r->data());
Json::Value req_json;
req_json["title"] = title; if (!rsp_json.isArray()) {
req_json["price"] = price; return false; // Should be a JSON array of books.
}
try { for (Json::ArrayIndex i = 0; i < rsp_json.size(); ++i) {
auto r = session_.Post(url_ + "/books", JsonToString(req_json), true); books->push_back(JsonToBook(rsp_json[i]));
}
if (!CheckStatus(r, webcc::Status::kCreated)) { return true;
return false;
}
Json::Value rsp_json = StringToJson(r->data()); } catch (const webcc::Error& error) {
*id = rsp_json["id"].asString(); std::cerr << error << std::endl;
return false;
}
}
bool BookClient::CreateBook(const std::string& title, double price,
std::string* id) {
Json::Value req_json;
req_json["title"] = title;
req_json["price"] = price;
return !id->empty(); try {
auto r = session_.Post(url_ + "/books", JsonToString(req_json), true);
} catch (const webcc::Error& error) { if (!CheckStatus(r, webcc::Status::kCreated)) {
std::cerr << error << std::endl;
return false; return false;
} }
}
};
// ----------------------------------------------------------------------------- Json::Value rsp_json = StringToJson(r->data());
*id = rsp_json["id"].asString();
class BookDetailClient : public BookClientBase { return !id->empty();
public:
BookDetailClient(webcc::ClientSession& session, const std::string& url)
: BookClientBase(session, url) {
}
bool GetBook(const std::string& id, Book* book) { } catch (const webcc::Error& error) {
try { std::cerr << error << std::endl;
auto r = session_.Get(url_ + "/books/" + id); return false;
}
if (!CheckStatus(r, webcc::Status::kOK)) { }
return false;
}
return JsonStringToBook(r->data(), book); bool BookClient::GetBook(const std::string& id, Book* book) {
try {
auto r = session_.Get(url_ + "/books/" + id);
} catch (const webcc::Error& error) { if (!CheckStatus(r, webcc::Status::kOK)) {
std::cerr << error << std::endl;
return false; return false;
} }
}
bool UpdateBook(const std::string& id, const std::string& title, return JsonStringToBook(r->data(), book);
double price) {
Json::Value json;
json["title"] = title;
json["price"] = price;
try { } catch (const webcc::Error& error) {
auto r = session_.Put(url_ + "/books/" + id, JsonToString(json), true); std::cerr << error << std::endl;
return false;
}
}
if (!CheckStatus(r, webcc::Status::kOK)) { bool BookClient::UpdateBook(const std::string& id, const std::string& title,
return false; double price) {
} Json::Value json;
json["title"] = title;
json["price"] = price;
return true; try {
auto r = session_.Put(url_ + "/books/" + id, JsonToString(json), true);
} catch (const webcc::Error& error) { if (!CheckStatus(r, webcc::Status::kOK)) {
std::cerr << error << std::endl;
return false; return false;
} }
}
bool DeleteBook(const std::string& id) { return true;
try {
auto r = session_.Delete(url_ + "/books/" + id);
if (!CheckStatus(r, webcc::Status::kOK)) { } catch (const webcc::Error& error) {
return false; std::cerr << error << std::endl;
} return false;
}
}
return true; bool BookClient::DeleteBook(const std::string& id) {
try {
auto r = session_.Delete(url_ + "/books/" + id);
} catch (const webcc::Error& error) { if (!CheckStatus(r, webcc::Status::kOK)) {
std::cerr << error << std::endl;
return false; return false;
} }
return true;
} catch (const webcc::Error& error) {
std::cerr << error << std::endl;
return false;
} }
}; }
bool BookClient::CheckStatus(webcc::ResponsePtr response, int expected_status) {
if (response->status() != expected_status) {
LOG_ERRO("HTTP status error (actual: %d, expected: %d).",
response->status(), expected_status);
return false;
}
return true;
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -208,30 +210,19 @@ int main(int argc, char* argv[]) {
WEBCC_LOG_INIT("", webcc::LOG_CONSOLE_FILE_OVERWRITE); WEBCC_LOG_INIT("", webcc::LOG_CONSOLE_FILE_OVERWRITE);
// Share the same session. BookClient client(url, timeout);
webcc::ClientSession session;
session.set_timeout(timeout);
// If the request has body, default to this content type.
// Optional.
session.set_media_type("application/json");
session.set_charset("utf-8");
BookListClient list_client(session, url);
BookDetailClient detail_client(session, url);
PrintSeparator(); PrintSeparator();
std::list<Book> books; std::list<Book> books;
if (list_client.ListBooks(&books)) { if (client.ListBooks(&books)) {
PrintBookList(books); PrintBookList(books);
} }
PrintSeparator(); PrintSeparator();
std::string id; std::string id;
if (list_client.CreateBook("1984", 12.3, &id)) { if (client.CreateBook("1984", 12.3, &id)) {
std::cout << "Book ID: " << id << std::endl; std::cout << "Book ID: " << id << std::endl;
} else { } else {
id = "1"; id = "1";
@ -241,35 +232,35 @@ int main(int argc, char* argv[]) {
PrintSeparator(); PrintSeparator();
books.clear(); books.clear();
if (list_client.ListBooks(&books)) { if (client.ListBooks(&books)) {
PrintBookList(books); PrintBookList(books);
} }
PrintSeparator(); PrintSeparator();
Book book; Book book;
if (detail_client.GetBook(id, &book)) { if (client.GetBook(id, &book)) {
PrintBook(book); PrintBook(book);
} }
PrintSeparator(); PrintSeparator();
detail_client.UpdateBook(id, "1Q84", 32.1); client.UpdateBook(id, "1Q84", 32.1);
PrintSeparator(); PrintSeparator();
if (detail_client.GetBook(id, &book)) { if (client.GetBook(id, &book)) {
PrintBook(book); PrintBook(book);
} }
PrintSeparator(); PrintSeparator();
detail_client.DeleteBook(id); client.DeleteBook(id);
PrintSeparator(); PrintSeparator();
books.clear(); books.clear();
if (list_client.ListBooks(&books)) { if (client.ListBooks(&books)) {
PrintBookList(books); PrintBookList(books);
} }

@ -224,8 +224,6 @@ int main(int argc, char* argv[]) {
sleep_seconds = std::atoi(argv[2]); sleep_seconds = std::atoi(argv[2]);
} }
std::size_t workers = 2;
try { try {
webcc::Server server(port); webcc::Server server(port);
@ -237,7 +235,7 @@ int main(int argc, char* argv[]) {
std::make_shared<BookDetailView>(sleep_seconds), std::make_shared<BookDetailView>(sleep_seconds),
{ "GET", "PUT", "DELETE" }); { "GET", "PUT", "DELETE" });
server.Start(workers); server.Run(2);
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::cerr << e.what() << std::endl; std::cerr << e.what() << std::endl;

@ -0,0 +1,50 @@
#include "webcc/logger.h"
#include "webcc/response_builder.h"
#include "webcc/server.h"
class HelloView : public webcc::View {
public:
webcc::ResponsePtr Handle(webcc::RequestPtr request) override {
if (request->method() == "GET") {
return webcc::ResponseBuilder{}.OK().Body("Hello, World!")();
}
return {};
}
};
int main() {
WEBCC_LOG_INIT("", webcc::LOG_CONSOLE);
try {
webcc::Server server(8080);
server.Route("/", std::make_shared<HelloView>());
// Run the server in a separate thread.
std::thread t([&server]() { server.Run(); });
// Let the server run for several seconds.
std::this_thread::sleep_for(std::chrono::seconds(3));
// Stop the server.
server.Stop();
// Wait for the server to finish.
t.join();
// Run the server again.
std::thread t2([&server]() { server.Run(); });
// Wait for the server to finish.
t2.join();
} catch (const std::exception&) {
// NOTE:
// Catch std::exception instead of webcc::Error.
// webcc::Error is for client only.
return 1;
}
return 0;
}

@ -28,7 +28,7 @@ int main(int argc, char* argv[]) {
try { try {
webcc::Server server(port, doc_root); webcc::Server server(port, doc_root);
server.Start(); server.Run();
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::cerr << e.what() << std::endl; std::cerr << e.what() << std::endl;

@ -6,18 +6,15 @@
#include "webcc/connection_pool.h" #include "webcc/connection_pool.h"
#include "webcc/logger.h" #include "webcc/logger.h"
#include "webcc/server.h"
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
namespace webcc { namespace webcc {
Connection::Connection(tcp::socket socket, ConnectionPool* pool, Connection::Connection(tcp::socket socket, ConnectionPool* pool,
Server* server) Queue<ConnectionPtr>* queue)
: socket_(std::move(socket)), : socket_(std::move(socket)), pool_(pool), queue_(queue),
pool_(pool), buffer_(kBufferSize) {
buffer_(kBufferSize),
server_(server) {
} }
void Connection::Start() { void Connection::Start() {
@ -122,9 +119,9 @@ void Connection::OnRead(boost::system::error_code ec, std::size_t length) {
LOG_VERB("HTTP request:\n%s", request_->Dump().c_str()); LOG_VERB("HTTP request:\n%s", request_->Dump().c_str());
// Enqueue this connection. // Enqueue this connection once the request has been read.
// Some worker thread will handle it later. // Some worker thread will handle the request later.
server_->Enqueue(shared_from_this()); queue_->Push(shared_from_this());
} }
void Connection::DoWrite() { void Connection::DoWrite() {

@ -8,19 +8,23 @@
#include "boost/asio/ip/tcp.hpp" #include "boost/asio/ip/tcp.hpp"
#include "webcc/globals.h" #include "webcc/globals.h"
#include "webcc/queue.h"
#include "webcc/request.h" #include "webcc/request.h"
#include "webcc/request_parser.h" #include "webcc/request_parser.h"
#include "webcc/response.h" #include "webcc/response.h"
namespace webcc { namespace webcc {
class Connection;
class ConnectionPool; class ConnectionPool;
class Server; class Server;
using ConnectionPtr = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection> { class Connection : public std::enable_shared_from_this<Connection> {
public: public:
Connection(boost::asio::ip::tcp::socket socket, ConnectionPool* pool, Connection(boost::asio::ip::tcp::socket socket, ConnectionPool* pool,
Server* server); Queue<ConnectionPtr>* queue);
~Connection() = default; ~Connection() = default;
@ -61,15 +65,15 @@ private:
// The socket for the connection. // The socket for the connection.
boost::asio::ip::tcp::socket socket_; boost::asio::ip::tcp::socket socket_;
// The pool for this connection. // The connection pool.
ConnectionPool* pool_; ConnectionPool* pool_;
// The connection queue.
Queue<ConnectionPtr>* queue_;
// The buffer for incoming data. // The buffer for incoming data.
std::vector<char> buffer_; std::vector<char> buffer_;
// The server.
Server* server_;
// The incoming request. // The incoming request.
RequestPtr request_; RequestPtr request_;
@ -80,8 +84,6 @@ private:
ResponsePtr response_; ResponsePtr response_;
}; };
using ConnectionPtr = std::shared_ptr<Connection>;
} // namespace webcc } // namespace webcc
#endif // WEBCC_CONNECTION_H_ #endif // WEBCC_CONNECTION_H_

@ -6,17 +6,32 @@ namespace webcc {
void ConnectionPool::Start(ConnectionPtr c) { void ConnectionPool::Start(ConnectionPtr c) {
LOG_VERB("Starting connection..."); LOG_VERB("Starting connection...");
connections_.insert(c);
{
// Lock the container only.
std::lock_guard<std::mutex> lock(mutex_);
connections_.insert(c);
}
c->Start(); c->Start();
} }
void ConnectionPool::Close(ConnectionPtr c) { void ConnectionPool::Close(ConnectionPtr c) {
LOG_VERB("Closing connection..."); LOG_VERB("Closing connection...");
connections_.erase(c);
{
// Lock the container only.
std::lock_guard<std::mutex> lock(mutex_);
connections_.erase(c);
}
c->Close(); c->Close();
} }
void ConnectionPool::CloseAll() { void ConnectionPool::Clear() {
// Lock all since we are going to stop anyway.
std::lock_guard<std::mutex> lock(mutex_);
if (!connections_.empty()) { if (!connections_.empty()) {
LOG_VERB("Closing all (%u) connections...", connections_.size()); LOG_VERB("Closing all (%u) connections...", connections_.size());
for (auto& c : connections_) { for (auto& c : connections_) {

@ -1,6 +1,7 @@
#ifndef WEBCC_CONNECTION_POOL_H_ #ifndef WEBCC_CONNECTION_POOL_H_
#define WEBCC_CONNECTION_POOL_H_ #define WEBCC_CONNECTION_POOL_H_
#include <mutex>
#include <set> #include <set>
#include "webcc/connection.h" #include "webcc/connection.h"
@ -14,17 +15,24 @@ public:
ConnectionPool(const ConnectionPool&) = delete; ConnectionPool(const ConnectionPool&) = delete;
ConnectionPool& operator=(const ConnectionPool&) = delete; ConnectionPool& operator=(const ConnectionPool&) = delete;
// Add a connection to the pool and start it. // Add the connection and start to read the request from it.
// Called when a new connection has just been accepted.
void Start(ConnectionPtr c); void Start(ConnectionPtr c);
// Close a connection. // Close the connection.
// Called when the response of the connection has been sent back.
void Close(ConnectionPtr c); void Close(ConnectionPtr c);
// Close all pending connections. // Close all pending connections.
void CloseAll(); // Called when the server is about to stop.
void Clear();
private: private:
std::set<ConnectionPtr> connections_; std::set<ConnectionPtr> connections_;
// Mutex is necessary if the loop is running in multiple threads.
// See Server::Run().
std::mutex mutex_;
}; };
} // namespace webcc } // namespace webcc

@ -178,10 +178,11 @@ enum class ContentEncoding {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Error (or exception) for the client. // Error or exception (for client only).
class Error { class Error : public std::exception {
public: public:
enum Code { enum Code {
kUnknownError = -1,
kOK = 0, kOK = 0,
kSyntaxError, kSyntaxError,
kResolveError, kResolveError,
@ -193,11 +194,16 @@ class Error {
kDataError, kDataError,
}; };
public: public:
Error(Code code = kOK, const std::string& message = "") Error(Code code = kOK, const std::string& message = "")
: code_(code), message_(message), timeout_(false) { : code_(code), message_(message), timeout_(false) {
} }
// Note that `noexcept` is required by GCC.
const char* what() const WEBCC_NOEXCEPT override{
return message_.c_str();
}
Code code() const { Code code() const {
return code_; return code_;
} }
@ -223,7 +229,7 @@ class Error {
return code_ != kOK; return code_ != kOK;
} }
private: private:
Code code_; Code code_;
std::string message_; std::string message_;
bool timeout_; bool timeout_;

@ -22,9 +22,9 @@ RequestPtr RequestBuilder::operator()() {
} }
// If no Keep-Alive, explicitly set `Connection` to "Close". // If no Keep-Alive, explicitly set `Connection` to "Close".
if (!keep_alive_) { //if (!keep_alive_) {
request->SetHeader(headers::kConnection, "Close"); request->SetHeader(headers::kConnection, "Close");
} //}
if (body_) { if (body_) {
request->SetContentType(media_type_, charset_); request->SetContentType(media_type_, charset_);

@ -30,11 +30,34 @@ public:
// consistency and simplicity. // consistency and simplicity.
// Some shortcuts for different status codes: // Some shortcuts for different status codes:
ResponseBuilder& OK() { return Code(Status::kOK); }
ResponseBuilder& Created() { return Code(Status::kCreated); } ResponseBuilder& OK() {
ResponseBuilder& BadRequest() { return Code(Status::kBadRequest); } return Code(Status::kOK);
ResponseBuilder& NotFound() { return Code(Status::kNotFound); } }
ResponseBuilder& NotImplemented() { return Code(Status::kNotImplemented); }
ResponseBuilder& Created() {
return Code(Status::kCreated);
}
ResponseBuilder& BadRequest() {
return Code(Status::kBadRequest);
}
ResponseBuilder& NotFound() {
return Code(Status::kNotFound);
}
ResponseBuilder& InternalServerError() {
return Code(Status::kInternalServerError);
}
ResponseBuilder& NotImplemented() {
return Code(Status::kNotImplemented);
}
ResponseBuilder& ServiceUnavailable() {
return Code(Status::kServiceUnavailable);
}
ResponseBuilder& Code(Status code) { ResponseBuilder& Code(Status code) {
code_ = code; code_ = code;

@ -21,42 +21,9 @@ using tcp = boost::asio::ip::tcp;
namespace webcc { namespace webcc {
Server::Server(std::uint16_t port, const Path& doc_root) Server::Server(std::uint16_t port, const Path& doc_root)
: acceptor_(io_context_), signals_(io_context_), doc_root_(doc_root) { : port_(port), doc_root_(doc_root), running_(false),
RegisterSignals(); acceptor_(io_context_), signals_(io_context_) {
AddSignals();
boost::system::error_code ec;
tcp::endpoint endpoint(tcp::v4(), port);
// Open the acceptor.
acceptor_.open(endpoint.protocol(), ec);
if (ec) {
LOG_ERRO("Acceptor open error (%s).", ec.message().c_str());
return;
}
// Set option SO_REUSEADDR on.
// When SO_REUSEADDR is set, multiple servers can listen on the same port.
// This is necessary for restarting the server on the same port.
// More details:
// - https://stackoverflow.com/a/3233022
// - http://www.andy-pearce.com/blog/posts/2013/Feb/so_reuseaddr-on-windows/
acceptor_.set_option(tcp::acceptor::reuse_address(true));
// Bind to the server address.
acceptor_.bind(endpoint, ec);
if (ec) {
LOG_ERRO("Acceptor bind error (%s).", ec.message().c_str());
return;
}
// Start listening for connections.
// After listen, the client is able to connect to the server even the server
// has not started to accept the connection yet.
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec) {
LOG_ERRO("Acceptor listen error (%s).", ec.message().c_str());
return;
}
} }
bool Server::Route(const std::string& url, ViewPtr view, bool Server::Route(const std::string& url, ViewPtr view,
@ -88,50 +55,73 @@ bool Server::Route(const UrlRegex& regex_url, ViewPtr view,
return true; return true;
} }
void Server::Start(std::size_t workers) { void Server::Run(std::size_t workers, std::size_t loops) {
assert(workers > 0); assert(workers > 0);
assert(worker_threads_.empty());
if (!acceptor_.is_open()) { {
LOG_ERRO("Server is NOT going to run."); std::lock_guard<std::mutex> lock(state_mutex_);
return;
}
LOG_INFO("Server is going to run..."); assert(worker_threads_.empty());
DoAwaitStop(); if (IsRunning()) {
LOG_WARN("Server is already running.");
return;
}
DoAccept(); running_ = true;
io_context_.restart();
// Create worker threads. if (!Listen(port_)) {
for (std::size_t i = 0; i < workers; ++i) { LOG_ERRO("Server is NOT going to run.");
worker_threads_.emplace_back(std::bind(&Server::WorkerRoutine, this)); return;
}
LOG_INFO("Server is going to run...");
AsyncWaitSignals();
AsyncAccept();
// Create worker threads.
for (std::size_t i = 0; i < workers; ++i) {
worker_threads_.emplace_back(std::bind(&Server::WorkerRoutine, this));
}
} }
// Run the loop. // Start the event loop.
// The io_context::run() call will block until all asynchronous operations // The io_context::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one // have finished. While the server is running, there is always at least one
// asynchronous operation outstanding: the asynchronous accept call waiting // asynchronous operation outstanding: the asynchronous accept call waiting
// for new incoming connections. // for new incoming connections.
io_context_.run();
LOG_INFO("Loop is running in %u thread(s).", loops);
if (loops == 1) {
// Just run the loop in the current thread.
io_context_.run();
} else {
std::vector<std::thread> loop_threads;
for (std::size_t i = 0; i < loops; ++i) {
loop_threads.emplace_back(&boost::asio::io_context::run, &io_context_);
}
// Join the threads for blocking.
for (std::size_t i = 0; i < loops; ++i) {
loop_threads[i].join();
}
}
} }
void Server::Stop() { void Server::Stop() {
// Stop listener. std::lock_guard<std::mutex> lock(state_mutex_);
acceptor_.close();
// Stop worker threads.
StopWorkers();
// Close all pending connections. DoStop();
pool_.CloseAll();
} }
void Server::Enqueue(ConnectionPtr connection) { bool Server::IsRunning() const {
queue_.Push(connection); return running_ && !io_context_.stopped();
} }
void Server::RegisterSignals() { void Server::AddSignals() {
signals_.add(SIGINT); // Ctrl+C signals_.add(SIGINT); // Ctrl+C
signals_.add(SIGTERM); signals_.add(SIGTERM);
@ -140,7 +130,58 @@ void Server::RegisterSignals() {
#endif #endif
} }
void Server::DoAccept() { void Server::AsyncWaitSignals() {
signals_.async_wait(
[this](boost::system::error_code, int signo) {
// The server is stopped by canceling all outstanding asynchronous
// operations. Once all operations have finished the io_context::run()
// call will exit.
LOG_INFO("On signal %d, stopping the server...", signo);
DoStop();
});
}
bool Server::Listen(std::uint16_t port) {
boost::system::error_code ec;
tcp::endpoint endpoint(tcp::v4(), port);
// Open the acceptor.
acceptor_.open(endpoint.protocol(), ec);
if (ec) {
LOG_ERRO("Acceptor open error (%s).", ec.message().c_str());
return false;
}
// Set option SO_REUSEADDR on.
// When SO_REUSEADDR is set, multiple servers can listen on the same port.
// This is necessary for restarting the server on the same port.
// More details:
// - https://stackoverflow.com/a/3233022
// - http://www.andy-pearce.com/blog/posts/2013/Feb/so_reuseaddr-on-windows/
acceptor_.set_option(tcp::acceptor::reuse_address(true));
// Bind to the server address.
acceptor_.bind(endpoint, ec);
if (ec) {
LOG_ERRO("Acceptor bind error (%s).", ec.message().c_str());
return false;
}
// Start listening for connections.
// After listen, the client is able to connect to the server even the server
// has not started to accept the connection yet.
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec) {
LOG_ERRO("Acceptor listen error (%s).", ec.message().c_str());
return false;
}
return true;
}
void Server::AsyncAccept() {
acceptor_.async_accept( acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) { [this](boost::system::error_code ec, tcp::socket socket) {
// Check whether the server was stopped by a signal before this // Check whether the server was stopped by a signal before this
@ -153,25 +194,33 @@ void Server::DoAccept() {
LOG_INFO("Accepted a connection."); LOG_INFO("Accepted a connection.");
auto connection = std::make_shared<Connection>( auto connection = std::make_shared<Connection>(
std::move(socket), &pool_, this); std::move(socket), &pool_, &queue_);
pool_.Start(connection); pool_.Start(connection);
} }
DoAccept(); AsyncAccept();
}); });
} }
void Server::DoAwaitStop() { void Server::DoStop() {
signals_.async_wait( // Stop accepting new connections.
[this](boost::system::error_code, int signo) { acceptor_.close();
// The server is stopped by canceling all outstanding asynchronous
// operations. Once all operations have finished the io_context::run()
// call will exit.
LOG_INFO("On signal %d, stopping the server...", signo);
Stop(); // Stop worker threads.
}); // This might take some time if the threads are still processing.
StopWorkers();
// Close all pending connections.
pool_.Clear();
// Finally, stop the event processing loop.
// This function does not block, but instead simply signals the io_context to
// stop. All invocations of its run() or run_one() member functions should
// return as soon as possible.
io_context_.stop();
running_ = false;
} }
void Server::WorkerRoutine() { void Server::WorkerRoutine() {
@ -186,7 +235,7 @@ void Server::WorkerRoutine() {
// For stopping next worker. // For stopping next worker.
queue_.Push(ConnectionPtr()); queue_.Push(ConnectionPtr());
// Stop the worker. // Stop this worker.
break; break;
} }
@ -198,19 +247,27 @@ void Server::StopWorkers() {
LOG_INFO("Stopping workers..."); LOG_INFO("Stopping workers...");
// Clear pending connections. // Clear pending connections.
// The connections will be closed later (see Server::DoAwaitStop). // The connections will be closed later.
LOG_INFO("Clear pending connections..."); LOG_INFO("Clear pending connections...");
queue_.Clear(); queue_.Clear();
// Enqueue a null connection to trigger the first worker to stop. // Enqueue a null connection to trigger the first worker to stop.
queue_.Push(ConnectionPtr()); queue_.Push(ConnectionPtr());
// Wait for worker threads to finish.
for (auto& t : worker_threads_) { for (auto& t : worker_threads_) {
if (t.joinable()) { if (t.joinable()) {
t.join(); t.join();
} }
} }
// Cleanup worker threads.
worker_threads_.clear();
// Clear the queue because it has a remaining null connection pushed by the
// last worker thread.
queue_.Clear();
LOG_INFO("All workers have been stopped."); LOG_INFO("All workers have been stopped.");
} }

@ -21,7 +21,7 @@ class Server {
public: public:
explicit Server(std::uint16_t port, const Path& doc_root = {}); explicit Server(std::uint16_t port, const Path& doc_root = {});
virtual ~Server() = default; ~Server() = default;
Server(const Server&) = delete; Server(const Server&) = delete;
Server& operator=(const Server&) = delete; Server& operator=(const Server&) = delete;
@ -37,24 +37,43 @@ public:
bool Route(const UrlRegex& regex_url, ViewPtr view, bool Route(const UrlRegex& regex_url, ViewPtr view,
const Strings& methods = { "GET" }); const Strings& methods = { "GET" });
// Start the server with a given number of worker threads. // Start and run the server.
void Start(std::size_t workers = 1); // This method is blocking so will not return until Stop() is called (from
// another thread) or a signal like SIGINT is caught.
// When the request of a connection has been read, the connection is put into
// a queue waiting for some worker thread to process. Normally, the more
// |workers| you have, the more concurrency you gain (the concurrency also
// depends on the number of CPU cores). The worker thread pops connections
// from the queue one by one, prepares the response by the user provided View,
// then sends it back to the client.
// Meanwhile, the (event) loop, i.e., io_context, is also running in a number
// (|loops|) of threads. Normally, one thread for the loop is good enough, but
// it could be more than that.
void Run(std::size_t workers = 1, std::size_t loops = 1);
// Stop the server. // Stop the server.
// This should be called from another thread since the Run() is blocking.
void Stop(); void Stop();
// Put the connection into the queue. // Is the server running?
void Enqueue(ConnectionPtr connection); bool IsRunning() const;
private: private:
// Register to handle the signals that indicate when the server should exit. // Register signals which indicate when the server should exit.
void RegisterSignals(); void AddSignals();
// Initiate an asynchronous accept operation. // Wait for a signal to stop the server.
void DoAccept(); void AsyncWaitSignals();
// Wait for a request to stop the server. // Listen on the given port.
void DoAwaitStop(); bool Listen(std::uint16_t port);
// Accept connections asynchronously.
void AsyncAccept();
// Stop acceptor and worker threads, close all pending connections, and
// finally stop the event loop.
void DoStop();
// Worker thread routine. // Worker thread routine.
void WorkerRoutine(); void WorkerRoutine();
@ -84,6 +103,18 @@ private:
Strings methods; Strings methods;
}; };
// Port number.
std::uint16_t port_;
// The directory with the static files to be served.
Path doc_root_;
// Is the server running?
bool running_;
// The mutex for guarding the state of the server.
std::mutex state_mutex_;
// The io_context used to perform asynchronous operations. // The io_context used to perform asynchronous operations.
boost::asio::io_context io_context_; boost::asio::io_context io_context_;
@ -99,9 +130,6 @@ private:
// Worker threads. // Worker threads.
std::vector<std::thread> worker_threads_; std::vector<std::thread> worker_threads_;
// The directory with the static files to be served.
Path doc_root_;
// The queue with connection waiting for the workers to process. // The queue with connection waiting for the workers to process.
Queue<ConnectionPtr> queue_; Queue<ConnectionPtr> queue_;

Loading…
Cancel
Save