Skip to content

Commit

Permalink
test: Benchmark MConnection with large size messages (#1179)
Browse files Browse the repository at this point in the history
Another PR towards #1162
The aim of this benchmark is to assess the impact of the size of
individual messages — particularly in relation to
MaxPacketMsgPayloadSize — on the overall performance of MConnection,
as well as on its ability to utilize the maximum bandwidth / send rate.
  • Loading branch information
staheri14 authored Jan 24, 2024
1 parent 7673ee0 commit 6596a30
Showing 1 changed file with 215 additions and 30 deletions.
245 changes: 215 additions & 30 deletions p2p/conn/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,16 +771,26 @@ func stopAll(t *testing.T, stoppers ...stopper) func() {
}
}

// generateMessages sends a sequence of messages to the specified multiplex connection `mc`.
// Byte-size units used by the benchmarks below when specifying message
// sizes and send/receive rates.
const (
	kibibyte = 1024
	mebibyte = 1024 * 1024
)

// generateAndSendMessages sends a sequence of messages to the specified multiplex connection `mc`.
// Each message has the given size and is sent at the specified rate
// `messagingRate`. This process continues for the duration `totalDuration` or
// until `totalNum` messages are sent. If `totalNum` is negative,
// messaging persists for the entire `totalDuration`.
func generateMessages(mc *MConnection,
func generateAndSendMessages(mc *MConnection,
messagingRate time.Duration,
totalDuration time.Duration, totalNum int, msgSize int, chID byte) {
// all messages have an identical content
msg := bytes.Repeat([]byte{'x'}, msgSize)
totalDuration time.Duration, totalNum int, msgSize int,
msgContnet []byte, chID byte) {
var msg []byte
if msgContnet == nil {
msg = bytes.Repeat([]byte{'x'}, msgSize)
} else {
msg = msgContnet
}

// message generation interval ticker
ticker := time.NewTicker(messagingRate)
Expand Down Expand Up @@ -847,13 +857,13 @@ func BenchmarkMConnection(b *testing.B) {
name: "queue capacity = 1, " +
"total load = 50 KB, " +
"msg rate = send rate",
msgSize: 1 * 1024,
msgSize: 1 * kibibyte,
totalMsg: 1 * 50,
sendQueueCapacity: 1,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
sendRate: 50 * kibibyte,
recRate: 50 * kibibyte,
},
{
// testcase 2
Expand All @@ -864,13 +874,13 @@ func BenchmarkMConnection(b *testing.B) {
name: "queue capacity = 50, " +
"total load = 50 KB, " +
"traffic rate = send rate",
msgSize: 1 * 1024,
msgSize: 1 * kibibyte,
totalMsg: 1 * 50,
sendQueueCapacity: 50,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
sendRate: 50 * kibibyte,
recRate: 50 * kibibyte,
},
{
// testcase 3
Expand All @@ -881,13 +891,13 @@ func BenchmarkMConnection(b *testing.B) {
name: "queue capacity = 100, " +
"total load = 50 KB, " +
"traffic rate = send rate",
msgSize: 1 * 1024,
msgSize: 1 * kibibyte,
totalMsg: 1 * 50,
sendQueueCapacity: 100,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
sendRate: 50 * kibibyte,
recRate: 50 * kibibyte,
},
{
// testcase 4
Expand All @@ -896,13 +906,13 @@ func BenchmarkMConnection(b *testing.B) {
name: "queue capacity = 100, " +
"total load = 2 * 50 KB, " +
"traffic rate = send rate",
msgSize: 1 * 1024,
msgSize: 1 * kibibyte,
totalMsg: 2 * 50,
sendQueueCapacity: 100,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
sendRate: 50 * kibibyte,
recRate: 50 * kibibyte,
},
{
// testcase 5
Expand All @@ -912,13 +922,13 @@ func BenchmarkMConnection(b *testing.B) {
name: "queue capacity = 100, " +
"total load = 8 * 50 KB, " +
"traffic rate = send rate",
msgSize: 1 * 1024,
msgSize: 1 * kibibyte,
totalMsg: 8 * 50,
sendQueueCapacity: 100,
messagingRate: 20 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
sendRate: 50 * kibibyte,
recRate: 50 * kibibyte,
},
{
// testcase 6
Expand All @@ -928,13 +938,13 @@ func BenchmarkMConnection(b *testing.B) {
name: "queue capacity = 100, " +
"total load = 8 * 50 KB, " +
"traffic rate = 2 * send rate",
msgSize: 1 * 1024,
msgSize: 1 * kibibyte,
totalMsg: 8 * 50,
sendQueueCapacity: 100,
messagingRate: 10 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
sendRate: 50 * kibibyte,
recRate: 50 * kibibyte,
},
{
// testcase 7
Expand All @@ -944,13 +954,13 @@ func BenchmarkMConnection(b *testing.B) {
name: "queue capacity = 100, " +
"total load = 8 * 50 KB, " +
"traffic rate = 10 * send rate",
msgSize: 1 * 1024,
msgSize: 1 * kibibyte,
totalMsg: 8 * 50,
sendQueueCapacity: 100,
messagingRate: 2 * time.Millisecond,
totalDuration: 1 * time.Minute,
sendRate: 50 * 1024,
recRate: 50 * 1024,
sendRate: 50 * kibibyte,
recRate: 50 * kibibyte,
},
}

Expand All @@ -974,8 +984,8 @@ func BenchmarkMConnection(b *testing.B) {
}

cnfg := DefaultMConnConfig()
cnfg.SendRate = 50 * 1024 // 500 KB/s
cnfg.RecvRate = 50 * 1024 // 500 KB/s
cnfg.SendRate = tt.sendRate
cnfg.RecvRate = tt.recRate
chDescs := []*ChannelDescriptor{{ID: chID, Priority: 1,
SendQueueCapacity: tt.sendQueueCapacity}}
clientMconn := NewMConnectionWithConfig(client, chDescs,
Expand Down Expand Up @@ -1006,11 +1016,12 @@ func BenchmarkMConnection(b *testing.B) {
// taken to set up the connections
b.StartTimer()
// start generating messages, it is a blocking call
generateMessages(clientMconn,
generateAndSendMessages(clientMconn,
tt.messagingRate,
tt.totalDuration,
tt.totalMsg,
tt.msgSize, chID)
tt.msgSize,
nil, chID)

// wait for all messages to be received
<-allReceived
Expand Down Expand Up @@ -1040,3 +1051,177 @@ func tcpNetPipe() (net.Conn, net.Conn) {

return conn2, conn1
}

// generateExponentialSizedMessages builds a series of messages whose sizes
// grow exponentially: 1*unit, 2*unit, 4*unit, ... bytes, up to at most
// maxSizeBytes. unit is expected to be a power of 2.
func generateExponentialSizedMessages(maxSizeBytes int, unit int) [][]byte {
	limit := maxSizeBytes / unit
	msgs := make([][]byte, 0)

	for factor := 1; factor <= limit; factor *= 2 {
		// each message is a run of 'x' bytes of the desired length
		msgs = append(msgs, bytes.Repeat([]byte{'x'}, factor*unit))
	}
	return msgs
}

// testCase describes one MConnection benchmark scenario: the shape of the
// traffic (message size, count, and rate) together with the connection
// configuration (queue capacity, send/receive rates, channel ID).
type testCase struct {
	name              string
	msgSize           int           // size of each message in bytes
	msg               []byte        // message to be sent; if nil, a payload of msgSize 'x' bytes is generated instead
	totalMsg          int           // total number of messages to be sent; if negative, messages are sent for the whole totalDuration
	messagingRate     time.Duration // rate at which messages are sent
	totalDuration     time.Duration // total duration for which messages are sent
	sendQueueCapacity int           // send queue capacity i.e., the number of messages that can be buffered
	sendRate          int64         // send rate in bytes per second
	recRate           int64         // receive rate in bytes per second
	chID              byte          // channel ID
}

// runBenchmarkTest benchmarks one testCase: it repeatedly wires up a pair of
// MConnections over a real TCP connection, sends tt.totalMsg messages from
// the client side, and measures the time until the server side has received
// all of them. Connection setup and teardown are excluded from the measured
// time.
func runBenchmarkTest(b *testing.B, tt testCase) {
	b.Run(tt.name, func(b *testing.B) {
		for n := 0; n < b.N; n++ {
			// One iteration per helper call so that the deferred cleanups
			// inside run at the end of EACH iteration instead of piling up
			// until all b.N iterations have finished.
			benchmarkMConnectionOnce(b, tt)
		}
	})
}

// benchmarkMConnectionOnce performs a single timed iteration for
// runBenchmarkTest: set up two MConnections (untimed), send the configured
// message load (timed), and tear everything down via defers (untimed).
func benchmarkMConnectionOnce(b *testing.B, tt testCase) {
	// the benchmark timer is running when the sub-benchmark starts; stop it
	// so that connection setup is never included in the measurement
	b.StopTimer()

	// set up two networked connections
	// server, client := NetPipe() // can alternatively use this and comment out the line below
	server, client := tcpNetPipe()
	defer server.Close()
	defer client.Close()

	// prepare callback to receive messages
	allReceived := make(chan bool)
	receivedLoad := 0 // number of messages received
	onReceive := func(chID byte, msgBytes []byte) {
		receivedLoad++
		if receivedLoad >= tt.totalMsg && tt.totalMsg > 0 {
			allReceived <- true
		}
	}

	cnfg := DefaultMConnConfig()
	cnfg.SendRate = tt.sendRate
	cnfg.RecvRate = tt.recRate
	chDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1,
		SendQueueCapacity: tt.sendQueueCapacity}}
	clientMconn := NewMConnectionWithConfig(client, chDescs,
		func(chID byte, msgBytes []byte) {},
		func(r interface{}) {},
		cnfg)
	serverChDescs := []*ChannelDescriptor{{ID: tt.chID, Priority: 1,
		SendQueueCapacity: tt.sendQueueCapacity}}
	serverMconn := NewMConnectionWithConfig(server, serverChDescs,
		onReceive,
		func(r interface{}) {},
		cnfg)
	clientMconn.SetLogger(log.TestingLogger())
	serverMconn.SetLogger(log.TestingLogger())

	err := clientMconn.Start()
	require.Nil(b, err)
	defer func() {
		_ = clientMconn.Stop()
	}()
	err = serverMconn.Start()
	require.Nil(b, err)
	defer func() {
		_ = serverMconn.Stop()
	}()

	// start measuring the time from here to exclude the time
	// taken to set up the connections
	b.StartTimer()
	// start generating messages, it is a blocking call
	generateAndSendMessages(clientMconn,
		tt.messagingRate,
		tt.totalDuration,
		tt.totalMsg,
		tt.msgSize,
		tt.msg,
		tt.chID)

	// wait for all messages to be received
	<-allReceived
	b.StopTimer()
}

// BenchmarkMConnection_ScalingPayloadSizes_HighSendRate assesses how the size
// of individual messages affects MConnection performance when bandwidth is
// not the bottleneck.
//
// One aspect that could impact the performance of MConnection and the
// transmission rate is the size of the messages sent over the network,
// especially when they exceed MConnection.MaxPacketMsgPayloadSize
// (messages are sent in packets of at most MaxPacketMsgPayloadSize bytes).
// The test cases involve sending messages with sizes ranging exponentially
// from 1 KiB to 8192 KiB (the max value of 8192 KiB is inspired by the
// largest possible PFB in a Celestia block with 128*128 512-byte shares).
// The bandwidth is set significantly higher than the message load to ensure
// it does not become a limiting factor.
// All test cases are expected to complete in less than one second,
// indicating a healthy performance.
func BenchmarkMConnection_ScalingPayloadSizes_HighSendRate(b *testing.B) {
	squareSize := 128                              // number of shares in a row/column
	shareSize := 512                               // bytes
	maxSize := squareSize * squareSize * shareSize // bytes
	msgs := generateExponentialSizedMessages(maxSize, kibibyte)
	chID := byte(0x01)

	// create one test case per message size
	testCases := make([]testCase, len(msgs))
	for i, msg := range msgs {
		testCases[i] = testCase{
			name:              fmt.Sprintf("msgSize = %d KB", len(msg)/kibibyte),
			msgSize:           len(msg),
			msg:               msg,
			totalMsg:          10,
			messagingRate:     time.Millisecond,
			totalDuration:     1 * time.Minute,
			sendQueueCapacity: 100,
			sendRate:          512 * mebibyte,
			recRate:           512 * mebibyte,
			chID:              chID,
		}
	}

	for _, tt := range testCases {
		runBenchmarkTest(b, tt)
	}
}

// BenchmarkMConnection_ScalingPayloadSizes_LowSendRate builds upon
// BenchmarkMConnection_ScalingPayloadSizes_HighSendRate by setting the
// send/receive rates LOWER than the message load.
// Test cases involve sending the same total load of messages but with
// different message sizes. Since the message load and bandwidth are
// consistent across all test cases, they are expected to complete in the
// same amount of time, i.e., totalLoad/sendRate.
func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) {
	maxSize := 32 * kibibyte // 32 KiB
	msgs := generateExponentialSizedMessages(maxSize, kibibyte)
	totalLoad := float64(maxSize)
	chID := byte(0x01)

	// create one test case per message size
	testCases := make([]testCase, len(msgs))
	for i, msg := range msgs {
		msgSize := len(msg)
		// number of messages of this size needed to carry the fixed total load
		totalMsg := int(math.Ceil(totalLoad / float64(msgSize)))
		testCases[i] = testCase{
			name:              fmt.Sprintf("msgSize = %d KB", msgSize/kibibyte),
			msgSize:           msgSize,
			msg:               msg,
			totalMsg:          totalMsg,
			messagingRate:     time.Millisecond,
			totalDuration:     1 * time.Minute,
			sendQueueCapacity: 100,
			sendRate:          4 * kibibyte,
			recRate:           4 * kibibyte,
			chID:              chID,
		}
	}

	for _, tt := range testCases {
		runBenchmarkTest(b, tt)
	}
}

0 comments on commit 6596a30

Please sign in to comment.