Skip to content

Commit

Permalink
cherry-pick
Browse files Browse the repository at this point in the history
  • Loading branch information
Mixficsol authored and wuxianrong committed Mar 13, 2024
1 parent dafb0bf commit 38c4676
Show file tree
Hide file tree
Showing 18 changed files with 84 additions and 98 deletions.
2 changes: 1 addition & 1 deletion tests/integration/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Cache test", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/csanning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var _ = Describe("Csanning Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/geo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Geo Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module pika-integration
go 1.19

require (
github.com/bsm/ginkgo/v2 v2.7.0
github.com/bsm/gomega v1.26.0
github.com/redis/go-redis/v9 v9.0.4
github.com/bsm/ginkgo/v2 v2.12.0
github.com/bsm/gomega v1.27.10
github.com/redis/go-redis/v9 v9.4.0
)

require (
Expand Down
12 changes: 6 additions & 6 deletions tests/integration/go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc=
github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
2 changes: 1 addition & 1 deletion tests/integration/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var _ = Describe("Hash Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/hyperloglog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Hyperloglog Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ = Describe("List Commands", func() {
var blockedLock sync.Mutex

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down Expand Up @@ -916,8 +916,8 @@ var _ = Describe("List Commands", func() {
Expect(lRange.Err()).NotTo(HaveOccurred())
Expect(lRange.Val()).To(Equal([]string{"two", "three"}))

err := client.Do(ctx, "LPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command")))
err := client.Do(ctx, "LPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command")))
})

It("should LPopCount", func() {
Expand Down Expand Up @@ -1162,7 +1162,7 @@ var _ = Describe("List Commands", func() {
Expect(lRange.Val()).To(Equal([]string{"one", "two"}))

err := client.Do(ctx, "RPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command")))
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command")))
})

It("should RPopCount", func() {
Expand Down
17 changes: 2 additions & 15 deletions tests/integration/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,9 @@ func (t *TimeValue) ScanRedis(s string) (err error) {
return
}

func pikaOptions1() *redis.Options {
func PikaOption(addr string) *redis.Options {
return &redis.Options{
Addr: "127.0.0.1:9221",
DB: 0,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxRetries: -1,
PoolSize: 30,
PoolTimeout: 60 * time.Second,
}
}

func pikaOptions2() *redis.Options {
return &redis.Options{
Addr: "127.0.0.1:9231",
Addr: addr,
DB: 0,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ var _ = Describe("PubSub", func() {
ctx := context.TODO()

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client2 = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
client2 = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(client2.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(2 * time.Second)
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ var _ = Describe("should replication ", func() {
var clientMaster *redis.Client

BeforeEach(func() {
clientMaster = redis.NewClient(pikaOptions1())
clientSlave = redis.NewClient(pikaOptions2())
clientMaster = redis.NewClient(PikaOption(MASTERADDR))
clientSlave = redis.NewClient(PikaOption(SLAVEADDR))
cleanEnv(ctx, clientMaster, clientSlave)
Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expand All @@ -395,11 +395,11 @@ var _ = Describe("should replication ", func() {
infoRes = clientMaster.Info(ctx, "replication")
Expect(infoRes.Err()).NotTo(HaveOccurred())
Expect(infoRes.Val()).To(ContainSubstring("role:master"))
Expect(clientSlave.Do(ctx, "slaveof", "127.0.0.1", "9231").Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))
Expect(clientSlave.Do(ctx, "slaveof", LOCALHOST, SLAVEPORT).Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))

var count = 0
for {
res := trySlave(ctx, clientSlave, "127.0.0.1", "9221")
res := trySlave(ctx, clientSlave, LOCALHOST, MASTERPORT)
if res {
break
} else if count > 4 {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Server", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Set Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down Expand Up @@ -276,8 +276,8 @@ var _ = Describe("Set Commands", func() {
Expect(sMembers.Err()).NotTo(HaveOccurred())
Expect(sMembers.Val()).To(HaveLen(3))

err := client.Do(ctx, "SPOP", "set", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command")))
err := client.Do(ctx, "SPOP", "set", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command")))
})

It("should SPopN", func() {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/slowlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var _ = Describe("Slowlog Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
97 changes: 48 additions & 49 deletions tests/integration/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
package pika_integration

import (
"sync"
"context"
"sync/atomic"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
Expand Down Expand Up @@ -120,7 +120,7 @@ func parseStreamEntryID(id string) (ts int64, seqNum int64) {
var _ = Describe("Stream Commands", func() {
ctx := context.TODO()
var client *redis.Client
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
client.FlushDB(ctx)

BeforeEach(func() {
Expand All @@ -140,20 +140,20 @@ var _ = Describe("Stream Commands", func() {
const numWriters = 10
const numReaders = 10
const messagesPerWriter = 20

createClient := func() *redis.Client {
return redis.NewClient(pikaOptions1())
return redis.NewClient(PikaOption(SINGLEADDR))
}

var messageCount int32

// Start writer goroutines
for i := 0; i < numWriters; i++ {
go func(writerIndex int) {
defer GinkgoRecover()
writerClient := createClient()
defer writerClient.Close()

for j := 0; j < messagesPerWriter; j++ {
_, err := writerClient.XAdd(ctx, &redis.XAddArgs{
Stream: streamKey,
Expand All @@ -164,41 +164,42 @@ var _ = Describe("Stream Commands", func() {
}
}(i)
}

// Start reader goroutines
var wg sync.WaitGroup
for i := 0; i < numReaders; i++ {
wg.Add(1)
go func() {
defer GinkgoRecover()
defer wg.Done()
readerClient := createClient()
defer readerClient.Close()

lastID := "0"
readMessages := 0
for readMessages < totalMessages {
items, err := readerClient.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamKey, lastID},
Block: 0,
}).Result()
if (err != nil) {
continue
}

// Check if items slice is not empty
if len(items) > 0 && len(items[0].Messages) > 0 {
lastMessageIndex := len(items[0].Messages) - 1
lastID = items[0].Messages[lastMessageIndex].ID
readMessages += len(items[0].Messages)
}
// Optionally add a short delay here if needed
}
Expect(readMessages).To(BeNumerically(">=", totalMessages))
wg.Add(1)
go func() {
readerClient := createClient()
defer func() {
GinkgoRecover()
wg.Done()
readerClient.Close()
}()

lastID := "0"
readMessages := 0
for readMessages < totalMessages {
items, err := readerClient.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamKey, lastID},
Block: 0,
}).Result()
if err != nil {
continue
}

// Check if items slice is not empty
if len(items) > 0 && len(items[0].Messages) > 0 {
lastMessageIndex := len(items[0].Messages) - 1
lastID = items[0].Messages[lastMessageIndex].ID
readMessages += len(items[0].Messages)
}
// Optionally add a short delay here if needed
}
Expect(readMessages).To(BeNumerically(">=", totalMessages))
}()
}


wg.Wait()
Eventually(func() int32 {
return atomic.LoadInt32(&messageCount)
Expand All @@ -209,29 +210,27 @@ var _ = Describe("Stream Commands", func() {
Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred())
// Creating a stream and adding entries
_, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "mystream",
ID: "*",
Values: map[string]interface{}{"key1": "value1", "key2": "value2"},
Stream: "mystream",
ID: "*",
Values: map[string]interface{}{"key1": "value1", "key2": "value2"},
}).Result()
Expect(err).NotTo(HaveOccurred())

// Using keys * to find all keys including the stream
keys, err := client.Keys(ctx, "*").Result()
Expect(err).NotTo(HaveOccurred())

// Checking if the stream 'mystream' exists in the returned keys
found := false
for _, key := range keys {
if key == "mystream" {
found = true
break
}
if key == "mystream" {
found = true
break
}
}
Expect(found).To(BeTrue(), "Stream 'mystream' should exist in keys")
})




It("XADD wrong number of args", func() {
_, err := client.Do(ctx, "XADD", "mystream").Result()
Expect(err).To(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var _ = Describe("String Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ var _ = Describe("Text Txn", func() {
var cmdCost time.Duration

BeforeEach(func() {
txnClient = redis.NewClient(pikaOptions1())
cmdClient = redis.NewClient(pikaOptions1())
txnClient = redis.NewClient(PikaOption(SINGLEADDR))
cmdClient = redis.NewClient(PikaOption(SINGLEADDR))
})
Describe("test watch", func() {
It("basic watch", func() {
Expand Down
Loading

0 comments on commit 38c4676

Please sign in to comment.