Skip to content

Commit

Permalink
feat: make NodeState generic over DB with DatabaseMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Dec 5, 2023
1 parent d654494 commit 15efe33
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 51 deletions.
36 changes: 20 additions & 16 deletions bin/reth/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::node::cl_events::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_db::DatabaseEnv;
use reth_db::{database::Database, database_metrics::DatabaseMetadata};
use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
Expand All @@ -28,11 +28,11 @@ use tracing::{info, warn};
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);

/// The current high-level state of the node.
struct NodeState {
struct NodeState<DB> {
/// Database environment.
/// Used for freelist calculation reported in the "Status" log message.
/// See [EventHandler::poll].
db: Arc<DatabaseEnv>,
db: Arc<DB>,
/// Connection to the network.
network: Option<NetworkHandle>,
/// The stage currently being executed.
Expand All @@ -41,12 +41,8 @@ struct NodeState {
latest_block: Option<BlockNumber>,
}

impl NodeState {
fn new(
db: Arc<DatabaseEnv>,
network: Option<NetworkHandle>,
latest_block: Option<BlockNumber>,
) -> Self {
impl<DB: Database> NodeState<DB> {
fn new(db: Arc<DB>, network: Option<NetworkHandle>, latest_block: Option<BlockNumber>) -> Self {
Self { db, network, current_stage: None, latest_block }
}

Expand Down Expand Up @@ -200,6 +196,12 @@ impl NodeState {
}
}

impl<DB: DatabaseMetadata<Metadata = usize>> NodeState<DB> {
fn freelist(&self) -> Option<usize> {
self.db.metadata().get("freelist").copied()
}
}

/// Helper type for formatting of optional fields:
/// - If [Some(x)], then `x` is written
/// - If [None], then `None` is written
Expand Down Expand Up @@ -270,13 +272,14 @@ impl From<PrunerEvent> for NodeEvent {

/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events<E>(
pub async fn handle_events<E, DB>(
network: Option<NetworkHandle>,
latest_block_number: Option<BlockNumber>,
events: E,
db: Arc<DatabaseEnv>,
db: Arc<DB>,
) where
E: Stream<Item = NodeEvent> + Unpin,
DB: DatabaseMetadata<Metadata = usize> + 'static,
{
let state = NodeState::new(db, network, latest_block_number);

Expand All @@ -290,25 +293,26 @@ pub async fn handle_events<E>(

/// Handles events emitted by the node and logs them accordingly.
#[pin_project::pin_project]
struct EventHandler<E> {
state: NodeState,
struct EventHandler<E, DB> {
state: NodeState<DB>,
#[pin]
events: E,
#[pin]
info_interval: Interval,
}

impl<E> Future for EventHandler<E>
impl<E, DB> Future for EventHandler<E, DB>
where
E: Stream<Item = NodeEvent> + Unpin,
DB: DatabaseMetadata<Metadata = usize> + 'static,
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

while this.info_interval.poll_tick(cx).is_ready() {
let freelist = OptionalField(this.state.db.freelist().ok());
let freelist = OptionalField(this.state.freelist());

if let Some(CurrentStage { stage_id, eta, checkpoint, target }) =
&this.state.current_stage
Expand Down Expand Up @@ -428,7 +432,7 @@ impl Display for Eta {
f,
"{}",
humantime::format_duration(Duration::from_secs(remaining.as_secs()))
)
);
}
}

Expand Down
51 changes: 49 additions & 2 deletions crates/storage/db/src/abstraction/database_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,62 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use metrics::{counter, gauge, histogram, Label};

use crate::database::Database;

/// Extends [Database], adding a function that can be used as a hook for metric reporting.
pub trait DatabaseMetrics: Database {
/// Reports metrics for the database.
fn report_metrics(&self);
fn report_metrics(&self) {
for (name, value, labels) in self.gauge_metrics() {
gauge!(name, value, labels);
}

for (name, value, labels) in self.counter_metrics() {
counter!(name, value, labels);
}

for (name, value, labels) in self.histogram_metrics() {
histogram!(name, value, labels);
}
}

/// Returns a list of [Gauge](metrics::Gauge) metrics for the database.
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
vec![]
}

/// Returns a list of [Counter](metrics::Counter) metrics for the database.
fn counter_metrics(&self) -> Vec<(&'static str, u64, Vec<Label>)> {
vec![]
}

/// Returns a list of [Histogram](metrics::Histogram) metrics for the database.
fn histogram_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
vec![]
}
}

impl<DB: DatabaseMetrics> DatabaseMetrics for Arc<DB> {
fn report_metrics(&self) {
<DB as DatabaseMetrics>::report_metrics(self)
}
}

/// Extends [Database] to include a [Metadata] type, which can be used by methods which need to
/// dynamically retrieve information about the database.
pub trait DatabaseMetadata: Database {
/// The type used to store metadata about the database.
type Metadata;

/// Returns a [HashMap] of [&'static str]-addressable metadata about the database.
fn metadata(&self) -> HashMap<&'static str, Self::Metadata>;
}

impl<DB: DatabaseMetadata> DatabaseMetadata for Arc<DB> {
type Metadata = DB::Metadata;

fn metadata(&self) -> HashMap<&'static str, Self::Metadata> {
<DB as DatabaseMetadata>::metadata(self)
}
}
106 changes: 75 additions & 31 deletions crates/storage/db/src/implementation/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
use crate::{
database::Database,
database_metrics::DatabaseMetrics,
database_metrics::{DatabaseMetadata, DatabaseMetrics},
tables::{TableType, Tables},
utils::default_page_size,
DatabaseError,
};
use eyre::Context;
use metrics::gauge;
use metrics::{gauge, Label};
use reth_interfaces::db::LogLevel;
use reth_libmdbx::{
DatabaseFlags, Environment, EnvironmentFlags, Geometry, Mode, PageSize, SyncMode, RO, RW,
};
use reth_tracing::tracing::error;
use std::{ops::Deref, path::Path};
use std::{collections::HashMap, ops::Deref, path::Path};
use tx::Tx;

pub mod cursor;
Expand Down Expand Up @@ -64,40 +64,84 @@ impl Database for DatabaseEnv {
}

impl DatabaseMetrics for DatabaseEnv {
fn report_metrics(&self) {
let _ = self.view(|tx| {
for table in Tables::ALL.iter().map(|table| table.name()) {
let table_db =
tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;

let stats = tx
.inner
.db_stat(&table_db)
.wrap_err(format!("Could not find table: {table}"))?;

let page_size = stats.page_size() as usize;
let leaf_pages = stats.leaf_pages();
let branch_pages = stats.branch_pages();
let overflow_pages = stats.overflow_pages();
let num_pages = leaf_pages + branch_pages + overflow_pages;
let table_size = page_size * num_pages;
let entries = stats.entries();

gauge!("db.table_size", table_size as f64, "table" => table);
gauge!("db.table_pages", leaf_pages as f64, "table" => table, "type" => "leaf");
gauge!("db.table_pages", branch_pages as f64, "table" => table, "type" => "branch");
gauge!("db.table_pages", overflow_pages as f64, "table" => table, "type" => "overflow");
gauge!("db.table_entries", entries as f64, "table" => table);
}
fn report_metrics<'a>(&'a self) {
for (name, value, labels) in self.gauge_metrics() {
gauge!(name, value, labels);
}
}

Ok::<(), eyre::Report>(())
}).map_err(|error| error!(?error, "Failed to read db table stats"));
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
let mut metrics = Vec::new();

let _ = self
.view(|tx| {
for table in Tables::ALL.iter().map(|table| table.name()) {
let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;

let stats = tx
.inner
.db_stat(&table_db)
.wrap_err(format!("Could not find table: {table}"))?;

let page_size = stats.page_size() as usize;
let leaf_pages = stats.leaf_pages();
let branch_pages = stats.branch_pages();
let overflow_pages = stats.overflow_pages();
let num_pages = leaf_pages + branch_pages + overflow_pages;
let table_size = page_size * num_pages;
let entries = stats.entries();

metrics.push((
"db.table_size",
table_size as f64,
vec![Label::new("table", table)],
));
metrics.push((
"db.table_pages",
leaf_pages as f64,
vec![Label::new("table", table), Label::new("type", "leaf")],
));
metrics.push((
"db.table_pages",
branch_pages as f64,
vec![Label::new("table", table), Label::new("type", "branch")],
));
metrics.push((
"db.table_pages",
overflow_pages as f64,
vec![Label::new("table", table), Label::new("type", "overflow")],
));
metrics.push((
"db.table_entries",
entries as f64,
vec![Label::new("table", table)],
));
}

Ok::<(), eyre::Report>(())
})
.map_err(|error| error!(?error, "Failed to read db table stats"));

if let Ok(freelist) =
self.freelist().map_err(|error| error!(?error, "Failed to read db.freelist"))
{
gauge!("db.freelist", freelist as f64);
metrics.push(("db.freelist", freelist as f64, vec![]));
}

metrics
}
}

impl DatabaseMetadata for DatabaseEnv {
type Metadata = usize;

fn metadata(&self) -> HashMap<&'static str, Self::Metadata> {
let mut metadata = HashMap::new();

if let Ok(freelist) = self.freelist() {
metadata.insert("freelist", freelist);
}
metadata
}
}

Expand Down
15 changes: 13 additions & 2 deletions crates/storage/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,11 @@ pub fn open_db(path: &Path, log_level: Option<LogLevel>) -> eyre::Result<Databas
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils {
use super::*;
use crate::{database::Database, database_metrics::DatabaseMetrics};
use std::{path::PathBuf, sync::Arc};
use crate::{
database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetrics},
};
use std::{collections::HashMap, path::PathBuf, sync::Arc};

/// Error during database open
pub const ERROR_DB_OPEN: &str = "Not able to open the database file.";
Expand Down Expand Up @@ -216,6 +219,14 @@ pub mod test_utils {
}
}

impl<DB: DatabaseMetadata> DatabaseMetadata for TempDatabase<DB> {
type Metadata = <DB as DatabaseMetadata>::Metadata;

fn metadata(&self) -> HashMap<&'static str, Self::Metadata> {
self.db().metadata()
}
}

/// Create read/write database for testing
pub fn create_test_rw_db() -> Arc<TempDatabase<DatabaseEnv>> {
let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path();
Expand Down

0 comments on commit 15efe33

Please sign in to comment.