
Sharded pub/sub support via dedicated subscribers #1956

Open · wants to merge 11 commits into base: main
Conversation

dmaier-redislabs

The previous implementation used one subscriber connection per cluster instance, but SSUBSCRIBE <channel> must be executed against the node that is responsible for the hash slot derived from the channel name.

This PR adds a ClusterSubscriberGroup in which each subscriber is responsible for one shard.
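For context, Redis Cluster derives the slot from the channel name with CRC16 mod 16384, honouring `{hash tags}`. A minimal self-contained sketch of that documented slot rule (not this PR's actual code) might look like:

```typescript
// Sketch of the Redis Cluster slot rule that SSUBSCRIBE routing relies on.
// This is NOT the PR's implementation, just the documented slot algorithm.

// CRC16-CCITT (XModem variant), as specified in the Redis Cluster spec.
// Assumes ASCII channel names (Redis hashes raw bytes).
function crc16(data: string): number {
  let crc = 0;
  for (let i = 0; i < data.length; i++) {
    crc ^= data.charCodeAt(i) << 8;
    for (let bit = 0; bit < 8; bit++) {
      crc = crc & 0x8000 ? ((crc << 1) ^ 0x1021) & 0xffff : (crc << 1) & 0xffff;
    }
  }
  return crc;
}

// HASH_SLOT = CRC16(channel) mod 16384, using only the {hash tag} if present.
function hashSlot(channel: string): number {
  const open = channel.indexOf("{");
  if (open !== -1) {
    const close = channel.indexOf("}", open + 1);
    if (close > open + 1) channel = channel.slice(open + 1, close);
  }
  return crc16(channel) % 16384;
}
```

The subscriber group can then route each SSUBSCRIBE to the subscriber whose shard owns `hashSlot(channel)`.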

```typescript
 * @param node
 * @param readOnly
 */
createRedisFromOptions(node: RedisOptions, readOnly: boolean) {
```
Member

What do you think about adding a useShardedPubsub option? It would be false by default and would control whether we eagerly initialise the new pool of socket connections.

Author

I think you are right. As currently implemented, this runs one subscriber connection per shard, which doubles the number of connections to the cluster.

By the way, this is especially relevant for RESP2; the library doesn't support RESP3 yet. If RESP3 is supported later, users can decide whether to subscribe on the normal connections or use dedicated connections for the sharded subscriptions.

I added a new configuration option called shardedSubscribers, which defaults to false.
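A hypothetical sketch of the opt-in behaviour (the shardedSubscribers option name comes from this PR; the surrounding interface and function are assumptions for illustration, not the PR's code):

```typescript
// Hypothetical sketch: `shardedSubscribers` is the option added in this PR;
// the ClusterOptions shape and subscriberCount helper are assumed for illustration.
interface ClusterOptions {
  shardedSubscribers?: boolean; // defaults to false: no extra per-shard connections
}

// With the option off, keep the old single subscriber connection per cluster
// instance; with it on, eagerly create one dedicated subscriber per shard.
function subscriberCount(shards: number, opts: ClusterOptions): number {
  return opts.shardedSubscribers ? shards : 1;
}
```

This keeps the connection count unchanged for existing users and only doubles it for those who explicitly opt in.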

```typescript
 */
refreshSlots(cluster: Cluster): boolean {
  // If there was an actual change, then reassign the slot ranges
  if (this.clusterSlots !== cluster.slots) {
```
Member

Since !== only compares references, we should add a test to verify the refresh logic and confirm whether we need a deep equality check using JSON.stringify.
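To illustrate the concern: two structurally identical slot tables are still !== because that operator compares references, while a stringified comparison treats them as equal (with the caveat that JSON.stringify is sensitive to key order in objects):

```typescript
// Two slot tables with identical content but distinct identities.
// The slot-table shape here is illustrative, not ioredis' exact structure.
const current = [["0", "5460", "127.0.0.1:7000"]];
const refreshed = [["0", "5460", "127.0.0.1:7000"]];

// Reference comparison: false, they are different array objects.
const refEqual = current === refreshed;

// Content comparison: true, the serialized forms match.
const deepEqual = JSON.stringify(current) === JSON.stringify(refreshed);
```

So with `!==` alone, every topology refresh would look like a change even when nothing moved.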

Author

Yes, I will investigate that.

Author

OK, added a deep equality check via JSON.stringify.

@bobymicroby
Member

There are some tests that need attention as well:

```
2) cluster:spub/ssub
     should receive messages:
   Uncaught TypeError: Cannot read properties of undefined (reading 'writable')
    at MockServer.write (test/helpers/mock_server.ts:139:11)
    at /home/runner/work/ioredis/ioredis/test/functional/cluster/spub_ssub.ts:26:13
    at tryCatcher (node_modules/standard-as-callback/built/utils.js:12:23)
    at /home/runner/work/ioredis/ioredis/node_modules/standard-as-callback/built/index.js:21:53
    at processTicksAndRejections (node:internal/process/task_queues:95:5)

3) cluster:spub/ssub
     supports password:
   Uncaught AssertionError: expected undefined to deeply equal 'abc'
    at MockServer.handler (test/functional/cluster/spub_ssub.ts:66:31)
    at JavascriptRedisParser.returnReply (test/helpers/mock_server.ts:96:54)
    at JavascriptRedisParser.execute (node_modules/redis-parser/lib/parser.js:544:14)
    at Socket.<anonymous> (test/helpers/mock_server.ts:113:16)
    at Socket.emit (node:events:517:28)
    at Socket.emit (node:domain:489:12)
    at addChunk (node:internal/streams/readable:368:12)
    at readableAddChunk (node:internal/streams/readable:341:9)
    at Socket.Readable.push (node:internal/streams/readable:278:10)
    at TCP.onStreamRead (node:internal/stream_base_commons:190:23)
```
