Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use ultrathink when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
samber/ro — Reactive Streams for Go
Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.
Why samber/ro (Streams vs Slices)
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. samber/ro solves this with declarative, chainable stream operators.
When to use which tool:
| Scenario | Tool | Why |
|---|
| Transform a slice (map, filter, reduce) | INLINECODE2 | Finite, synchronous, eager — no stream overhead needed |
| Simple goroutine fan-out with error handling |
errgroup | Standard lib, lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) |
samber/ro | Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources |
samber/ro | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source |
samber/ro | Hot observables (Share/Subjects) handle multicast natively |
Key differences: lo vs ro
| Aspect | INLINECODE7 | INLINECODE8 |
|---|
| Data | Finite slices | Infinite streams |
| Execution |
Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return
(T, error) per call | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
Installation
CODEBLOCK0
Core Concepts
Four building blocks:
- 1. Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
- Observer — a consumer with three callbacks:
onNext(T), onError(error), INLINECODE12 - Operator — a function that transforms an observable into another observable, chained via INLINECODE13
- Subscription — the connection between observable and observer. Call
.Wait() to block or .Unsubscribe() to cancel
CODEBLOCK1
Cold vs Hot Observables
Cold (default): each .Subscribe() starts a new independent execution. Safe and predictable — use by default.
Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
| Convert with | Behavior |
|---|
| INLINECODE17 | Cold → hot with reference counting. Last unsubscribe tears down |
| INLINECODE18 |
Same as Share + buffers last N values for late subscribers |
|
Connectable() | Cold → hot, but waits for explicit
.Connect() call |
| Subjects | Natively hot — call
.Send(),
.Error(),
.Complete() directly |
| Subject | Constructor | Replay behavior |
|---|
| INLINECODE24 | INLINECODE25 | None — late subscribers miss past events |
| INLINECODE26 |
NewBehaviorSubject[T](initial) | Replays last value to new subscribers |
|
ReplaySubject |
NewReplaySubject[T](bufferSize) | Replays last N values |
|
AsyncSubject |
NewAsyncSubject[T]() | Emits only last value, only on complete |
|
UnicastSubject |
NewUnicastSubject[T](bufferSize) | Single subscriber only |
For subject details and hot observable patterns, see Subjects Guide.
Operator Quick Reference
| Category | Key operators | Purpose |
|---|
| Creation | INLINECODE34 , FromSlice, FromChannel, Range, Interval, Defer, INLINECODE40 | Create observables from various sources |
| Transform |
Map,
MapErr,
FlatMap,
Scan,
Reduce,
GroupBy | Transform or accumulate stream values |
| Filter |
Filter,
Take,
TakeLast,
Skip,
Distinct,
Find,
First,
Last | Selectively emit values |
| Combine |
Merge,
Concat,
Zip2–
Zip6,
CombineLatest2–
CombineLatest5,
Race | Merge multiple observables |
| Error |
Catch,
OnErrorReturn,
OnErrorResumeNextWith,
Retry,
RetryWithConfig | Recover from errors |
| Timing |
Delay,
DelayEach,
Timeout,
ThrottleTime,
SampleTime,
BufferWithTime | Control emission timing |
| Side effect |
Tap/
Do,
TapOnNext,
TapOnError,
TapOnComplete | Observe without altering stream |
| Terminal |
Collect,
ToSlice,
ToChannel,
ToMap | Consume stream into Go types |
Use typed Pipe2, Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking.
For the complete operator catalog (150+ operators with signatures), see Operators Guide.
Common Mistakes
| Mistake | Why it fails | Fix |
|---|
Using ro.OnNext() without error handler | Errors are silently dropped — bugs hide in production | Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks |
Using untyped Pipe() instead of Pipe2/ INLINECODE91 |
Loses compile-time type safety, errors surface at runtime | Use
Pipe2,
Pipe3...
Pipe25 for typed operator chains |
| Forgetting
.Unsubscribe() on infinite streams | Goroutine leak — the observable runs forever | Use
TakeUntil(signal), context cancellation, or explicit
Unsubscribe() |
| Using
Share() when cold is sufficient | Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
| Using
samber/ro for finite slice transforms | Stream overhead (goroutines, subscriptions) for a synchronous operation | Use
samber/lo — it's simpler, faster, and purpose-built for slices |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain
ContextWithTimeout or
ThrowOnContextCancel in the pipeline |
Best Practices
- 1. Always handle all three events — use
NewObserver(onNext, onError, onComplete), not just OnNext. Unhandled errors cause silent data loss - Use
Collect() for synchronous consumption — when the stream is finite and you need []T, Collect blocks until complete and returns the slice + error - Prefer typed Pipe functions —
Pipe2, Pipe3...Pipe25 catch type mismatches at compile time. Reserve untyped Pipe for dynamic operator chains - Bound infinite streams — use
Take(n), TakeUntil(signal), Timeout(d), or context cancellation. Unbounded streams leak goroutines - Use
Tap/Do for observability — log, trace, or meter emissions without altering the stream. Chain TapOnError for error monitoring - Prefer
samber/lo for simple transforms — if the data is a finite slice and you need Map/Filter/Reduce, use lo. Reach for ro when data arrives over time, from multiple sources, or needs retry/timeout/backpressure
Plugin Ecosystem
40+ plugins extend ro with domain-specific operators:
| Category | Plugins | Import path prefix |
|---|
| Encoding | JSON, CSV, Base64, Gob | INLINECODE121 |
| Network |
HTTP, I/O, FSNotify |
plugins/http,
plugins/io,
plugins/fsnotify |
| Scheduling | Cron, ICS |
plugins/cron,
plugins/ics |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops |
plugins/observability/...,
plugins/samber/oops |
| Rate limiting | Native, Ulule |
plugins/ratelimit/... |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template |
plugins/bytes,
plugins/strings, etc. |
| System | Process, Signal |
plugins/proc,
plugins/signal |
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.
For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.
If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
Cross-References
- - → See
samber/cc-skills-golang@golang-samber-lo skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice - → See
samber/cc-skills-golang@golang-samber-mo skill for monadic types (Option, Result, Either) that compose with ro pipelines - → See
samber/cc-skills-golang@golang-samber-hot skill for in-memory caching (also available as an ro plugin) - → See
samber/cc-skills-golang@golang-concurrency skill for goroutine/channel patterns when reactive streams are overkill - → See
samber/cc-skills-golang@golang-observability skill for monitoring reactive pipelines in production
Persona: 你是一位 Go 工程师,当数据异步或无限流动时,你会使用响应式流。你使用 samber/ro 构建声明式管道,而不是手动编排 goroutine/channel,但你也知道何时简单的 slice + samber/lo 就足够了。
思维模式: 在设计高级响应式管道或选择冷/热 Observable、Subject 以及组合操作符时,使用 ultrathink。错误的架构会导致资源泄漏或事件丢失。
samber/ro — Go 的响应式流
ReactiveX 的 Go 实现。以泛型优先、类型安全、可组合的管道,用于异步数据流,具有自动背压、错误传播、上下文集成和资源清理功能。包含 150+ 操作符、5 种 Subject 类型、40+ 插件。
官方资源:
此技能并非详尽无遗。请参考库文档和代码示例以获取更多信息。Context7 可作为发现平台提供帮助。
为什么选择 samber/ro(流 vs 切片)
Go channel + goroutine 在处理复杂的异步管道时变得笨拙:手动关闭 channel、冗长的 goroutine 生命周期、跨嵌套 select 的错误传播,以及缺乏可组合的操作符。samber/ro 通过声明式、可链式调用的流操作符解决了这个问题。
何时使用哪种工具:
| 场景 | 工具 | 原因 |
|---|
| 转换切片(map, filter, reduce) | samber/lo | 有限的、同步的、热切的——无需流开销 |
| 带错误处理的简单 goroutine 扇出 |
errgroup | 标准库,轻量级,足以应对有界并发 |
| 无限事件流(WebSocket, tickers, 文件监听) | samber/ro | 声明式管道,支持背压、重试、超时、组合 |
| 来自多个异步源的实时数据丰富 | samber/ro | CombineLatest/Zip 组合依赖流,无需手动 select |
| 多个消费者共享一个源的发布/订阅 | samber/ro | 热 Observable(Share/Subjects)原生支持多播 |
关键区别:lo vs ro
| 方面 | samber/lo | samber/ro |
|---|
| 数据 | 有限切片 | 无限流 |
| 执行 |
同步,阻塞 | 异步,非阻塞 |
| 评估 | 热切(分配中间切片) | 惰性(按到达顺序处理项目) |
| 时序 | 立即 | 时间感知(delay, throttle, interval, timeout) |
| 错误模型 | 每次调用返回 (T, error) | 错误通道通过管道传播 |
| 用例 | 集合转换 | 事件驱动、实时、异步管道 |
安装
bash
go get github.com/samber/ro
核心概念
四个构建块:
- 1. Observable — 随时间发射值的数据源。默认为冷:每个订阅者从头开始触发独立的执行
- Observer — 具有三个回调的消费者:onNext(T), onError(error), onComplete()
- Operator — 将一个 Observable 转换为另一个 Observable 的函数,通过 Pipe 链式调用
- Subscription — Observable 和 Observer 之间的连接。调用 .Wait() 阻塞或 .Unsubscribe() 取消
go
observable := ro.Pipe2(
ro.RangeWithInterval(0, 5, 1*time.Second),
ro.Filter(func(x int) bool { return x%2 == 0 }),
ro.Map(func(x int) string { return fmt.Sprintf(even-%d, x) }),
)
observable.Subscribe(ro.NewObserver(
func(s string) { fmt.Println(s) }, // onNext
func(err error) { log.Println(err) }, // onError
func() { fmt.Println(Done!) }, // onComplete
))
// 输出: even-0, even-2, even-4, Done!
// 或者同步收集:
values, err := ro.Collect(observable)
冷 Observable vs 热 Observable
冷(默认):每次 .Subscribe() 都会启动一个新的独立执行。安全且可预测——默认使用。
热:多个订阅者共享一个执行。当源很昂贵(WebSocket、数据库轮询)或订阅者必须看到相同的事件时使用。
| 转换方式 | 行为 |
|---|
| Share() | 冷 → 热,带引用计数。最后一次取消订阅会拆除 |
| ShareReplay(n) |
与 Share 相同 + 为后来的订阅者缓冲最后 N 个值 |
| Connectable() | 冷 → 热,但等待显式的 .Connect() 调用 |
| Subjects | 原生热——直接调用 .Send(), .Error(), .Complete() |
| Subject | 构造函数 | 重放行为 |
|---|
| PublishSubject | NewPublishSubject[T]() | 无——后来的订阅者会错过过去的事件 |
| BehaviorSubject |
NewBehaviorSubject
T | 向新订阅者重放最后一个值 |
| ReplaySubject | NewReplaySubject
T | 重放最后 N 个值 |
| AsyncSubject | NewAsyncSubject[T]() | 仅在完成时发射最后一个值 |
| UnicastSubject | NewUnicastSubject
T | 仅限单个订阅者 |
有关 Subject 的详细信息和热 Observable 模式,请参阅 Subjects 指南。
操作符快速参考
| 类别 | 关键操作符 | 目的 |
|---|
| 创建 | Just, FromSlice, FromChannel, Range, Interval, Defer, Future | 从各种源创建 Observable |
| 转换 |
Map, MapErr, FlatMap, Scan, Reduce, GroupBy | 转换或累积流值 |
| 过滤 | Filter, Take, TakeLast, Skip, Distinct, Find, First, Last | 选择性地发射值 |
| 组合 | Merge, Concat, Zip2–Zip6, CombineLatest2–CombineLatest5, Race | 合并多个 Observable |
| 错误 | Catch, OnErrorReturn, OnErrorResumeNextWith, Retry, RetryWithConfig | 从错误中恢复 |
| 时序 | Delay, DelayEach, Timeout, ThrottleTime, SampleTime, BufferWithTime | 控制发射时序 |
| 副作用 | Tap/Do, TapOnNext, TapOnError, TapOnComplete | 观察而不改变流 |
| 终端 | Collect, ToSlice, ToChannel, ToMap | 将流消费为 Go 类型 |
使用带类型的 Pipe2, Pipe3 ... Pipe25 以获得操作符链的编译时类型安全。无类型的 Pipe 使用 any 并丢失类型检查。
有关完整操作符目录(150+ 操作符及签名),请参阅 操作符指南。
常见错误
| 错误 | 失败原因 | 修复 |
|---|
| 使用 ro.OnNext() 而没有错误处理 | 错误被静默丢弃——bug 隐藏在生成环境中 | 使用包含所有 3 个回调的 ro.NewObserver(onNext, onError, onComplete) |
| 使用无类型的 Pipe() 而不是 Pipe2/Pipe3 |
丢失编译时类型安全,错误在运行时暴露 | 使用 Pipe2, Pipe3...Pipe25 进行带类型的操作符链 |
| 在无限流上忘记 .Unsubscribe() | goroutine 泄漏——Observable 永远运行 | 使用 TakeUntil(signal)、上下文取消或显式的 Unsubscribe() |
| 在冷流足够时使用 Share() | 不必要的复杂性,更难推理生命周期 | 仅当多个消费者需要同一流时才使用热 Observable |
| 对有限切片转换使用 samber/ro | 同步操作的流开销(goroutines, subscriptions) | 使用 samber/lo——它更简单、更快,并且专为切片构建 |
| 不为取消传播上下文 | 流忽略关闭信号,导致终止时资源泄漏 | 在管道中链式调用 ContextWithTimeout 或 ThrowOnContextCancel |
##