微服务开发之gRPC

Protobuf

protocol buffers 是一种灵活,高效,自动化机制的结构数据序列化方法-可类比 XML,但是比 XML 更小、更快、更为简单。有性能高、跨语言、开发容易维护等特点,而且天然集成gRPC框架,因此,Protobuf是微服务中的基石。

Protobuf序列化和反序列化,json化的方式在文档:网络编程RPC中,这里不再赘叙。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

语法

Protobuf文件一般以.proto结尾

1
2
3
syntax = "proto3"; // 表明版本是proto3,需要放在最开头

option go_package = "920/minegrpc;minegrpc"; // 代表生成的go代码,包目录是920/minegrpc,包名称是minegrpc

定义数据结构

1
2
3
message HelloRequest {
string name = 1; // 1 代表字段编号
}

下载工具

1
2
$ go install google.golang.org/protobuf/cmd/[email protected]
$ go install google.golang.org/grpc/cmd/[email protected]

目录树

1
2
3
minegrpc
└── helloworld
└── helloworld.proto

生成stub代码

1
2
3
$ cd minegrpc && protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
helloworld/helloworld.proto

--go_out=.:代表将数据结构go代码生成到当前目录

--go-grpc_out=.:代表将stubgo代码生成到当前目录

paths=source_relative:会使文件中option go_package = "920/grpc;grpc";定义失效,会生成到helloworld目录下

生成代码如下:

1
2
3
4
5
minegrpc
└── helloworld
├── helloworld.pb.go
├── helloworld.proto
└── helloworld_grpc.pb.go

如果要生成到minegrpc目录下,可以通过--proto_path指定目录,设置当前目录为grpc目录(或者 -I 指定目录)

1
2
3
4
$ protoc --proto_path=./helloworld \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
helloworld.proto

结果

1
2
3
4
5
minegrpc
├── helloworld
│   └── helloworld.proto
├── helloworld.pb.go // 生成结构体对象
└── helloworld_grpc.pb.go // 生成对象方法

引用包中的结构体和方法是,则是通过

1
import minegrpc "gostudy/920/minegrpc"

生成的结构体,可以看到注释也一起生成

1
2
3
4
5
6
7
8
// The request message containing the user's name.
type HelloRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // 1 代表字段编号
}

生成的服务端方法,在helloworld_grpc.pb.go

1
2
3
4
5
6
7
type GreeterServer interface {
// Sends a greeting
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
// Sends another greeting
SayHelloAgain(context.Context, *HelloRequest) (*HelloReply, error)
mustEmbedUnimplementedGreeterServer()
}

客户端方法也在helloworld_grpc.pb.go

1
2
3
4
5
6
7
8
9
10
type GreeterClient interface {
// Sends a greeting
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
// Sends another greeting
SayHelloAgain(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}

数据类型

protoco buffer中的数据类型对照:proto3,中文:高效的数据压缩编码方式 Protobuf

image-20220922210413261

默认值:

当一个消息被解析时,如果被编码的信息不包含一个特定的singular元素,被解析的对象锁对应的域被设置为一个默认值,不同类型默认值如下:

  • string,默认是一个空的string
  • bytes,默认是一个空的bytes
  • bools,默认是false
  • 数值类型,默认是0
  • 枚举类型,默认是第一个枚举值,也就是0
  • 消息体,在go里面是nil,而不是为空
  • 重复型,空slice

默认值在不同的语言中可能不同,具体的默认值可以查看文档:generated code guide

例如枚举类型

1
2
3
4
5
enum Goods {    // 枚举类型,在Golang中生成是int32类型
GOODS_UNSPECIFIED = 0; // 注意,枚举类型的第一个字段,一定是0
GOODS_APPLE = 1;
GOODS_BANANA = 2;
}

生成代码

1
2
3
4
5
6
7
type Goods int32

const (
Goods_GOODS_UNSPECIFIED Goods = 0
Goods_GOODS_APPLE Goods = 1
Goods_GOODS_BANANA Goods = 2
)

例如map类型

1
2
3
message Map {
map<string,string> mp =1; // 在Golang中,会生成map[string]string
}

生成代码

1
2
3
4
5
6
7
type Map struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Mp map[string]string `protobuf:"bytes,1,rep,name=mp,proto3" json:"mp,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}

例如时间戳类型

1
2
3
message Data {
google.protobuf.Timestamp time =1;
}

生成代码

1
2
3
4
5
6
7
type Data struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Time *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=time,proto3" json:"time,omitempty"`
}

而使用方式是通过

1
2
3
4
5
// types/known/timestamppb/timestamp.pb.go

func New(t time.Time) *Timestamp {
return &Timestamp{Seconds: int64(t.Unix()), Nanos: int32(t.Nanosecond())}
}

引入其他的proto文件的message

引入同目录下的proto文件helloworld.proto

1
2
3
4
5
6
7
8
9
syntax = "proto3"; // 表明版本是proto3,需要放在最开头

import "base.proto"; // 通过import加文件名称即可

option go_package = "920/minegrpc;minegrpc"; // 代表生成的go代码,包目录是920/grpc,包名称是grpc

service Greeter {
rpc Ping(Empty) returns(Pong){}
}

base.proto

1
2
3
4
5
6
7
8
9
10
11
syntax = "proto3";

option go_package = "920/minegrpc;minegrpc";

message Empty {

}

message Pong{

}

这种形式,如果只生成helloworld.proto对应的代码,是无法直接使用EmptyPong的,还需要将base.proto生成代码。

引入第三方包

1
2
3
4
5
6
7
8
9
10
syntax = "proto3"; // 表明版本是proto3,需要放在最开头

import "base.proto";
import "google/protobuf/empty.proto"; // 通过路径引用

option go_package = "920/minegrpc;minegrpc"; // 代表生成的go代码,包目录是920/grpc,包名称是grpc

service Greeter {
rpc Ping(google.protobuf.Empty) returns(Pong){}
}

第三方可以引用的包在go/pkg/mod/google.golang.org/[email protected]/types/known

在源码中,可以看到包是option go_package = "google.golang.org/protobuf/types/known/emptypb";

嵌套message

可以直接嵌套引用

1
2
3
4
5
6
message Empty {
}

message Pong{
repeated Empty empty = 1;
}

也可以只放在内部使用

1
2
3
4
5
6
message Pong{
message Empty {
string data = 1;
}
repeated Empty empty = 1;
}

也可以引用其他的message内部嵌套的message

1
2
3
4
5
6
7
8
9
message Empty {
Pong.Data data = 1;
}

message Pong{
message Data {
string data = 1;
}
}

gRPC

gRPC的框架或者说插件,可以将proto中的rpc service生成golang的方法代码。

例如在proto中

1
2
3
4
5
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

在服务端生成的代码

1
2
3
4
5
6
7
8
9
type GreeterServer interface {
// Sends a greeting
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
mustEmbedUnimplementedGreeterServer()
}

func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {
s.RegisterService(&Greeter_ServiceDesc, srv)
}

在客户端生成的代码

1
2
3
4
5
6
7
8
9
10
11
12
type GreeterClient interface {
// Sends a greeting
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

type greeterClient struct {
cc grpc.ClientConnInterface
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}

服务端生成的服务需要进行注册,客户端生成的代码,则构造客户端对象(interface),进行调用。

客户端服务端之间的模式

简单模式

Simple RPC,客户端发起一次请求,服务端响应一个数据,跟平时熟悉的RPC没有大的区别。

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"context"
"log"
"net"

"google.golang.org/grpc"

"gostudy/920/minegrpc"
)

type Service struct {
minegrpc.UnimplementedGreeterServer
}

func (s *Service) SayHello(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error) {
return &minegrpc.HelloReply{Message: "hello" + req.Name}, nil
}

func (s *Service) SayHelloAgain(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error) {
return &minegrpc.HelloReply{Message: "hello again" + req.Name}, nil
}

func main() {
listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}

s := grpc.NewServer()
minegrpc.RegisterGreeterServer(s, &Service{})
log.Fatal(s.Serve(listener))
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"context"
"fmt"
"log"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"gostudy/920/minegrpc"
)

func main() {
dial, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer dial.Close()

client := minegrpc.NewGreeterClient(dial)
ctx, _ := context.WithCancel(context.Background())
req := minegrpc.HelloRequest{Name: "abc"}
hello, err := client.SayHello(ctx, &req)
if err != nil {
log.Fatal(err)
}
fmt.Println(hello.Message)
}

ps:简单模式也是长连接,只是收发过程是一对一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
dial, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer dial.Close()

client := minegrpc.NewGreeterClient(dial)
ctx, _ := context.WithCancel(context.Background())
req := minegrpc.HelloRequest{Name: "abc"}
hello, err := client.SayHello(ctx, &req)
if err != nil {
log.Fatal(err)
}
fmt.Println(hello.Message)

select {}
}

客户端处于阻塞状态时,通过netstat可以看到socket信息

1
2
3
➜ netstat -an | grep 8080
tcp4 0 0 127.0.0.1.8080 127.0.0.1.65502 ESTABLISHED
tcp4 0 0 127.0.0.1.65502 127.0.0.1.8080 ESTABLISHED

服务端数据流模式

Server-side streaming RPC:客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源不断的返回给客户端。

1
2
3
4
5
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc GetStream (HelloRequest) returns (stream HelloReply) {} // 服务端返回stream,是服务端流模式
}

客户端数据流模式

Client-side streaming RPC :客户端源源不断向服务端发送数据流,发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器发送数据。

1
2
3
4
5
// The greeting service definition.
service Greeter {
// Sends another greeting
rpc PutStream (stream HelloRequest) returns (HelloReply) {} // 客户端发送stream,是客户端流模式
}

双向数据流模式

Bidirectional streaming RPC:客户端和服务端都可以向对方发送数据流,这个时候数据可以同时相互发送,也就是可以实现实时交互。典型的例子是聊天机器人,机器人可以源源不断发送信息,客户端也可以向聊天机器人源源不断发送信息。

1
2
3
4
// The greeting service definition.
service Greeter {
rpc AllStream (stream HelloRequest) returns (HelloReply) {} // 客户端发送stream,服务端返回stream,是双向流模式
}

此时,通过protoc生成代码之后,方法为

1
2
3
4
5
6
7
8
type GreeterServer interface {
// Sends a greeting
GetStream(*HelloRequest, Greeter_GetStreamServer) error
// Sends another greeting
PutStream(Greeter_PutStreamServer) error
AllStream(Greeter_AllStreamServer) error
mustEmbedUnimplementedGreeterServer()
}

服务端数据流模式,服务端代码

1
2
3
4
5
6
7
8
9
10
func (s *Service) GetStream(req *minegrpc.HelloRequest, res minegrpc.Greeter_GetStreamServer) error {
for i := 0; i < 10; i++ {
if err := res.Send(&minegrpc.HelloReply{Message: "hello " + req.Name + ":" + time.Now().String()}); err != nil {
return err
}
time.Sleep(500 * time.Millisecond)
}

return nil
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"context"
"fmt"
"log"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"gostudy/920/minegrpc"
)

func main() {
dial, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer dial.Close()

client := minegrpc.NewGreeterClient(dial)
ctx, _ := context.WithCancel(context.Background())
req := minegrpc.HelloRequest{Name: "abc"}
hello, err := client.GetStream(ctx, &req)
if err != nil {
log.Fatal(err)
}
for {
reply, err := hello.Recv()
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.Message)
}

}

执行效果,客户端显示

1
2
3
4
5
6
7
8
9
10
11
hello abc:2022-09-21 23:41:52.996397 +0800 CST m=+4.289561293
hello abc:2022-09-21 23:41:53.497469 +0800 CST m=+4.790634043
hello abc:2022-09-21 23:41:53.998574 +0800 CST m=+5.291739501
hello abc:2022-09-21 23:41:54.500105 +0800 CST m=+5.793270918
hello abc:2022-09-21 23:41:55.003888 +0800 CST m=+6.297055335
hello abc:2022-09-21 23:41:55.505045 +0800 CST m=+6.798212876
hello abc:2022-09-21 23:41:56.006222 +0800 CST m=+7.299389793
hello abc:2022-09-21 23:41:56.507297 +0800 CST m=+7.800465251
hello abc:2022-09-21 23:41:57.008563 +0800 CST m=+8.301732210
hello abc:2022-09-21 23:41:57.509321 +0800 CST m=+8.802490793
2022/09/21 23:41:58 EOF

客户端数据流模式,服务端代码

1
2
3
4
5
6
7
8
9
func (s *Service) PutStream(req minegrpc.Greeter_PutStreamServer) error {
for {
recv, err := req.Recv()
if err != nil {
return err
}
fmt.Println(recv.Name)
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"context"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"gostudy/920/minegrpc"
)

func main() {
dial, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer dial.Close()

client := minegrpc.NewGreeterClient(dial)
ctx, _ := context.WithCancel(context.Background())
hello, err := client.PutStream(ctx)
if err != nil {
log.Fatal(err)
}
for i := 0; i < 5; i++ {
req := minegrpc.HelloRequest{Name: "abc: " + time.Now().String()}
err := hello.Send(&req)
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}

执行结果服务端显示

1
2
3
4
5
abc: 2022-09-21 23:47:15.221367 +0800 CST m=+0.003326418
abc: 2022-09-21 23:47:16.222609 +0800 CST m=+1.004569751
abc: 2022-09-21 23:47:17.223701 +0800 CST m=+2.005663168
abc: 2022-09-21 23:47:18.224789 +0800 CST m=+3.006752376
abc: 2022-09-21 23:47:19.225939 +0800 CST m=+4.007903626

双向流模式,服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (s *Service) AllStream(req minegrpc.Greeter_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
recv, err := req.Recv()
if err != nil {
log.Println(err)
return
}
fmt.Println("server receive: ", recv.Name)
}
}()

go func() {
defer wg.Done()
for {
send := minegrpc.HelloReply{Message: "server: " + time.Now().String()}
err := req.SendMsg(&send)
if err != nil {
log.Println(err)
return
}
fmt.Println("server send: ", send.Message)
}
}()

wg.Wait()
return nil
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func main() {
log.SetFlags(log.Llongfile)
dial, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer dial.Close()

client := minegrpc.NewGreeterClient(dial)
ctx, _ := context.WithCancel(context.Background())
hello, err := client.AllStream(ctx)
if err != nil {
log.Fatal(err)
}
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
req := minegrpc.HelloRequest{Name: "client: " + time.Now().String()}
err := hello.Send(&req)
if err != nil && err != io.EOF {
log.Fatal(err)
}
//time.Sleep(time.Second)
fmt.Println("client send: ", req.Name)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
var req minegrpc.HelloReply
err = hello.RecvMsg(&req)
if err != nil {
log.Println(err) // 注意这里,接收消息是,一定会返回报错,因此无法对错误进行判断状态
}
fmt.Println("client receive: ", req.Message)
}
}()
wg.Wait()
}

服务端结果显示

1
2
3
4
5
6
7
8
9
10
11
server send:  server: 2022-09-22 00:16:46.397325 +0800 CST m=+1.809850334
server receive: client: 2022-09-22 00:16:46.397059 +0800 CST m=+0.004309168
server receive: client: 2022-09-22 00:16:46.397447 +0800 CST m=+0.004697376
server receive: client: 2022-09-22 00:16:46.397453 +0800 CST m=+0.004702835
server receive: client: 2022-09-22 00:16:46.397456 +0800 CST m=+0.004705835
server receive: client: 2022-09-22 00:16:46.397458 +0800 CST m=+0.004708251 // 后续删除大量服务端代码
server send: server: 2022-09-22 00:16:46.397959 +0800 CST m=+1.810484501
server send: server: 2022-09-22 00:16:46.397961 +0800 CST m=+1.810486334
server send: server: 2022-09-22 00:16:46.397964 +0800 CST m=+1.810488959
main.go:61: rpc error: code = Canceled desc = context canceled
main.go:48: rpc error: code = Canceled desc = context canceled

客户端显示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
client send:  client: 2022-09-22 00:16:46.397059 +0800 CST m=+0.004309168
client send: client: 2022-09-22 00:16:46.397447 +0800 CST m=+0.004697376
client send: client: 2022-09-22 00:16:46.397453 +0800 CST m=+0.004702835
client send: client: 2022-09-22 00:16:46.397456 +0800 CST m=+0.004705835
client send: client: 2022-09-22 00:16:46.397458 +0800 CST m=+0.004708251
main.go:51: rpc error: code = Unknown desc = grpc: client streaming protocol violation: get <nil>, want <EOF>
client receive: server: 2022-09-22 00:16:46.397679 +0800 CST m=+1.810204626
main.go:51: rpc error: code = Unknown desc = grpc: client streaming protocol violation: get <nil>, want <EOF>
client receive: server: 2022-09-22 00:16:46.397704 +0800 CST m=+1.810229084
main.go:51: rpc error: code = Unknown desc = grpc: client streaming protocol violation: get <nil>, want <EOF>
client receive: server: 2022-09-22 00:16:46.397709 +0800 CST m=+1.810234459
main.go:51: rpc error: code = Unknown desc = grpc: client streaming protocol violation: get <nil>, want <EOF>
client receive: server: 2022-09-22 00:16:46.397713 +0800 CST m=+1.810238292
main.go:51: rpc error: code = Unknown desc = grpc: client streaming protocol violation: get <nil>, want <EOF>
client receive: server: 2022-09-22 00:16:46.397715 +0800 CST m=+1.810239917

可见,并不是客户端接受到,服务端才发送,而是一端直接发送到对端。

metadata

在gRPC调用过程中,需要传递一些隐式数据(例如token,不需要嵌入业务逻辑的代码,需要用header传输),而且是跨进程通信,此时就可以通过gRPC中的context传递,将metadata传入context中。metadata就可以用于权限验证、链路跟踪等。

metadata是以key-value的形式存储数据的,key是string类型,value[]string,即一个字符串数组类型。http中的header的生命周期是一次http请求,metadata的生命周期就是一次RPC调用。

1
2
// "google.golang.org/grpc/metadata"
type MD map[string][]string

使用方式

1
md := metadata.Pairs("key1", "value1")

RPC客户端发送

1
reply,err := client.SomeRPC(ctx,req)

RPC服务端接受

1
2
3
4
5
6
7
8
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Fatal("something wrong")
}
for k, v := range md {
fmt.Printf("key: %s,value: %v", k, v) // v是一个[]string
}

拦截器

也就是中间件,例如记录接口响应时间,鉴权,做过滤,获取请求放信息等。

拦截器分为:一元拦截器(对于简单模式);和流拦截器(对于数据流模式)

在创建Server中需要注入ServerOption

1
2
3
4
5
6
7
8
func NewServer(opt ...ServerOption) *Server {}

// 可以通过UnaryInterceptor方法生成ServerOption
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {

// 传递UnaryServerInterceptor参数
// 也就是一个UnaryServerInterceptor对象,实现方法即可
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func intercept(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
fmt.Println("receive a new request.")

resp, err = handler(ctx, req)

fmt.Println("request already done.")
return resp, err
}

func main() {
log.SetFlags(log.Lshortfile)
listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}

opt := grpc.UnaryInterceptor(intercept)
s := grpc.NewServer(opt)
minegrpc.RegisterGreeterServer(s, &Service{})
log.Fatal(s.Serve(listener))
}

执行之后,服务端可以看到

1
receive a new request.

拦截器也可以放在客户端

1
2
3
4
5
6
func Dial(target string, opts ...DialOption) (*ClientConn, error) {}

func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {}

// 与服务端同样的逻辑,声明一个函数
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
log.SetFlags(log.Lshortfile)
var opts []grpc.DialOption
opts = append(opts, grpc.WithUnaryInterceptor(interceptor))
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
dial, err := grpc.Dial("127.0.0.1:8080", opts...)
if err != nil {
log.Fatal(err)
}
defer dial.Close()

client := minegrpc.NewGreeterClient(dial)
ctx, _ := context.WithCancel(context.Background())
hello, err := client.SayHello(ctx, &minegrpc.HelloRequest{Name: "mitaka"})
if err != nil {
log.Fatal(err)
}
fmt.Println(hello.Message)
}

func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
fmt.Printf("spend: %s\n", time.Since(start))
return err
}

执行之后,客户端可以看到

1
2
spend: 1.638416ms
hellomitaka

拦截器的开源项目:go-grpc-midware

例如做一个身份验证的拦截器

服务端通过拦截器验证token:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func auth(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "no token")
}
log.Println(md) // 这个地方还可以嵌入日志记录验证的token

var (
appid string
appkey string
)

if v1, ok := md["appid"]; ok {
appid = v1[0]
}
if v2, ok := md["appkey"]; ok {
appkey = v2[0]
}

if !(appid == "mitaka" && appkey == "pwd") {
return nil, status.Error(codes.Unauthenticated, "auth failed")
}
return handler(ctx, req)
}

客户端通过拦截器加载token

1
2
3
4
5
6
7
8
9
10
func auth(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
md := metadata.Pairs(
"appid", "mitaka",
"appkey", "pwd",
)
ctx = metadata.NewOutgoingContext(ctx, md)

err := invoker(ctx, method, req, reply, cc, opts...)
return err
}

在Golang中,针对认证拦截器,可以通过WithPerRPCCredentials的拦截器

1
2
3
4
5
6
7
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {}

// 实现PerRPCCredentials接口
type PerRPCCredentials interface {
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
RequireTransportSecurity() bool
}

在客户端上实现

1
2
3
4
5
6
7
8
9
10
11
12
13
type perRPCCredentials struct {
}

func (p *perRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"appid": "mitaka",
"appkey": "pwd",
}, nil
}

func (p *perRPCCredentials) RequireTransportSecurity() bool {
return false
}

使用

1
2
3
4
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithPerRPCCredentials(&perRPCCredentials{}))
dial, err := grpc.Dial("127.0.0.1:8080", opts...)

验证器

validator,验证传入参数是否满足要求。更多的匹配规则在官方文档中:protoc-gen-validate

安装

1
2
3
4
5
# fetches this repo into $GOPATH
go get -d github.com/envoyproxy/protoc-gen-validate

# installs PGV into $GOPATH/bin
make build

或者

1
go install github.com/envoyproxy/protoc-gen-validate@latest

拷贝validator的proto,目录如下

1
2
3
4
5
6
7
8
9
.
├── base.pb.go
├── helloworld
│   ├── base.proto
│   └── helloworld.proto
├── helloworld.pb.go
├── helloworld_grpc.pb.go
└── validate
└── validate.proto

定义数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
message Person {
uint64 id = 1 [(validate.rules).uint64.gt = 999];

string email = 2 [(validate.rules).string.email = true];

string name = 3 [(validate.rules).string = {
pattern: "^[^[0-9]A-Za-z]+( [^[0-9]A-Za-z]+)*$",
max_bytes: 256,
}];

Location home = 4 [(validate.rules).message.required = true];

message Location {
double lat = 1 [(validate.rules).double = { gte: -90, lte: 90 }];
double lng = 2 [(validate.rules).double = { gte: -180, lte: 180 }];
}
}

生成代码

1
2
3
4
5
6
protoc --proto_path=. \
--proto_path=./helloworld \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
--validate_out="lang=go:../../" \ // 注意生成路径,由于在helloworld.proto指定package,因此路径会按照当前路径生成目录
helloworld.proto

可以看到生成一个helloworld.pb.validate.go

1
2
3
4
5
6
7
8
func main() {
p := minegrpc.Person{
Id: 0,
}
if err := p.Validate(); err != nil {
log.Fatal(err)
}
}

可以看到出现报错:

1
2022/09/23 10:33:45 invalid Person.Id: value must be greater than 999

如果每个参数都这样校验,逻辑会有很多重复的,此时可以使用拦截器。

那么其中有个问题,如何确定req有没有validate方法,这里就可以通过interface和断言实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type validator interface {
Validate() error
}

func intercept(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
if v, ok := req.(validator); ok && v.Validate() != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

return handler(ctx, resp)
}

异常处理

在HTTP中有Code标明这个请求的状态,在gRPC中也有状态码:

官方文档:Status Response Codes

官方文档:GRPC Core

服务端通过制定返回错误

1
2
3
func (s *Service) SayHello(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error) {
return nil, status.Error(codes.NotFound, "not found")
}

客户端调用之后,可以收到

1
2
3
4
5
6
7
hello, err := client.SayHello(ctx, &minegrpc.HelloRequest{Name: "mitaka"})
if err != nil {
if e, ok := status.FromError(err); ok { // 类似断言,类型判断
fmt.Println(e.Code())
fmt.Println(e.Message())
}
}
1
2
NotFound
not found

超时机制

gRPC在客户端可以设置超时时间,当出现网络抖动、网络拥塞、服务端处理响应慢、服务端依赖数据库等第三方导致很慢,如果一直等待返回,如果后端服务会有连续调用,已经响应过慢的请求本来就没有必要再响应,而且还会占用服务端资源,因此需要使用超时机制。在gRPC中,超时可以向下传递,可以在请求的每个节点上都实现超时控制,在当前节点已经超时,则不需要将请求再传递到下一个节点。在熔断、降级、限流的功能中,都需要用到超时机制。

在客户端请求时,设置超时

1
2
3
4
client := minegrpc.NewGreeterClient(dial)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err = client.SayHello(ctx, &minegrpc.HelloRequest{Name: "mitaka"})

服务端改动,改成sleep 2s

可以看到客户端请求之后

1
2
DeadlineExceeded
context deadline exceeded

gRPC开发中碰到的问题

  1. params:参数默认值,是否为非空

    在接口对接过程中,例如一些更新接口,由于没传和零值可能造成混淆,例如开关,没有更新开关,gRPC会默认收到false,与将开关关闭逻辑一样,因此需要区分零值和空值;还有一种情况是状态请求,响应的请求如果是零值,则不会返回,就变成当开关是off,此时获取开关状态是空的,这个需要注意。

    Golang中,可以通过指针进行判断值是nil还是为零值,将这个思想移植过来,则可以通过oneof或者wrappers的方式。

    oneof

    1
    2
    3
    4
    5
    6
    7
    message HelloRequest {
    string name = 1;
    oneof switch {
    bool on =2;
    bool off=3;
    }
    }

    在使用中,就需要进行判断

    1
    2
    3
    if req.GetOn() {
    stat = true
    }

    可以看到,相比较下,这个方式不易读,更加推崇wrappers的方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    message HelloReply {
    string message = 1;
    google.protobuf.BoolValue switch =2;
    Gender sex =3;
    }

    enum Gender {
    SEX_UNSPECIFIED = 0;
    SEX_MALE = 1;
    SEX_FEMALE = 2;
    }

    使用过程通过与nil进行判断,更加符合Golang中的习惯

    1
    2
    3
    if req.Switch != nil {
    stat = req.Switch.Value
    }

    枚举类型同理,在获取值时,需要先判断是否为未指定(未指定是枚举类型的零值)

    1
    2
    3
    if req.Sex != minegrpc.Gender_SEX_UNSPECIFIED {
    sex = req.Sex
    }
  2. swagger分组

    通过gRPC-GATEWAY可以生成HTTP请求,还可以通过swagger生成swagger,如果所有的rpc请求都放在同一个service中,swagger就没有分组,swagger默认以service进行分组。

  3. Proto 枚举下,使用int而不是string,以及变量输出格式为下划线(蛇形体)而不是驼峰体

    在使用枚举类型时,返回的结果可能会将枚举的值返回出来,而在使用proto进行对接时,枚举类型暴露出来意义不大,而且更容易出错,因此建议使用枚举类型的序号。在RPC交互时,推荐使用序号,在存储到数据库时,推荐使用值。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    func NewGRPCGateway() *runtime.ServeMux {
    return runtime.NewServeMux(runtime.WithMarshalerOption(
    runtime.MIMEWildcard, &runtime.JSONPb{
    MarshalOptions: protojson.MarshalOptions{
    // 将枚举类型返回为数字,marshal之后是数字枚举
    UseEnumNumbers: true,
    // 以proto中定义的下划线输出,而不是用驼峰体输出
    UseProtoNames: true,
    },
    UnmarshalOptions: protojson.UnmarshalOptions{},
    },
    ))
    }

    ps:RPC交互时,传枚举类型还是传值,都可以。

  4. gRPC实现文件上传相比而言比较麻烦,

一些坑:

  1. 所有字段都可选,默认不填为零值。这是为了适配新老系统。因此需要在设计阶段就要考虑。例如,增加一个状态字段
  2. 字段序号确定之后,就不能改变,改变了会影响老系统对接
  3. 定义字段的时候,顺序没有关系,只有字段后面的数字会有影响
  4. 使用int32而不是int64,不然数字会变成string
1
2
option go_package = "api/v1;v1";
// 目录名称;包名