Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: refactor payload processing #14589

Draft
wants to merge 43 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
e93a39c
feat: add workload crate
mattsse Feb 15, 2025
1277fdd
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 18, 2025
a883af3
wip
mattsse Feb 18, 2025
ce8d433
wip
mattsse Feb 18, 2025
106e7c0
wip
mattsse Feb 18, 2025
0c65979
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 18, 2025
1f4bebb
wip
mattsse Feb 18, 2025
92f0b2f
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 19, 2025
b94cc1b
wip
mattsse Feb 19, 2025
cad995b
wip
mattsse Feb 19, 2025
60d7d89
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 19, 2025
853d45c
wip
mattsse Feb 19, 2025
9fe6964
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 21, 2025
14fe718
wip
mattsse Feb 21, 2025
80ef954
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 21, 2025
a77af74
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 21, 2025
ab1f7a1
wip
mattsse Feb 21, 2025
1e421fa
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 24, 2025
02a187a
wip
mattsse Feb 24, 2025
cef3b3b
make it compile jaja
mattsse Feb 24, 2025
18af402
wip
mattsse Feb 24, 2025
138e78c
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 24, 2025
08ef8fc
wip
mattsse Feb 24, 2025
f400c20
wip
mattsse Feb 24, 2025
745ee57
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 25, 2025
ef49717
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 25, 2025
9976e74
impl prewarm task
mattsse Feb 25, 2025
12827e1
port sprase trie
mattsse Feb 25, 2025
0795daa
wip
mattsse Feb 25, 2025
9a7ce36
wip
mattsse Feb 25, 2025
f4bc435
wip
mattsse Feb 25, 2025
cbd6cf3
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 26, 2025
1e00467
support optional mutltiproof
mattsse Feb 26, 2025
e56326f
wip
mattsse Feb 26, 2025
ace95f4
wip
mattsse Feb 26, 2025
30d94e1
Merge branch 'main' into matt/add-workload-crate
mattsse Feb 26, 2025
e0858d4
wip
mattsse Feb 26, 2025
0674b1b
wip
mattsse Feb 26, 2025
fe92493
wip
mattsse Feb 26, 2025
d323131
wip
mattsse Feb 26, 2025
75661ac
wip
mattsse Feb 26, 2025
719363c
clippy
mattsse Feb 26, 2025
952f6f8
wip
mattsse Feb 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ members = [
"crates/engine/primitives/",
"crates/engine/service",
"crates/engine/tree/",
"crates/engine/workload/",
"crates/engine/util/",
"crates/errors/",
"crates/ethereum-forks/",
Expand Down Expand Up @@ -331,6 +332,7 @@ reth-downloaders = { path = "crates/net/downloaders" }
reth-e2e-test-utils = { path = "crates/e2e-test-utils" }
reth-ecies = { path = "crates/net/ecies" }
reth-engine-local = { path = "crates/engine/local" }
reth-workload-executor = { path = "crates/engine/workload" }
reth-engine-primitives = { path = "crates/engine/primitives", default-features = false }
reth-engine-tree = { path = "crates/engine/tree" }
reth-engine-service = { path = "crates/engine/service" }
Expand Down
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ reth-trie-db.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse.workspace = true
reth-trie.workspace = true
reth-workload-executor.workspace = true

# alloy
alloy-consensus.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ pub mod error;
mod invalid_block_hook;
mod invalid_headers;
mod metrics;
mod payload_processor;
mod persistence_state;
pub mod root;
pub mod root2;
mod trie_updates;

use crate::tree::{config::MIN_BLOCKS_FOR_PIPELINE_RUN, error::AdvancePersistenceError};
Expand Down
230 changes: 230 additions & 0 deletions crates/engine/tree/src/tree/payload_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
//! Entrypoint for payload processing.

use crate::tree::{
cached_state::{CachedStateMetrics, ProviderCaches},
root::{SparseTrieUpdate, StateRootConfig, StateRootTaskMetrics},
StateProviderBuilder,
};
use alloy_consensus::transaction::Recovered;
use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives};
use reth_provider::{
BlockReader, DatabaseProviderFactory, HashedPostStateProvider, StateCommitmentProvider,
StateProviderFactory, StateReader, StateRootProvider,
};
use reth_revm::cancelled::ManualCancel;
use reth_workload_executor::WorkloadExecutor;
use std::{
collections::VecDeque,
sync::{
mpsc,
mpsc::{Receiver, Sender},
},
};

/// Entrypoint for executing the payload.
pub struct PayloadProcessor {
executor: WorkloadExecutor,
// TODO move all the caching stuff in here
}

impl PayloadProcessor {
/// Executes the payload based on the configured settings.
pub fn execute(&self) {
// TODO helpers for executing in sync?
}

/// Spawns all background tasks and returns a handle connected to the tasks.
///
/// - Transaction prewarming task
/// - State root task
/// - Sparse trie task
///
/// # Transaction prewarming task
///
/// Responsible for feeding state updates to the state root task.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also warming revm caches, right?

///
/// This task runs until:
/// - externally cancelled (e.g. sequential block execution is complete)
/// - all transaction have been processed
///
/// ## State root task
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's rename this to multiproof task?

///
/// Responsible for preparing sparse trie messages for the sparse trie task.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see, so state root task will only spawn multiproofs and send results back to PayloadProcessor, but PayloadProcessor itself will send the state update + multiproof to sparse trie task?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send results back to PayloadProcessor

send back to itself. PayloadProcessor is just some helper that spawns these 3 tasks

/// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
/// output back to this task.
///
/// This task runs until it receives a shutdown signal, which should be after after the block
/// was fully executed.
///
/// ## Sparse trie task
///
/// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
///
/// This task runs until there are no further updates to process.
///
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
fn spawn(&self, input: ()) -> PayloadTaskHandle {
// TODO: spawn the main tasks and wire them up

todo!()
}
}

pub struct PayloadTaskHandle {
// TODO should internals be an enum to represent no parallel workload

// needs receiver to await the stateroot from the background task

// need channel to emit `StateUpdates` to the state root task

// On drop this should also terminate the prewarm task

// must include the receiver of the state root wired to the sparse trie
}

impl PayloadTaskHandle {
/// Terminates the pre-warming processing
// TODO: does this need a config arg?
pub fn terminate_prewarming(&self) {
// TODO emit a
}
}

impl Drop for PayloadTaskHandle {
fn drop(&mut self) {
// TODO: terminate all tasks explicitly
}
}

/// A task responsible for populating the sparse trie.
pub struct SparseTrieTask<F> {
executor: WorkloadExecutor,
/// Receives updates from the state root task
// TODO: change to option?
updates: mpsc::Receiver<SparseTrieUpdate>,
factory: F,
config: StateRootConfig<F>,
metrics: StateRootTaskMetrics,
}

impl<F> SparseTrieTask<F>
where
F: DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider,
{
/// Runs the sparse trie task to completion.
///
/// This waits for new incoming [`SparseTrieUpdate`].
///
/// This concludes once the last trie update has been received.
// TODO this should probably return the stateroot as response so we can wire a oneshot channel
fn run(mut self) {
let mut num_iterations = 0;

// run
while let Ok(mut update) = self.updates.recv() {
num_iterations += 1;
let mut num_updates = 1;

while let Ok(next) = self.updates.try_recv() {
update.extend(next);
num_updates += 1;
}
}
}
}

/// A task that executes transactions individually in parallel.
pub struct PrewarmTask<N: NodePrimitives, P, C> {
executor: WorkloadExecutor,
/// Transactions pending execution
pending: VecDeque<Recovered<N::SignedTx>>,
/// Context provided to execution tasks
ctx: PrewarmContext<N, P, C>,
/// How many txs are currently in progress
in_progress: usize,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// Sender to emit Stateroot messages
to_state_root: (),
/// Receiver for events produced by tx execution
actions_rx: Receiver<PrewarmTaskEvent>,

/// Sender the transactions use to send their result back
actions_tx: Sender<PrewarmTaskEvent>,
}

impl<N, P, C> PrewarmTask<N, P, C>
where
N: NodePrimitives,
P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
{
/// Spawns the next transactions
fn spawn_next(&mut self) {
while self.in_progress < self.max_concurrency {
if let Some(event) = self.pending.pop_front() {
// TODO spawn the next tx
} else {
break
}
}
}

fn is_done(&self) -> bool {
self.in_progress == 0 && self.pending.is_empty()
}

/// Executes the task.
///
/// This will execute the transactions until all transactions have been processed or the task
/// was cancelled.
fn run(mut self) {
self.spawn_next();

while let Ok(event) = self.actions_rx.recv() {
if self.ctx.is_cancelled() {
// terminate
break
}
match event {
PrewarmTaskEvent::Terminate => {
// received terminate signal
break
}
PrewarmTaskEvent::Outcome { .. } => {}
}

self.spawn_next();
if self.is_done() {
break
}
}
}
}

/// Context required by tx execution tasks.
#[derive(Debug, Clone)]
struct PrewarmContext<N: NodePrimitives, P, C> {
header: SealedHeaderFor<N>,
evm_config: C,
caches: ProviderCaches,
cache_metrics: CachedStateMetrics,
cancelled: ManualCancel,
/// Provider to obtain the state
provider: StateProviderBuilder<N, P>,
}

impl<N: NodePrimitives, P, C> PrewarmContext<N, P, C> {
/// Returns true if the task is cancelled
fn is_cancelled(&self) -> bool {
self.cancelled.is_cancelled()
}
}

enum PrewarmTaskEvent {
Terminate,
Outcome {
// Evmstate outcome
},
}
2 changes: 1 addition & 1 deletion crates/engine/tree/src/tree/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ where

#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
struct StateRootTaskMetrics {
pub(crate) struct StateRootTaskMetrics {
/// Histogram of proof calculation durations.
pub proof_calculation_duration_histogram: Histogram,
/// Histogram of proof calculation account targets.
Expand Down
Loading
Loading