go-concurrency-patterns
Goの並行処理をgoroutine、channel、sync プリミティブ、contextを用いてマスターするスキルです。並行処理を伴うGoアプリケーションの構築、ワーカープールの実装、またはレースコンディションのデバッグを行う際に活用してください。
description の原文を見る
Master Go concurrency with goroutines, channels, sync primitives, and context. Use when building concurrent Go applications, implementing worker pools, or debugging race conditions.
SKILL.md 本文
Go Concurrency Patterns
Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management.
このスキルを使用する場合
- 並行処理を含む Go アプリケーションを構築する
- ワーカープールとパイプラインを実装する
- goroutine のライフサイクルを管理する
- goroutine 間の通信にチャネルを使用する
- race condition をデバッグする
- グレースフルシャットダウンを実装する
コア概念
1. Go の並行処理プリミティブ
| プリミティブ | 目的 |
|---|---|
goroutine | 軽量並行実行 |
channel | goroutine 間の通信 |
select | チャネル操作の多重化 |
sync.Mutex | 相互排斥 |
sync.WaitGroup | goroutine の完了待機 |
context.Context | キャンセルとデッドライン |
2. Go の並行処理の格言
Don't communicate by sharing memory;
share memory by communicating.
クイックスタート
package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
results := make(chan string, 10)
var wg sync.WaitGroup
// Spawn workers
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(ctx, i, results, &wg)
}
// Close results when done
go func() {
wg.Wait()
close(results)
}()
// Collect results
for result := range results {
fmt.Println(result)
}
}
func worker(ctx context.Context, id int, results chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
select {
case <-ctx.Done():
return
case results <- fmt.Sprintf("Worker %d done", id):
}
}
パターン
パターン 1: ワーカープール
package main
import (
"context"
"fmt"
"sync"
)
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
Err error
}
func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result, len(jobs))
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobs {
select {
case <-ctx.Done():
return
default:
result := processJob(job)
results <- result
}
}
}(i)
}
go func() {
wg.Wait()
close(results)
}()
return results
}
func processJob(job Job) Result {
// Simulate work
return Result{
JobID: job.ID,
Output: fmt.Sprintf("Processed: %s", job.Data),
}
}
// Usage
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
jobs := make(chan Job, 100)
// Send jobs
go func() {
for i := 0; i < 50; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
}
close(jobs)
}()
// Process with 5 workers
results := WorkerPool(ctx, 5, jobs)
for result := range results {
fmt.Printf("Result: %+v\n", result)
}
}
パターン 2: Fan-Out/Fan-In パイプライン
package main
import (
"context"
"sync"
)
// Stage 1: Generate numbers
func generate(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case <-ctx.Done():
return
case out <- n:
}
}
}()
return out
}
// Stage 2: Square numbers (can run multiple instances)
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case <-ctx.Done():
return
case out <- n * n:
}
}
}()
return out
}
// Fan-in: Merge multiple channels into one
func merge(ctx context.Context, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start output goroutine for each input channel
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case <-ctx.Done():
return
case out <- n:
}
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Close out after all inputs are done
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Generate input
in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Fan out to multiple squarers
c1 := square(ctx, in)
c2 := square(ctx, in)
c3 := square(ctx, in)
// Fan in results
for result := range merge(ctx, c1, c2, c3) {
fmt.Println(result)
}
}
パターン 3: セマフォを使用した限定並行処理
package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"sync"
)
type RateLimitedWorker struct {
sem *semaphore.Weighted
}
func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
return &RateLimitedWorker{
sem: semaphore.NewWeighted(maxConcurrent),
}
}
func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
var (
wg sync.WaitGroup
mu sync.Mutex
errors []error
)
for _, task := range tasks {
// Acquire semaphore (blocks if at limit)
if err := w.sem.Acquire(ctx, 1); err != nil {
return []error{err}
}
wg.Add(1)
go func(t func() error) {
defer wg.Done()
defer w.sem.Release(1)
if err := t(); err != nil {
mu.Lock()
errors = append(errors, err)
mu.Unlock()
}
}(task)
}
wg.Wait()
return errors
}
// Alternative: Channel-based semaphore
type Semaphore chan struct{}
func NewSemaphore(n int) Semaphore {
return make(chan struct{}, n)
}
func (s Semaphore) Acquire() {
s <- struct{}{}
}
func (s Semaphore) Release() {
<-s
}
パターン 4: グレースフルシャットダウン
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
type Server struct {
shutdown chan struct{}
wg sync.WaitGroup
}
func NewServer() *Server {
return &Server{
shutdown: make(chan struct{}),
}
}
func (s *Server) Start(ctx context.Context) {
// Start workers
for i := 0; i < 5; i++ {
s.wg.Add(1)
go s.worker(ctx, i)
}
}
func (s *Server) worker(ctx context.Context, id int) {
defer s.wg.Done()
defer fmt.Printf("Worker %d stopped\n", id)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// Cleanup
fmt.Printf("Worker %d cleaning up...\n", id)
time.Sleep(500 * time.Millisecond) // Simulated cleanup
return
case <-ticker.C:
fmt.Printf("Worker %d working...\n", id)
}
}
}
func (s *Server) Shutdown(timeout time.Duration) {
// Signal shutdown
close(s.shutdown)
// Wait with timeout
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
fmt.Println("Clean shutdown completed")
case <-time.After(timeout):
fmt.Println("Shutdown timed out, forcing exit")
}
}
func main() {
// Setup signal handling
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
server := NewServer()
server.Start(ctx)
// Wait for signal
sig := <-sigCh
fmt.Printf("\nReceived signal: %v\n", sig)
// Cancel context to stop workers
cancel()
// Wait for graceful shutdown
server.Shutdown(5 * time.Second)
}
パターン 5: キャンセル機能付き errgroup
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"net/http"
)
func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url // Capture loop variables
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return fmt.Errorf("creating request for %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetching %s: %w", url, err)
}
defer resp.Body.Close()
results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
return nil
})
}
// Wait for all goroutines to complete or one to fail
if err := g.Wait(); err != nil {
return nil, err // First error cancels all others
}
return results, nil
}
// With concurrency limit
func fetchWithLimit(ctx context.Context, urls []string, limit int) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(limit) // Max concurrent goroutines
results := make([]string, len(urls))
var mu sync.Mutex
for i, url := range urls {
i, url := i, url
g.Go(func() error {
result, err := fetchURL(ctx, url)
if err != nil {
return err
}
mu.Lock()
results[i] = result
mu.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
パターン 6: sync.Map を使用した並行マップ
package main
import (
"sync"
)
// For frequent reads, infrequent writes
type Cache struct {
m sync.Map
}
func (c *Cache) Get(key string) (interface{}, bool) {
return c.m.Load(key)
}
func (c *Cache) Set(key string, value interface{}) {
c.m.Store(key, value)
}
func (c *Cache) GetOrSet(key string, value interface{}) (interface{}, bool) {
return c.m.LoadOrStore(key, value)
}
func (c *Cache) Delete(key string) {
c.m.Delete(key)
}
// For write-heavy workloads, use sharded map
type ShardedMap struct {
shards []*shard
numShards int
}
type shard struct {
sync.RWMutex
data map[string]interface{}
}
func NewShardedMap(numShards int) *ShardedMap {
m := &ShardedMap{
shards: make([]*shard, numShards),
numShards: numShards,
}
for i := range m.shards {
m.shards[i] = &shard{data: make(map[string]interface{})}
}
return m
}
func (m *ShardedMap) getShard(key string) *shard {
// Simple hash
h := 0
for _, c := range key {
h = 31*h + int(c)
}
return m.shards[h%m.numShards]
}
func (m *ShardedMap) Get(key string) (interface{}, bool) {
shard := m.getShard(key)
shard.RLock()
defer shard.RUnlock()
v, ok := shard.data[key]
return v, ok
}
func (m *ShardedMap) Set(key string, value interface{}) {
shard := m.getShard(key)
shard.Lock()
defer shard.Unlock()
shard.data[key] = value
}
パターン 7: タイムアウトとデフォルト値を使用した select
func selectPatterns() {
ch := make(chan int)
// Timeout pattern
select {
case v := <-ch:
fmt.Println("Received:", v)
case <-time.After(time.Second):
fmt.Println("Timeout!")
}
// Non-blocking send/receive
select {
case ch <- 42:
fmt.Println("Sent")
default:
fmt.Println("Channel full, skipping")
}
// Priority select (check high priority first)
highPriority := make(chan int)
lowPriority := make(chan int)
for {
select {
case msg := <-highPriority:
fmt.Println("High priority:", msg)
default:
select {
case msg := <-highPriority:
fmt.Println("High priority:", msg)
case msg := <-lowPriority:
fmt.Println("Low priority:", msg)
}
}
}
}
Race 検出
# テストを race detector で実行
go test -race ./...
# race detector 付きでビルド
go build -race .
# race detector 付きで実行
go run -race main.go
ベストプラクティス
すること
- context を使用する - キャンセルとデッドラインのため
- チャネルをクローズする - 送信側からのみ
- errgroup を使用する - エラーを伴う並行操作のため
- チャネルをバッファする - 数を知っている場合
- チャネルを優先する - ミューテックスより可能な限り
しないこと
- goroutine をリークしない - 常に終了パスを用意する
- 受信側からクローズしない - panic を引き起こす
- 共有メモリを使用しない - 必要な場合を除き
- context キャンセルを無視しない - ctx.Done() をチェック
- 同期に time.Sleep を使用しない - 適切なプリミティブを使用
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- wshobson
- リポジトリ
- wshobson/agents
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/wshobson/agents / ライセンス: MIT
関連スキル
superfluid
Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper
civ-finish-quotes
実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。
nookplot
Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。
web3-polymarket
Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。
ethskills
Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。
xxyy-trade
このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。