Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2/stf): delayed marshalling of typed event #22684

Merged
merged 8 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type consensus[T transaction.Tx] struct {
streamingManager streaming.Manager
mempool mempool.Mempool[T]

cfg Config
chainID string
indexedEvents map[string]struct{}
cfg Config
chainID string
indexedABCIEvents map[string]struct{}

initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
Expand Down Expand Up @@ -105,9 +105,12 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
return nil, err
}

events, err := intoABCIEvents(resp.Events, c.indexedEvents)
if err != nil {
return nil, err
events := make([]abci.Event, 0)
if !c.cfg.AppTomlConfig.DisableABCIEvents {
events, err = intoABCIEvents(resp.Events, c.indexedABCIEvents)
if err != nil {
return nil, err
}
}

cometResp := &abciproto.CheckTxResponse{
Expand All @@ -116,6 +119,7 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
GasUsed: uint64ToInt64(resp.GasUsed),
Events: events,
}

if resp.Error != nil {
space, code, log := errorsmod.ABCIInfo(resp.Error, c.cfg.AppTomlConfig.Trace)
cometResp.Code = code
Expand Down Expand Up @@ -557,7 +561,7 @@ func (c *consensus[T]) FinalizeBlock(
return nil, err
}

return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
return finalizeBlockResponse(resp, cp, appHash, c.indexedABCIEvents, c.cfg.AppTomlConfig.DisableABCIEvents, c.cfg.AppTomlConfig.Trace)
}

func (c *consensus[T]) internalFinalizeBlock(
Expand Down
24 changes: 13 additions & 11 deletions server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Config struct {
func DefaultAppTomlConfig() *AppTomlConfig {
return &AppTomlConfig{
MinRetainBlocks: 0,
IndexEvents: make([]string, 0),
HaltHeight: 0,
HaltTime: 0,
Address: "tcp://127.0.0.1:26658",
Expand All @@ -28,22 +27,25 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
IndexABCIEvents: make([]string, 0),
DisableABCIEvents: false,
}
}

type AppTomlConfig struct {
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
IndexEvents []string `mapstructure:"index-events" toml:"index-events" comment:"index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables the ABCI event indexing. It is useful when relying on the indexer for event indexing."`
}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a
return nil, errorsmod.Wrap(err, "failed to simulate tx")
}

bz, err := intoABCISimulationResponse(txResult, c.indexedEvents)
bz, err := intoABCISimulationResponse(txResult, c.indexedABCIEvents)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to marshal txResult")
}
Expand Down
10 changes: 5 additions & 5 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cometbft

import (
"context"
"cosmossdk.io/server/v2/cometbft/oe"
"crypto/sha256"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -35,6 +34,7 @@ import (
"cosmossdk.io/server/v2/appmanager"
cometlog "cosmossdk.io/server/v2/cometbft/log"
"cosmossdk.io/server/v2/cometbft/mempool"
"cosmossdk.io/server/v2/cometbft/oe"
"cosmossdk.io/server/v2/cometbft/types"
"cosmossdk.io/store/v2/snapshots"

Expand Down Expand Up @@ -122,9 +122,9 @@ func New[T transaction.Tx](
}
}

indexEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexEvents))
for _, e := range srv.config.AppTomlConfig.IndexEvents {
indexEvents[e] = struct{}{}
indexedABCIEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexABCIEvents))
for _, e := range srv.config.AppTomlConfig.IndexABCIEvents {
indexedABCIEvents[e] = struct{}{}
Comment on lines +125 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Inconsistent event indexing variable names across the codebase

The codebase shows two different naming conventions being used:

  • indexEvents in baseapp/ directory
  • indexedABCIEvents in server/v2/cometbft/ directory

Both variables serve the same purpose of indexing ABCI events, but the inconsistent naming could lead to confusion. Consider:

  • Unifying the naming convention across the codebase
  • Using indexedABCIEvents consistently as it's more descriptive and explicit about its ABCI-specific purpose
🔗 Analysis chain

LGTM: Efficient map initialization with pre-allocated capacity

The map initialization with pre-allocated capacity is a good performance optimization. The renamed variable better reflects its ABCI-specific purpose.

Let's verify the consistency of this change across the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the consistent usage of indexedABCIEvents across the codebase
# and ensure no old references to indexEvents remain

# Check for any remaining references to the old name
rg "indexEvents" --type go

# Check the usage pattern of the new name
rg "indexedABCIEvents" --type go -A 2 -B 2

Length of output: 2687

}

ss := store.GetStateStorage().(snapshots.StorageSnapshotter)
Expand Down Expand Up @@ -185,7 +185,7 @@ func New[T transaction.Tx](
checkTxHandler: srv.serverOptions.CheckTxHandler,
extendVote: srv.serverOptions.ExtendVoteHandler,
chainID: chainID,
indexedEvents: indexEvents,
indexedABCIEvents: indexedABCIEvents,
initialHeight: 0,
queryHandlersMap: queryHandlers,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
Expand Down
94 changes: 60 additions & 34 deletions server/v2/cometbft/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package cometbft

import (
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"math"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -70,16 +73,20 @@ func finalizeBlockResponse(
cp *cmtproto.ConsensusParams,
appHash []byte,
indexSet map[string]struct{},
disableABCIEvents,
debug bool,
) (*abci.FinalizeBlockResponse, error) {
allEvents := append(in.BeginBlockEvents, in.EndBlockEvents...)
events := make([]abci.Event, 0)

events, err := intoABCIEvents(allEvents, indexSet)
if err != nil {
return nil, err
if !disableABCIEvents {
var err error
events, err = intoABCIEvents(append(in.BeginBlockEvents, in.EndBlockEvents...), indexSet)
if err != nil {
return nil, err
}
}

txResults, err := intoABCITxResults(in.TxResults, indexSet, debug)
txResults, err := intoABCITxResults(in.TxResults, indexSet, disableABCIEvents, debug)
if err != nil {
return nil, err
}
Expand All @@ -91,6 +98,7 @@ func finalizeBlockResponse(
AppHash: appHash,
ConsensusParamUpdates: cp,
}

return resp, nil
}

Expand All @@ -108,12 +116,21 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali
return valsetUpdates
}

func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, debug bool) ([]*abci.ExecTxResult, error) {
func intoABCITxResults(
results []server.TxResult,
indexSet map[string]struct{},
disableABCIEvents, debug bool,
) ([]*abci.ExecTxResult, error) {
res := make([]*abci.ExecTxResult, len(results))
for i := range results {
events, err := intoABCIEvents(results[i].Events, indexSet)
if err != nil {
return nil, err
var err error
events := make([]abci.Event, 0)

if !disableABCIEvents {
events, err = intoABCIEvents(results[i].Events, indexSet)
if err != nil {
return nil, err
}
}

res[i] = responseExecTxResultWithEvents(
Expand All @@ -132,16 +149,42 @@ func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(events))
for i, e := range events {
attributes, err := e.Attributes()
if err != nil {
return nil, err
attrs := make([]event.Attribute, 0)

if e.Data != nil {
resp, err := e.Data()
if err != nil {
return nil, fmt.Errorf("failed to marshal event data: %w", err)
}

var attrMap map[string]json.RawMessage
if err := json.Unmarshal(resp, &attrMap); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data: %w", err)
}

// sort the keys to ensure the order is always the same
keys := slices.Sorted(maps.Keys(attrMap))
for _, k := range keys {
v := attrMap[k]
attrs = append(attrs, event.Attribute{
Key: k,
Value: string(v),
})
}
} else {
var err error
attrs, err = e.Attributes()
if err != nil {
return nil, err
}
}

abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
Attributes: make([]abci.EventAttribute, len(attrs)),
}

for j, attr := range attributes {
for j, attr := range attrs {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Expand All @@ -154,26 +197,9 @@ func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.
}

func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(txRes.Events))
for i, e := range txRes.Events {
attributes, err := e.Attributes()
if err != nil {
return nil, err
}
abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
}

for j, attr := range attributes {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
}
}
abciEvents, err := intoABCIEvents(txRes.Events, indexSet)
if err != nil {
return nil, err
}

msgResponses := make([]*gogoany.Any, len(txRes.Resp))
Expand Down
49 changes: 12 additions & 37 deletions server/v2/stf/core_event_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"maps"
"slices"

"github.com/cosmos/gogoproto/jsonpb"
gogoproto "github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -39,12 +37,20 @@ type eventManager struct {
// Emit emits an typed event that is defined in the protobuf file.
// In the future these events will be added to consensus.
func (em *eventManager) Emit(tev transaction.Msg) error {
res, err := TypedEventToEvent(tev)
if err != nil {
return err
event := event.Event{
Type: gogoproto.MessageName(tev),
Data: func() (json.RawMessage, error) {
buf := new(bytes.Buffer)
jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil}
if err := jm.Marshal(buf, tev); err != nil {
return nil, err
}

return buf.Bytes(), nil
},
}

em.executionContext.events = append(em.executionContext.events, res)
em.executionContext.events = append(em.executionContext.events, event)
return nil
}

Expand All @@ -53,34 +59,3 @@ func (em *eventManager) EmitKV(eventType string, attrs ...event.Attribute) error
em.executionContext.events = append(em.executionContext.events, event.NewEvent(eventType, attrs...))
return nil
}

// TypedEventToEvent takes typed event and converts to Event object
func TypedEventToEvent(tev transaction.Msg) (event.Event, error) {
evtType := gogoproto.MessageName(tev)
buf := new(bytes.Buffer)
jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil}
if err := jm.Marshal(buf, tev); err != nil {
return event.Event{}, err
}

var attrMap map[string]json.RawMessage
if err := json.Unmarshal(buf.Bytes(), &attrMap); err != nil {
return event.Event{}, err
}

// sort the keys to ensure the order is always the same
keys := slices.Sorted(maps.Keys(attrMap))
attrs := make([]event.Attribute, 0, len(attrMap))
for _, k := range keys {
v := attrMap[k]
attrs = append(attrs, event.Attribute{
Key: k,
Value: string(v),
})
}

return event.Event{
Type: evtType,
Attributes: func() ([]event.Attribute, error) { return attrs, nil },
}, nil
}
Loading
Loading