gRPC
gRPC 是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于 HTTP/2 协议标准而设计,基于 ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。
本篇内容演示了如何在 Midway 体系下,提供 gRPC 服务,以及调用 gRPC 服务的方法。
Midway 当前采用了最新的 gRPC 官方推荐的 @grpc/grpc-js 进行开发,并提供了一些工具包,用于快速发布服务和调用服务。
我们使用的模块为 @midwayjs/grpc
,既可以独立发布服务,又可以接入其它框架调用 gRPC 服务。
相关信息:
提供服务
描述 | |
---|---|
可用于标准项目 | ✅ |
可用于 Serverless | ❌ |
可用于一体化 | ✅ |
调用服务
描述 | |
---|---|
可用于标准项目 | ✅ |
可用于 Serverless | ✅ |
可用 于一体化 | ✅ |
其他
描述 | |
---|---|
可作为主框架独立使用 | ✅ |
可独立添加中间件 | ✅ |
安装依赖
$ npm i @midwayjs/grpc@3 --save
$ npm i @midwayjs/grpc-helper --save-dev
或者在 package.json
中增加如下依赖后,重新安装。
{
"dependencies": {
"@midwayjs/grpc": "^3.0.0",
// ...
},
"devDependencies": {
"@midwayjs/grpc-helper": "^1.0.0",
// ...
}
}
开启组件
不管是提供服务还是调用服务,都需要开启组件。
@midwayjs/grpc
可以作为独立主框架使用。
// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as grpc from '@midwayjs/grpc';
@Configuration({
imports: [grpc],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}
也可以附加在其他的主框架下,比如 @midwayjs/koa
。
// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
import * as grpc from '@midwayjs/grpc';
@Configuration({
imports: [koa, grpc],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}
目录结构
大致的目录结构如下,src/provider
是提供 gRPC 服务的目录。
.
├── package.json
├── proto ## proto 定义文件
│ └── helloworld.proto
├── src
│ ├── configuration.ts ## 入口配置文件
│ ├── interface.ts
│ └── provider ## gRPC 提供服务的文件
│ └── greeter.ts
├── test
├── bootstrap.js ## 服务启动入口
└── tsconfig.json
定义服务接口
在微服务中,定义一个服务需要特定的接口定义语言(IDL)来完成,在 gRPC中 默认使用 Protocol Buffers 作为序列化协议。
序列化协议独立于语言和平台,提供了多种语言的实现,Java,C++,Go 等等,每一种实现都包含了相应语言的编译器和库文件。所以 gRPC 是一个提供和调用都可以跨语言的服务框架。
一个gRPC服务的大体架构可以用官网上的一幅图表示。
Protocol Buffers 协议的文件, 默认的后缀为 .proto
。.proto后缀的IDL文件,并通过其编译器生成特定语言的数据结构、服务端接口和客户端Stub代码。
由于 proto 文件可以跨语言使用,为了方便共享,我们一般将 proto 文件放在 src 目录外侧,方便其他工具复制分发。
下面是一个基础的 proto/helloworld.proto
文件。
syntax = "proto3";
package helloworld;
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
proto3 表示的是第三版的 protobuf 协议,是 gRPC 目前推荐的版本,“语法简单,功能更全”。
我们可以用 service
格式,定义服务体,其中可以包含方法。同时,我们可以更加细致的通过 message
描述服务具体的请求参数和响应参数。
我 们可以从 Google 的官网文档 中查看更多细节。
大家会看到,这和 Java 中的 Class 非常相像,每个结构就相当于 Java 中的一个类。
编写 proto 文件
现在我们再来看之前的服务,是不是就很好理解了。
syntax = "proto3";
package helloworld;
// 服务的定义
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 服务的请求参数
message HelloRequest {
string name = 1;
}
// 服务的响应参数
message HelloReply {
string message = 1;
}
我们定义了一个名为 Greeter
的服务,包含一个 HelloRequest
结构的请求体,以及返回 HelloReply
结构的响应体。
接下去,我们将对这个服务给大家做演示。
生成代码定义
传统的 gRPC 框架,需要用户手动编写 proto 文件,以及生成 js 服务,最后再根据 js 生成的服务再编写实现,在 Midway 体系下,我们提供了一个 grpc-helper 工具包来加速这个过程。
如果没有安装,可以先安装。
$ npm i @midwayjs/grpc-helper --save-dev
grpc-helper 工具的作用,是将用户提供的 proto 文件,生成对应可读的 ts interface 文件。
我们可以添加一个脚本,方便这个过程。
{
"scripts": {
"generate": "tsproto --path proto --output src/domain"
}
}
然后执行 npm run generate
。
上述命令执行后,会在代码的 src/domain
目录中生成 proto 文件对应的服务接口定义。
不管是提供 gRPC 服务还是调用 gRPC 服务,都要先生成定义。
生成的代码如下,包含有一个命名空间(namespace),以及命名空间下的两个 TypeScript Interface, Greeter
用于编写服务端实现, GreeterClient
用于编写客户端实现。
/**
* This file is auto-generated by grpc-helper
*/
import * as grpc from '@midwayjs/grpc';
// 生成的命名空间
export namespace helloworld {
// 服务端使用的定义
export interface Greeter {
// Sends a greeting
sayHello(data: HelloRequest): Promise<HelloReply>;
}
// 客户端使用的定义
export interface GreeterClient {
// Sends a greeting
sayHello(options?: grpc.IClientOptions): grpc.IClientUnaryService<HelloRequest, HelloReply>;
}
// 请求体结构
export interface HelloRequest {
name?: string;
}
// 响应体结构
export interface HelloReply {
message?: string;
}
}
每当 proto 文件被修改时,就需要重新生成对应的服务定义,然后将对应的方法实现。
提供 gRPC 服务(Provider)
编写服务提供方(Provider)
在 src/provider
目录中,我们创建 greeter.ts
,内容如下
import {
MSProviderType,
Provider,
GrpcMethod,
} from '@midwayjs/core';
import { helloworld } from '../domain/helloworld';
/**
* 实现 helloworld.Greeter 接口的服务
*/
@Provider(MSProviderType.GRPC, { package: 'helloworld' })
export class Greeter implements helloworld.Greeter {
@GrpcMethod()
async sayHello(request: helloworld.HelloRequest) {
return { message: 'Hello ' + request.name };
}
}
注意,@Provider 装饰器和 @Provide 装饰器不同,前者用于提供服务,后者用于依赖注入容器扫描标识的类。
我们使用 @Provider
暴露出一个 RPC 服务, @Provider
的第一个参数为 RPC 服务类型,这个参数是个枚举,这里选择 GRPC 类型。
@Provider
的第二个参数为 RPC 服务的元数据,这里指代的是 gRPC 服务的元数据。这里需要写入 gRPC 的 package 字段,即 proto 文件中的 package 字段(这里的字段用于和 proto 文件加载后的字段做对应)。
对于普通的 gRPC 服务接口(UnaryCall),我们只需要使用 @GrpcMethod()
装饰器修饰即可。修饰的方法即为服务定义本身,入参为 proto 中定义好的入参,return 值即为定义好的响应体。
注意,生成的 Interface 是为了更好的编写服务代码,规范结构,请务必按照定义编写。
配置服务
配置内容如下。
// src/config/config.default
import { MidwayAppInfo, MidwayConfig } from '@midwayjs/core';
export default (appInfo: MidwayAppInfo): MidwayConfig => {
return {
// ...
grpcServer: {
services: [
{
protoPath: join(appInfo.appDir, 'proto/hero.proto'),
package: 'hero',
},
{
protoPath: join(appInfo.appDir, 'proto/helloworld.proto'),
package: 'helloworld',
}
],
}
};
}
services 字段是数组,意味着 Midway 项目可以同时发布多个 gRPC 服务。每个 service 的结构为:
属性 | 类型 | 描述 |
---|---|---|
protoPath | string | 必选,proto 文件的绝对路径 |
package | string | 必选,服务对应的 package |
除了 Service 配置之外,还有一些其他的配置。
属性 | 类型 | 描述 |
---|---|---|
url | string | 可选,gRPC 服务地址,默认 6565 端口,比如 'localhost:6565' |
loaderOptions | Object | 可选,proto file loader 的 options |
credentials | ServerCredentials | 可选,grpc Server binding 时的 credentials 参数选项 |
serverOptions | ChannelOptions | 可选,grpc Server 的 自定义 options |
提供安全证书
可以通过 credentials
参数传递安全证书。
// src/config/config.default
import { MidwayAppInfo, MidwayConfig } from '@midwayjs/core';
import { ServerCredentials } from '@midwayjs/grpc';
import { readFileSync } from 'fs';
import { join } from 'path';
const cert = readFileSync(join(__dirname, './cert/server.crt'));
const pem = readFileSync(join(__dirname, './cert/server.pem'));
const key = readFileSync(join(__dirname, './cert/server.key'));
export default (appInfo: MidwayAppInfo): MidwayConfig => {
return {
// ...
grpcServer: {
// ...
credentials: ServerCredentials.createSsl(cert, [{ private_key: key, cert_chain: pem }]);
}
};
}
编写单元测试
@midwayjs/grpc
库提供了一个 createGRPCConsumer
方法,用于实时调用客户端,一般我们用这个方法做测试。
这个方法每次调用会实时连接,不建议将该方法用在生产环境。
在测试中写法如下。
import { createApp, close } from '@midwayjs/mock';
import { Framework, createGRPCConsumer } from '@midwayjs/grpc';
import { join } from 'path';
import { helloworld } from '../src/domain/helloworld';
describe('test/index.test.ts', () => {
it('should create multiple grpc service in one server', async () => {
const baseDir = join(__dirname, '../');
// 创建服务
const app = await createApp<Framework>();
// 调用服务
const service = await createGRPCConsumer<helloworld.GreeterClient>({
package: 'helloworld',
protoPath: join(baseDir, 'proto', 'helloworld.proto'),
url: 'localhost:6565'
});
const result = await service.sayHello().sendMessage({
name: 'harry'
});
expect(result.message).toEqual('Hello harry');
await close(app);
});
});
调用 gRPC 服务(Consumer)
我们编写一个 gRPC 服务来调用上面的暴露的服务。
事实上,你可以在 Web 的 Controller,或者 Service 等其他地方来调用,这里只是做一个示例。
调用配置
你需要在 src/config/config.default.ts
中增加你需要调用的目标服务以及它的 proto 文件信息。
比如,这里我们填写了上面暴露的服务本身,以及该服务的 proto,包名等信息(函数形式)。
// src/config/config.default
import { MidwayAppInfo, MidwayConfig } from '@midwayjs/core';
export default (appInfo: MidwayAppInfo): MidwayConfig => {
return {
// ...
grpc: {
services: [
{
url: 'localhost:6565',
protoPath: join(appInfo.appDir, 'proto/helloworld.proto'),
package: 'helloworld',
},
],
},
};
}
代码调用
配置完后,我们就可以在代码里调用了。
@midwayjs/grpc
提供了 clients
,可以方便的获取到已配置的服务。我们只需要在需要注入的地方,注入这个对象即可。
比如:
import {
Provide,
Inject,
} from '@midwayjs/core';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';
@Provide()
export class UserService {
@Inject()
grpcClients: Clients;
}
我们通过 clients
获取到对方服务的客户端实例,然后调用即可。
import {
Provide,
Inject,
} from '@midwayjs/core';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';
@Provide()
export class UserService {
@Inject()
grpcClients: Clients;
async invoke() {
// 获取服务
const greeterService = this.grpcClients.getService<helloworld.GreeterClient>(
'helloworld.Greeter'
);
// 调用服务
const result = await greeterService.sayHello()
.sendMessage({
name: 'harry'
});
// 返回结果
return result;
}
}
我们也可以利用 @Init
装饰器,将需要调用的服务缓存到属性上。这样可以在其他方法调用时复用。
示例如下。
import {
GrpcMethod,
MSProviderType,
Provider,
Inject,
Init,
} from '@midwayjs/core';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';
@Provider(MSProviderType.GRPC, { package: 'hero' })
export class HeroService implements hero.HeroService {
// 注入客户端
@Inject()
grpcClients: Clients;
greeterService: helloworld.GreeterClient;
@Init()
async init() {
// 赋值一个服务实例
this.greeterService = this.grpcClients.getService<helloworld.GreeterClient>(
'helloworld.Greeter'
);
}
@GrpcMethod()
async findOne(data) {
// 调用服务
const result = await greeterService.sayHello()
.sendMessage({
name: 'harry'
});
// 返回结果
return result;
}
}
流式服务
gRPC 的流式服务用于减少连接,让服务端或者客户端不需要等待 即可执行任务,从而提高执行效率。
gRPC 的流式服务分为三种,以服务端角度来说,为
- 服务端接收流(客户端推)
- 服务端响应流(服务端推)
- 双向流
下面我们将一一介绍。
流式 proto 文件
流式的 proto 文件写法不同,需要在希望使用流式的地方将参数标记为 stream
。
syntax = "proto3";
package math;
message AddArgs {
int32 id = 1;
int32 num = 2;
}
message Num {
int32 id = 1;
int32 num = 2;
}
service Math {
rpc Add (AddArgs) returns (Num) {
}
// 双向流
rpc AddMore (stream AddArgs) returns (stream Num) {
}
// 服务端往客户端推
rpc SumMany (AddArgs) returns (stream Num) {
}
// 客户端往服务端推
rpc AddMany (stream AddArgs) returns (Num) {
}
}
该服务生成的接口定义为:
import {
IClientDuplexStreamService,
IClientReadableStreamService,
IClientUnaryService,
IClientWritableStreamService,
IClientOptions,
} from '@midwayjs/grpc';
export namespace math {
export interface AddArgs {
id?: number;
num?: number;
}
export interface Num {
id?: number;
num?: number;
}
/**
* server interface
*/
export interface Math {
add(data: AddArgs): Promise<Num>;
addMore(data: AddArgs): Promise<void>;
// 服务端推,客户端读
sumMany(data: AddArgs): Promise<void>
// 客户端端推,服务端读
addMany(num: AddArgs): Promise<void>;
}
/**
* client interface
*/
export interface MathClient {
add(options?: IClientOptions): IClientUnaryService<AddArgs, Num>;
addMore(options?: IClientOptions): IClientDuplexStreamService<AddArgs, Num>;
// 服务端推,客户端读
sumMany(options?: IClientOptions): IClientReadableStreamService<AddArgs, Num>;
// 客户端端推,服务端读
addMany(options?: IClientOptions): IClientWritableStreamService<AddArgs, Num>;
}
}
服务端推送
客户端调用一次,服务端可以多次返回。通过 @GrpcMethod()
的参数来标识流式类型。
可用的类型为:
GrpcStreamTypeEnum.WRITEABLE
服务端输出流(单工)GrpcStreamTypeEnum.READABLE
客户端输出流(单工),服务端接受多次GrpcStreamTypeEnum.DUPLEX
双工流
服务端示例如下:
import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provider } from '@midwayjs/core';
import { Context, Metadata } from '@midwayjs/grpc';
import { math } from '../interface';
/**
*/
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {
@Inject()
ctx: Context;
@GrpcMethod({type: GrpcStreamTypeEnum.WRITEABLE })
async sumMany(args: math.AddArgs) {
this.ctx.write({
num: 1 + args.num
});
this.ctx.write({
num: 2 + args.num
});
this.ctx.write({
num: 3 + args.num
});
this.ctx.end();
}
// ...
}
服务端使用 ctx.write
方法来返回数据,由于是服务端流,可以返回多次。
返回结束后,请使用 ctx.end()
方法关闭流。
客户端,调用一次,接受多次数据。
比如下面的累加逻辑。
Promise 写法,会等待服务端数据都返回再做处理。
// 服务端推送
let total = 0;
let result = await service.sumMany().sendMessage({
num: 1,
});
result.forEach(data => {
total += data.num;
});
// total = 9;
事件写法,实时处理。
// 服务端推送
let call = service.sumMany().getCall();
call.on('data', data => {
// do something
});
call.sendMessage({
num: 1,
});
客户端推送
客户端调用多次,服务端接收多次数据,返回一个结果。通过 @GrpcMethod({type: GrpcStreamTypeEnum.READABLE})
的参数来标识流式类型。
服务端示例如下:
import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provider } from '@midwayjs/core';
import { Context, Metadata } from '@midwayjs/grpc';
import { math } from '../interface';
/**
*/
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {
sumDataList: number[] = [];
@Inject()
ctx: Context;
@GrpcMethod({type: GrpcStreamTypeEnum.READABLE, onEnd: 'sumEnd' })
async addMany(data: math.Num) {
this.sumDataList.push(data);
}
async sumEnd(): Promise<math.Num> {
const total = this.sumDataList.reduce((pre, cur) => {
return {
num: pre.num + cur.num,
}
});
return total;
}
// ...
}
客户端每次调用,都会触发一次 addMany
方法。
在客户端发送 end
事件之后,会调用 @GrpcMethod
装饰器上的 onEnd
参数指定的方法,该方法的返回值即为最后客户端拿到的值。
客户端示例如下:
// 客户端推送
const data = await service.addMany()
.sendMessage({num: 1})
.sendMessage({num: 2})
.sendMessage({num: 3})
.end();
// data.num = 6
双向流
客户端可以调用多次,服务端也可以接收多次数据,返回多个结果,类似于传统的 TCP 通信。通过 @GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX})
的参数来标识双工流式类型。
服务端示例如下:
import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provider } from '@midwayjs/core';
import { Context, Metadata } from '@midwayjs/grpc';
import { math } from '../interface';
/**
*/
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {
@Inject()
ctx: Context;
@GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' })
async addMore(message: math.AddArgs) {
this.ctx.write({
id: message.id,
num: message.num + 10,
});
}
async duplexEnd() {
console.log('got client end message');
}
// ...
}
服务端可以随时使用 ctx.write
返回数据,也可以使用 ctx.end
来关闭流。
客户端示例:
对于双工通信的客户端,由于无法保证调用、返回的顺序,我们需要使用监听的模式来消费结果。
const clientStream = service.addMore().getCall();
let total = 0;
let idx = 0;
duplexCall.on('data', (data: math.Num) => {
total += data.num;
idx++;
if (idx === 2) {
duplexCall.end();
// total => 29
}
});
duplexCall.write({
num: 3,
});
duplexCall.write({
num: 6,
});
如果希望保证调用顺序,我们也提供了保证顺序的双向流调用方法,但是需要在 proto 中定义一个固定的 id,来确保顺序。
比如我们的 Math.proto,对每个入参和出参,都增加了一个固定的 id,所以可以固定顺序。
syntax = "proto3";
package math;
message AddArgs {
int32 id = 1; // 这里的 id 名字是固定的
int32 num = 2;
}
message Num {
int32 id = 1; // 这里的 id 名字是固定的
int32 num = 2;
}
service Math {
rpc Add (AddArgs) returns (Num) {
}
rpc AddMore (stream AddArgs) returns (stream Num) {
}
// 服务端往客户端推
rpc SumMany (AddArgs) returns (stream Num) {
}
// 客户端往服务端推
rpc AddMany (stream AddArgs) returns (Num) {
}
}
固定顺序的客户端调用方式如下:
// 保证顺序的双向流
const t = service.addMore();
const result4 = await new Promise<number>((resolve, reject) => {
let total = 0;
// 第一次调用和返回
t.sendMessage({
num: 2
})
.then(res => {
expect(res.num).toEqual(12);
total += res.num;
})
.catch(err => console.error(err));
// 第二次调用和返回
t.sendMessage({
num: 5
}).then(res => {
expect(res.num).toEqual(15);
total += res.num;
resolve(total);
})
.catch(err => console.error(err));
t.end();
});
// result4 => 27
默认的 id 为 id
,如果服务端定义不同,需要修改,可以在客户端调用时传递。
// 保证顺序的双向流
const t = service.addMore({
messageKey: 'uid'
});
元数据(Metadata)
gRPC 的元数据等价于 HTTP 的上下文。
服务端通过 ctx.sendMetadata
方法返回元数据,也可以通过 ctx.metadata
获取客户端传递的元数据。
import {
MSProviderType,
Provider,
GrpcMethod,
} from '@midwayjs/core';
import { helloworld } from '../domain/helloworld';
import { Context, Metadata } from '@midwayjs/grpc';
/**
* 实现 helloworld.Greeter 接口的服务
*/
@Provider(MSProviderType.GRPC, { package: 'helloworld' })
export class Greeter implements helloworld.Greeter {
@Inject()
ctx: Context;
@GrpcMethod()
async sayHello(request: helloworld.HelloRequest) {
// 客户端传递的元数据
console.log(this.ctx.metadata);
// 创建元数据
const meta = new Metadata();
this.ctx.metadata.add('xxx', 'bbb');
this.ctx.sendMetadata(meta);
return { message: 'Hello ' + request.name };
}
}
客户端通过方法的 options 参数传递元数据。
import { Metadata } from '@midwayjs/grpc';
const meta = new Metadata();
meta.add('key', 'value');
const result = await service.sayHello({
metadata: meta,
}).sendMessage({
name: 'harry'
});
获取元数据相对麻烦一些。
普通一元调用(UnaryCall)获取元数据需要使用 sendMessageWithCallback
方法。
const call = service.sayHello().sendMessageWithCallback({
name: 'zhangting'
}, (err) => {
if (err) {
reject(err);
}
});
call.on('metadata', (meta) => {
// output meta
});
其他流式服务,可以通过 getCall()
方法 获取原始客户端流对象,从而直接订阅。
// 获取服务,注意,这里没有 await
const call = service.addMany().getCall();
call.on('metadata', (meta) => {
// output meta
});
超时处理
我们可以在调用服务时传递参数,单位毫秒。
const result = await service.sayHello({
timeout: 5000
}).sendMessage({
name: 'harry'
});