Add message queue and worker threads.

master
Adam Gu 8 years ago
parent 3266f2e72b
commit f41b625f51

@ -1,5 +1,9 @@
#include "csoap/connection.h"
#include <vector>
#if CSOAP_ENABLE_OUTPUT
#include <iostream>
#endif
#include "boost/asio/write.hpp"
@ -42,7 +46,7 @@ void Connection::DoWrite() {
}
void Connection::HandleRead(boost::system::error_code ec,
std::size_t bytes_transferred) {
std::size_t length) {
if (ec) {
if (ec != boost::asio::error::operation_aborted) {
connection_manager_.Stop(shared_from_this());
@ -50,7 +54,7 @@ void Connection::HandleRead(boost::system::error_code ec,
return;
}
Error error = request_parser_.Parse(buffer_.data(), bytes_transferred);
Error error = request_parser_.Parse(buffer_.data(), length);
if (error != kNoError) {
// Bad request.
@ -65,16 +69,23 @@ void Connection::HandleRead(boost::system::error_code ec,
return;
}
// Handle request.
// TODO: Time consuming
request_handler_.HandleRequest(request_, &response_);
// Send back the response.
DoWrite();
// Enqueue this connection.
// Some worker thread will handle it later.
// And DoWrite() will be called in the worker thread.
request_handler_.Enqueue(shared_from_this());
}
// NOTE:
// This write handler will be called from main thread (the thread calling
// io_context.run), even though DoWrite() is invoked by worker threads. This is
// ensured by Asio.
void Connection::HandleWrite(boost::system::error_code ec,
size_t bytes_transferred) {
size_t length) {
#if CSOAP_ENABLE_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id();
std::cout << "Response has been sent back (thread: " << thread_id << ")\n";
#endif
if (!ec) {
// Initiate graceful connection closure.
boost::system::error_code ignored_ec;

@ -22,15 +22,15 @@ public:
Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;
friend class HttpRequestHandler;
// Construct a connection with the given io_service.
Connection(boost::asio::ip::tcp::socket socket,
ConnectionManager& manager,
HttpRequestHandler& handler);
// Start the first asynchronous operation for the connection.
void Start();
// Stop all asynchronous operations associated with the connection.
void Stop();
private:
@ -38,13 +38,11 @@ private:
void DoWrite();
// Handle completion of a read operation.
void HandleRead(boost::system::error_code ec,
std::size_t bytes_transferred);
std::size_t length);
// Handle completion of a write operation.
void HandleWrite(boost::system::error_code ec,
size_t bytes_transferred);
std::size_t length);
private:
// Socket for the connection.
@ -65,7 +63,7 @@ private:
// The parser for the incoming request.
HttpRequestParser request_parser_;
// The reply to be sent back to the client.
// The response to be sent back to the client.
HttpResponse response_;
};

@ -1,48 +1,23 @@
#include "csoap/http_request_handler.h"
#include <sstream>
#if CSOAP_ENABLE_OUTPUT
#include <iostream>
#endif
#include "csoap/common.h"
#include "csoap/http_request.h"
#include "csoap/http_response.h"
#include "csoap/soap_request.h"
#include "csoap/soap_response.h"
#include "csoap/soap_service.h"
namespace csoap {
#if 0
// Perform URL-decoding on a string. Returns false if the encoding was invalid.
static bool UrlDecode(const std::string& in, std::string& out) {
out.clear();
out.reserve(in.size());
for (std::size_t i = 0; i < in.size(); ++i) {
if (in[i] == '%') {
if (i + 3 <= in.size()) {
int value = 0;
std::istringstream is(in.substr(i + 1, 2));
if (is >> std::hex >> value) {
out += static_cast<char>(value);
i += 2;
} else {
return false;
}
} else {
return false;
}
} else if (in[i] == '+') {
out += ' ';
} else {
out += in[i];
}
}
return true;
}
#endif
HttpRequestHandler::HttpRequestHandler() {
// Create worker threads.
for (std::size_t i = 0; i < 2; ++i) {
workers_.create_thread(std::bind(&HttpRequestHandler::WorkerRoutine, this));
}
}
bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) {
@ -57,6 +32,7 @@ bool HttpRequestHandler::RegisterService(SoapServicePtr soap_service) {
return true;
}
#if 0
void HttpRequestHandler::HandleRequest(const HttpRequest& http_request,
HttpResponse* http_response) {
// Parse the SOAP request XML.
@ -81,28 +57,69 @@ void HttpRequestHandler::HandleRequest(const HttpRequest& http_request,
http_response->SetContentType(kTextXmlUtf8);
http_response->SetContentLength(content.length());
http_response->set_content(std::move(content));
}
#endif
#if 0
// Decode URL to path.
std::string request_path;
if (!UrlDecode(request.uri, request_path)) {
reply = HttpReply::StockReply(HttpReply::BAD_REQUEST);
return;
}
void HttpRequestHandler::Enqueue(ConnectionPtr conn) {
queue_.Put(conn);
}
// Request path must be absolute and not contain "..".
if (request_path.empty() ||
request_path[0] != '/' ||
request_path.find("..") != std::string::npos) {
reply = HttpReply::StockReply(HttpReply::BAD_REQUEST);
return;
}
void HttpRequestHandler::StopWorkers() {
#if CSOAP_ENABLE_OUTPUT
std::cout << "Stopping workers...\n";
#endif
// If path ends in slash (i.e. is a directory) then add "index.html".
if (request_path[request_path.size() - 1] == '/') {
request_path += "index.html";
}
// Enqueue an null connection to trigger the first worker to stop.
queue_.Put(ConnectionPtr());
workers_.join_all();
#if CSOAP_ENABLE_OUTPUT
std::cout << "Workers have been stopped.\n";
#endif
}
// TODO: How and when to exit?
void HttpRequestHandler::WorkerRoutine() {
for (;;) {
ConnectionPtr conn = queue_.Get();
if (!conn) {
#if CSOAP_ENABLE_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id();
std::cout << "Worker is going to stop (thread: " << thread_id << ")\n";
#endif
// For stopping next worker.
queue_.Put(ConnectionPtr());
// Stop the worker.
break;
}
// Parse the SOAP request XML.
SoapRequest soap_request;
if (!soap_request.FromXml(conn->request_.content())) {
conn->response_.set_status(HttpStatus::BAD_REQUEST);
conn->DoWrite(); // TODO
continue;
}
SoapResponse soap_response;
// TODO: Get service by URL.
for (SoapServicePtr& service : soap_services_) {
service->Handle(soap_request, &soap_response);
}
std::string content;
soap_response.ToXml(&content);
conn->response_.set_status(HttpStatus::OK);
conn->response_.SetContentType(kTextXmlUtf8);
conn->response_.SetContentLength(content.length());
conn->response_.set_content(std::move(content));
conn->DoWrite();
}
}
} // namespace csoap

@ -1,7 +1,13 @@
#ifndef CSOAP_HTTP_REQUEST_HANDLER_H_
#define CSOAP_HTTP_REQUEST_HANDLER_H_
#include <list>
#include <vector>
#include "boost/thread/thread.hpp"
#include "csoap/connection.h"
#include "csoap/queue.h"
#include "csoap/soap_service.h"
namespace csoap {
@ -20,11 +26,25 @@ public:
bool RegisterService(SoapServicePtr soap_service);
// Handle a request and produce a response.
#if 0
void HandleRequest(const HttpRequest& http_request,
HttpResponse* http_response);
#endif
// Put the connection into the queue.
void Enqueue(ConnectionPtr conn);
// Stop all worker threads.
void StopWorkers();
private:
void WorkerRoutine();
private:
std::vector<SoapServicePtr> soap_services_;
Queue<ConnectionPtr> queue_;
boost::thread_group workers_;
};
} // namespace csoap

@ -2,6 +2,10 @@
#include <signal.h>
#if CSOAP_ENABLE_OUTPUT
#include <iostream>
#endif
#include "csoap/soap_service.h"
#include "csoap/utility.h"
@ -10,7 +14,7 @@ using tcp = boost::asio::ip::tcp;
namespace csoap {
HttpServer::HttpServer(unsigned short port)
: io_context_(1) // TODO: concurrency_hint (threads)
: io_context_(1)
, signals_(io_context_) {
// Register to handle the signals that indicate when the server should exit.
@ -43,6 +47,11 @@ bool HttpServer::RegisterService(SoapServicePtr soap_service) {
}
void HttpServer::Run() {
#if CSOAP_ENABLE_OUTPUT
boost::thread::id thread_id = boost::this_thread::get_id();
std::cout << "Server main thread: " << thread_id << std::endl;
#endif
// The io_context::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one
// asynchronous operation outstanding: the asynchronous accept call waiting
@ -53,30 +62,31 @@ void HttpServer::Run() {
void HttpServer::DoAccept() {
acceptor_->async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
// Check whether the server was stopped by a signal before this
// completion handler had a chance to run.
if (!acceptor_->is_open()) {
return;
}
if (!ec) {
connection_manager_.Start(
std::make_shared<Connection>(std::move(socket),
connection_manager_,
request_handler_));
}
DoAccept();
});
// Check whether the server was stopped by a signal before this
// completion handler had a chance to run.
if (!acceptor_->is_open()) {
return;
}
if (!ec) {
connection_manager_.Start(
std::make_shared<Connection>(std::move(socket),
connection_manager_,
request_handler_));
}
DoAccept();
});
}
void HttpServer::DoAwaitStop() {
signals_.async_wait(
[this](boost::system::error_code /*ec*/, int /*signo*/) {
// The server is stopped by cancelling all outstanding asynchronous
// The server is stopped by canceling all outstanding asynchronous
// operations. Once all operations have finished the io_context::run()
// call will exit.
acceptor_->close();
request_handler_.StopWorkers();
connection_manager_.StopAll();
});
}

@ -5,6 +5,8 @@
#include <vector>
#include "boost/scoped_ptr.hpp"
#include "boost/thread/thread.hpp"
#include "boost/asio/io_context.hpp"
#include "boost/asio/signal_set.hpp"
#include "boost/asio/ip/tcp.hpp"

@ -0,0 +1,49 @@
#ifndef CSOAP_QUEUE_H_
#define CSOAP_QUEUE_H_
// A general message queue.
#include <list>
#include "boost/thread/condition_variable.hpp"
#include "boost/thread/locks.hpp"
#include "boost/thread/mutex.hpp"
namespace csoap {
template <typename T>
class Queue {
public:
Queue(const Queue& rhs) = delete;
Queue& operator=(const Queue& rhs) = delete;
Queue() = default;
T Get() {
boost::unique_lock<boost::mutex> lock(mutex_);
// Wait for a message.
not_empty_cv_.wait(lock, [=] { return !message_list_.empty(); });
T message = message_list_.front();
message_list_.pop_front();
return message;
}
void Put(const T& message) {
{
boost::lock_guard<boost::mutex> lock(mutex_);
message_list_.push_back(message);
}
not_empty_cv_.notify_one();
}
private:
std::list<T> message_list_;
boost::mutex mutex_;
boost::condition_variable not_empty_cv_;
};
} // namespace csoap
#endif // CSOAP_QUEUE_H_

@ -24,7 +24,7 @@ bool CalculatorClient::Divide(double x, double y, double* result) {
}
// Set to 0 to test our own calculator server created with csoap.
#define ACCESS_PARASOFT 1
#define ACCESS_PARASOFT 0
void CalculatorClient::Init() {
#if ACCESS_PARASOFT

@ -25,7 +25,7 @@ int main(int argc, char* argv[]) {
server.Run();
} catch (std::exception& e) {
std::cerr << "exception: " << e.what() << "\n";
std::cerr << "Exception: " << e.what() << std::endl;
return 1;
}

Loading…
Cancel
Save