Skip to content

Commit

Permalink
feat: use weighted round-robin for uploader selection (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbreithecker authored Jul 6, 2023
1 parent 4ff6ac3 commit 4eb43ed
Show file tree
Hide file tree
Showing 19 changed files with 1,377 additions and 204 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
### Features

- (ibc) [#30](https://github.com/KYVENetwork/chain/pull/30) Integrate [Packet Forward Middleware](https://github.com/strangelove-ventures/packet-forward-middleware).
- (`x/bundles`) [#99](https://github.com/KYVENetwork/chain/pull/99) Use weighted round-robin approach for uploader selection.

### Improvements

Expand Down
16 changes: 16 additions & 0 deletions proto/kyve/bundles/v1beta1/bundles.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,19 @@ message FinalizedAt {
// timestamp ...
uint64 timestamp = 2;
}

// RoundRobinSingleValidatorProgress ...
message RoundRobinSingleValidatorProgress {
// address ...
string address = 1;
// progress ...
int64 progress = 2;
}

// RoundRobinProgress ...
message RoundRobinProgress {
// pool_id ...
uint64 pool_id = 1;
// progress_list ...
repeated RoundRobinSingleValidatorProgress progress_list = 2;
}
2 changes: 2 additions & 0 deletions proto/kyve/bundles/v1beta1/genesis.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ message GenesisState {
repeated BundleProposal bundle_proposal_list = 2 [(gogoproto.nullable) = false];
// finalized_bundle_list ...
repeated FinalizedBundle finalized_bundle_list = 3 [(gogoproto.nullable) = false];
// round_robin_progress_list ...
repeated RoundRobinProgress round_robin_progress_list = 4 [(gogoproto.nullable) = false];
}
6 changes: 6 additions & 0 deletions x/bundles/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ func InitGenesis(ctx sdk.Context, k keeper.Keeper, genState types.GenesisState)
for _, entry := range genState.FinalizedBundleList {
k.SetFinalizedBundle(ctx, entry)
}

for _, entry := range genState.RoundRobinProgressList {
k.SetRoundRobinProgress(ctx, entry)
}
}

// ExportGenesis returns the capability module's exported genesis.
Expand All @@ -30,5 +34,7 @@ func ExportGenesis(ctx sdk.Context, k keeper.Keeper) *types.GenesisState {

genesis.FinalizedBundleList = k.GetAllFinalizedBundles(ctx)

genesis.RoundRobinProgressList = k.GetAllRoundRobinProgress(ctx)

return genesis
}
41 changes: 41 additions & 0 deletions x/bundles/keeper/getters_round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package keeper

import (
"github.com/KYVENetwork/chain/x/bundles/types"
"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
)

// SetRoundRobinProgress stores the round-robin progress for a pool
func (k Keeper) SetRoundRobinProgress(ctx sdk.Context, roundRobinProgress types.RoundRobinProgress) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.RoundRobinProgressPrefix)
b := k.cdc.MustMarshal(&roundRobinProgress)
store.Set(types.RoundRobinProgressKey(roundRobinProgress.PoolId), b)
}

// GetRoundRobinProgress returns the round-robin progress for a pool
func (k Keeper) GetRoundRobinProgress(ctx sdk.Context, poolId uint64) (val types.RoundRobinProgress, found bool) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.RoundRobinProgressPrefix)

b := store.Get(types.RoundRobinProgressKey(poolId))
if b == nil {
return val, false
}

k.cdc.MustUnmarshal(b, &val)
return val, true
}

// GetAllRoundRobinProgress returns the round-robin progress of all pools
func (k Keeper) GetAllRoundRobinProgress(ctx sdk.Context) (list []types.RoundRobinProgress) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.RoundRobinProgressPrefix)
iterator := sdk.KVStorePrefixIterator(store, []byte{})

for ; iterator.Valid(); iterator.Next() {
var val types.RoundRobinProgress
k.cdc.MustUnmarshal(iterator.Value(), &val)
list = append(list, val)
}

return
}
2 changes: 1 addition & 1 deletion x/bundles/keeper/keeper_suite_dropped_bundles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var _ = Describe("dropped bundles", Ordered, func() {
Expect(bundleProposal.PoolId).To(Equal(uint64(0)))
Expect(bundleProposal.StorageId).To(BeEmpty())
Expect(bundleProposal.Uploader).To(BeEmpty())
Expect(bundleProposal.NextUploader).To(Equal(i.STAKER_1))
Expect(bundleProposal.NextUploader).To(Equal(i.STAKER_0))
Expect(bundleProposal.DataSize).To(BeZero())
Expect(bundleProposal.DataHash).To(BeEmpty())
Expect(bundleProposal.BundleSize).To(BeZero())
Expand Down
2 changes: 1 addition & 1 deletion x/bundles/keeper/keeper_suite_zero_delegation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ TEST CASES - zero delegation
*/

var _ = Describe("valid bundles", Ordered, func() {
var _ = Describe("zero delegation", Ordered, func() {
s := i.NewCleanChain()

initialBalanceStaker0 := s.GetBalanceFromAddress(i.STAKER_0)
Expand Down
101 changes: 23 additions & 78 deletions x/bundles/keeper/logic_bundles.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package keeper

import (
"math/rand"
"sort"

"cosmossdk.io/errors"

delegationTypes "github.com/KYVENetwork/chain/x/delegation/types"
Expand Down Expand Up @@ -369,12 +366,7 @@ func (k Keeper) finalizeCurrentBundleProposal(ctx sdk.Context, poolId uint64, vo
// a required quorum on the validity of the data. When the proposal is dropped
// the same next uploader as before can submit his proposal since it is not his
// fault, that the last one did not reach any quorum.
func (k Keeper) dropCurrentBundleProposal(
ctx sdk.Context,
poolId uint64,
voteDistribution types.VoteDistribution,
nextUploader string,
) {
func (k Keeper) dropCurrentBundleProposal(ctx sdk.Context, poolId uint64, voteDistribution types.VoteDistribution, nextUploader string) {
pool, _ := k.poolKeeper.GetPool(ctx, poolId)
bundleProposal, _ := k.GetBundleProposal(ctx, poolId)

Expand Down Expand Up @@ -412,82 +404,35 @@ func (k Keeper) calculateVotingPower(delegation uint64) (votingPower uint64) {
return
}

// RandomChoiceCandidate holds the voting power of a candidate for the
// next uploader selection
type RandomChoiceCandidate struct {
Account string
VotingPower uint64
}

// getWeightedRandomChoice is an internal function that returns a weighted random
// selection out of a list of candidates based on their voting power.
func (k Keeper) getWeightedRandomChoice(candidates []RandomChoiceCandidate, seed int64) string {
type WeightedRandomChoice struct {
Elements []string
Weights []uint64
TotalWeight uint64
}

wrc := WeightedRandomChoice{}

for _, candidate := range candidates {
i := sort.Search(len(wrc.Weights), func(i int) bool { return wrc.Weights[i] > candidate.VotingPower })
wrc.Weights = append(wrc.Weights, 0)
wrc.Elements = append(wrc.Elements, "")
copy(wrc.Weights[i+1:], wrc.Weights[i:])
copy(wrc.Elements[i+1:], wrc.Elements[i:])
wrc.Weights[i] = candidate.VotingPower
wrc.Elements[i] = candidate.Account
wrc.TotalWeight += candidate.VotingPower
}

if wrc.TotalWeight == 0 {
return ""
}

value := rand.New(rand.NewSource(seed)).Uint64() % wrc.TotalWeight

for key, weight := range wrc.Weights {
if weight > value {
return wrc.Elements[key]
}

value -= weight
}

return ""
// chooseNextUploader selects the next uploader based on a fixed set of stakers in a pool.
// It is guaranteed that someone is chosen deterministically if the round-robin set itself is not empty.
func (k Keeper) chooseNextUploader(ctx sdk.Context, poolId uint64, excluded ...string) (nextUploader string) {
vs := k.LoadRoundRobinValidatorSet(ctx, poolId)
nextUploader = vs.NextProposer(excluded...)
k.SaveRoundRobinValidatorSet(ctx, vs)
return
}

// chooseNextUploaderFromSelectedStakers selects the next uploader based on a
// fixed set of stakers in a pool. It is guaranteed that someone is chosen
// deterministically
func (k Keeper) chooseNextUploaderFromSelectedStakers(ctx sdk.Context, poolId uint64, addresses []string) (nextUploader string) {
var _candidates []RandomChoiceCandidate
// chooseNextUploader selects the next uploader based on a fixed set of stakers in a pool.
// It is guaranteed that someone is chosen deterministically if the round-robin set itself is not empty.
func (k Keeper) chooseNextUploaderFromList(ctx sdk.Context, poolId uint64, included []string) (nextUploader string) {
vs := k.LoadRoundRobinValidatorSet(ctx, poolId)

if len(addresses) == 0 {
return ""
// Calculate set difference to obtain excluded
includedMap := make(map[string]bool)
for _, entry := range included {
includedMap[entry] = true
}

for _, s := range addresses {
if k.stakerKeeper.DoesValaccountExist(ctx, poolId, s) {
delegation := k.delegationKeeper.GetDelegationAmount(ctx, s)

_candidates = append(_candidates, RandomChoiceCandidate{
Account: s,
VotingPower: k.calculateVotingPower(delegation),
})
excluded := make([]string, 0)
for _, entry := range vs.Validators {
if !includedMap[entry.Address] {
excluded = append(excluded, entry.Address)
}
}

return k.getWeightedRandomChoice(_candidates, ctx.BlockHeader().Height)
}

// chooseNextUploaderFromAllStakers selects the next uploader based on all
// stakers in a pool. It is guaranteed that someone is chosen
// deterministically
func (k Keeper) chooseNextUploaderFromAllStakers(ctx sdk.Context, poolId uint64) (nextUploader string) {
stakers := k.stakerKeeper.GetAllStakerAddressesOfPool(ctx, poolId)
return k.chooseNextUploaderFromSelectedStakers(ctx, poolId, stakers)
nextUploader = vs.NextProposer(excluded...)
k.SaveRoundRobinValidatorSet(ctx, vs)
return
}

// GetVoteDistribution is an internal function evaluates the quorum status
Expand Down
4 changes: 2 additions & 2 deletions x/bundles/keeper/logic_end_block_handle_upload_timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (k Keeper) HandleUploadTimeout(goCtx context.Context) {
k.handleNonVoters(ctx, pool.Id)

// Get next uploader from all pool stakers
nextUploader := k.chooseNextUploaderFromAllStakers(ctx, pool.Id)
nextUploader := k.chooseNextUploader(ctx, pool.Id)

// If consensus wasn't reached, we drop the bundle and emit an event.
k.dropCurrentBundleProposal(ctx, pool.Id, voteDistribution, nextUploader)
Expand All @@ -76,7 +76,7 @@ func (k Keeper) HandleUploadTimeout(goCtx context.Context) {
}

// Update bundle proposal and choose next uploader
bundleProposal.NextUploader = k.chooseNextUploaderFromAllStakers(ctx, pool.Id)
bundleProposal.NextUploader = k.chooseNextUploader(ctx, pool.Id)
bundleProposal.UpdatedAt = uint64(ctx.BlockTime().Unix())

k.SetBundleProposal(ctx, bundleProposal)
Expand Down
Loading

0 comments on commit 4eb43ed

Please sign in to comment.