Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several cluster fixes based on open ioredis PRs #4

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions built/Redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,6 @@ class Redis extends Commander_1.default {
* @ignore
*/
handleReconnection(err, item) {
var _a;
let needReconnect = false;
if (this.options.reconnectOnError) {
needReconnect = this.options.reconnectOnError(err);
Expand All @@ -511,18 +510,32 @@ class Redis extends Commander_1.default {
}
item.command.reject(err);
break;
case 2:
if (this.status !== "reconnecting") {
this.disconnect(true);
case 2: {
const resendCommand = () => {
var _a;
if (this.status !== "reconnecting") {
this.disconnect(true);
}
if (((_a = this.condition) === null || _a === void 0 ? void 0 : _a.select) !== item.select &&
item.command.name !== "select") {
this.select(item.select);
}
// TODO
// @ts-expect-error
this.sendCommand(item.command);
};
if (typeof this.options.retryStrategy !== "function") {
return resendCommand();
}
if (((_a = this.condition) === null || _a === void 0 ? void 0 : _a.select) !== item.select &&
item.command.name !== "select") {
this.select(item.select);
const retryDelay = this.options.retryStrategy(++this.retryAttempts);
if (typeof retryDelay === "number") {
this.reconnectTimeout = setTimeout(() => {
this.reconnectTimeout = null;
resendCommand();
}, retryDelay);
}
// TODO
// @ts-expect-error
this.sendCommand(item.command);
break;
}
default:
item.command.reject(err);
}
Expand Down
15 changes: 8 additions & 7 deletions built/cluster/ClusterSubscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ClusterSubscriber {
if (!this.started || !this.subscriber) {
return;
}
// @ts-expect-error
if ((0, util_1.getNodeKey)(this.subscriber.options) === key) {
debug("subscriber has left, selecting a new one...");
this.selectSubscriber();
Expand Down Expand Up @@ -63,6 +64,7 @@ class ClusterSubscriber {
debug("stopped");
}
selectSubscriber() {
var _a, _b;
const lastActiveSubscriber = this.lastActiveSubscriber;
// Disconnect the previous subscriber even if there
// will not be a new one.
Expand Down Expand Up @@ -116,13 +118,12 @@ class ClusterSubscriber {
// Re-subscribe previous channels
const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] };
if (lastActiveSubscriber) {
const condition = lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition;
if (condition && condition.subscriber) {
previousChannels.subscribe = condition.subscriber.channels("subscribe");
previousChannels.psubscribe =
condition.subscriber.channels("psubscribe");
previousChannels.ssubscribe =
condition.subscriber.channels("ssubscribe");
const subscriber = ((_a = lastActiveSubscriber.condition) === null || _a === void 0 ? void 0 : _a.subscriber) ||
((_b = lastActiveSubscriber.prevCondition) === null || _b === void 0 ? void 0 : _b.subscriber);
if (subscriber) {
previousChannels.subscribe = subscriber.channels("subscribe");
previousChannels.psubscribe = subscriber.channels("psubscribe");
previousChannels.ssubscribe = subscriber.channels("ssubscribe");
}
}
if (previousChannels.subscribe.length ||
Expand Down
1 change: 1 addition & 0 deletions built/cluster/ConnectionPool.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ export default class ConnectionPool extends EventEmitter {
* Remove a node from the pool.
*/
private removeNode;
private getNodeKey;
}
export {};
7 changes: 5 additions & 2 deletions built/cluster/ConnectionPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ConnectionPool extends events_1.EventEmitter {
* Find or create a connection to the node
*/
findOrCreate(redisOptions, readOnly = false) {
const key = (0, util_1.getNodeKey)(redisOptions);
const key = this.getNodeKey(redisOptions);
readOnly = Boolean(readOnly);
if (this.specifiedOptions[key]) {
Object.assign(redisOptions, this.specifiedOptions[key]);
Expand Down Expand Up @@ -95,7 +95,7 @@ class ConnectionPool extends events_1.EventEmitter {
debug("Reset with %O", nodes);
const newNodes = {};
nodes.forEach((node) => {
const key = (0, util_1.getNodeKey)(node);
const key = this.getNodeKey(node);
// Don't override the existing (master) node
// when the current one is slave.
if (!(node.readOnly && newNodes[key])) {
Expand Down Expand Up @@ -133,5 +133,8 @@ class ConnectionPool extends events_1.EventEmitter {
}
}
}
getNodeKey(options) {
return (0, util_1.getNodeKey)(options) + ":" + options.nodeId;
}
}
exports.default = ConnectionPool;
1 change: 1 addition & 0 deletions built/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ class Cluster extends Commander_1.default {
port: items[j][1],
});
node.readOnly = j !== 2;
node.nodeId = items[j][2];
nodes.push(node);
keys.push(node.host + ":" + node.port);
}
Expand Down
1 change: 1 addition & 0 deletions built/cluster/util.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export interface RedisOptions {
host: string;
username?: string;
password?: string;
nodeId?: string;
[key: string]: any;
}
export interface SrvRecordsGroup {
Expand Down
35 changes: 24 additions & 11 deletions lib/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -657,20 +657,33 @@ class Redis extends Commander implements DataHandledable {
}
item.command.reject(err);
break;
case 2:
if (this.status !== "reconnecting") {
this.disconnect(true);
case 2: {
const resendCommand = () => {
if (this.status !== "reconnecting") {
this.disconnect(true);
}
if (
this.condition?.select !== item.select &&
item.command.name !== "select"
) {
this.select(item.select);
}
// TODO
// @ts-expect-error
this.sendCommand(item.command);
};
if (typeof this.options.retryStrategy !== "function") {
return resendCommand();
}
if (
this.condition?.select !== item.select &&
item.command.name !== "select"
) {
this.select(item.select);
const retryDelay = this.options.retryStrategy(++this.retryAttempts);
if (typeof retryDelay === "number") {
this.reconnectTimeout = setTimeout(() => {
this.reconnectTimeout = null;
resendCommand();
}, retryDelay);
}
// TODO
// @ts-expect-error
this.sendCommand(item.command);
break;
}
default:
item.command.reject(err);
}
Expand Down
22 changes: 12 additions & 10 deletions lib/cluster/ClusterSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import ConnectionPool from "./ConnectionPool";
import { getConnectionName, getNodeKey } from "./util";
import { sample, noop, Debug } from "../utils";
import Redis from "../Redis";
import { Condition } from "../DataHandler";

const debug = Debug("cluster:subscriber");

export default class ClusterSubscriber {
private started = false;
private subscriber: any = null;
private lastActiveSubscriber: any;
private subscriber: Redis | null = null;
private lastActiveSubscriber: Redis & { prevCondition?: Condition };

constructor(
private connectionPool: ConnectionPool,
Expand All @@ -27,6 +28,7 @@ export default class ClusterSubscriber {
if (!this.started || !this.subscriber) {
return;
}
// @ts-expect-error
if (getNodeKey(this.subscriber.options) === key) {
debug("subscriber has left, selecting a new one...");
this.selectSubscriber();
Expand Down Expand Up @@ -140,14 +142,14 @@ export default class ClusterSubscriber {
// Re-subscribe previous channels
const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] };
if (lastActiveSubscriber) {
const condition =
lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition;
if (condition && condition.subscriber) {
previousChannels.subscribe = condition.subscriber.channels("subscribe");
previousChannels.psubscribe =
condition.subscriber.channels("psubscribe");
previousChannels.ssubscribe =
condition.subscriber.channels("ssubscribe");
const subscriber =
lastActiveSubscriber.condition?.subscriber ||
lastActiveSubscriber.prevCondition?.subscriber;

if (subscriber) {
previousChannels.subscribe = subscriber.channels("subscribe");
previousChannels.psubscribe = subscriber.channels("psubscribe");
previousChannels.ssubscribe = subscriber.channels("ssubscribe");
}
}
if (
Expand Down
8 changes: 6 additions & 2 deletions lib/cluster/ConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export default class ConnectionPool extends EventEmitter {
* Find or create a connection to the node
*/
findOrCreate(redisOptions: RedisOptions, readOnly = false): NodeRecord {
const key = getNodeKey(redisOptions);
const key = this.getNodeKey(redisOptions);
readOnly = Boolean(readOnly);

if (this.specifiedOptions[key]) {
Expand Down Expand Up @@ -119,7 +119,7 @@ export default class ConnectionPool extends EventEmitter {
debug("Reset with %O", nodes);
const newNodes = {};
nodes.forEach((node) => {
const key = getNodeKey(node);
const key = this.getNodeKey(node);

// Don't override the existing (master) node
// when the current one is slave.
Expand Down Expand Up @@ -161,4 +161,8 @@ export default class ConnectionPool extends EventEmitter {
}
}
}

private getNodeKey(options: RedisOptions) {
return getNodeKey(options) + ":" + options.nodeId;
}
}
3 changes: 2 additions & 1 deletion lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ class Cluster extends Commander {
if (this.isRefreshing) {
return;
}

this.isRefreshing = true;

const _this = this;
Expand Down Expand Up @@ -868,6 +868,7 @@ class Cluster extends Commander {
port: items[j][1],
});
node.readOnly = j !== 2;
node.nodeId = items[j][2];
nodes.push(node);
keys.push(node.host + ":" + node.port);
}
Expand Down
1 change: 1 addition & 0 deletions lib/cluster/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface RedisOptions {
host: string;
username?: string;
password?: string;
nodeId?: string;
[key: string]: any;
}

Expand Down