【WebSocket】实时推送与离线消息
这是一篇通道层专题。好友、群聊、消息、通话各篇都会「触发推送」「支持离线补偿」,但都引到这里——本篇统一讲「消息怎么实时到对端的多个端、断线后怎么补」,业务篇不再重复。
# 1. 要解决的问题
一条消息(或一个状态变更)产生后,要做到三件事:
- 实时:在线的接收方,包括他登录的多个端,立刻收到。
- 不丢:接收方不在线、或推送漏了,下次能补回来。
- 一致:多端之间、断线重连后,未读数 / 已读位置 / 好友群状态都对得上。
芋道 IM 的取舍是:在线靠 WebSocket 推(长连接),不在线 / 漏推靠 pull(HTTP 增量查业务表)补。WebSocket 不保证送达,业务表才是权威。
# 2. 整体架构

分四层看:
| 层 | 职责 | 关键实现 |
|---|---|---|
| 接入层 | 维护 WebSocket 连接、按用户下发到多端 | 复用芋道的 WebSocket 框架 WebSocketSenderApi |
| 推送层 | 业务事件 → 通知 DTO → 在线推送 | ImWebSocketService(事务感知 + 异步) |
| 存储层 | 消息单份入库;群消息固化可见成员 | im_private/group/channel_message + receiver_user_ids 快照 |
| 补偿层 | 离线 / 漏推后增量拉取 | 消息 minId 游标 + 状态事件 update_time + id 游标 |
端到端时序(以私聊发消息为例):

# 3. WebSocket 推送通道
# 3.1 统一通知协议
所有推送走同一个外层信封 im-notification,body 固定三段:conversationType(会话类型)+ contentType(内容类型)+ payload(业务数据),对应 ImNotificationWebSocketDTO:
conversationType:枚举 ImConversationTypeEnum(0=无会话,1=私聊,2=群聊,3=频道),决定这条通知归到哪个会话;好友 / 通话等非会话通知用 0。contentType:即《消息》§1 消息类型 的内容类型,决定payload怎么解析、怎么渲染。payload:随contentType变化的业务数据,由service/websocket/notification包下的各 Notification DTO 定义。
通知按 contentType 大致分这几类:
| 类别 | contentType(示例) | 是否入库 | 推送范围 |
|---|---|---|---|
| 聊天消息 | 文本 / 图片 / 语音 … 素材(101~125) | 是 | 会话内可见成员 |
| 撤回 | RECALL(2101) | 是 | 会话内 |
| 已读 / 回执 | READ(2201)/ RECEIPT(2200) | 否 | 相关方 |
| 好友通知 | FRIEND_REQUEST_RECEIVED / FRIEND_ADD 等(1201~1210) | 多数不入库(FRIEND_ADD 入库) | 当事人多端 |
| 群通知 | GROUP_CREATE / GROUP_MEMBER_KICK 等(1501~1533) | 多数(入群申请 / 昵称 / 个人设置变更不入库) | 全员 / 相关方 |
| 通话信令 | RTC_CALL / 参与者加入离开(1601~1603) | 否 | 通话参与方 |
| 通话记录 | RTC_CALL_START / RTC_CALL_END(1610 / 1611) | 是 | 会话内 |
完整内容类型清单见 《消息》§1。
前端拿到通知后,先按 conversationType 分流到对应会话,再按 contentType 路由渲染,不再依赖「好友 / 群通知的数字区间」做判断(前端入口在 @/views/im/home/store/websocketStore.ts 的 dispatchFrame)。
# 3.2 多端下发
ImWebSocketService 暴露三个推送方法,都按 userId 投递到该用户的所有在线端(多端一致):
sendNotificationAsync(userId, ...):推单个用户(如好友通知)。sendNotificationAsync(userIds, ...):推一批用户(如群消息推全体可见成员)。broadcastNotificationAsync(...):全局广播。
三者底层都走芋道框架的 WebSocketSenderApi#sendObject,按 userId 找到该用户的全部在线会话逐一下发:
// ImWebSocketServiceImpl#doSendNotification
webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), userId,
ImNotificationWebSocketDTO.TYPE, notification);
# 3.3 事务感知推送
推送一定延迟到数据库事务提交后再执行,否则客户端可能收到 WebSocket 通知时、回查数据库却查不到这条消息:
// ImWebSocketServiceImpl#executeAfterTransaction
private void executeAfterTransaction(Runnable task) {
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
task.run();
return;
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
task.run(); // 事务提交后再异步推送
}
});
}
推送本身用 @Async 异步执行,不阻塞发送主流程;推送失败只记日志、不影响落库(在线送不到,由 pull 兜底)。
# 4. 在线推 + 离线拉
在线靠 WebSocket 推、离线靠 pull 补:客户端在断线重连 / 切前台时主动发起 pull(前端入口 @/views/im/home/composables/useMessagePuller.ts),服务端按数据类型各提供 /pull 接口增量返回。
WebSocket 只负责「在线的人立刻看到」。一个内容类型是否能离线补回来,由它的 persistent(是否入库)标志决定(见 《消息》):入库的能 pull 补,不入库的(如通话信令、输入中)离线即丢弃,这是预期行为。
各类消息 / 事件的补偿方式:
| 类别 | 是否离线可补 | 是否计未读 | 补偿方式 |
|---|---|---|---|
| 私聊 / 群聊 / 频道消息 | 是 | 是 | 按 minId 游标 pull |
| 撤回、群系统提示 | 是 | 否 | 作为可见事件随消息 pull |
| 好友申请 / 群申请 | 是 | 否 | 业务表按 update_time + id 增量拉 |
| 已读 / 回执 | 是 | 否 | 读位置表补偿 |
| 通话信令 / 输入中 | 否 | 否 | 仅在线推,离线丢弃 |
# 4.1 消息增量拉取
消息用 minId 单调游标增量拉取(私聊 pullPrivateMessageList、群聊 pullGroupMessageList、频道 pullChannelMessageList):客户端记住已拉到的最大消息 id,重连 / 切前台时带 minId 拉后续(前端拉取逻辑在 @/views/im/home/composables/useMessagePuller.ts)。
群消息的可见性靠发送时固化的 receiver_user_ids 快照(逗号分隔的成员 id),pull 时用 FIND_IN_SET 过滤:
SELECT m.* FROM im_group_message m
WHERE m.group_id IN (...) AND m.id > :minId
AND FIND_IN_SET(:userId, m.receiver_user_ids)
ORDER BY m.id ASC LIMIT :size;
这样「发送时谁可见」在落库那一刻就定死,避免按「加入时间」反复重算成员区间——入群前的消息看不到、退群后仍能补拉退群前可见的历史。
# 4.2 顺序性
消息按服务端自增主键 id 定序:落库那一刻就定了序,同一会话内按 id 升序即时间序。不依赖客户端 send_time(各端时钟可能不准、会导致乱序),客户端本地插入、pull 拉取都按 id 升序。
# 4.3 唯一性
靠客户端生成的 client_message_id 配合唯一键 (sender_id, client_message_id) 去重:并发 / 弱网重发同一条时,唯一键冲突即视为已发成功(捕获 DuplicateKeyException、回查已存在消息返回),保证幂等、不会落两条。
# 5. 状态事件的离线补偿
这类状态各有独立的 /pull 接口,同样在重连 / 切前台时增量补偿:会话读位置 ImConversationReadController、好友关系 ImFriendController、好友申请 ImFriendRequestController、加群申请 ImGroupRequestController。
好友 / 群关系、申请、读位置这类会更新旧行的状态,不能只靠「id 单调新增」补,于是统一用 update_time + id 复合游标增量拉取:
SELECT * FROM xxx
WHERE update_time > :lastUpdateTime
OR (update_time = :lastUpdateTime AND id > :lastId)
ORDER BY update_time ASC, id ASC LIMIT :limit;
对应各业务表都建了 idx_sync (..., update_time, id) 索引支撑游标扫描。前端按最后一条的 updateTime + id 推进高水位;且回扫前把游标向前回退几秒做 overlap,覆盖「同秒不同 id 更新」「端 / 服务端时间精度差」等边界,合并逻辑保持幂等。
删除即软删
删除、退群、拉黑、解散都不物理删,否则客户端无法增量补偿到「已失效」这一变化。

两种同步策略
状态补偿其实就两种处理方式:会话读位点、好友 / 群申请这类「权威在业务表、能逐行增量」的,统一按 update_time + id 游标拉;群信息 / 本人成员态、群成员列表这类「靠快照刷新、不宜逐行增量」的,改为本地缓存 + 失效标记、按需全量刷新(fetchGroupList / 按 groupId 刷)。
下面分别看:
# 5.1 会话读位点
已读位置统一持久化在 im_conversation_read(一个用户在一个会话的「最大已读消息 id」),是未读数与回执的唯一权威(不放 Redis,避免双写一致性与退群回退问题),也按 update_time + id 增量拉取补偿。两条规则:
- 单调递增:上报已读只能把位点改大,不能回退(防乱序 / 并发回退)。
- 双向补偿:重连 / 进会话时,既补「我的读位置」(恢复本端未读、红点),也按需补「对端 / 群成员读位置」(恢复私聊已读、群回执人数)。对端读位置补偿有界执行——只在打开会话或有未完成回执时惰性补,不在每次重连时全量拉所有群成员读位置。
前端:conversationStore 持有各会话读位点与未读数;重连时 useMessagePuller 的 pullStateEvents 调 conversationStore.pullConversationReads() 增量补本端读位置,对端 / 群成员读位置在打开会话时由 messageStore 惰性补。
# 5.2 群信息与本人成员态
「群信息」和「本人成员态」是两回事,但都不单开增量链路,靠广播 + 群列表快照校准:
- 群信息:群的共享属性(群名 / 公告 / 头像 / 全员禁言等)。变更时全员广播对应通知(GROUP_NAME_UPDATE / GROUP_NOTICE_UPDATE 等,见 《消息》§1),客户端收到后局部更新;重连时不单独拉群资料。
- 本人成员态:我在不在这个群(被拉进 / 被踢 / 退群 / 解散)。进首页 / 重连时刷新「我相关的群列表」快照(含已退群)一次性校准——离线期间被拉进新群、被踢出群都能对上。
前端:groupStore 持有「我的群列表」快照;重连时 pullStateEvents 先 markAllGroupInfoExpired()、再 fetchGroupList(true) 全量刷新校准成员态,GROUP_* 变更通知经 websocketStore 分发后局部更新。
# 5.3 群成员列表
群成员列表随群规模膨胀,不做全局增量 pull:按 groupId 本地缓存 + 失效标记,进群资料 / 成员列表时按 list?groupId= 全量刷新;收到成员变更通知先局部更新,拿不准就标记该群缓存过期、下次打开再强刷。这样既不引入 per-group 版本号同步,也避免重连时全局扫描所有群成员。
前端:同在 groupStore——重连只 markAllGroupMembersExpired() 标记过期、不全局拉;进入群会话 / 成员列表时再按 groupId 全量刷新。
# 5.4 好友与群申请
好友关系(im_friend)、好友申请(im_friend_request)、加群申请(im_group_request)都按 update_time + id 增量拉取补偿。申请类事件不塞进聊天消息流,权威状态始终以业务表为准;「你们已经是好友了」这类只是额外写一条聊天提示气泡,方便前端展示与交互,真实关系仍看业务表。
前端:friendStore / groupRequestStore 承接;重连时 pullStateEvents 并发调 friendStore.pullFriends()、pullFriendRequests()、groupRequestStore.pullGroupRequests() 按 update_time + id 增量补。