こんにちは。KOUKIです。
とあるWeb系企業でGoエンジニアをしています。
Goの最も特徴的な機能の一つが平行処理を行うためのgoroutineです。今回は、Goroutineを使いこなすためのポイントやChannelを使ったデータの受け渡し、シンプルな平行処理を書くために知っておくべきことなどを紹介したいと思います。

Goの平行処理におけるデータ競合問題
Goで平行処理を書く上で気をつけたいことは、複数のgoroutineから変数へアクセスする際のデータ競合でしょう。
Goにはこのようなデータ競合を避けるためのChannelやMutexが提供されています。
Metexは、複数のgoroutine間でメモリを共有した領域を保護し、値の状態を担保する仕組みです。しかし、このメモリを共有する行為自体がプログラムを複雑にし、バグを発生させやすくします。
公式サイトにも以下の注意文が記載されていました。
Do not communicate by sharing memory; instead, share memory by communicating.
「メモリを共有することでコミュニケーションを図るのではなく、コミュニケーションすることでメモリを共有せよ!」
このコミュニケーションとは、goroutine間のやりとりを指します。
つまり、メモリを共有して複数のgoroutineがアクセスするのではなく、goroutine間でコミュニケーションを行い、メモリ内容を共有することで平行処理をしなさい、ということです。
Channelは上記のアプローチで平行処理を行うので、Mutexよりも推奨されています。しかし、場合によってはMutexの方が簡潔に書ける場合があるので、必要に応じて使い分けた方が良さそうです。
Gorouine
前述の通り、Goroutineにより平行処理を行えます。
使い方はかなりシンプルで、「go」という予約語を関数呼び出しの前に付与すると、goroutineが生成されます。
1 2 |
# 例 go Hello() |
goでは、mainパッケージのmain関数から処理が呼び出されますが、このmain関数自身もgoroutineであり、main goroutineと呼ばれています。
このmain goroutineが完了すると他のgoroutine(child goroutine)の起動の有無に関係なく、強制的にプログラムが終了します。
起動したchild goroutineが完了するまでmain goroutineを終了させない方法として、Channelが用いられます。
Channel
Channelを使うと複数のgoroutine間で値の送受信ができます。Channelを介して、goroutine間でコミュニケーションを計り、送信側のgoroutineから受信側のgoroutineへ処理をリクエストする、ということも可能になります。
作成
Chanelは、make関数で作成することができます。
1 2 |
# 例(int型) - バッファなしChannel ch := make(chan int) |
makeの第二引数はCapacity(容量)を指定でき、保存できるデータ数を指定できます。
1 2 |
# 例(int型) - バッファありChannel ch := make(chan int, 2) |
送信
データの送信は「<-」オペレータを使います。
1 |
ch <- 100 |
バッファなしChannelの場合、送信している値が受信されるまで後続処理をブロックします。
停止
close関数を使うとChannelの利用を停止し、受信側に停止したことを知らせます。
1 |
close(ch) |
このcloseしたChannelに対して、値の送信や2度目のcloseを行うなどすると、panicが発生しプログラムが停止します。
受信
データの受信も「<-」オペレータを使います。ただし、データの向きに注意が必要です。
1 2 |
# 受信(送信の場合は、ch <- n) n := <-ch |
同時処理
selectを使うと複数のChannelから同時に受信・送信ができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
<meta charset="utf-8">// 送信 ch1 := make(chan int) ch2 := make(chan int) select { case ch1 <- 100: case ch2 <- 200: } // 受信 select { case n, ok := <-ch1: fmt.Println(n, ok) case n, ok := <-ch2: fmt.Println(n, ok) } |
range
Channelがcloseされるまで受信を行うのに、rangeが便利です。
1 |
for n := range ch {...} |
Channel制御
Channelを受信あるいは送信のみに限定したい場合は、一方向Channel型を使います。これは関数のパラメータや戻り値に指定します。
1 2 3 4 5 6 7 8 9 |
// 受信のみ可 func send(ch chan<-int, n int) { ch <-n } / 送信のみ可 func receive(ch <-chan int) int { return <-ch } |
プログラムの流れが見やすくなるので、結構おすすめな書き方です。
Channel Pattern
Channelの扱いにはPatternがあり、同じようなコードをよく見かける機会があると思います。
select-default
selectとChannelはベストパートナーです。
1 2 3 4 5 6 7 8 9 10 |
ch := make(chan int) select { case ch <- 100: // 値を送信 fmt.Println("sent") case <-ch: fmt.Println("received") default: // Channelに送信(または受信)されない場合、defaultの処理が実行される ... } |
defaultを付与すると、ブロックされるChannelしかなくなった場合に行う処理を記述できるようです。
for-select
定期実行される処理を書きたい場合は、for-selectを組み合わせます。
1 2 3 4 5 6 |
for { select { case <-time.Tick(1*time.Second): fmt.Println("Waiting...") ... } |
nil channel
Channelのゼロ値は、nilです。nil channelを使った送信及び受信は、永久にブロックできるようです。
1 2 3 4 5 6 |
select { case <-ch1: ch1 = nil // 永久にブロック case <- ch2: .... } |
上記の例では、ch1が受信したらch1にnilを代入し、ch1からの受信をブロックするようにしました。
broadcast
closeされたChannelを受信するとブロックされずにnilが返却されます。そして、Channelがcloseされたことを複数のgoroutineへ一斉に伝えることができます。
closeするには、close関数を使えばOKです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func main() { done := make(chan struct{}) go something(1, done) go something(2, done) <meta charset="utf-8"> go something(3, done) // Channelを閉じる close(done) time.Sleep(1 * time.Second) } func <meta charset="utf-8">something(n int, don <-chan struct{}){ for { select { case <-done: fmt.Printf("done %f", n) return default: time.Sleep(100 * time.Millsecond) } } } |
goroutineの管理
goroutineを乱用すると複雑なコードになりがちです。しかし、goroutineを使わないという選択肢もまたありません。
なるべく簡単に、わかりやすくgoroutineを使う必要があります。
sync.WaitGroup
sync.WaitGroupは複数のgoroutineを管理します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func worker(id int, wg *sync.WaitGroup) { defer wg.Done() // wgに処理の完了を通知する fmt.Printf("Worker %d starting\n", id) time.Sleep(time.Second) fmt.Printf("Worker %d done\n", id) } func main() { var wg sync.WaitGroup // wgを作成 for i := 1; i <= 5; i++ { wg.Add(1) // goroutine数を管理 go worker(i, &wg) } wg.Wait() // Doneが呼ばれるまで待つ } |
errgroup.Group
英列に実行する関数からエラーを返却できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
func main() { var eg errgroup.Group // goroutineとerrを管理 for i := 0; i < 10; i++ { n := i eg.Go(func() error { // goroutineを作成 return do(n) }) } if err := eg.Wait(); err != nil { // Waitで全てのgorouineの完了を待つ(最初に返却されたエラーのみを返す) fmt.Println(err.Error()) } } func do(n int) error { if n%2 == 0 { return errors.New("err") } time.Sleep(1 * time.Second) log.Printf("%d called", n) return nil } |
semaphore.Weighted
特定リソースへの同時アクセスを制御する方法として、セマフォが採用されることがしばしばあります。
semaphoreパッケージのWeightedは、特定の処理が動いている場合は、同時実行数を少なくしたいなどの要望を叶えます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
func main() { sem := semaphore.NewWeighted(5) // トークン数5つのセマフォを作成 go do(sem, func() { time.Sleep(1 * time.Second) }, 1) ... time.Sleep(5 * time.Second) } func do(sem *semaphore.Weighted, f func(), w int64) { if err := sem.Acquire(context.Background(), w); err != nil { // トークンを獲得 log.Println(err) return } defer sem.Release(w) log.Printf("acquired %d", w) f() } |
上記のw変数は、取得したいトークン数である重みを示しており、上記の例では1の重みでトークンを取得しようとしています。この重みを増やすほど、goroutineの同時実行数は少なくなります。なぜなら、取得可能なトークン数が取得したいトークン数より少ない場合、取得したいトークン数以上になるまでgoroutineをブロックするからです。
sync.Once
複数の後ルーチンがアクセスする可能性があるものの、一度だけ実行できれば良いような処理を実装したい場合は、sync.Onceが便利です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func main() { var once sync.Once onceBody := func() { fmt.Println("Only once") } done := make(chan bool) for i := 0; i < 10; i++ { go func() { once.Do(onceBody) done <- true }() } for i := 0; i < 10; i++ { <-done } } |
1 2 3 |
Output: Only once |
アプリケーションの実行中に一度だけ呼び出すべきものを扱うときに有効です。
singleflight.Group
短期間に重複してAPIを呼び出してしまうような状況で、これを抑制したい場合、singleflight.Groupを使いましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
func main() { var g group g.do("foo") g.do("bar") g.Wait() g.Forget() g.do("hoge") g.do("fuga") g.Wait() } type group struct { singleflight.Group sync.WaitGroup } func (g *group) do(s string) { g.Add(1) go func() { defer g.Done() v, err, shared := g.Do("key", func() (interface{}, error) { time.Sleep(1 * time.Second) log.Printf("cached %s\n", s) return s, nil }) log.Println(v, err, shared) }() } func (g *group) Forget() { g.Group.Forget("key") } |
このコードでは、”foo”及び”bar”のキーを持つDoメソッドが実行されます。そして、最もDoメソッドの呼び出しが早かったgoroutineが持つsの値がキャッシュされ、後続の1つのgorutineにはキャッシュされた値が返されます。
アプリケーションの実行中に更新処理が起きるようなものに有効です。
なお、Forgetメソッドを呼ぶとキャッシュされた値をクリアできます。
バグを発生さないように気をつける方法
バグが発生しにくいgoroutineの実装を行いたい場合、goroutineのライフサイクルを意識すると良いです。
方法1: 一方向Channel
基本的に、一方向Channelを使い、「送信」or「受信」のみできるように制限をかけましょう。
- closeしたChannelに送信してpanicが発生することを防ぐ
- closeしたChannelで受信しようとして永久にブロックされることを防ぐ
- 送信/受信どちらかの操作のみに集中すればよくなる
方法2: 受信より送信を先に終了させる
受信Channelは、closeされている場合でもpanicを起こさず、ゼロ値(nil)とboo値(closeしたかどうかを示すもの)を返します。そして、送信Channelは受信しているgoroutineがない場合はブロックされ続けます。
よって、送信しているgoroutine側でChannelをcloseし、その後、Channelのcloseを検知させて、受信goroutineを終了すると滞りなく全体を停止することが可能です。
方法3: goroutineリークを検出する
goroutineリークは、生成されたgoroutineが終了せずに滞留することです。サーバーのリソース枯渇に繋がったりするので、厄介な存在です。
これを検出するために、uber-go/goleakツールが便利です。
簡単なサンプルを提示しましょう。
1 2 3 4 5 6 7 |
// main.go import "time" func main() { // 以下のgoroutineが終了する前に、main goroutineが先に終了する go time.Sleep(1 * time.Second) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// main_test.go package main import ( "testing" "go.uber.org/goleak" ) func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } func Test_main(t *testing.T) { main() } |
TestMain関数でuber-go/goleakの関数を呼ぶとテストの終了時にgoroutineがリークしていないかチェックできます。ただし、このツールは絶対にgoroutineリークが起こらないことを保証するものではなく、テスト時に起きていないことをチェックできるだけです。テスト漏れやgoroutineリークが起きるケースをテストできない場合は効果範囲外ということですね。
1 2 3 4 5 6 7 8 9 10 11 12 |
$ go test PASS goleak: Errors on successful test run: found unexpected goroutines: [Goroutine 5 in state sleep, with time.Sleep on top of the stack: goroutine 5 [sleep]: time.Sleep(0x3b9aca00) /usr/local/Cellar/go/1.17/libexec/src/runtime/time.go:193 +0x12e created by github.com/hoge/sample2.main /Users/hoge/go/src/github.com/hoge/sample2/main.go:7 +0x45 ] exit status 1 FAIL github.com/hoge/sample2 0.740s |
方法:4 データ競合を検出する
データ競合とは、あるリソースに対して複数のgoroutineがアクセスしてしまうことです。
goコマンドの-raceフラグを有効にすると、テスト時にメモリ上でデータ競合が発生したか検出できます。
1 2 3 4 5 6 7 8 9 10 |
package main import "time" func main() { a := 100 go func() { a += 50 }() go func() { a = 50 }() time.Sleep(100 * time.Millisecond) } |
このコードは、Mutexを使わずにリソース(変数a)に対して並行してアクセスしているため、データ競合が発生します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
go test -race ================== WARNING: DATA RACE Write at 0x00c0000a8128 by goroutine 9: github.com/hoge/sample2.main.func2() /Users/hoge/go/src/github.com/hoge/sample2/main.go:8 +0x30 Previous write at 0x00c0000a8128 by goroutine 8: github.com/hoge/sample2.main.func1() /Users/hoge/go/src/github.com/hoge/sample2/main.go:7 +0x44 ... ================== --- FAIL: Test_main (0.10s) testing.go:1152: race detected during execution of test FAIL exit status 1 FAIL github.com/hoge/sample2 0.878s |
「testing.go:1152: race detected during execution of test」が表示されているのでデータ競合が発生したことがわかりますね。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
package main import ( "sync" "time" ) func main() { var m sync.Mutex a := 100 go func() { m.Lock() a += 50 m.Unlock() }() go func() { m.Lock() a = 50 m.Unlock() }() time.Sleep(100 * time.Millisecond) } |
1 2 3 |
$ go test -race PASS ok github.com/hoge/sample2 0.617s |
「-race」フラグもテスト時にデータ競合したものだけ検出できることに注意が必要です。
まとめ
goroutineはとても便利ですが、とても難しいものであるとご理解いただけたと思います。
調子に乗ってgoroutineを使いまくると思わぬバグが発生したり、保守するときに地獄を見たりと危険がいっぱいです。
しかし、結構楽しくプログラミングができるので、実装している時はハイな気分で仕事ができると思いますw
それでは、また!
コメントを残す
コメントを投稿するにはログインしてください。