Rework the data streaming

master
Chunting Gu 6 years ago
parent 4f9263048f
commit 66cab76ce6

@ -21,13 +21,7 @@ Error Client::Request(RequestPtr request, bool connect, bool stream) {
error_ = Error{};
response_.reset(new Response{});
if (!response_parser_.Init(response_.get(), stream)) {
// Failed to generate the temp file for streaming.
// I don't know when this would happen. Keep the error handling here just
// for preciseness.
return Error{ Error::kFileError, "Streaming temp file error" };
}
response_parser_.Init(response_.get(), stream);
if (buffer_.size() != buffer_size_) {
LOG_VERB("Resize buffer: %u -> %u.", buffer_.size(), buffer_size_);

@ -17,65 +17,29 @@ namespace webcc {
// -----------------------------------------------------------------------------
ParseHandlerBase::ParseHandlerBase(Message* message)
: message_(message), content_length_(kInvalidLength) {
}
void ParseHandlerBase::OnStartLine(const std::string& start_line) {
message_->set_start_line(start_line);
}
void ParseHandlerBase::OnContentLength(std::size_t content_length) {
content_length_ = content_length;
}
void ParseHandlerBase::OnHeader(Header&& header) {
message_->SetHeader(std::move(header));
}
bool ParseHandlerBase::IsCompressed() const {
bool BodyHandler::IsCompressed() const {
return message_->GetContentEncoding() != ContentEncoding::kUnknown;
}
// -----------------------------------------------------------------------------
ParseHandler::ParseHandler(Message* message) : ParseHandlerBase(message) {
}
void ParseHandler::OnContentLength(std::size_t content_length) {
ParseHandlerBase::OnContentLength(content_length);
// Reserve memory to avoid frequent reallocation when append.
try {
content_.reserve(content_length_);
} catch (const std::exception& e) {
LOG_ERRO("Failed to reserve content memory: %s.", e.what());
}
}
// TODO
// // Reserve memory to avoid frequent reallocation when append.
// try {
// content_.reserve(content_length_);
// } catch (const std::exception& e) {
// LOG_ERRO("Failed to reserve content memory: %s.", e.what());
// }
void ParseHandler::AddContent(const char* data, std::size_t count) {
void StringBodyHandler::AddContent(const char* data, std::size_t count) {
content_.append(data, count);
}
void ParseHandler::AddContent(const std::string& data) {
void StringBodyHandler::AddContent(const std::string& data) {
content_.append(data);
}
bool ParseHandler::IsFixedContentFull() const {
if (content_length_ == kInvalidLength) {
// Shouldn't be here.
// See Parser::ParseFixedContent().
return false;
}
return content_.length() >= content_length_;
}
bool ParseHandler::Finish() {
// Could be `kInvalidLength` (chunked).
// Could be `0` (empty body and `Content-Length : 0`).
message_->set_content_length(content_length_);
bool StringBodyHandler::Finish() {
if (content_.empty()) {
// The call to message_->SetBody() is not necessary since message is
// always initialized with an empty body.
@ -95,60 +59,41 @@ bool ParseHandler::Finish() {
#endif // WEBCC_ENABLE_GZIP
message_->SetBody(body, false);
return true;
}
// -----------------------------------------------------------------------------
StreamedParseHandler::StreamedParseHandler(Message* message)
: ParseHandlerBase(message) {
}
bool StreamedParseHandler::Init() {
FileBodyHandler::FileBodyHandler(Message* message) : BodyHandler(message) {
try {
temp_path_ = bfs::temp_directory_path() / bfs::unique_path();
LOG_VERB("Generate a temp path for streaming: %s",
temp_path_.string().c_str());
} catch (const bfs::filesystem_error&) {
LOG_ERRO("Failed to generate temp path: %s", temp_path_.string().c_str());
return false;
throw Error{ Error::kFileError };
}
ofstream_.open(temp_path_, std::ios::binary);
if (ofstream_.fail()) {
LOG_ERRO("Failed to open the temp file: %s", temp_path_.string().c_str());
return false;
throw Error{ Error::kFileError };
}
return true;
}
void StreamedParseHandler::AddContent(const char* data, std::size_t count) {
void FileBodyHandler::AddContent(const char* data, std::size_t count) {
ofstream_.write(data, count);
streamed_size_ += count;
}
void StreamedParseHandler::AddContent(const std::string& data) {
void FileBodyHandler::AddContent(const std::string& data) {
ofstream_ << data;
streamed_size_ += data.size();
}
bool StreamedParseHandler::IsFixedContentFull() const {
if (content_length_ == kInvalidLength) {
// Shouldn't be here.
// See Parser::ParseFixedContent().
return false;
}
return streamed_size_ >= content_length_;
}
bool StreamedParseHandler::Finish() {
// Could be `kInvalidLength` (chunked).
// Could be `0` (empty body and `Content-Length : 0`).
message_->set_content_length(content_length_);
bool FileBodyHandler::Finish() {
ofstream_.close();
// Create a file body based on the streamed temp file.
@ -157,35 +102,19 @@ bool StreamedParseHandler::Finish() {
// TODO: Compress
message_->SetBody(body, false);
return true;
}
// -----------------------------------------------------------------------------
Parser::Parser()
: start_line_parsed_(false),
content_length_parsed_(false),
header_ended_(false),
chunked_(false),
chunk_size_(kInvalidLength),
finished_(false) {
Parser::Parser() {
Reset();
}
bool Parser::Init(Message* message, bool stream) {
void Parser::Init(Message* message) {
Reset();
if (stream) {
handler_.reset(new StreamedParseHandler{ message });
} else {
handler_.reset(new ParseHandler{ message });
}
if (!handler_->Init()) {
// Failed to generate temp file for streaming.
return false;
}
return true;
message_ = message;
}
bool Parser::Parse(const char* data, std::size_t length) {
@ -207,13 +136,26 @@ bool Parser::Parse(const char* data, std::size_t length) {
LOG_INFO("HTTP headers just ended.");
CreateBodyHandler();
if (!body_handler_) {
// The only reason to reach here is that it was failed to generate the temp
// file for streaming. Normally, it shouldn't happen.
// TODO: Keep a member |error_| for the user to query.
return false;
}
// The left data, if any, is still in the pending data.
return ParseContent("", 0);
}
void Parser::Reset() {
message_ = nullptr;
body_handler_.reset();
pending_data_.clear();
content_length_ = kInvalidLength;
content_type_.Reset();
start_line_parsed_ = false;
content_length_parsed_ = false;
@ -242,7 +184,7 @@ bool Parser::ParseHeaders() {
if (!start_line_parsed_) {
start_line_parsed_ = true;
handler_->OnStartLine(line);
message_->set_start_line(line);
if (!ParseStartLine(line)) {
return false;
@ -297,7 +239,7 @@ bool Parser::ParseHeaderLine(const std::string& line) {
}
LOG_INFO("Content length: %u.", content_length);
handler_->OnContentLength(content_length);
content_length_ = content_length;
} else if (boost::iequals(header.first, headers::kContentType)) {
content_type_.Parse(header.second);
@ -312,7 +254,8 @@ bool Parser::ParseHeaderLine(const std::string& line) {
}
}
handler_->OnHeader(std::move(header));
message_->SetHeader(std::move(header));
return true;
}
@ -331,21 +274,21 @@ bool Parser::ParseFixedContent(const char* data, std::size_t length) {
return true;
}
if (handler_->content_length() == kInvalidLength) {
if (content_length_ == kInvalidLength) {
// Invalid content length (syntax error).
return false;
}
if (!pending_data_.empty()) {
// This is the data left after the headers are parsed.
handler_->AddContent(pending_data_);
body_handler_->AddContent(pending_data_);
pending_data_.clear();
}
// Don't have to firstly put the data to the pending data.
handler_->AddContent(data, length);
body_handler_->AddContent(data, length);
if (handler_->IsFixedContentFull()) {
if (IsFixedContentFull()) {
// All content has been read.
Finish();
}
@ -372,7 +315,7 @@ bool Parser::ParseChunkedContent(const char* data, std::size_t length) {
}
if (chunk_size_ + 2 <= pending_data_.size()) { // +2 for CRLF
handler_->AddContent(pending_data_.c_str(), chunk_size_);
body_handler_->AddContent(pending_data_.c_str(), chunk_size_);
pending_data_.erase(0, chunk_size_ + 2);
@ -383,7 +326,7 @@ bool Parser::ParseChunkedContent(const char* data, std::size_t length) {
continue;
} else if (chunk_size_ > pending_data_.size()) {
handler_->AddContent(pending_data_);
body_handler_->AddContent(pending_data_);
chunk_size_ -= pending_data_.size();
@ -431,9 +374,19 @@ bool Parser::ParseChunkSize() {
return true;
}
bool Parser::IsFixedContentFull() const {
assert(content_length_ != kInvalidLength);
return body_handler_->GetContentLength() >= content_length_;
}
bool Parser::Finish() {
finished_ = true;
return handler_->Finish();
// Could be `kInvalidLength` (chunked).
// Could be `0` (empty body and `Content-Length : 0`).
message_->set_content_length(content_length_);
return body_handler_->Finish();
}
} // namespace webcc

@ -14,29 +14,18 @@ class Message;
// -----------------------------------------------------------------------------
class ParseHandlerBase {
class BodyHandler {
public:
ParseHandlerBase(Message* message);
virtual ~ParseHandlerBase() = default;
virtual bool Init() = 0;
std::size_t content_length() const {
return content_length_;
explicit BodyHandler(Message* message) : message_(message) {
}
void OnStartLine(const std::string& start_line);
void OnHeader(Header&& header);
virtual void OnContentLength(std::size_t content_length);
virtual ~BodyHandler() = default;
virtual void AddContent(const char* data, std::size_t count) = 0;
virtual void AddContent(const std::string& data) = 0;
virtual bool IsFixedContentFull() const = 0;
virtual std::size_t GetContentLength() const = 0;
virtual bool Finish() = 0;
@ -45,46 +34,46 @@ protected:
protected:
Message* message_;
std::size_t content_length_;
};
class ParseHandler : public ParseHandlerBase {
public:
explicit ParseHandler(Message* message);
~ParseHandler() override = default;
// -----------------------------------------------------------------------------
bool Init() override {
return true;
class StringBodyHandler : public BodyHandler {
public:
explicit StringBodyHandler(Message* message) : BodyHandler(message) {
}
void OnContentLength(std::size_t content_length) override;
~StringBodyHandler() override = default;
void AddContent(const char* data, std::size_t count) override;
void AddContent(const std::string& data) override;
bool IsFixedContentFull() const override;
std::size_t GetContentLength() const override {
return content_.size();
}
bool Finish() override;
private:
std::string content_;
};
// If |stream| is true, the data will be streamed to a temp file, and the
// body of the message will be FileBody instead of StringBody.
class StreamedParseHandler : public ParseHandlerBase {
public:
explicit StreamedParseHandler(Message* message);
// -----------------------------------------------------------------------------
~StreamedParseHandler() override = default;
class FileBodyHandler : public BodyHandler {
public:
// NOTE: Might throw Error::kFileError.
explicit FileBodyHandler(Message* message);
// Generate a temp file.
bool Init() override;
~FileBodyHandler() override = default;
void AddContent(const char* data, std::size_t count) override;
void AddContent(const std::string& data) override;
bool IsFixedContentFull() const override;
std::size_t GetContentLength() const override {
return streamed_size_;
}
bool Finish() override;
private:
@ -104,7 +93,7 @@ public:
Parser(const Parser&) = delete;
Parser& operator=(const Parser&) = delete;
bool Init(Message* message, bool stream = false);
void Init(Message* message);
bool finished() const {
return finished_;
@ -113,13 +102,14 @@ public:
bool Parse(const char* data, std::size_t length);
protected:
// Reset for next parse.
void Reset();
// Parse headers from pending data.
// Return false only on syntax errors.
bool ParseHeaders();
virtual void CreateBodyHandler() = 0;
// Get next line (using delimiter CRLF) from the pending data.
// The line will not contain a trailing CRLF.
// If |erase| is true, the line, as well as the trailing CRLF, will be erased
@ -137,16 +127,20 @@ protected:
bool ParseChunkedContent(const char* data, std::size_t length);
bool ParseChunkSize();
bool IsFixedContentFull() const;
// Return false if the compressed content cannot be decompressed.
bool Finish();
protected:
std::unique_ptr<ParseHandlerBase> handler_;
Message* message_;
std::unique_ptr<BodyHandler> body_handler_;
// Data waiting to be parsed.
std::string pending_data_;
// Temporary data and helper flags for parsing.
std::size_t content_length_;
ContentType content_type_;
bool start_line_parsed_;
bool content_length_parsed_;

@ -13,13 +13,14 @@ namespace webcc {
RequestParser::RequestParser() : request_(nullptr) {
}
bool RequestParser::Init(Request* request, bool stream) {
if (!Parser::Init(request, stream)) {
return false;
}
void RequestParser::Init(Request* request) {
Parser::Init(request);
request_ = request;
return true;
}
// TODO
void RequestParser::CreateBodyHandler() {
body_handler_.reset(new StringBodyHandler{ message_ });
}
bool RequestParser::ParseStartLine(const std::string& line) {
@ -54,7 +55,7 @@ bool RequestParser::ParseMultipartContent(const char* data,
std::size_t length) {
pending_data_.append(data, length);
if (!content_length_parsed_ || handler_->content_length() == kInvalidLength) {
if (!content_length_parsed_ || content_length_ == kInvalidLength) {
// Invalid content length (syntax error).
return false;
}

@ -15,9 +15,11 @@ public:
~RequestParser() override = default;
bool Init(Request* request, bool stream = false);
void Init(Request* request);
private:
void CreateBodyHandler() override;
bool ParseStartLine(const std::string& line) override;
// Override to handle multipart form data which is request only.

@ -38,16 +38,23 @@ void SplitStartLine(const std::string& line, std::vector<std::string>* parts) {
// -----------------------------------------------------------------------------
ResponseParser::ResponseParser() : response_(nullptr) {
void ResponseParser::Init(Response* response, bool stream) {
Parser::Init(response);
response_ = response;
stream_ = stream;
}
bool ResponseParser::Init(Response* response, bool stream) {
if (!Parser::Init(response, stream)) {
return false;
void ResponseParser::CreateBodyHandler() {
if (stream_) {
try {
body_handler_.reset(new FileBodyHandler{ message_ });
} catch (const Error&) {
body_handler_.reset();
}
} else {
body_handler_.reset(new StringBodyHandler{ message_ });
}
response_ = response;
return true;
}
bool ResponseParser::ParseStartLine(const std::string& line) {

@ -11,17 +11,18 @@ class Response;
class ResponseParser : public Parser {
public:
ResponseParser();
ResponseParser() = default;
~ResponseParser() override = default;
bool Init(Response* response, bool stream = false);
void Init(Response* response, bool stream = false);
void set_ignroe_body(bool ignroe_body) {
ignroe_body_ = ignroe_body;
}
private:
void CreateBodyHandler() override;
// Parse HTTP start line; E.g., "HTTP/1.1 200 OK".
bool ParseStartLine(const std::string& line) override;
@ -30,7 +31,10 @@ private:
private:
// The result response message.
Response* response_;
Response* response_ = nullptr;
// Data streaming or not.
bool stream_ = false;
// The response for HEAD request could also have `Content-Length` header,
// set this flag to ignore it.

Loading…
Cancel
Save