/*
 * BlobGranuleReader.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <map>
#include <vector>

#include "fmt/format.h"

#include "fdbclient/AsyncFileS3BlobStore.actor.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/BlobGranuleFiles.h"
#include "fdbclient/BlobGranuleReader.actor.h"
#include "fdbclient/BlobWorkerCommon.h"
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbclient/FDBTypes.h"
#include "flow/actorcompiler.h" // This must be the last #include.

// Reads a single blob file (snapshot or delta) fully into memory and returns its contents.
ACTOR Future<Standalone<StringRef>> readFile(Reference<BlobConnectionProvider> bstoreProvider, BlobFilePointerRef f) {
    try {
        state Arena arena;
        std::string fname = f.filename.toString();
        state Reference<BackupContainerFileSystem> bstore = bstoreProvider->getForRead(fname);
        state Reference<IAsyncFile> reader = wait(bstore->readFile(fname));
        state uint8_t* data = new (arena) uint8_t[f.length];
        int readSize = wait(reader->read(data, f.length, f.offset));
        ASSERT(f.length == readSize);

        StringRef dataRef(data, f.length);
        return Standalone<StringRef>(dataRef, arena);
    } catch (Error& e) {
        throw e;
    }
}

// Reads the snapshot and delta files for a granule chunk and materializes the rows visible at readVersion.
// TODO: improve the interface of this function so that it doesn't need to be passed the entire BlobWorkerStats object
// FIXME: probably want to chunk this up with yields to avoid slow task for blob worker re-snapshotting by calling the
// sub-functions that BlobGranuleFiles actually exposes?
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
                                          KeyRangeRef keyRange,
                                          Version beginVersion,
                                          Version readVersion,
                                          Reference<BlobConnectionProvider> bstore,
                                          Optional<BlobWorkerStats*> stats) {
    // TODO REMOVE with early replying
    ASSERT(readVersion == chunk.includedVersion);

    state Arena arena;

    try {
        Future<Standalone<StringRef>> readSnapshotFuture;
        if (chunk.snapshotFile.present()) {
            readSnapshotFuture = readFile(bstore, chunk.snapshotFile.get());
            if (stats.present()) {
                ++stats.get()->s3GetReqs;
            }
        }
        state std::vector<Future<Standalone<StringRef>>> readDeltaFutures;
        readDeltaFutures.reserve(chunk.deltaFiles.size());
        for (BlobFilePointerRef deltaFile : chunk.deltaFiles) {
            readDeltaFutures.push_back(readFile(bstore, deltaFile));
            if (stats.present()) {
                ++stats.get()->s3GetReqs;
            }
        }

        state Optional<StringRef> snapshotData; // not present if snapshotFile isn't present
        if (chunk.snapshotFile.present()) {
            state Standalone<StringRef> s = wait(readSnapshotFuture);
            arena.dependsOn(s.arena());
            snapshotData = s;
        }

        state int numDeltaFiles = chunk.deltaFiles.size();
        state std::vector<StringRef> deltaData;
        state int deltaIdx;

        deltaData.reserve(numDeltaFiles);
        for (deltaIdx = 0; deltaIdx < numDeltaFiles; deltaIdx++) {
            Standalone<StringRef> data = wait(readDeltaFutures[deltaIdx]);
            deltaData.push_back(data);
            arena.dependsOn(data.arena());
        }

        // TODO do something useful with materialize stats?
        // Renamed from "stats" to avoid shadowing the BlobWorkerStats parameter above.
        GranuleMaterializeStats materializeStats;
        return materializeBlobGranule(
            chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData, materializeStats);
    } catch (Error& e) {
        throw e;
    }
}

// TODO probably should add things like limit/bytelimit at some point?
// Reads every chunk in the reply, streaming each materialized RangeResult to `results` and terminating
// the stream with end_of_stream() on success.
ACTOR Future<Void> readBlobGranules(BlobGranuleFileRequest request,
                                    BlobGranuleFileReply reply,
                                    Reference<BlobConnectionProvider> bstore,
                                    PromiseStream<RangeResult> results) {
    // TODO for large amount of chunks, this should probably have some sort of buffer limit like ReplyPromiseStream.
    // Maybe just use ReplyPromiseStream instead of PromiseStream?
    try {
        state int i;
        for (i = 0; i < reply.chunks.size(); i++) {
            RangeResult chunkResult = wait(
                readBlobGranule(reply.chunks[i], request.keyRange, request.beginVersion, request.readVersion, bstore));
            results.send(std::move(chunkResult));
        }
        results.sendError(end_of_stream());
    } catch (Error& e) {
        results.sendError(e);
    }
    return Void();
}

// Return true if a given range is fully covered by blob chunks
bool isRangeFullyCovered(KeyRange range, Standalone<VectorRef<BlobGranuleChunkRef>> blobChunks) {
    std::vector<KeyRangeRef> blobRanges;
    for (const BlobGranuleChunkRef& chunk : blobChunks) {
        blobRanges.push_back(chunk.keyRange);
    }
    return range.isCovered(blobRanges);
}

// Test helper: appends a chunk covering [begin, end) to the chunk vector.
void testAddChunkRange(KeyRef begin, KeyRef end, Standalone<VectorRef<BlobGranuleChunkRef>>& chunks) {
    BlobGranuleChunkRef chunk;
    chunk.keyRange = KeyRangeRef(begin, end);
    chunks.push_back(chunks.arena(), chunk);
}

TEST_CASE("/fdbserver/blobgranule/isRangeCoveredByBlob") {
    Standalone<VectorRef<BlobGranuleChunkRef>> chunks;
    // chunk1 key_a1 - key_a9
    testAddChunkRange("key_a1"_sr, "key_a9"_sr, chunks);
    // chunk2 key_b1 - key_b9
    testAddChunkRange("key_b1"_sr, "key_b9"_sr, chunks);

    // check empty range. not covered
    { ASSERT(isRangeFullyCovered(KeyRangeRef(), chunks) == false); }

    // check empty chunks. not covered
    {
        Standalone<VectorRef<BlobGranuleChunkRef>> emptyChunks;
        ASSERT(isRangeFullyCovered(KeyRangeRef(), emptyChunks) == false);
    }

    // check '' to \xff
    { ASSERT(isRangeFullyCovered(KeyRangeRef(""_sr, "\xff"_sr), chunks) == false); }

    // check {key_a1, key_a9}
    { ASSERT(isRangeFullyCovered(KeyRangeRef("key_a1"_sr, "key_a9"_sr), chunks)); }

    // check {key_a1, key_a3}
    { ASSERT(isRangeFullyCovered(KeyRangeRef("key_a1"_sr, "key_a3"_sr), chunks)); }

    // check {key_a0, key_a3}
    { ASSERT(isRangeFullyCovered(KeyRangeRef("key_a0"_sr, "key_a3"_sr), chunks) == false); }

    // check {key_a5, key_b5}
    {
        auto range = KeyRangeRef("key_a5"_sr, "key_b5"_sr);
        ASSERT(isRangeFullyCovered(range, chunks) == false);
        ASSERT(range.begin == "key_a5"_sr);
        ASSERT(range.end == "key_b5"_sr);
    }

    // check continued chunks
    {
        Standalone<VectorRef<BlobGranuleChunkRef>> continuedChunks;
        testAddChunkRange("key_a1"_sr, "key_a9"_sr, continuedChunks);
        testAddChunkRange("key_a9"_sr, "key_b1"_sr, continuedChunks);
        testAddChunkRange("key_b1"_sr, "key_b9"_sr, continuedChunks);
        ASSERT(isRangeFullyCovered(KeyRangeRef("key_a1"_sr, "key_b9"_sr), continuedChunks));
    }

    // check functionality of isCovered()
    {
        std::vector<KeyRangeRef> ranges;
        ranges.push_back(KeyRangeRef("key_a"_sr, "key_b"_sr));
        ranges.push_back(KeyRangeRef("key_x"_sr, "key_y"_sr));
        ASSERT(KeyRangeRef("key_x"_sr, "key_y"_sr).isCovered(ranges));

        ranges.clear();
        ranges.push_back(KeyRangeRef("key_a"_sr, "key_b"_sr));
        ranges.push_back(KeyRangeRef("key_v"_sr, "key_y"_sr));
        ASSERT(KeyRangeRef("key_x"_sr, "key_y"_sr).isCovered(ranges));

        ranges.clear();
        ranges.push_back(KeyRangeRef("key_a"_sr, "key_b"_sr));
        ranges.push_back(KeyRangeRef("key_x"_sr, "key_xa"_sr));
        ranges.push_back(KeyRangeRef("key_xa"_sr, "key_ya"_sr));
        ASSERT(KeyRangeRef("key_x"_sr, "key_y"_sr).isCovered(ranges));

        ranges.clear();
        ranges.push_back(KeyRangeRef("key_a"_sr, "key_b"_sr));
        ranges.push_back(KeyRangeRef("key_x"_sr, "key_xa"_sr));
        ranges.push_back(KeyRangeRef("key_xa"_sr, "key_xb"_sr));
        ASSERT(!KeyRangeRef("key_x"_sr, "key_y"_sr).isCovered(ranges));

        ranges.clear();
        ranges.push_back(KeyRangeRef("key_a"_sr, "key_b"_sr));
ranges.push_back(KeyRangeRef("key_x"_sr, "key_xa"_sr)); ASSERT(!KeyRangeRef("key_x"_sr, "key_y"_sr).isCovered(ranges)); ranges.clear(); ranges.push_back(KeyRangeRef("key_a"_sr, "key_b"_sr)); ranges.push_back(KeyRangeRef("key_xa"_sr, "key_y"_sr)); ASSERT(!KeyRangeRef("key_x"_sr, "key_y"_sr).isCovered(ranges)); ranges.clear(); ranges.push_back(KeyRangeRef("key_a"_sr, "key_b"_sr)); ranges.push_back(KeyRangeRef("key_x"_sr, "key_y"_sr)); ASSERT(!KeyRangeRef("key_a"_sr, "key_y"_sr).isCovered(ranges)); } return Void(); }