Remove ConnectionManager, refine message queue.

master
Adam Gu 8 years ago
parent f41b625f51
commit dfadd85a69

@ -2,10 +2,10 @@ if(UNIX)
add_definitions(-D_GLIBCXX_USE_WCHAR_T -std=c++11) add_definitions(-D_GLIBCXX_USE_WCHAR_T -std=c++11)
endif() endif()
option(CSOAP_ENABLE_OUTPUT "Enable output for request & response?" OFF) option(CSOAP_DEBUG_OUTPUT "Enable debug output?" OFF)
if(CSOAP_ENABLE_OUTPUT) if(CSOAP_DEBUG_OUTPUT)
add_definitions(-DCSOAP_ENABLE_OUTPUT) add_definitions(-DCSOAP_DEBUG_OUTPUT)
endif() endif()
# Don't use any deprecated definitions (e.g., io_service). # Don't use any deprecated definitions (e.g., io_service).
@ -14,4 +14,3 @@ add_definitions(-DBOOST_ASIO_NO_DEPRECATED)
file(GLOB SRCS *.cc *.h) file(GLOB SRCS *.cc *.h)
add_library(csoap ${SRCS}) add_library(csoap ${SRCS})

@ -82,6 +82,7 @@ Parameter::Parameter(const std::string& key, int value)
Parameter::Parameter(const std::string& key, double value) Parameter::Parameter(const std::string& key, double value)
: key_(key) { : key_(key) {
// TODO
char buf[32]; char buf[32];
sprintf(buf, "%f", value); sprintf(buf, "%f", value);
value_ = buf; value_ = buf;

@ -1,22 +1,19 @@
#include "csoap/connection.h" #include "csoap/connection.h"
#include <vector> #include <vector>
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
#include <iostream> #include <iostream>
#endif #endif
#include "boost/asio/write.hpp" #include "boost/asio/write.hpp"
#include "csoap/connection_manager.h"
#include "csoap/http_request_handler.h" #include "csoap/http_request_handler.h"
namespace csoap { namespace csoap {
Connection::Connection(boost::asio::ip::tcp::socket socket, Connection::Connection(boost::asio::ip::tcp::socket socket,
ConnectionManager& manager,
HttpRequestHandler& handler) HttpRequestHandler& handler)
: socket_(std::move(socket)) : socket_(std::move(socket))
, connection_manager_(manager)
, request_handler_(handler) , request_handler_(handler)
, request_parser_(&request_) { , request_parser_(&request_) {
} }
@ -49,7 +46,7 @@ void Connection::HandleRead(boost::system::error_code ec,
std::size_t length) { std::size_t length) {
if (ec) { if (ec) {
if (ec != boost::asio::error::operation_aborted) { if (ec != boost::asio::error::operation_aborted) {
connection_manager_.Stop(shared_from_this()); Stop();
} }
return; return;
} }
@ -81,7 +78,7 @@ void Connection::HandleRead(boost::system::error_code ec,
// ensured by Asio. // ensured by Asio.
void Connection::HandleWrite(boost::system::error_code ec, void Connection::HandleWrite(boost::system::error_code ec,
size_t length) { size_t length) {
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id(); boost::thread::id thread_id = boost::this_thread::get_id();
std::cout << "Response has been sent back (thread: " << thread_id << ")\n"; std::cout << "Response has been sent back (thread: " << thread_id << ")\n";
#endif #endif
@ -94,7 +91,7 @@ void Connection::HandleWrite(boost::system::error_code ec,
} }
if (ec != boost::asio::error::operation_aborted) { if (ec != boost::asio::error::operation_aborted) {
connection_manager_.Stop(shared_from_this()); Stop();
} }
} }

@ -13,7 +13,6 @@
namespace csoap { namespace csoap {
class ConnectionManager;
class HttpRequestHandler; class HttpRequestHandler;
// Represents a single connection from a client. // Represents a single connection from a client.
@ -26,7 +25,6 @@ public:
// Construct a connection with the given io_service. // Construct a connection with the given io_service.
Connection(boost::asio::ip::tcp::socket socket, Connection(boost::asio::ip::tcp::socket socket,
ConnectionManager& manager,
HttpRequestHandler& handler); HttpRequestHandler& handler);
void Start(); void Start();
@ -48,9 +46,6 @@ private:
// Socket for the connection. // Socket for the connection.
boost::asio::ip::tcp::socket socket_; boost::asio::ip::tcp::socket socket_;
// The manager for this connection.
ConnectionManager& connection_manager_;
// The handler used to process the incoming request. // The handler used to process the incoming request.
HttpRequestHandler& request_handler_; HttpRequestHandler& request_handler_;

@ -1,25 +0,0 @@
#include "csoap/connection_manager.h"
namespace csoap {
ConnectionManager::ConnectionManager() {
}
void ConnectionManager::Start(ConnectionPtr conn) {
connections_.insert(conn);
conn->Start();
}
void ConnectionManager::Stop(ConnectionPtr conn) {
connections_.erase(conn);
conn->Stop();
}
void ConnectionManager::StopAll() {
for (const ConnectionPtr& conn : connections_) {
conn->Stop();
}
connections_.clear();
}
} // namespace csoap

@ -1,35 +0,0 @@
#ifndef CSOAP_CONNECTION_MANAGER_H_
#define CSOAP_CONNECTION_MANAGER_H_
#include <set>
#include "csoap/connection.h"
namespace csoap {
// ConnectionManager manages open connections so that they may be cleanly
// stopped when the server needs to shut down.
class ConnectionManager {
public:
ConnectionManager(const ConnectionManager&) = delete;
ConnectionManager& operator=(const ConnectionManager&) = delete;
// Construct a connection manager.
ConnectionManager();
// Add the specified connection to the manager and start it.
void Start(ConnectionPtr conn);
// Stop the specified connection.
void Stop(ConnectionPtr conn);
// Stop all connections.
void StopAll();
private:
// The managed connections.
std::set<ConnectionPtr> connections_;
};
} // namespace csoap
#endif // CSOAP_CONNECTION_MANAGER_H_

@ -1,6 +1,6 @@
#include "csoap/http_client.h" #include "csoap/http_client.h"
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
#include <iostream> #include <iostream>
#endif #endif
@ -82,7 +82,7 @@ Error HttpClient::SendRequest(const HttpRequest& request,
// Send HTTP request. // Send HTTP request.
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
std::cout << "# REQUEST" << std::endl << request << std::endl; std::cout << "# REQUEST" << std::endl << request << std::endl;
#endif #endif
@ -92,7 +92,7 @@ Error HttpClient::SendRequest(const HttpRequest& request,
return kSocketWriteError; return kSocketWriteError;
} }
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
std::cout << "# RESPONSE" << std::endl; std::cout << "# RESPONSE" << std::endl;
#endif #endif
@ -114,7 +114,7 @@ Error HttpClient::SendRequest(const HttpRequest& request,
} }
} }
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
// NOTE: the content XML might not be well formated. // NOTE: the content XML might not be well formated.
std::cout.write(buffer_.data(), length); std::cout.write(buffer_.data(), length);
#endif #endif
@ -129,7 +129,7 @@ Error HttpClient::SendRequest(const HttpRequest& request,
} }
} }
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
std::cout << std::endl; std::cout << std::endl;
std::cout << "# RESPONSE (PARSED)" << std::endl; std::cout << "# RESPONSE (PARSED)" << std::endl;
std::cout << *response << std::endl; std::cout << *response << std::endl;

@ -1,7 +1,7 @@
#include "csoap/http_request_handler.h" #include "csoap/http_request_handler.h"
#include <sstream> #include <sstream>
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
#include <iostream> #include <iostream>
#endif #endif
@ -13,13 +13,6 @@
namespace csoap { namespace csoap {
HttpRequestHandler::HttpRequestHandler() {
// Create worker threads.
for (std::size_t i = 0; i < 2; ++i) {
workers_.create_thread(std::bind(&HttpRequestHandler::WorkerRoutine, this));
}
}
bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) { bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) {
assert(soap_service); assert(soap_service);
@ -32,65 +25,63 @@ bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) {
return true; return true;
} }
#if 0 void HttpRequestHandler::Enqueue(ConnectionPtr conn) {
void HttpRequestHandler::HandleRequest(const HttpRequest& http_request, queue_.Push(conn);
HttpResponse* http_response) { }
// Parse the SOAP request XML.
SoapRequest soap_request;
if (!soap_request.FromXml(http_request.content())) {
http_response->set_status(HttpStatus::BAD_REQUEST);
return;
}
SoapResponse soap_response;
// TODO: Get service by URL.
for (SoapServicePtr& service : soap_services_) { void HttpRequestHandler::Start(std::size_t count) {
service->Handle(soap_request, &soap_response); assert(count > 0 && workers_.size() == 0);
}
std::string content; for (std::size_t i = 0; i < count; ++i) {
soap_response.ToXml(&content); #if CSOAP_DEBUG_OUTPUT
boost::thread* worker =
http_response->set_status(HttpStatus::OK);
http_response->SetContentType(kTextXmlUtf8);
http_response->SetContentLength(content.length());
http_response->set_content(std::move(content));
}
#endif #endif
workers_.create_thread(std::bind(&HttpRequestHandler::WorkerRoutine, this));
void HttpRequestHandler::Enqueue(ConnectionPtr conn) { #if CSOAP_DEBUG_OUTPUT
queue_.Put(conn); std::cout << "Worker is running (thread: " << worker->get_id() << ")\n";
#endif
}
} }
void HttpRequestHandler::StopWorkers() { void HttpRequestHandler::Stop() {
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
std::cout << "Stopping workers...\n"; std::cout << "Stopping workers...\n";
#endif #endif
// Enqueue an null connection to trigger the first worker to stop. // Close pending connections.
queue_.Put(ConnectionPtr()); for (ConnectionPtr conn = queue_.Pop(); conn; conn = queue_.Pop()) {
#if CSOAP_DEBUG_OUTPUT
std::cout << "Closing pending connection...\n";
#endif
conn->Stop();
}
// Enqueue a null connection to trigger the first worker to stop.
queue_.Push(ConnectionPtr());
workers_.join_all(); workers_.join_all();
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
std::cout << "Workers have been stopped.\n"; std::cout << "All workers have been stopped.\n";
#endif #endif
} }
// TODO: How and when to exit?
void HttpRequestHandler::WorkerRoutine() { void HttpRequestHandler::WorkerRoutine() {
#if CSOAP_DEBUG_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id();
#endif
for (;;) { for (;;) {
ConnectionPtr conn = queue_.Get(); ConnectionPtr conn = queue_.PopOrWait();
if (!conn) { if (!conn) {
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id();
std::cout << "Worker is going to stop (thread: " << thread_id << ")\n"; std::cout << "Worker is going to stop (thread: " << thread_id << ")\n";
#endif #endif
// For stopping next worker. // For stopping next worker.
queue_.Put(ConnectionPtr()); queue_.Push(ConnectionPtr());
// Stop the worker. // Stop the worker.
break; break;
} }
@ -99,7 +90,7 @@ void HttpRequestHandler::WorkerRoutine() {
SoapRequest soap_request; SoapRequest soap_request;
if (!soap_request.FromXml(conn->request_.content())) { if (!soap_request.FromXml(conn->request_.content())) {
conn->response_.set_status(HttpStatus::BAD_REQUEST); conn->response_.set_status(HttpStatus::BAD_REQUEST);
conn->DoWrite(); // TODO conn->DoWrite();
continue; continue;
} }

@ -21,21 +21,18 @@ public:
HttpRequestHandler(const HttpRequestHandler&) = delete; HttpRequestHandler(const HttpRequestHandler&) = delete;
HttpRequestHandler& operator=(const HttpRequestHandler&) = delete; HttpRequestHandler& operator=(const HttpRequestHandler&) = delete;
HttpRequestHandler(); HttpRequestHandler() = default;
bool RegisterService(SoapServicePtr soap_service); bool RegisterService(SoapServicePtr soap_service);
// Handle a request and produce a response.
#if 0
void HandleRequest(const HttpRequest& http_request,
HttpResponse* http_response);
#endif
// Put the connection into the queue. // Put the connection into the queue.
void Enqueue(ConnectionPtr conn); void Enqueue(ConnectionPtr conn);
// Stop all worker threads. // Start worker threads.
void StopWorkers(); void Start(std::size_t count);
// Close pending connections, stop worker threads.
void Stop();
private: private:
void WorkerRoutine(); void WorkerRoutine();

@ -2,7 +2,7 @@
#include <signal.h> #include <signal.h>
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
#include <iostream> #include <iostream>
#endif #endif
@ -13,9 +13,9 @@ using tcp = boost::asio::ip::tcp;
namespace csoap { namespace csoap {
HttpServer::HttpServer(unsigned short port) HttpServer::HttpServer(unsigned short port, std::size_t workers)
: io_context_(1) : signals_(io_context_)
, signals_(io_context_) { , workers_(workers) {
// Register to handle the signals that indicate when the server should exit. // Register to handle the signals that indicate when the server should exit.
// It is safe to register for the same signal multiple times in a program, // It is safe to register for the same signal multiple times in a program,
@ -47,11 +47,14 @@ bool HttpServer::RegisterService(SoapServicePtr soap_service) {
} }
void HttpServer::Run() { void HttpServer::Run() {
#if CSOAP_ENABLE_OUTPUT #if CSOAP_DEBUG_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id(); boost::thread::id thread_id = boost::this_thread::get_id();
std::cout << "Server main thread: " << thread_id << std::endl; std::cout << "Server main thread: " << thread_id << std::endl;
#endif #endif
// Start worker threads.
request_handler_.Start(workers_);
// The io_context::run() call will block until all asynchronous operations // The io_context::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one // have finished. While the server is running, there is always at least one
// asynchronous operation outstanding: the asynchronous accept call waiting // asynchronous operation outstanding: the asynchronous accept call waiting
@ -69,10 +72,10 @@ void HttpServer::DoAccept() {
} }
if (!ec) { if (!ec) {
connection_manager_.Start( ConnectionPtr conn{
std::make_shared<Connection>(std::move(socket), new Connection(std::move(socket), request_handler_)
connection_manager_, };
request_handler_)); conn->Start();
} }
DoAccept(); DoAccept();
@ -86,8 +89,7 @@ void HttpServer::DoAwaitStop() {
// operations. Once all operations have finished the io_context::run() // operations. Once all operations have finished the io_context::run()
// call will exit. // call will exit.
acceptor_->close(); acceptor_->close();
request_handler_.StopWorkers(); request_handler_.Stop();
connection_manager_.StopAll();
}); });
} }

@ -12,7 +12,6 @@
#include "boost/asio/ip/tcp.hpp" #include "boost/asio/ip/tcp.hpp"
#include "csoap/connection.h" #include "csoap/connection.h"
#include "csoap/connection_manager.h"
#include "csoap/http_request_handler.h" #include "csoap/http_request_handler.h"
namespace csoap { namespace csoap {
@ -24,7 +23,7 @@ public:
HttpServer(const HttpServer&) = delete; HttpServer(const HttpServer&) = delete;
HttpServer& operator=(const HttpServer&) = delete; HttpServer& operator=(const HttpServer&) = delete;
HttpServer(unsigned short port); HttpServer(unsigned short port, std::size_t workers);
bool RegisterService(SoapServicePtr soap_service); bool RegisterService(SoapServicePtr soap_service);
@ -39,6 +38,9 @@ private:
void DoAwaitStop(); void DoAwaitStop();
private: private:
// The number of worker threads.
std::size_t workers_;
// The io_context used to perform asynchronous operations. // The io_context used to perform asynchronous operations.
boost::asio::io_context io_context_; boost::asio::io_context io_context_;
@ -48,9 +50,6 @@ private:
// Acceptor used to listen for incoming connections. // Acceptor used to listen for incoming connections.
boost::scoped_ptr<boost::asio::ip::tcp::acceptor> acceptor_; boost::scoped_ptr<boost::asio::ip::tcp::acceptor> acceptor_;
// The connection manager which owns all live connections.
ConnectionManager connection_manager_;
// The handler for all incoming requests. // The handler for all incoming requests.
HttpRequestHandler request_handler_; HttpRequestHandler request_handler_;
}; };

@ -4,6 +4,7 @@
// A general message queue. // A general message queue.
#include <list> #include <list>
#include <queue>
#include "boost/thread/condition_variable.hpp" #include "boost/thread/condition_variable.hpp"
#include "boost/thread/locks.hpp" #include "boost/thread/locks.hpp"
@ -19,18 +20,30 @@ public:
Queue() = default; Queue() = default;
T Get() { T PopOrWait() {
boost::unique_lock<boost::mutex> lock(mutex_); boost::unique_lock<boost::mutex> lock(mutex_);
// Wait for a message. // Wait for a message.
not_empty_cv_.wait(lock, [=] { return !message_list_.empty(); }); not_empty_cv_.wait(lock, [this] { return !message_list_.empty(); });
T message = message_list_.front(); T message = message_list_.front();
message_list_.pop_front(); message_list_.pop_front();
return message; return message;
} }
void Put(const T& message) { T Pop() {
boost::lock_guard<boost::mutex> lock(mutex_);
if (message_list_.empty()) {
return T();
}
T message = message_list_.front();
message_list_.pop_front();
return message;
}
void Push(const T& message) {
{ {
boost::lock_guard<boost::mutex> lock(mutex_); boost::lock_guard<boost::mutex> lock(mutex_);
message_list_.push_back(message); message_list_.push_back(message);

@ -16,8 +16,10 @@ int main(int argc, char* argv[]) {
unsigned short port = std::atoi(argv[1]); unsigned short port = std::atoi(argv[1]);
std::size_t workers = 2;
try { try {
csoap::HttpServer server(port); csoap::HttpServer server(port, workers);
csoap::SoapServicePtr service(new CalculatorService); csoap::SoapServicePtr service(new CalculatorService);
server.RegisterService(service); server.RegisterService(service);

Loading…
Cancel
Save