Refine the streaming, add UT, add error handling.

master
Chunting Gu 6 years ago
parent c669551552
commit feae9b7acc

@ -3,6 +3,8 @@
#include "gtest/gtest.h"
#include "boost/algorithm/string.hpp"
#include "boost/filesystem/operations.hpp"
#include "json/json.h"
#include "webcc/client_session.h"
@ -280,22 +282,85 @@ TEST(ClientTest, KeepAlive) {
// -----------------------------------------------------------------------------
// Get a JPEG image.
TEST(ClientTest, GetImageJpeg) {
// Get a JPEG image (without streaming).
TEST(ClientTest, GetImageJpeg_NoStream) {
webcc::ClientSession session;
try {
auto r = session.Get("http://httpbin.org/image/jpeg");
// Or
// auto r = session.Get("http://httpbin.org/image", {},
// {"Accept", "image/jpeg"});
//std::ofstream ofs(path, std::ios::binary);
// TODO: Verify the response is a valid JPEG image.
//std::ofstream ofs(<path>, std::ios::binary);
//ofs << r->data();
// TODO: Verify the response is a valid JPEG image.
} catch (const webcc::Error& error) {
std::cerr << error << std::endl;
}
}
// -----------------------------------------------------------------------------
// Streaming
TEST(ClientTest, Stream_GetImageJpeg) {
webcc::ClientSession session;
try {
auto r = session.Request(webcc::RequestBuilder{}.
Get("http://httpbin.org/image/jpeg")(),
true);
auto file_body = r->file_body();
EXPECT_TRUE(!!file_body);
EXPECT_TRUE(!file_body->path().empty());
// Backup the path of the temp file.
const webcc::Path ori_path = file_body->path();
const webcc::Path new_path("./wolf.jpeg");
bool moved = file_body->Move(new_path);
EXPECT_TRUE(moved);
EXPECT_TRUE(boost::filesystem::exists(new_path));
// The file in the original path should not exist any more.
EXPECT_TRUE(!boost::filesystem::exists(ori_path));
// After move, the original path should be reset.
EXPECT_TRUE(file_body->path().empty());
} catch (const webcc::Error& error) {
std::cerr << error << std::endl;
}
}
// Test whether the streamed file will be deleted or not at the end if it's
// not moved to another path by the user.
TEST(ClientTest, Stream_GetImageJpeg_NoMove) {
webcc::ClientSession session;
try {
webcc::Path ori_path;
{
auto r = session.Request(webcc::RequestBuilder{}.
Get("http://httpbin.org/image/jpeg")(),
true);
auto file_body = r->file_body();
EXPECT_TRUE(!!file_body);
EXPECT_TRUE(!file_body->path().empty());
// Backup the path of the temp file.
ori_path = file_body->path();
}
// The temp file should be deleted.
EXPECT_TRUE(!boost::filesystem::exists(ori_path));
} catch (const webcc::Error& error) {
std::cerr << error << std::endl;

@ -21,8 +21,8 @@ int main(int argc, char* argv[]) {
return 1;
}
std::string url = argv[1];
std::string path = argv[2];
const char* url = argv[1];
const char* path = argv[2];
WEBCC_LOG_INIT("", webcc::LOG_CONSOLE);
@ -32,9 +32,9 @@ int main(int argc, char* argv[]) {
auto r = session.Request(webcc::RequestBuilder{}.Get(url)(),
true); // Stream the response data to file.
auto file_body = r->file_body();
if (auto file_body = r->file_body()) {
file_body->Move(path);
}
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;

@ -1,12 +1,7 @@
# Unit test
set(UT_SRCS
base64_unittest.cc
body_unittest.cc
request_parser_unittest.cc
url_unittest.cc
utility_unittest.cc
)
file(GLOB UT_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/*.cc)
set(UT_TARGET_NAME webcc_unittest)

@ -0,0 +1,17 @@
#include "gtest/gtest.h"
#include "webcc/response_builder.h"
// Empty body should also have `Content-Length` header.
TEST(ResponseBuilderTest, EmptyBody) {
using namespace webcc;
auto response = ResponseBuilder{}.OK()();
bool existed = false;
const auto& value = response->GetHeader(headers::kContentLength, &existed);
EXPECT_TRUE(existed);
EXPECT_EQ("0", value);
}

@ -1,77 +0,0 @@
#include "gtest/gtest.h"
#include "webcc/service_manager.h"
// -----------------------------------------------------------------------------
class MyService : public webcc::Service {
public:
webcc::ResponsePtr Handle(webcc::RequestPtr request,
const webcc::UrlArgs& args) override {
return webcc::ResponseBuilder{}.OK()();
}
};
// -----------------------------------------------------------------------------
/*
TEST(ServiceManagerTest, URL_RegexBasic) {
webcc::ServiceManager service_manager;
service_manager.Add(std::make_shared<MyService>(), "/instance/(\\d+)", true);
std::string url = "/instance/12345";
webcc::UrlArgs args;
webcc::ServicePtr service = service_manager.Get(url, &args);
EXPECT_TRUE(!!service);
EXPECT_EQ(1, args.size());
EXPECT_EQ("12345", args[0]);
url = "/instance/abcde";
args.clear();
service = service_manager.Get(url, &args);
EXPECT_FALSE(!!service);
}
TEST(RestServiceManagerTest, URL_RegexMultiple) {
webcc::ServiceManager service_manager;
service_manager.Add(std::make_shared<MyService>(),
"/study/(\\d+)/series/(\\d+)/instance/(\\d+)", true);
std::string url = "/study/1/series/2/instance/3";
webcc::UrlArgs args;
webcc::ServicePtr service = service_manager.Get(url, &args);
EXPECT_TRUE(!!service);
EXPECT_EQ(3, args.size());
EXPECT_EQ("1", args[0]);
EXPECT_EQ("2", args[1]);
EXPECT_EQ("3", args[2]);
url = "/study/a/series/b/instance/c";
args.clear();
service = service_manager.Get(url, &args);
EXPECT_FALSE(!!service);
}
TEST(RestServiceManagerTest, URL_NonRegex) {
webcc::ServiceManager service_manager;
service_manager.Add(std::make_shared<MyService>(), "/instances", false);
std::string url = "/instances";
webcc::UrlArgs args;
webcc::ServicePtr service = service_manager.Get(url, &args);
EXPECT_TRUE(!!service);
EXPECT_TRUE(args.empty());
}
*/

@ -169,6 +169,10 @@ public:
void Dump(std::ostream& os, const std::string& prefix) const override;
const Path& path() const {
return path_;
}
// Move (or rename) the file.
// Used to move the streamed file of the received message to a new place.
// Applicable to both client and server.

@ -16,15 +16,19 @@ Client::Client()
}
Error Client::Request(RequestPtr request, bool connect, bool stream) {
io_context_.restart();
response_.reset(new Response{});
response_parser_.Init(response_.get(), stream);
closed_ = false;
timer_canceled_ = false;
error_ = Error{};
response_.reset(new Response{});
if (!response_parser_.Init(response_.get(), stream)) {
// Failed to generate the temp file for streaming.
// I don't know when this would happen. Keep the error handling here just
// for preciseness.
return Error{ Error::kFileError, "Streaming temp file error" };
}
if (buffer_.size() != buffer_size_) {
LOG_VERB("Resize buffer: %u -> %u.", buffer_.size(), buffer_size_);
buffer_.resize(buffer_size_);
@ -44,6 +48,8 @@ Error Client::Request(RequestPtr request, bool connect, bool stream) {
response_parser_.set_ignroe_body(false);
}
io_context_.restart();
if (connect) {
// No existing socket connection was specified, create a new one.
Connect(request);

@ -54,9 +54,21 @@ public:
// Close the socket.
void Close();
ResponsePtr response() const { return response_; }
ResponsePtr response() const {
return response_;
}
// Reset response object.
// Used to make sure the response object will released even the client object
// itself will be cached for keep-alive purpose.
void Reset() {
response_.reset();
response_parser_.Init(nullptr, false);
}
bool closed() const { return closed_; }
bool closed() const {
return closed_;
}
private:
void Connect(RequestPtr request);

@ -232,7 +232,11 @@ ResponsePtr ClientSession::Send(RequestPtr request, bool stream) {
}
}
return client->response();
auto response = client->response();
// The client object might be cached in the pool.
// Reset to make sure it won't keep a reference to the response object.
client->Reset();
return response;
}
} // namespace webcc

@ -39,11 +39,7 @@ const std::string& Message::data() const {
}
std::shared_ptr<FileBody> Message::file_body() const {
auto file_body = std::dynamic_pointer_cast<FileBody>(body_);
if (!file_body) {
throw Error{ Error::kDataError, "Not a file body" };
}
return file_body;
return std::dynamic_pointer_cast<FileBody>(body_);
}
bool Message::IsConnectionKeepAlive() const {

@ -27,11 +27,11 @@ public:
}
// Get the data from the (string) body.
// Empty string will be returned if the body is not a StringBody.
// Return empty string if the body is not a StringBody.
const std::string& data() const;
// Get the body as a FileBody.
// Exception Error::kDataError will be thrown if the body is not a FileBody.
// Return null if the body is not a FileBody.
std::shared_ptr<FileBody> file_body() const;
// ---------------------------------------------------------------------------

@ -17,66 +17,49 @@ namespace webcc {
// -----------------------------------------------------------------------------
ParseHandler::ParseHandler(Message* message, bool stream)
: message_(message), content_length_(kInvalidLength), stream_(stream),
streamed_size_(0) {
if (stream_) {
try {
temp_path_ = bfs::temp_directory_path() / bfs::unique_path();
} catch (const bfs::filesystem_error&) {
throw Error{ Error::kFileError, "Cannot generate temp file path" };
ParseHandlerBase::ParseHandlerBase(Message* message)
: message_(message), content_length_(kInvalidLength) {
}
ofstream_.open(temp_path_, std::ios::binary);
if (ofstream_.fail()) {
throw Error{ Error::kFileError, "Cannot open the temp file" };
void ParseHandlerBase::OnStartLine(const std::string& start_line) {
message_->set_start_line(start_line);
}
void ParseHandlerBase::OnContentLength(std::size_t content_length) {
content_length_ = content_length;
}
void ParseHandlerBase::OnHeader(Header&& header) {
message_->SetHeader(std::move(header));
}
ParseHandler::~ParseHandler() {
bool ParseHandlerBase::IsCompressed() const {
return message_->GetContentEncoding() != ContentEncoding::kUnknown;
}
void ParseHandler::OnStartLine(const std::string& start_line) {
message_->set_start_line(start_line);
// -----------------------------------------------------------------------------
ParseHandler::ParseHandler(Message* message) : ParseHandlerBase(message) {
}
void ParseHandler::OnContentLength(std::size_t content_length) {
content_length_ = content_length;
ParseHandlerBase::OnContentLength(content_length);
if (!stream_) {
// Reserve memory to avoid frequent reallocation when append.
// TODO: Don't know if it's really necessary.
try {
content_.reserve(content_length_);
} catch (const std::exception& e) {
LOG_ERRO("Failed to reserve content memory: %s.", e.what());
}
}
}
void ParseHandler::OnHeader(Header&& header) {
message_->SetHeader(std::move(header));
}
void ParseHandler::AddContent(const char* data, std::size_t count) {
if (stream_) {
ofstream_.write(data, count);
streamed_size_ += count;
} else {
content_.append(data, count);
}
}
void ParseHandler::AddContent(const std::string& data) {
if (stream_) {
ofstream_ << data;
streamed_size_ += data.size();
} else {
content_.append(data);
}
}
bool ParseHandler::IsFixedContentFull() const {
if (content_length_ == kInvalidLength) {
@ -85,36 +68,21 @@ bool ParseHandler::IsFixedContentFull() const {
return false;
}
if (stream_) {
return streamed_size_ >= content_length_;
} else {
return content_.length() >= content_length_;
}
}
bool ParseHandler::Finish() {
// Could be `kInvalidLength` (chunked).
// Could be `0` (empty body and `Content-Length : 0`).
message_->set_content_length(content_length_);
if (!stream_ && content_.empty()) {
if (content_.empty()) {
// The call to message_->SetBody() is not necessary since message is
// always initialized with an empty body.
return true;
}
BodyPtr body;
if (stream_) {
ofstream_.close();
// Create a file body based on the streamed temp file.
body = std::make_shared<FileBody>(temp_path_, true);
// TODO: Compress
} else {
body = std::make_shared<StringBody>(std::move(content_), IsCompressed());
auto body = std::make_shared<StringBody>(std::move(content_), IsCompressed());
#if WEBCC_ENABLE_GZIP
LOG_INFO("Decompress the HTTP content...");
@ -125,15 +93,71 @@ bool ParseHandler::Finish() {
#else
LOG_WARN("Compressed HTTP content remains untouched.");
#endif // WEBCC_ENABLE_GZIP
}
message_->SetBody(body, false);
return true;
}
// -----------------------------------------------------------------------------
StreamedParseHandler::StreamedParseHandler(Message* message)
: ParseHandlerBase(message) {
}
bool StreamedParseHandler::Init() {
try {
temp_path_ = bfs::temp_directory_path() / bfs::unique_path();
LOG_VERB("Generate a temp path for streaming: %s",
temp_path_.string().c_str());
} catch (const bfs::filesystem_error&) {
LOG_ERRO("Failed to generate temp path: %s", temp_path_.string().c_str());
return false;
}
ofstream_.open(temp_path_, std::ios::binary);
if (ofstream_.fail()) {
LOG_ERRO("Failed to open the temp file: %s", temp_path_.string().c_str());
return false;
}
return true;
}
bool ParseHandler::IsCompressed() const {
return message_->GetContentEncoding() != ContentEncoding::kUnknown;
void StreamedParseHandler::AddContent(const char* data, std::size_t count) {
ofstream_.write(data, count);
streamed_size_ += count;
}
void StreamedParseHandler::AddContent(const std::string& data) {
ofstream_ << data;
streamed_size_ += data.size();
}
bool StreamedParseHandler::IsFixedContentFull() const {
if (content_length_ == kInvalidLength) {
// Shouldn't be here.
// See Parser::ParseFixedContent().
return false;
}
return streamed_size_ >= content_length_;
}
bool StreamedParseHandler::Finish() {
// Could be `kInvalidLength` (chunked).
// Could be `0` (empty body and `Content-Length : 0`).
message_->set_content_length(content_length_);
ofstream_.close();
// Create a file body based on the streamed temp file.
auto body = std::make_shared<FileBody>(temp_path_, true);
// TODO: Compress
message_->SetBody(body, false);
return true;
}
// -----------------------------------------------------------------------------
@ -147,9 +171,21 @@ Parser::Parser()
finished_(false) {
}
void Parser::Init(Message* message, bool stream) {
bool Parser::Init(Message* message, bool stream) {
Reset();
handler_.reset(new ParseHandler{ message, stream });
if (stream) {
handler_.reset(new StreamedParseHandler{ message });
} else {
handler_.reset(new ParseHandler{ message });
}
if (!handler_->Init()) {
// Failed to generate temp file for streaming.
return false;
}
return true;
}
bool Parser::Parse(const char* data, std::size_t length) {

@ -14,13 +14,13 @@ class Message;
// -----------------------------------------------------------------------------
class ParseHandler {
class ParseHandlerBase {
public:
// If |stream| is true, the data will be streamed to a temp file, and the
// body of the message will be FileBody instead of StringBody.
ParseHandler(Message* message, bool stream = false);
ParseHandlerBase(Message* message);
~ParseHandler();
virtual ~ParseHandlerBase() = default;
virtual bool Init() = 0;
std::size_t content_length() const {
return content_length_;
@ -28,29 +28,67 @@ public:
void OnStartLine(const std::string& start_line);
void OnContentLength(std::size_t content_length);
void OnHeader(Header&& header);
void AddContent(const char* data, std::size_t count);
virtual void OnContentLength(std::size_t content_length);
void AddContent(const std::string& data);
virtual void AddContent(const char* data, std::size_t count) = 0;
bool IsFixedContentFull() const;
virtual void AddContent(const std::string& data) = 0;
bool Finish();
virtual bool IsFixedContentFull() const = 0;
private:
virtual bool Finish() = 0;
protected:
bool IsCompressed() const;
private:
protected:
Message* message_;
std::size_t content_length_;
};
class ParseHandler : public ParseHandlerBase {
public:
explicit ParseHandler(Message* message);
~ParseHandler() override = default;
bool Init() override {
return true;
}
void OnContentLength(std::size_t content_length) override;
void AddContent(const char* data, std::size_t count) override;
void AddContent(const std::string& data) override;
bool IsFixedContentFull() const override;
bool Finish() override;
private:
std::string content_;
};
// If |stream| is true, the data will be streamed to a temp file, and the
// body of the message will be FileBody instead of StringBody.
class StreamedParseHandler : public ParseHandlerBase {
public:
explicit StreamedParseHandler(Message* message);
~StreamedParseHandler() override = default;
bool stream_;
std::size_t streamed_size_;
// Generate a temp file.
bool Init() override;
void AddContent(const char* data, std::size_t count) override;
void AddContent(const std::string& data) override;
bool IsFixedContentFull() const override;
bool Finish() override;
private:
std::size_t streamed_size_ = 0;
boost::filesystem::ofstream ofstream_;
Path temp_path_;
};
@ -66,7 +104,7 @@ public:
Parser(const Parser&) = delete;
Parser& operator=(const Parser&) = delete;
void Init(Message* message, bool stream = false);
bool Init(Message* message, bool stream = false);
bool finished() const {
return finished_;
@ -103,7 +141,7 @@ protected:
bool Finish();
protected:
std::unique_ptr<ParseHandler> handler_;
std::unique_ptr<ParseHandlerBase> handler_;
// Data waiting to be parsed.
std::string pending_data_;

@ -13,9 +13,13 @@ namespace webcc {
RequestParser::RequestParser() : request_(nullptr) {
}
void RequestParser::Init(Request* request) {
Parser::Init(request);
bool RequestParser::Init(Request* request, bool stream) {
if (!Parser::Init(request, stream)) {
return false;
}
request_ = request;
return true;
}
bool RequestParser::ParseStartLine(const std::string& line) {

@ -15,7 +15,7 @@ public:
~RequestParser() override = default;
void Init(Request* request);
bool Init(Request* request, bool stream = false);
private:
bool ParseStartLine(const std::string& line) override;

@ -41,9 +41,13 @@ void SplitStartLine(const std::string& line, std::vector<std::string>* parts) {
ResponseParser::ResponseParser() : response_(nullptr) {
}
void ResponseParser::Init(Response* response, bool stream) {
Parser::Init(response, stream);
bool ResponseParser::Init(Response* response, bool stream) {
if (!Parser::Init(response, stream)) {
return false;
}
response_ = response;
return true;
}
bool ResponseParser::ParseStartLine(const std::string& line) {

@ -15,7 +15,7 @@ public:
~ResponseParser() override = default;
void Init(Response* response, bool stream = false);
bool Init(Response* response, bool stream = false);
void set_ignroe_body(bool ignroe_body) {
ignroe_body_ = ignroe_body;

Loading…
Cancel
Save