Protobuf protocol buffers 是一种灵活,高效,自动化机制的结构数据序列化方法-可类比 XML,但是比 XML 更小、更快、更为简单。有性能高、跨语言、开发容易维护等特点,而且天然集成gRPC框架,因此,Protobuf是微服务中的基石。
Protobuf序列化和反序列化,json化的方式在文档:网络编程 和RPC 中,这里不再赘叙。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} } message HelloRequest { string name = 1 ; } message HelloReply { string message = 1 ; }
语法 Protobuf文件一般以.proto
结尾
1 2 3 syntax = "proto3" ; option go_package = "920/minegrpc;minegrpc" ;
定义数据结构
1 2 3 message HelloRequest { string name = 1 ; }
下载工具
目录树
1 2 3 minegrpc └── helloworld └── helloworld.proto
生成stub代码
1 2 3 $ cd minegrpc && protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ helloworld/helloworld.proto
--go_out=.
:代表将数据结构 go代码生成到当前目录
--go-grpc_out=.
:代表将stub go代码生成到当前目录
paths=source_relative
:会使文件中option go_package = "920/grpc;grpc";
定义失效,会生成到helloworld
目录下
生成代码如下:
1 2 3 4 5 minegrpc └── helloworld ├── helloworld.pb.go ├── helloworld.proto └── helloworld_grpc.pb.go
如果要生成到minegrpc
目录下,可以通过--proto_path
指定目录,设置当前目录为grpc
目录(或者 -I
指定目录)
1 2 3 4 $ protoc --proto_path=./helloworld \ --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ helloworld.proto
结果
1 2 3 4 5 minegrpc ├── helloworld │ └── helloworld.proto ├── helloworld.pb.go // 生成结构体对象 └── helloworld_grpc.pb.go // 生成对象方法
引用包中的结构体和方法是,则是通过
1 import minegrpc "gostudy/920/minegrpc"
生成的结构体,可以看到注释也一起生成
1 2 3 4 5 6 7 8 type HelloRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` }
生成的服务端方法,在helloworld_grpc.pb.go
中
1 2 3 4 5 6 7 type GreeterServer interface { SayHello(context.Context, *HelloRequest) (*HelloReply, error ) SayHelloAgain(context.Context, *HelloRequest) (*HelloReply, error ) mustEmbedUnimplementedGreeterServer() }
客户端方法也在helloworld_grpc.pb.go
中
1 2 3 4 5 6 7 8 9 10 type GreeterClient interface { SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error ) SayHelloAgain(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error ) } func NewGreeterClient (cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc} }
数据类型 protoco buffer中的数据类型对照:proto3 ,中文:高效的数据压缩编码方式 Protobuf
默认值:
当一个消息被解析时,如果被编码的信息不包含一个特定的singular元素,被解析的对象锁对应的域被设置为一个默认值,不同类型默认值如下:
string,默认是一个空的string
bytes,默认是一个空的bytes
bools,默认是false
数值类型,默认是0
枚举类型,默认是第一个枚举值,也就是0
消息体,在go里面是nil,而不是为空
重复型,空slice
默认值在不同的语言中可能不同,具体的默认值可以查看文档:generated code guide
例如枚举类型
1 2 3 4 5 enum Goods { GOODS_UNSPECIFIED = 0 ; GOODS_APPLE = 1 ; GOODS_BANANA = 2 ; }
生成代码
1 2 3 4 5 6 7 type Goods int32 const ( Goods_GOODS_UNSPECIFIED Goods = 0 Goods_GOODS_APPLE Goods = 1 Goods_GOODS_BANANA Goods = 2 )
例如map类型
1 2 3 message Map { map<string ,string > mp =1 ; }
生成代码
1 2 3 4 5 6 7 type Map struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Mp map [string ]string `protobuf:"bytes,1,rep,name=mp,proto3" json:"mp,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` }
例如时间戳类型
1 2 3 message Data { google.protobuf.Timestamp time =1 ; }
生成代码
1 2 3 4 5 6 7 type Data struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Time *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=time,proto3" json:"time,omitempty"` }
而使用方式是通过
1 2 3 4 5 func New (t time.Time) *Timestamp { return &Timestamp{Seconds: int64 (t.Unix()), Nanos: int32 (t.Nanosecond())} }
引入其他的proto文件的message 引入同目录下的proto文件helloworld.proto
1 2 3 4 5 6 7 8 9 syntax = "proto3" ; import "base.proto" ; option go_package = "920/minegrpc;minegrpc" ; service Greeter { rpc Ping(Empty) returns (Pong) {} }
base.proto
1 2 3 4 5 6 7 8 9 10 11 syntax = "proto3"; option go_package = "920/minegrpc;minegrpc"; message Empty { } message Pong{ }
这种形式,如果只生成helloworld.proto
对应的代码,是无法直接使用Empty
和Pong
的,还需要将base.proto
生成代码。
引入第三方包
1 2 3 4 5 6 7 8 9 10 syntax = "proto3" ; import "base.proto" ;import "google/protobuf/empty.proto" ; option go_package = "920/minegrpc;minegrpc" ; service Greeter { rpc Ping(google.protobuf.Empty) returns (Pong) {} }
第三方可以引用的包在go/pkg/mod/google.golang.org/[email protected] /types/known
中
在源码中,可以看到包是option go_package = "google.golang.org/protobuf/types/known/emptypb";
嵌套message 可以直接嵌套引用
1 2 3 4 5 6 message Empty {} message Pong { repeated Empty empty = 1 ; }
也可以只放在内部使用
1 2 3 4 5 6 message Pong { message Empty { string data = 1 ; } repeated Empty empty = 1 ; }
也可以引用其他的message内部嵌套的message
1 2 3 4 5 6 7 8 9 message Empty { Pong.Data data = 1 ; } message Pong { message Data { string data = 1 ; } }
gRPC gRPC的框架或者说插件,可以将proto中的rpc service生成golang的方法代码。
例如在proto中
1 2 3 4 5 service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} }
在服务端生成的代码
1 2 3 4 5 6 7 8 9 type GreeterServer interface { SayHello(context.Context, *HelloRequest) (*HelloReply, error ) mustEmbedUnimplementedGreeterServer() } func RegisterGreeterServer (s grpc.ServiceRegistrar, srv GreeterServer) { s.RegisterService(&Greeter_ServiceDesc, srv) }
在客户端生成的代码
1 2 3 4 5 6 7 8 9 10 11 12 type GreeterClient interface { SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error ) } type greeterClient struct { cc grpc.ClientConnInterface } func NewGreeterClient (cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc} }
服务端生成的服务需要进行注册,客户端生成的代码,则构造客户端对象(interface
),进行调用。
客户端服务端之间的模式 简单模式 Simple RPC,客户端发起一次请求,服务端响应一个数据,跟平时熟悉的RPC没有大的区别。
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package mainimport ( "context" "log" "net" "google.golang.org/grpc" "gostudy/920/minegrpc" ) type Service struct { minegrpc.UnimplementedGreeterServer } func (s *Service) SayHello(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error ) { return &minegrpc.HelloReply{Message: "hello" + req.Name}, nil } func (s *Service) SayHelloAgain(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error ) { return &minegrpc.HelloReply{Message: "hello again" + req.Name}, nil } func main () { listener, err := net.Listen("tcp" , ":8080" ) if err != nil { log.Fatal(err) } s := grpc.NewServer() minegrpc.RegisterGreeterServer(s, &Service{}) log.Fatal(s.Serve(listener)) }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package mainimport ( "context" "fmt" "log" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "gostudy/920/minegrpc" ) func main () { dial, err := grpc.Dial("127.0.0.1:8080" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal(err) } defer dial.Close() client := minegrpc.NewGreeterClient(dial) ctx, _ := context.WithCancel(context.Background()) req := minegrpc.HelloRequest{Name: "abc" } hello, err := client.SayHello(ctx, &req) if err != nil { log.Fatal(err) } fmt.Println(hello.Message) }
ps:简单模式也是长连接,只是收发过程是一对一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func main () { dial, err := grpc.Dial("127.0.0.1:8080" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal(err) } defer dial.Close() client := minegrpc.NewGreeterClient(dial) ctx, _ := context.WithCancel(context.Background()) req := minegrpc.HelloRequest{Name: "abc" } hello, err := client.SayHello(ctx, &req) if err != nil { log.Fatal(err) } fmt.Println(hello.Message) select {} }
客户端处于阻塞状态时,通过netstat
可以看到socket信息
1 2 3 ➜ netstat -an | grep 8080 tcp4 0 0 127.0.0.1.8080 127.0.0.1.65502 ESTABLISHED tcp4 0 0 127.0.0.1.65502 127.0.0.1.8080 ESTABLISHED
服务端数据流模式 Server-side streaming RPC:客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源不断的返回给客户端。
1 2 3 4 5 service Greeter { rpc GetStream (HelloRequest) returns (stream HelloReply) {} }
客户端数据流模式 Client-side streaming RPC :客户端源源不断向服务端发送数据流,发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器发送数据。
1 2 3 4 5 service Greeter { rpc PutStream (stream HelloRequest) returns (HelloReply) {} }
双向数据流模式 Bidirectional streaming RPC:客户端和服务端都可以向对方发送数据流,这个时候数据可以同时相互发送,也就是可以实现实时交互。典型的例子是聊天机器人,机器人可以源源不断发送信息,客户端也可以向聊天机器人源源不断发送信息。
1 2 3 4 service Greeter { rpc AllStream (stream HelloRequest) returns (HelloReply) {} }
此时,通过protoc生成代码之后,方法为
1 2 3 4 5 6 7 8 type GreeterServer interface { GetStream(*HelloRequest, Greeter_GetStreamServer) error PutStream(Greeter_PutStreamServer) error AllStream(Greeter_AllStreamServer) error mustEmbedUnimplementedGreeterServer() }
服务端数据流模式,服务端代码
1 2 3 4 5 6 7 8 9 10 func (s *Service) GetStream(req *minegrpc.HelloRequest, res minegrpc.Greeter_GetStreamServer) error { for i := 0 ; i < 10 ; i++ { if err := res.Send(&minegrpc.HelloReply{Message: "hello " + req.Name + ":" + time.Now().String()}); err != nil { return err } time.Sleep(500 * time.Millisecond) } return nil }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package mainimport ( "context" "fmt" "log" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "gostudy/920/minegrpc" ) func main () { dial, err := grpc.Dial("127.0.0.1:8080" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal(err) } defer dial.Close() client := minegrpc.NewGreeterClient(dial) ctx, _ := context.WithCancel(context.Background()) req := minegrpc.HelloRequest{Name: "abc" } hello, err := client.GetStream(ctx, &req) if err != nil { log.Fatal(err) } for { reply, err := hello.Recv() if err != nil { log.Fatal(err) } fmt.Println(reply.Message) } }
执行效果,客户端显示
1 2 3 4 5 6 7 8 9 10 11 hello abc:2022-09-21 23:41:52.996397 +0800 CST m=+4.289561293 hello abc:2022-09-21 23:41:53.497469 +0800 CST m=+4.790634043 hello abc:2022-09-21 23:41:53.998574 +0800 CST m=+5.291739501 hello abc:2022-09-21 23:41:54.500105 +0800 CST m=+5.793270918 hello abc:2022-09-21 23:41:55.003888 +0800 CST m=+6.297055335 hello abc:2022-09-21 23:41:55.505045 +0800 CST m=+6.798212876 hello abc:2022-09-21 23:41:56.006222 +0800 CST m=+7.299389793 hello abc:2022-09-21 23:41:56.507297 +0800 CST m=+7.800465251 hello abc:2022-09-21 23:41:57.008563 +0800 CST m=+8.301732210 hello abc:2022-09-21 23:41:57.509321 +0800 CST m=+8.802490793 2022/09/21 23:41:58 EOF
客户端数据流模式,服务端代码
1 2 3 4 5 6 7 8 9 func (s *Service) PutStream(req minegrpc.Greeter_PutStreamServer) error { for { recv, err := req.Recv() if err != nil { return err } fmt.Println(recv.Name) } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package mainimport ( "context" "log" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "gostudy/920/minegrpc" ) func main () { dial, err := grpc.Dial("127.0.0.1:8080" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal(err) } defer dial.Close() client := minegrpc.NewGreeterClient(dial) ctx, _ := context.WithCancel(context.Background()) hello, err := client.PutStream(ctx) if err != nil { log.Fatal(err) } for i := 0 ; i < 5 ; i++ { req := minegrpc.HelloRequest{Name: "abc: " + time.Now().String()} err := hello.Send(&req) if err != nil { log.Fatal(err) } time.Sleep(time.Second) } }
执行结果服务端显示
1 2 3 4 5 abc: 2022-09-21 23:47:15.221367 +0800 CST m=+0.003326418 abc: 2022-09-21 23:47:16.222609 +0800 CST m=+1.004569751 abc: 2022-09-21 23:47:17.223701 +0800 CST m=+2.005663168 abc: 2022-09-21 23:47:18.224789 +0800 CST m=+3.006752376 abc: 2022-09-21 23:47:19.225939 +0800 CST m=+4.007903626
双向流模式,服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (s *Service) AllStream(req minegrpc.Greeter_AllStreamServer) error { wg := sync.WaitGroup{} wg.Add(2 ) go func () { defer wg.Done() for { recv, err := req.Recv() if err != nil { log.Println(err) return } fmt.Println("server receive: " , recv.Name) } }() go func () { defer wg.Done() for { send := minegrpc.HelloReply{Message: "server: " + time.Now().String()} err := req.SendMsg(&send) if err != nil { log.Println(err) return } fmt.Println("server send: " , send.Message) } }() wg.Wait() return nil }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func main () { log.SetFlags(log.Llongfile) dial, err := grpc.Dial("127.0.0.1:8080" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal(err) } defer dial.Close() client := minegrpc.NewGreeterClient(dial) ctx, _ := context.WithCancel(context.Background()) hello, err := client.AllStream(ctx) if err != nil { log.Fatal(err) } wg := sync.WaitGroup{} wg.Add(2 ) go func () { defer wg.Done() for i := 0 ; i < 5 ; i++ { req := minegrpc.HelloRequest{Name: "client: " + time.Now().String()} err := hello.Send(&req) if err != nil && err != io.EOF { log.Fatal(err) } fmt.Println("client send: " , req.Name) } }() go func () { defer wg.Done() for i := 0 ; i < 5 ; i++ { var req minegrpc.HelloReply err = hello.RecvMsg(&req) if err != nil { log.Println(err) } fmt.Println("client receive: " , req.Message) } }() wg.Wait() }
服务端结果显示
1 2 3 4 5 6 7 8 9 10 11 server send: server: 2022 -09 -22 00 :16 :46.397325 +0800 CST m=+1.809850334 server receive: client: 2022 -09 -22 00 :16 :46.397059 +0800 CST m=+0.004309168 server receive: client: 2022 -09 -22 00 :16 :46.397447 +0800 CST m=+0.004697376 server receive: client: 2022 -09 -22 00 :16 :46.397453 +0800 CST m=+0.004702835 server receive: client: 2022 -09 -22 00 :16 :46.397456 +0800 CST m=+0.004705835 server receive: client: 2022 -09 -22 00 :16 :46.397458 +0800 CST m=+0.004708251 server send: server: 2022 -09 -22 00 :16 :46.397959 +0800 CST m=+1.810484501 server send: server: 2022 -09 -22 00 :16 :46.397961 +0800 CST m=+1.810486334 server send: server: 2022 -09 -22 00 :16 :46.397964 +0800 CST m=+1.810488959 main.go :61 : rpc error : code = Canceled desc = context canceled main.go :48 : rpc error : code = Canceled desc = context canceled
客户端显示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 client send: client: 2022 -09 -22 00 :16 :46.397059 +0800 CST m=+0.004309168 client send: client: 2022 -09 -22 00 :16 :46.397447 +0800 CST m=+0.004697376 client send: client: 2022 -09 -22 00 :16 :46.397453 +0800 CST m=+0.004702835 client send: client: 2022 -09 -22 00 :16 :46.397456 +0800 CST m=+0.004705835 client send: client: 2022 -09 -22 00 :16 :46.397458 +0800 CST m=+0.004708251 main.go :51 : rpc error : code = Unknown desc = grpc: client streaming protocol violation: get <nil >, want <EOF> client receive: server: 2022 -09 -22 00 :16 :46.397679 +0800 CST m=+1.810204626 main.go :51 : rpc error : code = Unknown desc = grpc: client streaming protocol violation: get <nil >, want <EOF> client receive: server: 2022 -09 -22 00 :16 :46.397704 +0800 CST m=+1.810229084 main.go :51 : rpc error : code = Unknown desc = grpc: client streaming protocol violation: get <nil >, want <EOF> client receive: server: 2022 -09 -22 00 :16 :46.397709 +0800 CST m=+1.810234459 main.go :51 : rpc error : code = Unknown desc = grpc: client streaming protocol violation: get <nil >, want <EOF> client receive: server: 2022 -09 -22 00 :16 :46.397713 +0800 CST m=+1.810238292 main.go :51 : rpc error : code = Unknown desc = grpc: client streaming protocol violation: get <nil >, want <EOF> client receive: server: 2022 -09 -22 00 :16 :46.397715 +0800 CST m=+1.810239917
可见,并不是客户端接受到,服务端才发送,而是一端直接发送到对端。
在gRPC调用过程中,需要传递一些隐式数据(例如token
,不需要嵌入业务逻辑的代码,需要用header
传输),而且是跨进程通信,此时就可以通过gRPC
中的context
传递,将metadata
传入context
中。metadata
就可以用于权限验证、链路跟踪等。
metadata是以key-value
的形式存储数据的,key是string
类型,value
是[]string
,即一个字符串数组类型。http
中的header
的生命周期是一次http
请求,metadata
的生命周期就是一次RPC
调用。
1 2 type MD map [string ][]string
使用方式
1 md := metadata.Pairs("key1" , "value1" )
RPC客户端发送
1 reply,err := client.SomeRPC(ctx,req)
RPC服务端接受
1 2 3 4 5 6 7 8 md, ok := metadata.FromIncomingContext(ctx) if !ok { log.Fatal("something wrong" ) } for k, v := range md { fmt.Printf("key: %s,value: %v" , k, v) }
拦截器 也就是中间件,例如记录接口响应时间,鉴权,做过滤,获取请求放信息等。
拦截器分为:一元拦截器(对于简单模式);和流拦截器(对于数据流模式)
在创建Server中需要注入ServerOption
1 2 3 4 5 6 7 8 func NewServer (opt ...ServerOption) *Server {}func UnaryInterceptor (i UnaryServerInterceptor) ServerOption {type UnaryServerInterceptor func (ctx context.Context, req interface {}, info *UnaryServerInfo, handler UnaryHandler) (resp interface {}, err error )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func intercept (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { fmt.Println("receive a new request." ) resp, err = handler(ctx, req) fmt.Println("request already done." ) return resp, err } func main () { log.SetFlags(log.Lshortfile) listener, err := net.Listen("tcp" , ":8080" ) if err != nil { log.Fatal(err) } opt := grpc.UnaryInterceptor(intercept) s := grpc.NewServer(opt) minegrpc.RegisterGreeterServer(s, &Service{}) log.Fatal(s.Serve(listener)) }
执行之后,服务端可以看到
拦截器也可以放在客户端
1 2 3 4 5 6 func Dial (target string , opts ...DialOption) (*ClientConn, error ) {}func WithUnaryInterceptor (f UnaryClientInterceptor) DialOption {}type UnaryClientInterceptor func (ctx context.Context, method string , req, reply interface {}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func main () { log.SetFlags(log.Lshortfile) var opts []grpc.DialOption opts = append (opts, grpc.WithUnaryInterceptor(interceptor)) opts = append (opts, grpc.WithTransportCredentials(insecure.NewCredentials())) dial, err := grpc.Dial("127.0.0.1:8080" , opts...) if err != nil { log.Fatal(err) } defer dial.Close() client := minegrpc.NewGreeterClient(dial) ctx, _ := context.WithCancel(context.Background()) hello, err := client.SayHello(ctx, &minegrpc.HelloRequest{Name: "mitaka" }) if err != nil { log.Fatal(err) } fmt.Println(hello.Message) } func interceptor (ctx context.Context, method string , req, reply interface {}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now() err := invoker(ctx, method, req, reply, cc, opts...) fmt.Printf("spend: %s\n" , time.Since(start)) return err }
执行之后,客户端可以看到
1 2 spend: 1.638416ms hellomitaka
拦截器的开源项目:go-grpc-midware
例如做一个身份验证的拦截器
服务端通过拦截器验证token:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func auth (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil , status.Error(codes.Unauthenticated, "no token" ) } log.Println(md) var ( appid string appkey string ) if v1, ok := md["appid" ]; ok { appid = v1[0 ] } if v2, ok := md["appkey" ]; ok { appkey = v2[0 ] } if !(appid == "mitaka" && appkey == "pwd" ) { return nil , status.Error(codes.Unauthenticated, "auth failed" ) } return handler(ctx, req) }
客户端通过拦截器加载token
1 2 3 4 5 6 7 8 9 10 func auth (ctx context.Context, method string , req, reply interface {}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { md := metadata.Pairs( "appid" , "mitaka" , "appkey" , "pwd" , ) ctx = metadata.NewOutgoingContext(ctx, md) err := invoker(ctx, method, req, reply, cc, opts...) return err }
在Golang中,针对认证拦截器,可以通过WithPerRPCCredentials
的拦截器
1 2 3 4 5 6 7 func WithPerRPCCredentials (creds credentials.PerRPCCredentials) DialOption {}type PerRPCCredentials interface { GetRequestMetadata(ctx context.Context, uri ...string ) (map [string ]string , error ) RequireTransportSecurity() bool }
在客户端上实现
1 2 3 4 5 6 7 8 9 10 11 12 13 type perRPCCredentials struct {} func (p *perRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string ) (map [string ]string , error ) { return map [string ]string { "appid" : "mitaka" , "appkey" : "pwd" , }, nil } func (p *perRPCCredentials) RequireTransportSecurity() bool { return false }
使用
1 2 3 4 var opts []grpc.DialOptionopts = append (opts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithPerRPCCredentials(&perRPCCredentials{})) dial, err := grpc.Dial("127.0.0.1:8080" , opts...)
验证器 validator,验证传入参数是否满足要求。更多的匹配规则在官方文档中:protoc-gen-validate
安装
1 2 3 4 5 # fetches this repo into $GOPATH go get -d github.com/envoyproxy/protoc-gen-validate # installs PGV into $GOPATH /bin make build
或者
1 go install github.com/envoyproxy/protoc-gen-validate@latest
拷贝validator的proto,目录如下
1 2 3 4 5 6 7 8 9 . ├── base.pb.go ├── helloworld │ ├── base.proto │ └── helloworld.proto ├── helloworld.pb.go ├── helloworld_grpc.pb.go └── validate └── validate.proto
定义数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 message Person { uint64 id = 1 [(validate.rules).uint64 .gt = 999 ]; string email = 2 [(validate.rules).string .email = true ]; string name = 3 [(validate.rules).string = { pattern: "^[^[0-9]A-Za-z]+( [^[0-9]A-Za-z]+)*$" , max_bytes: 256 , }]; Location home = 4 [(validate.rules).message.required = true ]; message Location { double lat = 1 [(validate.rules).double = { gte: -90 , lte: 90 }]; double lng = 2 [(validate.rules).double = { gte: -180 , lte: 180 }]; } }
生成代码
1 2 3 4 5 6 protoc --proto_path=. \ --proto_path=./helloworld \ --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ --validate_out="lang=go:../../" \ // 注意生成路径,由于在helloworld.proto指定package,因此路径会按照当前路径生成目录 helloworld.proto
可以看到生成一个helloworld.pb.validate.go
1 2 3 4 5 6 7 8 func main () { p := minegrpc.Person{ Id: 0 , } if err := p.Validate(); err != nil { log.Fatal(err) } }
可以看到出现报错:
1 2022 /09 /23 10 :33 :45 invalid Person.Id: value must be greater than 999
如果每个参数都这样校验,逻辑会有很多重复的,此时可以使用拦截器。
那么其中有个问题,如何确定req有没有validate方法,这里就可以通过interface和断言实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type validator interface { Validate() error } func intercept (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { if v, ok := req.(validator); ok && v.Validate() != nil { return nil , status.Error(codes.InvalidArgument, err.Error()) } return handler(ctx, resp) }
异常处理 在HTTP中有Code标明这个请求的状态,在gRPC中也有状态码:
官方文档:Status Response Codes
官方文档:GRPC Core
服务端通过制定返回错误
1 2 3 func (s *Service) SayHello(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error ) { return nil , status.Error(codes.NotFound, "not found" ) }
客户端调用之后,可以收到
1 2 3 4 5 6 7 hello, err := client.SayHello(ctx, &minegrpc.HelloRequest{Name: "mitaka" }) if err != nil { if e, ok := status.FromError(err); ok { fmt.Println(e.Code()) fmt.Println(e.Message()) } }
超时机制 gRPC在客户端可以设置超时时间,当出现网络抖动、网络拥塞、服务端处理响应慢、服务端依赖数据库等第三方导致很慢,如果一直等待返回,如果后端服务会有连续调用,已经响应过慢的请求本来就没有必要再响应,而且还会占用服务端资源,因此需要使用超时机制。在gRPC中,超时可以向下传递,可以在请求的每个节点上都实现超时控制,在当前节点已经超时,则不需要将请求再传递到下一个节点。在熔断、降级、限流的功能中,都需要用到超时机制。
在客户端请求时,设置超时
1 2 3 4 client := minegrpc.NewGreeterClient(dial) ctx, cancel := context.WithTimeout(context.Background(), 1 *time.Second) defer cancel()_, err = client.SayHello(ctx, &minegrpc.HelloRequest{Name: "mitaka" })
服务端改动,改成sleep 2s
可以看到客户端请求之后
1 2 DeadlineExceeded context deadline exceeded
gRPC开发中碰到的问题
params:参数默认值,是否为非空
在接口对接过程中,例如一些更新接口,由于没传和零值可能造成混淆,例如开关,没有更新开关,gRPC会默认收到false
,与将开关关闭逻辑一样,因此需要区分零值和空值;还有一种情况是状态请求,响应的请求如果是零值,则不会返回,就变成当开关是off
,此时获取开关状态是空的,这个需要注意。
在Golang
中,可以通过指针进行判断值是nil
还是为零值,将这个思想移植过来,则可以通过oneof
或者wrappers
的方式。
oneof
1 2 3 4 5 6 7 message HelloRequest { string name = 1 ; oneof switch { bool on =2 ; bool off=3 ; } }
在使用中,就需要进行判断
1 2 3 if req.GetOn() { stat = true }
可以看到,相比较下,这个方式不易读,更加推崇wrappers
的方式
1 2 3 4 5 6 7 8 9 10 11 message HelloReply { string message = 1 ; google.protobuf.BoolValue switch =2 ; Gender sex =3 ; } enum Gender { SEX_UNSPECIFIED = 0 ; SEX_MALE = 1 ; SEX_FEMALE = 2 ; }
使用过程通过与nil进行判断,更加符合Golang中的习惯
1 2 3 if req.Switch != nil { stat = req.Switch.Value }
枚举类型同理,在获取值时,需要先判断是否为未指定(未指定是枚举类型的零值)
1 2 3 if req.Sex != minegrpc.Gender_SEX_UNSPECIFIED { sex = req.Sex }
swagger
分组
通过gRPC-GATEWAY
可以生成HTTP
请求,还可以通过swagger
生成swagger
,如果所有的rpc
请求都放在同一个service
中,swagger
就没有分组,swagger
默认以service
进行分组。
Proto
枚举下,使用int
而不是string
,以及变量输出格式为下划线(蛇形体)而不是驼峰体
在使用枚举类型时,返回的结果可能会将枚举的值返回出来,而在使用proto进行对接时,枚举类型暴露出来意义不大,而且更容易出错,因此建议使用枚举类型的序号。在RPC交互时,推荐使用序号,在存储到数据库时,推荐使用值。
1 2 3 4 5 6 7 8 9 10 11 12 13 func NewGRPCGateway () *runtime.ServeMux { return runtime.NewServeMux(runtime.WithMarshalerOption( runtime.MIMEWildcard, &runtime.JSONPb{ MarshalOptions: protojson.MarshalOptions{ UseEnumNumbers: true , UseProtoNames: true , }, UnmarshalOptions: protojson.UnmarshalOptions{}, }, )) }
ps:RPC交互时,传枚举类型还是传值,都可以。
gRPC实现文件上传相比而言比较麻烦,
一些坑:
所有字段都可选,默认不填为零值。这是为了适配新老系统。因此需要在设计阶段就要考虑。例如,增加一个状态字段
字段序号确定之后,就不能改变,改变了会影响老系统对接
定义字段的时候,顺序没有关系,只有字段后面的数字会有影响
使用int32
而不是int64
,不然数字会变成string
1 2 option go_package = "api/v1;v1" ;