golang-samber-ro

Reactive streams and event-driven programming in Golang using samber/ro — ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5…

INSTALLATION
npx skills add https://github.com/samber/cc-skills-golang --skill golang-samber-ro
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

$28

When to use which tool:

Scenario

Tool

Why

Transform a slice (map, filter, reduce)

samber/lo

Finite, synchronous, eager — no stream overhead needed

Simple goroutine fan-out with error handling

errgroup

Standard lib, lightweight, sufficient for bounded concurrency

Infinite event stream (WebSocket, tickers, file watcher)

samber/ro

Declarative pipeline with backpressure, retry, timeout, combine

Real-time data enrichment from multiple async sources

samber/ro

CombineLatest/Zip compose dependent streams without manual select

Pub/sub with multiple consumers sharing one source

samber/ro

Hot observables (Share/Subjects) handle multicast natively

Key differences: lo vs ro

Aspect

samber/lo

samber/ro

Data

Finite slices

Infinite streams

Execution

Synchronous, blocking

Asynchronous, non-blocking

Evaluation

Eager (allocates intermediate slices)

Lazy (processes items as they arrive)

Timing

Immediate

Time-aware (delay, throttle, interval, timeout)

Error model

Return (T, error) per call

Error channel propagates through pipeline

Use case

Collection transforms

Event-driven, real-time, async pipelines

Installation

go get github.com/samber/ro

Core Concepts

Four building blocks:

  • Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
  • Observer — a consumer with three callbacks: onNext(T), onError(error), onComplete()
  • Operator — a function that transforms an observable into another observable, chained via Pipe
  • Subscription — the connection between observable and observer. Call .Wait() to block or .Unsubscribe() to cancel
observable := ro.Pipe2(

    ro.RangeWithInterval(0, 5, 1*time.Second),

    ro.Filter(func(x int) bool { return x%2 == 0 }),

    ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),

)

observable.Subscribe(ro.NewObserver(

    func(s string) { fmt.Println(s) },      // onNext

    func(err error) { log.Println(err) },    // onError

    func() { fmt.Println("Done!") },         // onComplete

))

// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:

values, err := ro.Collect(observable)

Cold vs Hot Observables

Cold (default): each .Subscribe() starts a new independent execution. Safe and predictable — use by default.

Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.

Convert with

Behavior

Share()

Cold → hot with reference counting. Last unsubscribe tears down

ShareReplay(n)

Same as Share + buffers last N values for late subscribers

Connectable()

Cold → hot, but waits for explicit .Connect() call

Subjects

Natively hot — call .Send(), .Error(), .Complete() directly

Subject

Constructor

Replay behavior

PublishSubject

NewPublishSubject[T]()

None — late subscribers miss past events

BehaviorSubject

NewBehaviorSubject[T](initial)

Replays last value to new subscribers

ReplaySubject

NewReplaySubject[T](bufferSize)

Replays last N values

AsyncSubject

NewAsyncSubject[T]()

Emits only last value, only on complete

UnicastSubject

NewUnicastSubject[T](bufferSize)

Single subscriber only

For subject details and hot observable patterns, see Subjects Guide.

Operator Quick Reference

Category

Key operators

Purpose

Creation

Just, FromSlice, FromChannel, Range, Interval, Defer, Future

Create observables from various sources

Transform

Map, MapErr, FlatMap, Scan, Reduce, GroupBy

Transform or accumulate stream values

Filter

Filter, Take, TakeLast, Skip, Distinct, Find, First, Last

Selectively emit values

Combine

Merge, Concat, Zip2Zip6, CombineLatest2CombineLatest5, Race

Merge multiple observables

Error

Catch, OnErrorReturn, OnErrorResumeNextWith, Retry, RetryWithConfig

Recover from errors

Timing

Delay, DelayEach, Timeout, ThrottleTime, SampleTime, BufferWithTime

Control emission timing

Side effect

Tap/Do, TapOnNext, TapOnError, TapOnComplete

Observe without altering stream

Terminal

Collect, ToSlice, ToChannel, ToMap

Consume stream into Go types

Use typed Pipe2, Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking.

For the complete operator catalog (150+ operators with signatures), see Operators Guide.

Common Mistakes

Mistake

Why it fails

Fix

Using ro.OnNext() without error handler

Errors are silently dropped — bugs hide in production

Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks

Using untyped Pipe() instead of Pipe2/Pipe3

Loses compile-time type safety, errors surface at runtime

Use Pipe2, Pipe3...Pipe25 for typed operator chains

Forgetting .Unsubscribe() on infinite streams

Goroutine leak — the observable runs forever

Use TakeUntil(signal), context cancellation, or explicit Unsubscribe()

Using Share() when cold is sufficient

Unnecessary complexity, harder to reason about lifecycle

Use hot observables only when multiple consumers need the same stream

Using samber/ro for finite slice transforms

Stream overhead (goroutines, subscriptions) for a synchronous operation

Use samber/lo — it's simpler, faster, and purpose-built for slices

Not propagating context for cancellation

Streams ignore shutdown signals, causing resource leaks on termination

Chain ContextWithTimeout or ThrowOnContextCancel in the pipeline

Best Practices

  • Always handle all three events — use NewObserver(onNext, onError, onComplete), not just OnNext. Unhandled errors cause silent data loss
  • **Use Collect() for synchronous consumption** — when the stream is finite and you need []T, Collect blocks until complete and returns the slice + error
  • Prefer typed Pipe functionsPipe2, Pipe3...Pipe25 catch type mismatches at compile time. Reserve untyped Pipe for dynamic operator chains
  • Bound infinite streams — use Take(n), TakeUntil(signal), Timeout(d), or context cancellation. Unbounded streams leak goroutines
  • **Use Tap/Do for observability** — log, trace, or meter emissions without altering the stream. Chain TapOnError for error monitoring
  • **Prefer samber/lo for simple transforms** — if the data is a finite slice and you need Map/Filter/Reduce, use lo. Reach for ro when data arrives over time, from multiple sources, or needs retry/timeout/backpressure

Plugin Ecosystem

40+ plugins extend ro with domain-specific operators:

Category

Plugins

Import path prefix

Encoding

JSON, CSV, Base64, Gob

plugins/encoding/...

Network

HTTP, I/O, FSNotify

plugins/http, plugins/io, plugins/fsnotify

Scheduling

Cron, ICS

plugins/cron, plugins/ics

Observability

Zap, Slog, Zerolog, Logrus, Sentry, Oops

plugins/observability/..., plugins/samber/oops

Rate limiting

Native, Ulule

plugins/ratelimit/...

Data

Bytes, Strings, Sort, Strconv, Regexp, Template

plugins/bytes, plugins/strings, etc.

System

Process, Signal

plugins/proc, plugins/signal

For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.

For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.

If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.

Cross-References

  • → See samber/cc-skills-golang@golang-samber-lo skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice
  • → See samber/cc-skills-golang@golang-samber-mo skill for monadic types (Option, Result, Either) that compose with ro pipelines
  • → See samber/cc-skills-golang@golang-samber-hot skill for in-memory caching (also available as an ro plugin)
  • → See samber/cc-skills-golang@golang-concurrency skill for goroutine/channel patterns when reactive streams are overkill
  • → See samber/cc-skills-golang@golang-observability skill for monitoring reactive pipelines in production
BrowserAct

Let your agent run on any real-world website

Bypass CAPTCHA & anti-bot for free. Start local, scale to cloud.

Explore BrowserAct Skills →

Stop writing automation&scrapers

Install the CLI. Run your first Skill in 30 seconds. Scale when you're ready.

Start free
free · no credit card