diff --git a/tests/integration/cache_test.go b/tests/integration/cache_test.go index c074760665..89a2aa2633 100644 --- a/tests/integration/cache_test.go +++ b/tests/integration/cache_test.go @@ -14,7 +14,7 @@ var _ = Describe("Cache test", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/csanning_test.go b/tests/integration/csanning_test.go index 43c5e59b29..67bc89f739 100644 --- a/tests/integration/csanning_test.go +++ b/tests/integration/csanning_test.go @@ -15,7 +15,7 @@ var _ = Describe("Csanning Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/geo_test.go b/tests/integration/geo_test.go index 199f52113e..7b5a4e62b3 100644 --- a/tests/integration/geo_test.go +++ b/tests/integration/geo_test.go @@ -14,7 +14,7 @@ var _ = Describe("Geo Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/go.mod b/tests/integration/go.mod index 6329f50d22..9b4e1fb433 100644 --- a/tests/integration/go.mod +++ b/tests/integration/go.mod @@ -3,9 +3,9 @@ module pika-integration go 1.19 require ( - github.com/bsm/ginkgo/v2 v2.7.0 - github.com/bsm/gomega v1.26.0 - github.com/redis/go-redis/v9 v9.0.4 + github.com/bsm/ginkgo/v2 v2.12.0 + github.com/bsm/gomega v1.27.10 + github.com/redis/go-redis/v9 v9.4.0 ) require ( diff --git a/tests/integration/go.sum b/tests/integration/go.sum index f41c0740f0..85f5563c0c 100644 --- a/tests/integration/go.sum +++ b/tests/integration/go.sum @@ -1,10 +1,10 @@ -github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= -github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= -github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= -github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc= -github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= +github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk= +github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= diff --git a/tests/integration/hash_test.go b/tests/integration/hash_test.go index aab30f3e3d..b7938d733b 100644 --- a/tests/integration/hash_test.go +++ b/tests/integration/hash_test.go @@ -16,7 +16,7 @@ var _ = Describe("Hash Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/hyperloglog_test.go b/tests/integration/hyperloglog_test.go index 3b9217c0cb..59d86e69d7 100644 --- a/tests/integration/hyperloglog_test.go +++ b/tests/integration/hyperloglog_test.go @@ -14,7 +14,7 @@ var _ = Describe("Hyperloglog Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/list_test.go b/tests/integration/list_test.go index 043d52cfad..c62d00111c 100644 --- a/tests/integration/list_test.go +++ b/tests/integration/list_test.go @@ -40,7 +40,7 @@ var _ = Describe("List Commands", func() { var blockedLock sync.Mutex BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) @@ -916,8 +916,8 @@ var _ = Describe("List Commands", func() { Expect(lRange.Err()).NotTo(HaveOccurred()) Expect(lRange.Val()).To(Equal([]string{"two", "three"})) - err := client.Do(ctx, "LPOP", "list", 1, 2).Err() - Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command"))) + err := client.Do(ctx, "LPOP", "list", 1, 2).Err() + Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command"))) }) It("should LPopCount", func() { @@ -1162,7 +1162,7 @@ var _ = Describe("List Commands", func() { Expect(lRange.Val()).To(Equal([]string{"one", "two"})) err := client.Do(ctx, "RPOP", "list", 1, 2).Err() - Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command"))) + Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command"))) }) It("should RPopCount", func() { diff --git a/tests/integration/options.go b/tests/integration/options.go index 174a2f8890..611e838044 100644 --- a/tests/integration/options.go +++ b/tests/integration/options.go @@ -29,22 +29,9 @@ func (t *TimeValue) ScanRedis(s string) (err error) { return } -func pikaOptions1() *redis.Options { +func PikaOption(addr string) *redis.Options { return &redis.Options{ - Addr: "127.0.0.1:9221", - DB: 0, - DialTimeout: 10 * time.Second, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - MaxRetries: -1, - PoolSize: 30, - PoolTimeout: 60 * time.Second, - } -} - -func pikaOptions2() *redis.Options { - return &redis.Options{ - Addr: "127.0.0.1:9231", + Addr: addr, DB: 0, DialTimeout: 10 * time.Second, ReadTimeout: 30 * time.Second, diff --git a/tests/integration/pubsub_test.go b/tests/integration/pubsub_test.go index 198c77f035..560188ea77 100644 --- a/tests/integration/pubsub_test.go +++ b/tests/integration/pubsub_test.go @@ -21,8 +21,8 @@ var _ = Describe("PubSub", func() { ctx := context.TODO() BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) - client2 = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) + client2 = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client2.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(2 * time.Second) diff --git a/tests/integration/replication_test.go b/tests/integration/replication_test.go index 49b23ab7d3..98dfba89b0 100644 --- a/tests/integration/replication_test.go +++ b/tests/integration/replication_test.go @@ -371,8 +371,8 @@ var _ = Describe("should replication ", func() { var clientMaster *redis.Client BeforeEach(func() { - clientMaster = redis.NewClient(pikaOptions1()) - clientSlave = redis.NewClient(pikaOptions2()) + clientMaster = redis.NewClient(PikaOption(MASTERADDR)) + clientSlave = redis.NewClient(PikaOption(SLAVEADDR)) cleanEnv(ctx, clientMaster, clientSlave) Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred()) @@ -395,11 +395,11 @@ var _ = Describe("should replication ", func() { infoRes = clientMaster.Info(ctx, "replication") Expect(infoRes.Err()).NotTo(HaveOccurred()) Expect(infoRes.Val()).To(ContainSubstring("role:master")) - Expect(clientSlave.Do(ctx, "slaveof", "127.0.0.1", "9231").Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same")) + Expect(clientSlave.Do(ctx, "slaveof", LOCALHOST, SLAVEPORT).Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same")) var count = 0 for { - res := trySlave(ctx, clientSlave, "127.0.0.1", "9221") + res := trySlave(ctx, clientSlave, LOCALHOST, MASTERPORT) if res { break } else if count > 4 { diff --git a/tests/integration/server_test.go b/tests/integration/server_test.go index 9c21767928..95685d2fdf 100644 --- a/tests/integration/server_test.go +++ b/tests/integration/server_test.go @@ -14,7 +14,7 @@ var _ = Describe("Server", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/set_test.go b/tests/integration/set_test.go index 07a568b0c7..dfe39408fb 100644 --- a/tests/integration/set_test.go +++ b/tests/integration/set_test.go @@ -14,7 +14,7 @@ var _ = Describe("Set Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) @@ -276,8 +276,8 @@ var _ = Describe("Set Commands", func() { Expect(sMembers.Err()).NotTo(HaveOccurred()) Expect(sMembers.Val()).To(HaveLen(3)) - err := client.Do(ctx, "SPOP", "set", 1, 2).Err() - Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command"))) + err := client.Do(ctx, "SPOP", "set", 1, 2).Err() + Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command"))) }) It("should SPopN", func() { diff --git a/tests/integration/slowlog_test.go b/tests/integration/slowlog_test.go index fa6f96a7c9..cdb00cd2b4 100644 --- a/tests/integration/slowlog_test.go +++ b/tests/integration/slowlog_test.go @@ -18,7 +18,7 @@ var _ = Describe("Slowlog Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/stream_test.go b/tests/integration/stream_test.go index 3c49ad9973..00016e14aa 100644 --- a/tests/integration/stream_test.go +++ b/tests/integration/stream_test.go @@ -6,13 +6,13 @@ package pika_integration import ( - "sync" "context" - "sync/atomic" "fmt" "math/rand" "strconv" "strings" + "sync" + "sync/atomic" . "github.com/bsm/ginkgo/v2" . "github.com/bsm/gomega" @@ -120,7 +120,7 @@ func parseStreamEntryID(id string) (ts int64, seqNum int64) { var _ = Describe("Stream Commands", func() { ctx := context.TODO() var client *redis.Client - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) client.FlushDB(ctx) BeforeEach(func() { @@ -140,20 +140,20 @@ var _ = Describe("Stream Commands", func() { const numWriters = 10 const numReaders = 10 const messagesPerWriter = 20 - + createClient := func() *redis.Client { - return redis.NewClient(pikaOptions1()) + return redis.NewClient(PikaOption(SINGLEADDR)) } - + var messageCount int32 - + // Start writer goroutines for i := 0; i < numWriters; i++ { go func(writerIndex int) { defer GinkgoRecover() writerClient := createClient() defer writerClient.Close() - + for j := 0; j < messagesPerWriter; j++ { _, err := writerClient.XAdd(ctx, &redis.XAddArgs{ Stream: streamKey, @@ -164,41 +164,42 @@ var _ = Describe("Stream Commands", func() { } }(i) } - + // Start reader goroutines var wg sync.WaitGroup for i := 0; i < numReaders; i++ { - wg.Add(1) - go func() { - defer GinkgoRecover() - defer wg.Done() - readerClient := createClient() - defer readerClient.Close() - - lastID := "0" - readMessages := 0 - for readMessages < totalMessages { - items, err := readerClient.XRead(ctx, &redis.XReadArgs{ - Streams: []string{streamKey, lastID}, - Block: 0, - }).Result() - if (err != nil) { - continue - } - - // Check if items slice is not empty - if len(items) > 0 && len(items[0].Messages) > 0 { - lastMessageIndex := len(items[0].Messages) - 1 - lastID = items[0].Messages[lastMessageIndex].ID - readMessages += len(items[0].Messages) - } - // Optionally add a short delay here if needed - } - Expect(readMessages).To(BeNumerically(">=", totalMessages)) + wg.Add(1) + go func() { + readerClient := createClient() + defer func() { + GinkgoRecover() + wg.Done() + readerClient.Close() }() + + lastID := "0" + readMessages := 0 + for readMessages < totalMessages { + items, err := readerClient.XRead(ctx, &redis.XReadArgs{ + Streams: []string{streamKey, lastID}, + Block: 0, + }).Result() + if err != nil { + continue + } + + // Check if items slice is not empty + if len(items) > 0 && len(items[0].Messages) > 0 { + lastMessageIndex := len(items[0].Messages) - 1 + lastID = items[0].Messages[lastMessageIndex].ID + readMessages += len(items[0].Messages) + } + // Optionally add a short delay here if needed + } + Expect(readMessages).To(BeNumerically(">=", totalMessages)) + }() } - wg.Wait() Eventually(func() int32 { return atomic.LoadInt32(&messageCount) @@ -209,29 +210,27 @@ var _ = Describe("Stream Commands", func() { Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred()) // Creating a stream and adding entries _, err := client.XAdd(ctx, &redis.XAddArgs{ - Stream: "mystream", - ID: "*", - Values: map[string]interface{}{"key1": "value1", "key2": "value2"}, + Stream: "mystream", + ID: "*", + Values: map[string]interface{}{"key1": "value1", "key2": "value2"}, }).Result() Expect(err).NotTo(HaveOccurred()) - + // Using keys * to find all keys including the stream keys, err := client.Keys(ctx, "*").Result() Expect(err).NotTo(HaveOccurred()) - + // Checking if the stream 'mystream' exists in the returned keys found := false for _, key := range keys { - if key == "mystream" { - found = true - break - } + if key == "mystream" { + found = true + break + } } Expect(found).To(BeTrue(), "Stream 'mystream' should exist in keys") }) - - - + It("XADD wrong number of args", func() { _, err := client.Do(ctx, "XADD", "mystream").Result() Expect(err).To(HaveOccurred()) diff --git a/tests/integration/string_test.go b/tests/integration/string_test.go index b2f357af41..fed216d5a2 100644 --- a/tests/integration/string_test.go +++ b/tests/integration/string_test.go @@ -16,7 +16,7 @@ var _ = Describe("String Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/txn_test.go b/tests/integration/txn_test.go index 0d3e219cdc..450c235a74 100644 --- a/tests/integration/txn_test.go +++ b/tests/integration/txn_test.go @@ -28,8 +28,8 @@ var _ = Describe("Text Txn", func() { var cmdCost time.Duration BeforeEach(func() { - txnClient = redis.NewClient(pikaOptions1()) - cmdClient = redis.NewClient(pikaOptions1()) + txnClient = redis.NewClient(PikaOption(SINGLEADDR)) + cmdClient = redis.NewClient(PikaOption(SINGLEADDR)) }) Describe("test watch", func() { It("basic watch", func() { diff --git a/tests/integration/zset_test.go b/tests/integration/zset_test.go index eb6817ba0f..abd9fb35b9 100644 --- a/tests/integration/zset_test.go +++ b/tests/integration/zset_test.go @@ -15,7 +15,7 @@ var _ = Describe("Zset Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) @@ -1086,7 +1086,7 @@ var _ = Describe("Zset Commands", func() { Member: "three", }})) err = client.Do(ctx, "ZPOPMIN", "zset", 1, 2).Err() - Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'zpopmin' command"))) + Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'zpopmin' command"))) }) It("should ZRange", func() {