Support response data streaming and add an example file downloader.

master
Chunting Gu 6 years ago
parent 3077f21549
commit 0292b4b51e

@ -62,5 +62,8 @@ target_link_libraries(file_upload_server ${EXAMPLE_LIBS})
add_executable(file_server file_server.cc)
target_link_libraries(file_server ${EXAMPLE_LIBS})
add_executable(file_downloader file_downloader.cc)
target_link_libraries(file_downloader ${EXAMPLE_LIBS})
add_executable(server_states server_states.cc)
target_link_libraries(server_states ${EXAMPLE_LIBS})

@ -0,0 +1,44 @@
// Download files.
// This example demonstrates the usage of streamed response.
#include <iostream>
#include "webcc/client_session.h"
#include "webcc/logger.h"
void Help(const char* argv0) {
std::cout << "Usage: file_downloader <url> <path>" << std::endl;
std::cout << "E.g.," << std::endl;
std::cout << " file_downloader http://httpbin.org/image/jpeg D:/test.jpg"
<< std::endl;
std::cout << " file_downloader https://www.google.com/favicon.ico"
<< " D:/test.ico" << std::endl;
}
int main(int argc, char* argv[]) {
if (argc != 3) {
Help(argv[0]);
return 1;
}
std::string url = argv[1];
std::string path = argv[2];
WEBCC_LOG_INIT("", webcc::LOG_CONSOLE);
webcc::ClientSession session;
try {
auto r = session.Request(webcc::RequestBuilder{}.Get(url)(),
true); // Stream the response data to file.
auto file_body = r->file_body();
file_body->Move(path);
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 0;
}

@ -8,7 +8,8 @@
// HTTP GET request parser test fixture.
class GetRequestParserTest : public testing::Test {
protected:
GetRequestParserTest() : parser_(&request_) {
GetRequestParserTest() {
parser_.Init(&request_);
}
void SetUp() override {
@ -81,7 +82,8 @@ TEST_F(GetRequestParserTest, ParseLineWise) {
// HTTP POST request parser test fixture.
class PostRequestParserTest : public testing::Test {
protected:
PostRequestParserTest() : parser_(&request_) {
PostRequestParserTest() {
parser_.Init(&request_);
}
void SetUp() override {
@ -146,7 +148,8 @@ TEST_F(PostRequestParserTest, ParseByteWise) {
class MultipartRequestParserTest : public testing::Test {
protected:
MultipartRequestParserTest() : parser_(&request_) {
MultipartRequestParserTest() {
parser_.Init(&request_);
}
void SetUp() override {

@ -2,6 +2,7 @@
#include "boost/algorithm/string.hpp"
#include "boost/core/ignore_unused.hpp"
#include "boost/filesystem/operations.hpp"
#include "webcc/logger.h"
#include "webcc/utility.h"
@ -161,24 +162,40 @@ void FormBody::Free(std::size_t index) {
// -----------------------------------------------------------------------------
FileBody::FileBody(const Path& path, std::size_t chunk_size)
: path_(path), chunk_size_(chunk_size) {
: path_(path), chunk_size_(chunk_size), auto_delete_(false), size_(0) {
size_ = utility::TellSize(path_);
if (size_ == kInvalidLength) {
throw Error{ Error::kFileError, "Cannot read the file" };
}
}
FileBody::FileBody(const Path& path, bool auto_delete)
: path_(path), chunk_size_(0), auto_delete_(auto_delete), size_(0) {
// Don't need to tell file size.
}
FileBody::~FileBody() {
if (auto_delete_ && !path_.empty()) {
boost::system::error_code ec;
bfs::remove(path_, ec);
if (ec) {
LOG_ERRO("Failed to remove file (%s).", ec.message().c_str());
}
}
}
void FileBody::InitPayload() {
assert(chunk_size_ > 0);
chunk_.resize(chunk_size_);
if (stream_.is_open()) {
stream_.close();
if (ifstream_.is_open()) {
ifstream_.close();
}
stream_.open(path_, std::ios::binary);
ifstream_.open(path_, std::ios::binary);
if (stream_.fail()) {
if (ifstream_.fail()) {
throw Error{ Error::kFileError, "Cannot read the file" };
}
}
@ -186,9 +203,9 @@ void FileBody::InitPayload() {
Payload FileBody::NextPayload(bool free_previous) {
boost::ignore_unused(free_previous);
if (stream_.read(&chunk_[0], chunk_.size()).gcount() > 0) {
if (ifstream_.read(&chunk_[0], chunk_.size()).gcount() > 0) {
return {
boost::asio::buffer(chunk_.data(), (std::size_t)stream_.gcount())
boost::asio::buffer(chunk_.data(), (std::size_t)ifstream_.gcount())
};
}
return {};
@ -198,4 +215,27 @@ void FileBody::Dump(std::ostream& os, const std::string& prefix) const {
os << prefix << "<file: " << path_.string() << ">" << std::endl;
}
bool FileBody::Move(const Path& new_path) {
if (path_ == new_path) {
return false;
}
if (ifstream_.is_open()) {
ifstream_.close();
}
boost::system::error_code ec;
bfs::rename(path_, new_path, ec);
if (ec) {
LOG_ERRO("Failed to rename file (%s).", ec.message().c_str());
return false;
}
// Reset original file path.
path_.clear();
return true;
}
} // namespace webcc

@ -147,8 +147,18 @@ private:
// the memory.
class FileBody : public Body {
public:
// For message to be sent out.
FileBody(const Path& path, std::size_t chunk_size);
// For message received.
// No |chunk_size| is needed since you don't iterate the payload of a
// received message.
// If |auto_delete| is true, the file will be deleted on destructor unless it
// is moved to another path (see Move()).
FileBody(const Path& path, bool auto_delete = false);
~FileBody() override;
std::size_t GetSize() const override {
return size_;
}
@ -159,13 +169,26 @@ public:
void Dump(std::ostream& os, const std::string& prefix) const override;
// Move (or rename) the file.
// Used to move the streamed file of the received message to a new place.
// Applicable to both client and server.
// After move, the original path will be reset to empty.
// If |new_path| and |path_| resolve to the same file, do nothing and just
// return false.
// If |new_path| resolves to an existing non-directory file, it is removed.
// If |new_path| resolves to an existing directory, it is removed if empty
// on ISO/IEC 9945 but is an error on Windows.
// See boost::filesystem::rename() for more details.
bool Move(const Path& new_path);
private:
Path path_;
std::size_t chunk_size_;
bool auto_delete_;
std::size_t size_; // File size in bytes
boost::filesystem::ifstream stream_;
boost::filesystem::ifstream ifstream_;
std::string chunk_;
};

@ -15,8 +15,20 @@ Client::Client()
timer_canceled_(false) {
}
Error Client::Request(RequestPtr request, bool connect) {
Restart();
Error Client::Request(RequestPtr request, bool connect, bool stream) {
io_context_.restart();
response_.reset(new Response{});
response_parser_.Init(response_.get(), stream);
closed_ = false;
timer_canceled_ = false;
error_ = Error{};
if (buffer_.size() != buffer_size_) {
LOG_VERB("Resize buffer: %u -> %u.", buffer_.size(), buffer_size_);
buffer_.resize(buffer_size_);
}
// Response to HEAD could also have Content-Length.
// Set this flag to skip the reading and parsing of the body.
@ -64,22 +76,6 @@ void Client::Close() {
socket_->Close();
}
void Client::Restart() {
io_context_.restart();
response_.reset(new Response{});
response_parser_.Init(response_.get());
closed_ = false;
timer_canceled_ = false;
error_ = Error{};
if (buffer_.size() != buffer_size_) {
LOG_VERB("Resize buffer: %u -> %u.", buffer_.size(), buffer_size_);
buffer_.resize(buffer_size_);
}
}
void Client::Connect(RequestPtr request) {
if (request->url().scheme() == "https") {
#if WEBCC_ENABLE_SSL

@ -49,7 +49,7 @@ public:
}
// Connect to server, send request, wait until response is received.
Error Request(RequestPtr request, bool connect = true);
Error Request(RequestPtr request, bool connect = true, bool stream = false);
// Close the socket.
void Close();
@ -59,8 +59,6 @@ public:
bool closed() const { return closed_; }
private:
void Restart();
void Connect(RequestPtr request);
void DoConnect(RequestPtr request, const std::string& default_port);

@ -28,7 +28,7 @@ void ClientSession::AuthToken(const std::string& token) {
return Auth("Token", token);
}
ResponsePtr ClientSession::Request(RequestPtr request) {
ResponsePtr ClientSession::Request(RequestPtr request, bool stream) {
assert(request);
for (auto& h : headers_.data()) {
@ -44,7 +44,7 @@ ResponsePtr ClientSession::Request(RequestPtr request) {
request->Prepare();
return Send(request);
return Send(request, stream);
}
static void SetHeaders(const Strings& headers, RequestBuilder* builder) {
@ -183,7 +183,7 @@ void ClientSession::InitHeaders() {
headers_.Set(kConnection, "Keep-Alive");
}
ResponsePtr ClientSession::Send(RequestPtr request) {
ResponsePtr ClientSession::Send(RequestPtr request, bool stream) {
const ClientPool::Key key{ request->url() };
// Reuse a pooled connection.
@ -202,13 +202,13 @@ ResponsePtr ClientSession::Send(RequestPtr request) {
client->set_buffer_size(buffer_size_);
client->set_timeout(timeout_);
Error error = client->Request(request, !reuse);
Error error = client->Request(request, !reuse, stream);
if (error) {
if (reuse && error.code() == Error::kSocketWriteError) {
LOG_WARN("Cannot send request with the reused connection. "
"The server must have closed it, reconnect and try again.");
error = client->Request(request, true);
error = client->Request(request, true, stream);
}
}

@ -57,7 +57,11 @@ public:
// Send a request.
// Please use RequestBuilder to build the request.
ResponsePtr Request(RequestPtr request);
// If |stream| is true, the response data will be written into a temp file,
// the response body will be FileBody, and you can easily move the temp file
// to another path with FileBody::Move(). So |stream| is useful for
// downloading files (JPEG, etc.) or saving memory for huge data responses.
ResponsePtr Request(RequestPtr request, bool stream = false);
// Shortcut for GET request.
ResponsePtr Get(const std::string& url, const Strings& parameters = {},
@ -85,7 +89,7 @@ public:
private:
void InitHeaders();
ResponsePtr Send(RequestPtr request);
ResponsePtr Send(RequestPtr request, bool stream);
private:
// Default media type for `Content-Type` header.

@ -31,14 +31,19 @@ void Message::SetBody(BodyPtr body, bool set_length) {
const std::string& Message::data() const {
static const std::string kEmptyData;
auto string_body = std::dynamic_pointer_cast<StringBody>(body_);
if (string_body) {
return string_body->data();
if (!string_body) {
return kEmptyData;
}
return string_body->data();
}
return kEmptyData;
std::shared_ptr<FileBody> Message::file_body() const {
auto file_body = std::dynamic_pointer_cast<FileBody>(body_);
if (!file_body) {
throw Error{ Error::kDataError, "Not a file body" };
}
return file_body;
}
bool Message::IsConnectionKeepAlive() const {

@ -26,10 +26,14 @@ public:
return body_;
}
// Get the data from the string body.
// Exception Error(kDataError) will be thrown if the body is FormBody.
// Get the data from the (string) body.
// Empty string will be returned if the body is not a StringBody.
const std::string& data() const;
// Get the body as a FileBody.
// Exception Error::kDataError will be thrown if the body is not a FileBody.
std::shared_ptr<FileBody> file_body() const;
// ---------------------------------------------------------------------------
void SetHeader(Header&& header) {

@ -1,6 +1,7 @@
#include "webcc/parser.h"
#include "boost/algorithm/string.hpp"
#include "boost/filesystem/operations.hpp"
#include "webcc/logger.h"
#include "webcc/message.h"
@ -10,29 +11,135 @@
#include "webcc/gzip.h"
#endif
namespace bfs = boost::filesystem;
namespace webcc {
// -----------------------------------------------------------------------------
namespace {
ParseHandler::ParseHandler(Message* message, bool stream)
: message_(message), content_length_(kInvalidLength), stream_(stream),
streamed_size_(0) {
if (stream_) {
try {
temp_path_ = bfs::temp_directory_path() / bfs::unique_path();
} catch (const bfs::filesystem_error&) {
throw Error{ Error::kFileError, "Cannot generate temp file path" };
}
ofstream_.open(temp_path_, std::ios::binary);
if (ofstream_.fail()) {
throw Error{ Error::kFileError, "Cannot open the temp file" };
}
}
}
ParseHandler::~ParseHandler() {
}
void ParseHandler::OnStartLine(const std::string& start_line) {
message_->set_start_line(start_line);
}
void ParseHandler::OnContentLength(std::size_t content_length) {
content_length_ = content_length;
if (!stream_) {
// Reserve memory to avoid frequent reallocation when append.
// TODO: Don't know if it's really necessary.
try {
content_.reserve(content_length_);
} catch (const std::exception& e) {
LOG_ERRO("Failed to reserve content memory: %s.", e.what());
}
}
}
void ParseHandler::OnHeader(Header&& header) {
message_->SetHeader(std::move(header));
}
void ParseHandler::AddContent(const char* data, std::size_t count) {
if (stream_) {
ofstream_.write(data, count);
streamed_size_ += count;
} else {
content_.append(data, count);
}
}
void ParseHandler::AddContent(const std::string& data) {
if (stream_) {
ofstream_ << data;
streamed_size_ += data.size();
} else {
content_.append(data);
}
}
bool StringToSizeT(const std::string& str, int base, std::size_t* output) {
try {
*output = static_cast<std::size_t>(std::stoul(str, 0, base));
} catch (const std::exception&) {
bool ParseHandler::IsFixedContentFull() const {
if (content_length_ == kInvalidLength) {
// Shouldn't be here.
// See Parser::ParseFixedContent().
return false;
}
if (stream_) {
return streamed_size_ >= content_length_;
} else {
return content_.length() >= content_length_;
}
}
bool ParseHandler::Finish() {
// Could be `kInvalidLength` (chunked).
// Could be `0` (empty body and `Content-Length : 0`).
message_->set_content_length(content_length_);
if (!stream_ && content_.empty()) {
// The call to message_->SetBody() is not necessary since message is
// always initialized with an empty body.
return true;
}
BodyPtr body;
if (stream_) {
ofstream_.close();
// Create a file body based on the streamed temp file.
body = std::make_shared<FileBody>(temp_path_, true);
// TODO: Compress
} else {
body = std::make_shared<StringBody>(std::move(content_), IsCompressed());
#if WEBCC_ENABLE_GZIP
LOG_INFO("Decompress the HTTP content...");
if (!body->Decompress()) {
LOG_ERRO("Cannot decompress the HTTP content!");
return false;
}
#else
LOG_WARN("Compressed HTTP content remains untouched.");
#endif // WEBCC_ENABLE_GZIP
}
message_->SetBody(body, false);
return true;
}
} // namespace
bool ParseHandler::IsCompressed() const {
return message_->GetContentEncoding() != ContentEncoding::kUnknown;
}
// -----------------------------------------------------------------------------
Parser::Parser(Message* message)
: message_(message),
content_length_(kInvalidLength),
start_line_parsed_(false),
Parser::Parser()
: start_line_parsed_(false),
content_length_parsed_(false),
header_ended_(false),
chunked_(false),
@ -40,9 +147,9 @@ Parser::Parser(Message* message)
finished_(false) {
}
void Parser::Init(Message* message) {
void Parser::Init(Message* message, bool stream) {
Reset();
message_ = message;
handler_.reset(new ParseHandler{ message, stream });
}
bool Parser::Parse(const char* data, std::size_t length) {
@ -70,9 +177,7 @@ bool Parser::Parse(const char* data, std::size_t length) {
void Parser::Reset() {
pending_data_.clear();
content_.clear();
content_length_ = kInvalidLength;
content_type_.Reset();
start_line_parsed_ = false;
content_length_parsed_ = false;
@ -101,7 +206,8 @@ bool Parser::ParseHeaders() {
if (!start_line_parsed_) {
start_line_parsed_ = true;
message_->set_start_line(line);
handler_->OnStartLine(line);
if (!ParseStartLine(line)) {
return false;
}
@ -148,20 +254,15 @@ bool Parser::ParseHeaderLine(const std::string& line) {
if (boost::iequals(header.first, headers::kContentLength)) {
content_length_parsed_ = true;
if (!StringToSizeT(header.second, 10, &content_length_)) {
std::size_t content_length = kInvalidLength;
if (!utility::ToSize(header.second, 10, &content_length)) {
LOG_ERRO("Invalid content length: %s.", header.second.c_str());
return false;
}
LOG_INFO("Content length: %u.", content_length_);
LOG_INFO("Content length: %u.", content_length);
handler_->OnContentLength(content_length);
// Reserve memory to avoid frequent reallocation when append.
try {
content_.reserve(content_length_);
} catch (const std::exception& e) {
LOG_ERRO("Failed to reserve content memory: %s.", e.what());
return false;
}
} else if (boost::iequals(header.first, headers::kContentType)) {
content_type_.Parse(header.second);
if (!content_type_.Valid()) {
@ -175,8 +276,7 @@ bool Parser::ParseHeaderLine(const std::string& line) {
}
}
message_->SetHeader(std::move(header));
handler_->OnHeader(std::move(header));
return true;
}
@ -195,21 +295,21 @@ bool Parser::ParseFixedContent(const char* data, std::size_t length) {
return true;
}
if (content_length_ == kInvalidLength) {
if (handler_->content_length() == kInvalidLength) {
// Invalid content length (syntax error).
return false;
}
if (!pending_data_.empty()) {
// This is the data left after the headers are parsed.
AppendContent(pending_data_);
handler_->AddContent(pending_data_);
pending_data_.clear();
}
// Don't have to firstly put the data to the pending data.
AppendContent(data, length);
handler_->AddContent(data, length);
if (IsFixedContentFull()) {
if (handler_->IsFixedContentFull()) {
// All content has been read.
Finish();
}
@ -236,7 +336,7 @@ bool Parser::ParseChunkedContent(const char* data, std::size_t length) {
}
if (chunk_size_ + 2 <= pending_data_.size()) { // +2 for CRLF
AppendContent(pending_data_.c_str(), chunk_size_);
handler_->AddContent(pending_data_.c_str(), chunk_size_);
pending_data_.erase(0, chunk_size_ + 2);
@ -247,7 +347,7 @@ bool Parser::ParseChunkedContent(const char* data, std::size_t length) {
continue;
} else if (chunk_size_ > pending_data_.size()) {
AppendContent(pending_data_);
handler_->AddContent(pending_data_);
chunk_size_ -= pending_data_.size();
@ -287,7 +387,7 @@ bool Parser::ParseChunkSize() {
hex_str = line;
}
if (!StringToSizeT(hex_str, 16, &chunk_size_)) {
if (!utility::ToSize(hex_str, 16, &chunk_size_)) {
LOG_ERRO("Invalid chunk-size: %s.", hex_str.c_str());
return false;
}
@ -297,46 +397,7 @@ bool Parser::ParseChunkSize() {
bool Parser::Finish() {
finished_ = true;
if (content_.empty()) {
return true;
}
// Could be kInvalidLength when chunked.
message_->set_content_length(content_length_);
bool compressed = IsContentCompressed();
auto body = std::make_shared<StringBody>(std::move(content_), compressed);
#if WEBCC_ENABLE_GZIP
LOG_INFO("Decompress the HTTP content...");
if (!body->Decompress()) {
LOG_ERRO("Cannot decompress the HTTP content!");
return false;
}
#else
LOG_WARN("Compressed HTTP content remains untouched.");
#endif // WEBCC_ENABLE_GZIP
message_->SetBody(body, false);
return true;
}
void Parser::AppendContent(const char* data, std::size_t count) {
content_.append(data, count);
}
void Parser::AppendContent(const std::string& data) {
content_.append(data);
}
bool Parser::IsFixedContentFull() const {
return content_length_ != kInvalidLength &&
content_length_ <= content_.length();
}
bool Parser::IsContentCompressed() const {
return message_->GetContentEncoding() != ContentEncoding::kUnknown;
return handler_->Finish();
}
} // namespace webcc

@ -3,6 +3,8 @@
#include <string>
#include "boost/filesystem/fstream.hpp"
#include "webcc/common.h"
#include "webcc/globals.h"
@ -10,26 +12,66 @@ namespace webcc {
class Message;
// -----------------------------------------------------------------------------
class ParseHandler {
public:
// If |stream| is true, the data will be streamed to a temp file, and the
// body of the message will be FileBody instead of StringBody.
ParseHandler(Message* message, bool stream = false);
~ParseHandler();
std::size_t content_length() const {
return content_length_;
}
void OnStartLine(const std::string& start_line);
void OnContentLength(std::size_t content_length);
void OnHeader(Header&& header);
void AddContent(const char* data, std::size_t count);
void AddContent(const std::string& data);
bool IsFixedContentFull() const;
bool Finish();
private:
bool IsCompressed() const;
private:
Message* message_;
std::size_t content_length_;
std::string content_;
bool stream_;
std::size_t streamed_size_;
boost::filesystem::ofstream ofstream_;
Path temp_path_;
};
// -----------------------------------------------------------------------------
// HTTP request and response parser.
class Parser {
public:
explicit Parser(Message* message);
Parser();
virtual ~Parser() = default;
Parser(const Parser&) = delete;
Parser& operator=(const Parser&) = delete;
void Init(Message* message);
void Init(Message* message, bool stream = false);
bool finished() const {
return finished_;
}
std::size_t content_length() const {
return content_length_;
}
bool Parse(const char* data, std::size_t length);
protected:
@ -60,25 +102,14 @@ protected:
// Return false if the compressed content cannot be decompressed.
bool Finish();
void AppendContent(const char* data, std::size_t count);
void AppendContent(const std::string& data);
bool IsFixedContentFull() const;
// Check header Content-Encoding to see if the content is compressed.
bool IsContentCompressed() const;
protected:
// The message parsed.
Message* message_;
std::unique_ptr<ParseHandler> handler_;
// Data waiting to be parsed.
std::string pending_data_;
// Temporary data and helper flags for parsing.
std::size_t content_length_;
ContentType content_type_;
std::string content_;
bool start_line_parsed_;
bool content_length_parsed_;
bool header_ended_;

@ -10,8 +10,7 @@
namespace webcc {
RequestParser::RequestParser(Request* request)
: Parser(request), request_(request) {
RequestParser::RequestParser() : request_(nullptr) {
}
void RequestParser::Init(Request* request) {
@ -51,7 +50,7 @@ bool RequestParser::ParseMultipartContent(const char* data,
std::size_t length) {
pending_data_.append(data, length);
if (!content_length_parsed_ || content_length_ == kInvalidLength) {
if (!content_length_parsed_ || handler_->content_length() == kInvalidLength) {
// Invalid content length (syntax error).
return false;
}

@ -11,7 +11,7 @@ class Request;
class RequestParser : public Parser {
public:
explicit RequestParser(Request* request = nullptr);
RequestParser();
~RequestParser() override = default;

@ -38,12 +38,11 @@ void SplitStartLine(const std::string& line, std::vector<std::string>* parts) {
// -----------------------------------------------------------------------------
ResponseParser::ResponseParser(Response* response)
: Parser(response), response_(response) {
ResponseParser::ResponseParser() : response_(nullptr) {
}
void ResponseParser::Init(Response* response) {
Parser::Init(response);
void ResponseParser::Init(Response* response, bool stream) {
Parser::Init(response, stream);
response_ = response;
}

@ -11,11 +11,11 @@ class Response;
class ResponseParser : public Parser {
public:
explicit ResponseParser(Response* response = nullptr);
ResponseParser();
~ResponseParser() override = default;
void Init(Response* response);
void Init(Response* response, bool stream = false);
void set_ignroe_body(bool ignroe_body) {
ignroe_body_ = ignroe_body;

@ -51,6 +51,15 @@ bool SplitKV(const std::string& str, char delimiter,
return true;
}
bool ToSize(const std::string& str, int base, std::size_t* size) {
try {
*size = static_cast<std::size_t>(std::stoul(str, 0, base));
} catch (const std::exception&) {
return false;
}
return true;
}
std::size_t TellSize(const Path& path) {
// Flag "ate": seek to the end of stream immediately after open.
bfs::ifstream stream{ path, std::ios::binary | std::ios::ate };

@ -24,6 +24,9 @@ std::string GetTimestamp();
bool SplitKV(const std::string& str, char delimiter,
std::string* key, std::string* value);
// Convert string to size_t.
bool ToSize(const std::string& str, int base, std::size_t* size);
// Tell the size in bytes of the given file.
// Return kInvalidLength (-1) on failure.
std::size_t TellSize(const Path& path);

Loading…
Cancel
Save