Kafka
在复杂系统的架构中,事件流是很重要的一环,包括从事件源中(数据库、传感器、移动设备等)以事件流的方式去实时捕获数据,持久化事件流方便检索,并实时和回顾操作处理响应事件流。
应用于支付和金融交易、实施跟踪和监控汽车等行业信息流动、捕获分析物联网数据等等。
在 Midway中,我们提供了订阅 Kafka 的能力,专门来满足用户的这类需求。
相关信息:
订阅服务
描述 | |
---|---|
可用于标准项目 | ✅ |
可用于 Serverless | ❌ |
可用于一体化 | ✅ |
包含独立主框架 | ✅ |
包含独立日志 | ✅ |
基础概念
分布式流处理平台
- 发布订阅(流)信息
- 容错(故障转移)存储信息(流),存储事件流
- 在消息流发生的时候进行处理,处理事件流
理解 Producer(生产者)
- 发布消息到一个主题或多个 topic (主题)。
理解 Consumer(主题消费者)
- 订阅一个或者多个 topic,并处理产生的信息。
理解 Stream API
- 充当一个流处理器,从 1 个或多个 topic 消费输入流,并生产一个输出流到1个或多个输出 topic,有效地将输入流转换到输出流。
理解 Broker
- 已发布的消息保存在一组服务器中,称之为 Kafka 集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
从 v3.19 开始,Kafka 组件做了一次重构,Kafka 组件的配置、使用方法和之前都有较大差异,原有使用方式兼容,但是文档不再保留。
安装依赖
安装 @midwayjs/kafka
模块。
$ npm i @midwayjs/kafka --save
或者在 package.json
中增加如下依赖后,重新安装。
{
"dependencies": {
"@midwayjs/kafka": "^3.0.0",
// ...
}
}
开启组件
@midwayjs/kafka
可以作为独立主框架使用。
// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as kafka from '@midwayjs/kafka';
@Configuration({
imports: [
kafka
],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}
也可以附加在其他的主框架下,比如 @midwayjs/koa
。
// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
import * as kafka from '@midwayjs/kafka';
@Configuration({
imports: [
koa,
kafka
],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}
由于 Kafka 分为 消费者(Consumer) 和 生产者(Producer) 两部分,两个可以独立使用,我们将分别介绍。
消费者(Consumer)
目录结构
我们一般把消费者放在 consumer 目录。比如 src/consumer/user.consumer.ts
。
➜ my_midway_app tree
.
├── src
│ ├── consumer
│ │ └── user.consumer.ts
│ ├── interface.ts
│ └── service
│ └── user.service.ts
├── test
├── package.json
└── tsconfig.json
基础配置
通过 consumer
字段和 @KafkaConsumer
装饰器,我们可以配置多个消费者。
比如,下面的 sub1
和 sub2
就是两个不同的消费者。
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
// ...
},
sub2: {
// ...
},
}
}
}
最简单的消费者配置需要几个字段,Kafka 的连接配置、消费者配置以及订阅配置。
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
// ...
},
consumerOptions: {
// ...
},
subscribeOptions: {
// ...
},
},
}
}
}
比如:
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumerOptions: {
groupId: 'groupId-test-1',
},
subscribeOptions: {
topics: ['topic-test-1'],
}
},
}
}
}
完整可配置参数包括:
connectionOptions
:Kafka 的连接配置,即new Kafka(consumerOptions)
的参数consumerOptions
:Kafka 的消费者配置,即kafka.consumer(consumerOptions)
的参数subscribeOptions
:Kafka 的订阅配置,即consumer.subscribe(subscribeOptions)
的参数consumerRunConfig
:消费者运行配置,即consumer.run(consumerRunConfig)
的参数
这些参数的详细说明,可以参考 KafkaJS Consumer 文档。
复用 Kafka 实例
如果如果需要复用 Kafka 实例,可以通过 kafkaInstanceRef
字段来指定。
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumerOptions: {
groupId: 'groupId-test-1',
},
subscribeOptions: {
topics: ['topic-test-1'],
}
},
sub2: {
kafkaInstanceRef: 'sub1',
consumerOptions: {
groupId: 'groupId-test-2',
},
subscribeOptions: {
topics: ['topic-test-2'],
}
}
}
}
}
注意,上述的 sub1
和 sub2
是两个不同的消费者,但是它们共享同一个 Kafka 实例,且 sub2
的 groupId
需要和 sub1
不同。
用 Kafka SDK 写法类似如下:
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});
const consumer1 = kafka.consumer({ groupId: 'groupId-test-1' });
const consumer2 = kafka.consumer({ groupId: 'groupId-test-2' });
消费者实现
我们可以在目录中提供一个标准的消费者实现,比如 src/consumer/sub1.consumer.ts
。
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachMessagePayload } from '@midwayjs/kafka';
@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
async eachMessage(payload: EachMessagePayload) {
// ...
}
}
sub1
是消费者名称,使用的是配置中的 sub1
消费者。
也可以实现 eachBatch
方法,处理批量消息。
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachBatchPayload } from '@midwayjs/kafka';
@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
async eachBatch(payload: EachBatchPayload) {
// ...
}
}
消息上下文
和其他消息订阅机制一样,消息本身通过 Context
字段来传递。
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachMessagePayload, Context } from '@midwayjs/kafka';
import { Inject } from '@midwayjs/core';
@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
@Inject()
ctx: Context;
async eachMessage(payload: EachMessagePayload) {
// ...
}
}
Context
字段包括几个属性:
属性 | 类型 | 描述 |
---|---|---|
ctx.payload | EachMessagePayload, EachBatchPayload | 消息内容 |
ctx.consumer | Consumer | 消费者实例 |
你可以通过 ctx.consumer
来调用 Kafka 的 API,比如 ctx.consumer.commitOffsets
来手动提交偏移量或者 ctx.consumer.pause
来暂停消费。