こんにちは。KOUKIです。
前回はGreetEveryone APIを実装しましたので、今回はその続きから実装します。
<目次>
前回
Go言語記事まとめ
前回の復習
前回の復習として、FindMaximumAPIを実装します。
まずは、protoファイルからですね。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
// calculator/calculatorpb/calculator.proto
syntax = "proto3";

package calculator;
option go_package="calculatorpb";

// CalculatorService exposes one RPC for each gRPC communication style.
service CalculatorService {
    // Unary
    rpc Calc(CalcRequest) returns (CalcResponse) {};

    // Server Streaming
    rpc CalcManyTimes(CalcManyTimesRequest) returns (stream CalcManyTimesResponse) {};

    // Client Streaming
    rpc LongCalc(stream LongCalcRequest) returns (LongCalcResponse){};

    // new
    // Bidi Streaming
    rpc FindMaximum(stream FindMaximumRequest) returns (stream FindMaximumResponse) {};
}

// Number is the pair of operands used by Calc.
message Number {
    int32 num1 = 1;
    int32 num2 = 2;
}

// CalcRequest wraps the operands for the unary Calc RPC.
message CalcRequest {
    Number number = 1;
}

// CalcResponse carries the result of Calc.
message CalcResponse {
    int32 sum = 1;
}

// CalcManyTimesRequest carries the single input number for CalcManyTimes.
message CalcManyTimesRequest {
    int32 calcNum = 1;
}

// CalcManyTimesResponse is one element of the CalcManyTimes response stream.
message CalcManyTimesResponse {
    int32 result = 1;
}

// LongCalcRequest is one element of the LongCalc request stream.
message LongCalcRequest {
    float calcNum2 = 1;
}

// LongCalcResponse is the single reply sent once the LongCalc request stream ends.
message LongCalcResponse {
    float result = 1;
}

// new
// FindMaximumRequest is one element of the FindMaximum request stream.
message FindMaximumRequest {
    int32 number = 1;
}

// new
// FindMaximumResponse is one element of the FindMaximum response stream.
message FindMaximumResponse {
    int32 maximum = 1;
}
protoファイルを変更したら忘れずにコンパイルしましょう。
1 |
./generate.sh |
続いて、Serverコードを実装します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
// calculator/calculator_server/server.go package main import ( "context" "fmt" "io" "log" "net" "time" "github.com/selfnote/golang-grpc/calculator/calculatorpb" "google.golang.org/grpc" ) type server struct{} func (*server) Calc(ctx context.Context, req *calculatorpb.CalcRequest) (*calculatorpb.CalcResponse, error) { fmt.Printf("Calc function was invoked with %v\n", req) num1 := req.GetNumber().GetNum1() num2 := req.GetNumber().GetNum2() sum := num1 + num2 res := &calculatorpb.CalcResponse{ Sum: sum, } return res, nil } func (*server) CalcManyTimes(req *calculatorpb.CalcManyTimesRequest, stream calculatorpb.CalculatorService_CalcManyTimesServer) error { fmt.Printf("CalcManyTimes function was invoked with %v\n", req) num := req.GetCalcNum() fmt.Println("num: ", num) var divNum int32 = 2 for num > 1 { if num%divNum == 0 { fmt.Println(divNum) stream.Send(&calculatorpb.CalcManyTimesResponse{ Result: num, }) num = num / divNum } else { divNum++ fmt.Printf("Divisor has incresaed to %v\n", divNum) } time.Sleep(1000 * time.Millisecond) } return nil } func (*server) LongCalc(stream calculatorpb.CalculatorService_LongCalcServer) error { fmt.Printf("LongCalc function was invoked with a streaming request\n") var count float32 = 0 var calcNum float32 = 0 for { req, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&calculatorpb.LongCalcResponse{ Result: (calcNum / count), }) } if err != nil { log.Fatalf("Error while reading client stream: %v", err) } num := req.GetCalcNum2() count++ calcNum += num } } // new func (*server) FindMaximum(stream calculatorpb.CalculatorService_FindMaximumServer) error { fmt.Println("Received FindMaximum RPC") maximum := int32(0) for { req, err := stream.Recv() if err == io.EOF { return nil } if err != nil { log.Fatalf("Error while reading client stream: %v", err) return err } number := req.GetNumber() if number > maximum { maximum = number sendErr := stream.Send(&calculatorpb.FindMaximumResponse{ Maximum: maximum, }) if sendErr != 
nil { log.Fatalf("Error while sending data to client: %v", err) return err } } } } func main() { fmt.Println("Server Start...") lis, err := net.Listen("tcp", "0.0.0.0:50051") if err != nil { log.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer() calculatorpb.RegisterCalculatorServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("Failed to server %v", err) } } |
このAPIは、リクエストとして受け取った数値の最大値を取得できるAPIです。
続いて、Clientコードを作成しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
// calculator/calculator_client/client.go package main import ( "context" "fmt" "io" "log" "time" "github.com/selfnote/golang-grpc/calculator/calculatorpb" "google.golang.org/grpc" ) func doUnary(c calculatorpb.CalculatorServiceClient) { fmt.Println("Starting to do a Unary RPC...") req := &calculatorpb.CalcRequest { Number: &calculatorpb.Number{ Num1: 3, Num2: 10, }, } res, err := c.Calc(context.Background(), req) if err != nil { log.Fatalf("error while calling Calc RPC: %v", err) } log.Printf("Response from Calc: %v", res.Sum) } func doServerStreaming(c calculatorpb.CalculatorServiceClient) { fmt.Println("Starting to do a Server Streaming RPC...") req := &calculatorpb.CalcManyTimesRequest{ CalcNum: 120, } resStream, err := c.CalcManyTimes(context.Background(), req) if err != nil { log.Fatalf("error while calling CalcManyTimes RPC: %v", err) } for { msg, err := resStream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("error while reading stream: %v", err) } log.Printf("Response from CalcManyTimes: %v", msg.GetResult()) } } func doClientStreaming(c calculatorpb.CalculatorServiceClient) { fmt.Println("Streaming to do a Client Streaming RPC...") stream, err := c.LongCalc(context.Background()) if err != nil { log.Fatalf("error while calling LongGreet: %v", err) } requests := []float32{1.0, 2.0, 3.0, 4.0} for _, req := range requests { fmt.Printf("Sending number: %v\n", req) stream.Send(&calculatorpb.LongCalcRequest{ CalcNum2: req, }) time.Sleep(1000 * time.Millisecond) } res, err := stream.CloseAndRecv() if err != nil { log.Fatalf("error while receving response from LongGreet: %v", err) } fmt.Printf("LongCalc Response: %v\n", res) } // new func doBiDiStreaming(c calculatorpb.CalculatorServiceClient) { fmt.Println("Starting to do a FindMaimum BiDi Streaming... 
RPC") stream, err := c.FindMaximum(context.Background()) if err != nil { log.Fatalf("Error while opening stream and calling FindMaximum: %v", err) } // Create chanel waitc := make(chan struct{}) // Send request by go routine go func() { numbers := []int32{3, 9, 2, 5, 4, 1, 4, 3} for _, number := range numbers { fmt.Printf("Sending number: %v\n", number) stream.Send(&calculatorpb.FindMaximumRequest{ Number: number, }) time.Sleep(1000 * time.Millisecond) } stream.CloseSend() }() // receive response by go rouine go func() { for { res, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("Problem while reading server stream: %v", err) break } maximum := res.GetMaximum() fmt.Printf("Received a new maximum of...: %v\n", maximum) } close(waitc) }() <-waitc } func main() { fmt.Println("Client Request...") cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { log.Fatalf("Could not connect: %v", err) } defer cc.Close() c := calculatorpb.NewCalculatorServiceClient(cc) fmt.Println("Created client: %f\n", c) // doUnary(c) // doServerStreaming(c) // doClientStreaming(c) doBiDiStreaming(c) } |
Clientコードが実装できたら、Server -> Clientの順にプログラムを起動します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
make c_srv go run calculator/calculator_server/server.go Server Start... Received FindMaximum RPC make c_cnt go run calculator/calculator_client/client.go Client Request... Created client: %f &{0xc0000f6700} Starting to do a FindMaimum BiDi Streaming... RPC Sending number: 3 Received a new maximum of...: 3 Sending number: 9 Received a new maximum of...: 9 Sending number: 2 Sending number: 5 Sending number: 4 Sending number: 1 Sending number: 4 Sending number: 3 |
GreetWithDeadlineAPI – protoファイルの作成 –
復習はここまでにして、本題に入りましょう。
gRPCでは、全てのClientコードでDeadline(締め切り)を設定することが推奨されています。
サーバーにリクエストを投げたら、「最大何秒待つか」の設定ができます。
Deadlineのサンプルコードです。
1 2 |
// Turn the timeout into an absolute deadline and derive a context from it.
// deadlineMs is defined outside this snippet; it is dereferenced and treated
// as a number of milliseconds.
clientDeadline := time.Now().Add(time.Duration(*deadlineMs) * time.Millisecond)
ctx, cancel := context.WithDeadline(ctx, clientDeadline)
// Always call cancel so the context's resources are released even when the
// RPC finishes before the deadline.
defer cancel()
GreetWithDeadline APIにDeadlineを組み込んでみましょう。
まずは、protoファイルを編集します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
// greet/greetpb/greet.proto

// version
syntax = "proto3";

// package name
package greet;

// Include subdirectory into package too
option go_package="greetpb";

// GreetService exposes one RPC for each gRPC communication style, plus a
// unary RPC used to demonstrate deadlines.
service GreetService{
    // Unary
    rpc Greet(GreetRequest) returns (GreetResponse){};

    // Server Streaming
    rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManyTimesResponse){};

    // Client Streaming
    rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {};

    // Bidi Streaming
    rpc GreetEveryone(stream GreetEveryoneRequest) returns (stream GreetEveryoneResponse) {};

    // new
    // Unary With Deadline
    rpc GreetWithDeadline(GreetWithDeadLineRequest) returns (GreetWithDeadLineResponse) {};
};

// Greeting identifies the person to greet; it is embedded in every request.
message Greeting {
    string first_name = 1;
    string last_name = 2;
}

// Request and response of the unary Greet RPC.
message GreetRequest {
    Greeting greeting = 1;
}

message GreetResponse {
    string result = 1;
}

// Request and streamed response of the GreetManyTimes RPC.
message GreetManyTimesRequest {
    Greeting greeting = 1;
}

message GreetManyTimesResponse {
    string result = 1;
}

// Streamed request and single response of the LongGreet RPC.
message LongGreetRequest {
    Greeting greeting = 1;
}

message LongGreetResponse {
    string result = 1;
}

// Streamed request and streamed response of the GreetEveryone RPC.
message GreetEveryoneRequest {
    Greeting greeting = 1;
}

message GreetEveryoneResponse {
    string result = 1;
}

// new
// Request of the GreetWithDeadline RPC.
message GreetWithDeadLineRequest {
    Greeting greeting = 1;
}

// new
// Response of the GreetWithDeadline RPC.
message GreetWithDeadLineResponse {
    string result = 1;
}
protoファイルを書き換えたので、コンパイルをしましょう。
1 |
./generate.sh |
GreetWithDeadline API – Serverコードの作成 –
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
// greet/greet_server/server.go package main import ( "context" "fmt" "io" "log" "net" "strconv" "time" "github.com/selfnote/golang-grpc/greet/greetpb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type server struct{} func (*server) Greet(ctx context.Context, req *greetpb.GreetRequest)(*greetpb.GreetResponse, error) { fmt.Printf("Greet function was invoked with %v\n", req) firstName := req.GetGreeting().GetFirstName() result := "Hello " + firstName res := &greetpb.GreetResponse { Result: result, } return res, nil } func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error { fmt.Printf("GreetManyTimes function was invoked with %v\n", req) firstName := req.GetGreeting().GetFirstName() // Do print 10 times. for i := 0; i < 10; i++ { result := "Hello " + firstName + " number" + strconv.Itoa(i) res := &greetpb.GreetManyTimesResponse { Result: result, } stream.Send(res) time.Sleep(1000 * time.Millisecond) } return nil } func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error { fmt.Println("LongGreet function was invoked with a streaming request") result := "" for { req, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&greetpb.LongGreetResponse{ Result: result, }) } if err != nil { log.Fatalf("Error while reading client stream: %v", err) } firstName := req.GetGreeting().GetFirstName() result += "Hello " + firstName + "! " } } func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error { fmt.Printf("GreetEveryone function was invoked with a streaming request\n") for { req, err := stream.Recv() if err == io.EOF { return nil } if err != nil { log.Fatalf("Error while reading client stream: %v", err) return err } firstName := req.GetGreeting().GetFirstName() result := "Hello " + firstName + "!" 
sendErr := stream.Send(&greetpb.GreetEveryoneResponse{ Result: result, }) if sendErr != nil { log.Fatalf("Error whlie sending dat ato stream: %v", err) return err } } } // new func (*server) GreetWithDeadline(ctx context.Context, req *greetpb.GreetWithDeadLineRequest) (*greetpb.GreetWithDeadLineResponse, error) { fmt.Printf("GreetWithDeadline function was invoked with %v\n", req) for i := 0; i < 3; i++ { if ctx.Err() == context.Canceled { fmt.Println("The client canceld the request!") return nil, status.Error(codes.Canceled , "the client canceld the request") } time.Sleep(1 * time.Second) } firstName := req.GetGreeting().GetFirstName() result := "Hello " + firstName res := &greetpb.GreetWithDeadLineResponse { Result: result, } return res, nil } func main() { fmt.Println("Hello World") lis, err := net.Listen("tcp", "0.0.0.0:50051") if err != nil { log.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer() greetpb.RegisterGreetServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } |
GreetWithDeadline API – Clientコードの作成 –
続いて、Clientコードを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
// greet/greet_client/client.go package main import ( "context" "fmt" "io" "log" "time" "github.com/selfnote/golang-grpc/greet/greetpb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func main() { fmt.Println("Hello I'm a client") cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { log.Fatalf("could not connect: %v", err) } defer cc.Close() c := greetpb.NewGreetServiceClient(cc) // doUnary(c) // doServerStreaming(c) // doClientStreaming(c) // doBiDiStreaming(c) doUnaryWithDeadline(c, 5*time.Second) // should complete doUnaryWithDeadline(c, 1*time.Second) // should timeout } func doUnary(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a Unary RPC...") req := &greetpb.GreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "Harry", LastName: "Potter", }, } res, err := c.Greet(context.Background(), req) if err != nil { log.Fatalf("error whiole caling Greet RPC: %v\n", err) } log.Printf("Response from Greet: %v\n", res.Result) } func doServerStreaming(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a Server Streaming PRC...") req := &greetpb.GreetManyTimesRequest { Greeting: &greetpb.Greeting { FirstName: "Hary", LastName: "Potter", }, } resStream, err := c.GreetManyTimes(context.Background(), req) if err != nil { log.Fatalf("error while calling GreetManyTimes RPC: %v", err) } for { msg, err := resStream.Recv() if err == io.EOF { fmt.Println(io.EOF) break } if err != nil { log.Fatalf("error while reading stream: %v", err) } log.Printf("Response from greetManyTimes: %v", msg.GetResult()) } } func doClientStreaming(c greetpb.GreetServiceClient) { fmt.Println("Streaming to do a Cient streaming RPC...") requests := []*greetpb.LongGreetRequest { &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Harry", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Mark", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Mary", 
}, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Jane", }, }, &greetpb.LongGreetRequest { Greeting: &greetpb.Greeting { FirstName: "Upper", }, }, } stream, err := c.LongGreet(context.Background()) if err != nil { log.Fatalf("error while calling LongGreet: %v", err) } for _, req := range requests { fmt.Println("Sending req: %v\n", req) stream.Send(req) time.Sleep(1000 * time.Millisecond) } res, err := stream.CloseAndRecv() if err != nil { log.Fatalf("error while receiving response from LongGreet: %v\n", err) } fmt.Printf("LongGreet Response: %v\n", res) } func doBiDiStreaming(c greetpb.GreetServiceClient) { fmt.Println("Streaming to do a Bidi Streaming RPC...") // we create a stream by invoking the client stream, err := c.GreetEveryone(context.Background()) if err != nil { log.Fatalf("Error while creating stream: %v", err) return } requests := []*greetpb.GreetEveryoneRequest{ &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Stephane", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "John", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Lucy", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Mark", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Piper", }, }, } waitc := make(chan struct{}) // we send a bunch of messages to the client (go routine) go func() { for _, req := range requests { fmt.Printf("Sending message: %v\n", req) stream.Send(req) time.Sleep(1000 * time.Millisecond) } stream.CloseSend() }() // we receive a bunch of messages from the client (go routine) go func() { for { res, err := stream.Recv() if err == io.EOF{ break } if err != nil { log.Fatalf("Error while receiving: %v", err) close(waitc) } fmt.Printf("Received: %v\n", res.GetResult()) } }() // block until everything is done <-waitc } // new func doUnaryWithDeadline(c greetpb.GreetServiceClient, timeout time.Duration) { 
fmt.Println("Starting to do a UnaryWithDeadline RPC...") req := &greetpb.GreetWithDeadLineRequest { Greeting: &greetpb.Greeting { FirstName: "Harry", LastName: "Potter", }, } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() res, err := c.GreetWithDeadline(ctx, req) if err != nil { statusErr, ok := status.FromError(err) if ok { if statusErr.Code() == codes.DeadlineExceeded { fmt.Println("Timeout was hit! Deadline was exceeded") } else { fmt.Println("unexpected error: %v", statusErr) } } else { log.Fatalf("error while calling UnaryWithDeadline RPC: %v", err) } return } log.Printf("Response from UnaryWithDeadline: %v", res.Result) } |
GreetWithDeadline API – 動作確認 –
Server -> Clientの順にプログラムを実行してください。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
make server go run greet/greet_server/server.go Hello World GreetWithDeadline function was invoked with greeting:<first_name:"Harry" last_name:"Potter" > GreetWithDeadline function was invoked with greeting:<first_name:"Harry" last_name:"Potter" > make client go run greet/greet_client/client.go Hello I'm a client Starting to do a UnaryWithDeadline RPC... 2019/10/28 10:47:31 Response from UnaryWithDeadline: Hello Harry Starting to do a UnaryWithDeadline RPC... Timeout was hit! Deadline was exceeded |
1つ目のリクエストは5秒間のDeadlineを設けました。
その範囲内で処理が完了したため、Timeoutは発生しなかったようです。
一方、2つ目のリクエストは1秒間に設定したため、処理が完了せずにTimeoutになりました。
このように、開発者が各Clientコード上でDeadlineを定めます。
参考 ~ gRPC Error ~
gRPCのエラーハンドリングの参考情報を載せておきます。
次回
次回もgRPCについて学習していきましょう。
コメントを残す
コメントを投稿するにはログインしてください。