跳到主要内容
版本:3.0.0

SocketIO

Socket.io 是一个业界常用库,可用于在浏览器和服务器之间进行实时,双向和基于事件的通信。

image.png

Midway 提供了对 Socket.io 的支持和封装,能够简单的创建一个 Socket.io 服务。本篇内容演示了如何在 Midway 体系下,提供 Socket.io 服务的方法。

Midway 当前采用了最新的 Socket.io (v4.0.0) 进行开发。

相关信息:

提供服务

描述
可用于标准项目
可用于 Serverless
可用于一体化
包含独立主框架
包含独立日志

安装依赖

在现有项目中安装 Socket.io 的依赖。

$ npm i @midwayjs/socketio@3 --save
## 客户端可选
$ npm i @types/socket.io-client socket.io-client --save-dev

或者在 package.json 中增加如下依赖后,重新安装。

{
"dependencies": {
"@midwayjs/socket.io": "^3.0.0",
// 客户端可选
"socket.io-client": "^4.4.1",
// ...
},
"devDependencies": {
// 客户端可选
"@types/socket.io-client": "^1.4.36",
// ...
}
}

开启组件

@midwayjs/socket.io 可以作为独立主框架使用。

import { Configuration } from '@midwayjs/core';
import * as socketio from '@midwayjs/socketio';

@Configuration({
imports: [socketio],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}

也可以附加在其他的主框架下,比如 @midwayjs/koa

import { Configuration } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
import * as socketio from '@midwayjs/socketio';

@Configuration({
imports: [koa, socketio],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}


目录结构

下面是 Socket.io 项目的基础目录结构,和传统应用类似,我们创建了 socket 目录,用户存放 Soscket.io 业务的服务代码。

.
├── package.json
├── src
│ ├── configuration.ts ## 入口配置文件
│ ├── interface.ts
│ └── socket ## socket.io 服务的文件
│ └── hello.controller.ts
├── test
├── bootstrap.js ## 服务启动入口
└── tsconfig.json

Socket.io 工作原理

Socket.io 服务器和 Socket.io 客户端(浏览器,Node.js 或另一种编程语言)之间的双向通道通过 WebSocket连接 建立起来,在不可用时,将使用 HTTP 长轮询作为备用手段。

Socket.io 代码是基于 Engine.io 库搭建起来的,是属于 Engine.io 的上层实现。Engine.io 负责整个服务端和客户端连接的部分,包括连接检查,传输方式等等。而 Socket.io 负责上层的重连,封包缓冲,广播等等特性。

Socket.io(Engine.io)实现了两种 Transports(传输方式)。

第一种是 HTTP 长轮询。HTTP Get 请求用于 long-running(长连接),Post 请求用于 short-running(短连接)。

image.png

第二种是 WebSocket 协议,直接基于 WebSocket Connection 实现。它在服务器和客户端之间提供了双向且低延迟的通信通道。

在默认的情况下,Socket.io 会先采用 HTTP 长轮询进行连接,并发送一个类似下面结构的数据。

{
"sid": "FSDjX-WRwSA4zTZMALqx", // 连接的 session id
"upgrades": ["websocket"], // 可升级的协议
"pingInterval": 25000, // 心跳时间间隔
"pingTimeout": 20000 // 心跳超时时间
}

当当前的服务满足升级到 WebSocket 协议的要求时,会自动升级到 WebSocket 协议,如下图。 image.png

  • 1、第一次握手,传输 sid 等结构
  • 2、使用 HTTP 长轮询发送数据
  • 3、使用 HTTP 长轮询返回数据
  • 4、升级协议,使用 WebSocket 协议发送数据
  • 5、当协议升级后,关闭之前的长轮询

之后就开始正常的 WebSocket 通信了。

提供 Socket 服务

Midway 通过 @WSController 装饰器定义 Socket 服务。

@WSController('/')
export class HelloController {
// ...
}

@WSController 的入参,指代了每个 Socket 的 Namespace(非 path)。如果不提供 Namespace,每个 Socket.io 会自动创建一个 / 的 Namespace,并且将客户端连接都归到其中。

信息

这里的 namespace 支持字符串和正则。

当 Namespace 有客户端连接时,会触发 connection 事件,我们在代码中可以使用 @OnWSConnection() 装饰器来修饰一个方法,当每个客户端第一次连接到该 Namespace 时,将自动调用该方法。

import { WSController, OnWSConnection, Inject } from '@midwayjs/core';
import { Context } from '@midwayjs/socketio';

@WSController('/')
export class HelloSocketController {

@Inject()
ctx: Context;

@OnWSConnection()
async onConnectionMethod() {
console.log('on client connect', this.ctx.id);
}
}

信息

这里的 ctx 等价于 socket 实例。

消息和响应

Socket.io 是通过事件的监听方式来获取数据。Midway 提供了 @OnWSMessage() 装饰器来格式化接收到的事件,每次客户端发送事件,被修饰的方法都将被执行。

import { WSController, Provide, OnWSMessage, Inject } from '@midwayjs/core';
import { Context } from '@midwayjs/socketio';

@WSController('/')
export class HelloSocketController {

@Inject()
ctx: Context;

@OnWSMessage('myEvent')
async gotMessage(data) {
console.log('on data got', this.ctx.id, data);
}
}

注意,由于 Socket.io 在一个事件中可以传递多个数据,这里的参数可以是多个。

  @OnWSMessage('myEvent')
async gotMessage(data1, data2, data3) {
// ...
}

当获取到数据之后,通过业务逻辑处理数据,然后将结果返回给客户端,返回的时候,我们也是通过另一个事件发送给客户端。

通过 @WSEmit 装饰器来将方法的返回值返回给客户端。

import { WSController, OnWSConnection, Inject } from '@midwayjs/core';
import { Context } from '@midwayjs/socketio';

@WSController('/')
export class HelloSocketController {

@Inject()
ctx: Context;

@OnWSMessage('myEvent')
@WSEmit('myEventResult')
async gotMessage() {
return 'hello world'; // 这里将 hello world 字符串返回给客户端
}
}

上面的代码,我们的方法返回值 hello world,将自动发送给客户端监听的 myEventResult 事件。

Socket 中间件

Socket 中的中间件的写法和 Web 中间件 相似,但是加载的时机略有不同。

由于 Socket 有连接和接收消息两个阶段,所以中间件以此分为几类。

  • 全局 Connection 中间件,会对所有 namespace 下的 connection 生效
  • 全局 Message 中间件,会对所有 namespace 下的 message 生效
  • Controller 中间件,会对单个 namespace 下的 connection 和 message 生效
  • Connection 中间件,会对单个 namespace 下的 connection 生效
  • Message 中间件,会对单个 namespace 下的 message 生效

中间件写法

注意,中间件必须通过 return 返回结果。

// src/middleware/socket.middleware.ts
import { Middleware } from '@midwayjs/core';
import { Context, NextFunction } from '@midwayjs/socketio';

@Middleware()
export class SocketMiddleware {
resolve() {
return async (ctx: Context, next: NextFunction) => {
// ...
return await next();
}
}
}

全局中间件

和 Web 中间件类似,通过 socket.io 的 app 实例,注册中间件。

import * as socketio from '@midwayjs/socketio';

@Configuration({
imports: [
socketio
],
// ...
})
export class MainConfiguration {

@App('socketIO')
app: Application;

async onReady() {
// 可以注册全局 connection 中间件
this.app.useConnectionMiddleware(SocketMiddleware);
// 也可以注册全局 Message 中间件
this.app.useMiddleware(SocketMiddleware);
}
}

Namespace 中的中间件

通过装饰器,注册不同阶段的中间件。

比如 Namespace 级别的中间件,会对单个 namespace 下的 connection 和 message 生效。

// ...

// Namespace 级别的中间件
@WSController('/api', { middleware: [SocketMiddleware]})
export class APIController {
}

Connection 中间件,在连接时生效。

// ...

@WSController('/api')
export class APIController {

// Connection 触发时的中间件
@OnWSConnection({
middleware: [SocketMiddleware]
})
init() {
// ...
}
}

Message 中间件,接收到特定消息时生效。

// ...

@WSController('/api')
export class APIController {

// Message 触发时的中间件
@OnWSMessage('my', {
middleware: [SocketMiddleware]
})
@WSEmit('ok')
async gotMyMessage() {
// ...
}
}

本地测试

由于 socket.io 框架可以独立启动(依附于默认的 http 服务,也可以和其他 midway 框架一起启动)。

当作为独立框架启动时,需要指定端口。

// src/config/config.default
export default {
// ...
socketIO: {
port: 3000,
},
}

当作为副框架启动时(比如和 http ,由于 http 在单测时未指定端口(使用 supertest 自动生成),无法很好的测试,可以仅在测试环境显式指定一个端口。

// src/config/config.unittest
export default {
// ...
koa: {
port: null,
},
socketIO: {
port: 3000,
},
}
提示
  • 1、这里的端口仅为 WebSocket 服务在测试时启动的端口
  • 2、koa 中的端口为 null,即意味着在测试环境下,不配置端口,不会启动 http 服务

和其他 Midway 测试方法一样,我们使用 createApp 启动项目。

import { createApp, close } from '@midwayjs/mock'
// 这里使用的 Framework 定义,以主框架为准
import { Framework } from '@midwayjs/koa';

describe('/test/index.test.ts', () => {
it('should create app and test socket.io', async () => {
const app = await createApp<Framework>();

//...

await close(app);
});

});

你可以直接使用 socket.io-client 来测试。也可以使用 Midway 提供的基于 socket.io-client 模块封装的测试客户端。

假如我们的服务端处理逻辑如下(返回客户端传递的数据相加的结果):

@OnWSMessage('myEvent')
@WSEmit('myEventResult')
async gotMessage(data1, data2, data3) {
return {
name: 'harry',
result: data1 + data2 + data3,
};
}

测试代码如下:

import { createApp, close } from '@midwayjs/mock'
import { Framework } from '@midwayjs/koa';
import { createSocketIOClient } from '@midwayjs/mock';
import { once } from 'events';

describe('/test/index.test.ts', () => {
it('should test create socket app', async () => {

// 创建一个服务
const app = await createApp<Framework>();

// 创建一个对应的客户端
const client = await createSocketIOClient({
port: 3000,
});

// 拿到结果返回
const data = await new Promise(resolve => {
client.on('myEventResult', resolve);
// 发送事件
client.send('myEvent', 1, 2, 3);
});

// 判断结果
expect(data).toEqual({
name: 'harry',
result: 6,
});

// 关闭客户端
await client.close();
// 关闭服务端
await close(app);
});

});

如果多个客户端,也可以使用更简单的写法,使用 node 自带的 events 模块的 once 方法来优化,就会变成下面的代码。

import { createApp, close } from '@midwayjs/mock'
import { Framework } from '@midwayjs/koa';
import { createSocketIOClient } from '@midwayjs/mock';
import { once } from 'events';

describe('/test/index.test.ts', () => {

it('should test create socket app', async () => {

// 创建一个服务
const app = await createApp<Framework>();

// 创建一个对应的客户端
const client = await createSocketIOClient({
port: 3000,
});

// 用事件的 promise 写法监听
const gotEvent = once(client, 'myEventResult');
// 发送事件
client.send('myEvent', 1, 2, 3);
// 等待返回
const [data] = await gotEvent;
// 判断结果
expect(data).toEqual({
name: 'harry',
result: 6,
});

// 关闭客户端
await client.close();
// 关闭服务端
await close(app);
});

});

两种写法效果相同,按自己理解的写就行。

等待回执(ack)的消息

Socket.io 支持一种直接返回消息的写法。当客户端传递消息的时候,如果最后一个参数为一个 function(callback),则服务端可以拿到这个 callback,将数据直接返回给客户端,不需要创建一个新的消息。

我们的服务代码不需要变化, @midwayjs/socketio 内部会判断最后一个参数,自动返回给客户端。

比如,服务端代码:

@OnWSMessage('myEvent')
@WSEmit('myEventResult')
async gotMessage(data1, data2, data3) {
return {
name: 'harry',
result: data1 + data2 + data3,
};
}

客户端测试代码:

import { createApp, close } from '@midwayjs/mock'
import { Framework } from '@midwayjs/koa';
import { createSocketIOClient } from '@midwayjs/mock';
import { once } from 'events';

describe('/test/index.test.ts', () => {

it('should test create socket app', async () => {

// 创建一个服务
const app = await createApp<Framework>();

// 创建一个对应的客户端
const client = await createSocketIOClient({
port: 3000,
});

// 发送事件,这里使用了 await 的写法
const data = await client.sendWithAck('myEvent', 1, 2, 3);

// 判断结果
expect(data).toEqual({
name: 'harry',
result: 6,
});

// 关闭客户端
await client.close();
// 关闭服务端
await close(app);
});

});

常见的消息和广播

以下面的代码示例举例:

import { Context, Application } from '@midwayjs/socketio';
import { WSController, OnWSMessage, WSEmit, App, Inject } from '@midwayjs/core';

@WSController('/')
export class HelloSocketController {

@Inject()
ctx: Context;

@App()
app: Application;

@OnWSMessage('myEvent')
@WSEmit('myEventResult')
async gotMessage() {
// TODO
}
}

发送给客户端(也可以用装饰器形式直接 return)。

this.ctx.emit("hello", "can you hear me?", 1, 2, "abc");

发送给的所有除发件人以外的所有客户端。

this.ctx.broadcast.emit("broadcast", "hello friends!");

发送给所有在 game 房间的客户端(除了发送者)。

this.ctx.to("game").emit("nice game", "let's play a game");

发送给所有的 game1game2 房间的客户端(除了发送者)。

this.ctx.to("game1").to("game2").emit("nice game", "let's play a game (too)");

发送给所有 game 房间的客户端,包括发送者。

this.app.in("game").emit("big-announcement", "the game will start soon");

myNamespace 命名空间的客户端广播,包括发送者。

// 从 app 发送
this.app.of("myNamespace").emit("bigger-announcement", "the tournament will start soon");
// 从 ctx 发送
this.ctx.nsp.emit("bigger-announcement", "the tournament will start soon");

发送到特定的 namespace 和 room,包括发送者。

// 从 app 发送
this.app.of("myNamespace").to("room").emit("event", "message");
// 从 ctx 发送
this.ctx.nsp.emit("bigger-announcement", "the tournament will start soon");

发送给所有连接到当前节点上的客户端(多个节点的时候,就是多进程)

this.app.local.emit("hi", "my lovely babies");

Appliation(io 对象)

传统的 Socket.io 服务端创建代码如下:

const io = require("socket.io")(3000);

io.on("connection", socket => {
// ...
});

@midwayjs/socketio 框架中,Application 实例即为该 io 实例,类型和能力保持一致。即通过 @App 装饰器注入的 app 实例,即为 io 对象。

我们可以通过该对象做一些全局的事情。

比如获取所有的 socket 实例。

// 返回所有的 socket 实例
const sockets = await app.fetchSockets();

// 返回所有的在 room1 的 socket 实例
const sockets = await app.in("room1").fetchSockets();

// 返回特定 socketId 的实例
const sockets = await app.in(theSocketId).fetchSockets();

多框架下,主框架一般为 Web 框架,我们可以通过指定 key 获取 Socket.io 的 app。

import { Application as SocketApplication } from '@midwayjs/socketio';
import { Controller, App } from '@midwayjs/core';

@Controller()
export class UserController {

@App('socketIO')
socketApp: SocketApplication;
}

这样我们可以通过 @midwayjs/socketio 的 app 对象(等价于 io),调用现有的 socket 连接。

比如,HTTP 请求调用进来对特定 namespace 下的所有客户端广播:

import { Application as SocketApplication } from '@midwayjs/socketio';
import { Provide, Controller, App, Get } from '@midwayjs/core';

@Controller()
export class UserController {

@App('socketIO')
socketApp: SocketApplication;

@Get()
async invoke() {
// 对 / 下的连接做广播
this.socketApp.of('/').emit('hi', 'everyone');
}
}

更多的 io API,请参考 Socket.io Server instance 文档

Socket 部署

Socket 服务端口

@midwayjs/socketio 的配置样例如下:

// src/config/config.default
export default {
// ...
socketIO: {
port: 7001,
},
}

@midwayjs/socketio 和其他 @midwayjs/web@midwayjs/koa@midwayjs/express 同时启用时,可以复用http 端口。

// src/config/config.default
export default {
// ...
koa: {
port: 7001,
},
socketIO: {
// 这里不配置即可
},
}

Nginx 配置

一般来说,我们的 Node.js 服务前都会有 Nginx 等类似的反向代理服务,这里以 Nginx 的配置为例。

http {
server {
listen 80;
server_name example.com;

location / {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;

proxy_pass http://localhost:7001;

proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
}

配置

可用配置

属性类型描述
portnumber可选,如果传递了该端口,socket.io 内部会创建一个该端口的 HTTP 服务,并将 socket 服务 attach 在其之上。如果希望和 midway 其他的 web 框架配合使用,请不要传递该参数。
pathstring可选,服务端 path
adapterobject分布式处理的适配器,比如可以配置 redis-adapter
connectTimeoutnumber客户端超时时间,单位 ms,默认值 45000

更多的启动选项,请参考 Socket.io 文档

适配器

适配器是用于 Socket.io 在分布式部署时,在多台机器,多个进程能够进行通信的一层适配层,当前 socket.io 官方提供的适配器有几种:

  • 1、cluster-adapter 用于在单台机器,多进程之间适配
  • 2、redis-adapter 用于在多台机器,多个进程之间适配

在分布式场景下,我们一般使用 redis-adapater 来实现功能。

配置 redis 适配器

@midwayjs/socketio 提供了一个适配器(adapter)的入口配置,只需要初始化适配器实例,传入即可。

提示

Socket.io 官方已经更新了原有的适配器包名,现在的包名为 @socket.io/redis-adapter(原来叫 socket.io-redis),配置有更新,迁移参考请查看 官方文档

安装如下:

$ npm i @socket.io/redis-adapter --save

新版本配置的示例如下,更多的配置可以参考 官方文档

// src/config/config.default
import { createAdapter } from '@socket.io/redis-adapter';
import Redis from 'ioredis';

// github 文档创建 redis 实例

const pubClient = new Redis(/* redis 配置 */);
const subClient = pubClient.duplicate();

export default {
// ...
socketIO: {
adapter: createAdapter(pubClient, subClient)
},
}

通过使用 @socket.io/redis-adapter 适配器运行 Socket.io,可以在不同的进程或服务器中运行多个 Socket.io 实例,这些实例都可以相互广播和发送事件。

此外,还有一些 Adapter 上的特殊 API,具体可以查看 文档

粘性会话

由于 Node.js 经常在启动时使用多进程(cluster)模式,如果同一个会话(sid)无法多次访问到同一个进程上,socket.io 就会报错。

解决办法有两种。

使用 WebSocket 协议

最简单的方法,只启用 WebSocket 协议(禁用长轮询),这样就可以规避上述问题。

你需要在服务端和客户端同时配置。

// 服务端
export default {
// ...
socketIO: {
// ...
transports: ['websocket'],
},
}

// 客户端
const socket = io("http://127.0.0.1:7001", {
transports: ['websocket']
});

调整进程模型

这是相对复杂的方法,但是在 pm2 部署的场景下,既要支持粘性会话又要启用轮询支持,这是唯一的解法。

第一步,禁用配置中启动的端口,比如:

// src/config/config.default
export default {
koa: {
// port: 7001,
},
socketIO: {
// ...
},
};

如果开发需要,可以在 config.local 中加上端口,或者直接在 package.json 的 scripts 中加上端口。

"scripts": {
"dev": "cross-env NODE_ENV=local midway-bin dev --ts --port=7001",
},

第二步,调整你的 bootstrap.js 文件内容,使其变为下面的代码。

const { Bootstrap, ClusterManager, setupStickyMaster } = require('@midwayjs/bootstrap');
const http = require('http');

// 创建一个进程管理器,处理子进程
const clusterManager = new ClusterManager({
exec: __filename,
count: 4,
sticky: true, // 开启粘性会话支持
});

if (clusterManager.isPrimary()) {
// 主进程启动一个 http server 做监听
const httpServer = http.createServer();
setupStickyMaster(httpServer);

// 启动子进程
clusterManager.start().then(() => {
// 监听端口
httpServer.listen(7001);
console.log('main process is ok');
});

clusterManager.onStop(async () => {
// 停止时关闭 http server
await httpServer.close();
});
} else {
// 子进程逻辑
Bootstrap
.run()
.then(() => {
console.log('child is ready');
});
}

在 pm2 启动时,无需指定 -i 参数来启动 worker,直接 pm2 --name=xxx ./bootstrap.js 使其只启动一个进程。

常见 API

获取连接数

const count = app.engine.clientsCount;      // 获取所有的连接数
const count = app.of('/').sockets.size; // 获取单个 namespace 里的连接数

修改 sid 生成

const uuid = require("uuid");

app.engine.generateId = (req) => {
return uuid.v4(); // must be unique across all Socket.IO servers
}

常见问题

服务端/客户端没连上,没响应

1、端口服务端和客户端一致

export default {
koa: {
port: 7001, // 这里的端口
}
}

// 或者

export default {
socketIO: {
port: 7001, // 这里的端口
}
}

和下面的端口要一致。

// socket.io client
const socket = io('************:7001', {
//...
});

// midway 的 socket.io 测试客户端
const client = await createSocketIOClient({
port: 7001
});

2、服务端的 path 和客户端的 path 要保持一致。path 指的是启动参数的部分。

// config.default
export default {
socketIO: {
path: '/testPath' // 这里是服务端 path
}
}

和下面的 path 要一致

// socket.io client
const socket = io('************:7001', {
path: '/testPath' // 这里是客户端的 path
});

// midway 的 socket.io 测试客户端
const client = await createSocketIOClient({
path: '/testPath'
});

3、服务端的 namespace 和客户端的 namespace 要保持一致。

// server
@WSController('/test') // 这里是服务端的 namespace
export class HelloController {
}

// socket.io client
const io = require("socket.io-client")
io('*****:3000/test', {}); // 这里是客户端的 namespace


// midway 的 socket.io 测试客户端
const client = await createSocketIOClient({
namespace: '/test',
});

配置 CORS

如果出现跨域错误,需要在启动的时候配置 cors 信息。

// config.default
export default {
socketIO: {
cors: {
origin: "http://localhost:8080",
methods: ["GET", "POST"]
}
}
}

具体参数可以参考 Socket.io Handling CORS