こんにちは。KOUKIです。
とあるWeb系企業で、Goエンジニアをやっています。
今日は、Goroutineに関して基本から応用まで記事にしました。平行処理を完全にマスターしましょう!
<目次>
そもそもGoroutineって何?
Goroutineはプログラム内の処理を「平行」して実行できるGoの素晴らしい機能の一つです。
並列処理(マルチスレッド)とよく混同されがちなのですが、実行するプログラムを瞬間的に切り替える(コンテキストスイッチというらしい)ことで、複数のプログラムが同時に実行されているように動作します。しかし、起動している処理は常に一つです。
並列処理では、全ての処理が同時に実行されるイメージですね。この記事がわかりやすいです。
平行処理でプログラムを走らせるとリソースを遊ばせる時間が減るので、処理速度が上がります(逆に遅くなることもありますが)。
平行処理を二人乗り自転車で例えると、お姉ちゃん一人のケイデンスより妹ちゃんのケイデンスを合わせた方がより早く前に進める、そういった感じです。例え、わかりにくいですか? ^^;

Goroutineの基本
Hello Worldプログラム
「Hello World」文字列を出力する簡単なプログラムを実装します。
1 2 3 4 5 6 7 8 9 10 11 |
package main import "fmt" func main() { go helloworld() } func helloworld() { fmt.Println("Hello World!") } |
1 2 |
$ go run main.go Hello World! |
なんてことないHello Worldプログラムですが、実は既にGoroutineが動いています。
Goではmainパッケージのmain関数からプログラムが実行されますが、その時にMain Goroutineが起動します。つまり、Goのプログラムでは最低一つのGoroutineが起動することになります。
go キーワード
「処理を平行に走らせるってどうやるんだ?難しいんだろう?」という声が聞こえてきそうです。
いえいえ、超簡単です。
平行で走らせたい処理の呼び出しに「go」キーワードをつける、ただそれだけですよ♪
1 |
go helloworld() |
これだけで平行処理になるので、Goは素晴らしい言語だとお伝えしているわけです。
では、プログラムを実行してみましょう。
1 |
$ go run main.go |
「……………………….」。
結果が出力されない!
Goroutineのハンドリングは難しい
実は、Goroutineを扱う際には注意点があります。
それは、「処理の流れに注意せよ」です。
前述した通り、GoではMain Goroutineが必ず起動します。そして、goキーワードによりhelloworld関数が平行で走るわけですが、この処理の実行が完了する前にMain Goroutineが終了すると平行起動している処理の有無に関わらず、全ての処理が終了してしまうのです。
その証拠に、time.Sleepで処理を一時停止すると期待した結果が得られます。
1 2 3 4 5 6 7 8 9 10 11 12 |
package main import ( "fmt" "time" ) func main() { go helloworld() time.Sleep(time.Second) } |
1 2 |
$ go run main.go Hello World! |
無名関数でもOK
無名関数でGoroutineを動かすことができます。
1 2 3 4 5 6 7 |
func main() { go func(msg string) { fmt.Println(msg) }("Hello World!") time.Sleep(time.Second) } |
goroutineの起動のしやすさと相まって、お手軽感がありますよね。
Sleep制御ってダサくない?
先ほど、Sleepを使ってGoroutineの制御方法をお見せしましたが、実際のプログラムだと何秒間Sleepさせておけばいいのかわかりませんし、そもそも無駄なSleepはプログラムを遅延させるボトルネックになります。
ゆえに、SleepでGoroutineを制御することはあり得ません(と思う)。
対策として、GoではsyncパッケージのWaitGroupが提供されています。これを使うといい感じに実装できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package main import ( "fmt" "sync" ) func main() { // WaitGroup設定 var wait sync.WaitGroup // 監視するgoroutineを一つ追加 wait.Add(1) go func(msg string) { fmt.Println(msg) // goroutineが完了したことを知らせる wait.Done() }("Hello World!") // goroutineの完了を待つ wait.Wait() } |
いい感じですよね。プログラムを実行してみましょう。
1 2 |
$ go run main.go Hello World! |
コールバック関数
コールバック関数という言葉を聞いたことはないでしょうか。
他の関数に引数として渡される関数で、外側の関数で何らかの処理やアクションを実行します。
簡単な例を挙げてみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
package main import ( "fmt" "strings" ) // 引数として渡される関数 <meta charset="utf-8">func greetings(v string) { fmt.Printf("Callback: %s\n", v) } // 外側の関数 func toUpperSync(word string, f func(string)) { f(strings.ToUpper(word)) } func main() { <meta charset="utf-8"><meta charset="utf-8"> toUpperSync("Hello Callbacks!", greetings) } |
上記の例はすぐに実行される同期型のコールバック関数です。
一方、goroutineで非同期型のコールバック関数を実装することもできます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
package main import ( "fmt" "strings" "sync" ) var wait sync.WaitGroup func greetings(v string) { defer wait.Done() fmt.Printf("Callback: %s\n", v) } func toUpperSync(word string, f func(string)) { go func() { f(strings.ToUpper(word)) }() } func main() { wait.Add(1) toUpperSync("Hello Callbacks!", greetings) fmt.Println("Waiting async reponse...") wait.Wait() } |
プログラムを実行してみましょう。
1 2 3 |
$ go run -race main.go Waiting async reponse... Callback: HELLO CALLBACKS! |
OKですね。非同期のコールバックは処理が複雑になるので、個人的にはあまり実装したくないです^^;
ちなみに、無名関数を使うとより柔軟なコールバックが実装できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
package main import ( "fmt" "strings" "sync" ) var wait sync.WaitGroup func toUpperSync(word string, f func(string)) { go func() { f(strings.ToUpper(word)) }() } func main() { wait.Add(1) toUpperSync("Hello Callbacks!", func(v string) { toUpperSync(fmt.Sprintf("Callback: %s\n", v), func(v string) { fmt.Printf("Callback: %s\n", v) wait.Done() }) }) fmt.Println("Waiting async reponse...") wait.Wait() } |
1 2 3 |
$ go run -race main.go Waiting async reponse... Callback: CALLBACK: HELLO CALLBACKS! |
関数を数珠繋ぎに呼び出せてますね。
Mutex
Mutexは、排他制御機能を提供するGoの素晴らしい仕組みです。排他制御とは、あるリソース(例えば変数)に対して同時に読み書きすることを防いでくれるGoroutineで平行処理プログラムを実装する時には必須の知識になります。
簡単なサンプルを以下に示しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
package main import ( "fmt" "sync" "time" ) type Counter struct { sync.Mutex value int // 排他制御対象 } func main() { counter := Counter{} for i := 0; i < 10; i++ { go func(i int) { // 1 counter.Lock() defer counter.Unlock() counter.value++ }(i) } time.Sleep(1 * time.Second) // 2 counter.Lock() defer counter.Unlock() fmt.Println(counter.value) } |
Counter構造体のvalueオプションが排他制御対象です。
排他制御を設定した箇所は、2つあります。
- valueのインクリメント
- valueの読み込み
そして、MutexのLock/Unlockメソッドで排他制御をしています。
プログラムを実行してみましょう。
1 2 |
$ go run -race main.go 10 |
OKですね。
ちなみに、「-race」オプションは、競合が発生していないか調べてくれる便利なツールなので積極的に活用していきましょう。
次に、Lock/Unlockをコメントアウトしたバージョンを以下に示します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
func main() { counter := Counter{} for i := 0; i < 10; i++ { go func(i int) { // 1 // counter.Lock() // defer counter.Unlock() counter.value++ }(i) } time.Sleep(1 * time.Second) // 2 counter.Lock() defer counter.Unlock() fmt.Println(counter.value) } |
プログラムを実行すると競合が発生するはずです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
$ go run -race main.go ================== WARNING: DATA RACE Read at 0x00c00013c018 by goroutine 8: main.main.func1() /Users/hoge/go/src/github.com/<meta charset="utf-8">hoge/go-algorithms/concurrency/sample1/main.go:21 +0x39 main.main·dwrap·1() /Users/<meta charset="utf-8">hoge/go/src/github.com/<meta charset="utf-8">hoge/go-algorithms/concurrency/sample1/main.go:22 +0x47 Previous write at 0x00c00013c018 by goroutine 7: main.main.func1() /Users/<meta charset="utf-8">hoge/go/src/github.com/<meta charset="utf-8">hoge/go-algorithms/concurrency/sample1/main.go:21 +0x51 main.main·dwrap·1() /Users/<meta charset="utf-8">hoge/go/src/github.com/<meta charset="utf-8">hoge/go-algorithms/concurrency/sample1/main.go:22 +0x47 Goroutine 8 (running) created at: main.main() /Users/<meta charset="utf-8">hoge/go/src/github.com/<meta charset="utf-8">hoge/go-algorithms/concurrency/sample1/main.go:17 +0x68 Goroutine 7 (finished) created at: main.main() /Users/<meta charset="utf-8">hoge/go/src/github.com/<meta charset="utf-8">hoge/go-algorithms/concurrency/sample1/main.go:17 +0x68 ================== 10 Found 1 data race(s) exit status 66 |
「Found 1 data race(s)」と表示されたので、思惑通りになりましたね!
競合が発生するとプログラムがクラッシュするので、排他制御が必須なわけです。
Channels
Goroutine間でデータの受け渡しをするにはどうすればいいのか、不思議に思ったかもしれません。
簡単です。Channelを使えばいいのです。
実装の方法
簡単なサンプルを以下に示します。
1 2 3 4 5 6 7 8 9 10 11 12 |
package main import "fmt" func main() { channel := make(chan string) go func() { channel <- "Hello World!" // 値の送信 }() message := <-channel // 値の受信 fmt.Println(message) } |
Channelを作成するには、Go組み込みのmake関数を使います。
そして、「<-」オペレータを使って値の送受信をしています。
プログラムを実行してみましょう。
1 2 |
$ go run main.go Hello World! |
deadlockにご注意を
Hello World!が出力されましたね。感がいい人は「なんでMain Goroutineが先に終了しないんだ?HelloWorldの時は何も出力されなかったじゃん。」と思われるかもしれません。
実は、Channelはブロック機能を持っていて、「message := <-channel 」で値を受信するまで処理をブロックします。そのため、Main Goroutineが完了しなかったというわけです。
すごく便利そうな機能ではありますが、制御を誤るとdeadlockが発生します。
1 2 3 4 5 6 7 8 9 10 11 12 |
package main import "fmt" func main() { channel := make(chan string) // go func() { // channel <- "Hello World!" // }() message := <-channel fmt.Println(message) } |
上記のコードは、「永久に値が受信されることのない」プログラムです。このような場合に、deadlockが発生します。
1 2 3 4 5 6 7 |
$ go run main.go fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() /Users/hoge/go/src/github.com/hoge/go-algorithms/concurrency/sample1/main.go:10 +0x36 exit status 2 |
バッファ付きチャネル
Channelには、サイズ(容量)を設定できます。例えば、サイズを1に設定したchannelを以下に示します。
1 |
channel := make(chan string, 1) |
このバッファが一杯になった場合、このチャネルへの書き込みはブロックされます。
この性質を利用すれば、Goroutineの最大起動数を制御することもできます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
package main import ( "fmt" "time" ) func main() { ch := make(chan int, 1) defer close(ch) for i := 1; i < 10; i++ { ch <- i // 空きが出るまでブロック fmt.Printf("The Number of goroutine: %d\n", len(ch)) go func() { time.Sleep(100 * time.Millisecond) <-ch }() } } |
ちなみに、close関数でチャネルを閉じることができます。
専属チャネルの作成
メソッドや関数のパラメータに「<-」オペレーターを明示することで、送信専用、受信専用のチャネルを作成することができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package main import "time" func main() { channel := make(chan string, 1) go sendingCh(channel, "Hello World") go receivingCh(channel) time.Sleep(time.Second) println("Finish!") } // 送信専用 func sendingCh(ch chan<- string, message string) { ch <- message } // 受信専用 func receivingCh(ch <-chan string) { msg := <-ch println(msg) } |
「<-」の向きに注意してください。
- 変数 <- ・・・ 受信専用
- <- 変数 ・・・ 送信専用
送信専用チャネルに値を送信しようとしたら(逆もまた然り)、エラーが発生して処理が停止します。
1 2 3 |
$ go run main.go # command-line-arguments ./main.go:23:5: invalid operation: ch <- "hello" (send to receive-only type <-chan string) |
Channelの振り分け
for + selectのコンボで、Channelを振り分けることができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
package main import "time" func sendString(ch chan<- string, s string) { ch <- s } func receiver(helloCh, goodbyCh <-chan string, donChan chan<- bool) { for { select { case msg := <-helloCh: println(msg) case msg := <-goodbyCh: println(msg) case <-time.After(time.Second * 2): println("Nothing received in 2 seconds. Exiting") donChan <- true } } } func main() { helloCh := make(chan string, 1) goodbyCh := make(chan string, 1) doneChan := make(chan bool) go receiver(helloCh, goodbyCh, doneChan) go sendString(helloCh, "hello!") go sendString(goodbyCh, "goodbye!") // doneが受信されるまでブロック <-doneChan } |
1 2 3 4 |
$ go run main.go hello! goodbye! Nothing received in 2 seconds. Exiting |
このパターンは結構使えます。チャットアプリケーションなんかで使ったりしますね。
応用: シングルトン カウンター
そろそろGoroutineやChannel、Mutexに慣れてきたのではないでしょうか?
応用として、Singletonパターン + goroutine + Channel or Mutexでカウンタープログラムを作りましょう。
仕様は至ってシンプルです。
- プログラム全体で一意のインスタンスであること(シングルトンが保障されていること)
- インスタンスのcountプロパティをインクリメントする機能を有すること
- インスタンスのcountの現在値を取得する機能を有すること
パターン1: チャネルを使ったバージョン
チャネルを使ったバージョンは、先ほど紹介した「select + for」構文を使います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
package singleton_counter // Channels var addCh chan bool = make(chan bool) var getCountCh chan chan int = make(chan chan int) var doneCh chan bool = make(chan bool) func init() { var count int // Do when program runs go func(addCh <-chan bool, getCountCh <-chan chan int, doneCh <-chan bool) { for { select { case <-addCh: count++ case ch := <-getCountCh: ch <- count case <-doneCh: break } } }(addCh, getCountCh, doneCh) } // Singleton type singleton struct{} var instance singleton func GetInstance() *singleton { return &instance } func (s *singleton) AddOne() { addCh <- true } func (s *singleton) GetCount() int { resCh := make(chan int) defer close(resCh) getCountCh <- resCh return <-resCh } func (s *singleton) Stop() { doneCh <- true defer func() { close(addCh) close(getCountCh) close(doneCh) }() } |
ポイントは、「GetInstance」関数です。インスタンスの生成/取得をここから行う縛りにすればシングルトンなインスタンスを取得できます。
あとは、Channelを使って値のカウント、取得をすれば良いだけです。
「var getCountCh chan chan int = make(chan chan int)」は、あまりみない書き方ですよね。int型のchannelを型として定義し、そこからカウント数を取り出したいのでこの様な書き方になっています。
テストコードを実装して、挙動確認をしましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
package singleton_counter import ( "fmt" "testing" "time" ) func TestStartInstance(t *testing.T) { singleton := GetInstance() singleton2 := GetInstance() n := 5000 for i := 0; i < n; i++ { go singleton.AddOne() go singleton2.AddOne() } fmt.Printf("Before loop, current count is %d\n", singleton.GetCount()) var val int for val != n*2 { val = singleton.GetCount() time.Sleep(10 * time.Millisecond) } singleton.Stop() } |
このコードは何もテストしていないのですが、挙動を確認する上で役に立ちます。
1 2 3 4 5 6 |
$ go test -v === RUN TestStartInstance Before loop, current count is 7440 --- PASS: TestStartInstance (0.03s) PASS ok singleton_counter 0.716s |
パターン2: Mutexを使う
Mutexで実装したパターンも紹介します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
package singleton_counter import "sync" type singleton struct { count int sync.RWMutex } var instance singleton func GetInstance() *singleton { return &instance } func (s *singleton) AddOne() { s.Lock() defer s.Unlock() s.count++ } func (s *singleton) GetCount() int { s.RLock() defer s.RUnlock() return s.count } |
Channelのパターンと比べるとかなりスッキリしましたね。
先ほど紹介したテストコードから「singleton.Stop()」の行をコメントアウトして、テストを実行してみましょう。
1 2 3 4 5 6 |
$ go test -v === RUN TestStartInstance Before loop, current count is 9955 --- PASS: TestStartInstance (0.01s) PASS ok singleton_counter 0.902s |
応用: Barrier Concurrency Pattern
Barrier Concurrency Patternを実装しましょう。
Wikiによると「同期方法の一つであり、ソースコード中でスレッドやプロセスがある箇所で停止し、他の全てのスレッドプロセスがバリアに到達するまで進行しないようなものを示す。」とあります。
このバリアに到達するまで進行しないようにするという処理はChannelで実現できそうです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
package barrier import ( "fmt" "io/ioutil" "net/http" "time" ) var timeoutMillseconds int = 5000 type barrierResp struct { Err error Resp string } func barrier(endpoints ...string) { // リクエスト数 requestNumber := len(endpoints) // inチャネルの生成 in := make(chan barrierResp, requestNumber) defer close(in) // レスポンス作成 responses := make([]barrierResp, requestNumber) for _, endpoint := range endpoints { go makeRequest(in, endpoint) } // エラーチェック var hasError bool for i := 0; i < requestNumber; i++ { // channelから値を受け取るまでブロック resp := <-in if resp.Err != nil { fmt.Println("Error: ", resp.Err) hasError = true } responses[i] = resp } if !hasError { for _, resp := range responses { fmt.Println(resp.Resp) } } } func makeRequest(out chan<- barrierResp, url string) { res := barrierResp{} client := http.Client{ Timeout: time.Duration(time.Duration( timeoutMillseconds) * time.Millisecond), } resp, err := client.Get(url) if err != nil { res.Err = err out <- res return } byt, err := ioutil.ReadAll(resp.Body) if err != nil { res.Err = err out <- res return } res.Resp = string(byt) out <- res } |
このコードは、あるURLに対してGoのクライアントでリクエストを飛ばします。ただ飛ばすのではなく、Goroutineを使って平行処理で飛ばしています。
そして、全てのGoroutineの結果を受け取れるようにチャネルでブロック処理を入れています。
テストコードを実装して、動きを確認してみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
package barrier import ( "bytes" "io" "os" "strings" "testing" ) func TestBarrier(t *testing.T) { t.Run("Correct endpoints", func(t *testing.T) { endpoints := []string{"https://httpbin.org/headers", "https://httpbin.org/headers"} result := captureBarrierOutput(endpoints...) if !strings.Contains(result, "Accept-Encoding") || !strings.Contains(result, "User-Agent") { t.Fail() } t.Log(result) }) t.Run("One endpoint incorrect", func(t *testing.T) { endpoints := []string{"http://malformed-url", "https://httpbin.org/headers"} result := captureBarrierOutput(endpoints...) if !strings.Contains(result, "Error") { t.Fail() } t.Log(result) }) t.Run("Very short timeout", func(t *testing.T) { endpoints := []string{"http://httpbin.org/headers"} // Timeout値をいじる timeoutMillseconds = 1 result := captureBarrierOutput(endpoints...) if !strings.Contains(result, "Timeout") { t.Fail() } t.Log(result) }) } // ヘルパー関数 // baarrier関数のfmt.Printlnの出力をGoプログラムで扱えるようにする func captureBarrierOutput(endpoints ...string) string { reader, writer, _ := os.Pipe() os.Stdout = writer out := make(chan string) go func() { var buf bytes.Buffer io.Copy(&buf, reader) out <- buf.String() }() barrier(endpoints...) writer.Close() tmp := <-out return tmp } |
captureBarrierOutputはヘルパー関数です。barrier関数は戻り値を返さず、結果を標準出力へ吐き出すので、この出力先を変数に変更し、テストで使えるようにしています。標準出力のテストをしたいときにも有効ですね!
Testは「正しいURL」、「不正なURL」、「Timeout」の3つあり、テストを実行するとbarrier関数が機能していることがわかります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
$ go test -v === RUN TestBarrier === RUN TestBarrier/Correct_endpoints ... === RUN TestBarrier/One_endpoint_incorrect barrier_test.go:27: Error: Get "http://malformed-url": dial tcp: lookup malformed-url: no such host === RUN TestBarrier/Very_short_timeout barrier_test.go:40: Error: Get "http://httpbin.org/headers": context deadline exceeded (Client.Timeout exceeded while awaiting headers) --- PASS: TestBarrier (1.99s) --- PASS: TestBarrier/Correct_endpoints (1.56s) --- PASS: TestBarrier/One_endpoint_incorrect (0.43s) --- PASS: TestBarrier/Very_short_timeout (0.00s) ok barrier 2.817s |
応用: Future Design Pattern
Future Design Patternは非同期処理パターンの一つで、ある処理をGoroutineで実行し、その結果を未来で受け取りたいといった処理に用いられるデザインパターンです。
例を挙げてみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
package future import "fmt" type SuccessFunc func(string) type FailFunc func(error) type ExecuteStringFunc func() (string, error) type MaybeString struct { successFunc SuccessFunc failFunc FailFunc } func (s *MaybeString) Success(f SuccessFunc) *MaybeString { s.successFunc = f return s } func (s *MaybeString) Fail(f FailFunc) *MaybeString { s.failFunc = f return s } func (s *MaybeString) Execute(f ExecuteStringFunc) { go func(s *MaybeString) { str, err := f() if err != nil { s.failFunc(err) } else { s.successFunc(str) } }(s) } func setContext(msg string) ExecuteStringFunc { msg = fmt.Sprintf("%s Closure!\n", msg) return func() (string, error) { return msg, nil } } |
このコードには、「Success/Fail/Execute」の3つのメソッドが存在します。Success/FailはMaybeStringを戻り値として返します。そして、ExecuteをGoroutineで実行したときに、Success/Failから結果を受け取ることができます。
とはいえ、わかりにくいと思うのでテストコードを実装してみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
package future import ( "errors" "sync" "testing" "time" ) var future = &MaybeString{} func TestStringOrError_Execute(t *testing.T) { t.Run("Success result", func(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go timeout(t, &wg) // Success/Fail時の処理を予約 // この結果は未来で受け取る future.Success(func(s string) { t.Log(s) wg.Done() }).Fail(func(e error) { t.Fail() wg.Done() }) // Hello Worldを返却する処理を渡す // Success/Faileの結果を受け取る future.Execute(func() (string, error) { return "Hello World!", nil }) wg.Wait() }) t.Run("Failed result", func(t *testing.T) { var wg sync.WaitGroup wg.Add(1) future.Success(func(s string) { t.Fail() wg.Done() }).Fail(func(e error) { t.Log(e.Error()) wg.Done() }) // エラーを渡す future.Execute(func() (string, error) { return "", errors.New("Error ocurred") }) wg.Wait() }) t.Run("Closure Success result", func(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go timeout(t, &wg) future.Success(func(s string) { t.Log(s) wg.Done() }).Fail(func(e error) { t.Fail() wg.Done() }) // コンテキストを渡すこともOK future.Execute(setContext("Hello")) wg.Wait() }) } // タイムアウト時の動作確認用のヘルパー関数 func timeout(t *testing.T, wg *sync.WaitGroup) { time.Sleep(1 * time.Second) t.Log("Timeout!") t.Fail() wg.Done() } |
3パターンくらいテストコードを実装していますが、TestStringOrError_Executeテストさえ見ていただければOKです。
ここでは、Success/Failを事前に登録し、その後にExecuteメソッドを実行して、その結果を受け取ります。処理が成功したときはSuccess、失敗した時はFailが呼ばれる感じですね。
実用的かはともかく、面白い実装だと思います。
テストコードを実行するとそれぞれの結果がログとして確認できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
$ go test -v === RUN TestStringOrError_Execute === RUN TestStringOrError_Execute/Success_result future_test.go:18: Hello World! <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< === RUN TestStringOrError_Execute/Failed_result future_test.go:36: Error ocurred <meta charset="utf-8"><<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< === RUN TestStringOrError_Execute/Closure_Success_result future_test.go:52: Hello Closure! <meta charset="utf-8"><<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< --- PASS: TestStringOrError_Execute (0.00s) --- PASS: TestStringOrError_Execute/Success_result (0.00s) --- PASS: TestStringOrError_Execute/Failed_result (0.00s) --- PASS: TestStringOrError_Execute/Closure_Success_result (0.00s) PASS ok future 0.530s |
応用: Pipeline Pattern
Pipeline Patternは、Channelを使って非同期に処理を繋げていくパターンです。
詳細は、以下の記事を確認ください。
今回は、generator(channel生成)、supply(供給)、sum(合計)の3つの処理をパイプラインで繋げて処理するサンプルを実装します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
package pipeline func LaunchPipeline(amount int) int { return <-sum(supply(generator(amount))) // こっちでもOK // firstCh := generator(amount) // secondCh := supply(firstCh) // thirdCh := sum(secondCh) // result := <-thirdCh // return result } // パイプライン生成 func generator(max int) <-chan int { outChInt := make(chan int, 100) go func() { defer close(outChInt) for i := 1; i <= max; i++ { outChInt <- i } }() return outChInt } // 供給 func supply(in <-chan int) <-chan int { out := make(chan int, 100) go func() { defer close(out) for v := range in { out <- v * v } }() return out } // 合計 func sum(in <-chan int) <-chan int { out := make(chan int, 100) go func() { defer close(out) var sum int for v := range in { sum += v } out <- sum }() return out } |
単純明快の処理ですね。ここまでの内容を理解できていれば、簡単に理解できるソースコードです。
特徴的なのは「sum(supply(generator(amount)))」のような処理が書けることですね。Pipelineを彷彿とさせる書き方です。
goroutineで実行しているので、平行処理できている点もポイントが高いですよね^^
より理解を深めるために、テストコードを実装してみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
package pipline import "testing" func TestLaunchPipline(t *testing.T) { tableTest := [][]int{ {3, 14}, {5, 55}, } var res int for _, test := range tableTest { res = LaunchPipeline(test[0]) if res != test[1] { t.Fatal() } t.Logf("%d == %d\n", res, test[1]) } } |
テストコードを実行します。
1 2 3 4 5 6 7 |
$ go test -v === RUN TestLaunchPipline pipline_test.go:17: 14 == 14 pipline_test.go:17: 55 == 55 --- PASS: TestLaunchPipline (0.00s) PASS ok pipline 0.722s |
OKですね。
おわりに
お疲れ様でした!少し長くなりましたね^^;
しかし、苦労する価値はあるくらい、GoroutineやChannelについての理解は深められたと思います。
GoプログラマーにとってGoroutineはかなり大切な存在ですので、たまに基礎から理解を深め直すことも必要だと個人的に考えています。
最近、仕事ではバックエンド側の言語をGo以外で書いてないです。それほど使いやすい言語なんですよね。
日本ではそれほど流行っていませんが、シリコンバレーではめちゃくちゃ流行っています。
もっとGo言語を流行らせたいですね〜〜〜
それでは、また!
コメントを残す
コメントを投稿するにはログインしてください。