Skip to content

Commit

Permalink
use msgindex as epoch=>tipsetkey index to speed up eth_getBlockByNumb…
Browse files Browse the repository at this point in the history
…er queries
  • Loading branch information
i-norden committed May 26, 2023
1 parent 61ca075 commit 0c47a8e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 5 deletions.
20 changes: 20 additions & 0 deletions chain/index/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ type MsgIndex interface {
Close() error
}

// TipsetIndex is the interface to the tipset index
type TipsetIndex interface {
// GetTipsetCID returns the tipset cid for the given epoch
GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error)
// Close closes the index
Close() error
}

type dummyMsgIndex struct{}

func (dummyMsgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) {
Expand All @@ -44,3 +52,15 @@ func (dummyMsgIndex) Close() error {
}

var DummyMsgIndex MsgIndex = dummyMsgIndex{}

type dummyTipsetIndex struct{}

func (dummyTipsetIndex) GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error) {
return nil, ErrNotFound
}

func (dummyTipsetIndex) Close() error {
return nil
}

var DummyTipsetIndex TipsetIndex = dummyTipsetIndex{}
28 changes: 27 additions & 1 deletion chain/index/msgindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ const (
dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)"
dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?"
dbqGetTipsetByEpoch = "SELECT tipset_cid FROM messages WHERE epoch = ? LIMIT 1"

// reconciliation
dbqCountMessages = "SELECT COUNT(*) FROM messages"
dbqMinEpoch = "SELECT MIN(epoch) FROM messages"
Expand Down Expand Up @@ -546,6 +545,33 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.C
return msgInfo, xtsCid, nil
}

func (x *msgIndex) GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error) {
x.closeLk.RLock()
defer x.closeLk.RUnlock()

if x.closed {
return nil, ErrClosed
}

var tipset string

row := x.selectTipsetStmt.QueryRow(epoch)
err := row.Scan(&tipset)
switch {
case err == sql.ErrNoRows:
return nil, ErrNotFound

case err != nil:
return nil, xerrors.Errorf("error querying msgindex database: %w", err)
}

tipsetCid, err := cid.Decode(tipset)
if err != nil {
return nil, xerrors.Errorf("error decoding tipset cid: %w", err)
}
return &tipsetCid, nil
}

func (x *msgIndex) Close() error {
x.closeLk.Lock()
defer x.closeLk.Unlock()
Expand Down
30 changes: 27 additions & 3 deletions chain/store/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"strconv"
"sync"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/types"
)

Expand All @@ -30,16 +31,20 @@ type ChainIndex struct {
indexCacheLk sync.Mutex
indexCache map[types.TipSetKey]*lbEntry

loadTipSet loadTipSetFunc
loadTipSet loadTipSetFunc
lookupTipSet lookupTipSetCIDFunc

skipLength abi.ChainEpoch
}
type loadTipSetFunc func(context.Context, types.TipSetKey) (*types.TipSet, error)

func NewChainIndex(lts loadTipSetFunc) *ChainIndex {
type lookupTipSetCIDFunc func(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error)

func NewChainIndex(lts loadTipSetFunc, luts lookupTipSetCIDFunc) *ChainIndex {
return &ChainIndex{
indexCache: make(map[types.TipSetKey]*lbEntry, DefaultChainIndexCacheSize),
loadTipSet: lts,
lookupTipSet: luts,
skipLength: 20,
}
}
Expand All @@ -50,6 +55,25 @@ type lbEntry struct {
}

func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) {
if ci.lookupTipSet != nil {
tsc, err := ci.lookupTipSet(ctx, to)
switch {
case err == index.ErrNotFound:
// fall through
case err != nil:
return nil, xerrors.Errorf("failed to load tipset cid: %w", err)
default:
ts, err := ci.loadTipSet(ctx, types.NewTipSetKey(*tsc))
if err != nil {
return nil, xerrors.Errorf("failed to load tipset: %w", err)
}
// make sure that the tipset is correct!
if ts.Height() == to {
return ts, nil
}
// otherwise, fall through
}
}
if from.Height()-to <= ci.skipLength {
return ci.walkBack(ctx, from, to)
}
Expand Down
3 changes: 2 additions & 1 deletion chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dsto
evtTypeHeadChange: j.RegisterEventType("sync", "head_change"),
}

ci := NewChainIndex(cs.LoadTipSet)
// TODO: what's the best way to get the message (tipset) index loaded into here?
ci := NewChainIndex(cs.LoadTipSet, nil)

cs.cindex = ci

Expand Down

0 comments on commit 0c47a8e

Please sign in to comment.