跳到主要内容
版本:4.0.0 🚧

MQTT

MQTT是用于物联网 (IoT) 的OASIS标准消息传递协议。它被设计为非常轻量级的发布/订阅消息传输,非常适合以较小的代码占用空间和最小的网络带宽连接远程设备。MQTT目前广泛应用于汽车、制造、电信、石油和天然气等行业。

相关信息:

描述
可用于标准项目
可用于 Serverless可以发布消息
可用于一体化
包含独立主框架
包含独立日志

版本要求

由于 mqtt 库本身的要求,所需要的版本为 Node.js >= 16

前置依赖

由于 MQTT 需要 Broker 作为中转传输,你需要自行部署 MQTT Broker 服务,本文档不提供 MQTT 服务本身的部署指导。

安装组件

安装 mqtt 组件。

$ npm i @midwayjs/mqtt@3 --save

或者在 package.json 中增加如下依赖后,重新安装。

{
"dependencies": {
"@midwayjs/mqtt": "^3.0.0",
// ...
},
"devDependencies": {
// ...
}
}

启用组件

src/configuration.ts 中引入组件

// ...
import * as mqtt from '@midwayjs/mqtt';

@Configuration({
imports: [
// ...other components
mqtt,
],
})
export class MainConfiguration {}

由于 MQTT 分为 订阅者(subscriber) 和 **发布者(publisher)**两部分,两个可以独立使用,我们将分别介绍。

订阅服务

基础配置

通过 sub 字段和 @MqttSubscriber 装饰器,我们可以配置多个订阅者。

比如,下面的 sub1sub2 就是两个不同的订阅者。

// src/config/config.default

export default {
mqtt: {
sub: {
sub1: {
// ...
},
sub2: {
// ...
}
}
}
}

最简单的订阅者配置需要几个字段,订阅的地址和订阅的 Topic。

// src/config/config.default

export default {
mqtt: {
sub: {
sub1: {
connectOptions: {
host: 'test.mosquitto.org',
port: 1883,
},
subscribeOptions: {
topicObject: 'test',
},
},
sub2: {
// ...
}
}
}
}

sub1 订阅者配置了 connectOptionssubscribeOptions ,分别代表连接配置和订阅配置。

订阅实现

我们可以在目录中提供一个标准的订阅器实现,比如 src/consumer/sub1.subscriber.ts

// src/consumer/sub1.subscriber.ts

import { ILogger, Inject } from '@midwayjs/core';
import { Context, IMqttSubscriber, MqttSubscriber } from '@midwayjs/mqtt';

@MqttSubscriber('test')
export class Sub1Subscriber implements IMqttSubscriber {

@Inject()
ctx: Context;

async subscribe() {
// ...
}
}

@MqttSubscriber 装饰器声明了一个订阅类实现,它的参数为订阅者的名字,比如我们配置文件中的 sub1

IMqttSubscriber 接口约定了一个 subscribe 方法,每当接收到新的消息时,这个方法就会被执行。

和其他消息订阅机制一样,消息本身通过 Context 字段来传递。

// ...
export class Sub1Subscriber implements IMqttSubscriber {
@Inject()
ctx: Context;

async subscribe() {
const payload = this.ctx.message.toString();
// ...
}
}

Context 字段包括几个 mqtt 属性。

属性类型描述
ctx.topicstring订阅 Topic
ctx.messageBuffer消息内容
ctx.packetIPublishPacket(来自 mqtt 库)publish 的包信息

消息发布

基础配置

消息发布也需要创建实例,配置本身使用了 服务工厂 的设计模式。

比如多实例配置如下:

// src/config/config.default

export default {
mqtt: {
pub: {
clients: {
default: {
host: 'test.mosquitto.org',
port: 1883,
},
pub2: {
// ...
}
}
}
}
}

上面的配置创建了名为 defaultpub2 的两个实例。

使用发布者

如果实例名为 default ,则可以使用默认的消息发布类。

比如:

// src/service/user.service.ts
import { Provide, Inject } from '@midwayjs/core';
import { DefaultMqttProducer } from '@midwayjs/mqtt';

@Provide()
export class UserService {

@Inject()
producer: DefaultMqttProducer;

async invoke() {
// 同步发布消息
this.producer.publish('test', 'hello world');

// 异步发布
await this.producer.publishAsync('test', 'hello world');

// 增加配置
await this.producer.publishAsync('test', 'hello world', {
qos: 2
});
}
}

也可以使用内置的工厂类 MqttProducerFactory 注入不同的实例。

// src/service/user.service.ts
import { Provide, Inject } from '@midwayjs/core';
import { MqttProducerFactory, DefaultMqttProducer } from '@midwayjs/mqtt';

@Provide()
export class UserService {

@InjectClient(MqttProducerFactory, 'pub2')
producer: DefaultMqttProducer;

async invoke() {
// ...
}
}

组件日志

组件有着自己的日志,默认会将 ctx.logger 记录在 midway-mqtt.log 中。

我们可以单独配置这个 logger 对象。

export default {
midwayLogger: {
// ...
mqttLogger: {
fileLogName: 'midway-mqtt.log',
},
}
}

这个日志的输出格式,我们也可以单独配置。

export default {
mqtt: {
// ...
contextLoggerFormat: info => {
const { jobId, from } = info.ctx;
return `${info.timestamp} ${info.LEVEL} ${info.pid} ${info.message}`;
},
}
}