Merge pull request 'refactor: make TableShardedReplication a thin wrapper around LayoutManager' (#820) from yuka/garage:refactor-sharded-table into next-v2

Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/820
Alex
2025-04-28 10:43:35 +00:00
8 changed files with 70 additions and 61 deletions

View File

@@ -155,10 +155,8 @@ impl Garage {
let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
let meta_rep_param = TableShardedReplication {
-system: system.clone(),
-replication_factor: replication_factor.into(),
-write_quorum: replication_factor.write_quorum(consistency_mode),
-read_quorum: replication_factor.read_quorum(consistency_mode),
+layout_manager: system.layout_manager.clone(),
+consistency_mode,
};
let control_rep_param = TableFullReplication {

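For reference, a hedged sketch of what the construction above boils down to after this change (names taken from the hunk itself, not verbatim source): the per-table quorum fields are gone, and the replication object only carries a handle to the layout manager plus the configured consistency mode.

    let meta_rep_param = TableShardedReplication {
        // quorums are no longer stored here; they are derived from the
        // cluster layout by the LayoutManager at call time
        layout_manager: system.layout_manager.clone(),
        consistency_mode,
    };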
View File

@@ -28,7 +28,6 @@ pub struct SyncLayoutDigest {
}
pub struct LayoutHelper {
-replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode,
layout: Option<LayoutHistory>,
@@ -51,7 +50,6 @@ pub struct LayoutHelper {
impl LayoutHelper {
pub fn new(
-replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode,
mut layout: LayoutHistory,
mut ack_lock: HashMap<u64, AtomicUsize>,
@@ -97,8 +95,7 @@ impl LayoutHelper {
// consistency on those).
// This value is calculated using quorums to allow progress even
// if not all nodes have successfully completed a sync.
-let sync_map_min =
-layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes);
+let sync_map_min = layout.calculate_sync_map_min_with_quorum(&all_nongateway_nodes);
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
@@ -111,7 +108,6 @@ impl LayoutHelper {
let is_check_ok = layout.check().is_ok();
LayoutHelper {
-replication_factor,
consistency_mode,
layout: Some(layout),
ack_map_min,
@@ -134,7 +130,6 @@ impl LayoutHelper {
let changed = f(self.layout.as_mut().unwrap());
if changed {
*self = Self::new(
-self.replication_factor,
self.consistency_mode,
self.layout.take().unwrap(),
std::mem::take(&mut self.ack_lock),

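A hedged sketch of the consequence of dropping the replication_factor field above (accessor names taken from elsewhere in this diff): code that previously read the factor off the LayoutHelper can now recover it from the layout version itself, via the accessors added in version.rs below.

    // assumption: `helper` is a &LayoutHelper; `current()` returns the current
    // LayoutVersion, as used elsewhere in this diff
    let rf = helper.current().replication_factor();
    let write_quorum = helper.current().write_quorum(ConsistencyMode::Consistent);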
View File

@@ -123,13 +123,9 @@ impl LayoutHistory {
}
}
-pub(crate) fn calculate_sync_map_min_with_quorum(
-&self,
-replication_factor: ReplicationFactor,
-all_nongateway_nodes: &[Uuid],
-) -> u64 {
-// This function calculates the minimum layout version from which
-// it is safe to read if we want to maintain read-after-write consistency.
+/// This function calculates the minimum layout version from which
+/// it is safe to read if we want to maintain read-after-write consistency.
+pub(crate) fn calculate_sync_map_min_with_quorum(&self, all_nongateway_nodes: &[Uuid]) -> u64 {
// In the general case the computation can be a bit expensive so
// we try to optimize it in several ways.
@@ -139,8 +135,6 @@ impl LayoutHistory {
return self.current().version;
}
-let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent);
let min_version = self.min_stored();
let global_min = self
.update_trackers
@@ -153,7 +147,16 @@ impl LayoutHistory {
// This is represented by reading from the layout with version
// number global_min, the smallest layout version for which all nodes
// have completed a sync.
-if quorum == self.current().replication_factor {
+//
+// While we currently do not support changing the replication factor
+// between layout versions, this calculation is future-proofing for the
+// case where this might be possible.
+if self
+.versions
+.iter()
+.filter(|v| v.version >= global_min)
+.all(|v| v.write_quorum(ConsistencyMode::Consistent) == v.replication_factor)
+{
return global_min;
}
@@ -195,7 +198,8 @@ impl LayoutHistory {
.map(|x| self.update_trackers.sync_map.get(x, min_version))
.collect::<Vec<_>>();
sync_values.sort();
-let set_min = sync_values[sync_values.len() - quorum];
+let set_min =
+sync_values[sync_values.len() - v.write_quorum(ConsistencyMode::Consistent)];
if set_min < current_min {
current_min = set_min;
}

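To make the quorum-based computation above concrete, a small worked example under the assumption of a partition stored on three nodes in Consistent mode (write quorum 2); the indexing mirrors the `sync_values[sync_values.len() - quorum]` line in the hunk above.

    // sync tracker values of the three storage nodes for this partition
    let mut sync_values: Vec<u64> = vec![12, 10, 12];
    sync_values.sort(); // [10, 12, 12]
    let write_quorum = 2; // replication factor 3, Consistent mode
    // highest layout version that at least `write_quorum` nodes have synced
    let set_min = sync_values[sync_values.len() - write_quorum];
    assert_eq!(set_min, 12);

With a quorum of 2 out of 3, the computation can move on to version 12 even though one node is still at 10, which is the progress-despite-stragglers property the surrounding comments describe.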
View File

@@ -46,11 +46,11 @@ impl LayoutManager {
let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
-if x.current().replication_factor != replication_factor.replication_factor() {
+if x.current().replication_factor() != replication_factor {
return Err(Error::Message(format!(
"Previous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
-x.current().replication_factor,
-replication_factor.replication_factor()
+x.current().replication_factor(),
+replication_factor,
)));
}
x
@@ -64,12 +64,8 @@ impl LayoutManager {
}
};
-let mut cluster_layout = LayoutHelper::new(
-replication_factor,
-consistency_mode,
-cluster_layout,
-Default::default(),
-);
+let mut cluster_layout =
+LayoutHelper::new(consistency_mode, cluster_layout, Default::default());
cluster_layout.update_update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
@@ -301,11 +297,11 @@ impl LayoutManager {
adv.update_trackers
);
-if adv.current().replication_factor != self.replication_factor.replication_factor() {
+if adv.current().replication_factor() != self.replication_factor {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
-adv.current().replication_factor,
-self.replication_factor.replication_factor()
+adv.current().replication_factor(),
+self.replication_factor,
);
error!("{}", msg);
return Err(Error::Message(msg));

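A short note on the type change in the two checks above, with a hedged sketch (the helper function name is hypothetical): both sides of the comparison are now ReplicationFactor values, so the equality test and the `{}` formatting go through the type's comparison and the Display impl added in replication_mode.rs below, rather than through a usize accessor.

    // hypothetical helper illustrating the new comparison
    fn layout_matches_config(current: &LayoutVersion, configured: ReplicationFactor) -> bool {
        current.replication_factor() == configured
    }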
View File

@@ -11,12 +11,13 @@ use garage_util::error::*;
use super::graph_algo::*;
use super::*;
+use crate::replication_mode::*;
// The Message type will be used to collect information on the algorithm.
pub type Message = Vec<String>;
impl LayoutVersion {
-pub fn new(replication_factor: usize) -> Self {
+pub fn new(replication_factor: ReplicationFactor) -> Self {
// We set the default zone redundancy to be Maximum, meaning that the maximum
// possible value will be used depending on the cluster topology
let parameters = LayoutParameters {
@@ -25,7 +26,7 @@ impl LayoutVersion {
LayoutVersion {
version: 0,
-replication_factor,
+replication_factor: usize::from(replication_factor),
partition_size: 0,
roles: LwwMap::new(),
node_id_vec: Vec::new(),
@@ -132,6 +133,18 @@ impl LayoutVersion {
.map(move |i| self.node_id_vec[*i as usize])
}
+pub fn replication_factor(&self) -> ReplicationFactor {
+ReplicationFactor::new(self.replication_factor).unwrap()
+}
+pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
+self.replication_factor().read_quorum(consistency_mode)
+}
+pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
+self.replication_factor().write_quorum(consistency_mode)
+}
// ===================== internal information extractors ======================
pub(crate) fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 {

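The three accessors added above move quorum knowledge onto the layout version itself. A hedged usage sketch from a consumer's point of view (import paths assumed, not part of the diff):

    use garage_rpc::layout::*;
    use garage_rpc::replication_mode::ConsistencyMode;

    // For a given layout version in Consistent mode, read and write quorums overlap:
    fn quorums_of(version: &LayoutVersion) -> (usize, usize) {
        let r = version.read_quorum(ConsistencyMode::Consistent);
        let w = version.write_quorum(ConsistencyMode::Consistent);
        debug_assert!(r + w > usize::from(version.replication_factor()));
        (r, w)
    }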
View File

@@ -38,14 +38,10 @@ impl ReplicationFactor {
}
}
-pub fn replication_factor(&self) -> usize {
-self.0
-}
pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
match consistency_mode {
ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
-ConsistencyMode::Consistent => self.replication_factor().div_ceil(2),
+ConsistencyMode::Consistent => usize::from(*self).div_ceil(2),
}
}
@@ -53,7 +49,7 @@ impl ReplicationFactor {
match consistency_mode {
ConsistencyMode::Dangerous => 1,
ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
-(self.replication_factor() + 1) - self.read_quorum(ConsistencyMode::Consistent)
+(usize::from(*self) + 1) - self.read_quorum(ConsistencyMode::Consistent)
}
}
}
@@ -65,6 +61,12 @@ impl std::convert::From<ReplicationFactor> for usize {
}
}
+impl std::fmt::Display for ReplicationFactor {
+fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+self.0.fmt(f)
+}
+}
pub fn parse_replication_mode(
config: &Config,
) -> Result<(ReplicationFactor, ConsistencyMode), Error> {

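A worked example of the quorum arithmetic above for a replication factor of 3 (the constructor and unwrap follow the pattern used in version.rs above; the values follow directly from the formulas):

    let rf = ReplicationFactor::new(3).unwrap();
    assert_eq!(rf.read_quorum(ConsistencyMode::Consistent), 2);  // 3.div_ceil(2)
    assert_eq!(rf.write_quorum(ConsistencyMode::Consistent), 2); // (3 + 1) - 2
    assert_eq!(rf.read_quorum(ConsistencyMode::Degraded), 1);
    assert_eq!(rf.write_quorum(ConsistencyMode::Degraded), 2);   // still a Consistent-sized write
    assert_eq!(rf.read_quorum(ConsistencyMode::Dangerous), 1);
    assert_eq!(rf.write_quorum(ConsistencyMode::Dangerous), 1);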
View File

@@ -68,7 +68,7 @@ impl SystemMetrics {
let replication_factor = system.replication_factor;
meter
.u64_value_observer("garage_replication_factor", move |observer| {
-observer.observe(replication_factor.replication_factor() as u64, &[])
+observer.observe(usize::from(replication_factor) as u64, &[])
})
.with_description("Garage replication factor setting")
.init()

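Minor, but worth noting: the observer above now reports the factor through the existing From<ReplicationFactor> for usize impl (shown in replication_mode.rs) instead of the removed replication_factor() accessor. A minimal sketch of the conversion it relies on:

    let rf = ReplicationFactor::new(3).unwrap();
    let reported = usize::from(rf) as u64; // value handed to the metrics observer
    assert_eq!(reported, 3);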
View File

@@ -2,9 +2,10 @@ use std::sync::Arc;
use std::time::Duration;
use garage_rpc::layout::*;
-use garage_rpc::system::System;
+use garage_rpc::replication_mode::ConsistencyMode;
use garage_util::data::*;
+use crate::replication::sharded::manager::LayoutManager;
use crate::replication::*;
/// Sharded replication schema:
@@ -16,13 +17,8 @@ use crate::replication::*;
#[derive(Clone)]
pub struct TableShardedReplication {
-/// The membership manager of this node
-pub system: Arc<System>,
-/// How many time each data should be replicated
-pub replication_factor: usize,
-/// How many nodes to contact for a read, should be at most `replication_factor`
-pub read_quorum: usize,
-/// How many nodes to contact for a write, should be at most `replication_factor`
-pub write_quorum: usize,
+pub layout_manager: Arc<LayoutManager>,
+pub consistency_mode: ConsistencyMode,
}
impl TableReplication for TableShardedReplication {
@@ -32,9 +28,8 @@ impl TableReplication for TableShardedReplication {
type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-let layout = self.system.cluster_layout();
let mut ret = vec![];
-for version in layout.versions().iter() {
+for version in self.layout_manager.layout().versions().iter() {
ret.extend(version.nodes_of(hash));
}
ret.sort();
@@ -43,31 +38,37 @@ impl TableReplication for TableShardedReplication {
}
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-self.system
-.cluster_layout()
+self.layout_manager
+.layout()
.read_version()
.nodes_of(hash)
.collect()
}
fn read_quorum(&self) -> usize {
-self.read_quorum
+self.layout_manager
+.layout()
+.read_version()
+.read_quorum(self.consistency_mode)
}
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
-self.system
-.layout_manager
-.write_lock_with(|l| write_sets(l, hash))
+self.layout_manager.write_lock_with(|l| write_sets(l, hash))
}
fn write_quorum(&self) -> usize {
-self.write_quorum
+self.layout_manager
+.layout()
+.current()
+.write_quorum(self.consistency_mode)
}
fn partition_of(&self, hash: &Hash) -> Partition {
-self.system.cluster_layout().current().partition_of(hash)
+self.layout_manager.layout().current().partition_of(hash)
}
fn sync_partitions(&self) -> SyncPartitions {
-let layout = self.system.cluster_layout();
+let layout = self.layout_manager.layout();
let layout_version = layout.ack_map_min();
let mut partitions = layout
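Taken together, the quorum methods above now consult the layout manager at call time instead of returning stored numbers. A hedged sketch of what a caller of the TableReplication trait sees (variable names hypothetical):

    // `replication` is a TableShardedReplication, `hash` the partition key hash
    let nodes = replication.read_nodes(&hash); // nodes of the layout version to read from
    let quorum = replication.read_quorum();    // looked up from the layout, not a stored field
    // in a fully deployed layout, `nodes.len()` equals the replication factor of
    // the version being read and `quorum` is at most that
    assert!(quorum <= nodes.len());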