From 927bab0164afb8615dd00c00f02f9ca0f68d1a25 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Fri, 16 Feb 2024 11:51:19 -0500 Subject: [PATCH] add the catch up process in the migration --- store/changeset.go | 104 ++++++++++++++++++++ store/migration/manager.go | 163 ++++++++++++++++++++++++++++++-- store/migration/manager_test.go | 10 +- store/root/migrate_test.go | 154 ++++++++++++++++++++++++++++++ store/root/store.go | 86 ++++++++++++++--- store/root/store_test.go | 4 +- store/store.go | 6 ++ 7 files changed, 496 insertions(+), 31 deletions(-) create mode 100644 store/root/migrate_test.go diff --git a/store/changeset.go b/store/changeset.go index 7c5d88a864a3..b7d431086223 100644 --- a/store/changeset.go +++ b/store/changeset.go @@ -1,5 +1,11 @@ package store +import ( + "bytes" + + "cosmossdk.io/store/v2/internal/encoding" +) + // KVPair defines a key-value pair with additional metadata that is used to // track writes. Deletion can be denoted by a nil value or explicitly by the // Delete field. @@ -61,3 +67,101 @@ func (cs *Changeset) Merge(other *Changeset) { cs.Pairs[storeKey] = append(cs.Pairs[storeKey], pairs...) } } + +// encodedSize returns the size of the encoded Changeset. +func (cs *Changeset) encodedSize() int { + size := encoding.EncodeUvarintSize(uint64(len(cs.Pairs))) + for storeKey, pairs := range cs.Pairs { + size += encoding.EncodeBytesSize([]byte(storeKey)) + size += encoding.EncodeUvarintSize(uint64(len(pairs))) + for _, pair := range pairs { + size += encoding.EncodeBytesSize(pair.Key) + size += encoding.EncodeBytesSize(pair.Value) + } + } + return size +} + +// Marshal returns the encoded byte representation of Changeset. +// NOTE: The Changeset is encoded as follows: +// - number of store keys (uvarint) +// - for each store key: +// -- store key (bytes) +// -- number of pairs (uvarint) +// -- for each pair: +// --- key (bytes) +// --- value (bytes) +func (cs *Changeset) Marshal() ([]byte, error) { + var buf bytes.Buffer + buf.Grow(cs.encodedSize()) + + if err := encoding.EncodeUvarint(&buf, uint64(len(cs.Pairs))); err != nil { + return nil, err + } + for storeKey, pairs := range cs.Pairs { + if err := encoding.EncodeBytes(&buf, []byte(storeKey)); err != nil { + return nil, err + } + if err := encoding.EncodeUvarint(&buf, uint64(len(pairs))); err != nil { + return nil, err + } + for _, pair := range pairs { + if err := encoding.EncodeBytes(&buf, pair.Key); err != nil { + return nil, err + } + if err := encoding.EncodeBytes(&buf, pair.Value); err != nil { + return nil, err + } + } + } + + return buf.Bytes(), nil +} + +// Unmarshal decodes the Changeset from the given byte slice. +func (cs *Changeset) Unmarshal(buf []byte) error { + storeCount, n, err := encoding.DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + + cs.Pairs = make(map[string]KVPairs, storeCount) + for i := uint64(0); i < storeCount; i++ { + storeKey, n, err := encoding.DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairCount, n, err := encoding.DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairs := make(KVPairs, pairCount) + for j := uint64(0); j < pairCount; j++ { + key, n, err := encoding.DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + value, n, err := encoding.DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairs[j] = KVPair{ + Key: key, + Value: value, + StoreKey: string(storeKey), + } + } + cs.Pairs[string(storeKey)] = pairs + } + + return nil +} diff --git a/store/migration/manager.go b/store/migration/manager.go index 48537664a3f4..21bd21f085d8 100644 --- a/store/migration/manager.go +++ b/store/migration/manager.go @@ -1,11 +1,18 @@ package migration import ( + "encoding/binary" + "fmt" + "sync" + "time" + "golang.org/x/sync/errgroup" "cosmossdk.io/log" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/snapshots" + "cosmossdk.io/store/v2/storage" ) const ( @@ -13,25 +20,68 @@ const ( defaultChannelBufferSize = 1024 // defaultStorageBufferSize is the default buffer size for the storage snapshotter. defaultStorageBufferSize = 1024 + + migrateChangesetKeyFmt = "m/cs_%x" // m/ ) +// VersionedChangeset is a pair of version and Changeset. +type VersionedChangeset struct { + Version uint64 + Changeset *store.Changeset +} + // Manager manages the migration of the whole state from store/v1 to store/v2. type Manager struct { logger log.Logger snapshotsManager *snapshots.Manager - storageSnapshotter snapshots.StorageSnapshotter - commitSnapshotter snapshots.CommitSnapshotter + stateStorage *storage.StorageStore + stateCommitment *commitment.CommitStore + + db store.RawDB + migratedVersion uint64 + mtx sync.Mutex + chChangeset <-chan *VersionedChangeset + chDone chan struct{} } // NewManager returns a new Manager. -func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager { +func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager { return &Manager{ - logger: logger, - snapshotsManager: sm, - storageSnapshotter: ss, - commitSnapshotter: cs, + logger: logger, + snapshotsManager: sm, + stateStorage: ss, + stateCommitment: sc, + db: db, + } +} + +// Start starts the whole migration process. +func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, chDone chan struct{}) error { + m.chChangeset = chChangeset + m.chDone = chDone + + go func() { + if err := m.writeChangeset(); err != nil { + m.logger.Error("failed to write changeset", "err", err) + } + }() + + if err := m.Migrate(version); err != nil { + return fmt.Errorf("failed to migrate state: %w", err) } + + return m.Sync() +} + +// GetStateStorage returns the state storage. +func (m *Manager) GetStateStorage() *storage.StorageStore { + return m.stateStorage +} + +// GetStateCommitment returns the state commitment. +func (m *Manager) GetStateCommitment() *commitment.CommitStore { + return m.stateCommitment } // Migrate migrates the whole state at the given height to the new store/v2. @@ -49,13 +99,106 @@ func (m *Manager) Migrate(height uint64) error { eg := new(errgroup.Group) eg.Go(func() error { - return m.storageSnapshotter.Restore(height, chStorage) + return m.stateStorage.Restore(height, chStorage) }) eg.Go(func() error { defer close(chStorage) - _, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage) + _, err := m.stateCommitment.Restore(height, 0, ms, chStorage) return err }) - return eg.Wait() + if err := eg.Wait(); err != nil { + return err + } + + m.mtx.Lock() + m.migratedVersion = height + m.mtx.Unlock() + + return nil +} + +// writeChangeset writes the Changeset to the db. +func (m *Manager) writeChangeset() error { + for vc := range m.chChangeset { + cs := vc.Changeset + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, vc.Version) + csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf)) + csBytes, err := cs.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal changeset: %w", err) + } + + batch := m.db.NewBatch() + defer batch.Close() + + if err := batch.Set(csKey, csBytes); err != nil { + return fmt.Errorf("failed to write changeset to db.Batch: %w", err) + } + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write changeset to db: %w", err) + } + } + + return nil +} + +// GetMigratedVersion returns the migrated version. +// It is used to check the migrated version in the RootStore. +func (m *Manager) GetMigratedVersion() uint64 { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.migratedVersion +} + +// Sync catches up the Changesets which are committed while the migration is in progress. +// It should be called after the migration is done. +func (m *Manager) Sync() error { + version := m.GetMigratedVersion() + if version == 0 { + return fmt.Errorf("migration is not done yet") + } + version += 1 + + for { + select { + case <-m.chDone: + return nil + default: + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, version) + csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf)) + csBytes, err := m.db.Get(csKey) + if err != nil { + return fmt.Errorf("failed to get changeset from db: %w", err) + } + if csBytes == nil { + // wait for the next changeset + time.Sleep(100 * time.Millisecond) + continue + } + + cs := store.NewChangeset() + if err := cs.Unmarshal(csBytes); err != nil { + return fmt.Errorf("failed to unmarshal changeset: %w", err) + } + + if err := m.stateCommitment.WriteBatch(cs); err != nil { + return fmt.Errorf("failed to write changeset to commitment: %w", err) + } + if _, err := m.stateCommitment.Commit(version); err != nil { + return fmt.Errorf("failed to commit changeset to commitment: %w", err) + } + if err := m.stateStorage.ApplyChangeset(version, cs); err != nil { + return fmt.Errorf("failed to write changeset to storage: %w", err) + } + + m.mtx.Lock() + m.migratedVersion = version + m.mtx.Unlock() + + version += 1 + } + } } diff --git a/store/migration/manager_test.go b/store/migration/manager_test.go index b02ac5db94a9..3f08d320ae2a 100644 --- a/store/migration/manager_test.go +++ b/store/migration/manager_test.go @@ -50,7 +50,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) { newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2 require.NoError(t, err) - return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore + return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore } func TestMigrateState(t *testing.T) { @@ -78,17 +78,17 @@ func TestMigrateState(t *testing.T) { for version := uint64(1); version < toVersion; version++ { for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - val, err := m.commitSnapshotter.(*commitment.CommitStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + val, err := m.stateCommitment.Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) require.NoError(t, err) require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) } } } // check the latest state - val, err := m.commitSnapshotter.(*commitment.CommitStore).Get("store1", toVersion-1, []byte("key-100-1")) + val, err := m.stateCommitment.Get("store1", toVersion-1, []byte("key-100-1")) require.NoError(t, err) require.Nil(t, val) - val, err = m.commitSnapshotter.(*commitment.CommitStore).Get("store2", toVersion-1, []byte("key-100-0")) + val, err = m.stateCommitment.Get("store2", toVersion-1, []byte("key-100-0")) require.NoError(t, err) require.Nil(t, val) @@ -96,7 +96,7 @@ func TestMigrateState(t *testing.T) { for version := uint64(1); version < toVersion; version++ { for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - val, err := m.storageSnapshotter.(*storage.StorageStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + val, err := m.stateStorage.Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) require.NoError(t, err) require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) } diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go new file mode 100644 index 000000000000..4809a80f28c5 --- /dev/null +++ b/store/root/migrate_test.go @@ -0,0 +1,154 @@ +package root + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/commitment" + "cosmossdk.io/store/v2/commitment/iavl" + dbm "cosmossdk.io/store/v2/db" + "cosmossdk.io/store/v2/migration" + "cosmossdk.io/store/v2/snapshots" + "cosmossdk.io/store/v2/storage" + "cosmossdk.io/store/v2/storage/sqlite" +) + +var ( + storeKeys = []string{"store1", "store2", "store3"} +) + +type MigrateStoreTestSuite struct { + suite.Suite + + rootStore store.RootStore +} + +func TestMigrateStoreTestSuite(t *testing.T) { + suite.Run(t, &MigrateStoreTestSuite{}) +} + +func (s *MigrateStoreTestSuite) SetupTest() { + noopLog := log.NewNopLogger() + + mdb := dbm.NewMemDB() + multiTrees := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey)) + multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, noopLog, iavl.DefaultConfig()) + } + commitStore, err := commitment.NewCommitStore(multiTrees, mdb, nil, noopLog) + s.Require().NoError(err) + + // create a new storage and commitment stores + sqliteDB, err := sqlite.New(s.T().TempDir()) + s.Require().NoError(err) + ss := storage.NewStorageStore(sqliteDB, nil, noopLog) + + multiTrees1 := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) + } + sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, noopLog) + s.Require().NoError(err) + + snapshotsStore, err := snapshots.NewStore(dbm.NewMemDB(), s.T().TempDir()) + s.Require().NoError(err) + snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), commitStore, nil, nil, noopLog) + migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, noopLog) + + // assume no storage store, simulate the migration process + s.rootStore, err = New(noopLog, nil, commitStore, migrationManager, nil) + s.Require().NoError(err) +} + +func (s *MigrateStoreTestSuite) TestMigrateState() { + // apply changeset against the original store + toVersion := uint64(100) + keyCount := 10 + for version := uint64(1); version <= toVersion; version++ { + cs := store.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + } + } + _, err := s.rootStore.WorkingHash(cs) + s.Require().NoError(err) + _, err = s.rootStore.Commit(cs) + s.Require().NoError(err) + } + + // start the migration process + s.rootStore.StartMigration() + + // continue to apply changeset against the original store + latestVersion := uint64(0) + for version := toVersion + 1; ; version++ { + cs := store.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + } + } + _, err := s.rootStore.WorkingHash(cs) + s.Require().NoError(err) + _, err = s.rootStore.Commit(cs) + s.Require().NoError(err) + + // check if the migration is complete + if s.rootStore.GetStateStorage() != nil { + latestVersion = version + break + } + + // add some delay to simulate the consensus process + time.Sleep(100 * time.Millisecond) + } + + // check if the migration is successful + version, err := s.rootStore.GetLatestVersion() + s.Require().NoError(err) + s.Require().Equal(latestVersion, version) + + // query against the migrated store + for version := uint64(1); version <= latestVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + targetVersion := version + if version < toVersion { + targetVersion = toVersion + } + value, err := s.rootStore.Query(storeKey, targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) + s.Require().NoError(err) + s.Require().NotNil(value) + } + } + } + + // prune the old versions + err = s.rootStore.Prune(latestVersion - 1) + s.Require().NoError(err) + + // apply changeset against the migrated store + for version := latestVersion + 1; version <= latestVersion+10; version++ { + cs := store.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + } + } + _, err := s.rootStore.WorkingHash(cs) + s.Require().NoError(err) + _, err = s.rootStore.Commit(cs) + s.Require().NoError(err) + } + + version, err = s.rootStore.GetLatestVersion() + s.Require().NoError(err) + s.Require().Equal(latestVersion+10, version) +} diff --git a/store/root/store.go b/store/root/store.go index 766d3d8e5f7d..36b04e105fd4 100644 --- a/store/root/store.go +++ b/store/root/store.go @@ -13,6 +13,7 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/metrics" + "cosmossdk.io/store/v2/migration" "cosmossdk.io/store/v2/proof" ) @@ -26,8 +27,8 @@ type Store struct { logger log.Logger initialVersion uint64 - // stateStore reflects the state storage backend - stateStore store.VersionedDatabase + // stateStorage reflects the state storage backend + stateStorage store.VersionedDatabase // stateCommitment reflects the state commitment (SC) backend stateCommitment store.Committer @@ -43,30 +44,42 @@ type Store struct { // telemetry reflects a telemetry agent responsible for emitting metrics (if any) telemetry metrics.StoreMetrics + + // Migration related fields + // migrationManager reflects the migration manager used to migrate state from v1 to v2 + migrationManager *migration.Manager + // chChangeset reflects the channel used to send the changeset to the migration manager + chChangeset chan *migration.VersionedChangeset + // chDone reflects the channel used to signal the migration manager that the migration is done + chDone chan struct{} + // isMigrating reflects whether the store is currently migrating + isMigrating bool } func New( logger log.Logger, ss store.VersionedDatabase, sc store.Committer, + mm *migration.Manager, m metrics.StoreMetrics, ) (store.RootStore, error) { return &Store{ - logger: logger.With("module", "root_store"), - initialVersion: 1, - stateStore: ss, - stateCommitment: sc, - telemetry: m, + logger: logger.With("module", "root_store"), + initialVersion: 1, + stateStorage: ss, + stateCommitment: sc, + migrationManager: mm, + telemetry: m, }, nil } // Close closes the store and resets all internal fields. Note, Close() is NOT // idempotent and should only be called once. func (s *Store) Close() (err error) { - err = errors.Join(err, s.stateStore.Close()) + err = errors.Join(err, s.stateStorage.Close()) err = errors.Join(err, s.stateCommitment.Close()) - s.stateStore = nil + s.stateStorage = nil s.stateCommitment = nil s.lastCommitInfo = nil s.commitHeader = nil @@ -106,7 +119,7 @@ func (s *Store) StateAt(v uint64) (store.ReadOnlyRootStore, error) { } func (s *Store) GetStateStorage() store.VersionedDatabase { - return s.stateStore + return s.stateStorage } func (s *Store) GetStateCommitment() store.Committer { @@ -126,7 +139,7 @@ func (s *Store) LastCommitID() (proof.CommitID, error) { // in SS might not be the latest version in the SC stores. // // Ref: https://github.com/cosmos/cosmos-sdk/issues/17314 - latestVersion, err := s.stateStore.GetLatestVersion() + latestVersion, err := s.stateStorage.GetLatestVersion() if err != nil { return proof.CommitID{}, err } @@ -162,7 +175,7 @@ func (s *Store) Query(storeKey string, version uint64, key []byte, prove bool) ( defer s.telemetry.MeasureSince(now, "root_store", "query") } - val, err := s.stateStore.Get(storeKey, version, key) + val, err := s.stateStorage.Get(storeKey, version, key) if err != nil || val == nil { // fallback to querying SC backend if not found in SS backend // @@ -255,6 +268,21 @@ func (s *Store) WorkingHash(cs *store.Changeset) ([]byte, error) { } if s.workingHash == nil { + // if migration is in progress, send the changeset to the migration manager + if s.isMigrating { + // if the migration manager has already migrated to the version, close the + // channels and replace the state storage and commitment + if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version { + close(s.chDone) + close(s.chChangeset) + s.isMigrating = false + s.stateStorage = s.migrationManager.GetStateStorage() + s.stateCommitment = s.migrationManager.GetStateCommitment() + } else { + s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} + } + } + if err := s.writeSC(cs); err != nil { return nil, err } @@ -290,7 +318,12 @@ func (s *Store) Commit(cs *store.Changeset) ([]byte, error) { // commit SS async eg.Go(func() error { - if err := s.stateStore.ApplyChangeset(version, cs); err != nil { + // if we're migrating, we don't want to commit to the state storage + if s.stateStorage == nil { + return nil + } + + if err := s.stateStorage.ApplyChangeset(version, cs); err != nil { return fmt.Errorf("failed to commit SS: %w", err) } @@ -326,7 +359,7 @@ func (s *Store) Prune(version uint64) error { defer s.telemetry.MeasureSince(now, "root_store", "prune") } - if err := s.stateStore.Prune(version); err != nil { + if err := s.stateStorage.Prune(version); err != nil { return fmt.Errorf("failed to prune SS store: %w", err) } @@ -337,6 +370,31 @@ func (s *Store) Prune(version uint64) error { return nil } +// StartMigration starts the migration process. It sets the migration manager +// and initializes the channels. An error is returned if migration is already in +// progress. +// NOTE: This method should only be called once after loadVersion. +func (s *Store) StartMigration() error { + if s.isMigrating { + return fmt.Errorf("migration already in progress") + } + + // buffer the changeset channel to avoid blocking + s.chChangeset = make(chan *migration.VersionedChangeset, 1) + s.chDone = make(chan struct{}) + + s.isMigrating = true + + go func() { + version := s.lastCommitInfo.Version + if err := s.migrationManager.Start(version, s.chChangeset, s.chDone); err != nil { + s.logger.Error("failed to start migration", "err", err) + } + }() + + return nil +} + // writeSC accepts a Changeset and writes that as a batch to the underlying SC // tree, which allows us to retrieve the working hash of the SC tree. Finally, // we construct a *CommitInfo and set that as lastCommitInfo. Note, this should diff --git a/store/root/store_test.go b/store/root/store_test.go index 89d319129f71..a717313c135b 100644 --- a/store/root/store_test.go +++ b/store/root/store_test.go @@ -45,7 +45,7 @@ func (s *RootStoreTestSuite) SetupTest() { sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), nil, noopLog) s.Require().NoError(err) - rs, err := New(noopLog, ss, sc, nil) + rs, err := New(noopLog, ss, sc, nil, nil) s.Require().NoError(err) s.rootStore = rs @@ -61,7 +61,7 @@ func (s *RootStoreTestSuite) TestGetStateCommitment() { } func (s *RootStoreTestSuite) TestGetStateStorage() { - s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStore) + s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStorage) } func (s *RootStoreTestSuite) TestSetInitialVersion() { diff --git a/store/store.go b/store/store.go index a26afb4298a0..ec8d1f81ea62 100644 --- a/store/store.go +++ b/store/store.go @@ -72,6 +72,12 @@ type RootStore interface { // old versions of the RootStore by the CLI. Prune(version uint64) error + // StartMigration starts a migration process to migrate the RootStore/v1 to the + // SS and SC backends of store/v2. + // It runs in a separate goroutine and replaces the current RootStore with the + // migrated new backends once the migration is complete. + StartMigration() error + // SetMetrics sets the telemetry handler on the RootStore. SetMetrics(m metrics.Metrics)