Skip to content

Commit

Permalink
feat: improve typings for pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Mar 14, 2022
1 parent 94c1e24 commit 334242b
Show file tree
Hide file tree
Showing 15 changed files with 1,504 additions and 9,998 deletions.
2 changes: 1 addition & 1 deletion bin/generateRedisCommander/overrides.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ module.exports = {
exec: {
overwrite: true,
defs: [
"exec(callback?: Callback<[error: Error, result: unknown][] | null>): Promise<[error: Error, result: unknown][] | null>;",
"exec(callback?: Callback<[error: Error | null, result: unknown][] | null>): Promise<[error: Error | null, result: unknown][] | null>;",
],
},
};
3 changes: 2 additions & 1 deletion bin/generateRedisCommander/template.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Callback } from "../types";

type RedisKey = string | Buffer;
type RedisValue = string | Buffer | number;
type Callback<T> = (err: Error | null | undefined, res?: T) => void;

// Inspired by https://github.com/mmkal/handy-redis/blob/main/src/generated/interface.ts.
// Should be fixed with https://github.com/Microsoft/TypeScript/issues/1213
Expand Down
2 changes: 1 addition & 1 deletion bin/generateRedisCommander/typeMaps.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module.exports = {
: "string | Buffer",
pattern: "string",
number: (name) =>
["seconds", "count", "start", "stop", "index"].some((pattern) =>
["seconds", "count", "start", "stop", "end", "index"].some((pattern) =>
name.toLowerCase().includes(pattern)
)
? "number"
Expand Down
8 changes: 4 additions & 4 deletions lib/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
} from "./redis/RedisOptions";
import { addTransactionSupport, Transaction } from "./transaction";
import {
CallbackFunction,
Callback,
CommandItem,
NetStream,
ScanStreamOptions,
Expand Down Expand Up @@ -182,7 +182,7 @@ class Redis extends Commander {
* When calling this method manually, a Promise is returned, which will
* be resolved when the connection status is ready.
*/
connect(callback?: CallbackFunction<void>): Promise<void> {
connect(callback?: Callback<void>): Promise<void> {
const promise = new Promise<void>((resolve, reject) => {
if (
this.status === "connecting" ||
Expand Down Expand Up @@ -447,7 +447,7 @@ class Redis extends Commander {
* });
* ```
*/
monitor(callback: CallbackFunction<Redis>): Promise<Redis> {
monitor(callback: Callback<Redis>): Promise<Redis> {
const monitorInstance = this.duplicate({
monitor: true,
lazyConnect: false,
Expand Down Expand Up @@ -734,7 +734,7 @@ class Redis extends Commander {
* Check whether Redis has finished loading the persistent data and is able to
* process commands.
*/
private _readyCheck(callback: CallbackFunction) {
private _readyCheck(callback: Callback) {
const _this = this;
this.info(function (err, res) {
if (err) {
Expand Down
12 changes: 6 additions & 6 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Command from "../command";
import ClusterAllFailedError from "../errors/ClusterAllFailedError";
import Redis from "../Redis";
import ScanStream from "../ScanStream";
import { CallbackFunction, ScanStreamOptions, WriteableStream } from "../types";
import { Callback, ScanStreamOptions, WriteableStream } from "../types";
import {
CONNECTION_CLOSED_ERROR_MSG,
Debug,
Expand Down Expand Up @@ -74,7 +74,7 @@ class Cluster extends Commander {
private isRefreshing = false;
private _autoPipelines: Map<string, typeof Pipeline> = new Map();
private _runningAutoPipelines: Set<string> = new Set();
private _readyDelayedCallbacks: CallbackFunction[] = [];
private _readyDelayedCallbacks: Callback[] = [];

/**
* Every time Cluster#connect() is called, this value will be
Expand Down Expand Up @@ -299,7 +299,7 @@ class Cluster extends Commander {
/**
* Quit the cluster gracefully.
*/
quit(callback?: CallbackFunction<"OK">): Promise<"OK"> {
quit(callback?: Callback<"OK">): Promise<"OK"> {
const status = this.status;
this.setStatus("disconnecting");

Expand Down Expand Up @@ -375,7 +375,7 @@ class Cluster extends Commander {
}

// This is needed in order not to install a listener for each auto pipeline
delayUntilReady(callback: CallbackFunction) {
delayUntilReady(callback: Callback) {
this._readyDelayedCallbacks.push(callback);
}

Expand All @@ -397,7 +397,7 @@ class Cluster extends Commander {
/**
* Refresh the slot cache
*/
refreshSlotsCache(callback?: CallbackFunction<void>): void {
refreshSlotsCache(callback?: Callback<void>): void {
if (this.isRefreshing) {
if (typeof callback === "function") {
process.nextTick(callback);
Expand Down Expand Up @@ -872,7 +872,7 @@ class Cluster extends Commander {
/**
* Check whether Cluster is able to process commands
*/
private readyCheck(callback: CallbackFunction<void | "fail">): void {
private readyCheck(callback: Callback<void | "fail">): void {
(this as any).cluster("info", function (err, res) {
if (err) {
return callback(err);
Expand Down
6 changes: 3 additions & 3 deletions lib/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
convertMapToArray,
convertObjectToArray,
} from "./utils";
import { CallbackFunction, Respondable, CommandParameter } from "./types";
import { Callback, Respondable, CommandParameter } from "./types";

export type ArgumentType =
| string
Expand Down Expand Up @@ -157,7 +157,7 @@ export default class Command implements Respondable {
private replyEncoding: BufferEncoding | null;
private errorStack: Error;
private bufferMode: boolean;
private callback: CallbackFunction;
private callback: Callback;
private transformed = false;
private _commandTimeoutTimer?: NodeJS.Timeout;

Expand All @@ -176,7 +176,7 @@ export default class Command implements Respondable {
public name: string,
args: Array<ArgumentType> = [],
options: CommandOptions = {},
callback?: CallbackFunction
callback?: Callback
) {
this.replyEncoding = options.replyEncoding;
this.errorStack = options.errorStack;
Expand Down
6 changes: 2 additions & 4 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import asCallback from "standard-as-callback";
import { deprecate } from "util";
import Redis, { Cluster } from ".";
import Command from "./command";
import { CallbackFunction, PipelineWriteableStream } from "./types";
import { Callback, PipelineWriteableStream } from "./types";
import { noop } from "./utils";
import Commander from "./utils/Commander";

Expand Down Expand Up @@ -259,9 +259,7 @@ Pipeline.prototype.execBuffer = deprecate(function () {
//
// If a different promise instance were returned, that promise would cause its own unhandled promise rejection
// errors, even if that promise unconditionally resolved to **the resolved value of** this.promise.
Pipeline.prototype.exec = function (
callback: CallbackFunction
): Promise<Array<any>> {
Pipeline.prototype.exec = function (callback: Callback): Promise<Array<any>> {
// Wait for the cluster to be connected, since we need nodes information before continuing
if (this.isCluster && !this.redis.slots.length) {
if (this.redis.status === "wait") this.redis.connect().catch(noop);
Expand Down
21 changes: 4 additions & 17 deletions lib/script.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createHash } from "crypto";
import Command from "./command";
import asCallback from "standard-as-callback";
import { CallbackFunction } from "./types";
import { Callback } from "./types";
export default class Script {
private sha: string;
private Command: new (...args: any[]) => Command;
Expand Down Expand Up @@ -40,12 +40,7 @@ export default class Script {
};
}

execute(
container: any,
args: any[],
options: any,
callback?: CallbackFunction
) {
execute(container: any, args: any[], options: any, callback?: Callback) {
if (typeof this.numberOfKeys === "number") {
args.unshift(this.numberOfKeys);
}
Expand All @@ -56,11 +51,7 @@ export default class Script {
options.readOnly = true;
}

const evalsha = new this.Command(
"evalsha",
[this.sha, ...args],
options
);
const evalsha = new this.Command("evalsha", [this.sha, ...args], options);
evalsha.isCustomCommand = true;

evalsha.promise = evalsha.promise.catch((err: Error) => {
Expand All @@ -70,11 +61,7 @@ export default class Script {

// Resend the same custom evalsha command that gets transformed to an eval
// in case it's not loaded yet on the connectionDo an eval as fallback, redis will hash and load it
const resend = new this.Command(
"evalsha",
[this.sha, ...args],
options
);
const resend = new this.Command("evalsha", [this.sha, ...args], options);
resend.isCustomCommand = true;

const client = container.isPipeline ? container.redis : container;
Expand Down
21 changes: 8 additions & 13 deletions lib/transaction.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
import { wrapMultiResult, noop } from "./utils";
import asCallback from "standard-as-callback";
import Pipeline from "./pipeline";
import { CallbackFunction } from "./types";
import { Callback } from "./types";
import { ChainableCommander } from "./utils/RedisCommander";

interface MultiOptions {
pipeline: boolean;
}

export interface Transaction {
pipeline(commands?: [name: string, ...args: unknown[]][]): ChainableCommander;
multi(options?: MultiOptions): ChainableCommander;
multi(
commands?: [name: string, ...args: unknown[]][],
options?: MultiOptions
): ChainableCommander;
multi(options: { pipeline: false }): Promise<"OK">;
multi(): ChainableCommander;
multi(options: { pipeline: true }): ChainableCommander;
multi(commands?: [name: string, ...args: unknown[]][]): ChainableCommander;
}

export function addTransactionSupport(redis) {
Expand Down Expand Up @@ -42,7 +37,7 @@ export function addTransactionSupport(redis) {
pipeline.addBatch(commands);
}
const exec = pipeline.exec;
pipeline.exec = function (callback: CallbackFunction) {
pipeline.exec = function (callback: Callback) {
// Wait for the cluster to be connected, since we need nodes information before continuing
if (this.isCluster && !this.redis.slots.length) {
if (this.redis.status === "wait") this.redis.connect().catch(noop);
Expand Down Expand Up @@ -97,7 +92,7 @@ export function addTransactionSupport(redis) {
// @ts-expect-error
const { execBuffer } = pipeline;
// @ts-expect-error
pipeline.execBuffer = function (callback: CallbackFunction) {
pipeline.execBuffer = function (callback: Callback) {
if (this._transactions > 0) {
execBuffer.call(pipeline);
}
Expand All @@ -107,7 +102,7 @@ export function addTransactionSupport(redis) {
};

const { exec } = redis;
redis.exec = function (callback: CallbackFunction): Promise<any[] | null> {
redis.exec = function (callback: Callback): Promise<any[] | null> {
return asCallback(
exec.call(this).then(function (results: any[] | null) {
if (Array.isArray(results)) {
Expand Down
2 changes: 1 addition & 1 deletion lib/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Socket } from "net";
import { TLSSocket } from "tls";

export type CallbackFunction<T = any> = (err?: Error | null, result?: T) => void;
export type Callback<T = any> = (err?: Error | null, result?: T) => void;
export type NetStream = Socket | TLSSocket;

export type CommandParameter = string | Buffer | number | any[];
Expand Down
8 changes: 3 additions & 5 deletions lib/utils/Commander.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
} from "../autoPipelining";
import Command, { ArgumentType } from "../command";
import Script from "../script";
import { CallbackFunction, WriteableStream } from "../types";
import { Callback, WriteableStream } from "../types";
import RedisCommander, { ClientContext } from "./RedisCommander";

export interface CommanderOptions {
Expand Down Expand Up @@ -129,9 +129,7 @@ function generateFunction(
_commandName = null;
}

return function (
...args: ArgumentType[] | [...ArgumentType[], CallbackFunction]
) {
return function (...args: ArgumentType[] | [...ArgumentType[], Callback]) {
const commandName = (_commandName || args.shift()) as string;
let callback = args[args.length - 1];

Expand All @@ -150,7 +148,7 @@ function generateFunction(
if (this.options.dropBufferSupport && !_encoding) {
return asCallback(
Promise.reject(new Error(DROP_BUFFER_SUPPORT_ERROR)),
callback as CallbackFunction | undefined
callback as Callback | undefined
);
}

Expand Down
Loading

0 comments on commit 334242b

Please sign in to comment.