diff --git a/built/Redis.js b/built/Redis.js index fa2ca7d..7233d04 100644 --- a/built/Redis.js +++ b/built/Redis.js @@ -498,7 +498,6 @@ class Redis extends Commander_1.default { * @ignore */ handleReconnection(err, item) { - var _a; let needReconnect = false; if (this.options.reconnectOnError) { needReconnect = this.options.reconnectOnError(err); @@ -511,18 +510,32 @@ class Redis extends Commander_1.default { } item.command.reject(err); break; - case 2: - if (this.status !== "reconnecting") { - this.disconnect(true); + case 2: { + const resendCommand = () => { + var _a; + if (this.status !== "reconnecting") { + this.disconnect(true); + } + if (((_a = this.condition) === null || _a === void 0 ? void 0 : _a.select) !== item.select && + item.command.name !== "select") { + this.select(item.select); + } + // TODO + // @ts-expect-error + this.sendCommand(item.command); + }; + if (typeof this.options.retryStrategy !== "function") { + return resendCommand(); } - if (((_a = this.condition) === null || _a === void 0 ? void 0 : _a.select) !== item.select && - item.command.name !== "select") { - this.select(item.select); + const retryDelay = this.options.retryStrategy(++this.retryAttempts); + if (typeof retryDelay === "number") { + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null; + resendCommand(); + }, retryDelay); } - // TODO - // @ts-expect-error - this.sendCommand(item.command); break; + } default: item.command.reject(err); } diff --git a/built/cluster/ClusterSubscriber.js b/built/cluster/ClusterSubscriber.js index bf4e4bb..77a508c 100644 --- a/built/cluster/ClusterSubscriber.js +++ b/built/cluster/ClusterSubscriber.js @@ -33,6 +33,7 @@ class ClusterSubscriber { if (!this.started || !this.subscriber) { return; } + // @ts-expect-error if ((0, util_1.getNodeKey)(this.subscriber.options) === key) { debug("subscriber has left, selecting a new one..."); this.selectSubscriber(); @@ -63,6 +64,7 @@ class ClusterSubscriber { debug("stopped"); } selectSubscriber() { + var _a, _b; const lastActiveSubscriber = this.lastActiveSubscriber; // Disconnect the previous subscriber even if there // will not be a new one. @@ -116,13 +118,12 @@ class ClusterSubscriber { // Re-subscribe previous channels const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] }; if (lastActiveSubscriber) { - const condition = lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition; - if (condition && condition.subscriber) { - previousChannels.subscribe = condition.subscriber.channels("subscribe"); - previousChannels.psubscribe = - condition.subscriber.channels("psubscribe"); - previousChannels.ssubscribe = - condition.subscriber.channels("ssubscribe"); + const subscriber = ((_a = lastActiveSubscriber.condition) === null || _a === void 0 ? void 0 : _a.subscriber) || + ((_b = lastActiveSubscriber.prevCondition) === null || _b === void 0 ? void 0 : _b.subscriber); + if (subscriber) { + previousChannels.subscribe = subscriber.channels("subscribe"); + previousChannels.psubscribe = subscriber.channels("psubscribe"); + previousChannels.ssubscribe = subscriber.channels("ssubscribe"); } } if (previousChannels.subscribe.length || diff --git a/built/cluster/ConnectionPool.d.ts b/built/cluster/ConnectionPool.d.ts index 523125d..9b7855f 100644 --- a/built/cluster/ConnectionPool.d.ts +++ b/built/cluster/ConnectionPool.d.ts @@ -28,5 +28,6 @@ export default class ConnectionPool extends EventEmitter { * Remove a node from the pool. */ private removeNode; + private getNodeKey; } export {}; diff --git a/built/cluster/ConnectionPool.js b/built/cluster/ConnectionPool.js index 59856d3..ea203dd 100644 --- a/built/cluster/ConnectionPool.js +++ b/built/cluster/ConnectionPool.js @@ -34,7 +34,7 @@ class ConnectionPool extends events_1.EventEmitter { * Find or create a connection to the node */ findOrCreate(redisOptions, readOnly = false) { - const key = (0, util_1.getNodeKey)(redisOptions); + const key = this.getNodeKey(redisOptions); readOnly = Boolean(readOnly); if (this.specifiedOptions[key]) { Object.assign(redisOptions, this.specifiedOptions[key]); @@ -95,7 +95,7 @@ class ConnectionPool extends events_1.EventEmitter { debug("Reset with %O", nodes); const newNodes = {}; nodes.forEach((node) => { - const key = (0, util_1.getNodeKey)(node); + const key = this.getNodeKey(node); // Don't override the existing (master) node // when the current one is slave. if (!(node.readOnly && newNodes[key])) { @@ -133,5 +133,8 @@ class ConnectionPool extends events_1.EventEmitter { } } } + getNodeKey(options) { + return (0, util_1.getNodeKey)(options) + ":" + options.nodeId; + } } exports.default = ConnectionPool; diff --git a/built/cluster/index.js b/built/cluster/index.js index 4523b7a..89cd5e8 100644 --- a/built/cluster/index.js +++ b/built/cluster/index.js @@ -686,6 +686,7 @@ class Cluster extends Commander_1.default { port: items[j][1], }); node.readOnly = j !== 2; + node.nodeId = items[j][2]; nodes.push(node); keys.push(node.host + ":" + node.port); } diff --git a/built/cluster/util.d.ts b/built/cluster/util.d.ts index 42880a6..b7fd15f 100644 --- a/built/cluster/util.d.ts +++ b/built/cluster/util.d.ts @@ -7,6 +7,7 @@ export interface RedisOptions { host: string; username?: string; password?: string; + nodeId?: string; [key: string]: any; } export interface SrvRecordsGroup { diff --git a/lib/Redis.ts b/lib/Redis.ts index 1812157..dc0d9bb 100644 --- a/lib/Redis.ts +++ b/lib/Redis.ts @@ -657,20 +657,33 @@ class Redis extends Commander implements DataHandledable { } item.command.reject(err); break; - case 2: - if (this.status !== "reconnecting") { - this.disconnect(true); + case 2: { + const resendCommand = () => { + if (this.status !== "reconnecting") { + this.disconnect(true); + } + if ( + this.condition?.select !== item.select && + item.command.name !== "select" + ) { + this.select(item.select); + } + // TODO + // @ts-expect-error + this.sendCommand(item.command); + }; + if (typeof this.options.retryStrategy !== "function") { + return resendCommand(); } - if ( - this.condition?.select !== item.select && - item.command.name !== "select" - ) { - this.select(item.select); + const retryDelay = this.options.retryStrategy(++this.retryAttempts); + if (typeof retryDelay === "number") { + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null; + resendCommand(); + }, retryDelay); } - // TODO - // @ts-expect-error - this.sendCommand(item.command); break; + } default: item.command.reject(err); } diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts index e0ecd2e..05ea23b 100644 --- a/lib/cluster/ClusterSubscriber.ts +++ b/lib/cluster/ClusterSubscriber.ts @@ -3,13 +3,14 @@ import ConnectionPool from "./ConnectionPool"; import { getConnectionName, getNodeKey } from "./util"; import { sample, noop, Debug } from "../utils"; import Redis from "../Redis"; +import { Condition } from "../DataHandler"; const debug = Debug("cluster:subscriber"); export default class ClusterSubscriber { private started = false; - private subscriber: any = null; - private lastActiveSubscriber: any; + private subscriber: Redis | null = null; + private lastActiveSubscriber: Redis & { prevCondition?: Condition }; constructor( private connectionPool: ConnectionPool, @@ -27,6 +28,7 @@ export default class ClusterSubscriber { if (!this.started || !this.subscriber) { return; } + // @ts-expect-error if (getNodeKey(this.subscriber.options) === key) { debug("subscriber has left, selecting a new one..."); this.selectSubscriber(); @@ -140,14 +142,14 @@ export default class ClusterSubscriber { // Re-subscribe previous channels const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] }; if (lastActiveSubscriber) { - const condition = - lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition; - if (condition && condition.subscriber) { - previousChannels.subscribe = condition.subscriber.channels("subscribe"); - previousChannels.psubscribe = - condition.subscriber.channels("psubscribe"); - previousChannels.ssubscribe = - condition.subscriber.channels("ssubscribe"); + const subscriber = + lastActiveSubscriber.condition?.subscriber || + lastActiveSubscriber.prevCondition?.subscriber; + + if (subscriber) { + previousChannels.subscribe = subscriber.channels("subscribe"); + previousChannels.psubscribe = subscriber.channels("psubscribe"); + previousChannels.ssubscribe = subscriber.channels("ssubscribe"); } } if ( diff --git a/lib/cluster/ConnectionPool.ts b/lib/cluster/ConnectionPool.ts index 5a117ba..3974ebb 100644 --- a/lib/cluster/ConnectionPool.ts +++ b/lib/cluster/ConnectionPool.ts @@ -46,7 +46,7 @@ export default class ConnectionPool extends EventEmitter { * Find or create a connection to the node */ findOrCreate(redisOptions: RedisOptions, readOnly = false): NodeRecord { - const key = getNodeKey(redisOptions); + const key = this.getNodeKey(redisOptions); readOnly = Boolean(readOnly); if (this.specifiedOptions[key]) { @@ -119,7 +119,7 @@ export default class ConnectionPool extends EventEmitter { debug("Reset with %O", nodes); const newNodes = {}; nodes.forEach((node) => { - const key = getNodeKey(node); + const key = this.getNodeKey(node); // Don't override the existing (master) node // when the current one is slave. @@ -161,4 +161,8 @@ export default class ConnectionPool extends EventEmitter { } } } + + private getNodeKey(options: RedisOptions) { + return getNodeKey(options) + ":" + options.nodeId; + } } diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index c6500f5..f62f615 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -419,7 +419,7 @@ class Cluster extends Commander { if (this.isRefreshing) { return; } - + this.isRefreshing = true; const _this = this; @@ -868,6 +868,7 @@ class Cluster extends Commander { port: items[j][1], }); node.readOnly = j !== 2; + node.nodeId = items[j][2]; nodes.push(node); keys.push(node.host + ":" + node.port); } diff --git a/lib/cluster/util.ts b/lib/cluster/util.ts index 4b12b95..b00ba6a 100644 --- a/lib/cluster/util.ts +++ b/lib/cluster/util.ts @@ -10,6 +10,7 @@ export interface RedisOptions { host: string; username?: string; password?: string; + nodeId?: string; [key: string]: any; }