こんにちは。KOUKIです。
前回は、LongGreetAPIを実装していましたので、今回はその続きからです。
<目次>
前回
Go言語記事まとめ
前回の復習
LongGreetAPI続きを実装する前に、前回の復習をします。
calculator.protoにLongCalcAPIを実装してみましょう。
まずは、protoファイルの実装からでしたね。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
// calculator/calculatorpb/calculator.proto syntax = "proto3"; package calculator; option go_package="calculatorpb"; service CalculatorService { // Unary rpc Calc(CalcRequest) returns (CalcResponse) {}; // Server Streaming rpc CalcManyTimes(CalcManyTimesRequest) returns (stream CalcManyTimesResponse) {}; // new // Client Streaming rpc LongCalc(stream LongCalcRequest) returns (LongCalcResponse){}; } message Number { int32 num1 = 1; int32 num2 = 2; } message CalcRequest { Number number = 1; } message CalcResponse { int32 sum = 1; } message CalcManyTimesRequest { int32 calcNum = 1; } message CalcManyTimesResponse { int32 result = 1; } // new message LongCalcRequest { float calcNum2 = 1; } // new message LongCalcResponse { float result = 1; } |
protoファイルを変更したらコンパイルをしましょう。
1 |
./generate.sh |
続いて、Serverコードを書きます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
// calculator/calculator_server/server.go package main import ( "context" "fmt" "io" "log" "net" "time" "github.com/selfnote/golang-grpc/calculator/calculatorpb" "google.golang.org/grpc" ) type server struct{} func (*server) Calc(ctx context.Context, req *calculatorpb.CalcRequest) (*calculatorpb.CalcResponse, error) { fmt.Printf("Calc function was invoked with %v\n", req) num1 := req.GetNumber().GetNum1() num2 := req.GetNumber().GetNum2() sum := num1 + num2 res := &calculatorpb.CalcResponse{ Sum: sum, } return res, nil } func (*server) CalcManyTimes(req *calculatorpb.CalcManyTimesRequest, stream calculatorpb.CalculatorService_CalcManyTimesServer) error { fmt.Printf("CalcManyTimes function was invoked with %v\n", req) num := req.GetCalcNum() fmt.Println("num: ", num) var divNum int32 = 2 for num > 1 { if num%divNum == 0 { fmt.Println(divNum) stream.Send(&calculatorpb.CalcManyTimesResponse{ Result: num, }) num = num / divNum } else { divNum++ fmt.Printf("Divisor has incresaed to %v\n", divNum) } time.Sleep(1000 * time.Millisecond) } return nil } // new func (*server) LongCalc(stream calculatorpb.CalculatorService_LongCalcServer) error { fmt.Printf("LongCalc function was invoked with a streaming request\n") var count float32 = 0 var calcNum float32 = 0 for { req, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&calculatorpb.LongCalcResponse{ Result: (calcNum / count), }) } if err != nil { log.Fatalf("Error while reading client stream: %v", err) } num := req.GetCalcNum2() count++ calcNum += num } } func main() { fmt.Println("Server Start...") lis, err := net.Listen("tcp", "0.0.0.0:50051") if err != nil { log.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer() calculatorpb.RegisterCalculatorServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("Failed to server %v", err) } } |
このサンプルコードは、Clientから送信されたデータの平均を出力するプログラムです。
続いて、Clientコードを記述しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
// calculator/calculator_client/client.go package main import ( "context" "fmt" "io" "log" "time" "github.com/selfnote/golang-grpc/calculator/calculatorpb" "google.golang.org/grpc" ) func doUnary(c calculatorpb.CalculatorServiceClient) { fmt.Println("Starting to do a Unary RPC...") req := &calculatorpb.CalcRequest { Number: &calculatorpb.Number{ Num1: 3, Num2: 10, }, } res, err := c.Calc(context.Background(), req) if err != nil { log.Fatalf("error while calling Calc RPC: %v", err) } log.Printf("Response from Calc: %v", res.Sum) } func doServerStreaming(c calculatorpb.CalculatorServiceClient) { fmt.Println("Starting to do a Server Streaming RPC...") req := &calculatorpb.CalcManyTimesRequest{ CalcNum: 120, } resStream, err := c.CalcManyTimes(context.Background(), req) if err != nil { log.Fatalf("error while calling CalcManyTimes RPC: %v", err) } for { msg, err := resStream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("error while reading stream: %v", err) } log.Printf("Response from CalcManyTimes: %v", msg.GetResult()) } } // new func doClientStreaming(c calculatorpb.CalculatorServiceClient) { fmt.Println("Streaming to do a Client Streaming RPC...") stream, err := c.LongCalc(context.Background()) if err != nil { log.Fatalf("error while calling LongGreet: %v", err) } requests := []float32{1.0, 2.0, 3.0, 4.0} for _, req := range requests { fmt.Printf("Sending number: %v\n", req) stream.Send(&calculatorpb.LongCalcRequest{ CalcNum2: req, }) time.Sleep(1000 * time.Millisecond) } res, err := stream.CloseAndRecv() if err != nil { log.Fatalf("error while receving response from LongGreet: %v", err) } fmt.Printf("LongCalc Response: %v\n", res) } func main() { fmt.Println("Client Request...") cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { log.Fatalf("Could not connect: %v", err) } defer cc.Close() c := calculatorpb.NewCalculatorServiceClient(cc) fmt.Println("Created client: %f\n", c) // doUnary(c) // doServerStreaming(c) doClientStreaming(c) } |
ここまで実装できたら動作確認をします。
Server -> Clientの順に実行しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
make c_srv go run calculator/calculator_server/server.go Server Start... LongCalc function was invoked with a streaming request make c_cnt go run calculator/calculator_client/client.go Client Request... Created client: %f &{0xc0000f6700} Streaming to do a Client Streaming RPC... Sending number: 1 Sending number: 2 Sending number: 3 Sending number: 4 LongCalc Response: result:2.5 |
GreetEveryoneAPI – protoファイル作成 –
復讐はここまでにして、本題に入りましょう。
本日は、Bi Directional Streaming APIを使って、GreetEveryoneAPIを実装します。
Bi Directional Streaming APIの特徴は次の取りです。
・ リクエスト数とレスポンス数は同一でなくても問題ない
・ クライアントまたはサーバーから大量データを送受信したい場合に適している
Bi Directional Streaming APIを実装するときは、リクエストおよびレスポンス両方にstreamキーワードを付与します。
それではまず、protoファイルを実装します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
// greet/greetpb/greet.proto // version syntax = "proto3"; // go pakcage package greet; // Include subdirectory into package too option go_package="greetpb"; service GreetService{ // Unary rpc Greet(GreetRequest) returns (GreetResponse){}; // Server Streaming rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse){}; // Client Streaming rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {}; // new // Bidi Streaming rpc GreetEveryone(stream GreetEveryoneRequest) returns (stream GreetEveryoneResponse) {}; }; message Greeting { string first_name = 1; string last_name = 2; } message GreetRequest { Greeting greeting = 1; } message GreetResponse { string result = 1; } message GreetManyTimesRequest { Greeting greeting = 1; } message GreetManyTimesResponse { string result = 1; } message LongGreetRequest { Greeting greeting = 1; } message LongGreetResponse { string result = 1; } // new message GreetEveryoneRequest { Greeting greeting = 1; } // new message GreetEveryoneResponse { string result = 1; } |
protoファイルを変更したら忘れずにコンパイルしましょう。
1 |
./generate.sh |
GreetEveryone API – Serverコード作成 –
次は、Serverコードを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
// greet/greet_server/server.go package main import ( "context" "fmt" "io" "log" "net" "strconv" "time" "github.com/selfnote/golang-grpc/greet/greetpb" "google.golang.org/grpc" ) type server struct{} func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest)(*greetpb.GreetResponse, error) { fmt.Printf("Greet function was invoked with %v\n", req) firstName := req.GetGreeting().GetFirstName() result := "Hello " + firstName res := &greetpb.GreetResponse { Result: result, } return res, nil } func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error { fmt.Printf("GreetManyTimes function was invoked with %v\n", req) firstName := req.GetGreeting().GetFirstName() // Do print 10 times. for i := 0; i < 10; i++ { result := "Hello " + firstName + " number" + strconv.Itoa(i) res := &greetpb.GreetManyTimesResponse { Result: result, } stream.Send(res) time.Sleep(1000 * time.Millisecond) } return nil } func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error { fmt.Println("LongGreet function was invoked with a streaming request") result := "" for { req, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&greetpb.LongGreetResponse{ Result: result, }) } if err != nil { log.Fatalf("Error while reading client stream: %v", err) } firstName := req.GetGreeting().GetFirstName() result += "Hello " + firstName + "! " } } // new func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error { fmt.Printf("GreetEveryone function was invoked with a streaming request\n") for { req, err := stream.Recv() if err == io.EOF { return nil } if err != nil { log.Fatalf("Error while reading client stream: %v", err) return err } firstName := req.GetGreeting().GetFirstName() result := "Hello " + firstName + "!" sendErr := stream.Send(&greetpb.GreetEveryoneResponse{ Result: result, }) if sendErr != nil { log.Fatalf("Error whlie sending dat ato stream: %v", err) return err } } } func main() { fmt.Println("Hello World") lis, err := net.Listen("tcp", "0.0.0.0:50051") if err != nil { log.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer() greetpb.RegisterGreetServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } |
受け取ったリクエストの数だけ、レスポンス(First Name)を返す実装を行いました。
GreetEveryone API – Clientコード作成 –
次は、Clientコードを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
// greet/greet_client/client.go package main import ( "context" "fmt" "io" "log" "time" "github.com/selfnote/golang-grpc/greet/greetpb" "google.golang.org/grpc" ) func main() { fmt.Println("Hello I'm a client") cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { log.Fatalf("could not connect: %v", err) } defer cc.Close() c := greetpb.NewGreetServiceClient(cc) // doUnary(c) // doServerStreaming(c) // doClientStreaming(c) doBiDiStreaming(c) } func doUnary(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a Unary RPC...") req := &greetpb.GreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "Harry", LastName: "Potter", }, } res, err := c.Greet(context.Background(), req) if err != nil { log.Fatalf("error whiole caling Greet RPC: %v\n", err) } log.Printf("Response from Greet: %v\n", res.Result) } func doServerStreaming(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a Server Streaming PRC...") req := &greetpb.GreetManyTimesRequest { Greeting: &greetpb.Greeting { FirstName: "Hary", LastName: "Potter", }, } resStream, err := c.GreetManyTimes(context.Background(), req) if err != nil { log.Fatalf("error while calling GreetManyTimes RPC: %v", err) } for { msg, err := resStream.Recv() if err == io.EOF { fmt.Println(io.EOF) break } if err != nil { log.Fatalf("error while reading stream: %v", err) } log.Printf("Response from greetManyTimes: %v", msg.GetResult()) } } func doClientStreaming(c greetpb.GreetServiceClient) { fmt.Println("Streaming to do a Cient streaming RPC...") requests := []*greetpb.LongGreetRequest { &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Harry", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Mark", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Mary", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Jane", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Upper", }, }, } stream, err := c.LongGreet(context.Background()) if err != nil { log.Fatalf("error while calling LongGreet: %v", err) } for _, req := range requests { fmt.Println("Sending req: %v\n", req) stream.Send(req) time.Sleep(1000 * time.Millisecond) } res, err := stream.CloseAndRecv() if err != nil { log.Fatalf("error while receiving response from LongGreet: %v\n", err) } fmt.Printf("LongGreet Response: %v\n", res) } // new func doBiDiStreaming(c greetpb.GreetServiceClient) { fmt.Println("Streaming to do a Bidi Streaming RPC...") // we create a stream by invoking the client stream, err := c.GreetEveryone(context.Background()) if err != nil { log.Fatalf("Error while creating stream: %v", err) return } requests := []*greetpb.GreetEveryoneRequest{ &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Stephane", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "John", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Lucy", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Mark", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Piper", }, }, } waitc := make(chan struct{}) // we send a bunch of messages to the client (go routine) go func() { for _, req := range requests { fmt.Printf("Sending message: %v\n", req) stream.Send(req) time.Sleep(1000 * time.Millisecond) } stream.CloseSend() }() // we receive a bunch of messages from the client (go routine) go func() { for { res, err := stream.Recv() if err == io.EOF{ break } if err != nil { log.Fatalf("Error while receiving: %v", err) close(waitc) } fmt.Printf("Received: %v\n", res.GetResult()) } }() // block until everything is done <-waitc } |
GreetEveryone API – 動作確認 –
Server -> Clientの順にコードを実行します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
make server go run greet/greet_server/server.go Hello World GreetEveryone function was invoked with a streaming request make client go run greet/greet_client/client.go Hello I'm a client Streaming to do a Bidi Streaming RPC... Sending message: greeting:<first_name:"Stephane" > Received: Hello Stephane! Sending message: greeting:<first_name:"John" > Received: Hello John! Sending message: greeting:<first_name:"Lucy" > Received: Hello Lucy! Sending message: greeting:<first_name:"Mark" > Received: Hello Mark! Sending message: greeting:<first_name:"Piper" > Received: Hello Piper! |
上手くいってますね。
もう少し工夫すれば、Chatアプリケーションが作成できそうな感じですね。
次回
次回も引き続き、gRPCの学習を続けていきます。
コメントを残す
コメントを投稿するにはログインしてください。