Skip to content

Commit

Permalink
add the catch up process in the migration
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope committed Feb 16, 2024
1 parent 3ecae08 commit 927bab0
Show file tree
Hide file tree
Showing 7 changed files with 496 additions and 31 deletions.
104 changes: 104 additions & 0 deletions store/changeset.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package store

import (
"bytes"

"cosmossdk.io/store/v2/internal/encoding"
)

// KVPair defines a key-value pair with additional metadata that is used to
// track writes. Deletion can be denoted by a nil value or explicitly by the
// Delete field.
Expand Down Expand Up @@ -61,3 +67,101 @@ func (cs *Changeset) Merge(other *Changeset) {
cs.Pairs[storeKey] = append(cs.Pairs[storeKey], pairs...)
}
}

// encodedSize returns the size of the encoded Changeset.
func (cs *Changeset) encodedSize() int {
size := encoding.EncodeUvarintSize(uint64(len(cs.Pairs)))
for storeKey, pairs := range cs.Pairs {
size += encoding.EncodeBytesSize([]byte(storeKey))
size += encoding.EncodeUvarintSize(uint64(len(pairs)))
for _, pair := range pairs {
size += encoding.EncodeBytesSize(pair.Key)
size += encoding.EncodeBytesSize(pair.Value)
}
}
return size
}

// Marshal returns the encoded byte representation of Changeset.
// NOTE: The Changeset is encoded as follows:
// - number of store keys (uvarint)
// - for each store key:
// -- store key (bytes)
// -- number of pairs (uvarint)
// -- for each pair:
// --- key (bytes)
// --- value (bytes)
func (cs *Changeset) Marshal() ([]byte, error) {
var buf bytes.Buffer
buf.Grow(cs.encodedSize())

if err := encoding.EncodeUvarint(&buf, uint64(len(cs.Pairs))); err != nil {
return nil, err
}
for storeKey, pairs := range cs.Pairs {
if err := encoding.EncodeBytes(&buf, []byte(storeKey)); err != nil {
return nil, err
}
if err := encoding.EncodeUvarint(&buf, uint64(len(pairs))); err != nil {
return nil, err
}
for _, pair := range pairs {
if err := encoding.EncodeBytes(&buf, pair.Key); err != nil {
return nil, err
}
if err := encoding.EncodeBytes(&buf, pair.Value); err != nil {
return nil, err
}
}
}

return buf.Bytes(), nil
}

// Unmarshal decodes the Changeset from the given byte slice.
func (cs *Changeset) Unmarshal(buf []byte) error {
storeCount, n, err := encoding.DecodeUvarint(buf)
if err != nil {
return err
}
buf = buf[n:]

cs.Pairs = make(map[string]KVPairs, storeCount)
for i := uint64(0); i < storeCount; i++ {
storeKey, n, err := encoding.DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]

pairCount, n, err := encoding.DecodeUvarint(buf)
if err != nil {
return err
}
buf = buf[n:]

pairs := make(KVPairs, pairCount)
for j := uint64(0); j < pairCount; j++ {
key, n, err := encoding.DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]

value, n, err := encoding.DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]

pairs[j] = KVPair{
Key: key,
Value: value,
StoreKey: string(storeKey),
}
}
cs.Pairs[string(storeKey)] = pairs
}

return nil
}
163 changes: 153 additions & 10 deletions store/migration/manager.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,87 @@
package migration

import (
"encoding/binary"
"fmt"
"sync"
"time"

"golang.org/x/sync/errgroup"

"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/snapshots"
"cosmossdk.io/store/v2/storage"
)

const (
// defaultChannelBufferSize is the default buffer size for the migration stream.
defaultChannelBufferSize = 1024
// defaultStorageBufferSize is the default buffer size for the storage snapshotter.
defaultStorageBufferSize = 1024

migrateChangesetKeyFmt = "m/cs_%x" // m/<version>
)

// VersionedChangeset is a pair of version and Changeset.
type VersionedChangeset struct {
Version uint64
Changeset *store.Changeset
}

// Manager manages the migration of the whole state from store/v1 to store/v2.
type Manager struct {
logger log.Logger
snapshotsManager *snapshots.Manager

storageSnapshotter snapshots.StorageSnapshotter
commitSnapshotter snapshots.CommitSnapshotter
stateStorage *storage.StorageStore
stateCommitment *commitment.CommitStore

db store.RawDB
migratedVersion uint64
mtx sync.Mutex
chChangeset <-chan *VersionedChangeset
chDone chan struct{}
}

// NewManager returns a new Manager.
func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager {
func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager {
return &Manager{
logger: logger,
snapshotsManager: sm,
storageSnapshotter: ss,
commitSnapshotter: cs,
logger: logger,
snapshotsManager: sm,
stateStorage: ss,
stateCommitment: sc,
db: db,
}
}

// Start starts the whole migration process.
func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, chDone chan struct{}) error {
m.chChangeset = chChangeset
m.chDone = chDone

go func() {
if err := m.writeChangeset(); err != nil {
m.logger.Error("failed to write changeset", "err", err)
}
}()

if err := m.Migrate(version); err != nil {
return fmt.Errorf("failed to migrate state: %w", err)
}

return m.Sync()
}

// GetStateStorage returns the state storage.
func (m *Manager) GetStateStorage() *storage.StorageStore {
return m.stateStorage
}

// GetStateCommitment returns the state commitment.
func (m *Manager) GetStateCommitment() *commitment.CommitStore {
return m.stateCommitment
}

// Migrate migrates the whole state at the given height to the new store/v2.
Expand All @@ -49,13 +99,106 @@ func (m *Manager) Migrate(height uint64) error {

eg := new(errgroup.Group)
eg.Go(func() error {
return m.storageSnapshotter.Restore(height, chStorage)
return m.stateStorage.Restore(height, chStorage)
})
eg.Go(func() error {
defer close(chStorage)
_, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage)
_, err := m.stateCommitment.Restore(height, 0, ms, chStorage)
return err
})

return eg.Wait()
if err := eg.Wait(); err != nil {
return err
}

m.mtx.Lock()
m.migratedVersion = height
m.mtx.Unlock()

return nil
}

// writeChangeset writes the Changeset to the db.
func (m *Manager) writeChangeset() error {
for vc := range m.chChangeset {
cs := vc.Changeset
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, vc.Version)
csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf))
csBytes, err := cs.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal changeset: %w", err)
}

batch := m.db.NewBatch()
defer batch.Close()

if err := batch.Set(csKey, csBytes); err != nil {
return fmt.Errorf("failed to write changeset to db.Batch: %w", err)
}
if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write changeset to db: %w", err)
}
}

return nil
}

// GetMigratedVersion returns the migrated version.
// It is used to check the migrated version in the RootStore.
func (m *Manager) GetMigratedVersion() uint64 {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.migratedVersion
}

// Sync catches up the Changesets which are committed while the migration is in progress.
// It should be called after the migration is done.
func (m *Manager) Sync() error {
version := m.GetMigratedVersion()
if version == 0 {
return fmt.Errorf("migration is not done yet")
}
version += 1

for {
select {
case <-m.chDone:
return nil
default:
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, version)
csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf))
csBytes, err := m.db.Get(csKey)
if err != nil {
return fmt.Errorf("failed to get changeset from db: %w", err)
}
if csBytes == nil {
// wait for the next changeset
time.Sleep(100 * time.Millisecond)
continue
}

cs := store.NewChangeset()
if err := cs.Unmarshal(csBytes); err != nil {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}

if err := m.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
}
if err := m.stateStorage.ApplyChangeset(version, cs); err != nil {
return fmt.Errorf("failed to write changeset to storage: %w", err)
}

m.mtx.Lock()
m.migratedVersion = version
m.mtx.Unlock()

version += 1
}
}
}
10 changes: 5 additions & 5 deletions store/migration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
require.NoError(t, err)

return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
}

func TestMigrateState(t *testing.T) {
Expand Down Expand Up @@ -78,25 +78,25 @@ func TestMigrateState(t *testing.T) {
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.commitSnapshotter.(*commitment.CommitStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
val, err := m.stateCommitment.Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
// check the latest state
val, err := m.commitSnapshotter.(*commitment.CommitStore).Get("store1", toVersion-1, []byte("key-100-1"))
val, err := m.stateCommitment.Get("store1", toVersion-1, []byte("key-100-1"))
require.NoError(t, err)
require.Nil(t, val)
val, err = m.commitSnapshotter.(*commitment.CommitStore).Get("store2", toVersion-1, []byte("key-100-0"))
val, err = m.stateCommitment.Get("store2", toVersion-1, []byte("key-100-0"))
require.NoError(t, err)
require.Nil(t, val)

// check the storage
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.storageSnapshotter.(*storage.StorageStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
val, err := m.stateStorage.Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
Expand Down
Loading

0 comments on commit 927bab0

Please sign in to comment.