From 3790c61daee36cb98836103870a1479b7456c3d0 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Thu, 18 Feb 2021 18:41:53 +0800 Subject: [PATCH 1/8] test: add metadata case --- packages/grpc/src/comsumer/clients.ts | 4 +- packages/grpc/src/interface.ts | 6 ++- packages/grpc/src/provider/framework.ts | 37 +++++++------ .../fixtures/base-app-stream/package.json | 3 ++ .../base-app-stream/src/configuration.ts | 10 ++++ .../fixtures/base-app-stream/src/interface.ts | 27 ++++++++++ .../base-app-stream/src/provider/math.ts | 29 ++++++++++ .../fixtures/base-app/src/provider/greeter.ts | 8 +++ packages/grpc/test/fixtures/proto/math.proto | 53 +++++++++++++++++++ packages/grpc/test/index.test.ts | 39 +++++++++++++- 10 files changed, 196 insertions(+), 20 deletions(-) create mode 100644 packages/grpc/test/fixtures/base-app-stream/package.json create mode 100644 packages/grpc/test/fixtures/base-app-stream/src/configuration.ts create mode 100644 packages/grpc/test/fixtures/base-app-stream/src/interface.ts create mode 100644 packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts create mode 100644 packages/grpc/test/fixtures/proto/math.proto diff --git a/packages/grpc/src/comsumer/clients.ts b/packages/grpc/src/comsumer/clients.ts index 91ded6222ed3..74b6073488bc 100644 --- a/packages/grpc/src/comsumer/clients.ts +++ b/packages/grpc/src/comsumer/clients.ts @@ -57,7 +57,9 @@ export class GRPCClients extends Map { } resolve(response); } - ); + ).on('metadata', (metadata) => { + console.log(metadata); + }); }); }; connectionService[camelCase(methodName)] = diff --git a/packages/grpc/src/interface.ts b/packages/grpc/src/interface.ts index 0f178b78639e..f67189dccfd6 100644 --- a/packages/grpc/src/interface.ts +++ b/packages/grpc/src/interface.ts @@ -1,15 +1,17 @@ import { IConfigurationOptions, IMidwayApplication, IMidwayContext } from '@midwayjs/core'; -import { Server, ServerCredentials, Metadata } from '@grpc/grpc-js'; +import { Server, ServerCredentials, Metadata, ServerUnaryCall } from '@grpc/grpc-js'; export interface IMidwayGRPCContext extends IMidwayContext { metadata: Metadata; method: string; + sendMetadata(metadata: Metadata): void; + call: ServerUnaryCall; } export type IMidwayGRPCApplication = IMidwayApplication & Server; export type Application = IMidwayGRPCApplication; -export type Context = IMidwayGRPCContext; +export interface Context extends IMidwayGRPCContext {} export interface IGRPCServiceOptions { /** diff --git a/packages/grpc/src/provider/framework.ts b/packages/grpc/src/provider/framework.ts index b460505aba8d..679abd64a543 100644 --- a/packages/grpc/src/provider/framework.ts +++ b/packages/grpc/src/provider/framework.ts @@ -100,7 +100,12 @@ export class MidwayGRPCFramework extends BaseFramework< call: ServerUnaryCall, callback: sendUnaryData ) => { - const ctx = { method, metadata: call.metadata } as any; + const ctx = { + method, + call, + metadata: call.metadata, + sendMetadata: call.sendMetadata, + } as any; this.app.createAnonymousContext(ctx); try { const service = await ctx.requestContext.getAsync(module); @@ -122,21 +127,23 @@ export class MidwayGRPCFramework extends BaseFramework< } public async run(): Promise { - return new Promise((resolve, reject) => { - this.server.bindAsync( - `${this.configurationOptions.url || 'localhost:6565'}`, - ServerCredentials.createInsecure(), - (err: Error | null, bindPort: number) => { - if (err) { - reject(err); - } + if (this.configurationOptions.url) { + return new Promise((resolve, reject) => { + this.server.bindAsync( + `${this.configurationOptions.url}`, + ServerCredentials.createInsecure(), + (err: Error | null, bindPort: number) => { + if (err) { + reject(err); + } - this.server.start(); - this.logger.info(`Server port = ${bindPort} start success`); - resolve(); - } - ); - }); + this.server.start(); + this.logger.info(`Server port = ${bindPort} start success`); + resolve(); + } + ); + }); + } } public async beforeStop() { diff --git a/packages/grpc/test/fixtures/base-app-stream/package.json b/packages/grpc/test/fixtures/base-app-stream/package.json new file mode 100644 index 000000000000..621cdc6a4174 --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream/package.json @@ -0,0 +1,3 @@ +{ + "name": "ali-demo" +} diff --git a/packages/grpc/test/fixtures/base-app-stream/src/configuration.ts b/packages/grpc/test/fixtures/base-app-stream/src/configuration.ts new file mode 100644 index 000000000000..023d2b63e20d --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream/src/configuration.ts @@ -0,0 +1,10 @@ +import { Configuration } from '@midwayjs/decorator'; +import * as grpc from '../../../../src'; + +@Configuration({ + imports: [ + grpc + ], +}) +export class AutoConfiguration { +} diff --git a/packages/grpc/test/fixtures/base-app-stream/src/interface.ts b/packages/grpc/test/fixtures/base-app-stream/src/interface.ts new file mode 100644 index 000000000000..cafe2782c814 --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream/src/interface.ts @@ -0,0 +1,27 @@ +import { Metadata, ServerDuplexStream, ServerReadableStream, ServerWritableStream } from '@grpc/grpc-js'; + +export namespace math { + export interface DivArgs { + dividend?: number; + divisor?: number; + } + export interface DivReply { + quotient?: number; + remainder?: number; + } + export interface FibArgs { + limit?: number; + } + export interface Num { + num?: number; + } + export interface FibReply { + count?: number; + } + export interface Math { + div(data: DivArgs, metadata?: Metadata): Promise; + divMany(requestStream: ServerDuplexStream, metadata?: Metadata): ServerDuplexStream; + fib(requestStream: ServerReadableStream, metadata?: Metadata): void; + sum(metadata?: Metadata): ServerWritableStream; + } +} diff --git a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts new file mode 100644 index 000000000000..71b9f3ca8f69 --- /dev/null +++ b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts @@ -0,0 +1,29 @@ +import { MSProviderType, Provider, Provide } from '@midwayjs/decorator'; +import { math } from '../interface'; + +/** + */ +@Provide() +@Provider(MSProviderType.GRPC, { package: 'math' }) +export class Math implements math.Math { + + async div(data: math.DivArgs, metadata?): Promise { + return { + quotient: 1, + remainder: 2, + } + } + + async divMany(data: math.DivArgs, metadata?): Promise { + return Promise.resolve(undefined); + } + + async fib(data: math.FibArgs, metadata?): Promise { + return Promise.resolve(undefined); + } + + async sum(data: math.Num, metadata?): Promise { + return Promise.resolve(undefined); + } + +} diff --git a/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts b/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts index abc7b070b6f2..14aaa893dcd1 100644 --- a/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts +++ b/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts @@ -1,6 +1,8 @@ import { MSProviderType, Provider, Provide, Inject } from '@midwayjs/decorator'; import { helloworld } from '../interface'; import { ILogger } from '@midwayjs/logger'; +import { Context } from '../../../../../src'; +import { Metadata } from '@grpc/grpc-js'; /** * package helloworld @@ -13,11 +15,17 @@ export class Greeter implements helloworld.Greeter { @Inject() logger: ILogger; + @Inject() + ctx: Context; + /** * Implements the SayHello RPC method. */ async sayHello(request: helloworld.HelloRequest) { this.logger.info('this is a context logger'); + const serverMetadata = new Metadata(); + serverMetadata.add('Set-Cookie', 'yummy_cookie=choco'); + this.ctx.sendMetadata(serverMetadata); return { message: 'Hello ' + request.name } } } diff --git a/packages/grpc/test/fixtures/proto/math.proto b/packages/grpc/test/fixtures/proto/math.proto new file mode 100644 index 000000000000..89bfa159206b --- /dev/null +++ b/packages/grpc/test/fixtures/proto/math.proto @@ -0,0 +1,53 @@ + +syntax = "proto3"; + +package math; + +message DivArgs { + int64 dividend = 1; + int64 divisor = 2; +} + +message DivReply { + int64 quotient = 1; + int64 remainder = 2; +} + +message FibArgs { + int64 limit = 1; +} + +message Num { + int64 num = 1; +} + +message FibReply { + int64 count = 1; +} + +service Math { + // Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient + // and remainder. + rpc Div (DivArgs) returns (DivReply) { + } + + // DivMany accepts an arbitrary number of division args from the client stream + // and sends back the results in the reply stream. The stream continues until + // the client closes its end; the server does the same after sending all the + // replies. The stream ends immediately if either end aborts. + rpc DivMany (stream DivArgs) returns (stream DivReply) { + } + + // Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib + // generates up to limit numbers; otherwise it continues until the call is + // canceled. Unlike Fib above, Fib has no final FibReply. + rpc Fib (FibArgs) returns (stream Num) { + // 服务端往客户端推 + } + + // Sum sums a stream of numbers, returning the final result once the stream + // is closed. + rpc Sum (stream Num) returns (Num) { + // 客户端往服务端推 + } +} diff --git a/packages/grpc/test/index.test.ts b/packages/grpc/test/index.test.ts index 475816290304..a4cbfa9e63e2 100644 --- a/packages/grpc/test/index.test.ts +++ b/packages/grpc/test/index.test.ts @@ -1,6 +1,7 @@ import { createServer, closeApp } from './utils'; import { join } from 'path'; import { createGRPCConsumer } from '../src'; +import { math } from './fixtures/base-app-stream/src/interface'; export namespace hero { export interface HeroService { @@ -38,12 +39,14 @@ describe('/test/index.test.ts', function () { protoPath: join(__dirname, 'fixtures/proto/helloworld.proto'), package: 'helloworld', } - ] + ], + url: 'localhost:6565' }); const service = await createGRPCConsumer({ package: 'helloworld', protoPath: join(__dirname, 'fixtures/proto/helloworld.proto'), + url: 'localhost:6565' }); const result = await service.sayHello({ @@ -65,19 +68,51 @@ describe('/test/index.test.ts', function () { protoPath: join(__dirname, 'fixtures/proto/helloworld.proto'), package: 'helloworld', } - ] + ], + url: 'localhost:6565' }); const service: any = await createGRPCConsumer({ package: 'hero', protoPath: join(__dirname, 'fixtures/proto/hero.proto'), + url: 'localhost:6565' }); const result = await service.findOne({ id: 123 + }, (metadata) => { + }); expect(result).toEqual({ id: 1, name: 'bbbb-Hello harry' }) await closeApp(app); }); + + it('should support publish stream gRPC server', async () => { + const app = await createServer('base-app-stream', { + services: [ + { + protoPath: join(__dirname, 'fixtures/proto/math.proto'), + package: 'math', + } + ], + url: 'localhost:6565' + }); + + const service = await createGRPCConsumer({ + package: 'math', + protoPath: join(__dirname, 'fixtures/proto/math.proto'), + url: 'localhost:6565' + }); + + const result = await service.div({ + dividend: 222, + }); + + expect(result).toEqual({ + 'quotient': '1', + 'remainder': '2' + }) + await closeApp(app); + }); }); From 74c729e51fd97573e85dce1c89f2faaaddebfc13 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sat, 20 Feb 2021 12:13:37 +0800 Subject: [PATCH 2/8] refactor: update grpc client --- .../decorator/src/microservice/provider.ts | 26 +++- packages/grpc/src/comsumer/clients.ts | 50 +++++--- .../grpc/src/comsumer/type/duplex-request.ts | 91 ++++++++++++++ .../src/comsumer/type/readable-request.ts | 49 ++++++++ .../grpc/src/comsumer/type/unary-request.ts | 45 +++++++ .../src/comsumer/type/writeable-request.ts | 54 ++++++++ packages/grpc/src/interface.ts | 32 ++++- packages/grpc/src/provider/framework.ts | 113 ++++++++++++----- .../fixtures/base-app-stream/src/interface.ts | 33 ++++- .../base-app-stream/src/provider/math.ts | 57 +++++++-- packages/grpc/test/fixtures/proto/math.proto | 14 +-- packages/grpc/test/index.test.ts | 116 ++++++++++++++++-- 12 files changed, 591 insertions(+), 89 deletions(-) create mode 100644 packages/grpc/src/comsumer/type/duplex-request.ts create mode 100644 packages/grpc/src/comsumer/type/readable-request.ts create mode 100644 packages/grpc/src/comsumer/type/unary-request.ts create mode 100644 packages/grpc/src/comsumer/type/writeable-request.ts diff --git a/packages/decorator/src/microservice/provider.ts b/packages/decorator/src/microservice/provider.ts index 01070c283304..06a3921f657e 100644 --- a/packages/decorator/src/microservice/provider.ts +++ b/packages/decorator/src/microservice/provider.ts @@ -4,6 +4,7 @@ import { saveModule, MS_PROVIDER_KEY, attachClassMetadata, + savePropertyMetadata, MS_GRPC_METHOD_KEY, MS_DUBBO_METHOD_KEY, MS_HSF_METHOD_KEY, @@ -38,14 +39,31 @@ export function Provider( }; } -export function GrpcMethod(methodName?: string): MethodDecorator { +export enum GrpcStreamTypeEnum { + BASE = 'base', + DUPLEX = 'ServerDuplexStream', + READABLE = 'ServerReadableStream', + WRITEABLE = 'ServerWritableStream', +} + +export function GrpcMethod(methodOptions: { + methodName?: string; + type?: GrpcStreamTypeEnum; + onEnd?: string; +} = {}): MethodDecorator { return (target, propertyName, descriptor: PropertyDescriptor) => { - attachClassMetadata( + if (!methodOptions.type) { + methodOptions.type = GrpcStreamTypeEnum.BASE; + } + savePropertyMetadata( MS_GRPC_METHOD_KEY, { - methodName: methodName || propertyName, + methodName: methodOptions.methodName || propertyName, + type: methodOptions.type, + onEnd: methodOptions.onEnd, }, - target + target, + propertyName, ); return descriptor; diff --git a/packages/grpc/src/comsumer/clients.ts b/packages/grpc/src/comsumer/clients.ts index 74b6073488bc..dfcd23a33b24 100644 --- a/packages/grpc/src/comsumer/clients.ts +++ b/packages/grpc/src/comsumer/clients.ts @@ -12,6 +12,11 @@ import { DefaultConfig } from '../interface'; import { loadProto } from '../util'; import * as camelCase from 'camelcase'; import { ILogger } from '@midwayjs/logger'; +import { ClientUnaryRequest } from './type/unary-request'; +import { ClientDuplexStreamRequest } from './type/duplex-request'; +import { ClientReadableRequest } from './type/readable-request'; +import { ClientWritableRequest } from './type/writeable-request'; + @Autoload() @Provide('clients') @@ -46,24 +51,10 @@ export class GRPCClients extends Map { ); for (const methodName of Object.keys(packageDefinition[definition])) { const originMethod = connectionService[methodName]; - connectionService[methodName] = async (...args) => { - return new Promise((resolve, reject) => { - originMethod.call( - connectionService, - args[0], - (err, response) => { - if (err) { - reject(err); - } - resolve(response); - } - ).on('metadata', (metadata) => { - console.log(metadata); - }); - }); - }; - connectionService[camelCase(methodName)] = - connectionService[methodName]; + connectionService[methodName] = () => { + return this.getClientRequestImpl(connectionService, originMethod); + } + connectionService[camelCase(methodName)] = connectionService[methodName]; } this.set(definition, connectionService); } @@ -74,4 +65,27 @@ export class GRPCClients extends Map { getService(serviceName: string): T { return this.get(serviceName); } + + getClientRequestImpl(client, originalFunction, options = {}) { + const genericFunctionSelector = + (originalFunction.requestStream ? 2 : 0) | (originalFunction.responseStream ? 1 : 0); + + let genericFunctionName; + switch (genericFunctionSelector) { + case 0: + genericFunctionName = new ClientUnaryRequest(client, originalFunction, options); + break; + case 1: + genericFunctionName = new ClientReadableRequest(client, originalFunction, options); + break; + case 2: + genericFunctionName = new ClientWritableRequest(client, originalFunction, options); + break; + case 3: + genericFunctionName = new ClientDuplexStreamRequest(client, originalFunction, options); + break; + } + + return genericFunctionName; + } } diff --git a/packages/grpc/src/comsumer/type/duplex-request.ts b/packages/grpc/src/comsumer/type/duplex-request.ts new file mode 100644 index 000000000000..3a776041604b --- /dev/null +++ b/packages/grpc/src/comsumer/type/duplex-request.ts @@ -0,0 +1,91 @@ +import { Metadata } from '@grpc/grpc-js'; +import { IClientDuplexStreamService } from '../../interface'; + +export class ClientDuplexStreamRequest implements IClientDuplexStreamService { + + correlationId: number; + timeout_message; + queue; + client; + metadata; + timeout; + stream; + promise; + + static get MAX_INT32() { + return 2147483647; + } + + constructor(client, original_function, options: { + metadata?: Metadata; + timeout?: number; + timeout_message?: number; + } = {}) { + this.queue = {}; + this.correlationId = 0; + this.timeout_message = options.timeout_message || 1000; + this.metadata = options.metadata || new Metadata(); + + // Deadline is advisable to be set + // It should be a timestamp value in milliseconds + let deadline = undefined; + if (options.timeout !== undefined) { + deadline = Date.now() + options.timeout; + } + this.stream = original_function.call(client, this.metadata, {deadline: deadline}); + + this.stream.on('error', () => { + }); + this.stream.on('data', data => { + if (this.queue[data.id]) { + clearTimeout(this.queue[data.id]['timeout']); + this.queue[data.id]['cb'](null, data); + delete this.queue[data.id]; + } + }); + } + + _nextId() { + if (this.correlationId >= ClientDuplexStreamRequest.MAX_INT32) { + this.correlationId = 0; + } + return this.correlationId++; + } + + sendMetadata(metadata: Metadata): IClientDuplexStreamService { + return this; + } + + sendMessage(content: reqType = ({} as any)): Promise { + return new Promise((resolve, reject) => { + const id = this._nextId(); + + if (this.stream.received_status) { + return reject('stream_closed'); + } + + const cb = (err: Error, response?) => { + if (err) { + reject(err); + } else { + resolve(response); + } + }; + + this.queue[id] = { + cb, + timeout: setTimeout(() => { + delete this.queue[id]; + cb(new Error(`provider response timeout in ${this.timeout_message}`)); + }, this.timeout_message) + }; + content['_id'] = id; + this.stream.write(content); + }); + } + + end(): void { + return this.stream.end(); + } + +} diff --git a/packages/grpc/src/comsumer/type/readable-request.ts b/packages/grpc/src/comsumer/type/readable-request.ts new file mode 100644 index 000000000000..22a2db8ff83d --- /dev/null +++ b/packages/grpc/src/comsumer/type/readable-request.ts @@ -0,0 +1,49 @@ +import { Metadata } from '@grpc/grpc-js'; +import { IClientReadableStreamService } from '../../interface'; + +export class ClientReadableRequest implements IClientReadableStreamService { + + client; + metadata; + timeout; + stream; + queue; + original_function; + + constructor(client, original_function, options: { + metadata?: Metadata; + timeout?: number; + } = {}) { + this.queue = []; + this.client = client; + this.metadata = options.metadata || new Metadata(); + this.timeout = options.timeout || undefined; + this.original_function = original_function; + } + + sendMetadata(metadata: Metadata): IClientReadableStreamService { + return this; + } + + sendMessage(content: reqType): Promise { + return new Promise((resolve, reject) => { + // Deadline is advisable to be set + // It should be a timestamp value in milliseconds + let deadline = undefined; + if (this.timeout !== undefined) { + deadline = Date.now() + this.timeout; + } + this.stream = this.original_function.call(this.client, content, this.metadata, {deadline: deadline}); + this.stream.on('error', error => { + reject(error); + }); + this.stream.on('data', data => { + this.queue.push(data); + }); + this.stream.on('end', () => { + resolve(this.queue); + }); + }); + } + +} diff --git a/packages/grpc/src/comsumer/type/unary-request.ts b/packages/grpc/src/comsumer/type/unary-request.ts new file mode 100644 index 000000000000..d0e034020c65 --- /dev/null +++ b/packages/grpc/src/comsumer/type/unary-request.ts @@ -0,0 +1,45 @@ +import { Metadata } from '@grpc/grpc-js'; +import { IClientUnaryService } from '../../interface'; + +export class ClientUnaryRequest implements IClientUnaryService { + + client; + metadata; + timeout; + original_function; + + constructor(client, original_function, options: { + metadata?: Metadata; + timeout?: number; + } = {}) { + this.client = client; + this.metadata = options.metadata || new Metadata(); + this.timeout = options.timeout || undefined; + this.original_function = original_function; + } + + sendMetadata(Metadata): IClientUnaryService { + return this; + } + + sendMessage(content: reqType): Promise { + return new Promise((resolve, reject) => { + // Deadline is advisable to be set + // It should be a timestamp value in milliseconds + let deadline = undefined; + if (this.timeout !== undefined) { + deadline = Date.now() + this.timeout; + } + this.original_function.call(this.client, content, this.metadata, {deadline: deadline}, + (error, response) => { + if (error) { + reject(error); + } else { + resolve(response); + } + } + ); + }); + } + +} diff --git a/packages/grpc/src/comsumer/type/writeable-request.ts b/packages/grpc/src/comsumer/type/writeable-request.ts new file mode 100644 index 000000000000..d99a9424be21 --- /dev/null +++ b/packages/grpc/src/comsumer/type/writeable-request.ts @@ -0,0 +1,54 @@ +import { Metadata } from '@grpc/grpc-js'; +import { IClientWritableStreamService } from '../../interface'; + +export class ClientWritableRequest implements IClientWritableStreamService { + + client; + metadata; + timeout; + stream; + promise; + + constructor(client, original_function, options: { + metadata?: Metadata; + timeout?: number; + } = {}) { + this.promise = new Promise((resolve, reject) => { + // Deadline is advisable to be set + // It should be a timestamp value in milliseconds + let deadline = undefined; + if (options.timeout !== undefined) { + deadline = Date.now() + options.timeout; + } + this.metadata = options.metadata || new Metadata(); + this.stream = original_function.call(client, this.metadata, {deadline: deadline}, + (error, response) => { + if (error) { + reject(error); + } else { + resolve(response); + } + } + ); + }); + } + + sendMetadata(metadata: Metadata): IClientWritableStreamService { + return this; + } + + sendMessage(content: reqType): IClientWritableStreamService { + this.stream.write(content); + return this; + } + + end(): Promise { + this.stream.end(); + return this.promise; + } + + getCall() { + return this.stream; + } + +} diff --git a/packages/grpc/src/interface.ts b/packages/grpc/src/interface.ts index f67189dccfd6..fdfc7726b88b 100644 --- a/packages/grpc/src/interface.ts +++ b/packages/grpc/src/interface.ts @@ -1,11 +1,9 @@ import { IConfigurationOptions, IMidwayApplication, IMidwayContext } from '@midwayjs/core'; -import { Server, ServerCredentials, Metadata, ServerUnaryCall } from '@grpc/grpc-js'; +import { Server, ServerCredentials, Metadata, ServerUnaryCall, ClientWritableStream, /*ClientUnaryCall, ClientReadableStream, ClientWritableStream, ClientDuplexStream*/ } from '@grpc/grpc-js'; -export interface IMidwayGRPCContext extends IMidwayContext { +export interface IMidwayGRPCContext extends ServerUnaryCall, IMidwayContext { metadata: Metadata; method: string; - sendMetadata(metadata: Metadata): void; - call: ServerUnaryCall; } export type IMidwayGRPCApplication = IMidwayApplication & Server; @@ -45,3 +43,29 @@ export interface IMidwayGRPFrameworkOptions extends IConfigurationOptions { export interface DefaultConfig extends IConfigurationOptions { services: IGRPCServiceOptions[]; } + +export interface IClientUnaryService { + sendMetadata(Metadata): IClientUnaryService; + sendMessage(reqData: reqType): Promise; + // getCall(): ClientUnaryCall; +} + +export interface IClientReadableStreamService { + sendMetadata(metadata: Metadata): IClientReadableStreamService; + sendMessage(reqData: reqType): Promise; + // getCall(): ClientReadableStream; +} + +export interface IClientWritableStreamService { + sendMetadata(metadata: Metadata): IClientWritableStreamService; + sendMessage(reqData: reqType): IClientWritableStreamService; + end(): Promise; + getCall(): ClientWritableStream; +} + +export interface IClientDuplexStreamService { + sendMetadata(metadata: Metadata): IClientDuplexStreamService; + sendMessage(reqData: reqType): Promise; + // getCall(): ClientDuplexStream; + end(): void; +} diff --git a/packages/grpc/src/provider/framework.ts b/packages/grpc/src/provider/framework.ts index 679abd64a543..1de882d9062c 100644 --- a/packages/grpc/src/provider/framework.ts +++ b/packages/grpc/src/provider/framework.ts @@ -1,29 +1,18 @@ -import { - Server, - ServerCredentials, - setLogger, - ServerUnaryCall, - sendUnaryData, -} from '@grpc/grpc-js'; -import { - BaseFramework, - getClassMetadata, - IMidwayBootstrapOptions, - listModule, - MidwayFrameworkType, -} from '@midwayjs/core'; +import { sendUnaryData, Server, ServerCredentials, ServerUnaryCall, setLogger, } from '@grpc/grpc-js'; +import { BaseFramework, IMidwayBootstrapOptions, MidwayFrameworkType, } from '@midwayjs/core'; import { DecoratorMetadata, + getClassMetadata, + getPropertyMetadata, getProviderId, + GrpcStreamTypeEnum, + listModule, + MS_GRPC_METHOD_KEY, MS_PROVIDER_KEY, MSProviderType, } from '@midwayjs/decorator'; -import { - IMidwayGRPCApplication, - IMidwayGRPCContext, - IMidwayGRPFrameworkOptions, -} from '../interface'; +import { Context, IMidwayGRPCApplication, IMidwayGRPCContext, IMidwayGRPFrameworkOptions, } from '../interface'; import { pascalCase } from 'pascal-case'; import * as camelCase from 'camelcase'; import { loadProto } from '../util'; @@ -98,23 +87,58 @@ export class MidwayGRPCFramework extends BaseFramework< for (const method in serviceDefinition) { serviceInstance[method] = async ( call: ServerUnaryCall, - callback: sendUnaryData + callback?: sendUnaryData ) => { - const ctx = { - method, - call, - metadata: call.metadata, - sendMetadata: call.sendMetadata, - } as any; + // merge ctx and call + const ctx = call as any; + ctx.method = method; this.app.createAnonymousContext(ctx); - try { - const service = await ctx.requestContext.getAsync(module); - const result = await service[camelCase(method)]?.apply(service, [ - call.request, - ]); - callback(null, result); - } catch (err) { - callback(err); + + // get service from request container + const service = await ctx.requestContext.getAsync(module); + + // get metadata from decorator + const grpcMethodData: { + methodName: string; + type: GrpcStreamTypeEnum; + onEnd: string; + } = getPropertyMetadata( + MS_GRPC_METHOD_KEY, + module, + camelCase(method), + ); + + if (grpcMethodData.type === GrpcStreamTypeEnum.DUPLEX + || grpcMethodData.type === GrpcStreamTypeEnum.READABLE) { + // listen data and trigger binding method + call.on('data', async (data) => { + await this.handleContextMethod({ + service, + ctx, + callback, + data, + grpcMethodData, + }); + }); + call.on('end', async () => { + if (grpcMethodData.onEnd) { + try { + const endResult = await service[grpcMethodData.onEnd](); + callback && callback(null, endResult); + } catch (err) { + callback && callback(err); + } + } + }); + } else { + // writable and base type will be got data directly + await this.handleContextMethod({ + service, + ctx, + callback, + data: call.request, + grpcMethodData, + }); } }; } @@ -126,6 +150,27 @@ export class MidwayGRPCFramework extends BaseFramework< } } + protected async handleContextMethod(options: { + service, ctx: Context, callback, data: any, grpcMethodData: { + methodName: string; + type: GrpcStreamTypeEnum; + onEnd: string; + } + }) { + let result; + const {service, ctx, callback, data, grpcMethodData} = options; + + try { + result = await service[camelCase(ctx.method)]?.call(service, data); + if (grpcMethodData.type === GrpcStreamTypeEnum.BASE) { + // base 才返回,其他的要等服务端自己 end,或者等客户端 end 事件才结束 + callback && callback(null, result); + } + } catch (err) { + callback && callback(err); + } + } + public async run(): Promise { if (this.configurationOptions.url) { return new Promise((resolve, reject) => { diff --git a/packages/grpc/test/fixtures/base-app-stream/src/interface.ts b/packages/grpc/test/fixtures/base-app-stream/src/interface.ts index cafe2782c814..5b9286751705 100644 --- a/packages/grpc/test/fixtures/base-app-stream/src/interface.ts +++ b/packages/grpc/test/fixtures/base-app-stream/src/interface.ts @@ -1,4 +1,9 @@ -import { Metadata, ServerDuplexStream, ServerReadableStream, ServerWritableStream } from '@grpc/grpc-js'; +import { + IClientDuplexStreamService, + IClientReadableStreamService, + IClientUnaryService, + IClientWritableStreamService +} from '../../../../src'; export namespace math { export interface DivArgs { @@ -18,10 +23,28 @@ export namespace math { export interface FibReply { count?: number; } + + /** + * server interface + */ export interface Math { - div(data: DivArgs, metadata?: Metadata): Promise; - divMany(requestStream: ServerDuplexStream, metadata?: Metadata): ServerDuplexStream; - fib(requestStream: ServerReadableStream, metadata?: Metadata): void; - sum(metadata?: Metadata): ServerWritableStream; + div(data: math.DivArgs): Promise; + divMany(data: any): void; + // 服务端推,客户端读 + fib(fibArgs: math.FibArgs): Promise + // 客户端端推,服务端读 + sum(num: Num): Promise; + } + + /** + * client interface + */ + export interface MathClient { + div(): IClientUnaryService; + divMany(): Promise>; + // 服务端推,客户端读 + fib(): IClientReadableStreamService; + // 客户端端推,服务端读 + sum(): IClientWritableStreamService; } } diff --git a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts index 71b9f3ca8f69..a2e91e7a414b 100644 --- a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts +++ b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts @@ -1,5 +1,7 @@ -import { MSProviderType, Provider, Provide } from '@midwayjs/decorator'; +import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/decorator'; +import { IMidwayGRPCContext } from '../../../../../src'; import { math } from '../interface'; +import { Metadata } from '@grpc/grpc-js'; /** */ @@ -7,23 +9,62 @@ import { math } from '../interface'; @Provider(MSProviderType.GRPC, { package: 'math' }) export class Math implements math.Math { - async div(data: math.DivArgs, metadata?): Promise { + @Inject() + ctx: IMidwayGRPCContext; + + sumDataList = []; + + @GrpcMethod() + async div(data: math.DivArgs): Promise { return { quotient: 1, remainder: 2, } } - async divMany(data: math.DivArgs, metadata?): Promise { - return Promise.resolve(undefined); + @GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' }) + async divMany(message) { + this.ctx.write({ + id: message.id, + num: 1 + }); + } + + async duplexEnd() { + + } + + @GrpcMethod({type: GrpcStreamTypeEnum.WRITEABLE }) + async fib(fibArgs: math.FibArgs) { + this.ctx.write({ + num: 1 + fibArgs.limit + }); + this.ctx.write({ + num: 2 + fibArgs.limit + }); + this.ctx.write({ + num: 3 + fibArgs.limit + }); + + const meta = new Metadata(); + this.ctx.metadata.add('xxx', 'bbb'); + + this.ctx.sendMetadata(meta); + this.ctx.end(); } - async fib(data: math.FibArgs, metadata?): Promise { - return Promise.resolve(undefined); + @GrpcMethod({type: GrpcStreamTypeEnum.READABLE, onEnd: 'sumEnd' }) + async sum(data: math.Num) { + this.sumDataList.push(data); } - async sum(data: math.Num, metadata?): Promise { - return Promise.resolve(undefined); + async sumEnd(): Promise { + const total = this.sumDataList.reduce((pre, cur) => { + return { + num: pre.num + cur.num, + } + }) + return total; } } diff --git a/packages/grpc/test/fixtures/proto/math.proto b/packages/grpc/test/fixtures/proto/math.proto index 89bfa159206b..78de535ca27d 100644 --- a/packages/grpc/test/fixtures/proto/math.proto +++ b/packages/grpc/test/fixtures/proto/math.proto @@ -4,25 +4,25 @@ syntax = "proto3"; package math; message DivArgs { - int64 dividend = 1; - int64 divisor = 2; + int32 dividend = 1; + int32 divisor = 2; } message DivReply { - int64 quotient = 1; - int64 remainder = 2; + int32 quotient = 1; + int32 remainder = 2; } message FibArgs { - int64 limit = 1; + int32 limit = 1; } message Num { - int64 num = 1; + int32 num = 1; } message FibReply { - int64 count = 1; + int32 count = 1; } service Math { diff --git a/packages/grpc/test/index.test.ts b/packages/grpc/test/index.test.ts index a4cbfa9e63e2..3909928f1d8e 100644 --- a/packages/grpc/test/index.test.ts +++ b/packages/grpc/test/index.test.ts @@ -1,7 +1,12 @@ import { createServer, closeApp } from './utils'; import { join } from 'path'; -import { createGRPCConsumer } from '../src'; -import { math } from './fixtures/base-app-stream/src/interface'; +import { + createGRPCConsumer, + IClientDuplexStreamService, + IClientReadableStreamService, + IClientUnaryService, + IClientWritableStreamService +} from '../src'; export namespace hero { export interface HeroService { @@ -30,6 +35,37 @@ export namespace helloworld { } } + +export namespace math { + export interface DivArgs { + dividend?: number; + divisor?: number; + } + export interface DivReply { + quotient?: number; + remainder?: number; + } + export interface FibArgs { + limit?: number; + } + export interface Num { + num?: number; + } + export interface FibReply { + count?: number; + } + + export interface MathClient { + div(): IClientUnaryService; + divMany(): Promise>; + // 服务端推,客户端读 + fib(): IClientReadableStreamService; + // 客户端端推,服务端读 + sum(): IClientWritableStreamService; + } +} + + describe('/test/index.test.ts', function () { it('should create gRPC server', async () => { @@ -99,20 +135,82 @@ describe('/test/index.test.ts', function () { url: 'localhost:6565' }); - const service = await createGRPCConsumer({ + const service = await createGRPCConsumer({ package: 'math', protoPath: join(__dirname, 'fixtures/proto/math.proto'), url: 'localhost:6565' }); - const result = await service.div({ - dividend: 222, + // 一元操作 + // let result: any = await service.div({ + // dividend: 222, + // }); + // + // expect(result).toEqual({ + // 'quotient': 1, + // 'remainder': 2, + // }); + + // 使用发送消息的写法 + // let result1 = await service.div().sendMessage({ + // dividend: 222, + // }); + // + // expect(result1.quotient).toEqual(1); + // + // + // // 服务端推送 + // let total = 0; + // let result2 = await service.fib().sendMessage({ + // limit: 1, + // }); + // + // result2.forEach(data => { + // total += data.num; + // }); + // + // expect(total).toEqual(9); + + // 客户端推送 + + // const data = await service.sum() + // .sendMessage({num: 1}) + // .sendMessage({num: 2}) + // .sendMessage({num: 3}) + // .end(); + // + // expect(data.num).toEqual(6); + + // const ser = service.sum(); + // ser.sendMessage({num: 1}); + // ser.sendMessage({num: 2}); + // ser.sendMessage({num: 3}); + // + // ser.end().then((res) => { + // console.log(res) + // }); + + + // 双向流 + const t = await service.divMany(); + + await new Promise((resolve, reject) => { + t.sendMessage({}) + .then(res => { + console.log('Client: Stream Message Received = ', res); // Client: Stream Message Received = {id: 0} + }) + .catch(err => console.error(err)) + ; + t.sendMessage({}) + .then(res => { + console.log('Client: Stream Message Received = ', res); // Client: Stream Message Received = {id: 1} + resolve(); + }) + .catch(err => console.error(err)) + ; + t.end(); }); - expect(result).toEqual({ - 'quotient': '1', - 'remainder': '2' - }) await closeApp(app); }); }); From 869085e278adf745d89e03dee7f01345e9a69af4 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sat, 20 Feb 2021 15:01:21 +0800 Subject: [PATCH 3/8] refactor: update grpc client --- packages/grpc/test/index.test.ts | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/packages/grpc/test/index.test.ts b/packages/grpc/test/index.test.ts index 3909928f1d8e..b04fedb01f18 100644 --- a/packages/grpc/test/index.test.ts +++ b/packages/grpc/test/index.test.ts @@ -170,9 +170,8 @@ describe('/test/index.test.ts', function () { // }); // // expect(total).toEqual(9); - - // 客户端推送 - + // + // // 客户端推送 // const data = await service.sum() // .sendMessage({num: 1}) // .sendMessage({num: 2}) @@ -181,15 +180,6 @@ describe('/test/index.test.ts', function () { // // expect(data.num).toEqual(6); - // const ser = service.sum(); - // ser.sendMessage({num: 1}); - // ser.sendMessage({num: 2}); - // ser.sendMessage({num: 3}); - // - // ser.end().then((res) => { - // console.log(res) - // }); - // 双向流 const t = await service.divMany(); From a9dfe4fe7476e98313f9b24a8ccb95cd5ee9e463 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sat, 20 Feb 2021 21:02:47 +0800 Subject: [PATCH 4/8] refactor: complete client API --- packages/grpc/src/comsumer/clients.ts | 10 +- .../grpc/src/comsumer/type/duplex-request.ts | 25 ++-- .../src/comsumer/type/readable-request.ts | 8 +- .../grpc/src/comsumer/type/unary-request.ts | 4 - .../src/comsumer/type/writeable-request.ts | 4 - packages/grpc/src/interface.ts | 10 +- packages/grpc/src/provider/framework.ts | 21 ++- .../fixtures/base-app-stream/src/interface.ts | 33 ++--- .../base-app-stream/src/provider/math.ts | 21 ++- packages/grpc/test/fixtures/proto/math.proto | 45 ++---- packages/grpc/test/index.test.ts | 139 ++++++++++-------- 11 files changed, 156 insertions(+), 164 deletions(-) diff --git a/packages/grpc/src/comsumer/clients.ts b/packages/grpc/src/comsumer/clients.ts index dfcd23a33b24..7ab8844dbae2 100644 --- a/packages/grpc/src/comsumer/clients.ts +++ b/packages/grpc/src/comsumer/clients.ts @@ -7,7 +7,7 @@ import { Scope, ScopeEnum, } from '@midwayjs/decorator'; -import { credentials, loadPackageDefinition } from '@grpc/grpc-js'; +import { credentials, loadPackageDefinition, Metadata } from '@grpc/grpc-js'; import { DefaultConfig } from '../interface'; import { loadProto } from '../util'; import * as camelCase from 'camelcase'; @@ -17,7 +17,6 @@ import { ClientDuplexStreamRequest } from './type/duplex-request'; import { ClientReadableRequest } from './type/readable-request'; import { ClientWritableRequest } from './type/writeable-request'; - @Autoload() @Provide('clients') @Scope(ScopeEnum.Singleton) @@ -51,8 +50,11 @@ export class GRPCClients extends Map { ); for (const methodName of Object.keys(packageDefinition[definition])) { const originMethod = connectionService[methodName]; - connectionService[methodName] = () => { - return this.getClientRequestImpl(connectionService, originMethod); + connectionService[methodName] = (clientOptions: { + metadata?: Metadata; + timeout?: number; + }) => { + return this.getClientRequestImpl(connectionService, originMethod, clientOptions); } connectionService[camelCase(methodName)] = connectionService[methodName]; } diff --git a/packages/grpc/src/comsumer/type/duplex-request.ts b/packages/grpc/src/comsumer/type/duplex-request.ts index 3a776041604b..189fb8ac0d0c 100644 --- a/packages/grpc/src/comsumer/type/duplex-request.ts +++ b/packages/grpc/src/comsumer/type/duplex-request.ts @@ -11,6 +11,7 @@ export class ClientDuplexStreamRequest implements IClientDuple timeout; stream; promise; + messageKey: string; static get MAX_INT32() { return 2147483647; @@ -19,12 +20,14 @@ export class ClientDuplexStreamRequest implements IClientDuple constructor(client, original_function, options: { metadata?: Metadata; timeout?: number; - timeout_message?: number; + timeoutMessage?: number; + messageKey?: string; } = {}) { this.queue = {}; this.correlationId = 0; - this.timeout_message = options.timeout_message || 1000; + this.timeout_message = options.timeoutMessage || 1000; this.metadata = options.metadata || new Metadata(); + this.messageKey = options.messageKey || 'id'; // Deadline is advisable to be set // It should be a timestamp value in milliseconds @@ -37,10 +40,10 @@ export class ClientDuplexStreamRequest implements IClientDuple this.stream.on('error', () => { }); this.stream.on('data', data => { - if (this.queue[data.id]) { - clearTimeout(this.queue[data.id]['timeout']); - this.queue[data.id]['cb'](null, data); - delete this.queue[data.id]; + if (this.queue[data[this.messageKey]]) { + clearTimeout(this.queue[data[this.messageKey]]['timeout']); + this.queue[data[this.messageKey]]['cb'](null, data); + delete this.queue[data[this.messageKey]]; } }); } @@ -52,10 +55,6 @@ export class ClientDuplexStreamRequest implements IClientDuple return this.correlationId++; } - sendMetadata(metadata: Metadata): IClientDuplexStreamService { - return this; - } - sendMessage(content: reqType = ({} as any)): Promise { return new Promise((resolve, reject) => { const id = this._nextId(); @@ -79,7 +78,7 @@ export class ClientDuplexStreamRequest implements IClientDuple cb(new Error(`provider response timeout in ${this.timeout_message}`)); }, this.timeout_message) }; - content['_id'] = id; + content[this.messageKey] = id; this.stream.write(content); }); } @@ -88,4 +87,8 @@ export class ClientDuplexStreamRequest implements IClientDuple return this.stream.end(); } + getCall() { + return this.stream; + } + } diff --git a/packages/grpc/src/comsumer/type/readable-request.ts b/packages/grpc/src/comsumer/type/readable-request.ts index 22a2db8ff83d..92eaea16a7c3 100644 --- a/packages/grpc/src/comsumer/type/readable-request.ts +++ b/packages/grpc/src/comsumer/type/readable-request.ts @@ -21,10 +21,6 @@ export class ClientReadableRequest implements IClientReadableS this.original_function = original_function; } - sendMetadata(metadata: Metadata): IClientReadableStreamService { - return this; - } - sendMessage(content: reqType): Promise { return new Promise((resolve, reject) => { // Deadline is advisable to be set @@ -46,4 +42,8 @@ export class ClientReadableRequest implements IClientReadableS }); } + getCall() { + return this.stream; + } + } diff --git a/packages/grpc/src/comsumer/type/unary-request.ts b/packages/grpc/src/comsumer/type/unary-request.ts index d0e034020c65..af48d317bf66 100644 --- a/packages/grpc/src/comsumer/type/unary-request.ts +++ b/packages/grpc/src/comsumer/type/unary-request.ts @@ -18,10 +18,6 @@ export class ClientUnaryRequest implements IClientUnaryService this.original_function = original_function; } - sendMetadata(Metadata): IClientUnaryService { - return this; - } - sendMessage(content: reqType): Promise { return new Promise((resolve, reject) => { // Deadline is advisable to be set diff --git a/packages/grpc/src/comsumer/type/writeable-request.ts b/packages/grpc/src/comsumer/type/writeable-request.ts index d99a9424be21..df4be7ab93cf 100644 --- a/packages/grpc/src/comsumer/type/writeable-request.ts +++ b/packages/grpc/src/comsumer/type/writeable-request.ts @@ -33,10 +33,6 @@ export class ClientWritableRequest implements IClientWritableS }); } - sendMetadata(metadata: Metadata): IClientWritableStreamService { - return this; - } - sendMessage(content: reqType): IClientWritableStreamService { this.stream.write(content); return this; diff --git a/packages/grpc/src/interface.ts b/packages/grpc/src/interface.ts index fdfc7726b88b..211e7de154a7 100644 --- a/packages/grpc/src/interface.ts +++ b/packages/grpc/src/interface.ts @@ -1,5 +1,5 @@ import { IConfigurationOptions, IMidwayApplication, IMidwayContext } from '@midwayjs/core'; -import { Server, ServerCredentials, Metadata, ServerUnaryCall, ClientWritableStream, /*ClientUnaryCall, ClientReadableStream, ClientWritableStream, ClientDuplexStream*/ } from '@grpc/grpc-js'; +import { Server, ServerCredentials, Metadata, ServerUnaryCall, ClientWritableStream, ClientDuplexStream, ClientReadableStream /*ClientUnaryCall, ClientReadableStream, ClientWritableStream, ClientDuplexStream*/ } from '@grpc/grpc-js'; export interface IMidwayGRPCContext extends ServerUnaryCall, IMidwayContext { metadata: Metadata; @@ -45,27 +45,23 @@ export interface DefaultConfig extends IConfigurationOptions { } export interface IClientUnaryService { - sendMetadata(Metadata): IClientUnaryService; sendMessage(reqData: reqType): Promise; // getCall(): ClientUnaryCall; } export interface IClientReadableStreamService { - sendMetadata(metadata: Metadata): IClientReadableStreamService; sendMessage(reqData: reqType): Promise; - // getCall(): ClientReadableStream; + getCall(): ClientReadableStream; } export interface IClientWritableStreamService { - sendMetadata(metadata: Metadata): IClientWritableStreamService; sendMessage(reqData: reqType): IClientWritableStreamService; end(): Promise; getCall(): ClientWritableStream; } export interface IClientDuplexStreamService { - sendMetadata(metadata: Metadata): IClientDuplexStreamService; sendMessage(reqData: reqType): Promise; - // getCall(): ClientDuplexStream; + getCall(): ClientDuplexStream; end(): void; } diff --git a/packages/grpc/src/provider/framework.ts b/packages/grpc/src/provider/framework.ts index 1de882d9062c..90e7ccd87bbf 100644 --- a/packages/grpc/src/provider/framework.ts +++ b/packages/grpc/src/provider/framework.ts @@ -192,9 +192,24 @@ export class MidwayGRPCFramework extends BaseFramework< } public async beforeStop() { - this.server.tryShutdown(() => { - this.logger.info('Server shutdown success'); - }); + await new Promise(resolve => { + const shutdownTimer = setTimeout(() => { + this.server.forceShutdown(); + resolve(); + }, 2000); + + this.server.tryShutdown((err) => { + clearTimeout(shutdownTimer); + if (err) { + this.logger.error('Server shutdown error and will invoke force shutdown, err=' + err.message); + this.server.forceShutdown(); + resolve(); + } else { + this.logger.info('Server shutdown success'); + resolve(); + } + }); + }) } public getFrameworkType(): MidwayFrameworkType { diff --git a/packages/grpc/test/fixtures/base-app-stream/src/interface.ts b/packages/grpc/test/fixtures/base-app-stream/src/interface.ts index 5b9286751705..a4633b8c85b1 100644 --- a/packages/grpc/test/fixtures/base-app-stream/src/interface.ts +++ b/packages/grpc/test/fixtures/base-app-stream/src/interface.ts @@ -6,45 +6,36 @@ import { } from '../../../../src'; export namespace math { - export interface DivArgs { - dividend?: number; - divisor?: number; - } - export interface DivReply { - quotient?: number; - remainder?: number; - } - export interface FibArgs { - limit?: number; + export interface AddArgs { + id?: number; + num?: number; } export interface Num { + id?: number; num?: number; } - export interface FibReply { - count?: number; - } /** * server interface */ export interface Math { - div(data: math.DivArgs): Promise; - divMany(data: any): void; + add(data: AddArgs): Promise; + addMore(data: AddArgs): void; // 服务端推,客户端读 - fib(fibArgs: math.FibArgs): Promise + sumMany(fibArgs: AddArgs): Promise // 客户端端推,服务端读 - sum(num: Num): Promise; + addMany(num: AddArgs): Promise; } /** * client interface */ export interface MathClient { - div(): IClientUnaryService; - divMany(): Promise>; + add(): IClientUnaryService; + addMore(): IClientDuplexStreamService; // 服务端推,客户端读 - fib(): IClientReadableStreamService; + sumMany(): IClientReadableStreamService; // 客户端端推,服务端读 - sum(): IClientWritableStreamService; + addMany(): IClientWritableStreamService; } } diff --git a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts index a2e91e7a414b..fc6342eddd04 100644 --- a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts +++ b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts @@ -15,18 +15,17 @@ export class Math implements math.Math { sumDataList = []; @GrpcMethod() - async div(data: math.DivArgs): Promise { + async add(data: math.AddArgs): Promise { return { - quotient: 1, - remainder: 2, + num: data.num + 2, } } @GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' }) - async divMany(message) { + async addMore(message: math.AddArgs) { this.ctx.write({ id: message.id, - num: 1 + num: message.num + 10 }); } @@ -35,15 +34,15 @@ export class Math implements math.Math { } @GrpcMethod({type: GrpcStreamTypeEnum.WRITEABLE }) - async fib(fibArgs: math.FibArgs) { + async sumMany(args: math.AddArgs) { this.ctx.write({ - num: 1 + fibArgs.limit + num: 1 + args.num }); this.ctx.write({ - num: 2 + fibArgs.limit + num: 2 + args.num }); this.ctx.write({ - num: 3 + fibArgs.limit + num: 3 + args.num }); const meta = new Metadata(); @@ -54,7 +53,7 @@ export class Math implements math.Math { } @GrpcMethod({type: GrpcStreamTypeEnum.READABLE, onEnd: 'sumEnd' }) - async sum(data: math.Num) { + async addMany(data: math.Num) { this.sumDataList.push(data); } @@ -63,7 +62,7 @@ export class Math implements math.Math { return { num: pre.num + cur.num, } - }) + }); return total; } diff --git a/packages/grpc/test/fixtures/proto/math.proto b/packages/grpc/test/fixtures/proto/math.proto index 78de535ca27d..c6295cf001ac 100644 --- a/packages/grpc/test/fixtures/proto/math.proto +++ b/packages/grpc/test/fixtures/proto/math.proto @@ -3,51 +3,28 @@ syntax = "proto3"; package math; -message DivArgs { - int32 dividend = 1; - int32 divisor = 2; -} - -message DivReply { - int32 quotient = 1; - int32 remainder = 2; -} - -message FibArgs { - int32 limit = 1; +message AddArgs { + int32 id = 1; + int32 num = 2; } message Num { - int32 num = 1; -} - -message FibReply { - int32 count = 1; + int32 id = 1; + int32 num = 2; } service Math { - // Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient - // and remainder. - rpc Div (DivArgs) returns (DivReply) { + rpc Add (AddArgs) returns (Num) { } - // DivMany accepts an arbitrary number of division args from the client stream - // and sends back the results in the reply stream. The stream continues until - // the client closes its end; the server does the same after sending all the - // replies. The stream ends immediately if either end aborts. - rpc DivMany (stream DivArgs) returns (stream DivReply) { + rpc AddMore (stream AddArgs) returns (stream Num) { } - // Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib - // generates up to limit numbers; otherwise it continues until the call is - // canceled. Unlike Fib above, Fib has no final FibReply. - rpc Fib (FibArgs) returns (stream Num) { - // 服务端往客户端推 + // 服务端往客户端推 + rpc SumMany (AddArgs) returns (stream Num) { } - // Sum sums a stream of numbers, returning the final result once the stream - // is closed. - rpc Sum (stream Num) returns (Num) { - // 客户端往服务端推 + // 客户端往服务端推 + rpc AddMany (stream AddArgs) returns (Num) { } } diff --git a/packages/grpc/test/index.test.ts b/packages/grpc/test/index.test.ts index b04fedb01f18..818e11f63448 100644 --- a/packages/grpc/test/index.test.ts +++ b/packages/grpc/test/index.test.ts @@ -37,31 +37,23 @@ export namespace helloworld { export namespace math { - export interface DivArgs { - dividend?: number; - divisor?: number; - } - export interface DivReply { - quotient?: number; - remainder?: number; - } - export interface FibArgs { - limit?: number; + export interface AddArgs { + num?: number; } export interface Num { num?: number; } - export interface FibReply { - count?: number; - } + /** + * client interface + */ export interface MathClient { - div(): IClientUnaryService; - divMany(): Promise>; + add(): IClientUnaryService; + addMore(): IClientDuplexStreamService; // 服务端推,客户端读 - fib(): IClientReadableStreamService; + sumMany(): IClientReadableStreamService; // 客户端端推,服务端读 - sum(): IClientWritableStreamService; + addMany(): IClientWritableStreamService; } } @@ -141,66 +133,91 @@ describe('/test/index.test.ts', function () { url: 'localhost:6565' }); - // 一元操作 - // let result: any = await service.div({ - // dividend: 222, - // }); - // - // expect(result).toEqual({ - // 'quotient': 1, - // 'remainder': 2, - // }); - // 使用发送消息的写法 - // let result1 = await service.div().sendMessage({ - // dividend: 222, - // }); - // - // expect(result1.quotient).toEqual(1); - // - // - // // 服务端推送 - // let total = 0; - // let result2 = await service.fib().sendMessage({ - // limit: 1, - // }); - // - // result2.forEach(data => { - // total += data.num; - // }); - // - // expect(total).toEqual(9); - // - // // 客户端推送 - // const data = await service.sum() - // .sendMessage({num: 1}) - // .sendMessage({num: 2}) - // .sendMessage({num: 3}) - // .end(); - // - // expect(data.num).toEqual(6); + let result1 = await service.add().sendMessage({ + num: 2, + }); + + expect(result1.num).toEqual(4); + + // 服务端推送 + let total = 0; + let result2 = await service.sumMany().sendMessage({ + num: 1, + }); + + result2.forEach(data => { + total += data.num; + }); + expect(total).toEqual(9); + + // 客户端推送 + const data = await service.addMany() + .sendMessage({num: 1}) + .sendMessage({num: 2}) + .sendMessage({num: 3}) + .end(); + + expect(data.num).toEqual(6); // 双向流 - const t = await service.divMany(); + const result3= await new Promise((resolve, reject) => { + const clientStream = service.addMore(); + const duplexCall = clientStream.getCall(); + total = 0; + let idx = 0; + + duplexCall.on('data', (data: math.Num) => { + total += data.num; + idx++; + if (idx === 2) { + clientStream.end(); + resolve(total); + } + }); + + duplexCall.write({ + num: 3, + }); + + duplexCall.write({ + num: 6, + }); + }); - await new Promise((resolve, reject) => { - t.sendMessage({}) + expect(result3).toEqual(29); + + + // 双向流 + const t = service.addMore(); + + const result4 = await new Promise((resolve, reject) => { + total = 0; + t.sendMessage({ + num: 2 + }) .then(res => { - console.log('Client: Stream Message Received = ', res); // Client: Stream Message Received = {id: 0} + expect(res.num).toEqual(12); + total += res.num; }) .catch(err => console.error(err)) ; - t.sendMessage({}) + t.sendMessage({ + num: 5 + }) .then(res => { - console.log('Client: Stream Message Received = ', res); // Client: Stream Message Received = {id: 1} - resolve(); + expect(res.num).toEqual(15); + total += res.num; + resolve(total); }) .catch(err => console.error(err)) ; t.end(); }); + expect(result4).toEqual(27); + await closeApp(app); }); }); From 54277a70fa71542bf3c46fe75e7ad01c35c3eef8 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sat, 20 Feb 2021 21:22:59 +0800 Subject: [PATCH 5/8] refactor: lint fix --- .../decorator/src/microservice/provider.ts | 14 +++--- packages/grpc/src/comsumer/clients.ts | 49 ++++++++++++------ .../grpc/src/comsumer/type/duplex-request.ts | 32 ++++++------ .../src/comsumer/type/readable-request.ts | 24 ++++++--- .../grpc/src/comsumer/type/unary-request.ts | 23 ++++++--- .../src/comsumer/type/writeable-request.ts | 26 ++++++---- packages/grpc/src/interface.ts | 9 +++- packages/grpc/src/provider/framework.ts | 50 ++++++++++++++----- .../src/interface.ts | 8 +++ .../src/provider/hero.ts | 6 +-- .../base-app-stream/src/provider/math.ts | 4 +- .../fixtures/base-app/src/provider/greeter.ts | 3 +- packages/grpc/test/index.test.ts | 27 +++++----- 13 files changed, 182 insertions(+), 93 deletions(-) diff --git a/packages/decorator/src/microservice/provider.ts b/packages/decorator/src/microservice/provider.ts index 06a3921f657e..83d344cc05f2 100644 --- a/packages/decorator/src/microservice/provider.ts +++ b/packages/decorator/src/microservice/provider.ts @@ -46,11 +46,13 @@ export enum GrpcStreamTypeEnum { WRITEABLE = 'ServerWritableStream', } -export function GrpcMethod(methodOptions: { - methodName?: string; - type?: GrpcStreamTypeEnum; - onEnd?: string; -} = {}): MethodDecorator { +export function GrpcMethod( + methodOptions: { + methodName?: string; + type?: GrpcStreamTypeEnum; + onEnd?: string; + } = {} +): MethodDecorator { return (target, propertyName, descriptor: PropertyDescriptor) => { if (!methodOptions.type) { methodOptions.type = GrpcStreamTypeEnum.BASE; @@ -63,7 +65,7 @@ export function GrpcMethod(methodOptions: { onEnd: methodOptions.onEnd, }, target, - propertyName, + propertyName ); return descriptor; diff --git a/packages/grpc/src/comsumer/clients.ts b/packages/grpc/src/comsumer/clients.ts index 7ab8844dbae2..8faed0431c36 100644 --- a/packages/grpc/src/comsumer/clients.ts +++ b/packages/grpc/src/comsumer/clients.ts @@ -7,8 +7,8 @@ import { Scope, ScopeEnum, } from '@midwayjs/decorator'; -import { credentials, loadPackageDefinition, Metadata } from '@grpc/grpc-js'; -import { DefaultConfig } from '../interface'; +import { credentials, loadPackageDefinition } from '@grpc/grpc-js'; +import { DefaultConfig, IClientOptions } from '../interface'; import { loadProto } from '../util'; import * as camelCase from 'camelcase'; import { ILogger } from '@midwayjs/logger'; @@ -50,13 +50,17 @@ export class GRPCClients extends Map { ); for (const methodName of Object.keys(packageDefinition[definition])) { const originMethod = connectionService[methodName]; - connectionService[methodName] = (clientOptions: { - metadata?: Metadata; - timeout?: number; - }) => { - return this.getClientRequestImpl(connectionService, originMethod, clientOptions); - } - connectionService[camelCase(methodName)] = connectionService[methodName]; + connectionService[methodName] = ( + clientOptions: IClientOptions = {} + ) => { + return this.getClientRequestImpl( + connectionService, + originMethod, + clientOptions + ); + }; + connectionService[camelCase(methodName)] = + connectionService[methodName]; } this.set(definition, connectionService); } @@ -70,21 +74,38 @@ export class GRPCClients extends Map { getClientRequestImpl(client, originalFunction, options = {}) { const genericFunctionSelector = - (originalFunction.requestStream ? 2 : 0) | (originalFunction.responseStream ? 1 : 0); + (originalFunction.requestStream ? 2 : 0) | + (originalFunction.responseStream ? 1 : 0); let genericFunctionName; switch (genericFunctionSelector) { case 0: - genericFunctionName = new ClientUnaryRequest(client, originalFunction, options); + genericFunctionName = new ClientUnaryRequest( + client, + originalFunction, + options + ); break; case 1: - genericFunctionName = new ClientReadableRequest(client, originalFunction, options); + genericFunctionName = new ClientReadableRequest( + client, + originalFunction, + options + ); break; case 2: - genericFunctionName = new ClientWritableRequest(client, originalFunction, options); + genericFunctionName = new ClientWritableRequest( + client, + originalFunction, + options + ); break; case 3: - genericFunctionName = new ClientDuplexStreamRequest(client, originalFunction, options); + genericFunctionName = new ClientDuplexStreamRequest( + client, + originalFunction, + options + ); break; } diff --git a/packages/grpc/src/comsumer/type/duplex-request.ts b/packages/grpc/src/comsumer/type/duplex-request.ts index 189fb8ac0d0c..e8895cd8ec4a 100644 --- a/packages/grpc/src/comsumer/type/duplex-request.ts +++ b/packages/grpc/src/comsumer/type/duplex-request.ts @@ -1,8 +1,8 @@ import { Metadata } from '@grpc/grpc-js'; import { IClientDuplexStreamService } from '../../interface'; -export class ClientDuplexStreamRequest implements IClientDuplexStreamService { - +export class ClientDuplexStreamRequest + implements IClientDuplexStreamService { correlationId: number; timeout_message; queue; @@ -17,12 +17,16 @@ export class ClientDuplexStreamRequest implements IClientDuple return 2147483647; } - constructor(client, original_function, options: { - metadata?: Metadata; - timeout?: number; - timeoutMessage?: number; - messageKey?: string; - } = {}) { + constructor( + client, + original_function, + options: { + metadata?: Metadata; + timeout?: number; + timeoutMessage?: number; + messageKey?: string; + } = {} + ) { this.queue = {}; this.correlationId = 0; this.timeout_message = options.timeoutMessage || 1000; @@ -35,10 +39,11 @@ export class ClientDuplexStreamRequest implements IClientDuple if (options.timeout !== undefined) { deadline = Date.now() + options.timeout; } - this.stream = original_function.call(client, this.metadata, {deadline: deadline}); - - this.stream.on('error', () => { + this.stream = original_function.call(client, this.metadata, { + deadline: deadline, }); + + this.stream.on('error', () => {}); this.stream.on('data', data => { if (this.queue[data[this.messageKey]]) { clearTimeout(this.queue[data[this.messageKey]]['timeout']); @@ -55,7 +60,7 @@ export class ClientDuplexStreamRequest implements IClientDuple return this.correlationId++; } - sendMessage(content: reqType = ({} as any)): Promise { + sendMessage(content: reqType = {} as any): Promise { return new Promise((resolve, reject) => { const id = this._nextId(); @@ -76,7 +81,7 @@ export class ClientDuplexStreamRequest implements IClientDuple timeout: setTimeout(() => { delete this.queue[id]; cb(new Error(`provider response timeout in ${this.timeout_message}`)); - }, this.timeout_message) + }, this.timeout_message), }; content[this.messageKey] = id; this.stream.write(content); @@ -90,5 +95,4 @@ export class ClientDuplexStreamRequest implements IClientDuple getCall() { return this.stream; } - } diff --git a/packages/grpc/src/comsumer/type/readable-request.ts b/packages/grpc/src/comsumer/type/readable-request.ts index 92eaea16a7c3..65c5592af3c9 100644 --- a/packages/grpc/src/comsumer/type/readable-request.ts +++ b/packages/grpc/src/comsumer/type/readable-request.ts @@ -1,8 +1,8 @@ import { Metadata } from '@grpc/grpc-js'; import { IClientReadableStreamService } from '../../interface'; -export class ClientReadableRequest implements IClientReadableStreamService { - +export class ClientReadableRequest + implements IClientReadableStreamService { client; metadata; timeout; @@ -10,10 +10,14 @@ export class ClientReadableRequest implements IClientReadableS queue; original_function; - constructor(client, original_function, options: { - metadata?: Metadata; - timeout?: number; - } = {}) { + constructor( + client, + original_function, + options: { + metadata?: Metadata; + timeout?: number; + } = {} + ) { this.queue = []; this.client = client; this.metadata = options.metadata || new Metadata(); @@ -29,7 +33,12 @@ export class ClientReadableRequest implements IClientReadableS if (this.timeout !== undefined) { deadline = Date.now() + this.timeout; } - this.stream = this.original_function.call(this.client, content, this.metadata, {deadline: deadline}); + this.stream = this.original_function.call( + this.client, + content, + this.metadata, + { deadline: deadline } + ); this.stream.on('error', error => { reject(error); }); @@ -45,5 +54,4 @@ export class ClientReadableRequest implements IClientReadableS getCall() { return this.stream; } - } diff --git a/packages/grpc/src/comsumer/type/unary-request.ts b/packages/grpc/src/comsumer/type/unary-request.ts index af48d317bf66..0bbcbeccfb6e 100644 --- a/packages/grpc/src/comsumer/type/unary-request.ts +++ b/packages/grpc/src/comsumer/type/unary-request.ts @@ -1,17 +1,21 @@ import { Metadata } from '@grpc/grpc-js'; import { IClientUnaryService } from '../../interface'; -export class ClientUnaryRequest implements IClientUnaryService { - +export class ClientUnaryRequest + implements IClientUnaryService { client; metadata; timeout; original_function; - constructor(client, original_function, options: { - metadata?: Metadata; - timeout?: number; - } = {}) { + constructor( + client, + original_function, + options: { + metadata?: Metadata; + timeout?: number; + } = {} + ) { this.client = client; this.metadata = options.metadata || new Metadata(); this.timeout = options.timeout || undefined; @@ -26,7 +30,11 @@ export class ClientUnaryRequest implements IClientUnaryService if (this.timeout !== undefined) { deadline = Date.now() + this.timeout; } - this.original_function.call(this.client, content, this.metadata, {deadline: deadline}, + this.original_function.call( + this.client, + content, + this.metadata, + { deadline: deadline }, (error, response) => { if (error) { reject(error); @@ -37,5 +45,4 @@ export class ClientUnaryRequest implements IClientUnaryService ); }); } - } diff --git a/packages/grpc/src/comsumer/type/writeable-request.ts b/packages/grpc/src/comsumer/type/writeable-request.ts index df4be7ab93cf..600f645dd951 100644 --- a/packages/grpc/src/comsumer/type/writeable-request.ts +++ b/packages/grpc/src/comsumer/type/writeable-request.ts @@ -1,18 +1,22 @@ import { Metadata } from '@grpc/grpc-js'; import { IClientWritableStreamService } from '../../interface'; -export class ClientWritableRequest implements IClientWritableStreamService { - +export class ClientWritableRequest + implements IClientWritableStreamService { client; metadata; timeout; stream; promise; - constructor(client, original_function, options: { - metadata?: Metadata; - timeout?: number; - } = {}) { + constructor( + client, + original_function, + options: { + metadata?: Metadata; + timeout?: number; + } = {} + ) { this.promise = new Promise((resolve, reject) => { // Deadline is advisable to be set // It should be a timestamp value in milliseconds @@ -21,7 +25,10 @@ export class ClientWritableRequest implements IClientWritableS deadline = Date.now() + options.timeout; } this.metadata = options.metadata || new Metadata(); - this.stream = original_function.call(client, this.metadata, {deadline: deadline}, + this.stream = original_function.call( + client, + this.metadata, + { deadline: deadline }, (error, response) => { if (error) { reject(error); @@ -33,7 +40,9 @@ export class ClientWritableRequest implements IClientWritableS }); } - sendMessage(content: reqType): IClientWritableStreamService { + sendMessage( + content: reqType + ): IClientWritableStreamService { this.stream.write(content); return this; } @@ -46,5 +55,4 @@ export class ClientWritableRequest implements IClientWritableS getCall() { return this.stream; } - } diff --git a/packages/grpc/src/interface.ts b/packages/grpc/src/interface.ts index 211e7de154a7..fd7ac9fcaad9 100644 --- a/packages/grpc/src/interface.ts +++ b/packages/grpc/src/interface.ts @@ -1,5 +1,5 @@ import { IConfigurationOptions, IMidwayApplication, IMidwayContext } from '@midwayjs/core'; -import { Server, ServerCredentials, Metadata, ServerUnaryCall, ClientWritableStream, ClientDuplexStream, ClientReadableStream /*ClientUnaryCall, ClientReadableStream, ClientWritableStream, ClientDuplexStream*/ } from '@grpc/grpc-js'; +import { Server, ServerCredentials, Metadata, ServerUnaryCall, ClientWritableStream, ClientDuplexStream, ClientReadableStream /*ClientUnaryCall*/ } from '@grpc/grpc-js'; export interface IMidwayGRPCContext extends ServerUnaryCall, IMidwayContext { metadata: Metadata; @@ -65,3 +65,10 @@ export interface IClientDuplexStreamService { getCall(): ClientDuplexStream; end(): void; } + +export interface IClientOptions { + metadata?: Metadata; + timeout?: number; + timeoutMessage?: number; + messageKey?: string; +} diff --git a/packages/grpc/src/provider/framework.ts b/packages/grpc/src/provider/framework.ts index 90e7ccd87bbf..014beaa75d6a 100644 --- a/packages/grpc/src/provider/framework.ts +++ b/packages/grpc/src/provider/framework.ts @@ -1,5 +1,15 @@ -import { sendUnaryData, Server, ServerCredentials, ServerUnaryCall, setLogger, } from '@grpc/grpc-js'; -import { BaseFramework, IMidwayBootstrapOptions, MidwayFrameworkType, } from '@midwayjs/core'; +import { + sendUnaryData, + Server, + ServerCredentials, + ServerUnaryCall, + setLogger, +} from '@grpc/grpc-js'; +import { + BaseFramework, + IMidwayBootstrapOptions, + MidwayFrameworkType, +} from '@midwayjs/core'; import { DecoratorMetadata, @@ -12,7 +22,12 @@ import { MS_PROVIDER_KEY, MSProviderType, } from '@midwayjs/decorator'; -import { Context, IMidwayGRPCApplication, IMidwayGRPCContext, IMidwayGRPFrameworkOptions, } from '../interface'; +import { + Context, + IMidwayGRPCApplication, + IMidwayGRPCContext, + IMidwayGRPFrameworkOptions, +} from '../interface'; import { pascalCase } from 'pascal-case'; import * as camelCase from 'camelcase'; import { loadProto } from '../util'; @@ -105,13 +120,15 @@ export class MidwayGRPCFramework extends BaseFramework< } = getPropertyMetadata( MS_GRPC_METHOD_KEY, module, - camelCase(method), + camelCase(method) ); - if (grpcMethodData.type === GrpcStreamTypeEnum.DUPLEX - || grpcMethodData.type === GrpcStreamTypeEnum.READABLE) { + if ( + grpcMethodData.type === GrpcStreamTypeEnum.DUPLEX || + grpcMethodData.type === GrpcStreamTypeEnum.READABLE + ) { // listen data and trigger binding method - call.on('data', async (data) => { + call.on('data', async data => { await this.handleContextMethod({ service, ctx, @@ -151,14 +168,18 @@ export class MidwayGRPCFramework extends BaseFramework< } protected async handleContextMethod(options: { - service, ctx: Context, callback, data: any, grpcMethodData: { + service; + ctx: Context; + callback; + data: any; + grpcMethodData: { methodName: string; type: GrpcStreamTypeEnum; onEnd: string; - } + }; }) { let result; - const {service, ctx, callback, data, grpcMethodData} = options; + const { service, ctx, callback, data, grpcMethodData } = options; try { result = await service[camelCase(ctx.method)]?.call(service, data); @@ -198,10 +219,13 @@ export class MidwayGRPCFramework extends BaseFramework< resolve(); }, 2000); - this.server.tryShutdown((err) => { + this.server.tryShutdown(err => { clearTimeout(shutdownTimer); if (err) { - this.logger.error('Server shutdown error and will invoke force shutdown, err=' + err.message); + this.logger.error( + 'Server shutdown error and will invoke force shutdown, err=' + + err.message + ); this.server.forceShutdown(); resolve(); } else { @@ -209,7 +233,7 @@ export class MidwayGRPCFramework extends BaseFramework< resolve(); } }); - }) + }); } public getFrameworkType(): MidwayFrameworkType { diff --git a/packages/grpc/test/fixtures/base-app-multiple-service/src/interface.ts b/packages/grpc/test/fixtures/base-app-multiple-service/src/interface.ts index 12447457d823..0aab159a6788 100644 --- a/packages/grpc/test/fixtures/base-app-multiple-service/src/interface.ts +++ b/packages/grpc/test/fixtures/base-app-multiple-service/src/interface.ts @@ -1,9 +1,13 @@ import { Metadata } from '@grpc/grpc-js'; +import { IClientOptions, IClientUnaryService } from '../../../../src'; export namespace hero { export interface HeroService { findOne(data: HeroById, metadata?: Metadata): Promise; } + export interface HeroServiceClient { + findOne(options?: IClientOptions): IClientUnaryService; + } export interface HeroById { id?: number; } @@ -18,6 +22,10 @@ export namespace helloworld { sayHello (request: HelloRequest): Promise } + export interface GreeterClient { + sayHello (options?: IClientOptions): IClientUnaryService + } + export interface HelloRequest { name: string; } diff --git a/packages/grpc/test/fixtures/base-app-multiple-service/src/provider/hero.ts b/packages/grpc/test/fixtures/base-app-multiple-service/src/provider/hero.ts index 3dd6e26771fe..377ba235445a 100644 --- a/packages/grpc/test/fixtures/base-app-multiple-service/src/provider/hero.ts +++ b/packages/grpc/test/fixtures/base-app-multiple-service/src/provider/hero.ts @@ -9,16 +9,16 @@ export class HeroService implements hero.HeroService { @Inject('grpc:clients') grpcClients: Clients; - greeterService: helloworld.Greeter; + greeterService: helloworld.GreeterClient; @Init() async init() { - this.greeterService = this.grpcClients.getService('helloworld.Greeter'); + this.greeterService = this.grpcClients.getService('helloworld.Greeter'); } @GrpcMethod() async findOne(data) { - const result = await this.greeterService.sayHello({ + const result = await this.greeterService.sayHello().sendMessage({ name: 'harry' }); return { diff --git a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts index fc6342eddd04..732cd997c9e5 100644 --- a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts +++ b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts @@ -25,12 +25,12 @@ export class Math implements math.Math { async addMore(message: math.AddArgs) { this.ctx.write({ id: message.id, - num: message.num + 10 + num: message.num + 10, }); } async duplexEnd() { - + console.log('got client end message'); } @GrpcMethod({type: GrpcStreamTypeEnum.WRITEABLE }) diff --git a/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts b/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts index 14aaa893dcd1..dab1fa74361e 100644 --- a/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts +++ b/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts @@ -1,4 +1,4 @@ -import { MSProviderType, Provider, Provide, Inject } from '@midwayjs/decorator'; +import { MSProviderType, Provider, Provide, Inject, GrpcMethod } from '@midwayjs/decorator'; import { helloworld } from '../interface'; import { ILogger } from '@midwayjs/logger'; import { Context } from '../../../../../src'; @@ -21,6 +21,7 @@ export class Greeter implements helloworld.Greeter { /** * Implements the SayHello RPC method. */ + @GrpcMethod() async sayHello(request: helloworld.HelloRequest) { this.logger.info('this is a context logger'); const serverMetadata = new Metadata(); diff --git a/packages/grpc/test/index.test.ts b/packages/grpc/test/index.test.ts index 818e11f63448..0ba752d07059 100644 --- a/packages/grpc/test/index.test.ts +++ b/packages/grpc/test/index.test.ts @@ -5,12 +5,13 @@ import { IClientDuplexStreamService, IClientReadableStreamService, IClientUnaryService, - IClientWritableStreamService + IClientWritableStreamService, + IClientOptions } from '../src'; export namespace hero { - export interface HeroService { - findOne(data: HeroById): Promise; + export interface HeroServiceClient { + findOne(options?: IClientOptions): IClientUnaryService; } export interface HeroById { id?: number; @@ -22,8 +23,8 @@ export namespace hero { } export namespace helloworld { - export interface Greeter { - sayHello (request: HelloRequest): Promise + export interface GreeterClient { + sayHello (options?: IClientOptions): IClientUnaryService } export interface HelloRequest { @@ -48,12 +49,12 @@ export namespace math { * client interface */ export interface MathClient { - add(): IClientUnaryService; - addMore(): IClientDuplexStreamService; + add(options?: IClientOptions): IClientUnaryService; + addMore(options?: IClientOptions): IClientDuplexStreamService; // 服务端推,客户端读 - sumMany(): IClientReadableStreamService; + sumMany(options?: IClientOptions): IClientReadableStreamService; // 客户端端推,服务端读 - addMany(): IClientWritableStreamService; + addMany(options?: IClientOptions): IClientWritableStreamService; } } @@ -71,13 +72,13 @@ describe('/test/index.test.ts', function () { url: 'localhost:6565' }); - const service = await createGRPCConsumer({ + const service = await createGRPCConsumer({ package: 'helloworld', protoPath: join(__dirname, 'fixtures/proto/helloworld.proto'), url: 'localhost:6565' }); - const result = await service.sayHello({ + const result = await service.sayHello().sendMessage({ name: 'harry' }); @@ -106,10 +107,8 @@ describe('/test/index.test.ts', function () { url: 'localhost:6565' }); - const result = await service.findOne({ + const result = await service.findOne().sendMessage({ id: 123 - }, (metadata) => { - }); expect(result).toEqual({ id: 1, name: 'bbbb-Hello harry' }) From ecc66c6895cad3c212df0b14cb0fc9ae34f9cd69 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 21 Feb 2021 11:33:53 +0800 Subject: [PATCH 6/8] test: metadata case --- .../grpc/src/comsumer/type/unary-request.ts | 23 +++++++++++++-- packages/grpc/src/interface.ts | 6 ++-- .../fixtures/base-app/src/provider/greeter.ts | 10 ++++--- packages/grpc/test/index.test.ts | 28 +++++++++++++++++-- 4 files changed, 54 insertions(+), 13 deletions(-) diff --git a/packages/grpc/src/comsumer/type/unary-request.ts b/packages/grpc/src/comsumer/type/unary-request.ts index 0bbcbeccfb6e..f04859934b7d 100644 --- a/packages/grpc/src/comsumer/type/unary-request.ts +++ b/packages/grpc/src/comsumer/type/unary-request.ts @@ -1,4 +1,4 @@ -import { Metadata } from '@grpc/grpc-js'; +import { Metadata, ClientUnaryCall } from '@grpc/grpc-js'; import { IClientUnaryService } from '../../interface'; export class ClientUnaryRequest @@ -22,7 +22,7 @@ export class ClientUnaryRequest this.original_function = original_function; } - sendMessage(content: reqType): Promise { + sendMessage(content: reqType, handler?: (call: ClientUnaryCall) => void): Promise { return new Promise((resolve, reject) => { // Deadline is advisable to be set // It should be a timestamp value in milliseconds @@ -30,7 +30,7 @@ export class ClientUnaryRequest if (this.timeout !== undefined) { deadline = Date.now() + this.timeout; } - this.original_function.call( + const call = this.original_function.call( this.client, content, this.metadata, @@ -43,6 +43,23 @@ export class ClientUnaryRequest } } ); + handler && handler(call); }); } + + sendMessageWithCallback(content: reqType, callback): ClientUnaryCall { + // Deadline is advisable to be set + // It should be a timestamp value in milliseconds + let deadline = undefined; + if (this.timeout !== undefined) { + deadline = Date.now() + this.timeout; + } + return this.original_function.call( + this.client, + content, + this.metadata, + { deadline: deadline }, + callback + ); + } } diff --git a/packages/grpc/src/interface.ts b/packages/grpc/src/interface.ts index fd7ac9fcaad9..47d783c4894d 100644 --- a/packages/grpc/src/interface.ts +++ b/packages/grpc/src/interface.ts @@ -1,5 +1,5 @@ import { IConfigurationOptions, IMidwayApplication, IMidwayContext } from '@midwayjs/core'; -import { Server, ServerCredentials, Metadata, ServerUnaryCall, ClientWritableStream, ClientDuplexStream, ClientReadableStream /*ClientUnaryCall*/ } from '@grpc/grpc-js'; +import { Server, ServerCredentials, Metadata, ServerUnaryCall, ClientWritableStream, ClientDuplexStream, ClientReadableStream, ClientUnaryCall } from '@grpc/grpc-js'; export interface IMidwayGRPCContext extends ServerUnaryCall, IMidwayContext { metadata: Metadata; @@ -45,8 +45,8 @@ export interface DefaultConfig extends IConfigurationOptions { } export interface IClientUnaryService { - sendMessage(reqData: reqType): Promise; - // getCall(): ClientUnaryCall; + sendMessage(reqData: reqType, handler?: (call: ClientUnaryCall) => void): Promise; + sendMessageWithCallback(content: reqType, callback): ClientUnaryCall; } export interface IClientReadableStreamService { diff --git a/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts b/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts index dab1fa74361e..de1d6085e8cd 100644 --- a/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts +++ b/packages/grpc/test/fixtures/base-app/src/provider/greeter.ts @@ -24,9 +24,11 @@ export class Greeter implements helloworld.Greeter { @GrpcMethod() async sayHello(request: helloworld.HelloRequest) { this.logger.info('this is a context logger'); - const serverMetadata = new Metadata(); - serverMetadata.add('Set-Cookie', 'yummy_cookie=choco'); - this.ctx.sendMetadata(serverMetadata); - return { message: 'Hello ' + request.name } + if (request.name === 'zhangting') { + const serverMetadata = new Metadata(); + serverMetadata.add('Set-Cookie', 'yummy_cookie=choco'); + this.ctx.sendMetadata(serverMetadata); + } + return { message: 'Hello ' + request.name }; } } diff --git a/packages/grpc/test/index.test.ts b/packages/grpc/test/index.test.ts index 0ba752d07059..7c339778bba5 100644 --- a/packages/grpc/test/index.test.ts +++ b/packages/grpc/test/index.test.ts @@ -9,6 +9,8 @@ import { IClientOptions } from '../src'; +import { Metadata } from '@grpc/grpc-js'; + export namespace hero { export interface HeroServiceClient { findOne(options?: IClientOptions): IClientUnaryService; @@ -78,11 +80,31 @@ describe('/test/index.test.ts', function () { url: 'localhost:6565' }); - const result = await service.sayHello().sendMessage({ + const meta = new Metadata(); + meta.add('key', 'value'); + + const result = await service.sayHello({ + metadata: meta, + }).sendMessage({ name: 'harry' }); - expect(result).toEqual({ message: 'Hello harry' }) + expect(result).toEqual({ message: 'Hello harry' }); + + const serverMetadata = await new Promise((resolve, reject) => { + const call = service.sayHello().sendMessageWithCallback({ + name: 'zhangting' + }, (err) => { + if (err) { + reject(err); + } + }); + call.on('metadata', (meta) => { + resolve(meta); + }); + }) + + expect(serverMetadata.get('Set-Cookie')[0]).toEqual('yummy_cookie=choco'); await closeApp(app); }); @@ -188,7 +210,7 @@ describe('/test/index.test.ts', function () { expect(result3).toEqual(29); - // 双向流 + // 保证顺序的双向流 const t = service.addMore(); const result4 = await new Promise((resolve, reject) => { From 0ffaee45a6d611cf259cd1b0b6ecfbff959d2187 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 21 Feb 2021 11:53:27 +0800 Subject: [PATCH 7/8] fix: lint --- packages/grpc/src/comsumer/type/unary-request.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/grpc/src/comsumer/type/unary-request.ts b/packages/grpc/src/comsumer/type/unary-request.ts index f04859934b7d..0a63b60ec789 100644 --- a/packages/grpc/src/comsumer/type/unary-request.ts +++ b/packages/grpc/src/comsumer/type/unary-request.ts @@ -22,7 +22,10 @@ export class ClientUnaryRequest this.original_function = original_function; } - sendMessage(content: reqType, handler?: (call: ClientUnaryCall) => void): Promise { + sendMessage( + content: reqType, + handler?: (call: ClientUnaryCall) => void + ): Promise { return new Promise((resolve, reject) => { // Deadline is advisable to be set // It should be a timestamp value in milliseconds From 093a851a827ba7d044915c036f7e99e0a3b305ce Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Sun, 21 Feb 2021 12:37:51 +0800 Subject: [PATCH 8/8] fix: autoload clients --- packages/grpc/src/comsumer/clients.ts | 4 +-- packages/grpc/src/comsumer/configuration.ts | 4 ++- packages/grpc/src/interface.ts | 35 +++++++++++++------ packages/grpc/src/provider/framework.ts | 8 ++--- packages/grpc/src/util.ts | 4 +-- .../base-app-stream/src/provider/math.ts | 4 +-- packages/grpc/test/index.test.ts | 2 +- 7 files changed, 37 insertions(+), 24 deletions(-) diff --git a/packages/grpc/src/comsumer/clients.ts b/packages/grpc/src/comsumer/clients.ts index 8faed0431c36..228e9b7a2cec 100644 --- a/packages/grpc/src/comsumer/clients.ts +++ b/packages/grpc/src/comsumer/clients.ts @@ -1,5 +1,4 @@ import { - Autoload, Config, Init, Logger, @@ -17,7 +16,6 @@ import { ClientDuplexStreamRequest } from './type/duplex-request'; import { ClientReadableRequest } from './type/readable-request'; import { ClientWritableRequest } from './type/writeable-request'; -@Autoload() @Provide('clients') @Scope(ScopeEnum.Singleton) export class GRPCClients extends Map { @@ -30,7 +28,7 @@ export class GRPCClients extends Map { @Init() async initService() { if (!this.grpcConfig['services']) { - this.logger.error('Please set gRPC services in your config["grpc"]'); + this.logger.debug('Please set gRPC services in your config["grpc"]'); return; } for (const cfg of this.grpcConfig['services']) { diff --git a/packages/grpc/src/comsumer/configuration.ts b/packages/grpc/src/comsumer/configuration.ts index e0d3cf948094..c1ac098618d5 100644 --- a/packages/grpc/src/comsumer/configuration.ts +++ b/packages/grpc/src/comsumer/configuration.ts @@ -2,6 +2,7 @@ import { Configuration, Logger } from '@midwayjs/decorator'; import { setLogger } from '@grpc/grpc-js'; import { ILogger } from '@midwayjs/logger'; import { join } from 'path'; +import { IMidwayContainer } from '@midwayjs/core'; @Configuration({ namespace: 'grpc', @@ -11,7 +12,8 @@ export class AutoConfiguration { @Logger() logger: ILogger; - async onReady() { + async onReady(container: IMidwayContainer) { setLogger(this.logger); + await container.getAsync('grpc:clients'); } } diff --git a/packages/grpc/src/interface.ts b/packages/grpc/src/interface.ts index 47d783c4894d..9bf1e62e338a 100644 --- a/packages/grpc/src/interface.ts +++ b/packages/grpc/src/interface.ts @@ -1,21 +1,15 @@ import { IConfigurationOptions, IMidwayApplication, IMidwayContext } from '@midwayjs/core'; import { Server, ServerCredentials, Metadata, ServerUnaryCall, ClientWritableStream, ClientDuplexStream, ClientReadableStream, ClientUnaryCall } from '@grpc/grpc-js'; -export interface IMidwayGRPCContext extends ServerUnaryCall, IMidwayContext { +export interface Context extends ServerUnaryCall, IMidwayContext { metadata: Metadata; method: string; } -export type IMidwayGRPCApplication = IMidwayApplication & Server; +export type IMidwayGRPCApplication = IMidwayApplication & Server; export type Application = IMidwayGRPCApplication; -export interface Context extends IMidwayGRPCContext {} - export interface IGRPCServiceOptions { - /** - * application gRPC connection string - */ - url?: string; /** * proto path */ @@ -25,23 +19,42 @@ export interface IGRPCServiceOptions { * protobuf package name */ package?: string; +} +export interface IGRPCClientServiceOptions extends IGRPCServiceOptions { + /** + * application gRPC connection string + */ + url: string; + /** + * proto file loader options. Optional + */ loaderOptions?: object; + /** + * Server credentials. Optional. + */ credentials?: ServerCredentials; } export interface IMidwayGRPFrameworkOptions extends IConfigurationOptions { /** - * gRPC Server connection url, default is localhost:6565 + * gRPC Server connection url, like 'localhost:6565' */ url?: string; - services: Pick[]; + services: IGRPCServiceOptions[]; + /** + * proto file loader options. Optional + */ loaderOptions?: object; + /** + * Server credentials. Optional. + */ + credentials?: ServerCredentials; } export interface DefaultConfig extends IConfigurationOptions { - services: IGRPCServiceOptions[]; + services: IGRPCClientServiceOptions[]; } export interface IClientUnaryService { diff --git a/packages/grpc/src/provider/framework.ts b/packages/grpc/src/provider/framework.ts index 014beaa75d6a..6bf30c944f24 100644 --- a/packages/grpc/src/provider/framework.ts +++ b/packages/grpc/src/provider/framework.ts @@ -23,9 +23,8 @@ import { MSProviderType, } from '@midwayjs/decorator'; import { - Context, IMidwayGRPCApplication, - IMidwayGRPCContext, + Context, IMidwayGRPFrameworkOptions, } from '../interface'; import { pascalCase } from 'pascal-case'; @@ -35,7 +34,7 @@ import { PackageDefinition } from '@grpc/proto-loader'; export class MidwayGRPCFramework extends BaseFramework< IMidwayGRPCApplication, - IMidwayGRPCContext, + Context, IMidwayGRPFrameworkOptions > { public app: IMidwayGRPCApplication; @@ -197,7 +196,8 @@ export class MidwayGRPCFramework extends BaseFramework< return new Promise((resolve, reject) => { this.server.bindAsync( `${this.configurationOptions.url}`, - ServerCredentials.createInsecure(), + this.configurationOptions.credentials || + ServerCredentials.createInsecure(), (err: Error | null, bindPort: number) => { if (err) { reject(err); diff --git a/packages/grpc/src/util.ts b/packages/grpc/src/util.ts index 6dd04e175683..b569122515b2 100644 --- a/packages/grpc/src/util.ts +++ b/packages/grpc/src/util.ts @@ -1,5 +1,5 @@ import * as protoLoader from '@grpc/proto-loader'; -import { IGRPCServiceOptions } from './interface'; +import { IGRPCClientServiceOptions } from './interface'; import { GRPCClients } from './comsumer/clients'; export const loadProto = (options: { @@ -22,7 +22,7 @@ export const loadProto = (options: { }; export const createGRPCConsumer = async ( - options: IGRPCServiceOptions + options: IGRPCClientServiceOptions ): Promise => { const clients = new GRPCClients(); options.url = options.url || 'localhost:6565'; diff --git a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts index 732cd997c9e5..79533895ffce 100644 --- a/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts +++ b/packages/grpc/test/fixtures/base-app-stream/src/provider/math.ts @@ -1,5 +1,5 @@ import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/decorator'; -import { IMidwayGRPCContext } from '../../../../../src'; +import { Context } from '../../../../../src'; import { math } from '../interface'; import { Metadata } from '@grpc/grpc-js'; @@ -10,7 +10,7 @@ import { Metadata } from '@grpc/grpc-js'; export class Math implements math.Math { @Inject() - ctx: IMidwayGRPCContext; + ctx: Context; sumDataList = []; diff --git a/packages/grpc/test/index.test.ts b/packages/grpc/test/index.test.ts index 7c339778bba5..4261c2532062 100644 --- a/packages/grpc/test/index.test.ts +++ b/packages/grpc/test/index.test.ts @@ -71,7 +71,7 @@ describe('/test/index.test.ts', function () { package: 'helloworld', } ], - url: 'localhost:6565' + url: 'localhost:6565', }); const service = await createGRPCConsumer({