Refine keep-alive connections.

master
Chunting Gu 6 years ago
parent 901b2902a2
commit 5abd7e275c

@ -1,4 +1,5 @@
#include <iostream>
#include <thread>
#include "webcc/http_client_session.h"
#include "webcc/logger.h"
@ -82,29 +83,11 @@ void Test(HttpClientSession& session) {
}
}
void TestKeepAlive1(HttpClientSession& session) {
try {
auto r = session.Request(HttpRequestArgs{ "GET" }.
url("http://httpbin.org/get").
parameters({ "key1", "value1", "key2", "value2" }).
headers({ "Accept", "application/json" }).
buffer_size(1000));
std::cout << r->content() << std::endl;
} catch (const Exception& e) {
std::cout << "Exception: " << e.what() << std::endl;
}
}
void TestKeepAlive2(HttpClientSession& session) {
try {
auto r = session.Request(webcc::HttpRequestArgs("GET").
url("https://api.github.com/events").
ssl_verify(false).buffer_size(1500));
//std::cout << r->content() << std::endl;
} catch (const Exception& e) {
std::cout << "Exception: " << e.what() << std::endl;
}
@ -138,15 +121,31 @@ void TestKeepAlive4(HttpClientSession& session) {
// -----------------------------------------------------------------------------
void Sleep(int seconds) {
if (seconds > 0) {
LOG_INFO("Sleep %d seconds...", seconds);
std::this_thread::sleep_for(std::chrono::seconds(seconds));
}
}
int main() {
WEBCC_LOG_INIT("", LOG_CONSOLE);
HttpClientSession session;
GetBoostOrgLicense(session);
// TEST keep-alive.
try {
// Keep-Alive by default
session.Get("http://httpbin.org/get");
//TestKeepAlive1(session);
//TestKeepAlive1(session);
session.Get("http://httpbin.org/get", {}, { "Connection", "Close" });
session.Get("http://httpbin.org/get");
} catch (const Exception& e) {
std::cout << "Exception: " << e.what() << std::endl;
}
return 0;
}

@ -9,7 +9,7 @@ public:
~CalcService() override = default;
bool Handle(const webcc::SoapRequest& soap_request,
webcc::SoapResponse* soap_response) override;
webcc::SoapResponse* soap_response) final;
};
#endif // CALC_SERVICE_H_

@ -1,5 +1,6 @@
#include "webcc/http_client.h"
#include "boost/algorithm/string.hpp" // TODO: Remove
#include "boost/date_time/posix_time/posix_time.hpp"
#include "webcc/logger.h"
@ -11,10 +12,10 @@ namespace webcc {
HttpClient::HttpClient(std::size_t buffer_size, bool ssl_verify)
: buffer_(buffer_size == 0 ? kBufferSize : buffer_size),
deadline_(io_context_),
timer_(io_context_),
ssl_verify_(ssl_verify),
timeout_seconds_(kMaxReadSeconds),
stopped_(false),
closed_(false),
timed_out_(false),
error_(kNoError) {
}
@ -33,13 +34,14 @@ bool HttpClient::Request(const HttpRequest& request,
response_.reset(new HttpResponse());
response_parser_.reset(new HttpResponseParser(response_.get()));
stopped_ = false;
closed_ = false;
timed_out_ = false;
error_ = kNoError;
BufferResizer buffer_resizer(&buffer_, buffer_size);
if (connect) {
// No existing socket connection was specified, create a new one.
if ((error_ = Connect(request)) != kNoError) {
return false;
}
@ -56,6 +58,23 @@ bool HttpClient::Request(const HttpRequest& request,
return true;
}
void HttpClient::Close() {
if (closed_) {
return;
}
closed_ = true;
LOG_INFO("Close socket...");
boost::system::error_code ec;
socket_->Close(&ec);
if (ec) {
LOG_ERRO("Socket close error (%s).", ec.message().c_str());
}
}
Error HttpClient::Connect(const HttpRequest& request) {
if (request.url().scheme() == "https") {
socket_.reset(new HttpSslSocket{ io_context_, ssl_verify_ });
@ -89,7 +108,7 @@ Error HttpClient::DoConnect(const HttpRequest& request,
// Determine whether a connection was successfully established.
if (ec) {
LOG_ERRO("Socket connect error (%s).", ec.message().c_str());
Stop();
Close();
return kEndpointConnectError;
}
@ -113,7 +132,7 @@ Error HttpClient::WriteReqeust(const HttpRequest& request) {
if (ec) {
LOG_ERRO("Socket write error (%s).", ec.message().c_str());
Stop();
Close();
return kSocketWriteError;
}
@ -125,8 +144,8 @@ Error HttpClient::WriteReqeust(const HttpRequest& request) {
Error HttpClient::ReadResponse() {
LOG_VERB("Read response (timeout: %ds)...", timeout_seconds_);
deadline_.expires_from_now(boost::posix_time::seconds(timeout_seconds_));
DoWaitDeadline();
timer_.expires_from_now(boost::posix_time::seconds(timeout_seconds_));
DoWaitTimer();
Error error = kNoError;
DoReadResponse(&error);
@ -148,7 +167,8 @@ void HttpClient::DoReadResponse(Error* error) {
LOG_VERB("Socket async read handler.");
if (ec || length == 0) {
Stop();
StopTimer();
Close();
*error = kSocketReadError;
LOG_ERRO("Socket read error (%s).", ec.message().c_str());
return;
@ -158,7 +178,8 @@ void HttpClient::DoReadResponse(Error* error) {
// Parse the response piece just read.
if (!response_parser_->Parse(buffer_.data(), length)) {
Stop();
StopTimer();
Close();
*error = kHttpError;
LOG_ERRO("Failed to parse HTTP response.");
return;
@ -167,14 +188,23 @@ void HttpClient::DoReadResponse(Error* error) {
if (response_parser_->finished()) {
// Stop trying to read once all content has been received, because
// some servers will block extra call to read_some().
Stop(); // TODO: Keep-Alive
StopTimer();
if (response_->IsConnectionKeepAlive()) {
// Close the timer but keep the socket connection.
LOG_INFO("Keep the socket connection alive.");
} else {
Close();
}
LOG_INFO("Finished to read and parse HTTP response.");
// Stop reading.
return;
}
if (!stopped_) {
if (!closed_) {
DoReadResponse(error);
}
};
@ -187,17 +217,17 @@ void HttpClient::DoReadResponse(Error* error) {
} while (ec == boost::asio::error::would_block);
}
void HttpClient::DoWaitDeadline() {
deadline_.async_wait(
std::bind(&HttpClient::OnDeadline, this, std::placeholders::_1));
void HttpClient::DoWaitTimer() {
timer_.async_wait(
std::bind(&HttpClient::OnTimer, this, std::placeholders::_1));
}
void HttpClient::OnDeadline(boost::system::error_code ec) {
if (stopped_) {
void HttpClient::OnTimer(boost::system::error_code ec) {
if (closed_) {
return;
}
LOG_VERB("OnDeadline.");
LOG_VERB("On deadline timer.");
// NOTE: Can't check this:
// if (ec == boost::asio::error::operation_aborted) {
@ -205,39 +235,24 @@ void HttpClient::OnDeadline(boost::system::error_code ec) {
// return;
// }
if (deadline_.expires_at() <=
boost::asio::deadline_timer::traits_type::now()) {
if (timer_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
// The deadline has passed.
// The socket is closed so that any outstanding asynchronous operations
// are canceled.
LOG_WARN("HTTP client timed out.");
timed_out_ = true;
Stop();
Close();
return;
}
// Put the actor back to sleep.
DoWaitDeadline();
DoWaitTimer();
}
void HttpClient::Stop() {
if (stopped_) {
return;
}
stopped_ = true;
LOG_INFO("Close socket...");
boost::system::error_code ec;
socket_->Close(&ec);
if (ec) {
LOG_ERRO("Socket close error (%s).", ec.message().c_str());
}
void HttpClient::StopTimer() {
// Cancel any asynchronous operations that are waiting on the timer.
LOG_INFO("Cancel deadline timer...");
deadline_.cancel();
timer_.cancel();
}
} // namespace webcc

@ -18,9 +18,10 @@
namespace webcc {
class HttpSocketBase;
class HttpClient;
typedef std::shared_ptr<HttpClient> HttpClientPtr;
// The base class of synchronous HTTP clients.
// Synchronous HTTP & HTTPS client.
// In synchronous mode, a request won't return until the response is received
// or timeout occurs.
// Please don't use the same client object in multiple threads.
@ -46,6 +47,9 @@ public:
std::size_t buffer_size = 0,
bool connect = true);
// Close the socket.
void Close();
HttpResponsePtr response() const { return response_; }
int response_status() const {
@ -58,11 +62,13 @@ public:
return response_->content();
}
bool closed() const { return closed_; }
bool timed_out() const { return timed_out_; }
Error error() const { return error_; }
public:
private:
Error Connect(const HttpRequest& request);
Error DoConnect(const HttpRequest& request, const std::string& default_port);
@ -73,13 +79,15 @@ public:
void DoReadResponse(Error* error);
void DoWaitDeadline();
void OnDeadline(boost::system::error_code ec);
void DoWaitTimer();
void OnTimer(boost::system::error_code ec);
void Stop();
void StopTimer();
private:
boost::asio::io_context io_context_;
// Socket connection.
std::unique_ptr<HttpSocketBase> socket_;
std::vector<char> buffer_;
@ -88,7 +96,7 @@ public:
std::unique_ptr<HttpResponseParser> response_parser_;
// Timer for the timeout control.
boost::asio::deadline_timer deadline_;
boost::asio::deadline_timer timer_;
// Verify the certificate of the peer (remote server) or not.
// HTTPS only.
@ -98,8 +106,8 @@ public:
// Only for reading response from server.
int timeout_seconds_;
// Request stopped due to timeout or socket error.
bool stopped_;
// Connection closed.
bool closed_;
// If the error was caused by timeout or not.
bool timed_out_;

@ -1,16 +1,34 @@
#include "webcc/http_client_pool.h"
#include "webcc/logger.h"
namespace webcc {
HttpClientPtr HttpClientPool::Get(const Url& url) const {
for (const auto& client : clients_) {
return client;
HttpClientPool::~HttpClientPool() {
LOG_INFO("Close socket for all (%u) connections in the pool.",
clients_.size());
for (auto& pair : clients_) {
pair.second->Close();
}
}
HttpClientPtr HttpClientPool::Get(const Key& key) const {
auto it = clients_.find(key);
if (it != clients_.end()) {
return it->second;
} else {
return HttpClientPtr{};
}
return HttpClientPtr{};
}
void HttpClientPool::Add(HttpClientPtr client) {
clients_.push_back(client);
void HttpClientPool::Add(const Key& key, HttpClientPtr client) {
clients_[key] = client;
}
void HttpClientPool::Remove(const Key& key) {
clients_.erase(key);
}
} // namespace webcc

@ -1,10 +1,7 @@
#ifndef WEBCC_HTTP_CLIENT_POOL_H_
#define WEBCC_HTTP_CLIENT_POOL_H_
// HTTP client connection pool for keep-alive connections.
#include <list>
#include <memory>
#include <map>
#include <string>
#include "webcc/http_client.h"
@ -12,18 +9,51 @@
namespace webcc {
typedef std::shared_ptr<HttpClient> HttpClientPtr;
// Connection pool for keep-alive connections.
class HttpClientPool {
public:
struct Key {
std::string scheme;
std::string host;
std::string port;
Key() = default;
explicit Key(const Url& url)
: scheme(url.scheme()), host(url.host()), port(url.port()) {
}
bool operator==(const Key& rhs) const {
return scheme == rhs.scheme && host == rhs.host && port == rhs.port;
}
bool operator<(const Key& rhs) const {
if (scheme < rhs.scheme) {
return true;
}
if (host < rhs.host) {
return true;
}
if (port < rhs.port) {
return true;
}
return false;
}
};
public:
HttpClientPool() = default;
HttpClientPtr Get(const Url& url) const;
~HttpClientPool();
HttpClientPtr Get(const Key& key) const;
void Add(const Key& key, HttpClientPtr client);
void Add(HttpClientPtr client);
void Remove(const Key& key);
private:
std::list<HttpClientPtr> clients_;
std::map<Key, HttpClientPtr> clients_;
};
} // namespace webcc

@ -1,12 +1,10 @@
#include "webcc/http_client_session.h"
#include "webcc/http_client.h"
#include "webcc/url.h"
namespace webcc {
HttpClientSession::HttpClientSession()
: pool_(new HttpClientPool{}) {
HttpClientSession::HttpClientSession() {
InitHeaders();
}
@ -45,22 +43,39 @@ HttpResponsePtr HttpClientSession::Request(HttpRequestArgs&& args) {
request.Prepare();
bool connect = false;
HttpClientPtr impl = pool_->Get(request.url());
const HttpClientPool::Key key{ request.url() };
bool new_created = false;
if (!impl) {
impl.reset(new HttpClient{ 0, args.ssl_verify_ });
connect = true;
HttpClientPtr client = pool_.Get(key);
if (!client) {
new_created = true;
client.reset(new HttpClient{ 0, args.ssl_verify_ });
} else {
new_created = false;
LOG_VERB("Reuse an existing connection.");
}
pool_->Add(impl);
if (!client->Request(request, args.buffer_size_, new_created)) {
throw Exception(client->error(), client->timed_out());
}
if (!impl->Request(request, args.buffer_size_, connect)) {
throw Exception(impl->error(), impl->timed_out());
if (new_created) {
if (!client->closed()) {
pool_.Add(key, client);
LOG_VERB("Added connection to the pool (%s, %s, %s).",
key.scheme.c_str(), key.host.c_str(), key.port.c_str());
}
} else {
if (client->closed()) {
pool_.Remove(key);
LOG_VERB("Removed connection from the pool (%s, %s, %s).",
key.scheme.c_str(), key.host.c_str(), key.port.c_str());
}
}
return impl->response();
return client->response();
}
HttpResponsePtr HttpClientSession::Get(const std::string& url,
@ -85,7 +100,6 @@ HttpResponsePtr HttpClientSession::Post(const std::string& url,
}
void HttpClientSession::InitHeaders() {
// NOTE: C++11 requires a space between literal and string macro.
headers_.Add(http::headers::kUserAgent, http::UserAgent());
// TODO: Support gzip, deflate
@ -93,7 +107,6 @@ void HttpClientSession::InitHeaders() {
headers_.Add(http::headers::kAccept, "*/*");
// TODO
headers_.Add(http::headers::kConnection, "Keep-Alive");
}

@ -52,7 +52,8 @@ private:
// Headers for each request sent from this session.
HttpHeaderDict headers_;
std::unique_ptr<HttpClientPool> pool_;
// Connection pool for keep-alive.
HttpClientPool pool_;
};
} // namespace webcc

@ -25,36 +25,68 @@ std::ostream& operator<<(std::ostream& os, const HttpMessage& message) {
// -----------------------------------------------------------------------------
void HttpHeaderDict::Add(const std::string& key, const std::string& value) {
for (HttpHeader& h : headers_) {
if (boost::iequals(h.first, key)) {
h.second = value;
return;
}
auto it = Find(key);
if (it != headers_.end()) {
it->second = value;
} else {
headers_.push_back({ key, value });
}
headers_.push_back({ key, value });
}
void HttpHeaderDict::Add(std::string&& key, std::string&& value) {
for (HttpHeader& h : headers_) {
if (boost::iequals(h.first, key)) {
h.second = std::move(value);
return;
}
auto it = Find(key);
if (it != headers_.end()) {
it->second = std::move(value);
} else {
headers_.push_back({ std::move(key), std::move(value) });
}
}
const std::string& HttpHeaderDict::Get(const std::string& key,
bool* existed) const {
auto it = const_cast<HttpHeaderDict*>(this)->Find(key);
if (existed != nullptr) {
*existed = (it != headers_.end());
}
if (it != headers_.end()) {
return it->second;
}
headers_.push_back({ std::move(key), std::move(value) });
static const std::string s_no_value;
return s_no_value;
}
bool HttpHeaderDict::Has(const std::string& key) const {
for (const HttpHeader& h : headers_) {
if (boost::iequals(h.first, key)) {
return true;
std::vector<HttpHeader>::iterator HttpHeaderDict::Find(const std::string& key) {
auto it = headers_.begin();
for (; it != headers_.end(); ++it) {
if (boost::iequals(it->first, key)) {
break;
}
}
return false;
return it;
}
// -----------------------------------------------------------------------------
bool HttpMessage::IsConnectionKeepAlive() const {
bool existed = false;
const std::string& connection =
GetHeader(http::headers::kConnection, &existed);
if (!existed) {
// Keep-Alive is by default for HTTP/1.1.
return true;
}
if (boost::iequals(connection, "Keep-Alive")) {
return true;
}
return false;
}
// See: https://tools.ietf.org/html/rfc7231#section-3.1.1.1
void HttpMessage::SetContentType(const std::string& media_type,
const std::string& charset) {

@ -14,36 +14,45 @@ namespace webcc {
// -----------------------------------------------------------------------------
typedef std::pair<std::string, std::string> HttpHeader;
class HttpMessage;
std::ostream& operator<<(std::ostream& os, const HttpMessage& message);
// -----------------------------------------------------------------------------
typedef std::pair<std::string, std::string> HttpHeader;
class HttpHeaderDict {
public:
std::size_t size() const {
return headers_.size();
}
const std::vector<HttpHeader>& data() const {
return headers_;
}
void Add(const std::string& key, const std::string& value);
void Add(std::string&& key, std::string&& value);
bool Has(const std::string& key) const;
const HttpHeader& Get(std::size_t i) const {
assert(i < size());
return headers_[i];
bool Has(const std::string& key) const {
return const_cast<HttpHeaderDict*>(this)->Find(key) != headers_.end();
}
const std::vector<HttpHeader>& data() const {
return headers_;
// Get header by index.
const HttpHeader& Get(std::size_t index) const {
assert(index < size());
return headers_[index];
}
// Get header value by key.
// If there's no such header with the given key, besides return empty, the
// optional |existed| parameter will be set to false.
const std::string& Get(const std::string& key, bool* existed = nullptr) const;
private:
std::vector<HttpHeader>::iterator Find(const std::string& key);
std::vector<HttpHeader> headers_;
};
@ -73,6 +82,8 @@ public:
return content_;
}
bool IsConnectionKeepAlive() const;
void SetHeader(const std::string& key, const std::string& value) {
headers_.Add(key, value);
}
@ -81,11 +92,15 @@ public:
headers_.Add(std::move(key), std::move(value));
}
const std::string& GetHeader(const std::string& key,
bool* existed = nullptr) const {
return headers_.Get(key, existed);
}
// E.g., "text/html", "application/json; charset=utf-8", etc.
void SetContentType(const std::string& media_type,
const std::string& charset);
// TODO: Remove parameter |set_length|.
void SetContent(std::string&& content, bool set_length);
// Make the message (e.g., update start line).

@ -19,7 +19,7 @@ public:
bool Bind(RestServicePtr service, const std::string& url, bool is_regex);
private:
void HandleConnection(HttpConnectionPtr connection) override;
void HandleConnection(HttpConnectionPtr connection) final;
RestServiceManager service_manager_;
};

@ -32,7 +32,7 @@ public:
}
private:
HttpRequestHandler* GetRequestHandler() override {
HttpRequestHandler* GetRequestHandler() final {
return &request_handler_;
}

@ -33,7 +33,7 @@ public:
private:
class ServiceItem {
public:
public:
ServiceItem(RestServicePtr _service, const std::string& _url,
bool _is_regex)
: service(_service), url(_url), is_regex(_is_regex) {

@ -38,7 +38,7 @@ public:
// Parse from SOAP XML.
bool FromXml(const std::string& xml_string);
public:
protected:
// Convert to SOAP body XML.
virtual void ToXmlBody(pugi::xml_node xbody) = 0;

@ -21,9 +21,9 @@ public:
// Get parameter value by key.
const std::string& GetParameter(const std::string& key) const;
public:
void ToXmlBody(pugi::xml_node xbody) override;
bool FromXmlBody(pugi::xml_node xbody) override;
protected:
void ToXmlBody(pugi::xml_node xbody) final;
bool FromXmlBody(pugi::xml_node xbody) final;
private:
std::vector<SoapParameter> parameters_;

@ -28,7 +28,7 @@ public:
bool Bind(SoapServicePtr service, const std::string& url);
private:
void HandleConnection(HttpConnectionPtr connection) override;
void HandleConnection(HttpConnectionPtr connection) final;
SoapServicePtr GetServiceByUrl(const std::string& url);

@ -50,7 +50,7 @@ public:
// - composer(xxxResponse);
// The composer then add proper children to xxxResponse as the result.
class Composer {
public:
public:
void operator()(pugi::xml_node xresponse) {
Compose(xresponse);
}
@ -127,10 +127,10 @@ public:
// TODO: Set fault from server.
public:
void ToXmlBody(pugi::xml_node xbody) override;
protected:
void ToXmlBody(pugi::xml_node xbody) final;
bool FromXmlBody(pugi::xml_node xbody) override;
bool FromXmlBody(pugi::xml_node xbody) final;
private:
// Fault element if any.

@ -37,7 +37,7 @@ public:
}
private:
HttpRequestHandler* GetRequestHandler() override {
HttpRequestHandler* GetRequestHandler() final {
return &request_handler_;
}

@ -19,7 +19,7 @@ public:
virtual bool Handle(const SoapRequest& soap_request,
SoapResponse* soap_response) = 0;
public:
protected:
http::Status http_status_ = http::Status::kOK;
};

Loading…
Cancel
Save