V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
Junian
V2EX  ›  Go 编程语言

写了个 Go 库解决 LLM 流式输出断线重连的问题

  •  
  •   Junian · 18 小时 16 分钟前 · 1038 次点击
    最近在做一个项目,后端 Go ,前端 SSE 推流 LLM 的输出。遇到一个很烦的问题:用户刷新页面或者网络抖一下,流就断了,但后端还在跑,token 照烧不误。

    更麻烦的是我们的 LLM worker 和 HTTP handler 不在同一个实例上,负载均衡一转发,重连过来的请求根本找不到原来那个流。

    JS/TS 那边有 Vercel 的 resumable-stream 可以用,但 Go 这边翻了一圈啥也没有,就自己撸了一个:

    https://github.com/gtoxlili/streamhub

    思路不复杂:
    - Redis Streams 存 chunk ,断线重连的订阅者先 replay 历史再接实时数据
    - Redis Pub/Sub 传 cancel 信号,用户在 A 节点点停止,B 节点上的生成就能收到
    - 每个 producer 有个 generation ID 做 fencing token ,防止旧 producer 写脏数据
    - 同一个 session 只允许一个 producer 注册,不会重复调 LLM

    代码大概长这样:

    ```go
    // 生产端
    stream, created, err := hub.Register("chat:123", cancelFunc)
    if !created {
    return // 已经有人在跑了
    }
    defer stream.Close()
    stream.Publish("hello")

    // 消费端(任意实例)
    chunks, unsub := hub.Get("chat:123").Subscribe(128)
    defer unsub()
    for chunk := range chunks {
    // 先 replay 再 live
    fmt.Fprint(w, chunk)
    }
    ```

    目前还比较早期,API 可能还会改。做类似场景的同学可以看看,有想法欢迎提 issue 。
    10 条回复    2026-04-15 17:00:03 +08:00
    bv
        1
    bv  
       10 小时 45 分钟前
    给你提示一点:req.Context()
    raycheung
        2
    raycheung  
       10 小时 43 分钟前
    DefoliationM
        3
    DefoliationM  
       10 小时 32 分钟前 via Android   ❤️ 1
    哥们基础不牢呀,用 context 就行了。
    cryptovae
        4
    cryptovae  
       10 小时 14 分钟前   ❤️ 1
    为了解决 A 又引入了 B ,我真是服了

    简单事简单做,LLM 流式输出,管它前端断不断,最终结果肯定要入库,等前端用户刷新的时候自己去轮训这个 message 结果就行了
    yoshiyuki
        5
    yoshiyuki  
       9 小时 46 分钟前
    @cryptovae 用户不会像你想象的用的,他断了就会去新开会话,当前这个 token 就算白烧了
    boolean1135
        6
    boolean1135  
       8 小时 18 分钟前 via Android
    mark 一下,到时候回家学习学习 AI 开发
    cryptovae
        7
    cryptovae  
       7 小时 3 分钟前
    @yoshiyuki 你得的回答完全不切合主题,题主要的是异常断开后,能继续恢复,你给我说浪费 token 的事,在没有用户主动点击取消的情况下,用户只会觉得你凭什么不给我回答了,比如 deepseek ,chatgpt 都会在刷新页面自动接着回答
    Charlie17Li
        8
    Charlie17Li  
       3 小时 43 分钟前
    @yoshiyuki 如果是我我会刷新一下,不想多打一点字
    Junian
        9
    Junian  
    OP
       3 小时 4 分钟前
    谢谢各位反馈,统一回一下。

    @bv @DefoliationM req.Context() 确实能感知客户端断开,这个我清楚。但我要解决的不是"知道客户端走了",是断开之后的事:

    1. 重连后怎么把之前的 chunk 补回去( replay )
    2. 生产者和消费者不在同一个进程甚至不在同一台机器
    3. 用户在 A 节点点取消,B 节点的生成要能停

    单靠 context 搞不定这几个。streamhub 底下该用 context 的地方也在用,上面多了一层跨实例的状态管理。

    @raycheung durable-streams 是一个协议规范,定义 HTTP 层 offset-based streaming 的标准。streamhub 是直接用 Redis Streams 做存储的 Go 库,层级不太一样,但确实解决的问题有重叠。

    @cryptovae 最终结果入库+轮询在生成完之后确实没问题。流式输出的点在于生成过程中就要实时推,不是攒完再返回。刷新后自动接上( deepseek/chatgpt 就是这么做的)才是这个库在解决的场景。
    Junian
        10
    Junian  
    OP
       2 小时 54 分钟前
    @raycheung 哦刚仔细看了下 durable-streams ,确实比我之前说的要大不少,不只是协议规范,人家有完整的多语言客户端和 server 实现,还集成了 Vercel AI SDK

    不过定位还是不太一样。durable-streams 是一个通用的持久化流协议,偏基础设施层,你需要跑他的 server 。streamhub 就是一个库,直接用你现有的 Redis 就行,不引入新的依赖,适合已经有 Redis 的项目快速接入
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   Solana   ·   3391 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 46ms · UTC 11:54 · PVG 19:54 · LAX 04:54 · JFK 07:54
    ♥ Do have faith in what you're doing.