Rename HttpConnection to HttpSession; refine error messages.

master
Chunting Gu 7 years ago
parent 3cab68cf33
commit 87ead87527

@ -12,7 +12,7 @@ set(HEADERS
globals.h
http_async_client.h
http_client.h
http_connection.h
http_session.h
http_message.h
http_parser.h
http_request.h
@ -37,7 +37,7 @@ set(SOURCES
globals.cc
http_async_client.cc
http_client.cc
http_connection.cc
http_session.cc
http_message.cc
http_parser.cc
http_request.cc

@ -19,7 +19,7 @@
#endif // _MSC_VER
// Explicitly declare the copy constructor and assignment operator as deleted.
#define DELETE_COPY_AND_ASSIGN(TypeName) \
#define WEBCC_DELETE_COPY_ASSIGN(TypeName) \
TypeName(const TypeName&) = delete; \
TypeName& operator=(const TypeName&) = delete;

@ -27,7 +27,8 @@ void HttpAsyncClient::Request(std::shared_ptr<HttpRequest> request,
response_.reset(new HttpResponse());
response_parser_.reset(new HttpResponseParser(response_.get()));
stopped_ = timed_out_ = false;
stopped_ = false;
timed_out_ = false;
LOG_VERB("HTTP request:\n%s", request->Dump(4, "> ").c_str());
@ -42,20 +43,22 @@ void HttpAsyncClient::Request(std::shared_ptr<HttpRequest> request,
}
void HttpAsyncClient::Stop() {
if (!stopped_) {
stopped_ = true;
if (stopped_) {
return;
}
LOG_INFO("Close socket...");
stopped_ = true;
boost::system::error_code ec;
socket_.close(ec);
if (ec) {
LOG_ERRO("Failed to close socket.");
}
LOG_INFO("Close socket...");
LOG_INFO("Cancel deadline timer...");
deadline_.cancel();
boost::system::error_code ec;
socket_.close(ec);
if (ec) {
LOG_ERRO("Socket close error (%s).", ec.message().c_str());
}
LOG_INFO("Cancel deadline timer...");
deadline_.cancel();
}
void HttpAsyncClient::OnResolve(boost::system::error_code ec,

@ -27,7 +27,7 @@ class HttpAsyncClient : public std::enable_shared_from_this<HttpAsyncClient> {
public:
explicit HttpAsyncClient(boost::asio::io_context& io_context);
DELETE_COPY_AND_ASSIGN(HttpAsyncClient);
WEBCC_DELETE_COPY_ASSIGN(HttpAsyncClient);
void set_timeout_seconds(int timeout_seconds) {
timeout_seconds_ = timeout_seconds;

@ -7,8 +7,6 @@
#include "boost/asio/read.hpp"
#include "boost/asio/write.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "boost/lambda/bind.hpp"
#include "boost/lambda/lambda.hpp"
#include "webcc/logger.h"
#include "webcc/utility.h"
@ -67,7 +65,7 @@ Error HttpClient::Connect(const HttpRequest& request) {
auto endpoints = resolver.resolve(tcp::v4(), request.host(), port, ec);
if (ec) {
LOG_ERRO("Can't resolve host (%s): %s, %s", ec.message().c_str(),
LOG_ERRO("Host resolve error (%s): %s, %s.", ec.message().c_str(),
request.host().c_str(), port.c_str());
return kHostResolveError;
}
@ -219,20 +217,22 @@ void HttpClient::OnDeadline(boost::system::error_code ec) {
}
void HttpClient::Stop() {
if (!stopped_) {
stopped_ = true;
if (stopped_) {
return;
}
LOG_INFO("Close socket...");
stopped_ = true;
boost::system::error_code ec;
socket_.close(ec);
if (ec) {
LOG_ERRO("Failed to close socket.");
}
LOG_INFO("Close socket...");
LOG_INFO("Cancel deadline timer...");
deadline_.cancel();
boost::system::error_code ec;
socket_.close(ec);
if (ec) {
LOG_ERRO("Socket close error (%s).", ec.message().c_str());
}
LOG_INFO("Cancel deadline timer...");
deadline_.cancel();
}
} // namespace webcc

@ -24,7 +24,7 @@ class HttpClient {
HttpClient();
~HttpClient() = default;
DELETE_COPY_AND_ASSIGN(HttpClient);
WEBCC_DELETE_COPY_ASSIGN(HttpClient);
// Set the timeout seconds for reading response.
// The |seconds| is only effective when greater than 0.

@ -16,7 +16,7 @@ class HttpParser {
virtual ~HttpParser() = default;
DELETE_COPY_AND_ASSIGN(HttpParser);
WEBCC_DELETE_COPY_ASSIGN(HttpParser);
bool finished() const { return finished_; }

@ -9,8 +9,8 @@
namespace webcc {
void HttpRequestHandler::Enqueue(HttpConnectionPtr connection) {
queue_.Push(connection);
void HttpRequestHandler::Enqueue(HttpSessionPtr session) {
queue_.Push(session);
}
void HttpRequestHandler::Start(std::size_t count) {
@ -24,14 +24,14 @@ void HttpRequestHandler::Start(std::size_t count) {
void HttpRequestHandler::Stop() {
LOG_INFO("Stopping workers...");
// Close pending connections.
for (HttpConnectionPtr conn = queue_.Pop(); conn; conn = queue_.Pop()) {
LOG_INFO("Closing pending connection...");
conn->Close();
// Close pending sessions.
for (HttpSessionPtr s = queue_.Pop(); s; s = queue_.Pop()) {
LOG_INFO("Closing pending session...");
s->Close();
}
// Enqueue a null connection to trigger the first worker to stop.
queue_.Push(HttpConnectionPtr());
// Enqueue a null session to trigger the first worker to stop.
queue_.Push(HttpSessionPtr());
for (auto& worker : workers_) {
if (worker.joinable()) {
@ -46,19 +46,19 @@ void HttpRequestHandler::WorkerRoutine() {
LOG_INFO("Worker is running.");
for (;;) {
HttpConnectionPtr connection = queue_.PopOrWait();
HttpSessionPtr session = queue_.PopOrWait();
if (!connection) {
if (!session) {
LOG_INFO("Worker is going to stop.");
// For stopping next worker.
queue_.Push(HttpConnectionPtr());
queue_.Push(HttpSessionPtr());
// Stop the worker.
break;
}
HandleConnection(connection);
HandleSession(session);
}
}

@ -5,7 +5,7 @@
#include <thread>
#include <vector>
#include "webcc/http_connection.h"
#include "webcc/http_session.h"
#include "webcc/queue.h"
#include "webcc/soap_service.h"
@ -20,24 +20,24 @@ class HttpRequestHandler {
HttpRequestHandler() = default;
virtual ~HttpRequestHandler() = default;
DELETE_COPY_AND_ASSIGN(HttpRequestHandler);
WEBCC_DELETE_COPY_ASSIGN(HttpRequestHandler);
// Put the connection into the queue.
void Enqueue(HttpConnectionPtr connection);
// Put the session into the queue.
void Enqueue(HttpSessionPtr session);
// Start worker threads.
void Start(std::size_t count);
// Close pending connections and stop worker threads.
// Close pending sessions and stop worker threads.
void Stop();
private:
void WorkerRoutine();
// Called by the worker routine.
virtual void HandleConnection(HttpConnectionPtr connection) = 0;
virtual void HandleSession(HttpSessionPtr session) = 0;
Queue<HttpConnectionPtr> queue_;
Queue<HttpSessionPtr> queue_;
std::vector<std::thread> workers_;
};

@ -22,7 +22,7 @@ HttpServer::HttpServer(std::uint16_t port, std::size_t workers)
// Open the acceptor.
acceptor_.open(endpoint.protocol(), ec);
if (ec) {
LOG_ERRO("Acceptor open error: %s", ec.message().c_str());
LOG_ERRO("Acceptor open error (%s).", ec.message().c_str());
return;
}
@ -37,7 +37,7 @@ HttpServer::HttpServer(std::uint16_t port, std::size_t workers)
// Bind to the server address.
acceptor_.bind(endpoint, ec);
if (ec) {
LOG_ERRO("Acceptor bind error: %s", ec.message().c_str());
LOG_ERRO("Acceptor bind error (%s).", ec.message().c_str());
return;
}
@ -46,7 +46,7 @@ HttpServer::HttpServer(std::uint16_t port, std::size_t workers)
// has not started to accept the connection yet.
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec) {
LOG_ERRO("Acceptor listen error: %s", ec.message().c_str());
LOG_ERRO("Acceptor listen error (%s).", ec.message().c_str());
return;
}
}
@ -96,8 +96,8 @@ void HttpServer::DoAccept() {
if (!ec) {
LOG_INFO("Accepted a connection.");
std::make_shared<HttpConnection>(std::move(socket),
GetRequestHandler())->Start();
std::make_shared<HttpSession>(std::move(socket),
GetRequestHandler())->Start();
}
DoAccept();

@ -8,7 +8,7 @@
#include "boost/asio/signal_set.hpp"
#include "webcc/globals.h"
#include "webcc/http_connection.h"
#include "webcc/http_session.h"
namespace webcc {
@ -22,7 +22,7 @@ class HttpServer {
virtual ~HttpServer() = default;
DELETE_COPY_AND_ASSIGN(HttpServer);
WEBCC_DELETE_COPY_ASSIGN(HttpServer);
// Run the server's io_service loop.
void Run();

@ -1,61 +1,59 @@
#include "webcc/http_connection.h"
#include "webcc/http_session.h"
#include <utility> // for move()
#include <vector>
#include "boost/asio/write.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "webcc/http_request_handler.h"
#include "webcc/logger.h"
using boost::asio::ip::tcp;
namespace webcc {
HttpConnection::HttpConnection(boost::asio::ip::tcp::socket socket,
HttpRequestHandler* handler)
HttpSession::HttpSession(tcp::socket socket, HttpRequestHandler* handler)
: socket_(std::move(socket)),
buffer_(kBufferSize),
request_handler_(handler),
request_parser_(&request_) {
}
void HttpConnection::Start() {
void HttpSession::Start() {
DoRead();
}
void HttpConnection::Close() {
void HttpSession::Close() {
LOG_INFO("Close socket...");
boost::system::error_code ec;
socket_.close(ec);
if (ec) {
LOG_ERRO("Failed to close socket.");
LOG_ERRO("Socket close error (%s).", ec.message().c_str());
}
}
void HttpConnection::SetResponseContent(std::string&& content,
const std::string& type) {
void HttpSession::SetResponseContent(std::string&& content,
const std::string& type) {
response_.SetContent(std::move(content), true);
response_.SetContentType(type);
}
void HttpConnection::SendResponse(HttpStatus::Enum status) {
void HttpSession::SendResponse(HttpStatus::Enum status) {
response_.set_status(status);
response_.UpdateStartLine();
DoWrite();
}
void HttpConnection::DoRead() {
void HttpSession::DoRead() {
socket_.async_read_some(boost::asio::buffer(buffer_),
std::bind(&HttpConnection::OnRead,
shared_from_this(),
std::bind(&HttpSession::OnRead, shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}
void HttpConnection::OnRead(boost::system::error_code ec,
std::size_t length) {
void HttpSession::OnRead(boost::system::error_code ec, std::size_t length) {
if (ec) {
LOG_ERRO("Socket read error: %s", ec.message().c_str());
LOG_ERRO("Socket read error (%s).", ec.message().c_str());
if (ec != boost::asio::error::operation_aborted) {
Close();
}
@ -78,18 +76,16 @@ void HttpConnection::OnRead(boost::system::error_code ec,
LOG_VERB("HTTP request:\n%s", request_.Dump(4, "> ").c_str());
// Enqueue this connection.
// Enqueue this session.
// Some worker thread will handle it later.
request_handler_->Enqueue(shared_from_this());
}
void HttpConnection::DoWrite() {
void HttpSession::DoWrite() {
LOG_VERB("HTTP response:\n%s", response_.Dump(4, "> ").c_str());
boost::asio::async_write(socket_,
response_.ToBuffers(),
std::bind(&HttpConnection::OnWrite,
shared_from_this(),
boost::asio::async_write(socket_, response_.ToBuffers(),
std::bind(&HttpSession::OnWrite, shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}
@ -98,10 +94,9 @@ void HttpConnection::DoWrite() {
// This write handler will be called from main thread (the thread calling
// io_context.run), even though AsyncWrite() is invoked by worker threads.
// This is ensured by Asio.
void HttpConnection::OnWrite(boost::system::error_code ec,
std::size_t length) {
void HttpSession::OnWrite(boost::system::error_code ec, std::size_t length) {
if (ec) {
LOG_ERRO("Socket write error: %s", ec.message().c_str());
LOG_ERRO("Socket write error (%s).", ec.message().c_str());
if (ec != boost::asio::error::operation_aborted) {
Close();
@ -114,17 +109,17 @@ void HttpConnection::OnWrite(boost::system::error_code ec,
}
}
// Socket close VS. Shutdown:
// Socket close VS. shutdown:
// https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket
void HttpConnection::Shutdown() {
void HttpSession::Shutdown() {
LOG_INFO("Shutdown socket...");
// Initiate graceful connection closure.
boost::system::error_code ec;
socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket_.shutdown(tcp::socket::shutdown_both, ec);
if (ec) {
LOG_ERRO("Socket shutdown error: %s", ec.message().c_str());
LOG_ERRO("Socket shutdown error (%s).", ec.message().c_str());
}
}

@ -1,5 +1,5 @@
#ifndef WEBCC_HTTP_CONNECTION_H_
#define WEBCC_HTTP_CONNECTION_H_
#ifndef WEBCC_HTTP_SESSION_H_
#define WEBCC_HTTP_SESSION_H_
#include <memory>
#include <string>
@ -16,14 +16,14 @@ namespace webcc {
class HttpRequestHandler;
class HttpConnection : public std::enable_shared_from_this<HttpConnection> {
class HttpSession : public std::enable_shared_from_this<HttpSession> {
public:
HttpConnection(boost::asio::ip::tcp::socket socket, // Will be moved
HttpRequestHandler* handler);
HttpSession(boost::asio::ip::tcp::socket socket,
HttpRequestHandler* handler);
~HttpConnection() = default;
~HttpSession() = default;
DELETE_COPY_AND_ASSIGN(HttpConnection);
WEBCC_DELETE_COPY_ASSIGN(HttpSession);
const HttpRequest& request() const { return request_; }
@ -67,8 +67,8 @@ class HttpConnection : public std::enable_shared_from_this<HttpConnection> {
HttpResponse response_;
};
typedef std::shared_ptr<HttpConnection> HttpConnectionPtr;
typedef std::shared_ptr<HttpSession> HttpSessionPtr;
} // namespace webcc
#endif // WEBCC_HTTP_CONNECTION_H_
#endif // WEBCC_HTTP_SESSION_H_

@ -19,7 +19,7 @@ class RestClient {
~RestClient() = default;
DELETE_COPY_AND_ASSIGN(RestClient);
WEBCC_DELETE_COPY_ASSIGN(RestClient);
void SetTimeout(int seconds) {
http_client_.SetTimeout(seconds);

@ -13,13 +13,13 @@ bool RestRequestHandler::Bind(RestServicePtr service, const std::string& url,
return service_manager_.AddService(service, url, is_regex);
}
void RestRequestHandler::HandleConnection(HttpConnectionPtr connection) {
const HttpRequest& http_request = connection->request();
void RestRequestHandler::HandleSession(HttpSessionPtr session) {
const HttpRequest& http_request = session->request();
Url url(http_request.url(), /*decode*/true);
if (!url.IsValid()) {
connection->SendResponse(HttpStatus::kBadRequest);
session->SendResponse(HttpStatus::kBadRequest);
return;
}
@ -28,12 +28,12 @@ void RestRequestHandler::HandleConnection(HttpConnectionPtr connection) {
};
// Get service by URL path.
RestServicePtr service = service_manager_.GetService(
url.path(), &rest_request.url_sub_matches);
RestServicePtr service =
service_manager_.GetService(url.path(), &rest_request.url_sub_matches);
if (!service) {
LOG_WARN("No service matches the URL path: %s", url.path().c_str());
connection->SendResponse(HttpStatus::kNotFound);
session->SendResponse(HttpStatus::kNotFound);
return;
}
@ -41,12 +41,12 @@ void RestRequestHandler::HandleConnection(HttpConnectionPtr connection) {
service->Handle(rest_request, &rest_response);
if (!rest_response.content.empty()) {
connection->SetResponseContent(std::move(rest_response.content),
kAppJsonUtf8);
session->SetResponseContent(std::move(rest_response.content),
kAppJsonUtf8);
}
// Send response back to client.
connection->SendResponse(rest_response.status);
session->SendResponse(rest_response.status);
}
} // namespace webcc

@ -18,7 +18,7 @@ class RestRequestHandler : public HttpRequestHandler {
bool Bind(RestServicePtr service, const std::string& url, bool is_regex);
private:
void HandleConnection(HttpConnectionPtr connection) override;
void HandleSession(HttpSessionPtr session) override;
RestServiceManager service_manager_;
};

@ -14,7 +14,7 @@ class RestServiceManager {
public:
RestServiceManager() = default;
DELETE_COPY_AND_ASSIGN(RestServiceManager);
WEBCC_DELETE_COPY_ASSIGN(RestServiceManager);
// Add a service and bind it with the given URL.
// The |url| should start with "/" and will be treated as a regular expression

@ -23,7 +23,7 @@ class SoapAsyncClient {
~SoapAsyncClient() = default;
DELETE_COPY_AND_ASSIGN(SoapAsyncClient);
WEBCC_DELETE_COPY_ASSIGN(SoapAsyncClient);
void set_timeout_seconds(int timeout_seconds) {
timeout_seconds_ = timeout_seconds;

@ -18,7 +18,7 @@ class SoapClient {
~SoapClient() = default;
DELETE_COPY_AND_ASSIGN(SoapClient);
WEBCC_DELETE_COPY_ASSIGN(SoapClient);
void SetTimeout(int seconds) {
http_client_.SetTimeout(seconds);

@ -15,30 +15,30 @@ bool SoapRequestHandler::Bind(SoapServicePtr service, const std::string& url) {
return true;
}
void SoapRequestHandler::HandleConnection(HttpConnectionPtr connection) {
SoapServicePtr service = GetServiceByUrl(connection->request().url());
void SoapRequestHandler::HandleSession(HttpSessionPtr session) {
SoapServicePtr service = GetServiceByUrl(session->request().url());
if (!service) {
connection->SendResponse(HttpStatus::kBadRequest);
session->SendResponse(HttpStatus::kBadRequest);
return;
}
// Parse the SOAP request XML.
SoapRequest soap_request;
if (!soap_request.FromXml(connection->request().content())) {
connection->SendResponse(HttpStatus::kBadRequest);
if (!soap_request.FromXml(session->request().content())) {
session->SendResponse(HttpStatus::kBadRequest);
return;
}
SoapResponse soap_response;
if (!service->Handle(soap_request, &soap_response)) {
connection->SendResponse(HttpStatus::kBadRequest);
session->SendResponse(HttpStatus::kBadRequest);
return;
}
std::string content;
soap_response.ToXml(format_raw_, indent_str_, &content);
connection->SetResponseContent(std::move(content), kTextXmlUtf8);
connection->SendResponse(HttpStatus::kOK);
session->SetResponseContent(std::move(content), kTextXmlUtf8);
session->SendResponse(HttpStatus::kOK);
}
SoapServicePtr SoapRequestHandler::GetServiceByUrl(const std::string& url) {

@ -23,7 +23,7 @@ class SoapRequestHandler : public HttpRequestHandler {
bool Bind(SoapServicePtr service, const std::string& url);
private:
void HandleConnection(HttpConnectionPtr connection) override;
void HandleSession(HttpSessionPtr session) override;
SoapServicePtr GetServiceByUrl(const std::string& url);

Loading…
Cancel
Save