Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iceberg): introduce read/write bytes metrics for iceberg and all files sys_catalog #18841

Merged
merged 14 commits into from
Oct 12, 2024
Merged
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

34 changes: 33 additions & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -4027,7 +4027,7 @@ def section_iceberg_metrics(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Iceberg Sink Metrics",
"Iceberg Metrics",
[
panels.timeseries_count(
"Write Qps Of Iceberg Writer",
Expand Down Expand Up @@ -4086,6 +4086,38 @@ def section_iceberg_metrics(outer_panels):
),
],
),

panels.timeseries_bytes(
"Iceberg Write Size",
"",
[
panels.target(
f"sum({metric('iceberg_write_bytes')}) by (sink_name)",
"write @ {{sink_name}}",
),

panels.target(
f"sum({metric('iceberg_write_bytes')})",
"total write",
),
],
),

panels.timeseries_bytes(
    "Iceberg Read Size",
    "",
    [
        panels.target(
            f"sum({metric('iceberg_read_bytes')}) by (table_name)",
            "read @ {{table_name}}",
        ),
        panels.target(
            # Fix: this target previously queried `nimtable_read_bytes`, which is
            # not the metric registered by the batch side (`iceberg_read_bytes`,
            # see IcebergScanMetrics), so the "total read" series was always empty.
            f"sum({metric('iceberg_read_bytes')})",
            "total read",
        ),
    ],
),
],
)
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_connector::WithOptionsSecResolved;
Expand All @@ -36,6 +37,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};
use crate::monitor::BatchMetrics;
use crate::task::BatchTaskContext;

static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0;
Expand All @@ -51,6 +53,7 @@ pub struct IcebergScanExecutor {
batch_size: usize,
schema: Schema,
identity: String,
metrics: Option<BatchMetrics>,
}

impl Executor for IcebergScanExecutor {
Expand Down Expand Up @@ -78,6 +81,7 @@ impl IcebergScanExecutor {
batch_size: usize,
schema: Schema,
identity: String,
metrics: Option<BatchMetrics>,
) -> Self {
Self {
iceberg_config,
Expand All @@ -89,6 +93,7 @@ impl IcebergScanExecutor {
batch_size,
schema,
identity,
metrics,
}
}

Expand All @@ -100,6 +105,7 @@ impl IcebergScanExecutor {
.await?;
let data_types = self.schema.data_types();
let executor_schema_names = self.schema.names();
let table_name = table.identifier().name().to_string();

let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);

Expand All @@ -119,6 +125,20 @@ impl IcebergScanExecutor {
.await?;

// Delete rows in the data file that need to be deleted by map
let mut read_bytes = 0;
// NOTE(review): `scopeguard::guard` takes its first argument *by value*;
// `read_bytes` is an integer (`Copy`), so the value 0 is copied into the
// guard right here. The later `read_bytes += chunk.estimated_heap_size()`
// mutates the local variable, not the copy held by the guard, so this guard
// appears to always report 0 bytes on drop. Consider reporting after the
// loop, or sharing the counter (e.g. a `Cell`) with the guard — please
// confirm against the merged upstream fix.
let _metrics_report_guard = scopeguard::guard(
    (read_bytes, table_name, self.metrics.clone()),
    |(read_bytes, table_name, metrics)| {
        // Runs when the executor stream is dropped (normal completion or
        // early cancellation): flush the bytes-read total, labeled by table.
        if let Some(metrics) = metrics {
            metrics
                .iceberg_scan_metrics()
                .iceberg_read_bytes
                .with_guarded_label_values(&[&table_name])
                .inc_by(read_bytes as _);
        }
    },
);

for data_file_scan_task in data_file_scan_tasks {
let data_file_path = data_file_scan_task.data_file_path.clone();
let data_sequence_number = data_file_scan_task.sequence_number;
Expand All @@ -145,6 +165,7 @@ impl IcebergScanExecutor {
// equality delete
let chunk = equality_delete_filter.filter(chunk, data_sequence_number)?;
assert_eq!(chunk.data_types(), data_types);
read_bytes += chunk.estimated_heap_size() as u64;
yield chunk;
}
position_delete_filter.remove_file_path(&data_file_path);
Expand Down Expand Up @@ -195,6 +216,7 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
})
.collect();
let schema = Schema::new(fields);
let metrics = source.context.batch_metrics().clone();

if let ConnectorProperties::Iceberg(iceberg_properties) = config
&& let SplitImpl::Iceberg(split) = &split_list[0]
Expand All @@ -219,6 +241,7 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
metrics,
)))
} else {
unreachable!()
Expand Down
36 changes: 35 additions & 1 deletion src/batch/src/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use prometheus::{
histogram_opts, register_histogram_with_registry, register_int_counter_with_registry,
Histogram, IntGauge, Registry,
};
use risingwave_common::metrics::TrAdderGauge;
use risingwave_common::metrics::{LabelGuardedIntCounterVec, TrAdderGauge};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

/// Metrics for batch executor.
Expand Down Expand Up @@ -70,16 +70,19 @@ pub type BatchMetrics = Arc<BatchMetricsInner>;
pub struct BatchMetricsInner {
batch_manager_metrics: Arc<BatchManagerMetrics>,
executor_metrics: Arc<BatchExecutorMetrics>,
iceberg_scan_metrics: Arc<IcebergScanMetrics>,
}

impl BatchMetricsInner {
pub fn new(
batch_manager_metrics: Arc<BatchManagerMetrics>,
executor_metrics: Arc<BatchExecutorMetrics>,
iceberg_scan_metrics: Arc<IcebergScanMetrics>,
) -> Self {
Self {
batch_manager_metrics,
executor_metrics,
iceberg_scan_metrics,
}
}

Expand All @@ -91,11 +94,16 @@ impl BatchMetricsInner {
&self.batch_manager_metrics
}

/// Returns the metrics used to instrument iceberg scan executors.
pub fn iceberg_scan_metrics(&self) -> &IcebergScanMetrics {
    &self.iceberg_scan_metrics
}

#[cfg(test)]
pub fn for_test() -> BatchMetrics {
Arc::new(Self {
batch_manager_metrics: BatchManagerMetrics::for_test(),
executor_metrics: BatchExecutorMetrics::for_test(),
iceberg_scan_metrics: IcebergScanMetrics::for_test(),
})
}
}
Expand Down Expand Up @@ -176,3 +184,29 @@ impl BatchSpillMetrics {
Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone())
}
}

/// Metrics for batch iceberg table scans.
#[derive(Clone)]
pub struct IcebergScanMetrics {
    /// Total number of bytes produced by iceberg scans, labeled by
    /// `table_name`. Incremented by `IcebergScanExecutor` with the estimated
    /// heap size of each yielded chunk.
    pub iceberg_read_bytes: LabelGuardedIntCounterVec<1>,
}

impl IcebergScanMetrics {
    /// Creates the metric set and registers every collector on `registry`.
    ///
    /// Panics if a collector with the same name is already registered, which
    /// indicates a programming error (double registration).
    fn new(registry: &Registry) -> Self {
        Self {
            iceberg_read_bytes: register_guarded_int_counter_vec_with_registry!(
                "iceberg_read_bytes",
                "Total size of iceberg read requests",
                &["table_name"],
                registry
            )
            .unwrap(),
        }
    }

    /// Returns a handle to the process-global instance for use in tests.
    pub fn for_test() -> Arc<Self> {
        Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
    }
}

/// Process-wide [`IcebergScanMetrics`], lazily registered on the global
/// metrics registry on first access.
pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
    LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));
1 change: 1 addition & 0 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl ComputeNodeContext {
let batch_metrics = Arc::new(BatchMetricsInner::new(
env.task_manager().metrics(),
env.executor_metrics(),
env.iceberg_scan_metrics(),
));
Self {
env,
Expand Down
14 changes: 13 additions & 1 deletion src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use risingwave_dml::dml_manager::DmlManagerRef;
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_storage::StateStoreImpl;

use crate::monitor::{BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics};
use crate::monitor::{
BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics, IcebergScanMetrics,
};
use crate::task::BatchManager;

/// The global environment for task execution.
Expand Down Expand Up @@ -59,6 +61,9 @@ pub struct BatchEnvironment {
/// Batch spill metrics
spill_metrics: Arc<BatchSpillMetrics>,

/// Metrics for iceberg scan.
iceberg_scan_metrics: Arc<IcebergScanMetrics>,

metric_level: MetricLevel,
}

Expand All @@ -75,6 +80,7 @@ impl BatchEnvironment {
dml_manager: DmlManagerRef,
source_metrics: Arc<SourceMetrics>,
spill_metrics: Arc<BatchSpillMetrics>,
iceberg_scan_metrics: Arc<IcebergScanMetrics>,
metric_level: MetricLevel,
) -> Self {
BatchEnvironment {
Expand All @@ -88,6 +94,7 @@ impl BatchEnvironment {
dml_manager,
source_metrics,
spill_metrics,
iceberg_scan_metrics,
metric_level,
}
}
Expand Down Expand Up @@ -116,6 +123,7 @@ impl BatchEnvironment {
source_metrics: Arc::new(SourceMetrics::default()),
executor_metrics: BatchExecutorMetrics::for_test(),
spill_metrics: BatchSpillMetrics::for_test(),
iceberg_scan_metrics: IcebergScanMetrics::for_test(),
metric_level: MetricLevel::Debug,
}
}
Expand Down Expand Up @@ -167,4 +175,8 @@ impl BatchEnvironment {
pub fn metric_level(&self) -> MetricLevel {
self.metric_level
}

/// Returns a shared handle to the iceberg scan metrics.
pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
    Arc::clone(&self.iceberg_scan_metrics)
}
}
3 changes: 3 additions & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;

use risingwave_batch::monitor::{
GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
GLOBAL_ICEBERG_SCAN_METRICS,
};
use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
use risingwave_batch::spill::spill_op::SpillOp;
Expand Down Expand Up @@ -180,6 +181,7 @@ pub async fn compute_node_serve(
let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
let exchange_srv_metrics = Arc::new(GLOBAL_EXCHANGE_SERVICE_METRICS.clone());
let batch_spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
let iceberg_scan_metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());

// Initialize state store.
let state_store_metrics = Arc::new(global_hummock_state_store_metrics(
Expand Down Expand Up @@ -355,6 +357,7 @@ pub async fn compute_node_serve(
dml_mgr.clone(),
source_metrics.clone(),
batch_spill_metrics.clone(),
iceberg_scan_metrics.clone(),
config.server.metrics_level,
);

Expand Down
Loading
Loading