Merge branch 'main-v1' into sync-v2-to-v1

This commit is contained in:
Alex Auvolat
2025-09-14 16:59:51 +02:00
32 changed files with 1011 additions and 156 deletions

View File

@@ -31,6 +31,11 @@ steps:
commands:
- nix-build -j4 --attr flakePackages.tests-sqlite
- name: unit + func tests (fjall)
image: nixpkgs/nix:nixos-22.05
commands:
- nix-build -j4 --attr flakePackages.tests-fjall
- name: integration tests
image: nixpkgs/nix:nixos-22.05
commands:

198
Cargo.lock generated
View File

@@ -687,6 +687,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2c12f985c78475a6b8d629afd0c360260ef34cfef52efccdcfd31972f81c2e"
[[package]]
name = "byteview"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5"
[[package]]
name = "cc"
version = "1.2.16"
@@ -798,6 +804,12 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "compare"
version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7"
[[package]]
name = "core-foundation"
version = "0.9.4"
@@ -888,6 +900,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
@@ -897,6 +918,16 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-skiplist"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@@ -968,6 +999,20 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.10",
]
[[package]]
name = "deranged"
version = "0.4.0"
@@ -1010,6 +1055,12 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "double-ended-peekable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57"
[[package]]
name = "dyn-clone"
version = "1.0.19"
@@ -1031,6 +1082,18 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "enum_dispatch"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "env_logger"
version = "0.10.2"
@@ -1098,6 +1161,23 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "fjall"
version = "2.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc"
dependencies = [
"byteorder",
"byteview",
"dashmap 6.1.0",
"log",
"lsm-tree",
"path-absolutize",
"std-semaphore",
"tempfile",
"xxhash-rust",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -1430,8 +1510,10 @@ name = "garage_db"
version = "2.0.0"
dependencies = [
"err-derive",
"fjall",
"heed",
"mktemp",
"parking_lot 0.12.3",
"r2d2",
"r2d2_sqlite",
"rusqlite",
@@ -1674,6 +1756,12 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "guardian"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f"
[[package]]
name = "h2"
version = "0.3.26"
@@ -2249,6 +2337,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "interval-heap"
version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6"
dependencies = [
"compare",
]
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -2605,6 +2702,36 @@ dependencies = [
"hashbrown 0.15.2",
]
[[package]]
name = "lsm-tree"
version = "2.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab73c02eadb3dc12c0024e5b61d6284e6d59064e67e74fbad77856caa56f62c7"
dependencies = [
"byteorder",
"crossbeam-skiplist",
"double-ended-peekable",
"enum_dispatch",
"guardian",
"interval-heap",
"log",
"lz4_flex",
"path-absolutize",
"quick_cache",
"rustc-hash",
"self_cell",
"tempfile",
"value-log",
"varint-rs",
"xxhash-rust",
]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
[[package]]
name = "matchers"
version = "0.1.0"
@@ -2859,7 +2986,7 @@ checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
dependencies = [
"async-trait",
"crossbeam-channel",
"dashmap",
"dashmap 4.0.2",
"fnv",
"futures-channel",
"futures-executor",
@@ -3020,6 +3147,24 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "path-absolutize"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5"
dependencies = [
"path-dedot",
]
[[package]]
name = "path-dedot"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397"
dependencies = [
"once_cell",
]
[[package]]
name = "pem"
version = "3.0.5"
@@ -3315,6 +3460,16 @@ dependencies = [
"serde",
]
[[package]]
name = "quick_cache"
version = "0.6.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ad6644cb07b7f3488b9f3d2fde3b4c0a7fa367cafefb39dff93a659f76eb786"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
]
[[package]]
name = "quote"
version = "1.0.40"
@@ -3581,6 +3736,12 @@ version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.1"
@@ -3823,6 +3984,12 @@ dependencies = [
"libc",
]
[[package]]
name = "self_cell"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
[[package]]
name = "semver"
version = "1.0.26"
@@ -4036,6 +4203,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "std-semaphore"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e"
[[package]]
name = "strsim"
version = "0.11.1"
@@ -4736,6 +4909,29 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "value-log"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c"
dependencies = [
"byteorder",
"byteview",
"interval-heap",
"log",
"path-absolutize",
"rustc-hash",
"tempfile",
"varint-rs",
"xxhash-rust",
]
[[package]]
name = "varint-rs"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@@ -64,6 +64,7 @@ md-5 = "0.10"
mktemp = "0.5"
nix = { version = "0.29", default-features = false, features = ["fs"] }
nom = "7.1"
parking_lot = "0.12"
parse_duration = "2.1"
paste = "1.0"
pin-project = "1.0.12"
@@ -90,6 +91,7 @@ heed = { version = "0.11", default-features = false, features = ["lmdb"] }
rusqlite = "0.31.0"
r2d2 = "0.8"
r2d2_sqlite = "0.24"
fjall = "2.4"
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.13", default-features = false }

View File

@@ -12,7 +12,7 @@ In this section, we cover the following web applications:
| [Mastodon](#mastodon) | ✅ | Natively supported |
| [Matrix](#matrix) | ✅ | Tested with `synapse-s3-storage-provider` |
| [ejabberd](#ejabberd) | ✅ | `mod_s3_upload` |
| [Pixelfed](#pixelfed) | | Not yet tested |
| [Pixelfed](#pixelfed) | ✅ | Natively supported |
| [Pleroma](#pleroma) | ❓ | Not yet tested |
| [Lemmy](#lemmy) | ✅ | Supported with pict-rs |
| [Funkwhale](#funkwhale) | ❓ | Not yet tested |
@@ -191,10 +191,10 @@ garage key create peertube-key
Keep the Key ID and the Secret key in a pad, they will be needed later.
We need two buckets, one for normal videos (named peertube-video) and one for webtorrent videos (named peertube-playlist).
We need two buckets, one for normal videos (named peertube-videos) and one for webtorrent videos (named peertube-playlists).
```bash
garage bucket create peertube-videos
garage bucket create peertube-playlist
garage bucket create peertube-playlists
```
Now we allow our key to read and write on these buckets:
@@ -253,7 +253,7 @@ object_storage:
proxify_private_files: false
streaming_playlists:
bucket_name: 'peertube-playlist'
bucket_name: 'peertube-playlists'
# Keep it empty for our example
prefix: ''

View File

@@ -24,6 +24,7 @@ db_engine = "lmdb"
block_size = "1M"
block_ram_buffer_max = "256MiB"
block_max_concurrent_reads = 16
lmdb_map_size = "1T"
@@ -97,6 +98,7 @@ The following gives details about each available configuration option.
Top-level configuration options, in alphabetical order:
[`allow_punycode`](#allow_punycode),
[`allow_world_readable_secrets`](#allow_world_readable_secrets),
[`block_max_concurrent_reads`](#block_max_concurrent_reads),
[`block_ram_buffer_max`](#block_ram_buffer_max),
[`block_size`](#block_size),
[`bootstrap_peers`](#bootstrap_peers),
@@ -335,6 +337,7 @@ Since `v0.8.0`, Garage can use alternative storage backends as follows:
| --------- | ----------------- | ------------- |
| [LMDB](https://www.symas.com/lmdb) (since `v0.8.0`, default since `v0.9.0`) | `"lmdb"` | `<metadata_dir>/db.lmdb/` |
| [Sqlite](https://sqlite.org) (since `v0.8.0`) | `"sqlite"` | `<metadata_dir>/db.sqlite` |
| [Fjall](https://github.com/fjall-rs/fjall) (**experimental support** since `v1.3.0`) | `"fjall"` | `<metadata_dir>/db.fjall/` |
| [Sled](https://sled.rs) (old default, removed since `v1.0`) | `"sled"` | `<metadata_dir>/db/` |
Sled was supported until Garage v0.9.x, and was removed in Garage v1.0.
@@ -371,6 +374,14 @@ LMDB works very well, but is known to have the following limitations:
so it is not the best choice for high-performance storage clusters,
but it should work fine in many cases.
- Fjall: a storage engine based on LSM trees, which theoretically allow for
higher write throughput than other storage engines that are based on B-trees.
Using Fjall could potentially improve Garage's performance significantly in
write-heavy workloads. **Support for Fjall is experimental at this point**,
we have added it to Garage for evaluation purposes only. **Do not use it for
production-critical workloads.**
It is possible to convert Garage's metadata directory from one format to another
using the `garage convert-db` command, which should be used as follows:
@@ -408,6 +419,7 @@ Here is how this option impacts the different database engines:
|----------|------------------------------------|-------------------------------|
| Sqlite | `PRAGMA synchronous = OFF` | `PRAGMA synchronous = NORMAL` |
| LMDB | `MDB_NOMETASYNC` + `MDB_NOSYNC` | `MDB_NOMETASYNC` |
| Fjall | default options | not supported |
Note that the Sqlite database is always ran in `WAL` mode (`PRAGMA journal_mode = WAL`).
@@ -514,6 +526,29 @@ node.
The default value is 256MiB.
#### `block_max_concurrent_reads` (since `v1.3.0` / `v2.1.0`) {#block_max_concurrent_reads}
The maximum number of blocks (individual files in the data directory) open
simultaneously for reading.
Reducing this number does not limit the number of data blocks that can be
transferred through the network simultaneously. This mechanism was just added
as a backpressure mechanism for HDD read speed: it helps avoid a situation
where too many requests are coming in and Garage is reading too many block
files simultaneously, thus not making timely progress on any of the reads.
When a request to read a data block comes in through the network, the request
waits for one of the `block_max_concurrent_reads` slots to be available
(internally implemented using a Semaphore object). Once it has acquired a read
slot, it reads the entire block file to RAM and frees the slot as soon as the
block file has been fully read. Only after the slot is released will the
block's data start being transferred over the network. If the request fails to
acquire a reading slot within 15 seconds, it fails with a timeout error.
Timeout events can be monitored through the `block_read_semaphore_timeouts`
metric in Prometheus: a non-zero number of such events indicates an I/O
bottleneck on HDD read speed.
#### `lmdb_map_size` {#lmdb_map_size}
This parameter can be used to set the map size used by LMDB,

View File

@@ -53,6 +53,9 @@
tests-sqlite = testWith {
GARAGE_TEST_INTEGRATION_DB_ENGINE = "sqlite";
};
tests-fjall = testWith {
GARAGE_TEST_INTEGRATION_DB_ENGINE = "fjall";
};
};
# ---- development shell, for making native builds only ----

View File

@@ -68,7 +68,7 @@ let
rootFeatures = if features != null then
features
else
([ "bundled-libs" "lmdb" "sqlite" "k2v" ] ++ (lib.optionals release [
([ "bundled-libs" "lmdb" "sqlite" "fjall" "k2v" ] ++ (lib.optionals release [
"consul-discovery"
"kubernetes-discovery"
"metrics"

View File

@@ -106,17 +106,17 @@ impl RequestHandler for LocalGetNodeStatisticsRequest {
// Gather block manager statistics
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
let rc_len = garage.block_manager.rc_len()?.to_string();
let rc_len = garage.block_manager.rc_approximate_len()?.to_string();
ret += &format_table_to_string(vec![
format!(" number of RC entries:\t{} (~= number of blocks)", rc_len),
format!(
" resync queue length:\t{}",
garage.block_manager.resync.queue_len()?
garage.block_manager.resync.queue_approximate_len()?
),
format!(
" blocks with resync errors:\t{}",
garage.block_manager.resync.errors_len()?
garage.block_manager.resync.errors_approximate_len()?
),
]);
@@ -129,16 +129,21 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
let data_len = t
.data
.store
.approximate_len()
.map_err(GarageError::from)?
.to_string();
let mkl_len = t.merkle_updater.merkle_tree_approximate_len()?.to_string();
Ok(format!(
" {}\t{}\t{}\t{}\t{}\t{}",
F::TABLE_NAME,
data_len,
mkl_len,
t.merkle_updater.todo_len()?,
t.data.insert_queue_len()?,
t.data.gc_todo_len()?
t.merkle_updater.todo_approximate_len()?,
t.data.insert_queue_approximate_len()?,
t.data.gc_todo_approximate_len()?
))
}

View File

@@ -352,7 +352,11 @@ where
while !*must_exit.borrow() {
let (stream, client_addr) = tokio::select! {
acc = listener.accept() => acc?,
acc = listener.accept() => match acc {
Ok(r) => r,
Err(e) if e.kind() == std::io::ErrorKind::ConnectionAborted => continue,
Err(e) => return Err(e.into()),
},
_ = must_exit.changed() => continue,
};

View File

@@ -48,6 +48,8 @@ pub const INLINE_THRESHOLD: usize = 3072;
// to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
const BLOCK_READ_SEMAPHORE_TIMEOUT: Duration = Duration::from_secs(15);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
@@ -85,6 +87,7 @@ pub struct BlockManager {
disable_scrub: bool,
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
read_semaphore: Semaphore,
pub rc: BlockRc,
pub resync: BlockResyncManager,
@@ -174,6 +177,8 @@ impl BlockManager {
.iter()
.map(|_| Mutex::new(BlockManagerLocked()))
.collect::<Vec<_>>(),
read_semaphore: Semaphore::new(config.block_max_concurrent_reads),
rc,
resync,
system,
@@ -416,8 +421,8 @@ impl BlockManager {
}
/// Get number of items in the refcount table
pub fn rc_len(&self) -> Result<usize, Error> {
Ok(self.rc.rc_table.len()?)
pub fn rc_approximate_len(&self) -> Result<usize, Error> {
Ok(self.rc.rc_table.approximate_len()?)
}
/// Send command to start/stop/manager scrub worker
@@ -435,7 +440,7 @@ impl BlockManager {
/// List all resync errors
pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
let mut blocks = Vec::with_capacity(self.resync.errors.len()?);
let mut blocks = Vec::with_capacity(self.resync.errors.approximate_len()?);
for ent in self.resync.errors.iter()? {
let (hash, cnt) = ent?;
let cnt = ErrorCounter::decode(&cnt);
@@ -565,9 +570,6 @@ impl BlockManager {
match self.find_block(hash).await {
Some(p) => self.read_block_from(hash, &p).await,
None => {
// Not found but maybe we should have had it ??
self.resync
.put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash
@@ -589,6 +591,15 @@ impl BlockManager {
) -> Result<DataBlock, Error> {
let (header, path) = block_path.as_parts_ref();
let permit = tokio::select! {
sem = self.read_semaphore.acquire() => sem.ok_or_message("acquire read semaphore")?,
_ = tokio::time::sleep(BLOCK_READ_SEMAPHORE_TIMEOUT) => {
self.metrics.block_read_semaphore_timeouts.add(1);
debug!("read block {:?}: read_semaphore acquire timeout", hash);
return Err(Error::Message("read block: read_semaphore acquire timeout".into()));
}
};
let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
@@ -613,6 +624,8 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
drop(permit);
Ok(data)
}

View File

@@ -22,6 +22,7 @@ pub struct BlockManagerMetrics {
pub(crate) bytes_read: BoundCounter<u64>,
pub(crate) block_read_duration: BoundValueRecorder<f64>,
pub(crate) block_read_semaphore_timeouts: BoundCounter<u64>,
pub(crate) bytes_written: BoundCounter<u64>,
pub(crate) block_write_duration: BoundValueRecorder<f64>,
pub(crate) delete_counter: BoundCounter<u64>,
@@ -50,7 +51,7 @@ impl BlockManagerMetrics {
.init(),
_rc_size: meter
.u64_value_observer("block.rc_size", move |observer| {
if let Ok(value) = rc_tree.len() {
if let Ok(value) = rc_tree.approximate_len() {
observer.observe(value as u64, &[])
}
})
@@ -58,7 +59,7 @@ impl BlockManagerMetrics {
.init(),
_resync_queue_len: meter
.u64_value_observer("block.resync_queue_length", move |observer| {
if let Ok(value) = resync_queue.len() {
if let Ok(value) = resync_queue.approximate_len() {
observer.observe(value as u64, &[]);
}
})
@@ -68,7 +69,7 @@ impl BlockManagerMetrics {
.init(),
_resync_errored_blocks: meter
.u64_value_observer("block.resync_errored_blocks", move |observer| {
if let Ok(value) = resync_errors.len() {
if let Ok(value) = resync_errors.approximate_len() {
observer.observe(value as u64, &[]);
}
})
@@ -119,6 +120,11 @@ impl BlockManagerMetrics {
.with_description("Duration of block read operations")
.init()
.bind(&[]),
block_read_semaphore_timeouts: meter
.u64_counter("block.read_semaphore_timeouts")
.with_description("Number of block reads that failed due to semaphore acquire timeout")
.init()
.bind(&[]),
bytes_written: meter
.u64_counter("block.bytes_written")
.with_description("Number of bytes written to disk")

View File

@@ -104,13 +104,13 @@ impl BlockResyncManager {
}
/// Get length of resync queue
pub fn queue_len(&self) -> Result<usize, Error> {
Ok(self.queue.len()?)
pub fn queue_approximate_len(&self) -> Result<usize, Error> {
Ok(self.queue.approximate_len()?)
}
/// Get number of blocks that have an error
pub fn errors_len(&self) -> Result<usize, Error> {
Ok(self.errors.len()?)
pub fn errors_approximate_len(&self) -> Result<usize, Error> {
Ok(self.errors.approximate_len()?)
}
/// Clear the error counter for a block and put it in queue immediately
@@ -540,9 +540,11 @@ impl Worker for ResyncWorker {
}
WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
queue_length: Some(self.manager.resync.queue_approximate_len().unwrap_or(0) as u64),
tranquility: Some(tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
persistent_errors: Some(
self.manager.resync.errors_approximate_len().unwrap_or(0) as u64
),
..Default::default()
}
}

View File

@@ -16,10 +16,14 @@ err-derive.workspace = true
tracing.workspace = true
heed = { workspace = true, optional = true }
rusqlite = { workspace = true, optional = true, features = ["backup"] }
r2d2 = { workspace = true, optional = true }
r2d2_sqlite = { workspace = true, optional = true }
fjall = { workspace = true, optional = true }
parking_lot = { workspace = true, optional = true }
[dev-dependencies]
mktemp.workspace = true
@@ -27,4 +31,5 @@ mktemp.workspace = true
default = [ "lmdb", "sqlite" ]
bundled-libs = [ "rusqlite?/bundled" ]
lmdb = [ "heed" ]
fjall = [ "dep:fjall", "dep:parking_lot" ]
sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]

453
src/db/fjall_adapter.rs Normal file
View File

@@ -0,0 +1,453 @@
use core::ops::Bound;
use std::path::PathBuf;
use std::sync::Arc;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
use fjall::{
PartitionCreateOptions, PersistMode, TransactionalKeyspace, TransactionalPartitionHandle,
WriteTransaction,
};
use crate::{
open::{Engine, OpenOpt},
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
pub use fjall;
// --
/// Open (or create) a Fjall transactional keyspace at `path` and wrap it
/// as a Garage `Db` handle.
///
/// `metadata_fsync` is rejected up-front, as this engine does not implement
/// Garage's fsync-per-operation durability mode.
pub(crate) fn open_db(path: &PathBuf, opt: &OpenOpt) -> Result<Db> {
    info!("Opening Fjall database at: {}", path.display());
    if opt.fsync {
        return Err(Error(
            "metadata_fsync is not supported with the Fjall database engine".into(),
        ));
    }

    let base_config = fjall::Config::new(path);
    // Apply the optional block cache size if one was configured.
    let config = match opt.fjall_block_cache_size {
        Some(cache_bytes) => base_config.cache_size(cache_bytes as u64),
        None => base_config,
    };

    Ok(FjallDb::init(config.open_transactional()?))
}
// -- err

/// Map fjall errors into Garage's generic database error type.
impl From<fjall::Error> for Error {
    fn from(e: fjall::Error) -> Error {
        Error(format!("fjall: {}", e).into())
    }
}

/// Map errors coming from fjall's underlying LSM-tree layer.
impl From<fjall::LsmError> for Error {
    fn from(e: fjall::LsmError) -> Error {
        Error(format!("fjall lsm_tree: {}", e).into())
    }
}

/// Allow using `?` on fjall errors inside transaction bodies
/// (converts via the `Error` impl above).
impl From<fjall::Error> for TxOpError {
    fn from(e: fjall::Error) -> TxOpError {
        TxOpError(e.into())
    }
}
// -- db

/// Handle to a Fjall (LSM-tree based) database, implementing Garage's
/// internal `IDb` interface.
pub struct FjallDb {
    // The underlying transactional keyspace (one per database directory).
    keyspace: TransactionalKeyspace,
    // Trees (fjall partitions) opened so far, indexed by the integer tree id
    // handed out by `open_tree`. The `String` is the encoded partition name.
    trees: RwLock<Vec<(String, TransactionalPartitionHandle)>>,
}

/// A pair of borrowed byte-slice bounds, as accepted by fjall range queries.
type ByteRefRangeBound<'r> = (Bound<&'r [u8]>, Bound<&'r [u8]>);
impl FjallDb {
    /// Wrap an already-opened transactional keyspace into a `Db` handle,
    /// starting with an empty tree list.
    pub fn init(keyspace: TransactionalKeyspace) -> Db {
        let s = Self {
            keyspace,
            trees: RwLock::new(Vec::new()),
        };
        Db(Arc::new(s))
    }

    /// Borrow the partition handle for tree id `i`.
    ///
    /// Uses parking_lot's mapped read guard so the caller keeps the `trees`
    /// read lock for exactly as long as it uses the handle, without cloning.
    /// Fails if `i` was never returned by `open_tree`.
    fn get_tree(
        &self,
        i: usize,
    ) -> Result<MappedRwLockReadGuard<'_, TransactionalPartitionHandle>> {
        RwLockReadGuard::try_map(self.trees.read(), |trees: &Vec<_>| {
            trees.get(i).map(|tup| &tup.1)
        })
        .map_err(|_| Error("invalid tree id".into()))
    }
}
impl IDb for FjallDb {
fn engine(&self) -> String {
"Fjall (EXPERIMENTAL!)".into()
}
fn open_tree(&self, name: &str) -> Result<usize> {
let mut trees = self.trees.write();
let safe_name = encode_name(name)?;
if let Some(i) = trees.iter().position(|(name, _)| *name == safe_name) {
Ok(i)
} else {
let tree = self
.keyspace
.open_partition(&safe_name, PartitionCreateOptions::default())?;
let i = trees.len();
trees.push((safe_name, tree));
Ok(i)
}
}
fn list_trees(&self) -> Result<Vec<String>> {
Ok(self
.keyspace
.list_partitions()
.iter()
.map(|n| decode_name(&n))
.collect::<Result<Vec<_>>>()?)
}
fn snapshot(&self, base_path: &PathBuf) -> Result<()> {
std::fs::create_dir_all(base_path)?;
let path = Engine::Fjall.db_path(base_path);
let source_state = self.keyspace.read_tx();
let copy_keyspace = fjall::Config::new(path).open()?;
for partition_name in self.keyspace.list_partitions() {
let source_partition = self
.keyspace
.open_partition(&partition_name, PartitionCreateOptions::default())?;
let copy_partition =
copy_keyspace.open_partition(&partition_name, PartitionCreateOptions::default())?;
for entry in source_state.iter(&source_partition) {
let (key, value) = entry?;
copy_partition.insert(key, value)?;
}
}
copy_keyspace.persist(PersistMode::SyncAll)?;
Ok(())
}
// ----
fn get(&self, tree_idx: usize, key: &[u8]) -> Result<Option<Value>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
let val = tx.get(&tree, key)?;
match val {
None => Ok(None),
Some(v) => Ok(Some(v.to_vec())),
}
}
fn approximate_len(&self, tree_idx: usize) -> Result<usize> {
let tree = self.get_tree(tree_idx)?;
Ok(tree.approximate_len())
}
fn is_empty(&self, tree_idx: usize) -> Result<bool> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
Ok(tx.is_empty(&tree)?)
}
fn insert(&self, tree_idx: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree_idx)?;
let mut tx = self.keyspace.write_tx();
tx.insert(&tree, key, value);
tx.commit()?;
Ok(())
}
fn remove(&self, tree_idx: usize, key: &[u8]) -> Result<()> {
let tree = self.get_tree(tree_idx)?;
let mut tx = self.keyspace.write_tx();
tx.remove(&tree, key);
tx.commit()?;
Ok(())
}
fn clear(&self, tree_idx: usize) -> Result<()> {
let mut trees = self.trees.write();
if tree_idx >= trees.len() {
return Err(Error("invalid tree id".into()));
}
let (name, tree) = trees.remove(tree_idx);
self.keyspace.delete_partition(tree)?;
let tree = self
.keyspace
.open_partition(&name, PartitionCreateOptions::default())?;
trees.insert(tree_idx, (name, tree));
Ok(())
}
fn iter(&self, tree_idx: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
Ok(Box::new(tx.iter(&tree).map(iterator_remap)))
}
fn iter_rev(&self, tree_idx: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
Ok(Box::new(tx.iter(&tree).rev().map(iterator_remap)))
}
fn range<'r>(
&self,
tree_idx: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
Ok(Box::new(
tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high))
.map(iterator_remap),
))
}
fn range_rev<'r>(
&self,
tree_idx: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
Ok(Box::new(
tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high))
.rev()
.map(iterator_remap),
))
}
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read();
let mut tx = FjallTx {
trees: &trees[..],
tx: self.keyspace.write_tx(),
};
let res = f.try_on(&mut tx);
match res {
TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(on_commit)
}
TxFnResult::Abort => {
tx.tx.rollback();
Err(TxError::Abort(()))
}
TxFnResult::DbErr => {
tx.tx.rollback();
Err(TxError::Db(Error(
"(this message will be discarded)".into(),
)))
}
}
}
}
// ----

/// A write transaction over a fixed snapshot of the database's tree list.
struct FjallTx<'a> {
    // Tree ids valid inside this transaction (borrowed from `FjallDb::trees`,
    // which stays read-locked for the transaction's whole lifetime).
    trees: &'a [(String, TransactionalPartitionHandle)],
    tx: WriteTransaction<'a>,
}
impl<'a> FjallTx<'a> {
    /// Look up the partition handle for tree id `i` in the tree list that was
    /// captured when the transaction started.
    ///
    /// Out-of-range ids are an error: trees opened on the `FjallDb` after the
    /// transaction began are not visible here.
    fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalPartitionHandle> {
        self.trees.get(i).map(|tup| &tup.1).ok_or_else(|| {
            // Fixed typo in the user-facing error message ("openned").
            TxOpError(Error(
                "invalid tree id (it might have been opened after the transaction started)".into(),
            ))
        })
    }
}
/// Transactional operations, all scoped to the single underlying
/// fjall `WriteTransaction`.
impl<'a> ITx for FjallTx<'a> {
    fn get(&self, tree_idx: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
        let tree = self.get_tree(tree_idx)?;
        match self.tx.get(tree, key)? {
            Some(v) => Ok(Some(v.to_vec())),
            None => Ok(None),
        }
    }
    fn len(&self, tree_idx: usize) -> TxOpResult<usize> {
        let tree = self.get_tree(tree_idx)?;
        // NOTE(review): fjall's transactional `len` presumably walks the
        // partition, so this may be expensive on large trees — confirm
        // before using it in a hot path.
        Ok(self.tx.len(tree)? as usize)
    }
    fn insert(&mut self, tree_idx: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
        // Clone the handle so the immutable borrow of `self.trees` ends
        // before `self.tx` is borrowed mutably.
        let tree = self.get_tree(tree_idx)?.clone();
        self.tx.insert(&tree, key, value);
        Ok(())
    }
    fn remove(&mut self, tree_idx: usize, key: &[u8]) -> TxOpResult<()> {
        let tree = self.get_tree(tree_idx)?.clone();
        self.tx.remove(&tree, key);
        Ok(())
    }
    fn clear(&mut self, _tree_idx: usize) -> TxOpResult<()> {
        // Deliberately unsupported for this engine: panics if ever called.
        unimplemented!("LSM tree clearing in cross-partition transaction is not supported")
    }

    fn iter(&self, tree_idx: usize) -> TxOpResult<TxValueIter<'_>> {
        let tree = self.get_tree(tree_idx)?.clone();
        Ok(Box::new(self.tx.iter(&tree).map(iterator_remap_tx)))
    }
    fn iter_rev(&self, tree_idx: usize) -> TxOpResult<TxValueIter<'_>> {
        let tree = self.get_tree(tree_idx)?.clone();
        Ok(Box::new(self.tx.iter(&tree).rev().map(iterator_remap_tx)))
    }

    fn range<'r>(
        &self,
        tree_idx: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> TxOpResult<TxValueIter<'_>> {
        let tree = self.get_tree(tree_idx)?;
        // Bounds are copied to owned Vec<u8> so the returned boxed iterator
        // does not depend on the caller's `'r` borrow.
        let low = clone_bound(low);
        let high = clone_bound(high);
        Ok(Box::new(
            self.tx
                .range::<Vec<u8>, ByteVecRangeBounds>(&tree, (low, high))
                .map(iterator_remap_tx),
        ))
    }
    fn range_rev<'r>(
        &self,
        tree_idx: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> TxOpResult<TxValueIter<'_>> {
        let tree = self.get_tree(tree_idx)?;
        let low = clone_bound(low);
        let high = clone_bound(high);
        Ok(Box::new(
            self.tx
                .range::<Vec<u8>, ByteVecRangeBounds>(&tree, (low, high))
                .rev()
                .map(iterator_remap_tx),
        ))
    }
}
// -- maps fjall's (k, v) to ours

/// Convert one fjall iterator item into Garage's owned `(key, value)` pair.
fn iterator_remap(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> Result<(Value, Value)> {
    match r {
        Ok((key, value)) => Ok((key.to_vec(), value.to_vec())),
        Err(e) => Err(e.into()),
    }
}

/// Same conversion, but for items produced inside a transaction.
fn iterator_remap_tx(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> TxOpResult<(Value, Value)> {
    match r {
        Ok((key, value)) => Ok((key.to_vec(), value.to_vec())),
        Err(e) => Err(e.into()),
    }
}
// -- utils to deal with Garage's tightness on Bound lifetimes

/// A range bound holding an owned byte vector.
type ByteVecBound = Bound<Vec<u8>>;
/// An owned byte range, as accepted by fjall's transactional range queries.
type ByteVecRangeBounds = (ByteVecBound, ByteVecBound);

/// Convert a borrowed byte-slice bound into an owned one, so that range
/// iterators can outlive the caller's borrow.
///
/// Rewritten as a single exhaustive match: the original matched the bound
/// twice and materialized a dummy empty vector for `Unbounded`.
fn clone_bound(bound: Bound<&[u8]>) -> ByteVecBound {
    match bound {
        Bound::Included(v) => Bound::Included(v.to_vec()),
        Bound::Excluded(v) => Bound::Excluded(v.to_vec()),
        Bound::Unbounded => Bound::Unbounded,
    }
}
// -- utils to encode table names --

/// Encode a tree name into a string that is safe to use as a fjall
/// partition name.
///
/// Alphanumeric characters and `_`, `-`, `#` pass through unchanged; any
/// other character with a code point at most 255 is escaped as `$` followed
/// by two letters in `A`..=`P` encoding its high and low nibble. Characters
/// above 255 that are not alphanumeric cannot be represented and yield an
/// error.
fn encode_name(s: &str) -> Result<String> {
    let base = 'A' as u32;
    let mut out = String::with_capacity(s.len() + 10);
    for ch in s.chars() {
        if ch.is_alphanumeric() || matches!(ch, '_' | '-' | '#') {
            out.push(ch);
        } else if ch <= u8::MAX as char {
            let code = ch as u32;
            out.push('$');
            out.push(char::from_u32(base + code / 16).unwrap());
            out.push(char::from_u32(base + code % 16).unwrap());
        } else {
            return Err(Error(
                format!("table name {} could not be safely encoded", s).into(),
            ));
        }
    }
    Ok(out)
}
fn decode_name(s: &str) -> Result<String> {
use std::convert::TryFrom;
let errfn = || Error(format!("encoded table name {} is invalid", s).into());
let c_map = |c: char| {
let c = c as u32;
let base = 'A' as u32;
if (base..base + 16).contains(&c) {
Some(c - base)
} else {
None
}
};
let mut ret = String::with_capacity(s.len());
let mut it = s.chars();
while let Some(c) = it.next() {
if c == '$' {
let c_hi = it.next().and_then(c_map).ok_or_else(errfn)?;
let c_lo = it.next().and_then(c_map).ok_or_else(errfn)?;
let c_dec = char::try_from(c_hi * 16 + c_lo).map_err(|_| errfn())?;
ret.push(c_dec);
} else {
ret.push(c);
}
}
Ok(ret)
}
#[cfg(test)]
mod tests {
	use super::*;
	/// Round-trip property: decoding an encoded name yields the original,
	/// and the encoded form contains none of the characters that required
	/// escaping in the sample inputs.
	#[test]
	fn test_encdec_name() {
		let samples = [
			"testname",
			"test_name",
			"test name",
			"test$name",
			"test:name@help.me$get/this**right",
		];
		for name in samples {
			let encoded = encode_name(name).unwrap();
			for forbidden in [' ', '.', '*'] {
				assert!(!encoded.contains(forbidden));
			}
			assert_eq!(*name, decode_name(&encoded).unwrap());
		}
	}
}

View File

@@ -1,6 +1,8 @@
#[macro_use]
extern crate tracing;
#[cfg(feature = "fjall")]
pub mod fjall_adapter;
#[cfg(feature = "lmdb")]
pub mod lmdb_adapter;
#[cfg(feature = "sqlite")]
@@ -152,7 +154,7 @@ impl Db {
let tree_names = other.list_trees()?;
for name in tree_names {
let tree = self.open_tree(&name)?;
if tree.len()? > 0 {
if !tree.is_empty()? {
return Err(Error(format!("tree {} already contains data", name).into()));
}
@@ -194,8 +196,12 @@ impl Tree {
self.0.get(self.1, key.as_ref())
}
#[inline]
pub fn len(&self) -> Result<usize> {
self.0.len(self.1)
pub fn approximate_len(&self) -> Result<usize> {
self.0.approximate_len(self.1)
}
#[inline]
pub fn is_empty(&self) -> Result<bool> {
self.0.is_empty(self.1)
}
#[inline]
@@ -333,7 +339,8 @@ pub(crate) trait IDb: Send + Sync {
fn snapshot(&self, path: &PathBuf) -> Result<()>;
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
fn len(&self, tree: usize) -> Result<usize>;
fn approximate_len(&self, tree: usize) -> Result<usize>;
fn is_empty(&self, tree: usize) -> Result<bool>;
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<()>;

View File

@@ -1,8 +1,8 @@
use core::ops::Bound;
use core::ptr::NonNull;
use std::collections::HashMap;
use std::convert::TryInto;
use std::marker::PhantomPinned;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
@@ -11,12 +11,55 @@ use heed::types::ByteSlice;
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
use crate::{
open::{Engine, OpenOpt},
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
pub use heed;
// ---- top-level open function
pub(crate) fn open_db(path: &PathBuf, opt: &OpenOpt) -> Result<Db> {
info!("Opening LMDB database at: {}", path.display());
if let Err(e) = std::fs::create_dir_all(&path) {
return Err(Error(
format!("Unable to create LMDB data directory: {}", e).into(),
));
}
let map_size = match opt.lmdb_map_size {
None => recommended_map_size(),
Some(v) => v - (v % 4096),
};
let mut env_builder = heed::EnvOpenOptions::new();
env_builder.max_dbs(100);
env_builder.map_size(map_size);
env_builder.max_readers(2048);
unsafe {
env_builder.flag(heed::flags::Flags::MdbNoRdAhead);
env_builder.flag(heed::flags::Flags::MdbNoMetaSync);
if !opt.fsync {
env_builder.flag(heed::flags::Flags::MdbNoSync);
}
}
match env_builder.open(&path) {
Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
return Err(Error(
"OutOfMemory error while trying to open LMDB database. This can happen \
if your operating system is not allowing you to use sufficient virtual \
memory address space. Please check that no limit is set (ulimit -v). \
You may also try to set a smaller `lmdb_map_size` configuration parameter. \
On 32-bit machines, you should probably switch to another database engine."
.into(),
))
}
Err(e) => Err(Error(format!("Cannot open LMDB database: {}", e).into())),
Ok(db) => Ok(LmdbDb::init(db)),
}
}
// -- err
impl From<heed::Error> for Error {
@@ -104,10 +147,9 @@ impl IDb for LmdbDb {
Ok(ret2)
}
fn snapshot(&self, to: &PathBuf) -> Result<()> {
std::fs::create_dir_all(to)?;
let mut path = to.clone();
path.push("data.mdb");
fn snapshot(&self, base_path: &PathBuf) -> Result<()> {
std::fs::create_dir_all(base_path)?;
let path = Engine::Lmdb.db_path(base_path);
self.db
.copy_to_path(path, heed::CompactionOption::Enabled)?;
Ok(())
@@ -126,11 +168,16 @@ impl IDb for LmdbDb {
}
}
fn len(&self, tree: usize) -> Result<usize> {
fn approximate_len(&self, tree: usize) -> Result<usize> {
let tree = self.get_tree(tree)?;
let tx = self.db.read_txn()?;
Ok(tree.len(&tx)?.try_into().unwrap())
}
fn is_empty(&self, tree: usize) -> Result<bool> {
let tree = self.get_tree(tree)?;
let tx = self.db.read_txn()?;
Ok(tree.is_empty(&tx)?)
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
@@ -159,13 +206,15 @@ impl IDb for LmdbDb {
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let tx = self.db.read_txn()?;
TxAndIterator::make(tx, |tx| Ok(tree.iter(tx)?))
// Safety: the closure does not store its argument anywhere.
unsafe { TxAndIterator::make(tx, |tx| Ok(tree.iter(tx)?)) }
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let tx = self.db.read_txn()?;
TxAndIterator::make(tx, |tx| Ok(tree.rev_iter(tx)?))
// Safety: the closure does not store its argument anywhere.
unsafe { TxAndIterator::make(tx, |tx| Ok(tree.rev_iter(tx)?)) }
}
fn range<'r>(
@@ -176,7 +225,8 @@ impl IDb for LmdbDb {
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let tx = self.db.read_txn()?;
TxAndIterator::make(tx, |tx| Ok(tree.range(tx, &(low, high))?))
// Safety: the closure does not store its argument anywhere.
unsafe { TxAndIterator::make(tx, |tx| Ok(tree.range(tx, &(low, high))?)) }
}
fn range_rev<'r>(
&self,
@@ -186,7 +236,8 @@ impl IDb for LmdbDb {
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let tx = self.db.read_txn()?;
TxAndIterator::make(tx, |tx| Ok(tree.rev_range(tx, &(low, high))?))
// Safety: the closure does not store its argument anywhere.
unsafe { TxAndIterator::make(tx, |tx| Ok(tree.rev_range(tx, &(low, high))?)) }
}
// ----
@@ -316,28 +367,41 @@ where
{
tx: RoTxn<'a>,
iter: Option<I>,
_pin: PhantomPinned,
}
impl<'a, I> TxAndIterator<'a, I>
where
I: Iterator<Item = IteratorItem<'a>> + 'a,
{
fn make<F>(tx: RoTxn<'a>, iterfun: F) -> Result<ValueIter<'a>>
fn iter(self: Pin<&mut Self>) -> &mut Option<I> {
// Safety: iter is not structural
unsafe { &mut self.get_unchecked_mut().iter }
}
/// Safety: iterfun must not store its argument anywhere but in its result.
unsafe fn make<F>(tx: RoTxn<'a>, iterfun: F) -> Result<ValueIter<'a>>
where
F: FnOnce(&'a RoTxn<'a>) -> Result<I>,
{
let res = TxAndIterator { tx, iter: None };
let res = TxAndIterator {
tx,
iter: None,
_pin: PhantomPinned,
};
let mut boxed = Box::pin(res);
// This unsafe allows us to bypass lifetime checks
let tx = unsafe { NonNull::from(&boxed.tx).as_ref() };
let iter = iterfun(tx)?;
let tx_lifetime_overextended: &'a RoTxn<'a> = {
let tx = &boxed.tx;
// Safety: Artificially extending the lifetime because
// this reference will only be stored and accessed from the
// returned ValueIter which guarantees that it is destroyed
// before the tx it is pointing to.
unsafe { &*&raw const *tx }
};
let iter = iterfun(&tx_lifetime_overextended)?;
let mut_ref = Pin::as_mut(&mut boxed);
// This unsafe allows us to write in a field of the pinned struct
unsafe {
Pin::get_unchecked_mut(mut_ref).iter = Some(iter);
}
*boxed.as_mut().iter() = Some(iter);
Ok(Box::new(TxAndIteratorPin(boxed)))
}
@@ -348,8 +412,10 @@ where
I: Iterator<Item = IteratorItem<'a>> + 'a,
{
fn drop(&mut self) {
// ensure the iterator is dropped before the RoTxn it references
drop(self.iter.take());
// Safety: `new_unchecked` is okay because we know this value is never
// used again after being dropped.
let this = unsafe { Pin::new_unchecked(self) };
drop(this.iter().take());
}
}
@@ -365,13 +431,12 @@ where
fn next(&mut self) -> Option<Self::Item> {
let mut_ref = Pin::as_mut(&mut self.0);
// This unsafe allows us to mutably access the iterator field
let next = unsafe { Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() };
match next {
None => None,
Some(Err(e)) => Some(Err(e.into())),
Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))),
}
let next = mut_ref.iter().as_mut()?.next()?;
let res = match next {
Err(e) => Err(e.into()),
Ok((k, v)) => Ok((k.to_vec(), v.to_vec())),
};
Some(res)
}
}

View File

@@ -11,6 +11,7 @@ use crate::{Db, Error, Result};
pub enum Engine {
Lmdb,
Sqlite,
Fjall,
}
impl Engine {
@@ -19,8 +20,26 @@ impl Engine {
match self {
Self::Lmdb => "lmdb",
Self::Sqlite => "sqlite",
Self::Fjall => "fjall",
}
}
/// Return engine-specific DB path from base path
pub fn db_path(&self, base_path: &PathBuf) -> PathBuf {
let mut ret = base_path.clone();
match self {
Self::Lmdb => {
ret.push("db.lmdb");
}
Self::Sqlite => {
ret.push("db.sqlite");
}
Self::Fjall => {
ret.push("db.fjall");
}
}
ret
}
}
impl std::fmt::Display for Engine {
@@ -36,10 +55,11 @@ impl std::str::FromStr for Engine {
match text {
"lmdb" | "heed" => Ok(Self::Lmdb),
"sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
"fjall" => Ok(Self::Fjall),
"sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())),
kind => Err(Error(
format!(
"Invalid DB engine: {} (options are: lmdb, sqlite)",
"Invalid DB engine: {} (options are: lmdb, sqlite, fjall)",
kind
)
.into(),
@@ -51,6 +71,7 @@ impl std::str::FromStr for Engine {
pub struct OpenOpt {
pub fsync: bool,
pub lmdb_map_size: Option<usize>,
pub fjall_block_cache_size: Option<usize>,
}
impl Default for OpenOpt {
@@ -58,6 +79,7 @@ impl Default for OpenOpt {
Self {
fsync: false,
lmdb_map_size: None,
fjall_block_cache_size: None,
}
}
}
@@ -66,53 +88,15 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
match engine {
// ---- Sqlite DB ----
#[cfg(feature = "sqlite")]
Engine::Sqlite => {
info!("Opening Sqlite database at: {}", path.display());
let manager = r2d2_sqlite::SqliteConnectionManager::file(path);
Ok(crate::sqlite_adapter::SqliteDb::new(manager, opt.fsync)?)
}
Engine::Sqlite => crate::sqlite_adapter::open_db(path, opt),
// ---- LMDB DB ----
#[cfg(feature = "lmdb")]
Engine::Lmdb => {
info!("Opening LMDB database at: {}", path.display());
if let Err(e) = std::fs::create_dir_all(&path) {
return Err(Error(
format!("Unable to create LMDB data directory: {}", e).into(),
));
}
Engine::Lmdb => crate::lmdb_adapter::open_db(path, opt),
let map_size = match opt.lmdb_map_size {
None => crate::lmdb_adapter::recommended_map_size(),
Some(v) => v - (v % 4096),
};
let mut env_builder = heed::EnvOpenOptions::new();
env_builder.max_dbs(100);
env_builder.map_size(map_size);
env_builder.max_readers(2048);
unsafe {
env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoRdAhead);
env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoMetaSync);
if !opt.fsync {
env_builder.flag(heed::flags::Flags::MdbNoSync);
}
}
match env_builder.open(&path) {
Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
return Err(Error(
"OutOfMemory error while trying to open LMDB database. This can happen \
if your operating system is not allowing you to use sufficient virtual \
memory address space. Please check that no limit is set (ulimit -v). \
You may also try to set a smaller `lmdb_map_size` configuration parameter. \
On 32-bit machines, you should probably switch to another database engine."
.into(),
))
}
Err(e) => Err(Error(format!("Cannot open LMDB database: {}", e).into())),
Ok(db) => Ok(crate::lmdb_adapter::LmdbDb::init(db)),
}
}
// ---- Fjall DB ----
#[cfg(feature = "fjall")]
Engine::Fjall => crate::fjall_adapter::open_db(path, opt),
// Pattern is unreachable when all supported DB engines are compiled into binary. The allow
// attribute is added so that we won't have to change this match in case stop building

View File

@@ -11,12 +11,23 @@ use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{params, Rows, Statement, Transaction};
use crate::{
open::{Engine, OpenOpt},
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
pub use rusqlite;
// ---- top-level open function
pub(crate) fn open_db(path: &PathBuf, opt: &OpenOpt) -> Result<Db> {
info!("Opening Sqlite database at: {}", path.display());
let manager = r2d2_sqlite::SqliteConnectionManager::file(path);
Ok(SqliteDb::new(manager, opt.fsync)?)
}
// ----
type Connection = r2d2::PooledConnection<SqliteConnectionManager>;
// --- err
@@ -139,17 +150,32 @@ impl IDb for SqliteDb {
Ok(trees)
}
fn snapshot(&self, to: &PathBuf) -> Result<()> {
fn snapshot(&self, base_path: &PathBuf) -> Result<()> {
fn progress(p: rusqlite::backup::Progress) {
let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
info!("Sqlite snapshot progress: {}%", percent);
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
static LAST_LOG_TIME: AtomicU64 = AtomicU64::new(0);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Fix your clock :o")
.as_millis() as u64;
if now >= LAST_LOG_TIME.load(Ordering::Relaxed) + 10 * 1000 {
let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
info!("Sqlite snapshot progress: {}%", percent);
LAST_LOG_TIME.fetch_max(now, Ordering::Relaxed);
}
}
std::fs::create_dir_all(to)?;
let mut path = to.clone();
path.push("db.sqlite");
std::fs::create_dir_all(base_path)?;
let path = Engine::Sqlite.db_path(&base_path);
self.db
.get()?
.backup(rusqlite::DatabaseName::Main, path, Some(progress))?;
Ok(())
}
@@ -160,7 +186,7 @@ impl IDb for SqliteDb {
self.internal_get(&self.db.get()?, &tree, key)
}
fn len(&self, tree: usize) -> Result<usize> {
fn approximate_len(&self, tree: usize) -> Result<usize> {
let tree = self.get_tree(tree)?;
let db = self.db.get()?;
@@ -172,6 +198,10 @@ impl IDb for SqliteDb {
}
}
fn is_empty(&self, tree: usize) -> Result<bool> {
Ok(self.approximate_len(tree)? == 0)
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
let db = self.db.get()?;

View File

@@ -1,7 +1,7 @@
use crate::*;
fn test_suite(db: Db) {
let tree = db.open_tree("tree").unwrap();
let tree = db.open_tree("tree:this_is_a_tree").unwrap();
let ka: &[u8] = &b"test"[..];
let kb: &[u8] = &b"zwello"[..];
@@ -14,7 +14,7 @@ fn test_suite(db: Db) {
assert!(tree.insert(ka, va).is_ok());
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
assert_eq!(tree.len().unwrap(), 1);
assert_eq!(tree.iter().unwrap().count(), 1);
// ---- test transaction logic ----
@@ -148,3 +148,15 @@ fn test_sqlite_db() {
let db = SqliteDb::new(manager, false).unwrap();
test_suite(db);
}
#[test]
#[cfg(feature = "fjall")]
fn test_fjall_db() {
use crate::fjall_adapter::{fjall, FjallDb};
let path = mktemp::Temp::new_dir().unwrap();
let config = fjall::Config::new(path).temporary(true);
let keyspace = config.open_transactional().unwrap();
let db = FjallDb::init(keyspace);
test_suite(db);
}

View File

@@ -93,6 +93,7 @@ k2v = [ "garage_util/k2v", "garage_api_k2v", "garage_api_admin/k2v" ]
# Database engines
lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ]
fjall = [ "garage_model/fjall" ]
# Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ]

View File

@@ -180,10 +180,21 @@ fn watch_shutdown_signal() -> watch::Receiver<bool> {
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to install SIGTERM handler");
let mut sighup = signal(SignalKind::hangup()).expect("Failed to install SIGHUP handler");
tokio::select! {
_ = sigint.recv() => info!("Received SIGINT, shutting down."),
_ = sigterm.recv() => info!("Received SIGTERM, shutting down."),
_ = sighup.recv() => info!("Received SIGHUP, shutting down."),
loop {
tokio::select! {
_ = sigint.recv() => {
info!("Received SIGINT, shutting down.");
break
}
_ = sigterm.recv() => {
info!("Received SIGTERM, shutting down.");
break
}
_ = sighup.recv() => {
info!("Received SIGHUP, reload not supported.");
continue
}
}
}
send_cancel.send(true).unwrap();
});

View File

@@ -72,6 +72,16 @@ impl K2vClient {
.enable_http2()
.build();
let client = HttpClient::builder(TokioExecutor::new()).build(connector);
Self::new_with_client(config, client)
}
/// Create a new K2V client with an external client.
/// Useful for example if you plan on creating many clients but you want to mutualize the
/// underlying thread pools & co.
pub fn new_with_client(
config: K2vClientConfig,
client: HttpClient<HttpsConnector<HttpConnector>, Body>,
) -> Result<Self, Error> {
let user_agent: std::borrow::Cow<str> = match &config.user_agent {
Some(ua) => ua.into(),
None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(),

View File

@@ -45,3 +45,4 @@ default = [ "lmdb", "sqlite" ]
k2v = [ "garage_util/k2v" ]
lmdb = [ "garage_db/lmdb" ]
sqlite = [ "garage_db/sqlite" ]
fjall = [ "garage_db/fjall" ]

View File

@@ -119,21 +119,17 @@ impl Garage {
info!("Opening database...");
let db_engine = db::Engine::from_str(&config.db_engine)
.ok_or_message("Invalid `db_engine` value in configuration file")?;
let mut db_path = config.metadata_dir.clone();
match db_engine {
db::Engine::Sqlite => {
db_path.push("db.sqlite");
}
db::Engine::Lmdb => {
db_path.push("db.lmdb");
}
}
let db_path = db_engine.db_path(&config.metadata_dir);
let db_opt = db::OpenOpt {
fsync: config.metadata_fsync,
lmdb_map_size: match config.lmdb_map_size {
v if v == usize::default() => None,
v => Some(v),
},
fjall_block_cache_size: match config.fjall_block_cache_size {
v if v == usize::default() => None,
v => Some(v),
},
};
let db = db::open_db(&db_path, db_engine, &db_opt)
.ok_or_message("Unable to open metadata db")?;

View File

@@ -121,13 +121,13 @@ impl Worker for LifecycleWorker {
mpu_aborted,
..
} => {
let n_objects = self.garage.object_table.data.store.len().ok();
let n_objects = self.garage.object_table.data.store.approximate_len().ok();
let progress = match n_objects {
None => "...".to_string(),
Some(total) => format!(
Some(total) if total > 0 => format!(
"~{:.2}%",
100. * std::cmp::min(*counter, total) as f32 / total as f32
),
_ => "...".to_string(),
};
WorkerStatus {
progress: Some(progress),

View File

@@ -229,13 +229,11 @@ impl LayoutManager {
}
/// Save cluster layout data to disk
async fn save_cluster_layout(&self) -> Result<(), Error> {
async fn save_cluster_layout(&self) {
let layout = self.layout.read().unwrap().inner().clone();
self.persist_cluster_layout
.save_async(&layout)
.await
.expect("Cannot save current cluster layout");
Ok(())
if let Err(e) = self.persist_cluster_layout.save_async(&layout).await {
error!("Failed to save cluster_layout: {}", e);
}
}
fn broadcast_update(self: &Arc<Self>, rpc: SystemRpc) {
@@ -313,7 +311,7 @@ impl LayoutManager {
self.change_notify.notify_waiters();
self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout));
self.save_cluster_layout().await?;
self.save_cluster_layout().await;
}
Ok(SystemRpc::Ok)
@@ -328,7 +326,7 @@ impl LayoutManager {
if let Some(new_trackers) = self.merge_layout_trackers(trackers) {
self.change_notify.notify_waiters();
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers));
self.save_cluster_layout().await?;
self.save_cluster_layout().await;
}
Ok(SystemRpc::Ok)

View File

@@ -368,11 +368,11 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
}
}
pub fn insert_queue_len(&self) -> Result<usize, Error> {
Ok(self.insert_queue.len()?)
pub fn insert_queue_approximate_len(&self) -> Result<usize, Error> {
Ok(self.insert_queue.approximate_len()?)
}
pub fn gc_todo_len(&self) -> Result<usize, Error> {
Ok(self.gc_todo.len()?)
pub fn gc_todo_approximate_len(&self) -> Result<usize, Error> {
Ok(self.gc_todo.approximate_len()?)
}
}

View File

@@ -313,7 +313,7 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
queue_length: Some(self.gc.data.gc_todo_approximate_len().unwrap_or(0) as u64),
..Default::default()
}
}

View File

@@ -287,12 +287,12 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
MerkleNode::decode_opt(&ent)
}
pub fn merkle_tree_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_tree.len()?)
pub fn merkle_tree_approximate_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_tree.approximate_len()?)
}
pub fn todo_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.len()?)
pub fn todo_approximate_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.approximate_len()?)
}
}
@@ -306,7 +306,7 @@ impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
queue_length: Some(self.0.todo_approximate_len().unwrap_or(0) as u64),
..Default::default()
}
}

View File

@@ -36,7 +36,7 @@ impl TableMetrics {
.u64_value_observer(
"table.size",
move |observer| {
if let Ok(value) = store.len() {
if let Ok(value) = store.approximate_len() {
observer.observe(
value as u64,
&[KeyValue::new("table_name", table_name)],
@@ -50,7 +50,7 @@ impl TableMetrics {
.u64_value_observer(
"table.merkle_tree_size",
move |observer| {
if let Ok(value) = merkle_tree.len() {
if let Ok(value) = merkle_tree.approximate_len() {
observer.observe(
value as u64,
&[KeyValue::new("table_name", table_name)],
@@ -64,7 +64,7 @@ impl TableMetrics {
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
move |observer| {
if let Ok(v) = merkle_todo.len() {
if let Ok(v) = merkle_todo.approximate_len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
@@ -78,7 +78,7 @@ impl TableMetrics {
.u64_value_observer(
"table.insert_queue_length",
move |observer| {
if let Ok(v) = insert_queue.len() {
if let Ok(v) = insert_queue.approximate_len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
@@ -92,7 +92,7 @@ impl TableMetrics {
.u64_value_observer(
"table.gc_todo_queue_length",
move |observer| {
if let Ok(value) = gc_todo.len() {
if let Ok(value) = gc_todo.approximate_len() {
observer.observe(
value as u64,
&[KeyValue::new("table_name", table_name)],

View File

@@ -27,7 +27,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64),
queue_length: Some(self.0.data.insert_queue.approximate_len().unwrap_or(0) as u64),
..Default::default()
}
}

View File

@@ -75,6 +75,10 @@ pub struct Config {
)]
pub block_ram_buffer_max: usize,
/// Maximum number of concurrent reads of block files on disk
#[serde(default = "default_block_max_concurrent_reads")]
pub block_max_concurrent_reads: usize,
/// Skip the permission check of secret files. Useful when
/// POSIX ACLs (or more complex chmods) are used.
#[serde(default)]
@@ -122,6 +126,10 @@ pub struct Config {
#[serde(deserialize_with = "deserialize_capacity", default)]
pub lmdb_map_size: usize,
/// Fjall block cache size
#[serde(deserialize_with = "deserialize_capacity", default)]
pub fjall_block_cache_size: usize,
// -- APIs
/// Configuration for S3 api
pub s3_api: S3ApiConfig,
@@ -279,6 +287,9 @@ fn default_block_size() -> usize {
fn default_block_ram_buffer_max() -> usize {
256 * 1024 * 1024
}
fn default_block_max_concurrent_reads() -> usize {
16
}
fn default_consistency_mode() -> String {
"consistent".into()