Support data streaming on server side.

master
Chunting Gu 6 years ago
parent 0ea4ab7526
commit 9b13dceab9

@ -5,6 +5,7 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#if 0
// HTTP GET request parser test fixture. // HTTP GET request parser test fixture.
class GetRequestParserTest : public testing::Test { class GetRequestParserTest : public testing::Test {
protected: protected:
@ -231,3 +232,4 @@ TEST_F(MultipartRequestParserTest, ParseByteWise) {
CheckResult(); CheckResult();
} }
#endif // 0

@ -12,14 +12,14 @@ using boost::asio::ip::tcp;
namespace webcc { namespace webcc {
Connection::Connection(tcp::socket socket, ConnectionPool* pool, Connection::Connection(tcp::socket socket, ConnectionPool* pool,
Queue<ConnectionPtr>* queue) Queue<ConnectionPtr>* queue, ViewMatcher&& view_matcher)
: socket_(std::move(socket)), pool_(pool), queue_(queue), : socket_(std::move(socket)), pool_(pool), queue_(queue),
buffer_(kBufferSize) { view_matcher_(std::move(view_matcher)), buffer_(kBufferSize) {
} }
void Connection::Start() { void Connection::Start() {
request_.reset(new Request{}); request_.reset(new Request{});
request_parser_.Init(request_.get()); request_parser_.Init(request_.get(), view_matcher_);
DoRead(); DoRead();
} }

@ -24,7 +24,7 @@ using ConnectionPtr = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection> { class Connection : public std::enable_shared_from_this<Connection> {
public: public:
Connection(boost::asio::ip::tcp::socket socket, ConnectionPool* pool, Connection(boost::asio::ip::tcp::socket socket, ConnectionPool* pool,
Queue<ConnectionPtr>* queue); Queue<ConnectionPtr>* queue, ViewMatcher&& view_matcher);
~Connection() = default; ~Connection() = default;
@ -71,6 +71,10 @@ private:
// The connection queue. // The connection queue.
Queue<ConnectionPtr>* queue_; Queue<ConnectionPtr>* queue_;
// A function for matching view once the headers of a request has been
// received.
ViewMatcher view_matcher_;
// The buffer for incoming data. // The buffer for incoming data.
std::vector<char> buffer_; std::vector<char> buffer_;

@ -65,22 +65,24 @@ bool StringBodyHandler::Finish() {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
FileBodyHandler::FileBodyHandler(Message* message) : BodyHandler(message) { bool FileBodyHandler::OpenFile() {
try { try {
temp_path_ = bfs::temp_directory_path() / bfs::unique_path(); temp_path_ = bfs::temp_directory_path() / bfs::unique_path();
LOG_VERB("Generate a temp path for streaming: %s", LOG_VERB("Generate a temp path for streaming: %s",
temp_path_.string().c_str()); temp_path_.string().c_str());
} catch (const bfs::filesystem_error&) { } catch (const bfs::filesystem_error&) {
LOG_ERRO("Failed to generate temp path: %s", temp_path_.string().c_str()); LOG_ERRO("Failed to generate temp path: %s", temp_path_.string().c_str());
throw Error{ Error::kFileError }; return false;
} }
ofstream_.open(temp_path_, std::ios::binary); ofstream_.open(temp_path_, std::ios::binary);
if (ofstream_.fail()) { if (ofstream_.fail()) {
LOG_ERRO("Failed to open the temp file: %s", temp_path_.string().c_str()); LOG_ERRO("Failed to open the temp file: %s", temp_path_.string().c_str());
throw Error{ Error::kFileError }; return false;
} }
return true;
} }
void FileBodyHandler::AddContent(const char* data, std::size_t count) { void FileBodyHandler::AddContent(const char* data, std::size_t count) {
@ -136,12 +138,17 @@ bool Parser::Parse(const char* data, std::size_t length) {
LOG_INFO("HTTP headers just ended."); LOG_INFO("HTTP headers just ended.");
if (!OnHeadersEnd()) {
// Only request parser can reach here when no view matches the request.
// Data streaming or not is also determined for request parser.
return false;
}
CreateBodyHandler(); CreateBodyHandler();
if (!body_handler_) { if (!body_handler_) {
// The only reason to reach here is that it was failed to generate the temp // The only reason to reach here is that it was failed to generate the temp
// file for streaming. Normally, it shouldn't happen. // file for streaming. Normally, it shouldn't happen.
// TODO: Keep a member |error_| for the user to query.
return false; return false;
} }
@ -152,6 +159,7 @@ bool Parser::Parse(const char* data, std::size_t length) {
void Parser::Reset() { void Parser::Reset() {
message_ = nullptr; message_ = nullptr;
body_handler_.reset(); body_handler_.reset();
stream_ = false;
pending_data_.clear(); pending_data_.clear();
@ -202,6 +210,20 @@ bool Parser::ParseHeaders() {
return true; return true;
} }
void Parser::CreateBodyHandler() {
if (stream_) {
auto file_body_handler = new FileBodyHandler{ message_ };
if (!file_body_handler->OpenFile()) {
body_handler_.reset();
delete file_body_handler;
} else {
body_handler_.reset(file_body_handler);
}
} else {
body_handler_.reset(new StringBodyHandler{ message_ });
}
}
bool Parser::GetNextLine(std::size_t off, std::string* line, bool erase) { bool Parser::GetNextLine(std::size_t off, std::string* line, bool erase) {
std::size_t pos = pending_data_.find(kCRLF, off); std::size_t pos = pending_data_.find(kCRLF, off);

@ -63,10 +63,14 @@ private:
class FileBodyHandler : public BodyHandler { class FileBodyHandler : public BodyHandler {
public: public:
// NOTE: Might throw Error::kFileError. // NOTE: Might throw Error::kFileError.
explicit FileBodyHandler(Message* message); explicit FileBodyHandler(Message* message) : BodyHandler(message) {
}
~FileBodyHandler() override = default; ~FileBodyHandler() override = default;
// Open a temp file for data streaming.
bool OpenFile();
void AddContent(const char* data, std::size_t count) override; void AddContent(const char* data, std::size_t count) override;
void AddContent(const std::string& data) override; void AddContent(const std::string& data) override;
@ -108,7 +112,11 @@ protected:
// Return false only on syntax errors. // Return false only on syntax errors.
bool ParseHeaders(); bool ParseHeaders();
virtual void CreateBodyHandler() = 0; // Called when headers just parsed.
// Return false if something is wrong.
virtual bool OnHeadersEnd() = 0;
void CreateBodyHandler();
// Get next line (using delimiter CRLF) from the pending data. // Get next line (using delimiter CRLF) from the pending data.
// The line will not contain a trailing CRLF. // The line will not contain a trailing CRLF.
@ -134,8 +142,12 @@ protected:
protected: protected:
Message* message_; Message* message_;
std::unique_ptr<BodyHandler> body_handler_; std::unique_ptr<BodyHandler> body_handler_;
// Data streaming or not.
bool stream_;
// Data waiting to be parsed. // Data waiting to be parsed.
std::string pending_data_; std::string pending_data_;

@ -96,6 +96,7 @@ public:
} }
// Use the file content as body. // Use the file content as body.
// NOTE: Error::kFileError might be thrown.
RequestBuilder& File(const Path& path, bool infer_media_type = true, RequestBuilder& File(const Path& path, bool infer_media_type = true,
std::size_t chunk_size = 1024); std::size_t chunk_size = 1024);

@ -13,14 +13,25 @@ namespace webcc {
RequestParser::RequestParser() : request_(nullptr) { RequestParser::RequestParser() : request_(nullptr) {
} }
void RequestParser::Init(Request* request) { void RequestParser::Init(Request* request, ViewMatcher view_matcher) {
assert(view_matcher);
Parser::Init(request); Parser::Init(request);
request_ = request; request_ = request;
view_matcher_ = view_matcher;
} }
// TODO bool RequestParser::OnHeadersEnd() {
void RequestParser::CreateBodyHandler() { bool matched = view_matcher_(request_->method(), request_->url().path(),
body_handler_.reset(new StringBodyHandler{ message_ }); &stream_);
if (!matched) {
LOG_WARN("No view matches the request: %s %s", request_->method().c_str(),
request_->url().path().c_str());
}
return matched;
} }
bool RequestParser::ParseStartLine(const std::string& line) { bool RequestParser::ParseStartLine(const std::string& line) {

@ -1,12 +1,16 @@
#ifndef WEBCC_REQUEST_PARSER_H_ #ifndef WEBCC_REQUEST_PARSER_H_
#define WEBCC_REQUEST_PARSER_H_ #define WEBCC_REQUEST_PARSER_H_
#include <functional>
#include <string> #include <string>
#include "webcc/parser.h" #include "webcc/parser.h"
namespace webcc { namespace webcc {
using ViewMatcher =
std::function<bool(const std::string&, const std::string&, bool*)>;
class Request; class Request;
class RequestParser : public Parser { class RequestParser : public Parser {
@ -15,10 +19,14 @@ public:
~RequestParser() override = default; ~RequestParser() override = default;
void Init(Request* request); void Init(Request* request, ViewMatcher view_matcher);
private: private:
void CreateBodyHandler() override; // Override to match the URL against views and check if the matched view
// asks for data streaming.
bool OnHeadersEnd() override;
bool Stream() const;
bool ParseStartLine(const std::string& line) override; bool ParseStartLine(const std::string& line) override;
@ -39,6 +47,10 @@ private:
private: private:
Request* request_; Request* request_;
// A function for matching view once the headers of a request has been
// received. The parsing will stop and fail if no view can be matched.
ViewMatcher view_matcher_;
// Form data parsing step. // Form data parsing step.
enum Step { enum Step {
kStart, kStart,

@ -45,18 +45,6 @@ void ResponseParser::Init(Response* response, bool stream) {
stream_ = stream; stream_ = stream;
} }
void ResponseParser::CreateBodyHandler() {
if (stream_) {
try {
body_handler_.reset(new FileBodyHandler{ message_ });
} catch (const Error&) {
body_handler_.reset();
}
} else {
body_handler_.reset(new StringBodyHandler{ message_ });
}
}
bool ResponseParser::ParseStartLine(const std::string& line) { bool ResponseParser::ParseStartLine(const std::string& line) {
std::vector<std::string> parts; std::vector<std::string> parts;
SplitStartLine(line, &parts); SplitStartLine(line, &parts);

@ -21,7 +21,9 @@ public:
} }
private: private:
void CreateBodyHandler() override; bool OnHeadersEnd() override {
return true;
}
// Parse HTTP start line; E.g., "HTTP/1.1 200 OK". // Parse HTTP start line; E.g., "HTTP/1.1 200 OK".
bool ParseStartLine(const std::string& line) override; bool ParseStartLine(const std::string& line) override;
@ -33,9 +35,6 @@ private:
// The result response message. // The result response message.
Response* response_ = nullptr; Response* response_ = nullptr;
// Data streaming or not.
bool stream_ = false;
// The response for HEAD request could also have `Content-Length` header, // The response for HEAD request could also have `Content-Length` header,
// set this flag to ignore it. // set this flag to ignore it.
bool ignroe_body_ = false; bool ignroe_body_ = false;

@ -6,6 +6,7 @@
#include "boost/algorithm/string.hpp" #include "boost/algorithm/string.hpp"
#include "boost/filesystem/fstream.hpp" #include "boost/filesystem/fstream.hpp"
#include "boost/filesystem/operations.hpp"
#include "webcc/body.h" #include "webcc/body.h"
#include "webcc/logger.h" #include "webcc/logger.h"
@ -193,8 +194,11 @@ void Server::AsyncAccept() {
if (!ec) { if (!ec) {
LOG_INFO("Accepted a connection."); LOG_INFO("Accepted a connection.");
using namespace std::placeholders;
auto view_matcher = std::bind(&Server::MatchView, this, _1, _2, _3);
auto connection = std::make_shared<Connection>( auto connection = std::make_shared<Connection>(
std::move(socket), &pool_, &queue_); std::move(socket), &pool_, &queue_, std::move(view_matcher));
pool_.Start(connection); pool_.Start(connection);
} }
@ -275,17 +279,28 @@ void Server::Handle(ConnectionPtr connection) {
auto request = connection->request(); auto request = connection->request();
const Url& url = request->url(); const Url& url = request->url();
UrlArgs args;
LOG_INFO("Request URL path: %s", url.path().c_str()); LOG_INFO("Request URL path: %s", url.path().c_str());
UrlArgs args;
auto view = FindView(request->method(), url.path(), &args); auto view = FindView(request->method(), url.path(), &args);
if (!view) { if (!view) {
LOG_WARN("No view matches the URL path: %s", url.path().c_str()); LOG_WARN("No view matches the request: %s %s", request->method().c_str(),
if (!ServeStatic(connection)) { url.path().c_str());
connection->SendResponse(Status::kNotFound);
if (request->method() == methods::kGet) {
// Try to serve static files for GET request.
auto response = ServeStatic(request);
if (!response) {
// Static file not found.
connection->SendResponse(Status::kNotFound);
} else {
connection->SendResponse(response);
}
} else {
connection->SendResponse(Status::kBadRequest);
} }
return; return;
} }
@ -299,7 +314,7 @@ void Server::Handle(ConnectionPtr connection) {
if (response) { if (response) {
connection->SendResponse(response); connection->SendResponse(response);
} else { } else {
connection->SendResponse(Status::kNotImplemented); connection->SendResponse(Status::kBadRequest);
} }
} }
@ -335,41 +350,70 @@ ViewPtr Server::FindView(const std::string& method, const std::string& url,
return ViewPtr(); return ViewPtr();
} }
bool Server::ServeStatic(ConnectionPtr connection) { bool Server::MatchView(const std::string& method, const std::string& url,
if (doc_root_.empty()) { bool* stream) {
LOG_INFO("The doc root was not specified."); assert(stream != nullptr);
return false; *stream = false;
for (auto& route : routes_) {
if (std::find(route.methods.begin(), route.methods.end(), method) ==
route.methods.end()) {
continue;
}
if (route.url.empty()) {
std::smatch match;
if (std::regex_match(url, match, route.url_regex)) {
*stream = route.view->Stream(method);
return true;
}
} else {
if (boost::iequals(route.url, url)) {
*stream = route.view->Stream(method);
return true;
}
}
} }
auto request = connection->request(); // Try to match a static file.
std::string path = request->url().path(); if (method == methods::kGet && !doc_root_.empty()) {
Path path = doc_root_ / url;
if (!bfs::is_directory(path) && bfs::exists(path)) {
return true;
}
}
return false;
}
// If path ends in slash (i.e. is a directory) then add "index.html". ResponsePtr Server::ServeStatic(RequestPtr request) {
if (path[path.size() - 1] == '/') { assert(request->method() == methods::kGet);
path += "index.html"; // TODO
if (doc_root_.empty()) {
LOG_INFO("The doc root was not specified.");
return {};
} }
Path p = doc_root_ / path; Path path = doc_root_ / request->url().path();
try { try {
auto body = std::make_shared<FileBody>(p, file_chunk_size_); // NOTE: FileBody might throw Error::kFileError.
auto body = std::make_shared<FileBody>(path, file_chunk_size_);
auto response = std::make_shared<Response>(Status::kOK); auto response = std::make_shared<Response>(Status::kOK);
std::string extension = p.extension().string(); std::string extension = path.extension().string();
response->SetContentType(media_types::FromExtension(extension), ""); response->SetContentType(media_types::FromExtension(extension), "");
// NOTE: Gzip compression is not supported. // NOTE: Gzip compression is not supported.
response->SetBody(body, true); response->SetBody(body, true);
// Send response back to client. return response;
connection->SendResponse(response);
return true;
} catch (const Error& error) { } catch (const Error& error) {
LOG_ERRO("File error: %s.", error.message().c_str()); LOG_ERRO("File error: %s.", error.message().c_str());
return false; return {};
} }
} }

@ -93,12 +93,18 @@ private:
// request comes, this connection will be put back to the queue again. // request comes, this connection will be put back to the queue again.
virtual void Handle(ConnectionPtr connection); virtual void Handle(ConnectionPtr connection);
// Find the view by HTTP method and URL. // Find the view by HTTP method and URL (path).
ViewPtr FindView(const std::string& method, const std::string& url, ViewPtr FindView(const std::string& method, const std::string& url,
UrlArgs* args); UrlArgs* args);
// Match the view by HTTP method and URL (path).
// Return if a view or static file is matched or not.
// If the view asks for data streaming, |stream| will be set to true.
bool MatchView(const std::string& method, const std::string& url,
bool* stream);
// Serve static files from the doc root. // Serve static files from the doc root.
bool ServeStatic(ConnectionPtr connection); ResponsePtr ServeStatic(RequestPtr request);
private: private:
struct RouteInfo { struct RouteInfo {

@ -13,6 +13,13 @@ public:
virtual ~View() = default; virtual ~View() = default;
virtual ResponsePtr Handle(RequestPtr request) = 0; virtual ResponsePtr Handle(RequestPtr request) = 0;
// Return true if you want the request data of the given method to be streamed
// to a temp file. Data streaming is useful for receiving large data, e.g.,
// a JPEG image, posted from the client.
virtual bool Stream(const std::string& /*method*/) {
return false; // No streaming by default
}
}; };
using ViewPtr = std::shared_ptr<View>; using ViewPtr = std::shared_ptr<View>;

Loading…
Cancel
Save