返回顶部
g

golang-samber-roGo反应式流

Reactive streams and event-driven programming in Golang using samber/ro — ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5 subject types (Publish, Behavior, Replay, Async, Unicast), declarative pipelines via Pipe, 40+ plugins (HTTP, cron, fsnotify, JSON, logging), automatic backpressure, error propagation, and Go context integration. Apply when using or adopting samber/ro, when the codebase imports github.com/samber/ro, or when building asynchronous event-driven pi

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.3
安全检测
已通过
148
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

golang-samber-ro

Persona: 你是一位 Go 工程师,当数据异步或无限流动时,你会使用响应式流。你使用 samber/ro 构建声明式管道,而不是手动编排 goroutine/channel,但你也知道何时简单的 slice + samber/lo 就足够了。

思维模式: 在设计高级响应式管道或选择冷/热 Observable、Subject 以及组合操作符时,使用 ultrathink。错误的架构会导致资源泄漏或事件丢失。

samber/ro — Go 的响应式流

ReactiveX 的 Go 实现。以泛型优先、类型安全、可组合的管道,用于异步数据流,具有自动背压、错误传播、上下文集成和资源清理功能。包含 150+ 操作符、5 种 Subject 类型、40+ 插件。

官方资源:

此技能并非详尽无遗。请参考库文档和代码示例以获取更多信息。Context7 可作为发现平台提供帮助。

为什么选择 samber/ro(流 vs 切片)

Go channel + goroutine 在处理复杂的异步管道时变得笨拙:手动关闭 channel、冗长的 goroutine 生命周期、跨嵌套 select 的错误传播,以及缺乏可组合的操作符。samber/ro 通过声明式、可链式调用的流操作符解决了这个问题。

何时使用哪种工具:

场景工具原因
转换切片(map, filter, reduce)samber/lo有限的、同步的、热切的——无需流开销
带错误处理的简单 goroutine 扇出
errgroup | 标准库,轻量级,足以应对有界并发 |
| 无限事件流(WebSocket, tickers, 文件监听) | samber/ro | 声明式管道,支持背压、重试、超时、组合 |
| 来自多个异步源的实时数据丰富 | samber/ro | CombineLatest/Zip 组合依赖流,无需手动 select |
| 多个消费者共享一个源的发布/订阅 | samber/ro | 热 Observable(Share/Subjects)原生支持多播 |

关键区别:lo vs ro

方面samber/losamber/ro
数据有限切片无限流
执行
同步,阻塞 | 异步,非阻塞 |
| 评估 | 热切(分配中间切片) | 惰性(按到达顺序处理项目) |
| 时序 | 立即 | 时间感知(delay, throttle, interval, timeout) |
| 错误模型 | 每次调用返回 (T, error) | 错误通道通过管道传播 |
| 用例 | 集合转换 | 事件驱动、实时、异步管道 |

安装

bash
go get github.com/samber/ro

核心概念

四个构建块:

  1. 1. Observable — 随时间发射值的数据源。默认为冷:每个订阅者从头开始触发独立的执行
  2. Observer — 具有三个回调的消费者:onNext(T), onError(error), onComplete()
  3. Operator — 将一个 Observable 转换为另一个 Observable 的函数,通过 Pipe 链式调用
  4. Subscription — Observable 和 Observer 之间的连接。调用 .Wait() 阻塞或 .Unsubscribe() 取消

go
observable := ro.Pipe2(
ro.RangeWithInterval(0, 5, 1*time.Second),
ro.Filter(func(x int) bool { return x%2 == 0 }),
ro.Map(func(x int) string { return fmt.Sprintf(even-%d, x) }),
)

observable.Subscribe(ro.NewObserver(
func(s string) { fmt.Println(s) }, // onNext
func(err error) { log.Println(err) }, // onError
func() { fmt.Println(Done!) }, // onComplete
))
// 输出: even-0, even-2, even-4, Done!

// 或者同步收集:
values, err := ro.Collect(observable)

冷 Observable vs 热 Observable

(默认):每次 .Subscribe() 都会启动一个新的独立执行。安全且可预测——默认使用。

:多个订阅者共享一个执行。当源很昂贵(WebSocket、数据库轮询)或订阅者必须看到相同的事件时使用。

转换方式行为
Share()冷 → 热,带引用计数。最后一次取消订阅会拆除
ShareReplay(n)
与 Share 相同 + 为后来的订阅者缓冲最后 N 个值 |
| Connectable() | 冷 → 热,但等待显式的 .Connect() 调用 |
| Subjects | 原生热——直接调用 .Send(), .Error(), .Complete() |
Subject构造函数重放行为
PublishSubjectNewPublishSubject[T]()无——后来的订阅者会错过过去的事件
BehaviorSubject
NewBehaviorSubjectT | 向新订阅者重放最后一个值 |
| ReplaySubject | NewReplaySubjectT | 重放最后 N 个值 |
| AsyncSubject | NewAsyncSubject[T]() | 仅在完成时发射最后一个值 |
| UnicastSubject | NewUnicastSubjectT | 仅限单个订阅者 |

有关 Subject 的详细信息和热 Observable 模式,请参阅 Subjects 指南

操作符快速参考

类别关键操作符目的
创建Just, FromSlice, FromChannel, Range, Interval, Defer, Future从各种源创建 Observable
转换
Map, MapErr, FlatMap, Scan, Reduce, GroupBy | 转换或累积流值 | | 过滤 | Filter, Take, TakeLast, Skip, Distinct, Find, First, Last | 选择性地发射值 | | 组合 | Merge, Concat, Zip2–Zip6, CombineLatest2–CombineLatest5, Race | 合并多个 Observable | | 错误 | Catch, OnErrorReturn, OnErrorResumeNextWith, Retry, RetryWithConfig | 从错误中恢复 | | 时序 | Delay, DelayEach, Timeout, ThrottleTime, SampleTime, BufferWithTime | 控制发射时序 | | 副作用 | Tap/Do, TapOnNext, TapOnError, TapOnComplete | 观察而不改变流 | | 终端 | Collect, ToSlice, ToChannel, ToMap | 将流消费为 Go 类型 |

使用带类型的 Pipe2, Pipe3 ... Pipe25 以获得操作符链的编译时类型安全。无类型的 Pipe 使用 any 并丢失类型检查。

有关完整操作符目录(150+ 操作符及签名),请参阅 操作符指南

常见错误

错误失败原因修复
使用 ro.OnNext() 而没有错误处理错误被静默丢弃——bug 隐藏在生成环境中使用包含所有 3 个回调的 ro.NewObserver(onNext, onError, onComplete)
使用无类型的 Pipe() 而不是 Pipe2/Pipe3
丢失编译时类型安全,错误在运行时暴露 | 使用 Pipe2, Pipe3...Pipe25 进行带类型的操作符链 | | 在无限流上忘记 .Unsubscribe() | goroutine 泄漏——Observable 永远运行 | 使用 TakeUntil(signal)、上下文取消或显式的 Unsubscribe() | | 在冷流足够时使用 Share() | 不必要的复杂性,更难推理生命周期 | 仅当多个消费者需要同一流时才使用热 Observable | | 对有限切片转换使用 samber/ro | 同步操作的流开销(goroutines, subscriptions) | 使用 samber/lo——它更简单、更快,并且专为切片构建 | | 不为取消传播上下文 | 流忽略关闭信号,导致终止时资源泄漏 | 在管道中链式调用 ContextWithTimeout 或 ThrowOnContextCancel |

##

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 golang-samber-ro-1776056246 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 golang-samber-ro-1776056246 技能

通过命令行安装

skillhub install golang-samber-ro-1776056246

下载

⬇ 下载 golang-samber-ro v1.0.3(免费)

文件大小: 22.66 KB | 发布时间: 2026-4-14 14:36

v1.0.3 最新 2026-4-14 14:36
- Update version to 1.0.3 in SKILL.md metadata.
- Add AskUserQuestion tool to allowed-tools.
- Minor correction of a typo: "more informations" → "more information" in documentation.

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部