RabbitMQ
在复杂系统的架构中,会有负责处理消息队列的微服务,如下图:服务 A 负责产生消息给消息队列,而服务 B 则负责消费消息队列中的任务。
在 Midway 中,我们提供了订阅 rabbitMQ 的能力,专门来满足用户的这类需求。
基础概念
RabbitMQ 的概念较为复杂,其基于高级消息队列协议即 Advanced Message Queuing Protocol(AMQP),如果第一次接触请阅读一下相关的参考文档。
AMQP 有一些概念,Queue、Exchange 和 Binding 构成了 AMQP 协议的核心,包括:
- Producer:消息生产者,即投递消息的程序。
- Broker:消息队列服务器实体。
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- Binding:绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- Consumer:消息消费者,即接受消息的程序。
简单的理解,消息通过 Publisher 发布到 Exchange(交换机),Consumer 通过订阅 Queue 来接受消息,Exchange 和 Queue 通过路由做连接。
消费者(Consumer)使用方法
安装依赖
Midway 提供了订阅 rabbitMQ 的能力,并能够独立部署和使用。安装 @midwayjs/rabbitmq
模块及其定义。
$ npm i @midwayjs/rabbitmq@2 amqplib --save
$ npm i @types/amqplib --save-dev
入口函数
和 Web 一样,创建一个入口文件,指定 Framework 即可。
// server.js
const { Bootstrap } = require('@midwayjs/bootstrap');
const RabbitMQFramework = require('@midwayjs/rabbitmq').Framework;
const rabbitMQFramework = new RabbitMQFramework().configure({
url: 'amqp://localhost',
});
Bootstrap.load(rabbitMQFramework).run();
整个启动的配置为:
export type IMidwayRabbitMQConfigurationOptions = {
url: string | Options.Connect;
socketOptions?: any;
reconnectTime?: number;
};
url | rabbitMQ 的连接信息 |
---|---|
socketOptions | amqplib.connect 的第二个参数 |
reconnectTime | 队列断连后的重试时间,默认 10 秒 |
订阅 rabbitMQ
我们一般把能力分为生产者和消费者,而订阅正是消费者的能力。
我们一般把消费者放在 consumer 目录。比如 src/consumer/userConsumer.ts
。
➜ my_midway_app tree
.
├── src
│ ├── consumer
│ │ └── userConsumer.ts
│ ├── interface.ts
│ └── service
│ └── userService.ts
├── test
├── package.json
└── tsconfig.json
代码示例如下。
import { Provide, Consumer, MSListenerType, RabbitMQListener, Inject } from '@midwayjs/decorator';
import { Context } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Provide()
@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {
@Inject()
ctx: Context;
@RabbitMQListener('tasks')
async gotData(msg: ConsumeMessage) {
this.ctx.channel.ack(msg);
}
}
@Consumer
装饰器,提供消费者标识,并且它的参数,指定了某种消费框架的类型,比如,我们这里指定了 MSListenerType.RABBITMQ
这个类型,指的就是 rabbitMQ 类型。
标识了 @Consumer
的类,对方法使用 @RabbitMQListener
装饰器后,可以绑定一个 RabbitMQ 的队列(Queue)。
方法的参数为接收到的消息,类型为 ConsumeMessage
。如果返回值需要确认,则需要对服务端进行 ack
操作,明确接收到的数据。
如果需要订阅多个队列,可以使用多个方法,也可以使用多个文件。
RabbitMQ 消息上下文
订阅 RabbitMQ
数据的上下文,和 Web 同样的,其中包含一个 requestContext
,和每次接收消息的数据绑定。
从 ctx 上可以取到 channel
,整个 ctx 的定义为:
export type Context = {
channel: amqp.Channel;
requestContext: IMidwayContainer;
};
可以从框架获取定义
import { Context } from '@midwayjs/rabbitmq';
Fanout Exchange
Fanout 是一种特定的交换机,如果满足匹配(binding),就往 Exchange 所绑定的 Queue 发送消息。Fanout Exchange 会忽略 RoutingKey 的设置,直接将 Message 广播到所有绑定的 Queue 中。
即所有订阅该交换机的 Queue 都会收到消息。
比如,下面我们添加了两个 Queue,订阅了相同的交换机。
import { Provide, Consumer, MSListenerType, RabbitMQListener, Inject, App } from '@midwayjs/decorator';
import { Context, Application } from '@midwayjs/rabbitmq';
import { ConsumeMessage } from 'amqplib';
@Provide()
@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {
@App()
app: Application;
@Inject()
ctx: Context;
@Inject()
logger;
@RabbitMQListener('abc', {
exchange: 'logs',
exchangeOptions: {
type: 'fanout',
durable: false,
},
exclusive: true,
consumeOptions: {
noAck: true,
},
})
async gotData(msg: ConsumeMessage) {
this.logger.info('test output1 =>', msg.content.toString('utf8'));
// TODO
}
@RabbitMQListener('bcd', {
exchange: 'logs',
exchangeOptions: {
type: 'fanout',
durable: false,
},
exclusive: true,
consumeOptions: {
noAck: true,
},
})
async gotData2(msg: ConsumeMessage) {
this.logger.info('test output2 =>', msg.content.toString('utf8'));
// TODO
}
}
订阅的 abc 和 bcd 队列,绑定了相同的交换机 logs,最终的结果是,两个方法都会被调用。
Direct Exchange
Direct Exchange 是 RabbitMQ 默认的 Exchange,完全根据 RoutingKey 来路由消息。设置 Exchange 和 Queue 的 Binding 时需指定 RoutingKey(一般为 Queue Name),发消息时也指定一样的 RoutingKey,消息就会被路由到对应的 Queue。
下面的示例代码,我们不填写 Queue Name,只添加一个 routingKey,交换机类型为 direct。
import { Provide, Consumer, MSListenerType, RabbitMQListener, Inject, App } from '@midwayjs/decorator';
import { Context, Application } from '../../../../../src';
import { ConsumeMessage } from 'amqplib';
@Provide()
@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {
@App()
app: Application;
@Inject()
ctx: Context;
@Inject()
logger;
@RabbitMQListener('', {
exchange: 'direct_logs',
exchangeOptions: {
type: 'direct',
durable: false,
},
routingKey: 'direct_key',
exclusive: true,
consumeOptions: {
noAck: true,
},
})
async gotData(msg: ConsumeMessage) {
// TODO
}
}
direct 类型的消息,会根据 routerKey 做定向过滤,所以只有特定订阅能收到消息。
装饰器参数
@RabbitMQListener
装饰器的第一个参数为 queueName,代表需要监听的队列。
第二个参数是一个对象,包含队列,交换机等参数,详细定义如下:
export interface RabbitMQListenerOptions {
exchange?: string;
/**
* queue options
*/
exclusive?: boolean;
durable?: boolean;
autoDelete?: boolean;
messageTtl?: number;
expires?: number;
deadLetterExchange?: string;
deadLetterRoutingKey?: string;
maxLength?: number;
maxPriority?: number;
pattern?: string;
/**
* prefetch
*/
prefetch?: number;
/**
* router
*/
routingKey?: string;
/**
* exchange options
*/
exchangeOptions?: {
type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match' | string;
durable?: boolean;
internal?: boolean;
autoDelete?: boolean;
alternateExchange?: string;
arguments?: any;
};
/**
* consumeOptions
*/
consumeOptions?: {
consumerTag?: string;
noLocal?: boolean;
noAck?: boolean;
exclusive?: boolean;
priority?: number;
arguments?: any;
};
}
本地测试
Midway 提供了一个简单的测试方法用于测试订阅某个数据。 @midwayjs/mock
工具提供了一个 createRabbitMQProducer
的方法,用于创建一个生产者,通过它,你可以创建一个队列(Queue),以及向这个队列发消息。
然后,我们启动一个 app,就可以自动监听到这个队列中的数据,并执行后续逻辑。
import { createRabbitMQProducer, closeApp, creatApp } from '@midwayjs/mock';
describe('/test/index.test.ts', () => {
it('should test create message and get from app', async () => {
// create a queue and channel
const channel = await createRabbitMQProducer('tasks', {
isConfirmChannel: true,
mock: false,
url: 'amqp://localhost',
});
// send data to queue
channel.sendToQueue('tasks', Buffer.from('something to do'));
// create app and got data
const app = await creatApp('base-app', { url: 'amqp://localhost' });
await closeApp(app);
});
});
示例一
创建一个 fanout exchange。
const manager = await createRabbitMQProducer('tasks-fanout', {
isConfirmChannel: false,
mock: false,
url: 'amqp://localhost',
});
// Name of the exchange
const ex = 'logs';
// Write a message
const msg = 'Hello World!';
// 声明交换机
manager.assertExchange(ex, 'fanout', { durable: false }); // 'fanout' will broadcast all messages to all the queues it knows
// 启动服务
const app = await creatApp('base-app-fanout', {
url: 'amqp://localhost',
reconnectTime: 2000,
});
// 发送到交换机,由于不持久化,需要等订阅服务起来之后再发
manager.sendToExchange(ex, '', Buffer.from(msg));
// 等一段时间
await sleep(5000);
// 校验结果
// 关闭 producer
await manager.close();
// 关闭 app
await closeApp(app);
示例二
创建一个 direct exchange。
/**
* direct 类型的消息,根据 routerKey 做定向过滤
*/
const manager = await createRabbitMQProducer('tasks-direct', {
isConfirmChannel: false,
mock: false,
url: 'amqp://localhost',
});
// Name of the exchange
const ex = 'direct_logs';
// Write a message
const msg = 'Hello World!';
// 声明交换机
manager.assertExchange(ex, 'direct', { durable: false }); // 'fanout' will broadcast all messages to all the queues it knows
const app = await creatApp('base-app-direct', {
url: 'amqp://localhost',
reconnectTime: 2000,
});
// 这里指定 routerKey,发送到交换机
manager.sendToExchange(ex, 'direct_key', Buffer.from(msg));
// 校验结果
await manager.close();
await closeApp(app);
生产者(Producer)使用方法
生产者(Producer)也就是第一节中的消息产生者,简单的来说就是会创建一个客户端,将消息发送到 RabbitMQ 服务。
注意:当前 Midway 并没有使用组件来支持消息发送,这里展示的示例只是使用纯 SDK 在 Midway 中的写法。
安装依赖
$ npm i amqplib amqp-connection-manager --save
$ npm i @types/amqplib --save-dev
调用服务发送消息
比如,我们在 service 文件下,新增一个 rabbitmq.ts
文件。
import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy } from '@midwayjs/decorator';
import * as amqp from 'amqp-connection-manager';
@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {
private connection: amqp.AmqpConnectionManager;
private channelWrapper;
@Init()
async connect() {
// 创建连接,你可以把配置放在 Config 中,然后注入进来
this.connection = await amqp.connect('amqp://localhost');
// 创建 channel
this.channelWrapper = this.connection.createChannel({
json: true,
setup: function (channel) {
return Promise.all([
// 绑定队列
channel.assertQueue('tasks', { durable: true }),
]);
},
});
}
// 发送消息
public async sendToQueue(queueName: string, data: any) {
return this.channelWrapper.sendToQueue(queueName, data);
}
@Destroy()
async close() {
await this.channelWrapper.close();
await this.connection.close();
}
}
大概就是创建了一个用来封装消息通信的 service,同时他是全局唯一的 Singleton 单例。由于增加了 @AutoLoad
装饰器,可以自执行初始化。
这样基础的调用服务就抽象好了,我们只需要在用到的地方,调用 sendToQueue
方法即可。
比如:
@Provide()
export class UserService {
@Inject()
rabbitmqService: RabbitmqService;
async invoke() {
// TODO
// 发送消息
await this.rabbitmqService.sendToQueue('tasks', { hello: 'world' });
}
}