返回顶部
t

teamgram-messaging-sync消息同步层

Documents the message delivery and sync distribution layer in Teamgram Server, covering Kafka Inbox-T/Sync-T topics, inbox/outbox model, and the complete message send flow.

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.0
安全检测
已通过
182
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

teamgram-messaging-sync

消息投递与同步层:messenger.msg / inbox / sync

概述

Teamgram 的消息投递使用 Kafka 做异步解耦,分为两个主要 topic:

  • - Inbox-T:写路径(发送/编辑/删除等异步投递)
  • Sync-T:推路径(updates/rpcResult 分发回在线 session)

messenger.msg 配置

teamgramd/etc/msg.yaml 要点:

yaml
InboxConsumer:
Topics:
- Inbox-T
Brokers:
- 127.0.0.1:9092
Group: Inbox-MainCommunity-S
InboxClient:
Topic: Inbox-T
SyncClient:
Topic: Sync-T

消息发送完整链路

text
Client
-> TL: messages.sendMessage / messages.sendMedia / ...
-> gnetway -> session -> bff.messages
-> bff.messages
-> biz_service (dialog/message/chat/user)
-> msg service (异步投递 + Kafka)
-> messenger.msg produces to Kafka Inbox-T
-> inbox consumes Inbox-T, writes inbox/outbox state
-> produces updates to Kafka Sync-T
-> sync consumes Sync-T, decides UpdatesMe/NotMe/PushRpcResult
-> sync calls session (gRPC) to push updates/rpc_result
-> session routes to correct sessionId
-> gnetway encrypt -> client

inbox helper:消费 Kafka Inbox-T

inbox 服务通过 Kafka ConsumerGroup 收到消息后,根据 protobuf messageName 分发到对应 core 方法(threading.RunSafe)。

典型模式:
go
case proto.MessageName((*inbox.TLInboxSendUserMessageToInboxV2)(nil)):
threading.RunSafe(func() {
c := core.New(ctx, svcCtx)
r := new(inbox.TLInboxSendUserMessageToInboxV2)
json.Unmarshal(value, r)
c.InboxSendUserMessageToInboxV2(r)
})

inbox 的核心职责:

  • - 写入收件方的 inbox 消息记录
  • 更新未读计数
  • 触发通知推送
  • 生成 Sync-T 消息通知在线用户

msg gRPC Service:TL 签名即接口契约

msg 子服务的 gRPC service impl 文件会保留 TL 签名注释,例如:

  • - // msg.sendMessageV2 ... = Updates;

这就是跨服务调用时请求/响应的格式契约。

同步分发层:messenger.sync(Kafka Sync-T)

sync 服务消费 Kafka Sync-T,按 protobuf message name 分发到:

  • - SyncUpdatesMe — 推送给发送者自己(确认消息已发送)
  • SyncUpdatesNotMe — 推送给非发送者(对方收到新消息通知)
  • SyncPushUpdates — 通用推送(状态变化等)
  • SyncPushRpcResult — RPC 结果推送(异步 RPC 的响应)
  • SyncBroadcastUpdates — 广播推送(群消息等多人场景)

最终通过 gRPC 调用 session 把 updates/rpc_result 推回在线会话。

典型模式:
go
case proto.MessageName((*sync.TLSyncPushRpcResult)(nil)):
threading.RunSafe(func() {
c := core.New(ctx, svcCtx)
r := new(sync.TLSyncPushRpcResult)
json.Unmarshal(value, r)
c.SyncPushRpcResult(r)
})

Kafka 事件总线总结

Topic生产者消费者内容
Inbox-Tmessenger.msginbox helper消息投递(发送/编辑/删除)
Sync-T
inbox helper / messenger.msg | messenger.sync | Updates/RPC结果分发 |

消息存储模型

Teamgram 使用 inbox/outbox 双写模型

  • - 发送方写入 outbox(自己的消息记录)
  • 接收方通过 Kafka Inbox-T 异步写入 inbox(对方的消息记录)
  • 每条消息在 messages 表中按 userid + peer 维度存储
  • dialogmessage_id 为每个对话内的消息序号(递增)

关键代码路径

  • - msg 服务:app/messenger/msg/
  • inbox 服务:app/messenger/msg/internal/dao/inbox/ (Kafka consumer)
  • sync 服务:app/messenger/sync/
  • 配置文件:teamgramd/etc/msg.yaml, teamgramd/etc/sync.yaml

源代码参考

  • - 仓库地址:https://github.com/teamgram/teamgram-server (Apache-2.0)

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 teamgram-messaging-sync-1776079509 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 teamgram-messaging-sync-1776079509 技能

通过命令行安装

skillhub install teamgram-messaging-sync-1776079509

下载

⬇ 下载 teamgram-messaging-sync v1.0.0(免费)

文件大小: 2.85 KB | 发布时间: 2026-4-15 14:37

v1.0.0 最新 2026-4-15 14:37
Initial release

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部