Skip to content

Commit

Permalink
use msgindex as epoch=>tipsetkey index to speed up eth_getBlockByNumb…
Browse files Browse the repository at this point in the history
…er queries
  • Loading branch information
i-norden committed May 25, 2023
1 parent 41203f7 commit 792ea07
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 4 deletions.
20 changes: 20 additions & 0 deletions chain/index/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ type MsgIndex interface {
Close() error
}

// TipsetIndex is the interface to the tipset index
type TipsetIndex interface {
// GetTipsetCID returns the tipset cid for the given epoch
GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error)
// Close closes the index
Close() error
}

type dummyMsgIndex struct{}

func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
Expand All @@ -43,3 +51,15 @@ func (dummyMsgIndex) Close() error {
}

var DummyMsgIndex MsgIndex = dummyMsgIndex{}

type dummyTipsetIndex struct{}

func (dummyTipsetIndex) GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error) {
return nil, ErrNotFound
}

func (dummyTipsetIndex) Close() error {
return nil
}

var DummyTipsetIndex TipsetIndex = dummyTipsetIndex{}
40 changes: 39 additions & 1 deletion chain/index/msgindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ var dbDefs = []string{
epoch INTEGER NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)
`,
`,
`CREATE INDEX IF NOT EXISTS epochs ON messages (epoch)
`, // we are repurposing the already existing epoch => tipset_cid mapping in this table
// but it is possible that it makes more sense to move this into a separate table?
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,
Expand All @@ -44,6 +47,7 @@ const (
dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?"
dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)"
dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?"
dbqGetTipsetCidByEpoch = "SELECT tipset_cid FROM messages WHERE epoch = ?"
// reconciliation
dbqCountMessages = "SELECT COUNT(*) FROM messages"
dbqMinEpoch = "SELECT MIN(epoch) FROM messages"
Expand Down Expand Up @@ -75,6 +79,7 @@ type msgIndex struct {

db *sql.DB
selectMsgStmt *sql.Stmt
selectTipsetStmt *sql.Stmt
insertMsgStmt *sql.Stmt
deleteTipSetStmt *sql.Stmt

Expand Down Expand Up @@ -362,6 +367,12 @@ func (x *msgIndex) prepareStatements() error {
}
x.deleteTipSetStmt = stmt

stmt, err = x.db.Prepare(dbqGetTipsetCidByEpoch)
if err != nil {
return xerrors.Errorf("prepare selectTipSetStmt: %w", err)
}
x.selectTipsetStmt = stmt

return nil
}

Expand Down Expand Up @@ -521,6 +532,33 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
}, nil
}

func (x *msgIndex) GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error) {
x.closeLk.RLock()
defer x.closeLk.RUnlock()

if x.closed {
return nil, ErrClosed
}

var tipset string

row := x.selectTipsetStmt.QueryRow(epoch)
err := row.Scan(&tipset)
switch {
case err == sql.ErrNoRows:
return nil, ErrNotFound

case err != nil:
return nil, xerrors.Errorf("error querying msgindex database: %w", err)
}

tipsetCid, err := cid.Decode(tipset)
if err != nil {
return nil, xerrors.Errorf("error decoding tipset cid: %w", err)
}
return &tipsetCid, nil
}

func (x *msgIndex) Close() error {
x.closeLk.Lock()
defer x.closeLk.Unlock()
Expand Down
30 changes: 28 additions & 2 deletions chain/store/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"os"
"strconv"

"github.com/ipfs/go-cid"
"github.com/puzpuzpuz/xsync/v2"

"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/shardedmutex"
)
Expand All @@ -34,21 +37,25 @@ type ChainIndex struct {

fillCacheLock shardedmutex.ShardedMutexFor[types.TipSetKey]

loadTipSet loadTipSetFunc
loadTipSet loadTipSetFunc
lookupTipSet lookupTipSetCIDFunc

skipLength abi.ChainEpoch
}
type loadTipSetFunc func(context.Context, types.TipSetKey) (*types.TipSet, error)

type lookupTipSetCIDFunc func(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error)

func maphashTSK(s maphash.Seed, tsk types.TipSetKey) uint64 {
return maphash.Bytes(s, tsk.Bytes())
}

func NewChainIndex(lts loadTipSetFunc) *ChainIndex {
func NewChainIndex(lts loadTipSetFunc, luts lookupTipSetCIDFunc) *ChainIndex {
return &ChainIndex{
indexCache: xsync.NewTypedMapOfPresized[types.TipSetKey, *lbEntry](maphashTSK, DefaultChainIndexCacheSize),
fillCacheLock: shardedmutex.NewFor(maphashTSK, 32),
loadTipSet: lts,
lookupTipSet: luts,
skipLength: 20,
}
}
Expand All @@ -59,6 +66,25 @@ type lbEntry struct {
}

func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) {
if ci.lookupTipSet != nil {
tsc, err := ci.lookupTipSet(ctx, to)
switch {
case err == index.ErrNotFound:
// fall through
case err != nil:
return nil, xerrors.Errorf("failed to load tipset cid: %w", err)
default:
ts, err := ci.loadTipSet(ctx, types.NewTipSetKey(*tsc))
if err != nil {
return nil, xerrors.Errorf("failed to load tipset: %w", err)
}
// make sure that the tipset is correct!
if ts.Height() == to {
return ts, nil
}
// otherwise, fall through
}
}
if from.Height()-to <= ci.skipLength {
return ci.walkBack(ctx, from, to)
}
Expand Down
3 changes: 2 additions & 1 deletion chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dsto
evtTypeHeadChange: j.RegisterEventType("sync", "head_change"),
}

ci := NewChainIndex(cs.LoadTipSet)
// TODO: what's the best way to get the message (tipset) index loaded into here?
ci := NewChainIndex(cs.LoadTipSet, nil)

cs.cindex = ci

Expand Down

0 comments on commit 792ea07

Please sign in to comment.