golang-samber-ro
samber/ro を用いた Go でのリアクティブストリーム・イベント駆動プログラミングを支援するスキルです。150以上のタイプセーフな演算子、コールド/ホットObservable、5種類のSubject、宣言的なパイプライン、40以上のプラグイン(HTTP・cron・fsnotify等)、自動バックプレッシャー、エラー伝播、Go contextとの統合に対応します。`github.com/samber/ro` をインポートしている場合や、非同期イベント駆動パイプライン・リアルタイムデータ処理・リアクティブアーキテクチャを Go で構築する際に適用してください(有限スライスの変換には golang-samber-lo スキルを使用)。
description の原文を見る
Reactive streams and event-driven programming in Golang using samber/ro — ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5 subject types (Publish, Behavior, Replay, Async, Unicast), declarative pipelines via Pipe, 40+ plugins (HTTP, cron, fsnotify, JSON, logging), automatic backpressure, error propagation, and Go context integration. Apply when using or adopting samber/ro, when the codebase imports github.com/samber/ro, or when building asynchronous event-driven pipelines, real-time data processing, streams, or reactive architectures in Go. Not for finite slice transforms (-> See golang-samber-lo skill).
SKILL.md 本文
ペルソナ: あなたは、データが非同期にまたは無限に流れてくるときにリアクティブストリームを活用する Go エンジニアです。samber/ro を使用して手動の goroutine/channel 配線の代わりに宣言的パイプラインを構築しますが、単純なスライス + samber/lo で十分な場合がいつかを知っています。
思考モード: 高度なリアクティブパイプラインを設計したり、コールド/ホットオブザーバブル、サブジェクト、オペレータの組み合わせを選択する場合は ultrathink を使用してください。不適切なアーキテクチャはリソースリークや見落とされたイベントにつながります。
samber/ro — Go のためのリアクティブストリーム
ReactiveX の Go 実装。ジェネリクス優先、型安全、非同期データストリーム用の合成可能なパイプライン。自動バックプレッシャー、エラー伝播、context 統合、リソースクリーンアップを備えています。150+ のオペレータ、5 つのサブジェクトタイプ、40+ のプラグイン。
公式リソース:
このスキルは網羅的ではありません。詳細については、ライブラリドキュメントとコード例を参照してください。Context7 は発見可能性プラットフォームとして役に立ちます。
samber/ro を使う理由 (ストリーム vs スライス)
Go の channel + goroutine は複雑な非同期パイプラインに対して扱いにくくなります: 手動の channel クローズ、冗長な goroutine ライフサイクル、ネストされた select を横断したエラー伝播、合成可能なオペレータなし。samber/ro はこれを宣言的で連鎖可能なストリームオペレータで解決します。
どのツールをいつ使うか:
| シナリオ | ツール | 理由 |
|---|---|---|
| スライスを変換 (map、filter、reduce) | samber/lo | 有限、同期、eager — ストリームオーバーヘッドは不要 |
| エラー処理を伴うシンプルな goroutine ファンアウト | errgroup | 標準ライブラリ、軽量、制限された並行性に十分 |
| 無限イベントストリーム (WebSocket、ticker、ファイルウォッチャー) | samber/ro | バックプレッシャー、リトライ、タイムアウト、合成を備えた宣言的パイプライン |
| 複数の非同期ソースからのリアルタイムデータエンリッチメント | samber/ro | CombineLatest/Zip は手動 select なしで依存ストリームを合成 |
| 複数のコンシューマが 1 つのソースを共有する Pub/Sub | samber/ro | ホットオブザーバブル (Share/Subjects) はマルチキャストをネイティブに処理 |
主な相違点: lo vs ro
| 側面 | samber/lo | samber/ro |
|---|---|---|
| データ | 有限スライス | 無限ストリーム |
| 実行 | 同期、ブロッキング | 非同期、ノンブロッキング |
| 評価 | Eager (中間スライスを割り当て) | Lazy (アイテムが到着したら処理) |
| タイミング | 即座 | 時間認識 (delay、throttle、interval、timeout) |
| エラーモデル | 呼び出しごとに (T, error) を返す | エラー channel がパイプラインを伝播 |
| ユースケース | コレクション変換 | イベント駆動、リアルタイム、非同期パイプライン |
インストール
go get github.com/samber/ro
コアコンセプト
4 つの構成要素:
- Observable — 時間とともに値を発行するデータソース。デフォルトではコールド: 各サブスクライバーは最初からスクラッチで独立した実行をトリガー
- Observer — 3 つのコールバック:
onNext(T)、onError(error)、onComplete()を持つコンシューマー - Operator — observable を別の observable に変換する関数。
Pipeを使用して連鎖 - Subscription — observable と observer 間の接続。
.Wait()を呼び出してブロックするか、.Unsubscribe()でキャンセル
observable := ro.Pipe2(
ro.RangeWithInterval(0, 5, 1*time.Second),
ro.Filter(func(x int) bool { return x%2 == 0 }),
ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)
observable.Subscribe(ro.NewObserver(
func(s string) { fmt.Println(s) }, // onNext
func(err error) { log.Println(err) }, // onError
func() { fmt.Println("Done!") }, // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"
// または同期的に収集:
values, err := ro.Collect(observable)
コールド vs ホットオブザーバブル
コールド (デフォルト): 各 .Subscribe() は新しい独立した実行を開始します。安全で予測可能 — デフォルトで使用してください。
ホット: 複数のサブスクライバーが 1 つの実行を共有します。ソースが高コスト (WebSocket、DB ポール) またはサブスクライバーが同じイベントを見る必要がある場合に使用してください。
| 変換方法 | 動作 |
|---|---|
Share() | コールド → ホット (参照カウント付き)。最後のアンサブスクライブはティアダウン |
ShareReplay(n) | Share と同じ + 後続サブスクライバー用に最後の N 値をバッファ |
Connectable() | コールド → ホット。ただし明示的な .Connect() 呼び出しを待機 |
| Subjects | ネイティブホット — .Send()、.Error()、.Complete() を直接呼び出し |
| Subject | コンストラクタ | リプレイ動作 |
|---|---|---|
PublishSubject | NewPublishSubject[T]() | なし — 後発サブスクライバーは過去イベントを見落とす |
BehaviorSubject | NewBehaviorSubject[T](initial) | 最後の値を新しいサブスクライバーにリプレイ |
ReplaySubject | NewReplaySubject[T](bufferSize) | 最後の N 値をリプレイ |
AsyncSubject | NewAsyncSubject[T]() | 最後の値のみ発行、完了時のみ |
UnicastSubject | NewUnicastSubject[T](bufferSize) | 単一サブスクライバーのみ |
Subject の詳細とホットオブザーバブルパターンについては、Subjects Guide を参照してください。
オペレータクイックリファレンス
| カテゴリ | 主要オペレータ | 目的 |
|---|---|---|
| Creation | Just、FromSlice、FromChannel、Range、Interval、Defer、Future | さまざまなソースから observable を作成 |
| Transform | Map、MapErr、FlatMap、Scan、Reduce、GroupBy | ストリーム値を変換または蓄積 |
| Filter | Filter、Take、TakeLast、Skip、Distinct、Find、First、Last | 選択的に値を発行 |
| Combine | Merge、Concat、Zip2–Zip6、CombineLatest2–CombineLatest5、Race | 複数の observable をマージ |
| Error | Catch、OnErrorReturn、OnErrorResumeNextWith、Retry、RetryWithConfig | エラーから復旧 |
| Timing | Delay、DelayEach、Timeout、ThrottleTime、SampleTime、BufferWithTime | 発行タイミングを制御 |
| Side effect | Tap/Do、TapOnNext、TapOnError、TapOnComplete | ストリームを変更せずに観察 |
| Terminal | Collect、ToSlice、ToChannel、ToMap | ストリームを Go 型に消費 |
オペレータチェーン全体で型安全性をコンパイル時に確保するため、型付きの Pipe2、Pipe3 ... Pipe25 を使用してください。型なしの Pipe は any を使用し、型チェックを失います。
完全なオペレータカタログ (シグネチャ付き 150+ オペレータ) については、Operators Guide を参照してください。
よくある間違い
| 間違い | 失敗する理由 | 修正方法 |
|---|---|---|
エラーハンドラなしで ro.OnNext() を使用 | エラーが暗黙的にドロップされ、バグが本番環境で隠される | 3 つのコールバックすべてで ro.NewObserver(onNext, onError, onComplete) を使用 |
型なしの Pipe() を Pipe2/Pipe3 の代わりに使用 | コンパイル時の型安全性を失い、エラーが実行時に浮上 | Pipe2、Pipe3...Pipe25 を型付きオペレータチェーンで使用 |
無限ストリームで .Unsubscribe() を忘れる | Goroutine リーク — observable は永遠に実行 | TakeUntil(signal)、context キャンセレーション、または明示的な Unsubscribe() を使用 |
コールドで十分な場合に Share() を使用 | 不要な複雑さ、ライフサイクルについて推論しづらい | 複数のコンシューマーが同じストリームを必要とする場合のみホットオブザーバブルを使用 |
有限スライス変換に samber/ro を使用 | 同期操作のためのストリームオーバーヘッド (goroutine、subscription) | samber/lo を使用 — スライス向けにシンプル、高速、目的別に構築 |
| キャンセレーションのために context を伝播しない | ストリームはシャットダウン信号を無視し、終了時にリソースリーク | パイプラインで ContextWithTimeout または ThrowOnContextCancel をチェーン |
ベストプラクティス
- 常に 3 つのイベントすべてを処理 —
OnNextだけでなくNewObserver(onNext, onError, onComplete)を使用。未処理のエラーは暗黙的なデータ損失を引き起こします - 同期消費には
Collect()を使用 — ストリームが有限で[]Tが必要な場合、Collectは完了までブロックしてスライス + エラーを返します - 型付き Pipe 関数を優先 —
Pipe2、Pipe3...Pipe25はコンパイル時に型の不一致をキャッチします。型なしのPipeは動的オペレータチェーンで予約してください - 無限ストリームをバウンド —
Take(n)、TakeUntil(signal)、Timeout(d)、または context キャンセレーションを使用。バウンドされていないストリームは goroutine をリーク - 観察性には
Tap/Doを使用 — ストリームを変更せずにログ、トレース、または計測を実施。エラー監視にはTapOnErrorをチェーン - 単純な変換には
samber/loを優先 — データが有限スライスで Map/Filter/Reduce が必要な場合はloを使用。データが時間を通じて到着するか、複数のソースから取得されるか、リトライ/タイムアウト/バックプレッシャーが必要な場合にroに達してください
プラグインエコシステム
40+ のプラグインが ro をドメイン固有のオペレータで拡張:
| カテゴリ | プラグイン | インポートパスプレフィックス |
|---|---|---|
| Encoding | JSON、CSV、Base64、Gob | plugins/encoding/... |
| Network | HTTP、I/O、FSNotify | plugins/http、plugins/io、plugins/fsnotify |
| Scheduling | Cron、ICS | plugins/cron、plugins/ics |
| Observability | Zap、Slog、Zerolog、Logrus、Sentry、Oops | plugins/observability/...、plugins/samber/oops |
| Rate limiting | Native、Ulule | plugins/ratelimit/... |
| Data | Bytes、Strings、Sort、Strconv、Regexp、Template | plugins/bytes、plugins/strings、他 |
| System | Process、Signal | plugins/proc、plugins/signal |
インポートパスと使用例を含む完全なプラグインカタログについては、Plugin Ecosystem を参照してください。
実世界のリアクティブパターン (リトライ+タイムアウト、WebSocket ファンアウト、グレースフルシャットダウン、ストリーム合成) については、Patterns を参照してください。
samber/ro でバグまたは予期しない動作が発生した場合は、github.com/samber/ro/issues で issue を開いてください。
相互参照
- → 有限スライス変換 (Map、Filter、Reduce、GroupBy) については
samber/cc-skills-golang@golang-samber-loスキルを参照 — データがスライスに既にある場合は lo を使用 - → モナド型 (Option、Result、Either) で ro パイプラインと合成する場合は
samber/cc-skills-golang@golang-samber-moスキルを参照 - → インメモリキャッシング (ro プラグインとしても利用可) については
samber/cc-skills-golang@golang-samber-hotスキルを参照 - → リアクティブストリームがやりすぎの場合の goroutine/channel パターンについては
samber/cc-skills-golang@golang-concurrencyスキルを参照 - → 本番環境でリアクティブパイプラインを監視する場合は
samber/cc-skills-golang@golang-observabilityスキルを参照
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- samber
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/samber/cc-skills-golang / ライセンス: MIT
関連スキル
hugging-face-trackio
Trackioを使用してMLトレーニング実験を追跡・可視化できます。トレーニング中のメトリクスログ記録(Python API)、トレーニング診断のアラート発火、ログされたメトリクスの取得・分析(CLI)が必要な場合に活用してください。リアルタイムダッシュボード表示、Webhookを使用したアラート、HF Space同期、自動化向けのJSON出力に対応しています。
btc-bottom-model
ビットコインのサイクルタイミングモデルで、加重スコアリングシステムを搭載しています。日次パルス(4指標、32ポイント)とウィークリー構造(9指標、68ポイント)の2カテゴリーにわたる13の指標を追跡し、0~100のマーケットヒートスコアを算出します。ETFフロー、ファンディングレート、ロング/ショート比率、恐怖・貪欲指数、LTH-MVRV、NUPL、SOPR(LTH+STH)、LTH供給率、移動平均倍率(365日MA、200週MA)、週次RSI、出来高トレンドに対応します。市場サイクル全体を通じて買いと売りの両方の推奨を提供します。ビットコインの底値拾い、BTCサイクルポジション、買い時・売り時、オンチェーン指標、MVRV、NUPL、SOPR、LTH動向、ETFの流出入、ファンディングレート、恐怖指数、ビットコインが過熱状態か、マイナーコスト、暗号資産市場のセンチメント、BTCのポジションサイジング、「今ビットコインを買うべきか」「BTCが天井をつけているか」「オンチェーン指標は何を示しているか」といった質問の際にこのスキルを活用します。
protein_solubility_optimization
タンパク質の溶解性最適化 - タンパク質の溶解性を最適化します。タンパク質の特性を計算し、溶解性と親水性を予測し、有効な変異を提案します。タンパク質配列の特性計算、タンパク質機能の予測、親水性計算、ゼロショット配列予測を含むタンパク質エンジニアリング業務に使用できます。3つのSCPサーバーから4つのツールを統合しています。
research-lookup
Parallel Chat APIまたはPerplexity sonar-pro-searchを使用して、最新の研究情報を検索できます。学術論文の検索にも対応しています。クエリは自動的に最適なバックエンドにルーティングされるため、論文の検索、研究データの収集、科学情報の検証に活用できます。
tree-formatting
ggtree(R)またはiTOL(ウェブ)を使用して、系統樹の可視化とフォーマットを行います。系統樹を図として描画する際、ツリーレイアウトの選択、分類学に基づく枝やラベルの色付け、クレードの折りたたみ、サポート値の表示、またはツリーへのオーバーレイ追加が必要な場合に使用してください。系統推定(protein-phylogenyスキルを使用)やドメイン注釈(今後の独立したスキル)には使用しないでください。
querying-indonesian-gov-data
インドネシア政府の50以上のAPIとデータソースに接続できます。BPJPH(ハラール認証)、BOM(食品安全)、OJK(金融適正性)、BPS(統計)、BMKG(気象・地震)、インドネシア中央銀行(為替レート)、IDX(株式)、CKAN公開データポータル、pasal.id(第三者法MCP)に対応しています。インドネシア政府データを活用したアプリ開発、.go.idウェブサイトのスクレイピング、ハラール認証の確認、企業の法的適正性の検証、金融機関ステータスの照会、またはインドネシアMCPサーバーへの接続時に使用できます。CSRF処理、CKAN API使用方法、IP制限回避など、すぐに実行可能なPythonパターンを含んでいます。