Add gRPC file transfer service (#11892)

Add gRPC file transfer service

* grpc: Add file size check
* grpc: change test addresses
* Fix CI/CD failure
* Disable gRPC for build
* Fixes for new gRPC in new build image
* Move FileTransfer definitions to CPP file
This commit is contained in:
Vishesh Yadav
2025-02-13 14:36:30 -08:00
committed by GitHub
parent 6a9898de44
commit 7f46fc11ff
15 changed files with 585 additions and 46 deletions

View File

@@ -233,25 +233,53 @@ endif()
################################################################################
# TODO (Vishesh): Replace with target_include_directories.
include_directories("${CMAKE_CURRENT_BINARY_DIR}/generated/")
set(WITH_GRPC ON CACHE BOOL "Build FDB with gRPC support")
find_program(PROTOC_COMPILER protoc)
if (PROTOC_COMPILER)
message(STATUS "Found protoc: ${PROTOC_COMPILER}")
else ()
message(STATUS "protoc compiler not found. Disabing gRPC")
set(FLOW_GRPC_ENABLED OFF)
find_package(gRPC CONFIG)
if (gRPC_FOUND)
message(STATUS "gRPC found. Enabling gRPC for Flow.")
set(FLOW_GRPC_ENABLED ON)
add_compile_definitions(FLOW_GRPC_ENABLED=1)
if (WITH_GRPC)
# Setup search paths.
if (UNIX AND CMAKE_CXX_COMPILER_ID MATCHES "Clang$" AND USE_LIBCXX)
list(APPEND CMAKE_PREFIX_PATH /opt/grpc_clang)
message(STATUS "Using Clang version of gRPC")
else ()
message(WARNING "gRPC not found. Disabling gRPC for Flow.")
set(FLOW_GRPC_ENABLED OFF)
list(APPEND CMAKE_PREFIX_PATH /opt/grpc)
message(STATUS "Using g++ version of gRPC")
endif ()
endif ()
# Find dependencies for gRPC.
find_program(PROTOC_EXECUTABLE protoc
HINTS ${CMAKE_PREFIX_PATH}
PATH_SUFFIXES bin
)
if (PROTOC_EXECUTABLE)
execute_process(
COMMAND ${PROTOC_EXECUTABLE} --version
OUTPUT_VARIABLE PROTOC_VERSION
OUTPUT_STRIP_TRAILING_WHITESPACE
)
string(REGEX MATCH "([0-9]+\\.[0-9]+)+" PROTOC_VERSION ${PROTOC_VERSION})
message(STATUS "protoc version: ${PROTOC_COMPILER} ${PROTOC_VERSION}")
if (PROTOC_VERSION VERSION_LESS "29.0")
message(WARNING "protoc version ${PROTOC_VERSION} is too old. Required: 29.0+")
set(PROTOC_EXECUTABLE NOTFOUND)
endif()
else ()
message(WARNING "protoc executable not found")
endif()
find_package(absl CONFIG)
find_package(utf8_range CONFIG)
find_package(protobuf CONFIG)
find_package(gRPC CONFIG)
if (PROTOC_EXECUTABLE AND gRPC_FOUND)
message(STATUS "gRPC found. Enabling gRPC for Flow.")
add_compile_definitions(FLOW_GRPC_ENABLED)
else()
message(WARNING "gRPC can't be enabled because of missing dependencies")
set(WITH_GRPC OFF)
endif()
endif()
file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/packages)
add_custom_target(packages)
@@ -272,6 +300,7 @@ function(print_components)
message(STATUS "Configure CTest (depends on Python): ${WITH_PYTHON}")
message(STATUS "Build with RocksDB: ${WITH_ROCKSDB}")
message(STATUS "Build with AWS SDK: ${WITH_AWS_BACKUP}")
message(STATUS "Build with gRPC: ${WITH_GRPC}")
message(STATUS "=========================================")
endfunction()

View File

@@ -85,16 +85,20 @@ function(generate_grpc_protobuf pkg_name)
package_name_to_proto_target(target_name ${pkg_name})
package_name_to_path(out_rel_path ${pkg_name})
add_library(${target_name} ${proto_files})
add_library(${target_name} STATIC ${proto_files})
target_include_directories(${target_name} PUBLIC ${CMAKE_BINARY_DIR}/generated/)
target_include_directories(${target_name} PUBLIC ${Protobuf_INCLUDE_DIRS} ${gRPC_INCLUDE_DIRS})
target_link_libraries(${target_name} PUBLIC gRPC::grpc++)
set(protoc_out_dir "${CMAKE_BINARY_DIR}/generated/${out_rel_path}/")
message(STATUS "Generating protobuf target = ${target_name}, out_path = ${protoc_out_dir}")
set(protoc_out_dir "${CMAKE_BINARY_DIR}/generated/${out_rel_path}")
message(STATUS "Generating protobuf target = ${target_name}, out_path = ${protoc_out_dir}, files = ${ARGN}")
protobuf_generate(
TARGET ${target_name}
PROTOC_OUT_DIR ${protoc_out_dir}
GENERATE_EXTENSIONS .pb.h .pb.cc
APPEND_PATH ${out_rel_path}
PROTOS ${proto_files}
)
protobuf_generate(
@@ -103,7 +107,7 @@ function(generate_grpc_protobuf pkg_name)
PROTOC_OUT_DIR ${protoc_out_dir}
PLUGIN protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc
APPEND_PATH ${out_rel_path}
PROTOS ${proto_files}
)
target_include_directories(${target_name} PUBLIC "${protoc_out_dir}")
endfunction()

View File

@@ -68,10 +68,18 @@ target_include_directories(fdbrpc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
target_include_directories(fdbrpc PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio)
target_link_libraries(fdbrpc PUBLIC flow libb64 md5 PRIVATE rapidjson)
if (FLOW_GRPC_ENABLED)
if (WITH_GRPC)
generate_grpc_protobuf(fdbrpc.file_transfer protos/file_transfer.proto)
target_link_libraries(fdbrpc PUBLIC proto_fdbrpc_test)
target_include_directories(fdbrpc PUBLIC ${Protobuf_INCLUDE_DIRS} ${gRPC_INCLUDE_DIRS})
target_link_libraries(fdbrpc PUBLIC gRPC::grpc++)
target_link_libraries(fdbrpc PUBLIC proto_fdbrpc_test proto_fdbrpc_file_transfer)
target_link_libraries(fdbrpc_sampling PUBLIC proto_fdbrpc_test)
target_include_directories(fdbrpc_sampling PUBLIC ${Protobuf_INCLUDE_DIRS} ${gRPC_INCLUDE_DIRS})
target_link_libraries(fdbrpc_sampling PUBLIC gRPC::grpc++)
target_link_libraries(fdbrpc_sampling PUBLIC proto_fdbrpc_test proto_fdbrpc_file_transfer)
endif()
target_include_directories(fdbrpc_sampling PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio)

215
fdbrpc/FileTransfer.cpp Normal file
View File

@@ -0,0 +1,215 @@
/**
* FileTransfer.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef FLOW_GRPC_ENABLED
#include <fstream>
#include <fdbrpc/FileTransfer.h>
#include "flow/IRandom.h"
#include "crc32/crc32c.h"
//------ FileTransferServiceImpl ------
uint32_t crc32_checksum_ifstream(std::ifstream* input_file) {
input_file->seekg(0);
uint32_t crc = 0;
std::vector<char> buffer(8192);
while (input_file->read(buffer.data(), buffer.size()) || input_file->gcount() > 0) {
crc = crc32c_append(crc, reinterpret_cast<const uint8_t*>(buffer.data()), input_file->gcount());
}
return crc;
}
grpc::Status FileTransferServiceImpl::DownloadFile(grpc::ServerContext* context,
const fdbrpc::DownloadRequest* request,
grpc::ServerWriter<fdbrpc::DownloadChunk>* writer) {
std::ifstream input_file(request->file_name(), std::ios::binary | std::ios::ate);
if (!input_file.is_open()) {
return grpc::Status(grpc::StatusCode::NOT_FOUND, "File found not");
}
const std::streamsize file_size = input_file.tellg();
const size_t buffer_size = request->chunk_size() > 0 ? request->chunk_size() : DEFAULT_CHUNK_SIZE;
const size_t start_chunk_index = request->first_chunk_index();
int64_t offset = buffer_size * start_chunk_index;
if (offset > file_size) {
return grpc::Status(grpc::StatusCode::OUT_OF_RANGE, "Offset beyond file size");
}
input_file.seekg(offset);
if (!input_file) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Failed to seek to the offset");
}
std::vector<char> buffer(buffer_size);
while (input_file.read(buffer.data(), buffer_size) || input_file.gcount() > 0) {
std::streamsize bytes_read = input_file.gcount();
if (error_inject_ != NO_ERROR && deterministicRandom()->random01() < 0.1) {
// Inject error 10% of the times times.
if (error_inject_ == FAIL_RANDOMLY) {
return grpc::Status(grpc::StatusCode::INTERNAL, "Random test failure");
} else if (error_inject_ == FLIP_BYTE) {
buffer[0] = ~buffer[0];
}
}
fdbrpc::DownloadChunk chunk;
chunk.set_offset(offset);
chunk.set_data(buffer.data(), bytes_read);
writer->Write(chunk);
offset += bytes_read;
}
if (input_file.bad()) {
return grpc::Status(grpc::StatusCode::DATA_LOSS, "Error while reading the file");
}
input_file.close();
return grpc::Status::OK;
}
grpc::Status FileTransferServiceImpl::GetFileInfo(grpc::ServerContext* context,
const fdbrpc::GetFileInfoRequest* request,
fdbrpc::GetFileInfoReply* response) {
std::ifstream input_file(request->file_name(), std::ios::binary | std::ios::ate);
if (!input_file.is_open()) {
return grpc::Status(grpc::StatusCode::NOT_FOUND, "File not found");
}
if (request->get_size()) {
response->set_file_size(input_file.tellg());
}
if (request->get_crc_checksum()) {
auto crc = crc32_checksum_ifstream(&input_file);
response->set_crc_checksum(crc);
}
return grpc::Status::OK;
}
std::optional<fdbrpc::GetFileInfoReply> FileTransferClient::GetFileInfo(const std::string& filename,
bool get_crc_checksum) {
grpc::ClientContext context;
fdbrpc::GetFileInfoRequest request;
request.set_file_name(filename);
request.set_get_size(true);
request.set_get_crc_checksum(get_crc_checksum);
fdbrpc::GetFileInfoReply response;
grpc::Status status = stub_->GetFileInfo(&context, request, &response);
if (status.ok()) {
return { response };
} else {
return std::nullopt;
}
}
//------ FileTransferClient ------
// Downloads a file from a remote server and saves it locally.
//
// This function communicates with a remote server to download a file specified by `filename`
// and saves it to the local file system with the name `output_filename`. Optionally, it can
// verify the integrity of the downloaded file using a CRC checksum.
//
// Params:
// - filename: The name of the file to download from the remote server.
// - output_filename: The name of the file to save the downloaded content locally.
// - verify: Whether to verify the file's integrity using CRC checksum
//
// Returns:
// std::optional<size_t> The size of the downloaded file in bytes if successful, or std::nullopt if the download
// failed. `output_filename` is deleted on failure.
std::optional<size_t> FileTransferClient::DownloadFile(const std::string& filename,
const std::string& output_filename,
bool verify) {
uint32_t expected_crc = 0;
uint32_t expected_size = 0;
{
fdbrpc::GetFileInfoRequest request;
grpc::ClientContext context;
request.set_file_name(filename);
request.set_get_crc_checksum(verify);
request.set_get_size(true);
fdbrpc::GetFileInfoReply response;
auto res = stub_->GetFileInfo(&context, request, &response);
if (!res.ok()) {
return std::nullopt;
}
expected_crc = response.crc_checksum();
expected_size = response.file_size();
}
fdbrpc::DownloadRequest request;
request.set_file_name(filename);
grpc::ClientContext context;
std::unique_ptr<grpc::ClientReader<fdbrpc::DownloadChunk>> reader(stub_->DownloadFile(&context, request));
std::ofstream output_file(output_filename, std::ios::binary | std::ios::trunc);
if (!output_file.is_open()) {
return std::nullopt; // Failed to open file
}
fdbrpc::DownloadChunk chunk;
size_t bytes_read = 0;
bool failed = false;
while (reader->Read(&chunk)) {
if (chunk.offset() != bytes_read) {
// Abort on invalid offset
failed = true;
break;
}
output_file.write(chunk.data().data(), chunk.data().size());
bytes_read += chunk.data().size();
}
// Close file after writing
output_file.close();
failed = failed || (bytes_read != expected_size);
// Verify checksum
if (!failed && verify) {
std::ifstream output_file_reader(output_filename);
uint32_t actual_crc = crc32_checksum_ifstream(&output_file_reader);
failed = (actual_crc != expected_crc);
}
// Check final gRPC status
grpc::Status grpc_status = reader->Finish();
if (failed || !grpc_status.ok()) {
// TODO: Return error codes/exception to tell caller what happened?
// std::cerr << "File download failed: " << grpc_status.error_message() << std::endl;
if (delete_on_close_) {
output_file.close();
std::remove(output_filename.c_str());
}
return std::nullopt;
}
return { bytes_read };
}
#endif // FLOW_GRPC_ENABLED

View File

@@ -20,9 +20,9 @@
#ifdef FLOW_GRPC_ENABLED
#include <cstdio>
#include "flow/UnitTest.h"
#include "fdbrpc/FlowGrpc.h"
#include "fdbrpc/FlowGrpcTests.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@@ -98,4 +98,4 @@ TEST_CASE("/fdbrpc/grpc/destroy_server_without_shutdown") {
} // namespace fdbrpc_test
#endif
#endif

View File

@@ -1,4 +1,3 @@
/**
* FlowGrpcTests.actor.cpp
*
@@ -20,9 +19,11 @@
*/
#ifdef FLOW_GRPC_ENABLED
#include "flow/UnitTest.h"
#include <ctime>
#include "fdbrpc/FlowGrpc.h"
#include "fdbrpc/FlowGrpcTests.h"
#include "fdbrpc/FileTransfer.h"
#include "flow/UnitTest.h"
#include "flow/flow.h"
// So that tests are not optimized out. :/
@@ -31,6 +32,16 @@ void forceLinkGrpcTests2() {}
namespace fdbrpc_test {
namespace asio = boost::asio;
std::string generate_random_string(int size) {
const std::string characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
std::string random_string;
for (int i = 0; i < size; ++i) {
random_string += characters[deterministicRandom()->randomInt(0, characters.size())];
}
return random_string;
}
TEST_CASE("/fdbrpc/grpc/basic_coro") {
NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50500"));
GrpcServer server(addr);
@@ -66,7 +77,7 @@ TEST_CASE("/fdbrpc/grpc/basic_stream_server") {
EchoRequest request;
request.set_message("Ping!");
auto stream = client.call(&TestEchoService::Stub::EchoRecvStream10, request);
loop {
while (true) {
auto response = co_await stream;
ASSERT_EQ(response.message(), "Echo: Ping!");
count += 1;
@@ -82,7 +93,7 @@ TEST_CASE("/fdbrpc/grpc/basic_stream_server") {
}
TEST_CASE("/fdbrpc/grpc/future_destroy") {
NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50500"));
NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50502"));
GrpcServer server(addr);
server.registerService(make_shared<TestEchoServiceImpl>());
Future<Void> _ = server.run();
@@ -105,7 +116,7 @@ TEST_CASE("/fdbrpc/grpc/future_destroy") {
}
TEST_CASE("/fdbrpc/grpc/stream_destroy") {
NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50501"));
NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50503"));
GrpcServer server(addr);
server.registerService(make_shared<TestEchoServiceImpl>());
Future<Void> _ = server.run();
@@ -147,7 +158,7 @@ TEST_CASE("/fdbrpc/grpc/stream_destroy") {
// EchoRequest request;
// request.set_message("Ping!");
// auto stream = client.call(&TestEchoService::Stub::EchoSendStream10, request);
// loop {
// while (true) {
// auto response = co_await stream;
// ASSERT_EQ(response.message(), "Echo: Ping!");
// count += 1;
@@ -162,6 +173,137 @@ TEST_CASE("/fdbrpc/grpc/stream_destroy") {
// co_return;
// }
TEST_CASE("/fdbrpc/grpc/file_transfer") {
using platform::TmpFile;
// -- Server --
std::string server_address("127.0.0.1:50504");
FileTransferServiceImpl service;
grpc::ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// -- Client --
auto channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
auto client = FileTransferClient(channel);
TmpFile src;
TmpFile dest;
std::cout << "Writing 1GB random bytes to source file.\n";
int bytes_written = 0;
for (int i = 0; i < 1024; ++i) {
const int size = i < 1023 ? 1024 * 1024 : (1024 * 1024) / 2; // Make last block smaller.
auto rand_str = generate_random_string(size);
src.append((const uint8_t*)rand_str.c_str(), size);
bytes_written += size;
}
std::cout << "Finished writing " << bytes_written << " bytes.\n";
// -- Start file transfer --
std::cout << "Invoking gRPC File Transfer call.\n";
auto start = std::chrono::high_resolution_clock::now();
auto res = client.DownloadFile(src.getFileName(), dest.getFileName());
std::cout << "Src = " << src.getFileName() << ", Dest = " << dest.getFileName() << "\n";
auto end = std::chrono::high_resolution_clock::now();
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "Time taken: " << diff.count() << std::endl;
ASSERT(res.has_value());
std::cout << "Bytes downloaded: " << res.value() << std::endl;
ASSERT_EQ(res.value(), bytes_written);
src.destroyFile();
dest.destroyFile();
server->Shutdown();
return Void();
}
TEST_CASE("/fdbrpc/grpc/file_transfer_byte_flip") {
using platform::TmpFile;
// -- Server --
std::string server_address("127.0.0.1:50505");
FileTransferServiceImpl service;
service.SetErrorInjection(FileTransferServiceImpl::FLIP_BYTE);
grpc::ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// -- Client --
auto channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
auto client = FileTransferClient(channel);
TmpFile src;
TmpFile dest;
std::cout << "Writing 1GB random bytes to source file.\n";
int bytes_written = 0;
for (int i = 0; i < 1024; ++i) {
const int size = i < 1023 ? 1024 * 1024 : (1024 * 1024) / 2; // Make last block smaller.
auto rand_str = generate_random_string(size);
src.append((const uint8_t*)rand_str.c_str(), size);
bytes_written += size;
}
std::cout << "Finished writing " << bytes_written << " bytes.\n";
// -- Start file transfer --
std::cout << "Invoking gRPC File Transfer call.\n";
auto res = client.DownloadFile(src.getFileName(), dest.getFileName());
std::cout << "Src = " << src.getFileName() << ", Dest = " << dest.getFileName() << "\n";
ASSERT(!res.has_value());
src.destroyFile();
dest.destroyFile();
server->Shutdown();
return Void();
}
TEST_CASE("/fdbrpc/grpc/file_transfer_fail_random") {
using platform::TmpFile;
// -- Server --
std::string server_address("127.0.0.1:50506");
FileTransferServiceImpl service;
service.SetErrorInjection(FileTransferServiceImpl::FAIL_RANDOMLY);
grpc::ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// -- Client --
auto channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
auto client = FileTransferClient(channel);
TmpFile src;
TmpFile dest;
std::cout << "Writing 1GB random bytes to source file.\n";
int bytes_written = 0;
for (int i = 0; i < 1024; ++i) {
const int size = i < 1023 ? 1024 * 1024 : (1024 * 1024) / 2; // Make last block smaller.
auto rand_str = generate_random_string(size);
src.append((const uint8_t*)rand_str.c_str(), size);
bytes_written += size;
}
std::cout << "Finished writing " << bytes_written << " bytes.\n";
// -- Start file transfer --
std::cout << "Invoking gRPC File Transfer call.\n";
auto res = client.DownloadFile(src.getFileName(), dest.getFileName());
std::cout << "Src = " << src.getFileName() << ", Dest = " << dest.getFileName() << "\n";
ASSERT(!res.has_value());
src.destroyFile();
dest.destroyFile();
server->Shutdown();
return Void();
}
} // namespace fdbrpc_test
#endif
#endif

View File

@@ -0,0 +1,91 @@
/**
* FileTransfer.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef FLOW_GRPC_ENABLED
#ifndef FDBRPC_FILE_TRANSFER_H
#define FDBRPC_FILE_TRANSFER_H
#include <optional>
#undef loop
#include <grpcpp/grpcpp.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/status.h>
#include "fdbrpc/file_transfer/file_transfer.pb.h"
#include "fdbrpc/file_transfer/file_transfer.grpc.pb.h"
class FileTransferServiceImpl final : public fdbrpc::FileTransferService::Service {
const int DEFAULT_CHUNK_SIZE = 1024 * 1024;
public:
grpc::Status DownloadFile(grpc::ServerContext* context,
const fdbrpc::DownloadRequest* request,
grpc::ServerWriter<fdbrpc::DownloadChunk>* writer) override;
grpc::Status GetFileInfo(grpc::ServerContext* context,
const fdbrpc::GetFileInfoRequest* request,
fdbrpc::GetFileInfoReply* response) override;
//-- Testing --
enum ErrorInjection {
NO_ERROR,
FAIL_RANDOMLY,
FLIP_BYTE,
};
void SetErrorInjection(ErrorInjection error_inject) { error_inject_ = error_inject; }
private:
ErrorInjection error_inject_ = NO_ERROR;
};
class FileTransferClient {
public:
FileTransferClient(std::shared_ptr<grpc::Channel> channel) : stub_(fdbrpc::FileTransferService::NewStub(channel)) {}
std::optional<fdbrpc::GetFileInfoReply> GetFileInfo(const std::string& filename, bool get_crc_checksum = false);
// Downloads a file from a remote server and saves it locally.
//
// This function communicates with a remote server to download a file specified by `filename`
// and saves it to the local file system with the name `output_filename`. Optionally, it can
// verify the integrity of the downloaded file using a CRC checksum.
//
// Params:
// - filename: The name of the file to download from the remote server.
// - output_filename: The name of the file to save the downloaded content locally.
// - verify: Whether to verify the file's integrity using CRC checksum
//
// Returns:
// std::optional<size_t> The size of the downloaded file in bytes if successful, or std::nullopt if the download
// failed. `output_filename` is deleted on failure.
std::optional<size_t> DownloadFile(const std::string& filename,
const std::string& output_filename,
bool verify = true);
private:
// If download fails, delete the file from disk.
// TODO: Add ability to resume.
const bool delete_on_close_ = true;
std::unique_ptr<fdbrpc::FileTransferService::Stub> stub_;
};
#endif
#endif // FLOW_GRPC_ENABLED

View File

@@ -23,10 +23,11 @@
#include <memory>
#include <thread>
#include <grpcpp/grpcpp.h>
#include <boost/asio.hpp>
#undef loop
#include <grpcpp/grpcpp.h>
#include "flow/IThreadPool.h"
#include "flow/flow.h"
@@ -120,7 +121,7 @@ public:
if (status.ok()) {
promise->send(response);
} else {
std::cout << "Error: " << status.error_message() << std::endl;
// std::cout << "Error: " << status.error_message() << std::endl;
promise->sendError(grpc_error()); // TODO (Vishesh): Propogate the gRPC error codes.
}
});
@@ -140,7 +141,7 @@ public:
auto reader = (stub_.get()->*rpc)(&context, request);
while (reader->Read(&response)) {
if (promise->getFutureReferenceCount() == 0) {
std::cout << "Stream cancelled.\n";
// std::cout << "Stream cancelled.\n";
context.TryCancel();
return;
}
@@ -152,7 +153,7 @@ public:
if (status.ok()) {
promise->sendError(end_of_stream());
} else {
std::cout << "Error: " << status.error_message() << std::endl;
// std::cout << "Error: " << status.error_message() << std::endl;
promise->sendError(grpc_error()); // TODO (Vishesh): Propogate the gRPC error codes.
}
});
@@ -190,4 +191,4 @@ private:
};
#endif // FDBRPC_FLOW_GRPC_H
#endif // FLOW_GRPC_ENABLED
#endif // FLOW_GRPC_ENABLED

View File

@@ -19,11 +19,18 @@
*/
#ifdef FLOW_GRPC_ENABLED
#ifndef FDBRPC_FLOW_GRPC_TESTS_H
#define FDBRPC_FLOW_GRPC_TESTS_H
#include <cstdio>
#include <thread>
#include "flow/Error.h"
#undef loop
#include "fdbrpc/test/echo.grpc.pb.h"
#include "flow/Error.h"
namespace fdbrpc_test {
using std::make_shared;
@@ -104,4 +111,5 @@ private:
} // namespace fdbrpc_test
#endif
#endif // FDBRPC_FLOW_GRPC_TESTS_H
#endif // FLOW_GRPC_ENABLED

View File

@@ -0,0 +1,32 @@
syntax = "proto3";
package fdbrpc;
service FileTransferService {
rpc GetFileInfo(GetFileInfoRequest) returns (GetFileInfoReply);
rpc DownloadFile(DownloadRequest) returns (stream DownloadChunk);
}
message DownloadRequest {
string file_name = 1;
int32 chunk_size = 2;
int32 first_chunk_index = 3;
}
message DownloadChunk {
int64 offset = 1;
bytes data = 2;
}
message GetFileInfoRequest {
string file_name = 1;
bool get_size = 2;
bool get_crc_checksum = 3;
}
// TODO: Add stronger checksums.
// TODO: Ability to checksum parts if downloading files in part?
message GetFileInfoReply {
int64 file_size = 1;
uint32 crc_checksum = 2;
}

View File

@@ -8,8 +8,8 @@ if(NOT WIN32)
endif()
endif()
if(FLOW_GRPC_ENABLED)
generate_grpc_protobuf(fdbrpc.test echo.proto)
if(WITH_GRPC)
generate_grpc_protobuf(fdbrpc.test protos/echo.proto)
endif()
add_flow_target(EXECUTABLE NAME fdbrpc_bench SRCS fdbrpc_bench.actor.cpp)
target_link_libraries(fdbrpc_bench PUBLIC flow fdbrpc boost_target_program_options)

View File

@@ -220,4 +220,5 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
bool maybeInjectTargetedRestart();
};
#endif
#include "flow/unactorcompiler.h"
#endif

View File

@@ -3074,8 +3074,11 @@ std::string readFileBytes(std::string const& filename, size_t maxSize) {
return ret;
}
void writeFileBytes(std::string const& filename, const uint8_t* data, size_t count) {
std::ofstream ofs(filename, std::fstream::out | std::fstream::binary);
void writeFileBytes(std::string const& filename, const uint8_t* data, size_t count, bool append) {
auto fflags = std::fstream::out | std::fstream::binary;
if (append)
fflags |= std::fstream::app;
std::ofstream ofs(filename, fflags);
if (!ofs.good()) {
TraceEvent("WriteFileBytes_FileOpenError").detail("Filename", filename).GetLastError();
throw io_error();
@@ -3369,7 +3372,11 @@ size_t TmpFile::read(uint8_t* buff, size_t len) {
}
void TmpFile::write(const uint8_t* buff, size_t len) {
writeFileBytes(filename, buff, len);
writeFileBytes(filename, buff, len, false);
}
void TmpFile::append(const uint8_t* buff, size_t len) {
writeFileBytes(filename, buff, len, true);
}
bool TmpFile::destroyFile() {

View File

@@ -350,7 +350,7 @@ std::string readFileBytes(std::string const& filename, size_t maxSize);
size_t readFileBytes(std::string const& filename, uint8_t* buff, size_t len);
// Write data buffer into file
void writeFileBytes(std::string const& filename, const uint8_t* data, size_t count);
void writeFileBytes(std::string const& filename, const uint8_t* data, size_t count, bool append = false);
// Write text into file
void writeFile(std::string const& filename, std::string const& content);
@@ -462,6 +462,7 @@ public:
~TmpFile();
size_t read(uint8_t* buff, size_t len);
void write(const uint8_t* buff, size_t len);
void append(const uint8_t* buff, size_t len);
bool destroyFile();
std::string getFileName() const { return filename; }