こんにちは。KOUKIです。
前回は、LongGreetAPIを実装していましたので、今回はその続きからです。
<目次>
前回
Go言語記事まとめ
前回の復習
LongGreetAPIの続きを実装する前に、前回の復習をします。
calculator.protoにLongCalcAPIを実装してみましょう。
まずは、protoファイルの実装からでしたね。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
// calculator/calculatorpb/calculator.proto
syntax = "proto3";

package calculator;

// Go package name used for the generated code.
option go_package="calculatorpb";

// CalculatorService declares one RPC per gRPC call style used in this tutorial.
service CalculatorService {
    // Unary: one CalcRequest in, one CalcResponse out.
    rpc Calc(CalcRequest) returns (CalcResponse) {};

    // Server Streaming: one request in, a stream of responses out.
    rpc CalcManyTimes(CalcManyTimesRequest) returns (stream CalcManyTimesResponse) {};

    // Client Streaming (added in this article): a stream of requests in,
    // a single LongCalcResponse out.
    rpc LongCalc(stream LongCalcRequest) returns (LongCalcResponse){};
}

// Number carries the two operands of Calc.
message Number {
    int32 num1 = 1;
    int32 num2 = 2;
}

// CalcRequest is the request message of Calc.
message CalcRequest {
    Number number = 1;
}

// CalcResponse is the response message of Calc.
message CalcResponse {
    int32 sum = 1;
}

// CalcManyTimesRequest is the request message of CalcManyTimes.
message CalcManyTimesRequest {
    int32 calcNum = 1;
}

// CalcManyTimesResponse is one element of the CalcManyTimes response stream.
message CalcManyTimesResponse {
    int32 result = 1;
}

// LongCalcRequest (added in this article) is one element of the LongCalc
// request stream.
message LongCalcRequest {
    float calcNum2 = 1;
}

// LongCalcResponse (added in this article) is the single response of LongCalc.
message LongCalcResponse {
    float result = 1;
}
protoファイルを変更したらコンパイルをしましょう。
1 |
./generate.sh |
続いて、Serverコードを書きます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
// calculator/calculator_server/server.go package main import ( "context" "fmt" "io" "log" "net" "time" "github.com/selfnote/golang-grpc/calculator/calculatorpb" "google.golang.org/grpc" ) type server struct{} func (*server) Calc(ctx context.Context, req *calculatorpb.CalcRequest) (*calculatorpb.CalcResponse, error) { fmt.Printf("Calc function was invoked with %v\n", req) num1 := req.GetNumber().GetNum1() num2 := req.GetNumber().GetNum2() sum := num1 + num2 res := &calculatorpb.CalcResponse{ Sum: sum, } return res, nil } func (*server) CalcManyTimes(req *calculatorpb.CalcManyTimesRequest, stream calculatorpb.CalculatorService_CalcManyTimesServer) error { fmt.Printf("CalcManyTimes function was invoked with %v\n", req) num := req.GetCalcNum() fmt.Println("num: ", num) var divNum int32 = 2 for num > 1 { if num%divNum == 0 { fmt.Println(divNum) stream.Send(&calculatorpb.CalcManyTimesResponse{ Result: num, }) num = num / divNum } else { divNum++ fmt.Printf("Divisor has incresaed to %v\n", divNum) } time.Sleep(1000 * time.Millisecond) } return nil } // new func (*server) LongCalc(stream calculatorpb.CalculatorService_LongCalcServer) error { fmt.Printf("LongCalc function was invoked with a streaming request\n") var count float32 = 0 var calcNum float32 = 0 for { req, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&calculatorpb.LongCalcResponse{ Result: (calcNum / count), }) } if err != nil { log.Fatalf("Error while reading client stream: %v", err) } num := req.GetCalcNum2() count++ calcNum += num } } func main() { fmt.Println("Server Start...") lis, err := net.Listen("tcp", "0.0.0.0:50051") if err != nil { log.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer() calculatorpb.RegisterCalculatorServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("Failed to server %v", err) } } |
このサンプルコードは、Clientから送信されたデータの平均を出力するプログラムです。
続いて、Clientコードを記述しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
// calculator/calculator_client/client.go package main import ( "context" "fmt" "io" "log" "time" "github.com/selfnote/golang-grpc/calculator/calculatorpb" "google.golang.org/grpc" ) func doUnary(c calculatorpb.CalculatorServiceClient) { fmt.Println("Starting to do a Unary RPC...") req := &calculatorpb.CalcRequest { Number: &calculatorpb.Number{ Num1: 3, Num2: 10, }, } res, err := c.Calc(context.Background(), req) if err != nil { log.Fatalf("error while calling Calc RPC: %v", err) } log.Printf("Response from Calc: %v", res.Sum) } func doServerStreaming(c calculatorpb.CalculatorServiceClient) { fmt.Println("Starting to do a Server Streaming RPC...") req := &calculatorpb.CalcManyTimesRequest{ CalcNum: 120, } resStream, err := c.CalcManyTimes(context.Background(), req) if err != nil { log.Fatalf("error while calling CalcManyTimes RPC: %v", err) } for { msg, err := resStream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("error while reading stream: %v", err) } log.Printf("Response from CalcManyTimes: %v", msg.GetResult()) } } // new func doClientStreaming(c calculatorpb.CalculatorServiceClient) { fmt.Println("Streaming to do a Client Streaming RPC...") stream, err := c.LongCalc(context.Background()) if err != nil { log.Fatalf("error while calling LongGreet: %v", err) } requests := []float32{1.0, 2.0, 3.0, 4.0} for _, req := range requests { fmt.Printf("Sending number: %v\n", req) stream.Send(&calculatorpb.LongCalcRequest{ CalcNum2: req, }) time.Sleep(1000 * time.Millisecond) } res, err := stream.CloseAndRecv() if err != nil { log.Fatalf("error while receving response from LongGreet: %v", err) } fmt.Printf("LongCalc Response: %v\n", res) } func main() { fmt.Println("Client Request...") cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { log.Fatalf("Could not connect: %v", err) } defer cc.Close() c := calculatorpb.NewCalculatorServiceClient(cc) fmt.Println("Created client: %f\n", c) // doUnary(c) // doServerStreaming(c) 
doClientStreaming(c) } |
ここまで実装できたら動作確認をします。
Server -> Clientの順に実行しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
make c_srv go run calculator/calculator_server/server.go Server Start... LongCalc function was invoked with a streaming request make c_cnt go run calculator/calculator_client/client.go Client Request... Created client: %f &{0xc0000f6700} Streaming to do a Client Streaming RPC... Sending number: 1 Sending number: 2 Sending number: 3 Sending number: 4 LongCalc Response: result:2.5 |
GreetEveryoneAPI – protoファイル作成 –
復習はここまでにして、本題に入りましょう。
本日は、Bi Directional Streaming APIを使って、GreetEveryoneAPIを実装します。
Bi Directional Streaming APIの特徴は次の通りです。
・ リクエスト数とレスポンス数は同一でなくても問題ない
・ クライアントまたはサーバーから大量データを送受信したい場合に適している
Bi Directional Streaming APIを実装するときは、リクエストおよびレスポンス両方にstreamキーワードを付与します。
それではまず、protoファイルを実装します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
// greet/greetpb/greet.proto

// Protocol buffers language version.
syntax = "proto3";

// Protobuf package name.
package greet;

// Go package name used for the generated code.
option go_package="greetpb";

// GreetService declares one RPC per gRPC call style used in this tutorial.
service GreetService{
    // Unary: one request in, one response out.
    rpc Greet(GreetRequest) returns (GreetResponse){};

    // Server Streaming: one request in, a stream of responses out.
    rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse){};

    // Client Streaming: a stream of requests in, one response out.
    rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {};

    // Bidirectional Streaming (added in this article): both the request and
    // the response carry the stream keyword.
    rpc GreetEveryone(stream GreetEveryoneRequest) returns (stream GreetEveryoneResponse) {};
};

// Greeting holds the name fields shared by every request message.
message Greeting {
    string first_name = 1;
    string last_name = 2;
}

// GreetRequest is the request message of Greet.
message GreetRequest {
    Greeting greeting = 1;
}

// GreetResponse is the response message of Greet.
message GreetResponse {
    string result = 1;
}

// GreetManyTimesRequest is the request message of GreetManyTimes.
message GreetManyTimesRequest {
    Greeting greeting = 1;
}

// GreetManyTimesResponse is one element of the GreetManyTimes response stream.
message GreetManyTimesResponse {
    string result = 1;
}

// LongGreetRequest is one element of the LongGreet request stream.
message LongGreetRequest {
    Greeting greeting = 1;
}

// LongGreetResponse is the single response of LongGreet.
message LongGreetResponse {
    string result = 1;
}

// GreetEveryoneRequest (added in this article) is one element of the
// GreetEveryone request stream.
message GreetEveryoneRequest {
    Greeting greeting = 1;
}

// GreetEveryoneResponse (added in this article) is one element of the
// GreetEveryone response stream.
message GreetEveryoneResponse {
    string result = 1;
}
protoファイルを変更したら忘れずにコンパイルしましょう。
1 |
./generate.sh |
GreetEveryone API – Serverコード作成 –
次は、Serverコードを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
// greet/greet_server/server.go package main import ( "context" "fmt" "io" "log" "net" "strconv" "time" "github.com/selfnote/golang-grpc/greet/greetpb" "google.golang.org/grpc" ) type server struct{} func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest)(*greetpb.GreetResponse, error) { fmt.Printf("Greet function was invoked with %v\n", req) firstName := req.GetGreeting().GetFirstName() result := "Hello " + firstName res := &greetpb.GreetResponse { Result: result, } return res, nil } func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error { fmt.Printf("GreetManyTimes function was invoked with %v\n", req) firstName := req.GetGreeting().GetFirstName() // Do print 10 times. for i := 0; i < 10; i++ { result := "Hello " + firstName + " number" + strconv.Itoa(i) res := &greetpb.GreetManyTimesResponse { Result: result, } stream.Send(res) time.Sleep(1000 * time.Millisecond) } return nil } func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error { fmt.Println("LongGreet function was invoked with a streaming request") result := "" for { req, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&greetpb.LongGreetResponse{ Result: result, }) } if err != nil { log.Fatalf("Error while reading client stream: %v", err) } firstName := req.GetGreeting().GetFirstName() result += "Hello " + firstName + "! " } } // new func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error { fmt.Printf("GreetEveryone function was invoked with a streaming request\n") for { req, err := stream.Recv() if err == io.EOF { return nil } if err != nil { log.Fatalf("Error while reading client stream: %v", err) return err } firstName := req.GetGreeting().GetFirstName() result := "Hello " + firstName + "!" 
sendErr := stream.Send(&greetpb.GreetEveryoneResponse{ Result: result, }) if sendErr != nil { log.Fatalf("Error whlie sending dat ato stream: %v", err) return err } } } func main() { fmt.Println("Hello World") lis, err := net.Listen("tcp", "0.0.0.0:50051") if err != nil { log.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer() greetpb.RegisterGreetServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } |
受け取ったリクエストの数だけ、レスポンス(First Name)を返す実装を行いました。
GreetEveryone API – Clientコード作成 –
次は、Clientコードを作成します。
|
// greet/greet_client/client.go package main import ( "context" "fmt" "io" "log" "time" "github.com/selfnote/golang-grpc/greet/greetpb" "google.golang.org/grpc" ) func main() { fmt.Println("Hello I'm a client") cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { log.Fatalf("could not connect: %v", err) } defer cc.Close() c := greetpb.NewGreetServiceClient(cc) // doUnary(c) // doServerStreaming(c) // doClientStreaming(c) doBiDiStreaming(c) } func doUnary(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a Unary RPC...") req := &greetpb.GreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "Harry", LastName: "Potter", }, } res, err := c.Greet(context.Background(), req) if err != nil { log.Fatalf("error whiole caling Greet RPC: %v\n", err) } log.Printf("Response from Greet: %v\n", res.Result) } func doServerStreaming(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a Server Streaming PRC...") req := &greetpb.GreetManyTimesRequest { Greeting: &greetpb.Greeting { FirstName: "Hary", LastName: "Potter", }, } resStream, err := c.GreetManyTimes(context.Background(), req) if err != nil { log.Fatalf("error while calling GreetManyTimes RPC: %v", err) } for { msg, err := resStream.Recv() if err == io.EOF { fmt.Println(io.EOF) break } if err != nil { log.Fatalf("error while reading stream: %v", err) } log.Printf("Response from greetManyTimes: %v", msg.GetResult()) } } func doClientStreaming(c greetpb.GreetServiceClient) { fmt.Println("Streaming to do a Cient streaming RPC...") requests := []*greetpb.LongGreetRequest { &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Harry", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Mark", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Mary", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Jane", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Upper", }, }, } 
stream, err := c.LongGreet(context.Background()) if err != nil { log.Fatalf("error while calling LongGreet: %v", err) } for _, req := range requests { fmt.Println("Sending req: %v\n", req) stream.Send(req) time.Sleep(1000 * time.Millisecond) } res, err := stream.CloseAndRecv() if err != nil { log.Fatalf("error while receiving response from LongGreet: %v\n", err) } fmt.Printf("LongGreet Response: %v\n", res) } // new func doBiDiStreaming(c greetpb.GreetServiceClient) { fmt.Println("Streaming to do a Bidi Streaming RPC...") // we create a stream by invoking the client stream, err := c.GreetEveryone(context.Background()) if err != nil { log.Fatalf("Error while creating stream: %v", err) return } requests := []*greetpb.GreetEveryoneRequest{ &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Stephane", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "John", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Lucy", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Mark", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Piper", }, }, } waitc := make(chan struct{}) // we send a bunch of messages to the client (go routine) go func() { for _, req := range requests { fmt.Printf("Sending message: %v\n", req) stream.Send(req) time.Sleep(1000 * time.Millisecond) } stream.CloseSend() }() // we receive a bunch of messages from the client (go routine) go func() { for { res, err := stream.Recv() if err == io.EOF{ break } if err != nil { log.Fatalf("Error while receiving: %v", err) close(waitc) } fmt.Printf("Received: %v\n", res.GetResult()) } }() // block until everything is done <-waitc } |
GreetEveryone API – 動作確認 –
Server -> Clientの順にコードを実行します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
make server go run greet/greet_server/server.go Hello World GreetEveryone function was invoked with a streaming request make client go run greet/greet_client/client.go Hello I'm a client Streaming to do a Bidi Streaming RPC... Sending message: greeting:<first_name:"Stephane" > Received: Hello Stephane! Sending message: greeting:<first_name:"John" > Received: Hello John! Sending message: greeting:<first_name:"Lucy" > Received: Hello Lucy! Sending message: greeting:<first_name:"Mark" > Received: Hello Mark! Sending message: greeting:<first_name:"Piper" > Received: Hello Piper! |
上手くいってますね。
もう少し工夫すれば、Chatアプリケーションが作成できそうな感じですね。
次回
次回も引き続き、gRPCの学習を続けていきます。
コメントを残す
コメントを投稿するにはログインしてください。