Refine timeout control; refine rest book examples.

master
Adam Gu 7 years ago
parent 2e2b45dd43
commit e9096d4e53

@ -0,0 +1,52 @@
#include "example/common/book_json.h"
#include <sstream>
#include <iostream>
#include "json/json.h"
#include "example/common/book.h"
std::string JsonToString(const Json::Value& json) {
Json::StreamWriterBuilder builder;
return Json::writeString(builder, json);
}
Json::Value StringToJson(const std::string& str) {
Json::Value json;
Json::CharReaderBuilder builder;
std::stringstream stream(str);
std::string errs;
if (!Json::parseFromStream(builder, stream, &json, &errs)) {
std::cerr << errs << std::endl;
}
return json;
}
Json::Value BookToJson(const Book& book) {
Json::Value root;
root["id"] = book.id;
root["title"] = book.title;
root["price"] = book.price;
return root;
}
std::string BookToJsonString(const Book& book) {
return JsonToString(BookToJson(book));
}
bool JsonStringToBook(const std::string& json_str, Book* book) {
Json::Value json = StringToJson(json_str);
if (!json) {
return false;
}
book->id = json["id"].asString();
book->title = json["title"].asString();
book->price = json["price"].asDouble();
return true;
}

@ -0,0 +1,19 @@
#ifndef EXAMPLE_COMMON_BOOK_JSON_H_
#define EXAMPLE_COMMON_BOOK_JSON_H_
#include <string>
#include "json/json-forwards.h"
struct Book;
std::string JsonToString(const Json::Value& json);
Json::Value StringToJson(const std::string& str);
Json::Value BookToJson(const Book& book);
std::string BookToJsonString(const Book& book);
bool JsonStringToBook(const std::string& json_str, Book* book);
#endif // EXAMPLE_COMMON_BOOK_JSON_H_

@ -1,4 +1,13 @@
add_executable(rest_book_async_client main.cc) set(TARGET_NAME rest_book_async_client)
target_link_libraries(rest_book_async_client webcc jsoncpp ${Boost_LIBRARIES}) set(SRCS
target_link_libraries(rest_book_async_client "${CMAKE_THREAD_LIBS_INIT}") ../common/book.cc
../common/book.h
../common/book_json.cc
../common/book_json.h
main.cc)
add_executable(${TARGET_NAME} ${SRCS})
target_link_libraries(${TARGET_NAME} webcc jsoncpp ${Boost_LIBRARIES})
target_link_libraries(${TARGET_NAME} "${CMAKE_THREAD_LIBS_INIT}")

@ -8,18 +8,72 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Write a JSON object to string. // Write a JSON object to string.
std::string JsonToString(const Json::Value& json) { static std::string JsonToString(const Json::Value& json) {
Json::StreamWriterBuilder builder; Json::StreamWriterBuilder builder;
return Json::writeString(builder, json); return Json::writeString(builder, json);
} }
static Json::Value StringToJson(const std::string& str) {
Json::Value json;
Json::CharReaderBuilder builder;
std::stringstream stream(str);
std::string errs;
if (!Json::parseFromStream(builder, stream, &json, &errs)) {
std::cerr << errs << std::endl;
}
return json;
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class BookListClient { class BookClientBase {
public: public:
BookListClient(boost::asio::io_context& io_context, BookClientBase(boost::asio::io_context& io_context,
const std::string& host, const std::string& port) const std::string& host, const std::string& port,
int timeout_seconds)
: rest_client_(io_context, host, port) { : rest_client_(io_context, host, port) {
rest_client_.set_timeout_seconds(timeout_seconds);
}
virtual ~BookClientBase() = default;
protected:
void PrintSeparateLine() {
std::cout << "--------------------------------";
std::cout << "--------------------------------";
std::cout << std::endl;
}
// Generic response handler for RestAsyncClient APIs.
void GenericHandler(std::function<void(webcc::HttpResponsePtr)> rsp_callback,
webcc::HttpResponsePtr response,
webcc::Error error,
bool timed_out) {
if (error != webcc::kNoError) {
std::cout << webcc::DescribeError(error);
if (timed_out) {
std::cout << " (timed out)";
}
std::cout << std::endl;
} else {
// Call the response callback on success.
rsp_callback(response);
}
}
webcc::RestAsyncClient rest_client_;
};
// -----------------------------------------------------------------------------
class BookListClient : public BookClientBase {
public:
BookListClient(boost::asio::io_context& io_context,
const std::string& host, const std::string& port,
int timeout_seconds)
: BookClientBase(io_context, host, port, timeout_seconds) {
} }
void ListBooks(webcc::HttpResponseHandler handler) { void ListBooks(webcc::HttpResponseHandler handler) {
@ -28,38 +82,48 @@ class BookListClient {
rest_client_.Get("/books", handler); rest_client_.Get("/books", handler);
} }
void CreateBook(const std::string& id, void CreateBook(const std::string& title, double price,
const std::string& title, std::function<void(std::string)> id_callback) {
double price, std::cout << "CreateBook: " << title << " " << price << std::endl;
webcc::HttpResponseHandler handler) {
std::cout << "CreateBook: " << id << " " << title << " " << price
<< std::endl;
Json::Value json(Json::objectValue); Json::Value json(Json::objectValue);
json["id"] = id;
json["title"] = title; json["title"] = title;
json["price"] = price; json["price"] = price;
rest_client_.Post("/books", JsonToString(json), handler); auto rsp_callback = [id_callback](webcc::HttpResponsePtr response) {
Json::Value rsp_json = StringToJson(response->content());
id_callback(rsp_json["id"].asString());
};
rest_client_.Post("/books", JsonToString(json),
std::bind(&BookListClient::GenericHandler, this,
rsp_callback,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
} }
private:
webcc::RestAsyncClient rest_client_;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class BookDetailClient { class BookDetailClient : public BookClientBase {
public: public:
BookDetailClient(boost::asio::io_context& io_context, BookDetailClient(boost::asio::io_context& io_context,
const std::string& host, const std::string& port) const std::string& host, const std::string& port,
: rest_client_(io_context, host, port) { int timeout_seconds)
: BookClientBase(io_context, host, port, timeout_seconds) {
} }
void GetBook(const std::string& id, webcc::HttpResponseHandler handler) { void GetBook(const std::string& id, webcc::HttpResponseHandler handler) {
std::cout << "GetBook: " << id << std::endl; std::cout << "GetBook: " << id << std::endl;
rest_client_.Get("/book/" + id, handler); auto rsp_callback = [](webcc::HttpResponsePtr response) {
Json::Value rsp_json = StringToJson(response->content());
//id_callback(rsp_json["id"].asString());
};
rest_client_.Get("/books/" + id, handler);
} }
void UpdateBook(const std::string& id, void UpdateBook(const std::string& id,
@ -74,29 +138,27 @@ class BookDetailClient {
json["title"] = title; json["title"] = title;
json["price"] = price; json["price"] = price;
rest_client_.Put("/book/" + id, JsonToString(json), handler); rest_client_.Put("/books/" + id, JsonToString(json), handler);
} }
void DeleteBook(const std::string& id, webcc::HttpResponseHandler handler) { void DeleteBook(const std::string& id, webcc::HttpResponseHandler handler) {
std::cout << "DeleteBook: " << id << std::endl; std::cout << "DeleteBook: " << id << std::endl;
rest_client_.Delete("/book/" + id, handler); rest_client_.Delete("/books/" + id, handler);
} }
private:
webcc::RestAsyncClient rest_client_;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void Help(const char* argv0) { void Help(const char* argv0) {
std::cout << "Usage: " << argv0 << " <host> <port>" << std::endl; std::cout << "Usage: " << argv0 << " <host> <port> [timeout]" << std::endl;
std::cout << " E.g.," << std::endl; std::cout << " E.g.," << std::endl;
std::cout << " " << argv0 << " localhost 8080" << std::endl; std::cout << " " << argv0 << " localhost 8080" << std::endl;
std::cout << " " << argv0 << " localhost 8080 2" << std::endl;
} }
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
if (argc != 3) { if (argc < 3) {
Help(argv[0]); Help(argv[0]);
return 1; return 1;
} }
@ -106,10 +168,15 @@ int main(int argc, char* argv[]) {
std::string host = argv[1]; std::string host = argv[1];
std::string port = argv[2]; std::string port = argv[2];
int timeout_seconds = -1;
if (argc > 3) {
timeout_seconds = std::atoi(argv[3]);
}
boost::asio::io_context io_context; boost::asio::io_context io_context;
BookListClient list_client(io_context, host, port); BookListClient list_client(io_context, host, port, timeout_seconds);
BookDetailClient detail_client(io_context, host, port); BookDetailClient detail_client(io_context, host, port, timeout_seconds);
// Response handler. // Response handler.
auto handler = [](webcc::HttpResponsePtr response, webcc::Error error, auto handler = [](webcc::HttpResponsePtr response, webcc::Error error,
@ -126,16 +193,20 @@ int main(int argc, char* argv[]) {
}; };
list_client.ListBooks(handler); list_client.ListBooks(handler);
list_client.CreateBook("1", "1984", 12.3, handler);
detail_client.GetBook("1", handler); list_client.CreateBook("1984", 12.3, [](std::string id) {
detail_client.UpdateBook("1", "1Q84", 32.1, handler); std::cout << "ID: " << id << std::endl;
detail_client.GetBook("1", handler); });
detail_client.DeleteBook("1", handler);
list_client.ListBooks(handler); //detail_client.GetBook("1", handler);
//detail_client.UpdateBook("1", "1Q84", 32.1, handler);
//detail_client.GetBook("1", handler);
//detail_client.DeleteBook("1", handler);
//list_client.ListBooks(handler);
io_context.run(); io_context.run();
return 0; return 0;
} }

@ -1,6 +1,11 @@
set(TARGET_NAME rest_book_client) set(TARGET_NAME rest_book_client)
set(SRCS main.cc) set(SRCS
../common/book.cc
../common/book.h
../common/book_json.cc
../common/book_json.h
main.cc)
add_executable(${TARGET_NAME} ${SRCS}) add_executable(${TARGET_NAME} ${SRCS})

@ -5,6 +5,9 @@
#include "webcc/logger.h" #include "webcc/logger.h"
#include "webcc/rest_client.h" #include "webcc/rest_client.h"
#include "example/common/book.h"
#include "example/common/book_json.h"
// In order to run with VLD, please copy the following files to the example // In order to run with VLD, please copy the following files to the example
// output folder from "third_party\win32\bin": // output folder from "third_party\win32\bin":
// - dbghelp.dll // - dbghelp.dll
@ -41,7 +44,7 @@ static Json::Value StringToJson(const std::string& str) {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class BookClientBase { class BookClientBase {
public: public:
BookClientBase(const std::string& host, const std::string& port, BookClientBase(const std::string& host, const std::string& port,
int timeout_seconds) int timeout_seconds)
: rest_client_(host, port) { : rest_client_(host, port) {
@ -50,7 +53,7 @@ public:
virtual ~BookClientBase() = default; virtual ~BookClientBase() = default;
protected: protected:
void PrintSeparateLine() { void PrintSeparateLine() {
std::cout << "--------------------------------"; std::cout << "--------------------------------";
std::cout << "--------------------------------"; std::cout << "--------------------------------";
@ -121,17 +124,16 @@ public:
: BookClientBase(host, port, timeout_seconds) { : BookClientBase(host, port, timeout_seconds) {
} }
bool GetBook(const std::string& id) { bool GetBook(const std::string& id, Book* book) {
PrintSeparateLine(); PrintSeparateLine();
std::cout << "GetBook: " << id << std::endl; std::cout << "GetBook: " << id << std::endl;
if (!rest_client_.Get("/book/" + id)) { if (!rest_client_.Get("/books/" + id)) {
PrintError(); PrintError();
return false; return false;
} }
std::cout << rest_client_.response_content() << std::endl; return JsonStringToBook(rest_client_.response_content(), book);
return true;
} }
bool UpdateBook(const std::string& id, const std::string& title, bool UpdateBook(const std::string& id, const std::string& title,
@ -145,7 +147,7 @@ public:
json["title"] = title; json["title"] = title;
json["price"] = price; json["price"] = price;
if (!rest_client_.Put("/book/" + id, JsonToString(json))) { if (!rest_client_.Put("/books/" + id, JsonToString(json))) {
PrintError(); PrintError();
return false; return false;
} }
@ -158,7 +160,7 @@ public:
PrintSeparateLine(); PrintSeparateLine();
std::cout << "DeleteBook: " << id << std::endl; std::cout << "DeleteBook: " << id << std::endl;
if (!rest_client_.Delete("/book/" + id)) { if (!rest_client_.Delete("/books/" + id)) {
PrintError(); PrintError();
return false; return false;
} }
@ -201,9 +203,14 @@ int main(int argc, char* argv[]) {
std::string id; std::string id;
list_client.CreateBook("1984", 12.3, &id); list_client.CreateBook("1984", 12.3, &id);
detail_client.GetBook(id); Book book;
if (detail_client.GetBook(id, &book)) {
std::cout << "Book " << id << ": " << book << std::endl;
}
detail_client.UpdateBook(id, "1Q84", 32.1); detail_client.UpdateBook(id, "1Q84", 32.1);
detail_client.GetBook(id); detail_client.GetBook(id, &book);
detail_client.DeleteBook(id); detail_client.DeleteBook(id);
list_client.ListBooks(); list_client.ListBooks();

@ -3,6 +3,8 @@ set(TARGET_NAME rest_book_server)
set(SRCS set(SRCS
../common/book.cc ../common/book.cc
../common/book.h ../common/book.h
../common/book_json.cc
../common/book_json.h
services.cc services.cc
services.h services.h
main.cc) main.cc)

@ -52,7 +52,7 @@ int main(int argc, char* argv[]) {
"/books", false); "/books", false);
server.Bind(std::make_shared<BookDetailService>(sleep_seconds), server.Bind(std::make_shared<BookDetailService>(sleep_seconds),
"/book/(\\d+)", true); "/books/(\\d+)", true);
server.Run(); server.Run();

@ -8,36 +8,12 @@
#include "webcc/logger.h" #include "webcc/logger.h"
#include "example/common/book.h" #include "example/common/book.h"
#include "example/common/book_json.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
static BookStore g_book_store; static BookStore g_book_store;
static Json::Value BookToJson(const Book& book) {
Json::Value root;
root["id"] = book.id;
root["title"] = book.title;
root["price"] = book.price;
return root;
}
static bool JsonToBook(const std::string& json, Book* book) {
Json::Value root;
Json::CharReaderBuilder builder;
std::stringstream stream(json);
std::string errs;
if (!Json::parseFromStream(builder, stream, &root, &errs)) {
std::cerr << errs << std::endl;
return false;
}
book->id = root["id"].asString();
book->title = root["title"].asString();
book->price = root["price"].asDouble();
return true;
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Return all books as a JSON array. // Return all books as a JSON array.
@ -49,13 +25,12 @@ bool BookListService::Get(const webcc::UrlQuery& /*query*/,
std::this_thread::sleep_for(std::chrono::seconds(sleep_seconds_)); std::this_thread::sleep_for(std::chrono::seconds(sleep_seconds_));
} }
Json::Value root(Json::arrayValue); Json::Value json(Json::arrayValue);
for (const Book& book : g_book_store.books()) { for (const Book& book : g_book_store.books()) {
root.append(BookToJson(book)); json.append(BookToJson(book));
} }
Json::StreamWriterBuilder builder; *response_content = JsonToString(json);
*response_content = Json::writeString(builder, root);
return true; return true;
} }
@ -69,14 +44,13 @@ bool BookListService::Post(const std::string& request_content,
} }
Book book; Book book;
if (JsonToBook(request_content, &book)) { if (JsonStringToBook(request_content, &book)) {
std::string id = g_book_store.AddBook(book); std::string id = g_book_store.AddBook(book);
Json::Value root; Json::Value json;
root["id"] = id; json["id"] = id;
Json::StreamWriterBuilder builder; *response_content = JsonToString(json);
*response_content = Json::writeString(builder, root);
return true; return true;
} }
@ -102,8 +76,7 @@ bool BookDetailService::Get(const std::vector<std::string>& url_sub_matches,
const Book& book = g_book_store.GetBook(book_id); const Book& book = g_book_store.GetBook(book_id);
if (!book.IsNull()) { if (!book.IsNull()) {
Json::StreamWriterBuilder builder; *response_content = BookToJsonString(book);
*response_content = Json::writeString(builder, BookToJson(book));
return true; return true;
} }
@ -126,7 +99,7 @@ bool BookDetailService::Put(const std::vector<std::string>& url_sub_matches,
const std::string& book_id = url_sub_matches[0]; const std::string& book_id = url_sub_matches[0];
Book book; Book book;
if (JsonToBook(request_content, &book)) { if (JsonStringToBook(request_content, &book)) {
book.id = book_id; book.id = book_id;
return g_book_store.UpdateBook(book); return g_book_store.UpdateBook(book);
} }

@ -23,11 +23,8 @@ const std::size_t kBufferSize = 1024;
const std::size_t kInvalidLength = static_cast<std::size_t>(-1); const std::size_t kInvalidLength = static_cast<std::size_t>(-1);
// Timeout seconds. // Default timeout for reading response.
// TODO const int kMaxReadSeconds = 30;
const int kMaxConnectSeconds = 10;
const int kMaxSendSeconds = 30;
const int kMaxReceiveSeconds = 30;
extern const std::string kHost; extern const std::string kHost;
extern const std::string kContentType; extern const std::string kContentType;

@ -9,15 +9,12 @@
namespace webcc { namespace webcc {
extern void AdjustBufferSize(std::size_t content_length,
std::vector<char>* buffer);
HttpAsyncClient::HttpAsyncClient(boost::asio::io_context& io_context) HttpAsyncClient::HttpAsyncClient(boost::asio::io_context& io_context)
: socket_(io_context), : socket_(io_context),
resolver_(new tcp::resolver(io_context)), resolver_(new tcp::resolver(io_context)),
buffer_(kBufferSize), buffer_(kBufferSize),
deadline_(io_context), deadline_(io_context),
timeout_seconds_(kMaxReceiveSeconds), timeout_seconds_(kMaxReadSeconds),
stopped_(false), stopped_(false),
timed_out_(false) { timed_out_(false) {
} }
@ -30,6 +27,8 @@ void HttpAsyncClient::Request(std::shared_ptr<HttpRequest> request,
response_.reset(new HttpResponse()); response_.reset(new HttpResponse());
response_parser_.reset(new HttpResponseParser(response_.get())); response_parser_.reset(new HttpResponseParser(response_.get()));
stopped_ = timed_out_ = false;
LOG_VERB("HTTP request:\n%s", request->Dump(4, "> ").c_str()); LOG_VERB("HTTP request:\n%s", request->Dump(4, "> ").c_str());
request_ = request; request_ = request;
@ -54,6 +53,7 @@ void HttpAsyncClient::Stop() {
LOG_ERRO("Failed to close socket."); LOG_ERRO("Failed to close socket.");
} }
LOG_INFO("Cancel deadline timer...");
deadline_.cancel(); deadline_.cancel();
} }
} }
@ -68,28 +68,19 @@ void HttpAsyncClient::ResolveHandler(boost::system::error_code ec,
// Start the connect actor. // Start the connect actor.
endpoints_ = endpoints; endpoints_ = endpoints;
// Set a deadline for the connect operation.
deadline_.expires_from_now(boost::posix_time::seconds(kMaxConnectSeconds));
// ConnectHandler: void(boost::system::error_code, tcp::endpoint) // ConnectHandler: void(boost::system::error_code, tcp::endpoint)
boost::asio::async_connect(socket_, endpoints_, boost::asio::async_connect(socket_, endpoints_,
std::bind(&HttpAsyncClient::ConnectHandler, std::bind(&HttpAsyncClient::ConnectHandler,
shared_from_this(), shared_from_this(),
std::placeholders::_1, std::placeholders::_1,
std::placeholders::_2)); std::placeholders::_2));
// Start the deadline actor. You will note that we're not setting any
// particular deadline here. Instead, the connect and input actors will
// update the deadline prior to each asynchronous operation.
deadline_.async_wait(std::bind(&HttpAsyncClient::CheckDeadline,
shared_from_this()));
} }
} }
void HttpAsyncClient::ConnectHandler(boost::system::error_code ec, void HttpAsyncClient::ConnectHandler(boost::system::error_code ec,
tcp::endpoint endpoint) { tcp::endpoint endpoint) {
if (ec) { if (ec) {
LOG_ERRO("Socket connect error: %s", ec.message().c_str()); LOG_ERRO("Socket connect error (%s).", ec.message().c_str());
Stop(); Stop();
response_handler_(response_, kEndpointConnectError, timed_out_); response_handler_(response_, kEndpointConnectError, timed_out_);
return; return;
@ -115,8 +106,6 @@ void HttpAsyncClient::AsyncWrite() {
return; return;
} }
deadline_.expires_from_now(boost::posix_time::seconds(kMaxSendSeconds));
boost::asio::async_write(socket_, boost::asio::async_write(socket_,
request_->ToBuffers(), request_->ToBuffers(),
std::bind(&HttpAsyncClient::WriteHandler, std::bind(&HttpAsyncClient::WriteHandler,
@ -134,6 +123,8 @@ void HttpAsyncClient::WriteHandler(boost::system::error_code ec) {
response_handler_(response_, kSocketWriteError, timed_out_); response_handler_(response_, kSocketWriteError, timed_out_);
} else { } else {
deadline_.expires_from_now(boost::posix_time::seconds(timeout_seconds_)); deadline_.expires_from_now(boost::posix_time::seconds(timeout_seconds_));
AsyncWaitDeadline();
AsyncRead(); AsyncRead();
} }
} }
@ -148,15 +139,11 @@ void HttpAsyncClient::AsyncRead() {
void HttpAsyncClient::ReadHandler(boost::system::error_code ec, void HttpAsyncClient::ReadHandler(boost::system::error_code ec,
std::size_t length) { std::size_t length) {
if (stopped_) {
return;
}
LOG_VERB("Socket async read handler."); LOG_VERB("Socket async read handler.");
if (ec || length == 0) { if (ec || length == 0) {
Stop(); Stop();
LOG_ERRO("Socket read error."); LOG_ERRO("Socket read error (%s).", ec.message().c_str());
response_handler_(response_, kSocketReadError, timed_out_); response_handler_(response_, kSocketReadError, timed_out_);
return; return;
} }
@ -188,27 +175,28 @@ void HttpAsyncClient::ReadHandler(boost::system::error_code ec,
return; return;
} }
AsyncRead(); if (!stopped_) {
AsyncRead();
}
} }
void HttpAsyncClient::CheckDeadline() { void HttpAsyncClient::AsyncWaitDeadline() {
if (stopped_) { deadline_.async_wait(std::bind(&HttpAsyncClient::DeadlineHandler,
shared_from_this(), std::placeholders::_1));
}
void HttpAsyncClient::DeadlineHandler(boost::system::error_code ec) {
LOG_VERB("Deadline handler.");
if (ec == boost::asio::error::operation_aborted) {
LOG_VERB("Deadline timer canceled.");
return; return;
} }
if (deadline_.expires_at() <= LOG_WARN("HTTP client timed out.");
boost::asio::deadline_timer::traits_type::now()) { timed_out_ = true;
// The deadline has passed.
// The socket is closed so that any outstanding asynchronous operations
// are canceled.
LOG_WARN("HTTP client timed out.");
Stop();
timed_out_ = true;
}
// Put the actor back to sleep. Stop();
deadline_.async_wait(std::bind(&HttpAsyncClient::CheckDeadline,
shared_from_this()));
} }
} // namespace webcc } // namespace webcc

@ -52,7 +52,8 @@ class HttpAsyncClient : public std::enable_shared_from_this<HttpAsyncClient> {
void AsyncRead(); void AsyncRead();
void ReadHandler(boost::system::error_code ec, std::size_t length); void ReadHandler(boost::system::error_code ec, std::size_t length);
void CheckDeadline(); void AsyncWaitDeadline();
void DeadlineHandler(boost::system::error_code ec);
tcp::socket socket_; tcp::socket socket_;
std::unique_ptr<tcp::resolver> resolver_; std::unique_ptr<tcp::resolver> resolver_;

@ -1,5 +1,6 @@
#include "webcc/http_client.h" #include "webcc/http_client.h"
#include <algorithm> // for min
#include <string> #include <string>
#include "boost/asio/connect.hpp" #include "boost/asio/connect.hpp"
@ -10,35 +11,17 @@
#include "boost/lambda/lambda.hpp" #include "boost/lambda/lambda.hpp"
#include "webcc/logger.h" #include "webcc/logger.h"
#include "webcc/utility.h"
namespace webcc { using boost::asio::ip::tcp;
// Adjust buffer size according to content length.
// This is to avoid reading too many times.
// Also used by AsyncHttpClient.
void AdjustBufferSize(std::size_t content_length, std::vector<char>* buffer) {
const std::size_t kMaxTimes = 10;
// According to test, a client never read more than 200000 bytes a time.
// So it doesn't make sense to set any larger size, e.g., 1MB.
const std::size_t kMaxBufferSize = 200000;
LOG_INFO("Adjust buffer size according to content length.");
std::size_t min_buffer_size = content_length / kMaxTimes; namespace webcc {
if (min_buffer_size > buffer->size()) {
buffer->resize(std::min(min_buffer_size, kMaxBufferSize));
LOG_INFO("Resize read buffer to %u.", buffer->size());
} else {
LOG_INFO("Keep the current buffer size: %u.", buffer->size());
}
}
HttpClient::HttpClient() HttpClient::HttpClient()
: socket_(io_context_), : socket_(io_context_),
buffer_(kBufferSize), buffer_(kBufferSize),
deadline_(io_context_), deadline_(io_context_),
timeout_seconds_(kMaxReceiveSeconds), timeout_seconds_(kMaxReadSeconds),
stopped_(false), stopped_(false),
timed_out_(false), timed_out_(false),
error_(kNoError) { error_(kNoError) {
@ -48,12 +31,7 @@ bool HttpClient::Request(const HttpRequest& request) {
response_.reset(new HttpResponse()); response_.reset(new HttpResponse());
response_parser_.reset(new HttpResponseParser(response_.get())); response_parser_.reset(new HttpResponseParser(response_.get()));
stopped_ = false; stopped_ = timed_out_ = false;
timed_out_ = false;
// Start the persistent actor that checks for deadline expiry.
deadline_.expires_at(boost::posix_time::pos_infin);
CheckDeadline();
if ((error_ = Connect(request)) != kNoError) { if ((error_ = Connect(request)) != kNoError) {
return false; return false;
@ -71,8 +49,6 @@ bool HttpClient::Request(const HttpRequest& request) {
} }
Error HttpClient::Connect(const HttpRequest& request) { Error HttpClient::Connect(const HttpRequest& request) {
using boost::asio::ip::tcp;
tcp::resolver resolver(io_context_); tcp::resolver resolver(io_context_);
std::string port = request.port(kHttpPort); std::string port = request.port(kHttpPort);
@ -88,8 +64,6 @@ Error HttpClient::Connect(const HttpRequest& request) {
LOG_VERB("Connect to server..."); LOG_VERB("Connect to server...");
deadline_.expires_from_now(boost::posix_time::seconds(kMaxConnectSeconds));
ec = boost::asio::error::would_block; ec = boost::asio::error::would_block;
// ConnectHandler: void (boost::system::error_code, tcp::endpoint) // ConnectHandler: void (boost::system::error_code, tcp::endpoint)
@ -109,33 +83,28 @@ Error HttpClient::Connect(const HttpRequest& request) {
// Determine whether a connection was successfully established. // Determine whether a connection was successfully established.
if (ec) { if (ec) {
LOG_ERRO("Socket connect error: %s", ec.message().c_str()); LOG_ERRO("Socket connect error (%s).", ec.message().c_str());
Stop(); Stop();
return kEndpointConnectError; return kEndpointConnectError;
} }
LOG_VERB("Socket connected."); LOG_VERB("Socket connected.");
// The deadline actor may have had a chance to run and close our socket, even // ISSUE: |async_connect| reports success on failure.
// though the connect operation notionally succeeded. // See the following bugs:
if (stopped_) { // - https://svn.boost.org/trac10/ticket/8795
// |timed_out_| should be true in this case. // - https://svn.boost.org/trac10/ticket/8995
LOG_ERRO("Socket connect timed out.");
return kEndpointConnectError;
}
return kNoError; return kNoError;
} }
Error HttpClient::SendReqeust(const HttpRequest& request) { Error HttpClient::SendReqeust(const HttpRequest& request) {
LOG_VERB("Send request (timeout: %ds)...", kMaxSendSeconds);
LOG_VERB("HTTP request:\n%s", request.Dump(4, "> ").c_str()); LOG_VERB("HTTP request:\n%s", request.Dump(4, "> ").c_str());
// NOTE: // NOTE:
// It doesn't make much sense to set a timeout for socket write. // It doesn't make much sense to set a timeout for socket write.
// I find that it's almost impossible to simulate a situation in the server // I find that it's almost impossible to simulate a situation in the server
// side to test this timeout. // side to test this timeout.
deadline_.expires_from_now(boost::posix_time::seconds(kMaxSendSeconds));
boost::system::error_code ec = boost::asio::error::would_block; boost::system::error_code ec = boost::asio::error::would_block;
@ -149,17 +118,11 @@ Error HttpClient::SendReqeust(const HttpRequest& request) {
} while (ec == boost::asio::error::would_block); } while (ec == boost::asio::error::would_block);
if (ec) { if (ec) {
LOG_ERRO("Socket write error: %s", ec.message().c_str()); LOG_ERRO("Socket write error (%s).", ec.message().c_str());
Stop(); Stop();
return kSocketWriteError; return kSocketWriteError;
} }
if (stopped_) {
// |timed_out_| should be true in this case.
LOG_ERRO("Socket write timed out.");
return kSocketWriteError;
}
return kNoError; return kNoError;
} }
@ -167,6 +130,7 @@ Error HttpClient::ReadResponse() {
LOG_VERB("Read response (timeout: %ds)...", timeout_seconds_); LOG_VERB("Read response (timeout: %ds)...", timeout_seconds_);
deadline_.expires_from_now(boost::posix_time::seconds(timeout_seconds_)); deadline_.expires_from_now(boost::posix_time::seconds(timeout_seconds_));
AsyncWaitDeadline();
Error error = kNoError; Error error = kNoError;
DoReadResponse(&error); DoReadResponse(&error);
@ -190,14 +154,10 @@ void HttpClient::DoReadResponse(Error* error) {
LOG_VERB("Socket async read handler."); LOG_VERB("Socket async read handler.");
if (stopped_) { if (ec || length == 0) {
return;
}
if (inner_ec || length == 0) {
Stop(); Stop();
*error = kSocketReadError; *error = kSocketReadError;
LOG_ERRO("Socket read error."); LOG_ERRO("Socket read error (%s).", ec.message().c_str());
return; return;
} }
@ -227,7 +187,9 @@ void HttpClient::DoReadResponse(Error* error) {
return; return;
} }
DoReadResponse(error); if (!stopped_) {
DoReadResponse(error);
}
}); });
// Block until the asynchronous operation has completed. // Block until the asynchronous operation has completed.
@ -236,25 +198,23 @@ void HttpClient::DoReadResponse(Error* error) {
} while (ec == boost::asio::error::would_block); } while (ec == boost::asio::error::would_block);
} }
void HttpClient::CheckDeadline() { void HttpClient::AsyncWaitDeadline() {
if (stopped_) { deadline_.async_wait(std::bind(&HttpClient::DeadlineHandler, this,
return; std::placeholders::_1));
} }
LOG_VERB("Check deadline."); void HttpClient::DeadlineHandler(boost::system::error_code ec) {
LOG_VERB("Deadline handler.");
if (deadline_.expires_at() <= if (ec == boost::asio::error::operation_aborted) {
boost::asio::deadline_timer::traits_type::now()) { LOG_VERB("Deadline timer canceled.");
// The deadline has passed. return;
// The socket is closed so that any outstanding asynchronous operations
// are canceled.
LOG_WARN("HTTP client timed out.");
Stop();
timed_out_ = true;
} }
// Put the actor back to sleep. LOG_WARN("HTTP client timed out.");
deadline_.async_wait(std::bind(&HttpClient::CheckDeadline, this)); timed_out_ = true;
Stop();
} }
void HttpClient::Stop() { void HttpClient::Stop() {
@ -269,6 +229,7 @@ void HttpClient::Stop() {
LOG_ERRO("Failed to close socket."); LOG_ERRO("Failed to close socket.");
} }
LOG_INFO("Cancel deadline timer...");
deadline_.cancel(); deadline_.cancel();
} }
} }

@ -46,7 +46,8 @@ class HttpClient {
void DoReadResponse(Error* error); void DoReadResponse(Error* error);
void CheckDeadline(); void AsyncWaitDeadline();
void DeadlineHandler(boost::system::error_code ec);
void Stop(); void Stop();
@ -62,7 +63,7 @@ class HttpClient {
boost::asio::deadline_timer deadline_; boost::asio::deadline_timer deadline_;
// Maximum seconds to wait before the client cancels the operation. // Maximum seconds to wait before the client cancels the operation.
// Only for receiving response from server. // Only for reading response from server.
int timeout_seconds_; int timeout_seconds_;
bool stopped_; bool stopped_;

@ -1,12 +1,33 @@
#include "webcc/utility.h" #include "webcc/utility.h"
#include <algorithm>
#include <ostream> #include <ostream>
#include <sstream> #include <sstream>
#include "webcc/logger.h"
using tcp = boost::asio::ip::tcp; using tcp = boost::asio::ip::tcp;
namespace webcc { namespace webcc {
void AdjustBufferSize(std::size_t content_length, std::vector<char>* buffer) {
const std::size_t kMaxTimes = 10;
// According to test, a client never read more than 200000 bytes a time.
// So it doesn't make sense to set any larger size, e.g., 1MB.
const std::size_t kMaxBufferSize = 200000;
LOG_INFO("Adjust buffer size according to content length.");
std::size_t min_buffer_size = content_length / kMaxTimes;
if (min_buffer_size > buffer->size()) {
buffer->resize(std::min(min_buffer_size, kMaxBufferSize));
LOG_INFO("Resize read buffer to %u.", buffer->size());
} else {
LOG_INFO("Keep the current buffer size: %u.", buffer->size());
}
}
void PrintEndpoint(std::ostream& ostream, void PrintEndpoint(std::ostream& ostream,
const boost::asio::ip::tcp::endpoint& endpoint) { const boost::asio::ip::tcp::endpoint& endpoint) {
ostream << endpoint; ostream << endpoint;

@ -3,11 +3,16 @@
#include <iosfwd> #include <iosfwd>
#include <string> #include <string>
#include <vector>
#include "boost/asio/ip/tcp.hpp" #include "boost/asio/ip/tcp.hpp"
namespace webcc { namespace webcc {
// Adjust buffer size according to content length.
// This is to avoid reading too many times.
void AdjustBufferSize(std::size_t content_length, std::vector<char>* buffer);
void PrintEndpoint(std::ostream& ostream, void PrintEndpoint(std::ostream& ostream,
const boost::asio::ip::tcp::endpoint& endpoint); const boost::asio::ip::tcp::endpoint& endpoint);

Loading…
Cancel
Save