-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathmod.rs
2802 lines (2533 loc) · 122 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
use futures::{stream::BoxStream, Future, StreamExt};
use itertools::Either;
use reth_blockchain_tree_api::{
error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
};
use reth_db_api::database::Database;
use reth_engine_primitives::EngineTypes;
use reth_errors::{BlockValidationError, ProviderResult, RethError, RethResult};
use reth_network_p2p::{
bodies::client::BodiesClient,
headers::client::HeadersClient,
sync::{NetworkSyncUpdater, SyncState},
};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes};
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{
constants::EPOCH_SLOTS, BlockNumHash, BlockNumber, Head, Header, SealedBlock, SealedHeader,
B256,
};
use reth_provider::{
BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
StageCheckpointReader,
};
use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
};
use reth_stages_api::{ControlFlow, Pipeline, PipelineTarget, StageId};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventSender;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::{
mpsc::{self, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
mod message;
pub use message::{BeaconEngineMessage, OnForkChoiceUpdated};
mod error;
pub use error::{
BeaconConsensusEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError,
BeaconOnNewPayloadError,
};
mod invalid_headers;
pub use invalid_headers::InvalidHeaderCache;
mod event;
pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
mod handle;
pub use handle::BeaconConsensusEngineHandle;
mod forkchoice;
pub use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker, ForkchoiceStatus};
mod metrics;
use metrics::EngineMetrics;
pub mod sync;
use sync::{EngineSyncController, EngineSyncEvent};
/// Hooks for running during the main loop of
/// [consensus engine][`crate::engine::BeaconConsensusEngine`].
pub mod hooks;
use hooks::{EngineHookContext, EngineHookEvent, EngineHooks, EngineHooksController, PolledHook};
#[cfg(test)]
pub mod test_utils;
/// The maximum number of invalid headers that can be tracked by the engine.
const MAX_INVALID_HEADERS: u32 = 512u32;
/// The largest gap for which the tree will be used for sync. See docs for `pipeline_run_threshold`
/// for more information.
///
/// This is the default threshold, the distance to the head that the tree will be used for sync.
/// If the distance exceeds this threshold, the pipeline will be used for sync.
pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
/// Represents a pending forkchoice update.
///
/// This type encapsulates the necessary components for a pending forkchoice update
/// in the context of a beacon consensus engine.
///
/// It consists of:
/// - The current fork choice state.
/// - Optional payload attributes specific to the engine type.
/// - Sender for the result of an oneshot channel, conveying the outcome of the fork choice update.
type PendingForkchoiceUpdate<PayloadAttributes> =
(ForkchoiceState, Option<PayloadAttributes>, oneshot::Sender<RethResult<OnForkChoiceUpdated>>);
/// The beacon consensus engine is the driver that switches between historical and live sync.
///
/// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are
/// received by Engine API (JSON-RPC).
///
/// The consensus engine is idle until it receives the first
/// [`BeaconEngineMessage::ForkchoiceUpdated`] message from the CL which would initiate the sync. At
/// first, the consensus engine would run the [Pipeline] until the latest known block hash.
/// Afterward, it would attempt to create/restore the [`BlockchainTreeEngine`] from the blocks
/// that are currently available. In case the restoration is successful, the consensus engine would
/// run in a live sync mode, populating the [`BlockchainTreeEngine`] with new blocks as they arrive
/// via engine API and downloading any missing blocks from the network to fill potential gaps.
///
/// The consensus engine has two data input sources:
///
/// ## New Payload (`engine_newPayloadV{}`)
///
/// The engine receives new payloads from the CL. If the payload is connected to the canonical
/// chain, it will be fully validated added to a chain in the [`BlockchainTreeEngine`]: `VALID`
///
/// If the payload's chain is disconnected (at least 1 block is missing) then it will be buffered:
/// `SYNCING` ([`BlockStatus::Disconnected`]).
///
/// ## Forkchoice Update (FCU) (`engine_forkchoiceUpdatedV{}`)
///
/// This contains the latest forkchoice state and the payload attributes. The engine will attempt to
/// make a new canonical chain based on the `head_hash` of the update and trigger payload building
/// if the `payload_attrs` are present and the FCU is `VALID`.
///
/// The `head_hash` forms a chain by walking backwards from the `head_hash` towards the canonical
/// blocks of the chain.
///
/// Making a new canonical chain can result in the following relevant outcomes:
///
/// ### The chain is connected
///
/// All blocks of the `head_hash`'s chain are present in the [`BlockchainTreeEngine`] and are
/// committed to the canonical chain. This also includes reorgs.
///
/// ### The chain is disconnected
///
/// In this case the [`BlockchainTreeEngine`] doesn't know how the new chain connects to the
/// existing canonical chain. It could be a simple commit (new blocks extend the current head) or a
/// re-org that requires unwinding the canonical chain.
///
/// This further distinguishes between two variants:
///
/// #### `head_hash`'s block exists
///
/// The `head_hash`'s block was already received/downloaded, but at least one block is missing to
/// form a _connected_ chain. The engine will attempt to download the missing blocks from the
/// network by walking backwards (`parent_hash`), and then try to make the block canonical as soon
/// as the chain becomes connected.
///
/// However, it still can be the case that the chain and the FCU is `INVALID`.
///
/// #### `head_hash` block is missing
///
/// This is similar to the previous case, but the `head_hash`'s block is missing. At which point the
/// engine doesn't know where the new head will point to: new chain could be a re-org or a simple
/// commit. The engine will download the missing head first and then proceed as in the previous
/// case.
///
/// # Panics
///
/// If the future is polled more than once. Leads to undefined state.
#[must_use = "Future does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct BeaconConsensusEngine<DB, BT, Client, EngineT>
where
DB: Database,
Client: HeadersClient + BodiesClient,
BT: BlockchainTreeEngine
+ BlockReader
+ BlockIdReader
+ CanonChainTracker
+ StageCheckpointReader,
EngineT: EngineTypes,
{
/// Controls syncing triggered by engine updates.
sync: EngineSyncController<DB, Client>,
/// The type we can use to query both the database and the blockchain tree.
blockchain: BT,
/// Used for emitting updates about whether the engine is syncing or not.
sync_state_updater: Box<dyn NetworkSyncUpdater>,
/// The Engine API message receiver.
engine_message_stream: BoxStream<'static, BeaconEngineMessage<EngineT>>,
/// A clone of the handle
handle: BeaconConsensusEngineHandle<EngineT>,
/// Tracks the received forkchoice state updates received by the CL.
forkchoice_state_tracker: ForkchoiceStateTracker,
/// The payload store.
payload_builder: PayloadBuilderHandle<EngineT>,
/// Validator for execution payloads
payload_validator: ExecutionPayloadValidator,
/// Current blockchain tree action.
blockchain_tree_action: Option<BlockchainTreeAction<EngineT>>,
/// Pending forkchoice update.
/// It is recorded if we cannot process the forkchoice update because
/// a hook with database read-write access is active.
/// This is a temporary solution to always process missed FCUs.
pending_forkchoice_update: Option<PendingForkchoiceUpdate<EngineT::PayloadAttributes>>,
/// Tracks the header of invalid payloads that were rejected by the engine because they're
/// invalid.
invalid_headers: InvalidHeaderCache,
/// After downloading a block corresponding to a recent forkchoice update, the engine will
/// check whether or not we can connect the block to the current canonical chain. If we can't,
/// we need to download and execute the missing parents of that block.
///
/// When the block can't be connected, its block number will be compared to the canonical head,
/// resulting in a heuristic for the number of missing blocks, or the size of the gap between
/// the new block and the canonical head.
///
/// If the gap is larger than this threshold, the engine will download and execute the missing
/// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will
/// be used to download and execute the missing blocks.
pipeline_run_threshold: u64,
hooks: EngineHooksController,
/// Sender for engine events.
event_sender: EventSender<BeaconConsensusEngineEvent>,
/// Consensus engine metrics.
metrics: EngineMetrics,
}
impl<DB, BT, Client, EngineT> BeaconConsensusEngine<DB, BT, Client, EngineT>
where
DB: Database + Unpin + 'static,
BT: BlockchainTreeEngine
+ BlockReader
+ BlockIdReader
+ CanonChainTracker
+ StageCheckpointReader
+ ChainSpecProvider
+ 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
EngineT: EngineTypes + Unpin,
{
/// Create a new instance of the [`BeaconConsensusEngine`].
#[allow(clippy::too_many_arguments)]
pub fn new(
client: Client,
pipeline: Pipeline<DB>,
blockchain: BT,
task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>,
payload_builder: PayloadBuilderHandle<EngineT>,
target: Option<B256>,
pipeline_run_threshold: u64,
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel(
client,
pipeline,
blockchain,
task_spawner,
sync_state_updater,
max_block,
payload_builder,
target,
pipeline_run_threshold,
to_engine,
Box::pin(UnboundedReceiverStream::from(rx)),
hooks,
)
}
/// Create a new instance of the [`BeaconConsensusEngine`] using the given channel to configure
/// the [`BeaconEngineMessage`] communication channel.
///
/// By default the engine is started with idle pipeline.
/// The pipeline can be launched immediately in one of the following ways descending in
/// priority:
/// - Explicit [`Option::Some`] target block hash provided via a constructor argument.
/// - The process was previously interrupted amidst the pipeline run. This is checked by
/// comparing the checkpoints of the first ([`StageId::Headers`]) and last
/// ([`StageId::Finish`]) stages. In this case, the latest available header in the database is
/// used as the target.
///
/// Propagates any database related error.
#[allow(clippy::too_many_arguments)]
pub fn with_channel(
client: Client,
pipeline: Pipeline<DB>,
blockchain: BT,
task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>,
payload_builder: PayloadBuilderHandle<EngineT>,
target: Option<B256>,
pipeline_run_threshold: u64,
to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
engine_message_stream: BoxStream<'static, BeaconEngineMessage<EngineT>>,
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
let event_sender = EventSender::default();
let handle = BeaconConsensusEngineHandle::new(to_engine, event_sender.clone());
let sync = EngineSyncController::new(
pipeline,
client,
task_spawner.clone(),
max_block,
blockchain.chain_spec(),
event_sender.clone(),
);
let mut this = Self {
sync,
payload_validator: ExecutionPayloadValidator::new(blockchain.chain_spec()),
blockchain,
sync_state_updater,
engine_message_stream,
handle: handle.clone(),
forkchoice_state_tracker: Default::default(),
payload_builder,
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
blockchain_tree_action: None,
pending_forkchoice_update: None,
pipeline_run_threshold,
hooks: EngineHooksController::new(hooks),
event_sender,
metrics: EngineMetrics::default(),
};
let maybe_pipeline_target = match target {
// Provided target always takes precedence.
target @ Some(_) => target,
None => this.check_pipeline_consistency()?,
};
if let Some(target) = maybe_pipeline_target {
this.sync.set_pipeline_sync_target(target.into());
}
Ok((this, handle))
}
/// Returns current [`EngineHookContext`] that's used for polling engine hooks.
fn current_engine_hook_context(&self) -> RethResult<EngineHookContext> {
Ok(EngineHookContext {
tip_block_number: self.blockchain.canonical_tip().number,
finalized_block_number: self
.blockchain
.finalized_block_number()
.map_err(RethError::Provider)?,
})
}
/// Set the next blockchain tree action.
fn set_blockchain_tree_action(&mut self, action: BlockchainTreeAction<EngineT>) {
let previous_action = self.blockchain_tree_action.replace(action);
debug_assert!(previous_action.is_none(), "Pre-existing action found");
}
/// Pre-validate forkchoice update and check whether it can be processed.
///
/// This method returns the update outcome if validation fails or
/// the node is syncing and the update cannot be processed at the moment.
fn pre_validate_forkchoice_update(
&mut self,
state: ForkchoiceState,
) -> ProviderResult<Option<OnForkChoiceUpdated>> {
if state.head_block_hash.is_zero() {
return Ok(Some(OnForkChoiceUpdated::invalid_state()))
}
// check if the new head hash is connected to any ancestor that we previously marked as
// invalid
let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
}
if self.sync.is_pipeline_active() {
// We can only process new forkchoice updates if the pipeline is idle, since it requires
// exclusive access to the database
trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update");
return Ok(Some(OnForkChoiceUpdated::syncing()))
}
Ok(None)
}
/// Process the result of attempting to make forkchoice state head hash canonical.
///
/// # Returns
///
/// A forkchoice state update outcome or fatal error.
fn on_forkchoice_updated_make_canonical_result(
&mut self,
state: ForkchoiceState,
mut attrs: Option<EngineT::PayloadAttributes>,
make_canonical_result: Result<CanonicalOutcome, CanonicalError>,
elapsed: Duration,
) -> Result<OnForkChoiceUpdated, CanonicalError> {
match make_canonical_result {
Ok(outcome) => {
let should_update_head = match &outcome {
CanonicalOutcome::AlreadyCanonical { head, header } => {
self.on_head_already_canonical(head, header, &mut attrs)
}
CanonicalOutcome::Committed { head } => {
// new VALID update that moved the canonical chain forward
debug!(target: "consensus::engine", hash=?state.head_block_hash, number=head.number, "Canonicalized new head");
true
}
};
if should_update_head {
let head = outcome.header();
let _ = self.update_head(head.clone());
self.event_sender.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
Box::new(head.clone()),
elapsed,
));
}
// Validate that the forkchoice state is consistent.
let on_updated = if let Some(invalid_fcu_response) =
self.ensure_consistent_forkchoice_state(state)?
{
trace!(target: "consensus::engine", ?state, "Forkchoice state is inconsistent");
invalid_fcu_response
} else if let Some(attrs) = attrs {
// the CL requested to build a new payload on top of this new VALID head
let head = outcome.into_header().unseal();
self.process_payload_attributes(attrs, head, state)
} else {
OnForkChoiceUpdated::valid(PayloadStatus::new(
PayloadStatusEnum::Valid,
Some(state.head_block_hash),
))
};
Ok(on_updated)
}
Err(err) => {
if err.is_fatal() {
error!(target: "consensus::engine", %err, "Encountered fatal error");
Err(err)
} else {
Ok(OnForkChoiceUpdated::valid(
self.on_failed_canonical_forkchoice_update(&state, err)?,
))
}
}
}
}
/// Invoked when head hash references a `VALID` block that is already canonical.
///
/// Returns `true` if the head needs to be updated.
fn on_head_already_canonical(
&self,
head: &BlockNumHash,
header: &SealedHeader,
attrs: &mut Option<EngineT::PayloadAttributes>,
) -> bool {
// On Optimism, the proposers are allowed to reorg their own chain at will.
#[cfg(feature = "optimism")]
if self.blockchain.chain_spec().is_optimism() {
debug!(
target: "consensus::engine",
fcu_head_num=?header.number,
current_head_num=?head.number,
"[Optimism] Allowing beacon reorg to old head"
);
return true
}
// 2. Client software MAY skip an update of the forkchoice state and MUST NOT begin a
// payload build process if `forkchoiceState.headBlockHash` references a `VALID` ancestor
// of the head of canonical chain, i.e. the ancestor passed payload validation process
// and deemed `VALID`. In the case of such an event, client software MUST return
// `{payloadStatus: {status: VALID, latestValidHash: forkchoiceState.headBlockHash,
// validationError: null}, payloadId: null}`
if head != &header.num_hash() {
attrs.take();
}
debug!(
target: "consensus::engine",
fcu_head_num=?header.number,
current_head_num=?head.number,
"Ignoring beacon update to old head"
);
false
}
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>,
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
) {
self.metrics.forkchoice_updated_messages.increment(1);
self.blockchain.on_forkchoice_update_received(&state);
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
match self.pre_validate_forkchoice_update(state) {
Ok(on_updated_result) => {
if let Some(on_updated) = on_updated_result {
// Pre-validate forkchoice state update and return if it's invalid
// or cannot be processed at the moment.
self.on_forkchoice_updated_status(state, on_updated, tx);
} else if let Some(hook) = self.hooks.active_db_write_hook() {
// We can only process new forkchoice updates if no hook with db write is
// running, since it requires exclusive access to the
// database
let replaced_pending =
self.pending_forkchoice_update.replace((state, attrs, tx));
warn!(
target: "consensus::engine",
hook = %hook.name(),
head_block_hash = ?state.head_block_hash,
safe_block_hash = ?state.safe_block_hash,
finalized_block_hash = ?state.finalized_block_hash,
replaced_pending = ?replaced_pending.map(|(state, _, _)| state),
"Hook is in progress, delaying forkchoice update. \
This may affect the performance of your node as a validator."
);
} else {
self.set_blockchain_tree_action(
BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
);
}
}
Err(error) => {
let _ = tx.send(Err(error.into()));
}
}
}
/// Called after the forkchoice update status has been resolved.
/// Depending on the outcome, the method updates the sync state and notifies the listeners
/// about new processed FCU.
fn on_forkchoice_updated_status(
&mut self,
state: ForkchoiceState,
on_updated: OnForkChoiceUpdated,
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
) {
// send the response to the CL ASAP
let status = on_updated.forkchoice_status();
let _ = tx.send(Ok(on_updated));
// update the forkchoice state tracker
self.forkchoice_state_tracker.set_latest(state, status);
match status {
ForkchoiceStatus::Invalid => {}
ForkchoiceStatus::Valid => {
// FCU head is valid, we're no longer syncing
self.sync_state_updater.update_sync_state(SyncState::Idle);
// node's fully synced, clear active download requests
self.sync.clear_block_download_requests();
}
ForkchoiceStatus::Syncing => {
// we're syncing
self.sync_state_updater.update_sync_state(SyncState::Syncing);
}
}
// notify listeners about new processed FCU
self.event_sender.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status));
}
/// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
/// than the checkpoint of the first stage).
///
/// This will return the pipeline target if:
/// * the pipeline was interrupted during its previous run
/// * a new stage was added
/// * stage data was dropped manually through `reth stage drop ...`
///
/// # Returns
///
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
fn check_pipeline_consistency(&self) -> RethResult<Option<B256>> {
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
let first_stage_checkpoint = self
.blockchain
.get_stage_checkpoint(*StageId::ALL.first().unwrap())?
.unwrap_or_default()
.block_number;
// Skip the first stage as we've already retrieved it and comparing all other checkpoints
// against it.
for stage_id in StageId::ALL.iter().skip(1) {
let stage_checkpoint =
self.blockchain.get_stage_checkpoint(*stage_id)?.unwrap_or_default().block_number;
// If the checkpoint of any stage is less than the checkpoint of the first stage,
// retrieve and return the block hash of the latest header and use it as the target.
if stage_checkpoint < first_stage_checkpoint {
debug!(
target: "consensus::engine",
first_stage_checkpoint,
inconsistent_stage_id = %stage_id,
inconsistent_stage_checkpoint = stage_checkpoint,
"Pipeline sync progress is inconsistent"
);
return Ok(self.blockchain.block_hash(first_stage_checkpoint)?)
}
}
Ok(None)
}
/// Returns a new [`BeaconConsensusEngineHandle`] that can be cloned and shared.
///
/// The [`BeaconConsensusEngineHandle`] can be used to interact with this
/// [`BeaconConsensusEngine`]
pub fn handle(&self) -> BeaconConsensusEngineHandle<EngineT> {
self.handle.clone()
}
/// Returns true if the distance from the local tip to the block is greater than the configured
/// threshold.
///
/// If the `local_tip` is greater than the `block`, then this will return false.
#[inline]
const fn exceeds_pipeline_run_threshold(&self, local_tip: u64, block: u64) -> bool {
block > local_tip && block - local_tip > self.pipeline_run_threshold
}
/// Returns the finalized hash to sync to if the distance from the local tip to the block is
/// greater than the configured threshold and we're not synced to the finalized block yet
/// yet (if we've seen that block already).
///
/// If this is invoked after a new block has been downloaded, the downloaded block could be the
/// (missing) finalized block.
fn can_pipeline_sync_to_finalized(
&self,
canonical_tip_num: u64,
target_block_number: u64,
downloaded_block: Option<BlockNumHash>,
) -> Option<B256> {
let sync_target_state = self.forkchoice_state_tracker.sync_target_state();
// check if the distance exceeds the threshold for pipeline sync
let mut exceeds_pipeline_run_threshold =
self.exceeds_pipeline_run_threshold(canonical_tip_num, target_block_number);
// check if the downloaded block is the tracked finalized block
if let Some(ref buffered_finalized) = sync_target_state
.as_ref()
.and_then(|state| self.blockchain.buffered_header_by_hash(state.finalized_block_hash))
{
// if we have buffered the finalized block, we should check how far
// we're off
exceeds_pipeline_run_threshold =
self.exceeds_pipeline_run_threshold(canonical_tip_num, buffered_finalized.number);
}
// If this is invoked after we downloaded a block we can check if this block is the
// finalized block
if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
if downloaded_block.hash == state.finalized_block_hash {
// we downloaded the finalized block
exceeds_pipeline_run_threshold =
self.exceeds_pipeline_run_threshold(canonical_tip_num, downloaded_block.number);
}
}
// if the number of missing blocks is greater than the max, run the
// pipeline
if exceeds_pipeline_run_threshold {
if let Some(state) = sync_target_state {
// if we have already canonicalized the finalized block, we should
// skip the pipeline run
match self.blockchain.header_by_hash_or_number(state.finalized_block_hash.into()) {
Err(err) => {
warn!(target: "consensus::engine", %err, "Failed to get finalized block header");
}
Ok(None) => {
// ensure the finalized block is known (not the zero hash)
if !state.finalized_block_hash.is_zero() {
// we don't have the block yet and the distance exceeds the allowed
// threshold
return Some(state.finalized_block_hash)
}
// OPTIMISTIC SYNCING
//
// It can happen when the node is doing an
// optimistic sync, where the CL has no knowledge of the finalized hash,
// but is expecting the EL to sync as high
// as possible before finalizing.
//
// This usually doesn't happen on ETH mainnet since CLs use the more
// secure checkpoint syncing.
//
// However, optimism chains will do this. The risk of a reorg is however
// low.
debug!(target: "consensus::engine", hash=?state.head_block_hash, "Setting head hash as an optimistic pipeline target.");
return Some(state.head_block_hash)
}
Ok(Some(_)) => {
// we're fully synced to the finalized block
// but we want to continue downloading the missing parent
}
}
}
}
None
}
/// Returns how far the local tip is from the given block. If the local tip is at the same
/// height or its block number is greater than the given block, this returns None.
#[inline]
const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
if block > local_tip {
Some(block - local_tip)
} else {
None
}
}
/// If validation fails, the response MUST contain the latest valid hash:
///
/// - The block hash of the ancestor of the invalid payload satisfying the following two
/// conditions:
/// - It is fully validated and deemed VALID
/// - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
/// - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
/// conditions are satisfied by a `PoW` block.
/// - null if client software cannot determine the ancestor of the invalid payload satisfying
/// the above conditions.
fn latest_valid_hash_for_invalid_payload(
&mut self,
parent_hash: B256,
) -> ProviderResult<Option<B256>> {
// Check if parent exists in side chain or in canonical chain.
if self.blockchain.find_block_by_hash(parent_hash, BlockSource::Any)?.is_some() {
return Ok(Some(parent_hash))
}
// iterate over ancestors in the invalid cache
// until we encounter the first valid ancestor
let mut current_hash = parent_hash;
let mut current_header = self.invalid_headers.get(¤t_hash);
while let Some(header) = current_header {
current_hash = header.parent_hash;
current_header = self.invalid_headers.get(¤t_hash);
// If current_header is None, then the current_hash does not have an invalid
// ancestor in the cache, check its presence in blockchain tree
if current_header.is_none() &&
self.blockchain.find_block_by_hash(current_hash, BlockSource::Any)?.is_some()
{
return Ok(Some(current_hash))
}
}
Ok(None)
}
/// Prepares the invalid payload response for the given hash, checking the
/// database for the parent hash and populating the payload status with the latest valid hash
/// according to the engine api spec.
fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
// Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
// PoW block, which we need to identify by looking at the parent's block difficulty
if let Ok(Some(parent)) = self.blockchain.header_by_hash_or_number(parent_hash.into()) {
if !parent.is_zero_difficulty() {
parent_hash = B256::ZERO;
}
}
let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
})
.with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
}
/// Checks if the given `check` hash points to an invalid header, inserting the given `head`
/// block into the invalid header cache if the `check` hash has a known invalid ancestor.
///
/// Returns a payload status response according to the engine API spec if the block is known to
/// be invalid.
fn check_invalid_ancestor_with_head(
&mut self,
check: B256,
head: B256,
) -> ProviderResult<Option<PayloadStatus>> {
// check if the check hash was previously marked as invalid
let Some(header) = self.invalid_headers.get(&check) else { return Ok(None) };
// populate the latest valid hash field
let status = self.prepare_invalid_response(header.parent_hash)?;
// insert the head block into the invalid header cache
self.invalid_headers.insert_with_invalid_ancestor(head, header);
Ok(Some(status))
}
/// Checks if the given `head` points to an invalid header, which requires a specific response
/// to a forkchoice update.
fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
// check if the head was previously marked as invalid
let Some(header) = self.invalid_headers.get(&head) else { return Ok(None) };
// populate the latest valid hash field
Ok(Some(self.prepare_invalid_response(header.parent_hash)?))
}
/// Record latency metrics for one call to make a block canonical
/// Takes start time of the call and result of the make canonical call
///
/// Handles cases for error, already canonical and committed blocks
fn record_make_canonical_latency(
&self,
start: Instant,
outcome: &Result<CanonicalOutcome, CanonicalError>,
) -> Duration {
let elapsed = start.elapsed();
self.metrics.make_canonical_latency.record(elapsed);
match outcome {
Ok(CanonicalOutcome::AlreadyCanonical { .. }) => {
self.metrics.make_canonical_already_canonical_latency.record(elapsed)
}
Ok(CanonicalOutcome::Committed { .. }) => {
self.metrics.make_canonical_committed_latency.record(elapsed)
}
Err(_) => self.metrics.make_canonical_error_latency.record(elapsed),
}
elapsed
}
/// Ensures that the given forkchoice state is consistent, assuming the head block has been
/// made canonical.
///
/// If the forkchoice state is consistent, this will return Ok(None). Otherwise, this will
/// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
///
/// This also updates the safe and finalized blocks in the [`CanonChainTracker`], if they are
/// consistent with the head block.
fn ensure_consistent_forkchoice_state(
&self,
state: ForkchoiceState,
) -> ProviderResult<Option<OnForkChoiceUpdated>> {
// Ensure that the finalized block, if not zero, is known and in the canonical chain
// after the head block is canonicalized.
//
// This ensures that the finalized block is consistent with the head block, i.e. the
// finalized block is an ancestor of the head block.
if !state.finalized_block_hash.is_zero() &&
!self.blockchain.is_canonical(state.finalized_block_hash)?
{
return Ok(Some(OnForkChoiceUpdated::invalid_state()))
}
// Finalized block is consistent, so update it in the canon chain tracker.
self.update_finalized_block(state.finalized_block_hash)?;
// Also ensure that the safe block, if not zero, is known and in the canonical chain
// after the head block is canonicalized.
//
// This ensures that the safe block is consistent with the head block, i.e. the safe
// block is an ancestor of the head block.
if !state.safe_block_hash.is_zero() &&
!self.blockchain.is_canonical(state.safe_block_hash)?
{
return Ok(Some(OnForkChoiceUpdated::invalid_state()))
}
// Safe block is consistent, so update it in the canon chain tracker.
self.update_safe_block(state.safe_block_hash)?;
Ok(None)
}
/// Sets the state of the canon chain tracker based to the given head.
///
/// This expects the given head to be the new canonical head.
///
/// Additionally, updates the head used for p2p handshakes.
///
/// This also updates the tracked safe and finalized blocks, and should be called before
/// returning a VALID forkchoice update response
fn update_canon_chain(&self, head: SealedHeader, update: &ForkchoiceState) -> RethResult<()> {
self.update_head(head)?;
self.update_finalized_block(update.finalized_block_hash)?;
self.update_safe_block(update.safe_block_hash)?;
Ok(())
}
/// Updates the state of the canon chain tracker based on the given head.
///
/// This expects the given head to be the new canonical head.
/// Additionally, updates the head used for p2p handshakes.
///
/// This should be called before returning a VALID forkchoice update response
#[inline]
fn update_head(&self, head: SealedHeader) -> RethResult<()> {
let mut head_block = Head {
number: head.number,
hash: head.hash(),
difficulty: head.difficulty,
timestamp: head.timestamp,
// NOTE: this will be set later
total_difficulty: Default::default(),
};
// we update the tracked header first
self.blockchain.set_canonical_head(head);
head_block.total_difficulty =
self.blockchain.header_td_by_number(head_block.number)?.ok_or_else(|| {
RethError::Provider(ProviderError::TotalDifficultyNotFound(head_block.number))
})?;
self.sync_state_updater.update_status(head_block);
Ok(())
}
/// Updates the tracked safe block if we have it
///
/// Returns an error if the block is not found.
#[inline]
fn update_safe_block(&self, safe_block_hash: B256) -> ProviderResult<()> {
if !safe_block_hash.is_zero() {
if self.blockchain.safe_block_hash()? == Some(safe_block_hash) {
// nothing to update
return Ok(())
}
let safe = self
.blockchain
.find_block_by_hash(safe_block_hash, BlockSource::Any)?
.ok_or_else(|| ProviderError::UnknownBlockHash(safe_block_hash))?;
self.blockchain.set_safe(safe.header.seal(safe_block_hash));
}
Ok(())
}
/// Updates the tracked finalized block if we have it
///
/// Returns an error if the block is not found.
#[inline]
fn update_finalized_block(&self, finalized_block_hash: B256) -> ProviderResult<()> {
if !finalized_block_hash.is_zero() {
if self.blockchain.finalized_block_hash()? == Some(finalized_block_hash) {
// nothing to update
return Ok(())
}
let finalized = self
.blockchain
.find_block_by_hash(finalized_block_hash, BlockSource::Any)?
.ok_or_else(|| ProviderError::UnknownBlockHash(finalized_block_hash))?;
self.blockchain.finalize_block(finalized.number)?;
self.blockchain.set_finalized(finalized.header.seal(finalized_block_hash));
}
Ok(())
}
/// Handler for a failed a forkchoice update due to a canonicalization error.
///
/// This will determine if the state's head is invalid, and if so, return immediately.
///
/// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap
///
/// See [`Self::on_forkchoice_updated`] and [`BlockchainTreeEngine::make_canonical`].
fn on_failed_canonical_forkchoice_update(
&mut self,
state: &ForkchoiceState,
error: CanonicalError,
) -> ProviderResult<PayloadStatus> {
debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle");
// check if the new head was previously invalidated, if so then we deem this FCU
// as invalid
if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash)? {
warn!(target: "consensus::engine", %error, ?state, ?invalid_ancestor, head=?state.head_block_hash, "Failed to canonicalize the head hash, head is also considered invalid");
debug!(target: "consensus::engine", head=?state.head_block_hash, current_error=%error, "Head was previously marked as invalid");
return Ok(invalid_ancestor)
}
match &error {
CanonicalError::Validation(BlockValidationError::BlockPreMerge { .. }) => {
warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
return Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: error.to_string(),
})
.with_latest_valid_hash(B256::ZERO))
}