image

Golang gRPC学习笔记

  • WORDS 30273

Goalng gRPC学习笔记

简介

RPC协议中,客户端应用程序可以像本地对象一样直接调用不同机器上的服务器应用程序。gRPC是一个开源、高性能、跨语言的 RPC框架,使用 Protobuf格式数据进行通信,面向移动和 HTTP/2设计。

概念

和大部分 RPC系统一样,gRPC基于定义服务的思想,指定可以远程调用的方法及其参数和返回类型。默认情况下,gRPC使用 Protocol buffers做为接口定义语言,通过在 .proto文件中定义 messageservice服务

// 定义message 类似于类和结构体 
// message中字段的参数和编号必须是唯一的
message Student {
	string name = 1;
	uin32 age = 2; 
}
// 定义一个服务 其中包含一个rpc接口
service HelloService {
	// 声明rpc接口以及请求消息体和响应消息体
	rpc Hello(HelloRequest) returns (HelloReply) {}
}

// Hello RPC接口的请求参数
message HelloRequest {
	string name = 1;
}

// Hello RPC接口的请求响应参数
message HelloReply {
	string message = 1;
	int64 timestamp = 2;
}

以上就简单定义了一个名为 HelloServiceRPC服务,其包含一个 Hello接口,传入 name参数,返回 messagetimestamp参数。

点击这里,阅读 Protobuf的官方文档

服务定义

gRPG允许定义四种服务方法

  • 单个请求单个响应

    rpc Hello(HelloRequest) returns (HelloReply);
    
  • 客户端流式 RPC:客户端写入一系列消息流,完成写入后会等待服务器读取消息并返回响应

    rpc Hello(stream HelloRequest) returns (HelloReply);
    
  • 服务器流式 RPC:客户端向服务端发送请求并获取返回的响应流,从流中读取响应消息,直到流被关闭

    rpc Hello(HelloRequest) returns (stream HelloReply);
    
  • 双向流式 RPC:客户端和服务端都是用读写流发送信息,两个流分别独立运行,因此客户端和服务端可以按照任意顺序读取和写入

    rpc Hello(stream HelloRequest) returns (stream HelloReply);
    

单向流式RPC中,gRPC会保证单个 RPC调用中的消息排序

使用API

proto文件中的服务定义完成之后,gRPC提供了用于生成客户端和服务端代码的编译器插件。

  • 在服务端,需要实现服务声明的方法并运行 gRPC服务器来处理客户端调用,gRPC框架会解码请求、执行对应的服务方法并对响应消息进行编码。
  • 在客户端,本地会有一个与服务端方法相同的本地对象,客户端可以在本地对象上调用这些方法向服务端发起请求,gRPC会编码请求参数、将请求发送到服务器、解码服务器的响应参数并返回

同步与异步

同步 RPC调用会一直阻塞,知道服务器发出响应为止。但是在许多场景中,异步的 RPC调用可能更有用。

gRPC在大部分语言中都有同步和异步两种风格的编程API

RPC生命周期

普通RPC(Unary RPC)

  1. 客户端调用服务方法,服务器会收到通知,该 RPC已被调用,其中包含此次调用的客户端元数据、方法名称和指定的超时时间
  2. 服务器收到通知后,可以立即发回自己的初始元数据(必须在响应消息之前发送),或者等待客户端的请求消息
  3. 服务器收到客户端的请求消息后,就会开始执行响应消息返回前的所有操。然后,响应消息会连同状态详细信息以及可选的尾随元数据一起返回给客户端
  4. 客户端收到响应后,如果状态为 ok,那么调用完成

服务器流式RPC(Server streaming RPC)

服务器流式 RPC和普通 RPC类似,不同之处在于服务器会返回消息流以响应客户端的请求。所有消息发送完成后,会将状态详细信息和可选的尾随元数据发送到客户端。

客户端流式RPC(Client streaming RPC)

客户端流式 RPC也和普通 RPC类似,不同之处在于客户端向服务端发送的是消息流而不是单个消息。服务通常在收到所有客户端的消息后(也有可能收到部分消息就响应),以单个消息进行响应。

双向流式RPC(Bidirectional streaming RPC)

在双向流式 RPC中,服务器接收客户端元数据、方法名称和超时时间。服务器可以选择发回其初始元数据或等待客户端开始流式消息传输。

由于两个流是独立的,客户端和服务器可以按照任意顺序读写消息。

截至时间/超时(Deadlines/Timeouts)

gRPC允许客户端指定在 RPCDEADLINE_EXCEEDED错误终止之间等待 RPC完成的时间。在服务端可以查询特定的 RPC是否超时和距离超时的剩余时间。

指定截止时间或超时是特定于语言的

RPC termination

gRPC中,客户端和服务端都会判断调用是否成功,并且两者的结果可能会不用。例如,RPC可能在服务端成功响应,但在客户端失败(响应超时到达)。

RPC取消(Cancelling an RPC)

客户端和服务端都可以随时取消 RPC调用。取消操作会立即终止 RPC请求。

元数据(Metadata)

元数据是有关特定 RPC调用的信息,采用键值对列表的形式,其中键为字符串,值通常也为字符串,也可以是二进制数据。

通道(Channels)

gRPC channel提供与指定主机和端口上的 gRPC服务器连接。客户端可以指定 channel参数来修改 gRPC的默认行为,例如打开和关闭消息压缩等。channel的状态包括 connectedidle

gRPC如何处理关闭 channel取决于语言

基础使用

golang中使用 gRPC需要安装 Protocol buffer编译器和相关联的 go插件

# proto go编译器
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
# proto rpc go编译器
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

在安装完插件后,需要确保 GOPATH是可用的,以确保 protoc编译器可以找到插件

使用 go mod初始化一个空项目

编写 proto文件

// 指定protobuf版本
syntax = "proto3";

package hello;

// 编译为go代码时必须 指定go的package
option go_package = "/hello";

// 定义服务
service DemoService {
  // 定义rpc接口
  rpc Hello(HelloRequest) returns (HelloReply) {}
}

// 请求消息
message HelloRequest {
  string name = 1;
}

// 响应消息
message HelloReply {
  string message = 1;
  int64 timestamp = 2;
}

定义完成后使用 protoc插件生成 goalng结构体代码和 rpc接口代码

# --go_out 生成的go代码文件存放目录 hello.proto 指定proto文件
# --go_out 只会生成meessage对应的go结构体代码和protobuf序列化代码
protoc --go_out=. .\hello\hello.proto

# 添加 --go-grpc_out 则会同时生成rpc接口代码
protoc --go_out=. --go-grpc_out=. .\hello\hello.proto

执行成功后会生成 hello.pb.gohello_grpc.pb.go两个文件

// hello.pb.go
// 自动生成的message结构体代码和protubuf序列化以及反序列化方法
// ...
type HelloRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

func (x *HelloRequest) Reset() {
	*x = HelloRequest{}
	if protoimpl.UnsafeEnabled {
		mi := &file_hello_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *HelloRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*HelloRequest) ProtoMessage() {}
// ...
// hello_grpc.pb.go
// ...
// DemoService 客户端接口
type DemoServiceClient interface {
	Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

// 客户端接口实现
type demoServiceClient struct {
	cc grpc.ClientConnInterface
}

func NewDemoServiceClient(cc grpc.ClientConnInterface) DemoServiceClient {
	return &demoServiceClient{cc}
}

// rpc调用方法
func (c *demoServiceClient) Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	out := new(HelloReply)
	err := c.cc.Invoke(ctx, DemoService_Hello_FullMethodName, in, out, cOpts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// DemoService 服务端接口 需要编写实现类
type DemoServiceServer interface {
	Hello(context.Context, *HelloRequest) (*HelloReply, error)
	mustEmbedUnimplementedDemoServiceServer()
}

定义RPC接口实现

自动生成的 DemoServiceServer接口包含了非公开的 mustEmbedUnimplementedDemoServiceServer方法,这个方法无法被 hello包之外的类实现,所以在实现 RPC方法时,需要通过组合的形式包含 Unimplementedxxxxx

type UnimplementedDemoServiceServer struct{}

func (UnimplementedDemoServiceServer) Hello(context.Context, *HelloRequest) (*HelloReply, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}
func (UnimplementedDemoServiceServer) mustEmbedUnimplementedDemoServiceServer() {}
func (UnimplementedDemoServiceServer) testEmbeddedByValue()                     {}

编写 RPC方法实现

type DemoService struct {
	hello.UnimplementedDemoServiceServer
}

func (self *DemoService) Hello(ctx context.Context, request *hello.HelloRequest) (*hello.HelloReply, error) {
    // 获取截止时间参数
	deadline, ok := ctx.Deadline()
	fmt.Println(deadline, ok)
	// 直接返回消息
	return &hello.HelloReply{
		Message:   fmt.Sprintf("ok %s", request.Name),
		Timestamp: time.Now().UnixMilli(),
	}, nil
}

服务端

func main() {
	server := grpc.NewServer()
    // 注册DemoService
	hello.RegisterDemoServiceServer(server, &DemoService{})
    // 监听tcp端口
	listener, err := net.Listen("tcp", ":4096")
	if err != nil {
		log.Fatalln(err)
	}
    // 启动grpc服务器
	log.Fatalln(server.Serve(listener))
}
# 启动服务端
go run server\main.go

客户端

func main() {
    // 第一个参数为服务端的接口地址 后续参数为配置参数 此处是默认配置
	conn, err := grpc.NewClient("localhost:4096", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			log.Println(closeErr)
		}
	}()
    // 新建client
	client := hello.NewDemoServiceClient(conn)
    // 调用Hello rpc方法
	response, err := client.Hello(context.Background(), &hello.HelloRequest{
		Name: "demo",
	})
	if err != nil {
		log.Fatalln(err)
	}
    // 输出响应体
	log.Printf("message: %s, timestamp: %d", response.Message, response.Timestamp)
}
# 运行客户端
go run client\main.go
# 结果输出
2024/08/29 22:49:35 message: ok demo, timestamp: 1724942975373

以上代码就完成了一个简单的 RPC服务,在不使用流的情况下,上述代码适用于大部分的 RPC处理流程

完整代码示例

了解如何使用服务器、客户端和双向流式 RPC

定义5个 RPC服务,分别包含了每一种 RPC服务类别

syntax = "proto3";

package notice;

option go_package = "/notice";

service NoticeService {
  // 保存通知 返回通知Id
  rpc SaveNotice(SaveNoticeRequest) returns (SaveNoticeReply) {}
  // 通过id查询通知
  rpc QueryNotice(QueryNoticeRequest) returns (Notice) {}
  // 查询通知列表
  rpc ListNotice(ListNoticeRequest) returns (stream Notice) {}
  // 批量保存通知
  rpc BatchSaveNotice(stream SaveNoticeRequest) returns (SaveNoticeReply) {}
  // 动态保存通知 每一次保存都会即时返回
  rpc DynamicSaveNotice(stream SaveNoticeRequest) returns (stream Notice) {}
}

message SaveNoticeRequest {
  string content = 1; // 通知内容
}

message SaveNoticeReply {
  uint64 notice_id = 1; // 通知id
  uint64 count = 2; // 保存的通知数量
}

message QueryNoticeRequest {
  uint64 notice_id = 1; // 通知id
}

// 通知数据对象
message Notice {
  uint64 id = 1; // 通知id
  string content = 2; // 通知内容
  int64 timestamp = 3; // 保存时间
}

// 通知查询参数
message ListNoticeRequest {
  uint64 lo = 1; // 通知id最小值
  uint64 hi = 2; // 通知id最大值
}
// server/main.go

type NoticeService struct {
	notice.UnimplementedNoticeServiceServer
	noticeCache map[uint64]string
	mutex       *sync.RWMutex
}

func NewNoticeService() notice.NoticeServiceServer {
	return &NoticeService{
		noticeCache: make(map[uint64]string),
		mutex:       &sync.RWMutex{},
	}
}

// SaveNotice 保存通知 返回通知Id
func (self *NoticeService) SaveNotice(ctx context.Context, request *notice.SaveNoticeRequest) (*notice.SaveNoticeReply, error) {
	noticeId := self.addNotice(request.Content)
	return &notice.SaveNoticeReply{
		NoticeId: noticeId,
		Count:    1,
	}, nil
}

// QueryNotice 通过通知id查询通知详情
func (self *NoticeService) QueryNotice(ctx context.Context, request *notice.QueryNoticeRequest) (*notice.Notice, error) {
	self.mutex.RLock()
	noticeString := self.noticeCache[request.NoticeId]
	if noticeString == "" {
		return nil, nil
	}
	value := &notice.Notice{}
	err := sonic.UnmarshalString(noticeString, value)
	self.mutex.RUnlock()
	return value, err
}

// ListNotice 查询通知Id区间通知列表
func (self *NoticeService) ListNotice(request *notice.ListNoticeRequest, stream grpc.ServerStreamingServer[notice.Notice]) error {
	self.mutex.RLock()
	defer self.mutex.RUnlock()
	for key, value := range self.noticeCache {
		if key > request.Hi || key < request.Lo {
			continue
		}
		noticeValue := &notice.Notice{}
		if err := sonic.UnmarshalString(value, noticeValue); err != nil {
			log.Println(err)
			continue
		}
        // 使用流发送单个通知
		if err := stream.Send(noticeValue); err != nil {
			return err
		}
	}
	return nil
}

// BatchSaveNotice 批量保存通知 返回保存的通知数量
func (self *NoticeService) BatchSaveNotice(stream grpc.ClientStreamingServer[notice.SaveNoticeRequest, notice.SaveNoticeReply]) error {
	var noticeCount uint64
	startTime := time.Now()
	for {
        // 读取流通知 如果err为EOF 那么流读取完成
		request, err := stream.Recv()
		if err == io.EOF {
			endTime := time.Now()
			log.Printf("startTime: %v, endTime: %v", startTime, endTime)
			return stream.SendAndClose(&notice.SaveNoticeReply{
				Count: noticeCount,
			})
		}
		if err != nil {
			return err
		}
		self.addNotice(request.Content)
		noticeCount++
	}
}

// DynamicSaveNotice 动态保存通知 没保存一条通知都会发回完整的通知参数
func (self *NoticeService) DynamicSaveNotice(stream grpc.BidiStreamingServer[notice.SaveNoticeRequest, notice.Notice]) error {
	for {
		request, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		noticeId := self.addNotice(request.Content)
		stringValue := self.noticeCache[noticeId]
		value := &notice.Notice{}
		if err = sonic.UnmarshalString(stringValue, value); err != nil {
			return err
		}
        // 由于是两条独立的流 所有可以同时读取和发送
		if err = stream.Send(value); err != nil {
			return err
		}
	}
}

func (self *NoticeService) addNotice(content string) uint64 {
	self.mutex.Lock()
	defer self.mutex.Unlock()
	noticeId := uint64(len(self.noticeCache) + 1)
	value := &notice.Notice{
		Id:        noticeId,
		Content:   content,
		Timestamp: time.Now().UnixMilli(),
	}
	stringValue, _ := sonic.MarshalString(value)
	self.noticeCache[noticeId] = stringValue
	return noticeId
}

func main() {
    // 启动服务
	server := grpc.NewServer()
	notice.RegisterNoticeServiceServer(server, NewNoticeService())
	listener, err := net.Listen("tcp", ":4096")
	if err != nil {
		log.Fatalln(err)
	}
	log.Fatalln(server.Serve(listener))
}
// client/main.go

func main() {
	conn, err := grpc.NewClient("localhost:4096", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			log.Println(closeErr)
		}
	}()
	client := notice.NewNoticeServiceClient(conn)
	saveReply, err := client.SaveNotice(context.Background(), &notice.SaveNoticeRequest{
		Content: "demo notice content",
	})
	if err != nil {
		log.Fatalf("saveNotice error message: %s", err)
	}
	log.Printf("saveNotice return noticeId: %d", saveReply.NoticeId)

	stream, err := client.BatchSaveNotice(context.Background())
	if err != nil {
		log.Fatalf("batchSaveNotice error message: %s", err)
	}
    // 通过流进行通知的批量保存
	for i := 0; i < 10; i++ {
		if err = stream.Send(&notice.SaveNoticeRequest{Content: strconv.Itoa(i)}); err != nil {
			log.Fatalf("send notice error message: %s", err)
		}
	}
	batchReply, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("recv batchSaveNotice error, message: %s", err)
	}
	log.Printf("batchSaveNotice return noticeCount: %d", batchReply.Count)

	query := &notice.ListNoticeRequest{
		Lo: 5,
		Hi: 10,
	}
	listStream, err := client.ListNotice(context.Background(), query)
	if err != nil {
		log.Fatalf("listNotice error message: %s", err)
	}
    // 读取返回的流通知
	for {
		value, err := listStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("listStream reav error message: %s", err)
		}
		log.Printf("listNotice result, noticeId: %d, content: %s, timestamp: %d", value.Id, value.Content, value.Timestamp)
	}

	dynamicStream, err := client.DynamicSaveNotice(context.Background())
	wait := make(chan struct{})
    // 启动协程读取返回的流消息
	go func() {
		for {
			value, recvErr := dynamicStream.Recv()
			if recvErr == io.EOF {
                // 如果流读取完成 那么关闭通道
				close(wait)
				return
			}
			if recvErr != nil {
				log.Printf("dynamicStream recv error message: %s", recvErr)
			}
			log.Printf("dynamicStream recv, noticeId: %d, content: %s, timestamp: %d", value.Id, value.Content, value.Timestamp)
		}
	}()
	for i := 0; i < 10; i++ {
		if err = dynamicStream.Send(&notice.SaveNoticeRequest{Content: strconv.Itoa(i + 10)}); err != nil {
			log.Fatalf("send notice error message: %s", err)
		}
	}
	dynamicStream.CloseSend()
    // 阻塞等待通道被关闭
	<-wait
}

传输加密

TLS

TLS 是一种常用的加密协议,用于提供端到端通信安全性。TLS连接需要提前准备好公钥/私钥,通常可以使用 openSSL创建。

需要使用到的通常有以下三个文件:

  • ca.crt / ca_cert.pem:包含可以验证服务器证书的证书
  • server.crt / server_cert.pem:服务器证书(公钥)
  • server.key / server_key.pem:服务器私钥

修改服务端启动代码

// 使用tls证书创建加密连接
creds, err := credentials.NewServerTLSFromFile("../certs/server.crt", "../certs/server.key")
if err != nil {
    log.Fatalln(err)
}
server := grpc.NewServer(grpc.Creds(creds))

修改客户端代码

// 指定验证证书和证书指向的域名
creds, err := credentials.NewClientTLSFromFile("../certs/ca.crt", "www.example.com")
if err != nil {
    log.Fatalln(err)
}
conn, err := grpc.NewClient("localhost:4096", grpc.WithTransportCredentials(creds))

ALTS

ALTS是由 Google开发的身份验证和加密传输系统,用于保护 Google架构中的 RPC通信,ALTS类似于双向 TLS,但经过 Google的设计和优化以满足生产环境的要求。ALTS

ALTS目前仅在 Google Cloud Platform中受支持

gRPC中的 ALTS具有以下功能:

  • 创建以 ALTS做为传输安全协议的 gRPC服务端和客户端
  • ALTS连接受到端到端的隐私和完整性保护
  • 客户端授权和服务端授权支持

gRPC中启用 ALTS非常简单

使用ALTS的gRPC客户端

clientCreds := alts.NewClientCreds(alts.DefaultClientOptions())
conn, err := grpc.NewClient("localhost:4096", grpc.WithTransportCredentials(clientCreds))

使用ALTS的gRPC服务器

serverCreds := alts.NewServerCreds(alts.DefaultServerOptions())
server := grpc.NewServer(grpc.Creds(serverCreds))

双向TLS(mTLS)

在普通 TLS 中,服务器只关心向客户端提供服务器证书以进行验证。在双向 TLS 中,服务器还会加载受信任的 CA 文件列表,以验证客户端提供的证书。

双向 TLS相较于普通 TLS需要多依赖三个文件:

  • client.crt / client_cert.pem:客户端证书(公钥)
  • client.key / clien_key.pem:客户端私钥
  • client_ca.crt / client_ca_cert.pem:包含可以验证客户端证书的证书

认证

OAuth

gRPC中,身份认证和授权被抽象为 credentials.PerRPCCredentials,可以基于每个连接或调用进行设置

在客户端,需要先获取有效的 oauth令牌,然后可选的将其配置为在某个连接中使用或在每次调用中使用。

OAuth要求传输必须是安全的

gRPC中,提供的令牌以令牌类型和空格为前缀,通过键 authorization附加到请求元数据

creds, err := credentials.NewClientTLSFromFile("../certs/ca.crt", "www.example.com")
if err != nil {
    log.Fatalln(err)
}
// 生成oauth令牌
token := &oauth2.Token{
    AccessToken:  "<access-token>",
    TokenType:    "Bearer",
    RefreshToken: "<refresh-token>",
    Expiry:       time.Now().Add(time.Hour),
}
source := oauth.TokenSource{
    TokenSource: oauth2.StaticTokenSource(token),
}
// 使用WithPerRPCCredentials配置oauth
tokenCreds := grpc.WithPerRPCCredentials(source)
// 在当前连接中的所有RPC调用中都使用此Token
conn, err := grpc.NewClient("localhost:4096", grpc.WithTransportCredentials(creds), tokenCreds)
client := notice.NewNoticeServiceClient(conn)
// 使用PerRPCCredentials配置gRPC调用,此时Token只在这单次调用中可用
saveReply, err := client.SaveNotice(context.Background(), &notice.SaveNoticeRequest{
    Content: "demo notice content",
}, grpc.PerRPCCredentials(source))

在服务端,Token通常会在拦截器中进行验证,可以通过 metadata.FromIncomingContext + key的形式获取单次调用 metadata内的数据。假如传入的令牌无效,通常会返回错误代码。

// 此为拦截器提供的ctx 后面拦截器章节会解释
// 从context中解析客户端调用的metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
    return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
// 获取oauth认证字段的值
authHeader := md["authorization"]
// 因为token的格式为 "Bearer " 其最小长度都有7位 判断token是否存在
if len(authHeader) <= 7) {
    return nil, status.Error(codes.Unauthenticated, "missing token")
}
token := authHeader[7:]
// TODO 执行自定义的token验证逻辑

// handler 也是拦截器提供的 用于执行拦截器链中的下一个拦截器
return handler(ctx, req)

自定义认证

可以通过实现 PerRPCCredentials接口以自定义身份验证机制,例如可以实现一个简单的基于账号密码进行身份验证。

// 自定义身份认证
type CustomAuth struct {
	Username string // 用户名
	Password string // 密码
	Role     uint8  // 角色
}

func (self *CustomAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"username": self.Username,
		"password": self.Password,
		"role":     fmt.Sprintf("%d", self.Role),
	}, nil
}

// 是否需要TLS传输
func (self *CustomAuth) RequireTransportSecurity() bool {
	return false
}

// 使用自定义身份认证
basicAuthCreds := grpc.WithPerRPCCredentials(&CustomAuth{
    Username: "admin",
    Password: "admin",
    Role: 1,
})
conn, err := grpc.NewClient("localhost:4096", grpc.WithTransportCredentials(creds), basicAuthCreds)

在服务器端验证和 OAuth认证的流程是一样的,也是在拦截器中进行操作,只是需要将验证 Token的操作替换为验证账号密码。

取消RPC

gRPC客户端不再需要 RPC调用返回的结果时,可能会向服务器发出取消信号。同时,截止日期到期或 IO错误也会触发取消。当 RPC被取消时,服务器应停止计算并关闭流。

client := notice.NewNoticeServiceClient(conn)
// 取消RPC依赖于go中context的取消方法
// 创建一个超时取消的context 到请求时间超过5s或手动调用cancel()方法时,都会触发RPC的取消信号
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
saveReply, err := client.SaveNotice(ctx, &notice.SaveNoticeRequest{
    Content: "demo notice content",
}, grpc.PerRPCCredentials(source))
cancel()

压缩

消息压缩可以减少通信时使用的网络带宽,gRPC可以根据 call或消息级别启用或禁用压缩,并且允许非对称压缩通信,响应的压缩方式可能和请求的不同,也可能不会被压缩。

  • 如果客户端消息由服务器不支持的算法压缩,则该消息将导致服务器上出现 UNIMPLEMENTED 状态错误。服务器会在响应中包含一个 grpc-accept-encoding标头,指定服务器接受的压缩算法。
  • 客户端如果使用 grpc-accept-encoding 标头中的一种算法压缩消息,服务器仍返回 UNIMPLEMENTED 错误状态,那么错误原因于消息压缩无关
  • 如果服务器收到未公开但受支持压缩算法的消息,会在响应的 grpc-accept-encoding 标头中包含此编码格式
  • 如果服务器使用它知道某个客户端不支持的压缩算法,那么会返回未压缩的消息
  • 如果用户请求禁用压缩,那么下一条消息会以不压缩的格式发送

客户端设置压缩和解压缩算法

  • func WithCompressor(grpc.Compressor)设置消息压缩器
  • func WithDecompressor(grpc.Decompressor)设置消息解压缩器。如果 WithDecompressor的解压缩器和消息的编码匹配,那么会使用此解压缩器解压消息。如果不匹配,则会从所有已注册的 Decompressor中进行匹配,假如一个也匹配不了,则会关闭流并返回 Unimplemented状态错误
  • func UseCompressor(name string):使用指定的 named压缩算法(最新版本推荐使用的方法)。如果 named的压缩器未注册,则会在 RPC调用之前将错误返回给客户端。如果使用 identity的压缩器,那么消息不会被压缩,但会在标头中将 identity发送到服务端
// 为当前客户端的所有RPC调用都是用gzip压缩
conn, err := grpc.NewClient("localhost:4096", grpc.WithTransportCredentials(creds), tokenCreds, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))

// grpc.UseCompressor("gzip") 也支持针对单次RPC调用使用

服务端设置压缩和解压缩算法

  • func RPCCompressor(grpc.Compressor) 设置响应消息压缩器
  • func RPCDecompressor(grpc.Decompressor)设置请求消息的解压缩器

gRPC的新版本中,服务端的压缩算法无需显示指定,直接匿名导入 gRPC对应的包即可

// 导入gzip压缩支持
import _ "google.golang.org/grpc/encoding/gzip"


func main() {
    // 当前server会根据客户端发送的压缩标头自动解压缩请求。
    server := grpc.NewServer()
}

自定义名称解析

自定义名称解析和 DNS的原理相同,都是通过名称拿到对应的 IP列表。使用自定义名称解析的好处在于可以动态扩展或缩减服务列表,并且根据各个服务的负载和健康指数进行负载均衡和熔断操作。

const (
    // 指定scheme
	exampleScheme  = "example"
    // 指定服务名称
	demoServerName = "demoService"

    // 服务的实际地址
	demoServiceAddr = "localhost:4096"
)

// 自定义名称服务器builder
type exampleResolverBuilder struct {
	scheme string
}
// build函数 返回自定义名称服务器实例
func (self *exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	value := &exampleResolver{
		target: target,
		cc:     cc,
		addrStore: map[string][]string{
			demoServerName: {demoServiceAddr},
		},
	}
	value.Start()
	return value, nil
}

// 返回此自定义名称服务器的scheme 用于gRPC调用时进行匹配
func (self *exampleResolverBuilder) Scheme() string {
	return self.scheme
}

// 自定义名称服务器
type exampleResolver struct {
	target    resolver.Target
	cc        resolver.ClientConn
    // 自定义名称对应的addr列表
	addrStore map[string][]string
}

// 启动服务
func (self *exampleResolver) Start() {
	addrList := self.addrStore[self.target.Endpoint()]
	addrs := make([]resolver.Address, len(addrList))
	for i, value := range addrList {
		addrs[i] = resolver.Address{Addr: value}
	}
	self.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(resolver.ResolveNowOptions) {

}
func (*exampleResolver) Close() {

}

func init() {
	resolver.Register(&exampleResolverBuilder{scheme: exampleScheme})
}


func main() {
    // 使用 scheme:///serviceName 的形式访问RPC服务
    conn, err := grpc.NewClient(fmt.Sprintf("%s:///%s", exampleScheme, demoServerName), grpc.WithTransportCredentials(insecure.NewCredentials()))
}

负载均衡策略

gRPC的负载均衡策略由名称服务器(resolver)提供服务器 IP地址列表。该策略负责维护与服务器的连接,并选择在发送 RPC时要使用的连接。

在默认情况下会使用 pick_first策略,此策略不会进行负载均衡,而是使用从 resolver返回的地址列表中第一个可以连接的地址。此外,还可以使用 round_robin策略,对所有可用的连接地址进行轮询。如果内置负载均衡策略不能满足需求,还可以自定义策略。

截止时间

截止时间(Deadlines)用于指定客户端等待服务端响应的最大时间。默认情况下,gRPC不设置截至时间,客户端会永远等待服务器响应,在高并发的情况下会极大的拉低系统资源利用率和增大网络延迟。

在指定了截止时间的情况下,如果服务器在处理请求时超过了截止时间,那么客户端会放弃此次 RPC调用,将请求处于 DEADLINE_EXCEEDED状态。

服务器接收到的 RPC调用其截止时间可能非常短,根本来不及处理,这会导致服务器的资源被浪费。gRPC服务器应该在截止时间过之后自动取消调用(CANCELLED状态)。

截止时间和**取消 RPC**类似,也是使用context包中的 context.WithTimeout()函数,其超时时间就是本次请求的截止时间

错误处理

gRPC标准的错误模型,所有客户端和服务端库都支持,并且独立于 gRPC数据格式。但是,如果使用 buffer做为数据格式,则可能需要使用 Google开发和使用的更为丰富的错误模型。常见的错误状态代码如下:

  • GRPC_STATUS_CANCELLED:客户端取消请求
  • GRPC_STATUS_DEADLINE_EXCEEDED:服务器响应之前截止时间过期
  • GRPC_STATUS_UNIMPLEMENTED:服务器上找不到方法
  • GRPC_STATUS_UNAVAILABLE:服务器关闭
  • GRPC_STATUS_UNKNOWN:服务器异常
  • GRPC_STATUS_DEADLINE_EXCEEDED:截止时间到期前没有传输任何数据
  • GRPC_STATUS_UNAVAILABLE:数据传输连接中断
  • GRPC_STATUS_INTERNAL:无法解压缩,但支持压缩算法、解析 protobuf错误
  • GRPC_STATUS_UNIMPLEMENTED:客户端使用的压缩算法服务端不支持
  • GRPC_STATUS_RESOURCE_EXHAUSTED:请求被限流
  • GRPC_STATUS_INTERNAL:限流协议冲突
  • GRPC_STATUS_UNAUTHENTICATED:没有凭据或权限无效
// SaveNotice 保存通知 返回通知Id
func (self *NoticeService) SaveNotice(ctx context.Context, request *notice.SaveNoticeRequest) (*notice.SaveNoticeReply, error) {
    // 服务端校验参数 如果参数为空则返回错误
	if request.Content == "" {
		return nil, status.Error(codes.InvalidArgument, "request missing required field: Content")
	}
	noticeId := self.addNotice(request.Content)
	return &notice.SaveNoticeReply{
		NoticeId: noticeId,
		Count:    1,
	}, nil
}

健康检查

gRPChealth提供了服务器运行状况的健康检查,启用健康检查后客户端可以自动和服务端的状态服务通信,避免客户端请求状态错误的服务器。

gRPC的运行状况检查支持两种操作模式:

  • Check:适用于健康监控或负载均衡服务器状态检测,无法支持 gRPC客户端队列状态检测
  • Watch:流式传输状态更新

服务状态有以下两种:

  • SERVING:服务正常,可以接受请求
  • NOT_SERVING:无法接受请求

服务端启动状态检查服务

import healthgrpc "google.golang.org/grpc/health/grpc_health_v1"

func main() {
    // 健康监测服务
	healthServer := health.NewServer()
	server := grpc.NewServer()
	healthgrpc.RegisterHealthServer(server, healthServer)
	notice.RegisterNoticeServiceServer(server, NewNoticeService())
    // 使用协程检查
	go func() {
		next := healthgrpc.HealthCheckResponse_SERVING
		for {
            // 设置状态 服务名称可以为空
			healthServer.SetServingStatus("", next)
            // 仅作演示用 服务状态为 SERVING 时下一次健康检查将服务状态设置为 NOT_SERVING
			if next == healthgrpc.HealthCheckResponse_SERVING {
				next = healthgrpc.HealthCheckResponse_NOT_SERVING
			} else {
				next = healthgrpc.HealthCheckResponse_SERVING
			}
			time.Sleep(5 * time.Second)
		}
	}()
	listener, err := net.Listen("tcp", ":4096")
	if err != nil {
		log.Fatalln(err)
	}
	log.Fatalln(server.Serve(listener))
}

客户端使用

// 创建一个名称服务器
r := manual.NewBuilderWithScheme("example")
r.InitialState(resolver.State{
    Addresses: []resolver.Address{
        {Addr: "localhost:4096"},
        {Addr: "localhost:4097"},
    },
})
options := []grpc.DialOption{
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithResolvers(r),
    // 设置默认配置 1.使用轮询  2.为所有服务都启用状态检测(如果只想检测指定服务,在serviceName处指定即可)
    grpc.WithDefaultServiceConfig(`{
        "loadBalancingPolicy": "round_robin",
        "healthCheckConfig": {
            "serviceName": ""
        }
    }`),
}
conn, err := grpc.NewClient(fmt.Sprintf("%s:///demoService", r.Scheme()), options...)

客户端启用状况检查会更改调用服务器的行为

  • 客户端会在状态检查时调用 Watch
  • 状况检查服务调用会阻塞当前请求,调用完成才会发送i请求
  • 如果服务不可用,那么客户端不会再发送该服务的请求
  • 当服务变为可用时,客户端可以正常请求
  • 如果状态检查对于负载均衡策略来说没有作用,某些负载均衡策略可能会禁用此服务(例如 pick_first

拦截器

gRPC中的拦截器也可以被称为过滤器中间件,适用的场景包括但不限于:

  • Metadata处理
  • 请求缓存
  • 防止重复提交
  • 身份验证
  • 请求鉴权

gRPC中,客户端和服务器的拦截器 API各不相同,因此分为客户端拦截器和服务端拦截器

在使用多个拦截器时,其拦截器的执行顺序是拦截器的注册顺序

客户端拦截器

  • Unary拦截器:Unary拦截器的实现包含三部分,通常是预处理、调用 RPC和后处理。其声明格式是一个函数类型 UnaryClientInterceptor

    func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
    

    在拦截器中可以通过检查传入的参数对请求进行预处理,预处理完成后调用 invoker进行 RPC调用。调用结果返回后,可以根据调用结果进行后处理。

    // 认证拦截器
    func AuthInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    	var isAuth bool
        // 判断当前请求中是否存在认证信息 如果没有那么就添加认证信息
    	for _, cred := range opts {
    		if _, ok := cred.(grpc.PerRPCCredsCallOption); ok {
    			isAuth = true
    			break
    		}
    	}
    	if !isAuth {
    		opts = append(opts, grpc.PerRPCCredentials(oauth.TokenSource{
    			TokenSource: oauth2.StaticTokenSource(&oauth2.Token{AccessToken: "<token>"}),
    		}))
    	}
    	return invoker(ctx, method, req, reply, cc, opts...)
    }
    
    // 单个缓存对象
    type CacheEntity struct {
    	value      []byte
    	expireTime time.Time
    }
    
    // 缓存拦截器 使用闭包的形式
    // 简易时间 仅作为演示使用
    func NewCacheAuthInterceptor(expireTime time.Duration) grpc.UnaryClientInterceptor {
    	cacheMap := make(map[string]*CacheEntity)
    	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
            // 判断当前调用的方法结果是否被缓存
            // 如果被缓存并且还没过期 那么直接将reply设置为缓存的结果同时返回nil
    		if entity := cacheMap[method]; entity != nil {
    			if time.Now().After(entity.expireTime) {
    				delete(cacheMap, method)
    			} else if err := sonic.Unmarshal(entity.value, reply); err == nil {
    				return nil
    			}
    		}
            // 如果缓存过期或者未被缓存 那么会运行RPC调用 确保返回错误为nil后 会将reply返回结果序列化并放到缓存中
    		err := invoker(ctx, method, req, reply, cc, opts...)
    		if err == nil {
    			if value, jsonErr := sonic.Marshal(reply); jsonErr == nil {
    				cacheMap[method] = &CacheEntity{
    					value:      value,
    					expireTime: time.Now().Add(expireTime),
    				}
    			}
    		}
    		return err
    	}
    }
    
    func main() {
        r := manual.NewBuilderWithScheme("example")
    	r.InitialState(resolver.State{
    		Addresses: []resolver.Address{
    			{Addr: "localhost:4096"},
    		},
    	})
    	options := []grpc.DialOption{
    		grpc.WithTransportCredentials(insecure.NewCredentials()),
    		grpc.WithResolvers(r),
            // 通过 WithUnaryInterceptor 使用 Unary 拦截器
    		grpc.WithUnaryInterceptor(AuthInterceptor),
            grpc.WithUnaryInterceptor(NewCacheAuthInterceptor(10 * time.Second))
    	}
    	conn, err := grpc.NewClient(fmt.Sprintf("%s:///demoService", r.Scheme()), options...)
    }
    
  • 流拦截器:流拦截器包括预处理和流操作拦截,它的类型是 StreamClientInterceptorUnary拦截器不同

    func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
    

    其预处理逻辑和 Unary拦截器相同,但其流拦截操作是拦截用户对流的操作。流拦截器的原理是重写 ClientStreamRecvMsgSendMsg方法,相当于在原始的 ClientStream上做一层代理。

    // 代理客户端流 重写 RecvMsg 和 SendMsg 方法实现自定义的逻辑
    type ProxyClientStream struct {
    	grpc.ClientStream
    }
    
    func (self *ProxyClientStream) RecvMsg(message any) error {
    	log.Printf("Recv message at %v", time.Now().Format(time.DateTime))
    	return self.ClientStream.RecvMsg(message)
    }
    
    func (self *ProxyClientStream) SendMsg(message any) error {
    	log.Printf("Send message at %v", time.Now().Format(time.DateTime))
    	return self.ClientStream.SendMsg(message)
    }
    
    func NewProxyClientStream(stream grpc.ClientStream) grpc.ClientStream {
    	return &ProxyClientStream{stream}
    }
    
    // 客户端流拦截器
    func StreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    	stream, err := streamer(ctx, desc, cc, method, opts...)
    	if err != nil {
    		return nil, err
    	}
    	return NewProxyClientStream(stream), nil
    }
    
    func main() {
        r := manual.NewBuilderWithScheme("example")
    	r.InitialState(resolver.State{
    		Addresses: []resolver.Address{
    			{Addr: "localhost:4096"},
    		},
    	})
    	options := []grpc.DialOption{
    		grpc.WithTransportCredentials(insecure.NewCredentials()),
    		grpc.WithResolvers(r),
            // 通过 WithStreamInterceptor 使用流拦截器
    		grpc.WithStreamInterceptor(StreamInterceptor),
    	}
    	conn, err := grpc.NewClient(fmt.Sprintf("%s:///demoService", r.Scheme()), options...)
    
        //...
    }
    

服务端拦截器

服务端拦截器的实现和客户端拦截器大致一样,只是拦截器的函数参数有区别

服务端 Unary拦截器的类型是 UnaryServerInterceptor,其函数签名为

func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error)

其预处理和后处理逻辑和客户端 Unary拦截相同

// 一个简单的请求时间统计拦截器
func TimeStatsInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
    // 此时可以通过 FromIncomingContext 拿到请求的metadata
    md, ok := metadata.FromIncomingContext(ctx)
	startTime := time.Now().UnixMilli()
	resp, err := handler(ctx, req)
	endTime := time.Now().UnixMilli()
	log.Printf("Method: [%s] Request Handle Time: [%d]ms", info.FullMethod, endTime-startTime)
	return resp, err
}

func main() {
    // 通过 UnaryInterceptor 使用 Unary 拦截器
	server := grpc.NewServer(grpc.UnaryInterceptor(TimeStatsInterceptor))
	notice.RegisterNoticeServiceServer(server, NewNoticeService())
	listener, err := net.Listen("tcp", ":4096")
	if err != nil {
		log.Fatalln(err)
	}
	log.Fatalln(server.Serve(listener))
}

服务端流拦截器的类型是 StreamServerInterceptor,其函数签名为

func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

使用方式和客户端流拦截器一致,在拦截器中也可以通过 FromIncomingContext拿到请求的 metadata信息

Metadata

Metadata(元数据)允许客户端和服务端相互提供 RPC调用关联的信息,会以键值对的形式随请求和响应一起发送。键必须为 ASCLL字符串且不区分大小写,但是不能用 grpc-为前缀;值可以是 ASCLL字符串或二进制数据。其底层是基于 HTTP/2的请求标头实现的。

性能最佳实践

  • 尽可能重用 stubschannel
  • 使用 HTTP/2 Keepalive发送 PING帧使 HTTP/2连接保持活动状态,以便快速初始化 RPC
  • 客户端和服务器之前存在长逻辑数据流时,尽量使用流式 RPC

关联文章

0 条评论