From 8b35a946d9f6b31b26b9783acbfab984316051f4 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 23 Feb 2024 17:09:47 +0100 Subject: [PATCH 01/18] Allow external HTTP client --- src/k2v-client/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index 852274a7..5b6d7f58 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -72,6 +72,16 @@ impl K2vClient { .enable_http2() .build(); let client = HttpClient::builder(TokioExecutor::new()).build(connector); + Self::new_with_client(config, client) + } + + /// Create a new K2V client with an external client. + /// Useful for example if you plan on creating many clients but you want to share the + /// underlying thread pools & co. + pub fn new_with_client( + config: K2vClientConfig, + client: HttpClient, Body>, + ) -> Result { let user_agent: std::borrow::Cow = match &config.user_agent { Some(ua) => ua.into(), None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(), From 1b42919bf7d63a7f9856e926c2068af5bbbb6d39 Mon Sep 17 00:00:00 2001 From: Arthur Carcano Date: Wed, 9 Jul 2025 12:32:56 +0200 Subject: [PATCH 02/18] Fix some unsoundness in lmdb adapter unsafe --- src/db/lmdb_adapter.rs | 66 +++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index 259aa566..bd85f1b4 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -1,8 +1,8 @@ use core::ops::Bound; -use core::ptr::NonNull; use std::collections::HashMap; use std::convert::TryInto; +use std::marker::PhantomPinned; use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, RwLock}; @@ -159,13 +159,15 @@ impl IDb for LmdbDb { fn iter(&self, tree: usize) -> Result> { let tree = self.get_tree(tree)?; let tx = self.db.read_txn()?; - TxAndIterator::make(tx, |tx| Ok(tree.iter(tx)?)) + // Safety: the closure does not store its argument anywhere but in its result. + unsafe { 
TxAndIterator::make(tx, |tx| Ok(tree.iter(tx)?)) } } fn iter_rev(&self, tree: usize) -> Result> { let tree = self.get_tree(tree)?; let tx = self.db.read_txn()?; - TxAndIterator::make(tx, |tx| Ok(tree.rev_iter(tx)?)) + // Safety: the closure does not store its argument anywhere but in its result. + unsafe { TxAndIterator::make(tx, |tx| Ok(tree.rev_iter(tx)?)) } } fn range<'r>( @@ -176,7 +178,8 @@ impl IDb for LmdbDb { ) -> Result> { let tree = self.get_tree(tree)?; let tx = self.db.read_txn()?; - TxAndIterator::make(tx, |tx| Ok(tree.range(tx, &(low, high))?)) + // Safety: the closure does not store its argument anywhere but in its result. + unsafe { TxAndIterator::make(tx, |tx| Ok(tree.range(tx, &(low, high))?)) } } fn range_rev<'r>( &self, @@ -186,7 +189,8 @@ impl IDb for LmdbDb { ) -> Result> { let tree = self.get_tree(tree)?; let tx = self.db.read_txn()?; - TxAndIterator::make(tx, |tx| Ok(tree.rev_range(tx, &(low, high))?)) + // Safety: the closure does not store its argument anywhere but in its result. + unsafe { TxAndIterator::make(tx, |tx| Ok(tree.rev_range(tx, &(low, high))?)) } } // ---- @@ -316,28 +320,41 @@ where { tx: RoTxn<'a>, iter: Option, + _pin: PhantomPinned, } impl<'a, I> TxAndIterator<'a, I> where I: Iterator> + 'a, { - fn make(tx: RoTxn<'a>, iterfun: F) -> Result> + fn iter(self: Pin<&mut Self>) -> &mut Option { + // Safety: iter is not structural + unsafe { &mut self.get_unchecked_mut().iter } + } + + /// Safety: iterfun must not store its argument anywhere but in its result. 
+ unsafe fn make(tx: RoTxn<'a>, iterfun: F) -> Result> where F: FnOnce(&'a RoTxn<'a>) -> Result, { - let res = TxAndIterator { tx, iter: None }; + let res = TxAndIterator { + tx, + iter: None, + _pin: PhantomPinned, + }; let mut boxed = Box::pin(res); - // This unsafe allows us to bypass lifetime checks - let tx = unsafe { NonNull::from(&boxed.tx).as_ref() }; - let iter = iterfun(tx)?; + let tx_lifetime_overextended: &'a RoTxn<'a> = { + let tx = &boxed.tx; + // Safety: Artificially extending the lifetime because + // this reference will only be stored and accessed from the + // returned ValueIter which guarantees that it is destroyed + // before the tx it is pointing to. + unsafe { &*&raw const *tx } + }; + let iter = iterfun(&tx_lifetime_overextended)?; - let mut_ref = Pin::as_mut(&mut boxed); - // This unsafe allows us to write in a field of the pinned struct - unsafe { - Pin::get_unchecked_mut(mut_ref).iter = Some(iter); - } + *boxed.as_mut().iter() = Some(iter); Ok(Box::new(TxAndIteratorPin(boxed))) } @@ -348,8 +365,10 @@ where I: Iterator> + 'a, { fn drop(&mut self) { - // ensure the iterator is dropped before the RoTxn it references - drop(self.iter.take()); + // Safety: `new_unchecked` is okay because we know this value is never + // used again after being dropped. 
+ let this = unsafe { Pin::new_unchecked(self) }; + drop(this.iter().take()); } } @@ -365,13 +384,12 @@ where fn next(&mut self) -> Option { let mut_ref = Pin::as_mut(&mut self.0); - // This unsafe allows us to mutably access the iterator field - let next = unsafe { Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() }; - match next { - None => None, - Some(Err(e)) => Some(Err(e.into())), - Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))), - } + let next = mut_ref.iter().as_mut()?.next()?; + let res = match next { + Err(e) => Err(e.into()), + Ok((k, v)) => Ok((k.to_vec(), v.to_vec())), + }; + Some(res) } } From 70cf6004ae79c26f0d1b17b03fb92b5081b83efb Mon Sep 17 00:00:00 2001 From: Lapineige Date: Fri, 1 Aug 2025 21:32:59 +0000 Subject: [PATCH 03/18] Fix typo in peertube buckets names --- doc/book/connect/apps/index.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/book/connect/apps/index.md b/doc/book/connect/apps/index.md index baf6ba50..f4ca9865 100644 --- a/doc/book/connect/apps/index.md +++ b/doc/book/connect/apps/index.md @@ -144,10 +144,10 @@ garage key new --name peertube-key Keep the Key ID and the Secret key in a pad, they will be needed later. -We need two buckets, one for normal videos (named peertube-video) and one for webtorrent videos (named peertube-playlist). +We need two buckets, one for normal videos (named peertube-videos) and one for webtorrent videos (named peertube-playlists). 
```bash garage bucket create peertube-videos -garage bucket create peertube-playlist +garage bucket create peertube-playlists ``` Now we allow our key to read and write on these buckets: @@ -206,7 +206,7 @@ object_storage: proxify_private_files: false streaming_playlists: - bucket_name: 'peertube-playlist' + bucket_name: 'peertube-playlists' # Keep it empty for our example prefix: '' From cc29a40d51222d9dffb36e0747d4a164d1d0f9b8 Mon Sep 17 00:00:00 2001 From: Lapineige Date: Fri, 1 Aug 2025 21:35:15 +0000 Subject: [PATCH 04/18] Actualiser doc/book/connect/apps/index.md --- doc/book/connect/apps/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/book/connect/apps/index.md b/doc/book/connect/apps/index.md index 242e6fb1..f52d434b 100644 --- a/doc/book/connect/apps/index.md +++ b/doc/book/connect/apps/index.md @@ -12,7 +12,7 @@ In this section, we cover the following web applications: | [Mastodon](#mastodon) | ✅ | Natively supported | | [Matrix](#matrix) | ✅ | Tested with `synapse-s3-storage-provider` | | [ejabberd](#ejabberd) | ✅ | `mod_s3_upload` | -| [Pixelfed](#pixelfed) | ❓ | Not yet tested | +| [Pixelfed](#pixelfed) | ✅ | Natively supported | | [Pleroma](#pleroma) | ❓ | Not yet tested | | [Lemmy](#lemmy) | ✅ | Supported with pict-rs | | [Funkwhale](#funkwhale) | ❓ | Not yet tested | From f930c6f64302d2de1cff6fab6ed95468d2d99969 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 2 Aug 2025 13:09:33 +0200 Subject: [PATCH 05/18] don't die on SIGHUP --- src/garage/server.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/garage/server.rs b/src/garage/server.rs index 1dc86fd3..b81ae334 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -183,10 +183,21 @@ fn watch_shutdown_signal() -> watch::Receiver { let mut sigterm = signal(SignalKind::terminate()).expect("Failed to install SIGTERM handler"); let mut sighup = signal(SignalKind::hangup()).expect("Failed to install SIGHUP 
handler"); - tokio::select! { - _ = sigint.recv() => info!("Received SIGINT, shutting down."), - _ = sigterm.recv() => info!("Received SIGTERM, shutting down."), - _ = sighup.recv() => info!("Received SIGHUP, shutting down."), + loop { + tokio::select! { + _ = sigint.recv() => { + info!("Received SIGINT, shutting down."); + break + } + _ = sigterm.recv() => { + info!("Received SIGTERM, shutting down."); + break + } + _ = sighup.recv() => { + info!("Received SIGHUP, reload not supported."); + continue + } + } } send_cancel.send(true).unwrap(); }); From 5469c9587718b24eb4b58ed9a5cbe39dfe39777b Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 2 Aug 2025 12:51:37 +0200 Subject: [PATCH 06/18] handle ECONNABORTED --- src/api/common/generic_server.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/api/common/generic_server.rs b/src/api/common/generic_server.rs index 6ddc2ff2..8f9bcdfb 100644 --- a/src/api/common/generic_server.rs +++ b/src/api/common/generic_server.rs @@ -343,7 +343,11 @@ where while !*must_exit.borrow() { let (stream, client_addr) = tokio::select! 
{ - acc = listener.accept() => acc?, + acc = listener.accept() => match acc { + Ok(r) => r, + Err(e) if e.kind() == std::io::ErrorKind::ConnectionAborted => continue, + Err(e) => return Err(e.into()), + }, _ = must_exit.changed() => continue, }; From b340599e6865ecd488c7a88487c48b410e45d9f8 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 2 Aug 2025 13:43:38 +0200 Subject: [PATCH 07/18] log access keys --- src/api/common/generic_server.rs | 28 ++++++++++++++++++---------- src/api/common/signature/payload.rs | 4 ++-- src/api/k2v/api_server.rs | 6 ++++++ src/api/s3/api_server.rs | 6 ++++++ 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/api/common/generic_server.rs b/src/api/common/generic_server.rs index 6ddc2ff2..8453dc07 100644 --- a/src/api/common/generic_server.rs +++ b/src/api/common/generic_server.rs @@ -33,6 +33,7 @@ use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::socket_address::UnixOrTCPSocketAddress; use crate::helpers::{BoxBody, ErrorBody}; +use crate::signature::payload::Authorization; pub trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; @@ -58,6 +59,12 @@ pub trait ApiHandler: Send + Sync + 'static { req: Request, endpoint: Self::Endpoint, ) -> impl Future>, Self::Error>> + Send; + + /// Returns the key id used to authenticate this request. The ID returned must be safe to + /// log. 
+ fn key_id_from_request(&self, req: &Request) -> Option { + None + } } pub struct ApiServer { @@ -142,19 +149,20 @@ impl ApiServer { ) -> Result>, http::Error> { let uri = req.uri().clone(); - if let Ok(forwarded_for_ip_addr) = + let source = if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) { - info!( - "{} (via {}) {} {}", - forwarded_for_ip_addr, - addr, - req.method(), - uri - ); + format!("{forwarded_for_ip_addr} (via {addr})") } else { - info!("{} {} {}", addr, req.method(), uri); - } + format!("{addr}") + }; + // we only do this to log the access key, so we can discard any error + let key = self + .api_handler + .key_id_from_request(&req) + .map(|k| format!("(key {k}) ")) + .unwrap_or_default(); + info!("{source} {key}{} {uri}", req.method()); debug!("{:?}", req); let tracer = opentelemetry::global::tracer("garage"); diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs index 2d5f8603..c3a7f231 100644 --- a/src/api/common/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -417,7 +417,7 @@ pub async fn verify_v4( // ============ Authorization header, or X-Amz-* query params ========= pub struct Authorization { - key_id: String, + pub key_id: String, scope: String, signed_headers: String, signature: String, @@ -426,7 +426,7 @@ pub struct Authorization { } impl Authorization { - fn parse_header(headers: &HeaderMap) -> Result { + pub fn parse_header(headers: &HeaderMap) -> Result { let authorization = headers .get(AUTHORIZATION) .ok_or_bad_request("Missing authorization header")? 
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index de5775da..8e10d9a6 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -176,6 +176,12 @@ impl ApiHandler for K2VApiServer { Ok(resp_ok) } + + fn key_id_from_request(&self, req: &Request) -> Option { + garage_api_common::signature::payload::Authorization::parse_header(req.headers()) + .map(|auth| auth.key_id) + .ok() + } } impl ApiEndpoint for K2VApiEndpoint { diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 337ddb23..acb0cf56 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -343,6 +343,12 @@ impl ApiHandler for S3ApiServer { Ok(resp_ok) } + + fn key_id_from_request(&self, req: &Request) -> Option { + garage_api_common::signature::payload::Authorization::parse_header(req.headers()) + .map(|auth| auth.key_id) + .ok() + } } impl ApiEndpoint for S3ApiEndpoint { From 96d7713915861da178c101700c37d0ac580dd1dc Mon Sep 17 00:00:00 2001 From: Julien Kritter Date: Fri, 13 Sep 2024 10:40:46 +0200 Subject: [PATCH 08/18] Add support for an LSM-tree-based backend with Fjall --- Cargo.lock | 197 ++++++++++++++++++++- Cargo.toml | 1 + src/db/Cargo.toml | 2 + src/db/fjall_adapter.rs | 366 ++++++++++++++++++++++++++++++++++++++++ src/db/lib.rs | 2 + src/db/open.rs | 23 +++ src/garage/Cargo.toml | 1 + src/model/Cargo.toml | 1 + src/model/garage.rs | 7 + src/util/config.rs | 4 + 10 files changed, 603 insertions(+), 1 deletion(-) create mode 100644 src/db/fjall_adapter.rs diff --git a/Cargo.lock b/Cargo.lock index 6acd85ff..cd44160c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -687,6 +687,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d2c12f985c78475a6b8d629afd0c360260ef34cfef52efccdcfd31972f81c2e" +[[package]] +name = "byteview" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5" + 
[[package]] name = "cc" version = "1.2.16" @@ -798,6 +804,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "compare" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" + [[package]] name = "core-foundation" version = "0.9.4" @@ -874,6 +886,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -883,6 +904,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -954,6 +985,20 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core 0.9.10", +] + [[package]] name = "deranged" version = "0.4.0" @@ -996,6 +1041,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "double-ended-peekable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57" + [[package]] name = "dyn-clone" version = "1.0.19" @@ -1017,6 +1068,18 @@ 
dependencies = [ "cfg-if", ] +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "env_logger" version = "0.10.2" @@ -1084,6 +1147,23 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fjall" +version = "2.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc" +dependencies = [ + "byteorder", + "byteview", + "dashmap 6.1.0", + "log", + "lsm-tree", + "path-absolutize", + "std-semaphore", + "tempfile", + "xxhash-rust", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1412,6 +1492,7 @@ name = "garage_db" version = "1.2.0" dependencies = [ "err-derive", + "fjall", "heed", "mktemp", "r2d2", @@ -1655,6 +1736,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "guardian" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f" + [[package]] name = "h2" version = "0.3.26" @@ -2229,6 +2316,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "interval-heap" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" +dependencies = [ + "compare", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -2585,6 +2681,36 @@ dependencies = [ "hashbrown 0.15.2", ] +[[package]] +name = "lsm-tree" +version = "2.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab73c02eadb3dc12c0024e5b61d6284e6d59064e67e74fbad77856caa56f62c7" +dependencies = [ + 
"byteorder", + "crossbeam-skiplist", + "double-ended-peekable", + "enum_dispatch", + "guardian", + "interval-heap", + "log", + "lz4_flex", + "path-absolutize", + "quick_cache", + "rustc-hash", + "self_cell", + "tempfile", + "value-log", + "varint-rs", + "xxhash-rust", +] + +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" + [[package]] name = "matchers" version = "0.1.0" @@ -2839,7 +2965,7 @@ checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8" dependencies = [ "async-trait", "crossbeam-channel", - "dashmap", + "dashmap 4.0.2", "fnv", "futures-channel", "futures-executor", @@ -3000,6 +3126,24 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "path-absolutize" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5" +dependencies = [ + "path-dedot", +] + +[[package]] +name = "path-dedot" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397" +dependencies = [ + "once_cell", +] + [[package]] name = "pem" version = "3.0.5" @@ -3295,6 +3439,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick_cache" +version = "0.6.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ad6644cb07b7f3488b9f3d2fde3b4c0a7fa367cafefb39dff93a659f76eb786" +dependencies = [ + "equivalent", + "hashbrown 0.15.2", +] + [[package]] name = "quote" version = "1.0.40" @@ -3532,6 +3686,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustc_version" version = "0.4.1" @@ -3774,6 +3934,12 @@ dependencies = [ "libc", ] +[[package]] +name = "self_cell" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749" + [[package]] name = "semver" version = "1.0.26" @@ -3987,6 +4153,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "std-semaphore" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" + [[package]] name = "strsim" version = "0.11.1" @@ -4664,6 +4836,29 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "value-log" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c" +dependencies = [ + "byteorder", + "byteview", + "interval-heap", + "log", + "path-absolutize", + "rustc-hash", + "tempfile", + "varint-rs", + "xxhash-rust", +] + +[[package]] +name = "varint-rs" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 789225b8..9876db60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ heed = { version = "0.11", default-features = false, features = ["lmdb"] } rusqlite = "0.31.0" r2d2 = "0.8" r2d2_sqlite = "0.24" +fjall = "2.4" async-compression = { 
version = "0.4", features = ["tokio", "zstd"] } zstd = { version = "0.13", default-features = false } diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 666296ce..06b2fabc 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -19,6 +19,7 @@ heed = { workspace = true, optional = true } rusqlite = { workspace = true, optional = true, features = ["backup"] } r2d2 = { workspace = true, optional = true } r2d2_sqlite = { workspace = true, optional = true } +fjall = { workspace = true, optional = true } [dev-dependencies] mktemp.workspace = true @@ -27,4 +28,5 @@ mktemp.workspace = true default = [ "lmdb", "sqlite" ] bundled-libs = [ "rusqlite?/bundled" ] lmdb = [ "heed" ] +fjall = [ "dep:fjall" ] sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ] diff --git a/src/db/fjall_adapter.rs b/src/db/fjall_adapter.rs new file mode 100644 index 00000000..57b540c1 --- /dev/null +++ b/src/db/fjall_adapter.rs @@ -0,0 +1,366 @@ +use core::ops::Bound; + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; + +use fjall::{ + PartitionCreateOptions, PersistMode, TransactionalKeyspace, TransactionalPartitionHandle, + WriteTransaction, +}; + +use crate::{ + Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, + TxResult, TxValueIter, Value, ValueIter, +}; + +pub use fjall; + +// -- err + +impl From for Error { + fn from(e: fjall::Error) -> Error { + Error(format!("fjall: {}", e).into()) + } +} + +impl From for Error { + fn from(e: fjall::LsmError) -> Error { + Error(format!("fjall lsm_tree: {}", e).into()) + } +} + +impl From for TxOpError { + fn from(e: fjall::Error) -> TxOpError { + TxOpError(e.into()) + } +} + +// -- db + +pub struct FjallDb { + path: PathBuf, + keyspace: TransactionalKeyspace, + trees: RwLock<(Vec, HashMap)>, +} + +type ByteRefRangeBound<'r> = (Bound<&'r [u8]>, Bound<&'r [u8]>); + +impl FjallDb { + pub fn init(path: &PathBuf, keyspace: TransactionalKeyspace) -> Db { + let s = Self { + path: 
path.clone(), + keyspace, + trees: RwLock::new((Vec::new(), HashMap::new())), + }; + Db(Arc::new(s)) + } + + fn get_tree(&self, i: usize) -> Result { + self.trees + .read() + .unwrap() + .0 + .get(i) + .cloned() + .ok_or_else(|| Error("invalid tree id".into())) + } + + fn canonicalize(name: &str) -> String { + name.chars() + .map(|c| { + if c.is_alphanumeric() || c == '-' || c == '_' { + c + } else { + '_' + } + }) + .collect::() + } +} + +impl IDb for FjallDb { + fn engine(&self) -> String { + "LSM trees (using Fjall crate)".into() + } + + fn open_tree(&self, name: &str) -> Result { + let mut trees = self.trees.write().unwrap(); + let canonical_name = FjallDb::canonicalize(name); + if let Some(i) = trees.1.get(&canonical_name) { + Ok(*i) + } else { + let tree = self + .keyspace + .open_partition(&canonical_name, PartitionCreateOptions::default())?; + let i = trees.0.len(); + trees.0.push(tree); + trees.1.insert(canonical_name, i); + Ok(i) + } + } + + fn list_trees(&self) -> Result> { + Ok(self + .keyspace + .list_partitions() + .iter() + .map(|n| n.to_string()) + .collect()) + } + + fn snapshot(&self, to: &PathBuf) -> Result<()> { + std::fs::create_dir_all(to)?; + let mut path = to.clone(); + path.push("data.fjall"); + + let source_keyspace = fjall::Config::new(&self.path).open()?; + let copy_keyspace = fjall::Config::new(path).open()?; + + for partition_name in source_keyspace.list_partitions() { + let source_partition = source_keyspace + .open_partition(&partition_name, PartitionCreateOptions::default())?; + let snapshot = source_partition.snapshot(); + let copy_partition = + copy_keyspace.open_partition(&partition_name, PartitionCreateOptions::default())?; + + for entry in snapshot.iter() { + let (key, value) = entry?; + copy_partition.insert(key, value)?; + } + } + + copy_keyspace.persist(PersistMode::SyncAll)?; + Ok(()) + } + + // ---- + + fn get(&self, tree_idx: usize, key: &[u8]) -> Result> { + let tree = self.get_tree(tree_idx)?; + let tx = 
self.keyspace.read_tx(); + let val = tx.get(&tree, key)?; + match val { + None => Ok(None), + Some(v) => Ok(Some(v.to_vec())), + } + } + + fn len(&self, tree_idx: usize) -> Result { + let tree = self.get_tree(tree_idx)?; + let tx = self.keyspace.read_tx(); + Ok(tx.len(&tree)?) + } + + fn insert(&self, tree_idx: usize, key: &[u8], value: &[u8]) -> Result<()> { + let tree = self.get_tree(tree_idx)?; + let mut tx = self.keyspace.write_tx(); + tx.insert(&tree, key, value); + tx.commit()?; + Ok(()) + } + + fn remove(&self, tree_idx: usize, key: &[u8]) -> Result<()> { + let tree = self.get_tree(tree_idx)?; + let mut tx = self.keyspace.write_tx(); + tx.remove(&tree, key); + tx.commit()?; + Ok(()) + } + + fn clear(&self, tree_idx: usize) -> Result<()> { + let tree = self.get_tree(tree_idx)?; + let tree_name = tree.inner().name.clone(); + self.keyspace.delete_partition(tree)?; + let tree = self + .keyspace + .open_partition(&tree_name, PartitionCreateOptions::default())?; + let mut trees = self.trees.write().unwrap(); + trees.0[tree_idx] = tree; + Ok(()) + } + + fn iter(&self, tree_idx: usize) -> Result> { + let tree = self.get_tree(tree_idx)?; + let tx = self.keyspace.read_tx(); + Ok(Box::new(tx.iter(&tree).map(iterator_remap))) + } + + fn iter_rev(&self, tree_idx: usize) -> Result> { + let tree = self.get_tree(tree_idx)?; + let tx = self.keyspace.read_tx(); + Ok(Box::new(tx.iter(&tree).rev().map(iterator_remap))) + } + + fn range<'r>( + &self, + tree_idx: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + let tree = self.get_tree(tree_idx)?; + let tx = self.keyspace.read_tx(); + Ok(Box::new( + tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)) + .map(iterator_remap), + )) + } + fn range_rev<'r>( + &self, + tree_idx: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + let tree = self.get_tree(tree_idx)?; + let tx = self.keyspace.read_tx(); + Ok(Box::new( + tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)) + 
.rev() + .map(iterator_remap), + )) + } + + // ---- + + fn transaction(&self, f: &dyn ITxFn) -> TxResult { + let trees = self.trees.read().unwrap(); + let mut tx = FjallTx { + trees: &trees.0[..], + tx: self.keyspace.write_tx(), + }; + + let res = f.try_on(&mut tx); + match res { + TxFnResult::Ok(on_commit) => { + tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; + Ok(on_commit) + } + TxFnResult::Abort => { + tx.tx.rollback(); + Err(TxError::Abort(())) + } + TxFnResult::DbErr => { + tx.tx.rollback(); + Err(TxError::Db(Error( + "(this message will be discarded)".into(), + ))) + } + } + } +} + +// ---- + +struct FjallTx<'a> { + trees: &'a [TransactionalPartitionHandle], + tx: WriteTransaction<'a>, +} + +impl<'a> FjallTx<'a> { + fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalPartitionHandle> { + self.trees.get(i).ok_or_else(|| { + TxOpError(Error( + "invalid tree id (it might have been openned after the transaction started)".into(), + )) + }) + } +} + +impl<'a> ITx for FjallTx<'a> { + fn get(&self, tree_idx: usize, key: &[u8]) -> TxOpResult> { + let tree = self.get_tree(tree_idx)?; + match self.tx.get(tree, key)? { + Some(v) => Ok(Some(v.to_vec())), + None => Ok(None), + } + } + fn len(&self, tree_idx: usize) -> TxOpResult { + let tree = self.get_tree(tree_idx)?; + Ok(self.tx.len(tree)? 
as usize) + } + + fn insert(&mut self, tree_idx: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> { + let tree = self.get_tree(tree_idx)?.clone(); + self.tx.insert(&tree, key, value); + Ok(()) + } + fn remove(&mut self, tree_idx: usize, key: &[u8]) -> TxOpResult<()> { + let tree = self.get_tree(tree_idx)?.clone(); + self.tx.remove(&tree, key); + Ok(()) + } + fn clear(&mut self, _tree_idx: usize) -> TxOpResult<()> { + unimplemented!("LSM tree clearing in cross-partition transaction is not supported") + } + + fn iter(&self, tree_idx: usize) -> TxOpResult> { + let tree = self.get_tree(tree_idx)?.clone(); + Ok(Box::new(self.tx.iter(&tree).map(iterator_remap_tx))) + } + fn iter_rev(&self, tree_idx: usize) -> TxOpResult> { + let tree = self.get_tree(tree_idx)?.clone(); + Ok(Box::new(self.tx.iter(&tree).rev().map(iterator_remap_tx))) + } + + fn range<'r>( + &self, + tree_idx: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> TxOpResult> { + let tree = self.get_tree(tree_idx)?; + let low = clone_bound(low); + let high = clone_bound(high); + Ok(Box::new( + self.tx + .range::, ByteVecRangeBounds>(&tree, (low, high)) + .map(iterator_remap_tx), + )) + } + fn range_rev<'r>( + &self, + tree_idx: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> TxOpResult> { + let tree = self.get_tree(tree_idx)?; + let low = clone_bound(low); + let high = clone_bound(high); + Ok(Box::new( + self.tx + .range::, ByteVecRangeBounds>(&tree, (low, high)) + .rev() + .map(iterator_remap_tx), + )) + } +} + +// -- maps fjall's (k, v) to ours + +fn iterator_remap(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> Result<(Value, Value)> { + r.map(|(k, v)| (k.to_vec(), v.to_vec())) + .map_err(|e| e.into()) +} + +fn iterator_remap_tx(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> TxOpResult<(Value, Value)> { + r.map(|(k, v)| (k.to_vec(), v.to_vec())) + .map_err(|e| e.into()) +} + +// -- utils to deal with Garage's tightness on Bound lifetimes + +type ByteVecBound = Bound>; 
+type ByteVecRangeBounds = (ByteVecBound, ByteVecBound); + +fn clone_bound(bound: Bound<&[u8]>) -> ByteVecBound { + let value = match bound { + Bound::Excluded(v) | Bound::Included(v) => v.to_vec(), + Bound::Unbounded => vec![], + }; + + match bound { + Bound::Included(_) => Bound::Included(value), + Bound::Excluded(_) => Bound::Excluded(value), + Bound::Unbounded => Bound::Unbounded, + } +} diff --git a/src/db/lib.rs b/src/db/lib.rs index c55c8643..3454c759 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -1,6 +1,8 @@ #[macro_use] extern crate tracing; +#[cfg(feature = "fjall")] +pub mod fjall_adapter; #[cfg(feature = "lmdb")] pub mod lmdb_adapter; #[cfg(feature = "sqlite")] diff --git a/src/db/open.rs b/src/db/open.rs index ff3bc830..83ae1f93 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -1,4 +1,6 @@ +use std::convert::TryInto; use std::path::PathBuf; +use std::sync::Arc; use crate::{Db, Error, Result}; @@ -11,6 +13,7 @@ use crate::{Db, Error, Result}; pub enum Engine { Lmdb, Sqlite, + Fjall, } impl Engine { @@ -19,6 +22,7 @@ impl Engine { match self { Self::Lmdb => "lmdb", Self::Sqlite => "sqlite", + Self::Fjall => "fjall", } } } @@ -36,6 +40,7 @@ impl std::str::FromStr for Engine { match text { "lmdb" | "heed" => Ok(Self::Lmdb), "sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite), + "fjall" => Ok(Self::Fjall), "sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. 
v0.9.4).".into())), kind => Err(Error( format!( @@ -51,6 +56,7 @@ impl std::str::FromStr for Engine { pub struct OpenOpt { pub fsync: bool, pub lmdb_map_size: Option, + pub fjall_block_cache_size: Option, } impl Default for OpenOpt { @@ -58,6 +64,7 @@ impl Default for OpenOpt { Self { fsync: false, lmdb_map_size: None, + fjall_block_cache_size: None, } } } @@ -114,6 +121,22 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { } } + // ---- Fjall DB ---- + #[cfg(feature = "fjall")] + Engine::Fjall => { + info!("Opening Fjall database at: {}", path.display()); + let fsync_ms = opt.fsync.then(|| 1000 as u16); + let mut config = fjall::Config::new(path).fsync_ms(fsync_ms); + if let Some(block_cache_size) = opt.fjall_block_cache_size { + let block_cache = Arc::new(fjall::BlockCache::with_capacity_bytes( + block_cache_size.try_into().unwrap(), + )); + config = config.block_cache(block_cache); + } + let keyspace = config.open_transactional()?; + Ok(crate::fjall_adapter::FjallDb::init(path, keyspace)) + } + // Pattern is unreachable when all supported DB engines are compiled into binary. The allow // attribute is added so that we won't have to change this match in case stop building // support for one or more engines by default. 
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index ae3b5609..7d60313e 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -91,6 +91,7 @@ k2v = [ "garage_util/k2v", "garage_api_k2v" ] # Database engines lmdb = [ "garage_model/lmdb" ] sqlite = [ "garage_model/sqlite" ] +fjall = [ "garage_model/fjall" ] # Automatic registration and discovery via Consul API consul-discovery = [ "garage_rpc/consul-discovery" ] diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 376eaa9a..14f92253 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -44,3 +44,4 @@ default = [ "lmdb", "sqlite" ] k2v = [ "garage_util/k2v" ] lmdb = [ "garage_db/lmdb" ] sqlite = [ "garage_db/sqlite" ] +fjall = [ "garage_db/fjall" ] \ No newline at end of file diff --git a/src/model/garage.rs b/src/model/garage.rs index 11c0d90f..7420e740 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -124,6 +124,9 @@ impl Garage { db::Engine::Lmdb => { db_path.push("db.lmdb"); } + db::Engine::Fjall => { + db_path.push("db.fjall"); + } } let db_opt = db::OpenOpt { fsync: config.metadata_fsync, @@ -131,6 +134,10 @@ impl Garage { v if v == usize::default() => None, v => Some(v), }, + fjall_block_cache_size: match config.fjall_block_cache_size { + v if v == usize::default() => None, + v => Some(v), + }, }; let db = db::open_db(&db_path, db_engine, &db_opt) .ok_or_message("Unable to open metadata db")?; diff --git a/src/util/config.rs b/src/util/config.rs index c74029e7..19c3e821 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -122,6 +122,10 @@ pub struct Config { #[serde(deserialize_with = "deserialize_capacity", default)] pub lmdb_map_size: usize, + /// Fjall block cache size + #[serde(deserialize_with = "deserialize_capacity", default)] + pub fjall_block_cache_size: usize, + // -- APIs /// Configuration for S3 api pub s3_api: S3ApiConfig, From a6c6c44310973aba4625abba3819eaf1099362b5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 4 Jan 2025 
17:56:09 +0100 Subject: [PATCH 09/18] nix: build and test fjall feature --- .woodpecker/debug.yaml | 5 +++++ flake.nix | 3 +++ nix/compile.nix | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/.woodpecker/debug.yaml b/.woodpecker/debug.yaml index 65dab9ab..62266aa4 100644 --- a/.woodpecker/debug.yaml +++ b/.woodpecker/debug.yaml @@ -28,6 +28,11 @@ steps: commands: - nix-build -j4 --attr flakePackages.tests-sqlite + - name: unit + func tests (fjall) + image: nixpkgs/nix:nixos-22.05 + commands: + - nix-build -j4 --attr flakePackages.tests-fjall + - name: integration tests image: nixpkgs/nix:nixos-22.05 commands: diff --git a/flake.nix b/flake.nix index fc599e0b..2fb8c48e 100644 --- a/flake.nix +++ b/flake.nix @@ -53,6 +53,9 @@ tests-sqlite = testWith { GARAGE_TEST_INTEGRATION_DB_ENGINE = "sqlite"; }; + tests-fjall = testWith { + GARAGE_TEST_INTEGRATION_DB_ENGINE = "fjall"; + }; }; # ---- developpment shell, for making native builds only ---- diff --git a/nix/compile.nix b/nix/compile.nix index bbadaa37..7e9f79ab 100644 --- a/nix/compile.nix +++ b/nix/compile.nix @@ -68,7 +68,7 @@ let rootFeatures = if features != null then features else - ([ "bundled-libs" "lmdb" "sqlite" "k2v" ] ++ (lib.optionals release [ + ([ "bundled-libs" "lmdb" "sqlite" "fjall" "k2v" ] ++ (lib.optionals release [ "consul-discovery" "kubernetes-discovery" "metrics" From aa69c06f2b1b76630ae5b0f9d14c4223dbee6641 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Aug 2025 19:41:06 +0200 Subject: [PATCH 10/18] fix potential race condition and naming bug in fjall adapter --- Cargo.lock | 1 + Cargo.toml | 1 + src/db/Cargo.toml | 5 +- src/db/fjall_adapter.rs | 170 ++++++++++++++++++++++++++++------------ src/db/open.rs | 8 +- src/db/test.rs | 14 +++- 6 files changed, 139 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd44160c..997d6a92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1495,6 +1495,7 @@ dependencies = [ "fjall", "heed", "mktemp", + 
"parking_lot 0.12.3", "r2d2", "r2d2_sqlite", "rusqlite", diff --git a/Cargo.toml b/Cargo.toml index 9876db60..fdec5010 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ md-5 = "0.10" mktemp = "0.5" nix = { version = "0.29", default-features = false, features = ["fs"] } nom = "7.1" +parking_lot = "0.12" parse_duration = "2.1" pin-project = "1.0.12" pnet_datalink = "0.34" diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 06b2fabc..e9ed15c9 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -16,10 +16,13 @@ err-derive.workspace = true tracing.workspace = true heed = { workspace = true, optional = true } + rusqlite = { workspace = true, optional = true, features = ["backup"] } r2d2 = { workspace = true, optional = true } r2d2_sqlite = { workspace = true, optional = true } + fjall = { workspace = true, optional = true } +parking_lot = { workspace = true, optional = true } [dev-dependencies] mktemp.workspace = true @@ -28,5 +31,5 @@ mktemp.workspace = true default = [ "lmdb", "sqlite" ] bundled-libs = [ "rusqlite?/bundled" ] lmdb = [ "heed" ] -fjall = [ "dep:fjall" ] +fjall = [ "dep:fjall", "dep:parking_lot" ] sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ] diff --git a/src/db/fjall_adapter.rs b/src/db/fjall_adapter.rs index 57b540c1..d91ef12f 100644 --- a/src/db/fjall_adapter.rs +++ b/src/db/fjall_adapter.rs @@ -1,8 +1,9 @@ use core::ops::Bound; -use std::collections::HashMap; use std::path::PathBuf; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; + +use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; use fjall::{ PartitionCreateOptions, PersistMode, TransactionalKeyspace, TransactionalPartitionHandle, @@ -39,63 +40,48 @@ impl From for TxOpError { // -- db pub struct FjallDb { - path: PathBuf, keyspace: TransactionalKeyspace, - trees: RwLock<(Vec, HashMap)>, + trees: RwLock>, } type ByteRefRangeBound<'r> = (Bound<&'r [u8]>, Bound<&'r [u8]>); impl FjallDb { - pub fn init(path: &PathBuf, keyspace: TransactionalKeyspace) -> Db { + 
pub fn init(keyspace: TransactionalKeyspace) -> Db { let s = Self { - path: path.clone(), keyspace, - trees: RwLock::new((Vec::new(), HashMap::new())), + trees: RwLock::new(Vec::new()), }; Db(Arc::new(s)) } - fn get_tree(&self, i: usize) -> Result { - self.trees - .read() - .unwrap() - .0 - .get(i) - .cloned() - .ok_or_else(|| Error("invalid tree id".into())) - } - - fn canonicalize(name: &str) -> String { - name.chars() - .map(|c| { - if c.is_alphanumeric() || c == '-' || c == '_' { - c - } else { - '_' - } - }) - .collect::() + fn get_tree( + &self, + i: usize, + ) -> Result> { + RwLockReadGuard::try_map(self.trees.read(), |trees: &Vec<_>| { + trees.get(i).map(|tup| &tup.1) + }) + .map_err(|_| Error("invalid tree id".into())) } } impl IDb for FjallDb { fn engine(&self) -> String { - "LSM trees (using Fjall crate)".into() + "Fjall (EXPERIMENTAL!)".into() } fn open_tree(&self, name: &str) -> Result { - let mut trees = self.trees.write().unwrap(); - let canonical_name = FjallDb::canonicalize(name); - if let Some(i) = trees.1.get(&canonical_name) { - Ok(*i) + let mut trees = self.trees.write(); + let safe_name = encode_name(name)?; + if let Some(i) = trees.iter().position(|(name, _)| *name == safe_name) { + Ok(i) } else { let tree = self .keyspace - .open_partition(&canonical_name, PartitionCreateOptions::default())?; - let i = trees.0.len(); - trees.0.push(tree); - trees.1.insert(canonical_name, i); + .open_partition(&safe_name, PartitionCreateOptions::default())?; + let i = trees.len(); + trees.push((safe_name, tree)); Ok(i) } } @@ -105,8 +91,8 @@ impl IDb for FjallDb { .keyspace .list_partitions() .iter() - .map(|n| n.to_string()) - .collect()) + .map(|n| decode_name(&n)) + .collect::>>()?) 
} fn snapshot(&self, to: &PathBuf) -> Result<()> { @@ -114,17 +100,17 @@ impl IDb for FjallDb { let mut path = to.clone(); path.push("data.fjall"); - let source_keyspace = fjall::Config::new(&self.path).open()?; + let source_state = self.keyspace.read_tx(); let copy_keyspace = fjall::Config::new(path).open()?; - for partition_name in source_keyspace.list_partitions() { - let source_partition = source_keyspace + for partition_name in self.keyspace.list_partitions() { + let source_partition = self + .keyspace .open_partition(&partition_name, PartitionCreateOptions::default())?; - let snapshot = source_partition.snapshot(); let copy_partition = copy_keyspace.open_partition(&partition_name, PartitionCreateOptions::default())?; - for entry in snapshot.iter() { + for entry in source_state.iter(&source_partition) { let (key, value) = entry?; copy_partition.insert(key, value)?; } @@ -169,14 +155,19 @@ impl IDb for FjallDb { } fn clear(&self, tree_idx: usize) -> Result<()> { - let tree = self.get_tree(tree_idx)?; - let tree_name = tree.inner().name.clone(); + let mut trees = self.trees.write(); + + if tree_idx >= trees.len() { + return Err(Error("invalid tree id".into())); + } + let (name, tree) = trees.remove(tree_idx); + self.keyspace.delete_partition(tree)?; let tree = self .keyspace - .open_partition(&tree_name, PartitionCreateOptions::default())?; - let mut trees = self.trees.write().unwrap(); - trees.0[tree_idx] = tree; + .open_partition(&name, PartitionCreateOptions::default())?; + trees.insert(tree_idx, (name, tree)); + Ok(()) } @@ -223,9 +214,9 @@ impl IDb for FjallDb { // ---- fn transaction(&self, f: &dyn ITxFn) -> TxResult { - let trees = self.trees.read().unwrap(); + let trees = self.trees.read(); let mut tx = FjallTx { - trees: &trees.0[..], + trees: &trees[..], tx: self.keyspace.write_tx(), }; @@ -252,13 +243,13 @@ impl IDb for FjallDb { // ---- struct FjallTx<'a> { - trees: &'a [TransactionalPartitionHandle], + trees: &'a [(String, 
TransactionalPartitionHandle)], tx: WriteTransaction<'a>, } impl<'a> FjallTx<'a> { fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalPartitionHandle> { - self.trees.get(i).ok_or_else(|| { + self.trees.get(i).map(|tup| &tup.1).ok_or_else(|| { TxOpError(Error( "invalid tree id (it might have been openned after the transaction started)".into(), )) @@ -364,3 +355,78 @@ fn clone_bound(bound: Bound<&[u8]>) -> ByteVecBound { Bound::Unbounded => Bound::Unbounded, } } + +// -- utils to encode table names -- + +fn encode_name(s: &str) -> Result { + let base = 'A' as u32; + + let mut ret = String::with_capacity(s.len() + 10); + for c in s.chars() { + if c.is_alphanumeric() || c == '_' || c == '-' || c == '#' { + ret.push(c); + } else if c <= u8::MAX as char { + ret.push('$'); + let c_hi = c as u32 / 16; + let c_lo = c as u32 % 16; + ret.push(char::from_u32(base + c_hi).unwrap()); + ret.push(char::from_u32(base + c_lo).unwrap()); + } else { + return Err(Error( + format!("table name {} could not be safely encoded", s).into(), + )); + } + } + Ok(ret) +} + +fn decode_name(s: &str) -> Result { + use std::convert::TryFrom; + + let errfn = || Error(format!("encoded table name {} is invalid", s).into()); + let c_map = |c: char| { + let c = c as u32; + let base = 'A' as u32; + if (base..base + 16).contains(&c) { + Some(c - base) + } else { + None + } + }; + + let mut ret = String::with_capacity(s.len()); + let mut it = s.chars(); + while let Some(c) = it.next() { + if c == '$' { + let c_hi = it.next().and_then(c_map).ok_or_else(errfn)?; + let c_lo = it.next().and_then(c_map).ok_or_else(errfn)?; + let c_dec = char::try_from(c_hi * 16 + c_lo).map_err(|_| errfn())?; + ret.push(c_dec); + } else { + ret.push(c); + } + } + Ok(ret) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_encdec_name() { + for name in [ + "testname", + "test_name", + "test name", + "test$name", + "test:name@help.me$get/this**right", + ] { + let encname = encode_name(name).unwrap(); + 
assert!(!encname.contains(' ')); + assert!(!encname.contains('.')); + assert!(!encname.contains('*')); + assert_eq!(*name, decode_name(&encname).unwrap()); + } + } +} diff --git a/src/db/open.rs b/src/db/open.rs index 83ae1f93..fbd8d74a 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -1,6 +1,5 @@ use std::convert::TryInto; use std::path::PathBuf; -use std::sync::Arc; use crate::{Db, Error, Result}; @@ -128,13 +127,10 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { let fsync_ms = opt.fsync.then(|| 1000 as u16); let mut config = fjall::Config::new(path).fsync_ms(fsync_ms); if let Some(block_cache_size) = opt.fjall_block_cache_size { - let block_cache = Arc::new(fjall::BlockCache::with_capacity_bytes( - block_cache_size.try_into().unwrap(), - )); - config = config.block_cache(block_cache); + config = config.cache_size(block_cache_size.try_into().unwrap()); } let keyspace = config.open_transactional()?; - Ok(crate::fjall_adapter::FjallDb::init(path, keyspace)) + Ok(crate::fjall_adapter::FjallDb::init(keyspace)) } // Pattern is unreachable when all supported DB engines are compiled into binary. 
The allow diff --git a/src/db/test.rs b/src/db/test.rs index 26b816b8..08ce1dda 100644 --- a/src/db/test.rs +++ b/src/db/test.rs @@ -1,7 +1,7 @@ use crate::*; fn test_suite(db: Db) { - let tree = db.open_tree("tree").unwrap(); + let tree = db.open_tree("tree:this_is_a_tree").unwrap(); let ka: &[u8] = &b"test"[..]; let kb: &[u8] = &b"zwello"[..]; @@ -148,3 +148,15 @@ fn test_sqlite_db() { let db = SqliteDb::new(manager, false).unwrap(); test_suite(db); } + +#[test] +#[cfg(feature = "fjall")] +fn test_fjall_db() { + use crate::fjall_adapter::{fjall, FjallDb}; + + let path = mktemp::Temp::new_dir().unwrap(); + let config = fjall::Config::new(path); + let keyspace = config.open_transactional().unwrap(); + let db = FjallDb::init(keyspace); + test_suite(db); +} From 6ea86db8cd7687a766679526f10cf1cb42ae00b2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Aug 2025 19:51:38 +0200 Subject: [PATCH 11/18] document fjall db engine, remove flakey metadata_fsync implementation --- doc/book/reference-manual/configuration.md | 10 ++++++++++ src/db/open.rs | 8 ++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index 84aaf511..e134a83f 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -333,6 +333,7 @@ Since `v0.8.0`, Garage can use alternative storage backends as follows: | --------- | ----------------- | ------------- | | [LMDB](https://www.symas.com/lmdb) (since `v0.8.0`, default since `v0.9.0`) | `"lmdb"` | `/db.lmdb/` | | [Sqlite](https://sqlite.org) (since `v0.8.0`) | `"sqlite"` | `/db.sqlite` | +| [Fjall](https://github.com/fjall-rs/fjall) (**experimental support** since `v1.3.0`) | `"fjall"` | `/db.fjall/` | | [Sled](https://sled.rs) (old default, removed since `v1.0`) | `"sled"` | `/db/` | Sled was supported until Garage v0.9.x, and was removed in Garage v1.0. 
@@ -369,6 +370,14 @@ LMDB works very well, but is known to have the following limitations: so it is not the best choice for high-performance storage clusters, but it should work fine in many cases. +- Fjall: a storage engine based on LSM trees, which theoretically allow for + higher write throughput than other storage engines that are based on B-trees. + Using Fjall could potentially improve Garage's performance significantly in + write-heavy workloads. **Support for Fjall is experimental at this point**, + we have added it to Garage for evaluation purposes only. **Do not use it for + production-critical workloads.** + + It is possible to convert Garage's metadata directory from one format to another using the `garage convert-db` command, which should be used as follows: @@ -406,6 +415,7 @@ Here is how this option impacts the different database engines: |----------|------------------------------------|-------------------------------| | Sqlite | `PRAGMA synchronous = OFF` | `PRAGMA synchronous = NORMAL` | | LMDB | `MDB_NOMETASYNC` + `MDB_NOSYNC` | `MDB_NOMETASYNC` | +| Fjall | default options | not supported | Note that the Sqlite database is always ran in `WAL` mode (`PRAGMA journal_mode = WAL`). 
diff --git a/src/db/open.rs b/src/db/open.rs index fbd8d74a..d5469b58 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -124,8 +124,12 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { #[cfg(feature = "fjall")] Engine::Fjall => { info!("Opening Fjall database at: {}", path.display()); - let fsync_ms = opt.fsync.then(|| 1000 as u16); - let mut config = fjall::Config::new(path).fsync_ms(fsync_ms); + if opt.fsync { + return Err(Error( + "metadata_fsync is not supported with the Fjall database engine".into(), + )); + } + let mut config = fjall::Config::new(path); if let Some(block_cache_size) = opt.fjall_block_cache_size { config = config.cache_size(block_cache_size.try_into().unwrap()); } From 90bba5889aeeadfcd895ce4a245c3010bdcad01c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Aug 2025 21:17:32 +0200 Subject: [PATCH 12/18] garage_db: rename len to approximate_len as it is used for stats only --- src/block/manager.rs | 6 +++--- src/block/metrics.rs | 6 +++--- src/block/resync.rs | 14 ++++++++------ src/db/fjall_adapter.rs | 8 ++++++-- src/db/lib.rs | 13 +++++++++---- src/db/lmdb_adapter.rs | 7 ++++++- src/db/sqlite_adapter.rs | 6 +++++- src/db/test.rs | 2 +- src/garage/admin/mod.rs | 19 ++++++++++++------- src/model/s3/lifecycle_worker.rs | 6 +++--- src/table/data.rs | 4 ++-- src/table/gc.rs | 2 +- src/table/merkle.rs | 10 +++++----- src/table/metrics.rs | 8 ++++---- src/table/queue.rs | 2 +- 15 files changed, 69 insertions(+), 44 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 41b2f02a..d1bf90d8 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -408,8 +408,8 @@ impl BlockManager { } /// Get number of items in the refcount table - pub fn rc_len(&self) -> Result { - Ok(self.rc.rc_table.len()?) + pub fn rc_approximate_len(&self) -> Result { + Ok(self.rc.rc_table.approximate_len()?) 
} /// Send command to start/stop/manager scrub worker @@ -427,7 +427,7 @@ impl BlockManager { /// List all resync errors pub fn list_resync_errors(&self) -> Result, Error> { - let mut blocks = Vec::with_capacity(self.resync.errors.len()?); + let mut blocks = Vec::with_capacity(self.resync.errors.approximate_len()?); for ent in self.resync.errors.iter()? { let (hash, cnt) = ent?; let cnt = ErrorCounter::decode(&cnt); diff --git a/src/block/metrics.rs b/src/block/metrics.rs index 2d41e365..c2ebb76b 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -50,7 +50,7 @@ impl BlockManagerMetrics { .init(), _rc_size: meter .u64_value_observer("block.rc_size", move |observer| { - if let Ok(value) = rc_tree.len() { + if let Ok(value) = rc_tree.approximate_len() { observer.observe(value as u64, &[]) } }) @@ -58,7 +58,7 @@ impl BlockManagerMetrics { .init(), _resync_queue_len: meter .u64_value_observer("block.resync_queue_length", move |observer| { - if let Ok(value) = resync_queue.len() { + if let Ok(value) = resync_queue.approximate_len() { observer.observe(value as u64, &[]); } }) @@ -68,7 +68,7 @@ impl BlockManagerMetrics { .init(), _resync_errored_blocks: meter .u64_value_observer("block.resync_errored_blocks", move |observer| { - if let Ok(value) = resync_errors.len() { + if let Ok(value) = resync_errors.approximate_len() { observer.observe(value as u64, &[]); } }) diff --git a/src/block/resync.rs b/src/block/resync.rs index b476a0b8..004f6b48 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -106,13 +106,13 @@ impl BlockResyncManager { } /// Get length of resync queue - pub fn queue_len(&self) -> Result { - Ok(self.queue.len()?) + pub fn queue_approximate_len(&self) -> Result { + Ok(self.queue.approximate_len()?) } /// Get number of blocks that have an error - pub fn errors_len(&self) -> Result { - Ok(self.errors.len()?) + pub fn errors_approximate_len(&self) -> Result { + Ok(self.errors.approximate_len()?) 
} /// Clear the error counter for a block and put it in queue immediately @@ -548,9 +548,11 @@ impl Worker for ResyncWorker { } WorkerStatus { - queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), + queue_length: Some(self.manager.resync.queue_approximate_len().unwrap_or(0) as u64), tranquility: Some(tranquility), - persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), + persistent_errors: Some( + self.manager.resync.errors_approximate_len().unwrap_or(0) as u64 + ), ..Default::default() } } diff --git a/src/db/fjall_adapter.rs b/src/db/fjall_adapter.rs index d91ef12f..d6a41e9e 100644 --- a/src/db/fjall_adapter.rs +++ b/src/db/fjall_adapter.rs @@ -132,10 +132,14 @@ impl IDb for FjallDb { } } - fn len(&self, tree_idx: usize) -> Result { + fn approximate_len(&self, tree_idx: usize) -> Result { + let tree = self.get_tree(tree_idx)?; + Ok(tree.approximate_len()) + } + fn is_empty(&self, tree_idx: usize) -> Result { let tree = self.get_tree(tree_idx)?; let tx = self.keyspace.read_tx(); - Ok(tx.len(&tree)?) + Ok(tx.is_empty(&tree)?) } fn insert(&self, tree_idx: usize, key: &[u8], value: &[u8]) -> Result<()> { diff --git a/src/db/lib.rs b/src/db/lib.rs index 3454c759..5ac16da8 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -154,7 +154,7 @@ impl Db { let tree_names = other.list_trees()?; for name in tree_names { let tree = self.open_tree(&name)?; - if tree.len()? > 0 { + if !tree.is_empty()? 
{ return Err(Error(format!("tree {} already contains data", name).into())); } @@ -196,8 +196,12 @@ impl Tree { self.0.get(self.1, key.as_ref()) } #[inline] - pub fn len(&self) -> Result { - self.0.len(self.1) + pub fn approximate_len(&self) -> Result { + self.0.approximate_len(self.1) + } + #[inline] + pub fn is_empty(&self) -> Result { + self.0.is_empty(self.1) } #[inline] @@ -335,7 +339,8 @@ pub(crate) trait IDb: Send + Sync { fn snapshot(&self, path: &PathBuf) -> Result<()>; fn get(&self, tree: usize, key: &[u8]) -> Result>; - fn len(&self, tree: usize) -> Result; + fn approximate_len(&self, tree: usize) -> Result; + fn is_empty(&self, tree: usize) -> Result; fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; fn remove(&self, tree: usize, key: &[u8]) -> Result<()>; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index bd85f1b4..cbbce2f8 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -126,11 +126,16 @@ impl IDb for LmdbDb { } } - fn len(&self, tree: usize) -> Result { + fn approximate_len(&self, tree: usize) -> Result { let tree = self.get_tree(tree)?; let tx = self.db.read_txn()?; Ok(tree.len(&tx)?.try_into().unwrap()) } + fn is_empty(&self, tree: usize) -> Result { + let tree = self.get_tree(tree)?; + let tx = self.db.read_txn()?; + Ok(tree.is_empty(&tx)?) + } fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index ce6412b6..eee8b15d 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -160,7 +160,7 @@ impl IDb for SqliteDb { self.internal_get(&self.db.get()?, &tree, key) } - fn len(&self, tree: usize) -> Result { + fn approximate_len(&self, tree: usize) -> Result { let tree = self.get_tree(tree)?; let db = self.db.get()?; @@ -172,6 +172,10 @@ impl IDb for SqliteDb { } } + fn is_empty(&self, tree: usize) -> Result { + Ok(self.approximate_len(tree)? 
== 0) + } + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; let db = self.db.get()?; diff --git a/src/db/test.rs b/src/db/test.rs index 08ce1dda..1e649719 100644 --- a/src/db/test.rs +++ b/src/db/test.rs @@ -14,7 +14,7 @@ fn test_suite(db: Db) { assert!(tree.insert(ka, va).is_ok()); assert_eq!(tree.get(ka).unwrap().unwrap(), va); - assert_eq!(tree.len().unwrap(), 1); + assert_eq!(tree.iter().unwrap().count(), 1); // ---- test transaction logic ---- diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 3bbc2b86..6ae8fa88 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -219,7 +219,7 @@ impl AdminRpcHandler { // Gather block manager statistics writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - let rc_len = self.garage.block_manager.rc_len()?.to_string(); + let rc_len = self.garage.block_manager.rc_approximate_len()?.to_string(); writeln!( &mut ret, @@ -230,13 +230,13 @@ impl AdminRpcHandler { writeln!( &mut ret, " resync queue length: {}", - self.garage.block_manager.resync.queue_len()? + self.garage.block_manager.resync.queue_approximate_len()? ) .unwrap(); writeln!( &mut ret, " blocks with resync errors: {}", - self.garage.block_manager.resync.errors_len()? + self.garage.block_manager.resync.errors_approximate_len()? ) .unwrap(); @@ -346,16 +346,21 @@ impl AdminRpcHandler { F: TableSchema + 'static, R: TableReplication + 'static, { - let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); - let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); + let data_len = t + .data + .store + .approximate_len() + .map_err(GarageError::from)? + .to_string(); + let mkl_len = t.merkle_updater.merkle_tree_approximate_len()?.to_string(); Ok(format!( " {}\t{}\t{}\t{}\t{}", F::TABLE_NAME, data_len, mkl_len, - t.merkle_updater.todo_len()?, - t.data.gc_todo_len()? + t.merkle_updater.todo_approximate_len()?, + t.data.gc_todo_approximate_len()? 
)) } diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index bb10ba48..af00437e 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -121,13 +121,13 @@ impl Worker for LifecycleWorker { mpu_aborted, .. } => { - let n_objects = self.garage.object_table.data.store.len().ok(); + let n_objects = self.garage.object_table.data.store.approximate_len().ok(); let progress = match n_objects { - None => "...".to_string(), - Some(total) => format!( + Some(total) if total > 0 => format!( "~{:.2}%", 100. * std::cmp::min(*counter, total) as f32 / total as f32 ), + _ => "...".to_string(), }; WorkerStatus { progress: Some(progress), diff --git a/src/table/data.rs b/src/table/data.rs index 09f4e008..1d0308ce 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -367,7 +367,7 @@ impl TableData { } } - pub fn gc_todo_len(&self) -> Result { - Ok(self.gc_todo.len()?) + pub fn gc_todo_approximate_len(&self) -> Result { + Ok(self.gc_todo.approximate_len()?) } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 28ea119d..1f30bd76 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -313,7 +313,7 @@ impl Worker for GcWorker { fn status(&self) -> WorkerStatus { WorkerStatus { - queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64), + queue_length: Some(self.gc.data.gc_todo_approximate_len().unwrap_or(0) as u64), ..Default::default() } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 596d5805..7ba1f007 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -287,12 +287,12 @@ impl MerkleUpdater { MerkleNode::decode_opt(&ent) } - pub fn merkle_tree_len(&self) -> Result { - Ok(self.data.merkle_tree.len()?) + pub fn merkle_tree_approximate_len(&self) -> Result { + Ok(self.data.merkle_tree.approximate_len()?) } - pub fn todo_len(&self) -> Result { - Ok(self.data.merkle_todo.len()?) + pub fn todo_approximate_len(&self) -> Result { + Ok(self.data.merkle_todo.approximate_len()?) 
} } @@ -306,7 +306,7 @@ impl Worker for MerkleWorker { fn status(&self) -> WorkerStatus { WorkerStatus { - queue_length: Some(self.0.todo_len().unwrap_or(0) as u64), + queue_length: Some(self.0.todo_approximate_len().unwrap_or(0) as u64), ..Default::default() } } diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 7bb0959a..78593202 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -34,7 +34,7 @@ impl TableMetrics { .u64_value_observer( "table.size", move |observer| { - if let Ok(value) = store.len() { + if let Ok(value) = store.approximate_len() { observer.observe( value as u64, &[KeyValue::new("table_name", table_name)], @@ -48,7 +48,7 @@ impl TableMetrics { .u64_value_observer( "table.merkle_tree_size", move |observer| { - if let Ok(value) = merkle_tree.len() { + if let Ok(value) = merkle_tree.approximate_len() { observer.observe( value as u64, &[KeyValue::new("table_name", table_name)], @@ -62,7 +62,7 @@ impl TableMetrics { .u64_value_observer( "table.merkle_updater_todo_queue_length", move |observer| { - if let Ok(v) = merkle_todo.len() { + if let Ok(v) = merkle_todo.approximate_len() { observer.observe( v as u64, &[KeyValue::new("table_name", table_name)], @@ -76,7 +76,7 @@ impl TableMetrics { .u64_value_observer( "table.gc_todo_queue_length", move |observer| { - if let Ok(value) = gc_todo.len() { + if let Ok(value) = gc_todo.approximate_len() { observer.observe( value as u64, &[KeyValue::new("table_name", table_name)], diff --git a/src/table/queue.rs b/src/table/queue.rs index ffe0a4a7..7ef1f16e 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -27,7 +27,7 @@ impl Worker for InsertQueueWorker { fn status(&self) -> WorkerStatus { WorkerStatus { - queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64), + queue_length: Some(self.0.data.insert_queue.approximate_len().unwrap_or(0) as u64), ..Default::default() } } From 54b9bf02a34612e227920693730cf45d2bb7fa14 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 
27 Aug 2025 23:03:09 +0200 Subject: [PATCH 13/18] garage_db: refactor open function --- src/db/fjall_adapter.rs | 25 +++++++++++-- src/db/lmdb_adapter.rs | 50 +++++++++++++++++++++++-- src/db/open.rs | 81 +++++++++++----------------------------- src/db/sqlite_adapter.rs | 21 +++++++++-- src/db/test.rs | 2 +- src/model/garage.rs | 13 +------ 6 files changed, 107 insertions(+), 85 deletions(-) diff --git a/src/db/fjall_adapter.rs b/src/db/fjall_adapter.rs index d6a41e9e..25913a1f 100644 --- a/src/db/fjall_adapter.rs +++ b/src/db/fjall_adapter.rs @@ -11,12 +11,30 @@ use fjall::{ }; use crate::{ + open::{Engine, OpenOpt}, Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, TxValueIter, Value, ValueIter, }; pub use fjall; +// -- + +pub(crate) fn open_db(path: &PathBuf, opt: &OpenOpt) -> Result { + info!("Opening Fjall database at: {}", path.display()); + if opt.fsync { + return Err(Error( + "metadata_fsync is not supported with the Fjall database engine".into(), + )); + } + let mut config = fjall::Config::new(path); + if let Some(block_cache_size) = opt.fjall_block_cache_size { + config = config.cache_size(block_cache_size as u64); + } + let keyspace = config.open_transactional()?; + Ok(FjallDb::init(keyspace)) +} + // -- err impl From for Error { @@ -95,10 +113,9 @@ impl IDb for FjallDb { .collect::>>()?) 
} - fn snapshot(&self, to: &PathBuf) -> Result<()> { - std::fs::create_dir_all(to)?; - let mut path = to.clone(); - path.push("data.fjall"); + fn snapshot(&self, base_path: &PathBuf) -> Result<()> { + std::fs::create_dir_all(base_path)?; + let path = Engine::Fjall.db_path(base_path); let source_state = self.keyspace.read_tx(); let copy_keyspace = fjall::Config::new(path).open()?; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index cbbce2f8..ac185ae9 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -11,12 +11,55 @@ use heed::types::ByteSlice; use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database}; use crate::{ + open::{Engine, OpenOpt}, Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, TxValueIter, Value, ValueIter, }; pub use heed; +// ---- top-level open function + +pub(crate) fn open_db(path: &PathBuf, opt: &OpenOpt) -> Result { + info!("Opening LMDB database at: {}", path.display()); + if let Err(e) = std::fs::create_dir_all(&path) { + return Err(Error( + format!("Unable to create LMDB data directory: {}", e).into(), + )); + } + + let map_size = match opt.lmdb_map_size { + None => recommended_map_size(), + Some(v) => v - (v % 4096), + }; + + let mut env_builder = heed::EnvOpenOptions::new(); + env_builder.max_dbs(100); + env_builder.map_size(map_size); + env_builder.max_readers(2048); + unsafe { + env_builder.flag(heed::flags::Flags::MdbNoRdAhead); + env_builder.flag(heed::flags::Flags::MdbNoMetaSync); + if !opt.fsync { + env_builder.flag(heed::flags::Flags::MdbNoSync); + } + } + match env_builder.open(&path) { + Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => { + return Err(Error( + "OutOfMemory error while trying to open LMDB database. This can happen \ + if your operating system is not allowing you to use sufficient virtual \ + memory address space. Please check that no limit is set (ulimit -v). 
\ + You may also try to set a smaller `lmdb_map_size` configuration parameter. \ + On 32-bit machines, you should probably switch to another database engine." + .into(), + )) + } + Err(e) => Err(Error(format!("Cannot open LMDB database: {}", e).into())), + Ok(db) => Ok(LmdbDb::init(db)), + } +} + // -- err impl From for Error { @@ -104,10 +147,9 @@ impl IDb for LmdbDb { Ok(ret2) } - fn snapshot(&self, to: &PathBuf) -> Result<()> { - std::fs::create_dir_all(to)?; - let mut path = to.clone(); - path.push("data.mdb"); + fn snapshot(&self, base_path: &PathBuf) -> Result<()> { + std::fs::create_dir_all(base_path)?; + let path = Engine::Lmdb.db_path(base_path); self.db .copy_to_path(path, heed::CompactionOption::Enabled)?; Ok(()) diff --git a/src/db/open.rs b/src/db/open.rs index d5469b58..23391c61 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -1,4 +1,3 @@ -use std::convert::TryInto; use std::path::PathBuf; use crate::{Db, Error, Result}; @@ -24,6 +23,23 @@ impl Engine { Self::Fjall => "fjall", } } + + /// Return engine-specific DB path from base path + pub fn db_path(&self, base_path: &PathBuf) -> PathBuf { + let mut ret = base_path.clone(); + match self { + Self::Lmdb => { + ret.push("db.lmdb"); + } + Self::Sqlite => { + ret.push("db.sqlite"); + } + Self::Fjall => { + ret.push("db.fjall"); + } + } + ret + } } impl std::fmt::Display for Engine { @@ -43,7 +59,7 @@ impl std::str::FromStr for Engine { "sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. 
v0.9.4).".into())), kind => Err(Error( format!( - "Invalid DB engine: {} (options are: lmdb, sqlite)", + "Invalid DB engine: {} (options are: lmdb, sqlite, fjall)", kind ) .into(), @@ -72,70 +88,15 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { match engine { // ---- Sqlite DB ---- #[cfg(feature = "sqlite")] - Engine::Sqlite => { - info!("Opening Sqlite database at: {}", path.display()); - let manager = r2d2_sqlite::SqliteConnectionManager::file(path); - Ok(crate::sqlite_adapter::SqliteDb::new(manager, opt.fsync)?) - } + Engine::Sqlite => crate::sqlite_adapter::open_db(path, opt), // ---- LMDB DB ---- #[cfg(feature = "lmdb")] - Engine::Lmdb => { - info!("Opening LMDB database at: {}", path.display()); - if let Err(e) = std::fs::create_dir_all(&path) { - return Err(Error( - format!("Unable to create LMDB data directory: {}", e).into(), - )); - } - - let map_size = match opt.lmdb_map_size { - None => crate::lmdb_adapter::recommended_map_size(), - Some(v) => v - (v % 4096), - }; - - let mut env_builder = heed::EnvOpenOptions::new(); - env_builder.max_dbs(100); - env_builder.map_size(map_size); - env_builder.max_readers(2048); - unsafe { - env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoRdAhead); - env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoMetaSync); - if !opt.fsync { - env_builder.flag(heed::flags::Flags::MdbNoSync); - } - } - match env_builder.open(&path) { - Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => { - return Err(Error( - "OutOfMemory error while trying to open LMDB database. This can happen \ - if your operating system is not allowing you to use sufficient virtual \ - memory address space. Please check that no limit is set (ulimit -v). \ - You may also try to set a smaller `lmdb_map_size` configuration parameter. \ - On 32-bit machines, you should probably switch to another database engine." 
- .into(), - )) - } - Err(e) => Err(Error(format!("Cannot open LMDB database: {}", e).into())), - Ok(db) => Ok(crate::lmdb_adapter::LmdbDb::init(db)), - } - } + Engine::Lmdb => crate::lmdb_adapter::open_db(path, opt), // ---- Fjall DB ---- #[cfg(feature = "fjall")] - Engine::Fjall => { - info!("Opening Fjall database at: {}", path.display()); - if opt.fsync { - return Err(Error( - "metadata_fsync is not supported with the Fjall database engine".into(), - )); - } - let mut config = fjall::Config::new(path); - if let Some(block_cache_size) = opt.fjall_block_cache_size { - config = config.cache_size(block_cache_size.try_into().unwrap()); - } - let keyspace = config.open_transactional()?; - Ok(crate::fjall_adapter::FjallDb::init(keyspace)) - } + Engine::Fjall => crate::fjall_adapter::open_db(path, opt), // Pattern is unreachable when all supported DB engines are compiled into binary. The allow // attribute is added so that we won't have to change this match in case stop building diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index eee8b15d..5d86f178 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -11,12 +11,23 @@ use r2d2_sqlite::SqliteConnectionManager; use rusqlite::{params, Rows, Statement, Transaction}; use crate::{ + open::{Engine, OpenOpt}, Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, TxValueIter, Value, ValueIter, }; pub use rusqlite; +// ---- top-level open function + +pub(crate) fn open_db(path: &PathBuf, opt: &OpenOpt) -> Result { + info!("Opening Sqlite database at: {}", path.display()); + let manager = r2d2_sqlite::SqliteConnectionManager::file(path); + Ok(SqliteDb::new(manager, opt.fsync)?) 
+} + +// ---- + type Connection = r2d2::PooledConnection; // --- err @@ -139,17 +150,19 @@ impl IDb for SqliteDb { Ok(trees) } - fn snapshot(&self, to: &PathBuf) -> Result<()> { + fn snapshot(&self, base_path: &PathBuf) -> Result<()> { fn progress(p: rusqlite::backup::Progress) { let percent = (p.pagecount - p.remaining) * 100 / p.pagecount; info!("Sqlite snapshot progress: {}%", percent); } - std::fs::create_dir_all(to)?; - let mut path = to.clone(); - path.push("db.sqlite"); + + std::fs::create_dir_all(base_path)?; + let path = Engine::Sqlite.db_path(&base_path); + self.db .get()? .backup(rusqlite::DatabaseName::Main, path, Some(progress))?; + Ok(()) } diff --git a/src/db/test.rs b/src/db/test.rs index 1e649719..977dc965 100644 --- a/src/db/test.rs +++ b/src/db/test.rs @@ -155,7 +155,7 @@ fn test_fjall_db() { use crate::fjall_adapter::{fjall, FjallDb}; let path = mktemp::Temp::new_dir().unwrap(); - let config = fjall::Config::new(path); + let config = fjall::Config::new(path).temporary(true); let keyspace = config.open_transactional().unwrap(); let db = FjallDb::init(keyspace); test_suite(db); diff --git a/src/model/garage.rs b/src/model/garage.rs index 7420e740..38f8f1f7 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -116,18 +116,7 @@ impl Garage { info!("Opening database..."); let db_engine = db::Engine::from_str(&config.db_engine) .ok_or_message("Invalid `db_engine` value in configuration file")?; - let mut db_path = config.metadata_dir.clone(); - match db_engine { - db::Engine::Sqlite => { - db_path.push("db.sqlite"); - } - db::Engine::Lmdb => { - db_path.push("db.lmdb"); - } - db::Engine::Fjall => { - db_path.push("db.fjall"); - } - } + let db_path = db_engine.db_path(&config.metadata_dir); let db_opt = db::OpenOpt { fsync: config.metadata_fsync, lmdb_map_size: match config.lmdb_map_size { From c8c20d6f471a4263c187b778582e6df2bd7a08b3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 28 Aug 2025 00:07:35 +0200 Subject: [PATCH 14/18] 
garage_db: reduce frequency of sqlite snapshot progress log (fix #1129) --- src/db/sqlite_adapter.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 5d86f178..d645c64e 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -152,8 +152,21 @@ impl IDb for SqliteDb { fn snapshot(&self, base_path: &PathBuf) -> Result<()> { fn progress(p: rusqlite::backup::Progress) { - let percent = (p.pagecount - p.remaining) * 100 / p.pagecount; - info!("Sqlite snapshot progress: {}%", percent); + use std::sync::atomic::{AtomicU64, Ordering}; + use std::time::{SystemTime, UNIX_EPOCH}; + + static LAST_LOG_TIME: AtomicU64 = AtomicU64::new(0); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Fix your clock :o") + .as_millis() as u64; + if now >= LAST_LOG_TIME.load(Ordering::Relaxed) + 10 * 1000 { + let percent = (p.pagecount - p.remaining) * 100 / p.pagecount; + info!("Sqlite snapshot progress: {}%", percent); + + LAST_LOG_TIME.fetch_max(now, Ordering::Relaxed); + } } std::fs::create_dir_all(base_path)?; From c8599a86360e72a40e0d8ada3e7b5802e943fe9e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 Sep 2025 11:06:46 +0200 Subject: [PATCH 15/18] woodpecker: require the nix=enabled label --- .woodpecker/debug.yaml | 3 +++ .woodpecker/publish.yaml | 3 +++ .woodpecker/release.yaml | 3 +++ 3 files changed, 9 insertions(+) diff --git a/.woodpecker/debug.yaml b/.woodpecker/debug.yaml index 62266aa4..4c729672 100644 --- a/.woodpecker/debug.yaml +++ b/.woodpecker/debug.yaml @@ -1,3 +1,6 @@ +labels: + nix: "enabled" + when: event: - push diff --git a/.woodpecker/publish.yaml b/.woodpecker/publish.yaml index 7522d58d..24a84463 100644 --- a/.woodpecker/publish.yaml +++ b/.woodpecker/publish.yaml @@ -1,3 +1,6 @@ +labels: + nix: "enabled" + when: event: - deployment diff --git a/.woodpecker/release.yaml b/.woodpecker/release.yaml index 0678a45b..bf2bd8ba 
100644 --- a/.woodpecker/release.yaml +++ b/.woodpecker/release.yaml @@ -1,3 +1,6 @@ +labels: + nix: "enabled" + when: event: - deployment From 5cf354acb44872e782ff51b9da59df2838aa12f6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 13 Sep 2025 17:38:06 +0200 Subject: [PATCH 16/18] block: maximum number of simultaneous reads --- doc/book/reference-manual/configuration.md | 25 ++++++++++++++++++++++ src/block/manager.rs | 16 ++++++++++++++ src/block/metrics.rs | 6 ++++++ src/util/config.rs | 7 ++++++ 4 files changed, 54 insertions(+) diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index e134a83f..c6dce089 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -24,6 +24,7 @@ db_engine = "lmdb" block_size = "1M" block_ram_buffer_max = "256MiB" +block_max_concurrent_reads = 16 lmdb_map_size = "1T" @@ -96,6 +97,7 @@ The following gives details about each available configuration option. Top-level configuration options, in alphabetical order: [`allow_punycode`](#allow_punycode), [`allow_world_readable_secrets`](#allow_world_readable_secrets), +[`block_max_concurrent_reads`](#block_max_concurrent_reads), [`block_ram_buffer_max`](#block_ram_buffer_max), [`block_size`](#block_size), [`bootstrap_peers`](#bootstrap_peers), @@ -522,6 +524,29 @@ node. The default value is 256MiB. +#### `block_max_concurrent_reads` (since `v1.3.0` / `v2.1.0`) {#block_max_concurrent_reads} + +The maximum number of blocks (individual files in the data directory) open +simultaneously for reading. + +Reducing this number does not limit the number of data blocks that can be +transferred through the network simultaneously. This mechanism was just added +as a backpressure mechanism for HDD read speed: it helps avoid a situation +where too many requests are coming in and Garage is reading too many block +files simultaneously, thus not making timely progress on any of the reads. 
+ +When a request to read a data block comes in through the network, the request +waits for one of the `block_max_concurrent_reads` slots to be available +(internally implemented using a Semaphore object). Once it acquired a read +slot, it reads the entire block file to RAM and frees the slot as soon as the +block file is finished reading. Only after the slot is released will the +block's data start being transferred over the network. If the request fails to +acquire a reading slot within 15 seconds, it fails with a timeout error. +Timeout events can be monitored through the `block_read_semaphore_timeouts` +metric in Prometheus: a non-zero number of such events indicates an I/O +bottleneck on HDD read speed. + + #### `lmdb_map_size` {#lmdb_map_size} This parameters can be used to set the map size used by LMDB, diff --git a/src/block/manager.rs b/src/block/manager.rs index d1bf90d8..5ff9a138 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -50,6 +50,8 @@ pub const INLINE_THRESHOLD: usize = 3072; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); +const BLOCK_READ_SEMAPHORE_TIMEOUT: Duration = Duration::from_secs(15); + /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -87,6 +89,7 @@ pub struct BlockManager { disable_scrub: bool, mutation_lock: Vec>, + read_semaphore: Semaphore, pub rc: BlockRc, pub resync: BlockResyncManager, @@ -176,6 +179,8 @@ impl BlockManager { .iter() .map(|_| Mutex::new(BlockManagerLocked())) .collect::>(), + + read_semaphore: Semaphore::new(config.block_max_concurrent_reads), rc, resync, system, @@ -581,6 +586,15 @@ impl BlockManager { ) -> Result { let (header, path) = block_path.as_parts_ref(); + let permit = tokio::select! 
{ + sem = self.read_semaphore.acquire() => sem.ok_or_message("acquire read semaphore")?, + _ = tokio::time::sleep(BLOCK_READ_SEMAPHORE_TIMEOUT) => { + self.metrics.block_read_semaphore_timeouts.add(1); + debug!("read block {:?}: read_semaphore acquire timeout", hash); + return Err(Error::Message("read block: read_semaphore acquire timeout".into())); + } + }; + let mut f = fs::File::open(&path).await?; let mut data = vec![]; f.read_to_end(&mut data).await?; @@ -605,6 +619,8 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } + drop(permit); + Ok(data) } diff --git a/src/block/metrics.rs b/src/block/metrics.rs index c2ebb76b..81021fe1 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -22,6 +22,7 @@ pub struct BlockManagerMetrics { pub(crate) bytes_read: BoundCounter, pub(crate) block_read_duration: BoundValueRecorder, + pub(crate) block_read_semaphore_timeouts: BoundCounter, pub(crate) bytes_written: BoundCounter, pub(crate) block_write_duration: BoundValueRecorder, pub(crate) delete_counter: BoundCounter, @@ -119,6 +120,11 @@ impl BlockManagerMetrics { .with_description("Duration of block read operations") .init() .bind(&[]), + block_read_semaphore_timeouts: meter + .u64_counter("block.read_semaphore_timeouts") + .with_description("Number of block reads that failed due to semaphore acquire timeout") + .init() + .bind(&[]), bytes_written: meter .u64_counter("block.bytes_written") .with_description("Number of bytes written to disk") diff --git a/src/util/config.rs b/src/util/config.rs index 19c3e821..e351185f 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -75,6 +75,10 @@ pub struct Config { )] pub block_ram_buffer_max: usize, + /// Maximum number of concurrent reads of block files on disk + #[serde(default = "default_block_max_concurrent_reads")] + pub block_max_concurrent_reads: usize, + /// Skip the permission check of secret files. Useful when /// POSIX ACLs (or more complex chmods) are used. 
#[serde(default)] @@ -280,6 +284,9 @@ fn default_block_size() -> usize { fn default_block_ram_buffer_max() -> usize { 256 * 1024 * 1024 } +fn default_block_max_concurrent_reads() -> usize { + 16 +} fn default_consistency_mode() -> String { "consistent".into() From d5a57e3e130841f83fafeb973f4d13777b3f40d3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 13 Sep 2025 17:38:23 +0200 Subject: [PATCH 17/18] block: read_block: don't add not found blocks to resync queue --- src/block/manager.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 5ff9a138..06cf9cbe 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -562,9 +562,6 @@ impl BlockManager { match self.find_block(hash).await { Some(p) => self.read_block_from(hash, &p).await, None => { - // Not found but maybe we should have had it ?? - self.resync - .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?; return Err(Error::Message(format!( "block {:?} not found on node", hash From 6cf6db5c6141e062560396086e7c6c80633f934c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 13 Sep 2025 17:49:25 +0200 Subject: [PATCH 18/18] fix panic when cluster_layout cannot be saved (fix #1150) --- src/rpc/layout/manager.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 21907ec7..bb8000bd 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -229,13 +229,11 @@ impl LayoutManager { } /// Save cluster layout data to disk - async fn save_cluster_layout(&self) -> Result<(), Error> { + async fn save_cluster_layout(&self) { let layout = self.layout.read().unwrap().inner().clone(); - self.persist_cluster_layout - .save_async(&layout) - .await - .expect("Cannot save current cluster layout"); - Ok(()) + if let Err(e) = self.persist_cluster_layout.save_async(&layout).await { + error!("Failed to save cluster_layout: {}", e); + } } fn 
broadcast_update(self: &Arc, rpc: SystemRpc) { @@ -313,7 +311,7 @@ impl LayoutManager { self.change_notify.notify_waiters(); self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout)); - self.save_cluster_layout().await?; + self.save_cluster_layout().await; } Ok(SystemRpc::Ok) @@ -328,7 +326,7 @@ impl LayoutManager { if let Some(new_trackers) = self.merge_layout_trackers(trackers) { self.change_notify.notify_waiters(); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers)); - self.save_cluster_layout().await?; + self.save_cluster_layout().await; } Ok(SystemRpc::Ok)