こんにちは。KOUKIです。
とあるWeb系企業で、Go言語を使った開発業務に従事しています。
本記事では、ループ処理内でgoroutineを使う際に考慮しておくべきことをまとめました^^
ちなみに、goroutineは、プログラムの処理を並行化させるGoの素晴らしい機能です。
goroutineを使わないバージョン
次のコードを見てください。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package main import ( "fmt" "log" "time" ) func main() { start := time.Now() var count int for i := 0; i < 100000; i++ { func() { time.Sleep(10 * time.Microsecond) count++ }() } end := time.Now() log.Printf("Done: %d times\n", count) log.Println(fmt.Sprintf("The end time %v(s)", end.Sub(start).Seconds())) } |
このコードは、10万回のループ後に、実行結果と経過時間を標準出力に表示する簡単なプログラムです。
1 2 3 |
$ go run main.go 2021/03/01 17:57:55 Done: 100000 times 2021/03/01 17:57:55 The end time 2.175957663(s) |
2.1秒で完了しました。けっこう早いですね^^
goroutineを使うバージョン
goroutineを使うと先ほどの処理をもっと早く実行できます。しかもかなり簡単に!
しかし、考慮すべき点があるので、一緒に考えていきましょう。
無名関数をgoroutine化
gorutine化には「go」キーワード使います。これを並行化させたい関数の前に付与するだけで、処理を並行化させることが可能です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func main() { start := time.Now() var count int for i := 0; i < 100000; i++ { // goroutine化 go func() { time.Sleep(10 * time.Microsecond) count++ }() } end := time.Now() log.Printf("Done: %d times\n", count) log.Println(fmt.Sprintf("The end time %v(s)", end.Sub(start).Seconds())) } |
プログラムを実行してみましょう。
1 2 3 |
$ go run main.go 2021/03/01 17:58:52 Done: 97635 times 2021/03/01 17:58:52 The end time 0.087922907(s) |
めっちゃ早くなっ….いや違いますね。
「Done: 97635 times」となっているので、何かがおかしいです。
考慮1: WaitGroupで完了待ちにする
実は、goroutineは別プロセスで動くスレッドです。メインプログラム(main)の動作が完了した時点で、スレッドが稼働中か否かに関係なくプログラムを停止してしまいます。
そのため、次の考慮が必要になります。
スレッドの完了を可能にするのが、syncパッケージのWaitGroupです。これで、別スレッドの実行が完了するのを待つことができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package main import ( "fmt" "log" "sync" "time" ) func main() { start := time.Now() var count int // WaitGroupを定義 var wg sync.WaitGroup for i := 0; i < 100000; i++ { // WaitGroupに追加 wg.Add(1) go func(wg *sync.WaitGroup) { // 処理完了を通知 defer wg.Done() time.Sleep(10 * time.Microsecond) count++ }(&wg) } // 全てDoneになるまで待機 wg.Wait() end := time.Now() log.Printf("Done: %d times\n", count) log.Println(fmt.Sprintf("The end time %v(s)", end.Sub(start).Seconds())) } |
プログラムを実行してみましょう!
1 2 3 |
$ go run main.go 2021/03/02 17:22:45 Done: 9567 times 2021/03/02 17:22:45 The end time 0.003757162(s) |
だめですね。。
考慮2: 排他制御をする
まだ、考慮すべきことがあります。
プログラム内で使用されているcount変数は、全てのgoroutineの中で同じメモリをみています。つまり、同じメモリに対して、書き込み、読み込み処理をしているわけです。
そのため、次の考慮が必要になります。
排他制御をかけるには、syncパッケージのMutexを使います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
package main import ( "fmt" "log" "sync" "time" ) func main() { start := time.Now() var count int var wg sync.WaitGroup // 排他制御 var mu sync.Mutex for i := 0; i < 100000; i++ { wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() // 無名関数が完了したらUnlock defer mu.Unlock() time.Sleep(10 * time.Microsecond) mu.Lock() count++ }(&wg) } wg.Wait() end := time.Now() log.Printf("Done: %d times\n", count) log.Println(fmt.Sprintf("The end time %v(s)", end.Sub(start).Seconds())) } |
1 2 3 |
$ go run main.go 2021/03/02 17:45:28 Done: 100000 times 2021/03/02 17:45:28 The end time 0.036464458(s) |
今度こそOKですね。
考慮3: WaitGroupとMutexは一緒に使うべきか
ちなみに、WaitGroupを削除したらどうなるのでしょうか。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
func main() { start := time.Now() var count int var mu sync.Mutex for i := 0; i < 100000; i++ { go func() { // 無名関数が完了したらUnlock defer mu.Unlock() time.Sleep(10 * time.Microsecond) mu.Lock() count++ }() } end := time.Now() log.Printf("Done: %d times\n", count) log.Println(fmt.Sprintf("The end time %v(s)", end.Sub(start).Seconds())) } |
プログラムを実行します。
1 2 3 |
$ go run main.go 2021/03/02 17:46:45 Done: 87898 times 2021/03/02 17:46:45 The end time 0.048239669(s) |
やはり、うまく動きませんね。
そのため、次の考慮が必要になります。
考慮4: gorutineの最大数の制御
goroutineを作りすぎると逆に遅くなるらしいので、channelでgoroutineの最大数を制御してみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package main import ( "fmt" "log" "sync" "time" ) func main() { start := time.Now() var count int var wg sync.WaitGroup var mu sync.Mutex // goroutineの最大数の制御 // 最大数を10にする semaphore := make(chan struct{}, 10) for i := 0; i < 100000; i++ { // 10以上の場合、ここで処理をブロック semaphore <- struct{}{} wg.Add(1) go func(wg *sync.WaitGroup) { defer func() { wg.Done() mu.Unlock() // 処理が完了したらリリース <-semaphore }() time.Sleep(10 * time.Microsecond) mu.Lock() count++ }(&wg) } wg.Wait() end := time.Now() log.Printf("Done: %d times\n", count) log.Println(fmt.Sprintf("The end time %v(s)", end.Sub(start).Seconds())) } |
channelは、容量以上のデータを追加したり、逆に空のデータを読み込もうとした場合、処理がブロックされます。
このサンプルコードの例だと、容量を10と指定しているので、11以上のgoroutineを作られないようにしました。
プログラムを実行してみましょう。
1 2 3 |
$ go run main.go 2021/03/04 17:04:51 Done: 100000 times 2021/03/04 17:04:51 The end time 0.230159865(s) |
逆に遅くなってしまいましたね^^;
Channelでブロックしてるからだと思います。
しかし、goroutineの上限数の制御は、考慮に入れておいた方がいいです。
goroutineの最大数の制御は絶対にやったほうがいいことではありませんが、メモリの枯渇などマシンのリソースにシビアなシステムの場合は、goroutineに気を配ったほうが良いと思います。
考慮5: goroutineのエラーの受け取り方
goroutineは、別スレッドで動くプログラムです。エラーを受け取ることは可能なのでしょうか?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
package main import ( "errors" "log" ) func doErr() error { return errors.New("somthing happend and went error") } func main() { err := go doErr() if err != nil { log.Fatal(err) } } |
このコードは、コンパイルエラーになります。
1 |
expected ';', found 'go' |

エラーを受け取れないのか、、、と思っていたところ、以下のように実装してみました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
func doErr() error { return errors.New("somthing happend and went error") } func main() { var wg sync.WaitGroup wg.Add(1) go func() { // doneをこちらに移動 defer func() { wg.Done() }() err := doErr() if err != nil { log.Fatal(err) } }() wg.Wait() } |
1 2 3 |
$ go run main.go 2021/03/04 17:32:40 somthing happend and went error exit status 1 |
普通に無名関数で包めばOKですね。
ちなみに、Channelで受け取る方法もあります。
考慮6: goroutineの複数のエラーの受け取り方
goroutineで発生した複数のエラーを受け取りたい場合は、multierrが便利です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
func main() { var wg sync.WaitGroup // 複数のエラーを受け取れるようにする var multiErr error semaphore := make(chan struct{}, 10) for i := 0; i < 10; i++ { // 動作確認のため、100000->10へ semaphore <- struct{}{} wg.Add(1) go func() { defer func() { wg.Done() <-semaphore }() err := doErr() if err != nil { // Appendメソッドでエラーを追加 multiErr = multierr.Append(multiErr, err) } }() } wg.Wait() // エラー発生をチェック if multiErr != nil { log.Fatal(multiErr) } } |
「multierr.Append」で、発生した複数のエラーを束ねることができます。
実行結果を見てみましょう。
1 2 3 |
$ go run main.go 2021/03/05 11:55:55 somthing happend and went error; somthing happend and went error; somthing happend and went error; somthing happend and went error; somthing happend and went error; somthing happend and went error; somthing happend and went error exit status 1 |
こんな感じで、エラーを束ねられます。
まとめ
以下、まとめです。
* データの書き込み、読み込み時には排他制御をかける
* WaitGroupとMutexは一緒に使う
* goroutineの最大数に上限をもうけたほうが良い
* goroutineでエラーを受け取りたい場合は、無名関数で包む
* goroutineで複数のエラーを受け取るのに、multierrが便利
もっと考慮するべきことがあるだろ!という感じかもしれませんが、ひとまずこの辺にします^^
また何か気がついたら追加していきますので、よろしくお願いします!
それでは、また!
コメントを残す
コメントを投稿するにはログインしてください。