Skip to content

Commit

Permalink
Node: Remove references to SignedObservation
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jan 27, 2025
1 parent c8b4aa5 commit 761117d
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 582 deletions.
6 changes: 0 additions & 6 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ const (
// gossipVaaSendBufferSize configures the size of the gossip network send buffer
gossipVaaSendBufferSize = 5000

// inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians.
// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
inboundObservationBufferSize = 10000

// inboundBatchObservationBufferSize configures the size of the batchObsvC channel that contains batches of observations from other Guardians.
// Since a batch contains many observations, the guardians should not be publishing too many of these. With 19 guardians, we would expect 19 messages
// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
Expand Down Expand Up @@ -85,7 +81,6 @@ type G struct {
gossipAttestationSendC chan []byte
gossipVaaSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Inbound observation batches.
batchObsvC channelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]]
// Finalized guardian observations aggregated across all chains
Expand Down Expand Up @@ -127,7 +122,6 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
g.gossipControlSendC = make(chan []byte, gossipControlSendBufferSize)
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
Expand Down
2 changes: 0 additions & 2 deletions node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func GuardianOptionP2P(
p2p.WithGuardianOptions(
nodeName,
g.guardianSigner,
g.obsvC,
g.batchObsvC.writeC,
signedInC,
g.obsvReqC.writeC,
Expand Down Expand Up @@ -593,7 +592,6 @@ func GuardianOptionProcessor(networkId string) *GuardianOption {
g.setC.readC,
g.gossipAttestationSendC,
g.gossipVaaSendC,
g.obsvC,
g.batchObsvC.readC,
g.obsvReqSendC.writeC,
g.signedInC.readC,
Expand Down
15 changes: 2 additions & 13 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

// Set up the attestation channel. ////////////////////////////////////////////////////////////////////
if params.gossipAttestationSendC != nil || params.obsvRecvC != nil || params.batchObsvRecvC != nil {
if params.gossipAttestationSendC != nil || params.batchObsvRecvC != nil {
attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation")
logger.Info("joining the attestation topic", zap.String("topic", attestationTopic))
attestationPubsubTopic, err = ps.Join(attestationTopic)
Expand All @@ -407,7 +407,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.obsvRecvC != nil || params.batchObsvRecvC != nil {
if params.batchObsvRecvC != nil {
logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic))
attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand Down Expand Up @@ -883,17 +883,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedObservation:
if params.obsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr)))
}
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch:
if params.batchObsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservationBatch, params.batchObsvRecvC); err == nil {
Expand Down
13 changes: 0 additions & 13 deletions node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ type (
gst *common.GuardianSetState
rootCtxCancel context.CancelFunc

// obsvRecvC is optional and can be set with `WithSignedObservationListener`.
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// batchObsvRecvC is optional and can be set with `WithSignedObservationBatchListener`.
batchObsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]

Expand Down Expand Up @@ -117,14 +114,6 @@ func WithProcessorFeaturesFunc(processorFeaturesFunc func() string) RunOpt {
}
}

// WithSignedObservationListener is used to set the channel to receive `SignedObservation` messages.
func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
return func(p *RunParams) error {
p.obsvRecvC = obsvRecvC
return nil
}
}

// WithSignedObservationBatchListener is used to set the channel to receive `SignedObservationBatch` messages.
func WithSignedObservationBatchListener(batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]) RunOpt {
return func(p *RunParams) error {
Expand Down Expand Up @@ -177,7 +166,6 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt {
func WithGuardianOptions(
nodeName string,
guardianSigner guardiansigner.GuardianSigner,
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqRecvC chan<- *gossipv1.ObservationRequest,
Expand All @@ -201,7 +189,6 @@ func WithGuardianOptions(
return func(p *RunParams) error {
p.nodeName = nodeName
p.guardianSigner = guardianSigner
p.obsvRecvC = obsvRecvC
p.batchObsvRecvC = batchObsvRecvC
p.signedIncomingVaaRecvC = signedIncomingVaaRecvC
p.obsvReqRecvC = obsvReqRecvC
Expand Down
3 changes: 0 additions & 3 deletions node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, guardianSigner)

obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42)
batchObsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 42)
signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42)
obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42)
Expand Down Expand Up @@ -170,7 +169,6 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
WithGuardianOptions(
nodeName,
guardianSigner,
obsvC,
batchObsvC,
signedInC,
obsvReqC,
Expand All @@ -195,7 +193,6 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, params)
assert.Equal(t, nodeName, params.nodeName)
assert.Equal(t, obsvC, params.obsvRecvC)
assert.Equal(t, signedInC, params.signedIncomingVaaRecvC)
assert.Equal(t, obsvReqC, params.obsvReqRecvC)
assert.Equal(t, gossipControlSendC, params.gossipControlSendC)
Expand Down
4 changes: 0 additions & 4 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const LOCAL_P2P_PORTRANGE_START = 11000

type G struct {
// arguments passed to p2p.New
obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
batchObsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
obsvReqC chan *gossipv1.ObservationRequest
obsvReqSendC chan *gossipv1.ObservationRequest
Expand Down Expand Up @@ -66,7 +65,6 @@ func NewG(t *testing.T, nodeName string) *G {
_, rootCtxCancel := context.WithCancel(context.Background())

g := &G{
obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
batchObsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], cs),
obsvReqC: make(chan *gossipv1.ObservationRequest, cs),
obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs),
Expand All @@ -91,7 +89,6 @@ func NewG(t *testing.T, nodeName string) *G {
name := g.nodeName
t.Logf("[%s] consuming\n", name)
select {
case <-g.obsvC:
case <-g.obsvReqC:
case <-g.signedInC:
case <-g.signedGovCfg:
Expand Down Expand Up @@ -182,7 +179,6 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
WithGuardianOptions(
g.nodeName,
g.guardianSigner,
g.obsvC,
g.batchObsvC,
g.signedInC,
g.obsvReqC,
Expand Down
14 changes: 1 addition & 13 deletions node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,6 @@ func (p *Processor) handleBatchObservation(m *node_common.MsgWithTimeStamp[gossi
batchObservationTotalDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
}

// handleObservation processes a remote VAA observation.
func (p *Processor) handleObservation(m *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) {
obs := gossipv1.Observation{
Hash: m.Msg.Hash,
Signature: m.Msg.Signature,
TxHash: m.Msg.TxHash,
MessageId: m.Msg.MessageId,
}
p.handleSingleObservation(m.Msg.Addr, &obs)
observationTotalDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
}

// handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, and assembles and submits a valid VAA if possible.
func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation) {
// SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!)
Expand Down Expand Up @@ -197,7 +185,7 @@ func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation
return
}

// Hooray! Now, we have verified all fields on SignedObservation and know that it includes
// Hooray! Now, we have verified all fields on the observation and know that it includes
// a valid signature by an active guardian. We still don't fully trust them, as they may be
// byzantine, but now we know who we're dealing with.

Expand Down
22 changes: 0 additions & 22 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ type Processor struct {
// gossipVaaSendC is a channel of outbound VAA messages to broadcast on p2p
gossipVaaSendC chan<- []byte

// obsvC is a channel of inbound decoded observations from p2p
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// batchObsvC is a channel of inbound decoded batches of observations from p2p
batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]

Expand Down Expand Up @@ -166,20 +163,6 @@ type updateVaaEntry struct {
}

var (
observationChanDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_channel_delay_us",
Help: "Latency histogram for delay of signed observations in channel",
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
})

observationTotalDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_total_delay_us",
Help: "Latency histogram for total time to process signed observations",
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
})

batchObservationChanDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_batch_observation_channel_delay_us",
Expand Down Expand Up @@ -225,7 +208,6 @@ func NewProcessor(
setC <-chan *common.GuardianSet,
gossipAttestationSendC chan<- []byte,
gossipVaaSendC chan<- []byte,
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
obsvReqSendC chan<- *gossipv1.ObservationRequest,
signedInC <-chan *gossipv1.SignedVAAWithQuorum,
Expand All @@ -243,7 +225,6 @@ func NewProcessor(
setC: setC,
gossipAttestationSendC: gossipAttestationSendC,
gossipVaaSendC: gossipVaaSendC,
obsvC: obsvC,
batchObsvC: batchObsvC,
obsvReqSendC: obsvReqSendC,
signedInC: signedInC,
Expand Down Expand Up @@ -319,9 +300,6 @@ func (p *Processor) Run(ctx context.Context) error {
return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
}
p.handleMessage(ctx, k)
case m := <-p.obsvC:
observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
p.handleObservation(m)
case m := <-p.batchObsvC:
batchObservationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
p.handleBatchObservation(m)
Expand Down
Loading

0 comments on commit 761117d

Please sign in to comment.