gRPC元数据
基本定义
gRPC元数据用于传输特定RPC调用的额外信息(如身份验证详细信息),其表现形式为键值对列表。其中键是字符串,值通常是字符串,但可以是二进制数据。
键通常由字符串、数字、特殊字符(-,
_,
.)组成,且大小写不敏感。但是不能用grpc-开头,框架内部预留。二进制Key使用-bin结尾。允许客户端向服务端发送元数据,同样也允许服务端向客户端发送元数据。
构建元数据
type MD map[string][]string
可以将元数据看成普通的map结构,key 是字符串,value是字符串数组,所以可以给同一个key添加多个value值。创建元数据有两种方式
使用New函数从map结构中创建
md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})
使用Pairs函数创建,相同的key自动合并为list
md := metadata.Pairs( "key1", "val1", "key1", "val1-2", // "key1" will have map value []string{"val1", "val1-2"} "key2", "val2", )
注意:所有的key将自动转变为小写,因此 key1 和 kEy1被认为是同一个Key,它们的value将被合并为list
二进制元数据
元数据中的key一直是字符串,但是value值可以是字符串或二进制数据。gRPC在创建二进制元数据时,要求key必须以 “-bin”结尾, 对value值进行编码
md := metadata.Pairs(
"key", "string value",
"key-bin", string([]byte{96, 102}), // 使用 base64编码
)
上下文获取元数据
可以从上下文对象context中调用FromIncomingContext方法获取元数据
func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
客户端
发送元数据
有两种方式可以将元数据发送到服务器。建议使用AppendToOutgoingContext方法将key-value键值对添加到上下文。该方法可以与上下文现有的元数据一起使用。如果之前没有元数据,则添加元数据;如果存在元数据,则合并key-value键值对。
AppendToOutgoingContext
// 在上下文中创建 元数据
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// 在元数据中添加新的数据 (例如:在另一个拦截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
// 简单 RPC 调用
response, err := client.SomeRPC(ctx, someRequest)
// 流式 RPC 调用
stream, err := client.SomeStreamingRPC(ctx)
NewOutgoingContext
此外,使用使用NewOutgoingContext方法将元数据添加到context上下文,需要注意: 这将替换上下文已经存在的元数据
// 在上下文中创建 元数据
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 在元数据中添加新的数据 (例如:在另一个拦截器中)
send, _ := metadata.FromOutgoingContext(ctx)
newMD := metadata.Pairs("k3", "v3")
ctx = metadata.NewOutgoingContext(ctx, metadata.Join(send, newMD))
// 简单RPC调用
response, err := client.SomeRPC(ctx, someRequest)
// 流式RPC调用
stream, err := client.SomeStreamingRPC(ctx)
获取元数据
客户端通过gRPC中 header、trailer 获取元数据。gRPC 在HTTP2协议的基础上构建数据传送通道,通常而言,都会先入为主的认为元数据会存放在HTTP header中,思考一下为什么还需要从trailer获取元数据?
gRPC使用HTTP trailers 由以下两个目的:
- 内容发送后,需要使用 trailer 发送最终的状态
- RPC调用过程中出现应用程序运行时错误时,状态、状态消息在尾部传送
- Response响应结束流,在最后trailer frame中使用END_STREAM 标记结束
- 支持流式RPC用例,流式RPC通常比正常HTTP请求长,HTTP trailer 尾部需要标记请求/响应的处理结果。例如,流式数据处理过程中出现错误,可以在trailer 中发送错误代码,这种情况HTTP Header 是不可预知的,因此Header无法满足此类需求。
Unary RPC
使用CallOption中的Header、Trailer函数获取 unary RPC中的元数据
var header, trailer metadata.MD // 定义变量存储 header、trailer 中的元数据
r, err := client.SomeRPC(
ctx,
someRequest,
grpc.Header(&header), // 获取 header 元数据
grpc.Trailer(&trailer), // 获取 trailer 元数据
)
Streaming RPC
可以使用ClientStream接口中的Header、Trailer函数获取响应流中的元数据。流式RPC调用包含:
- 服务端流式RPC
- 客户端流式RPC
- 双向流式RPC调用
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
// retrieve trailer
trailer := stream.Trailer()
服务端
发送元数据
Unary RPC
在请求-响应RPC调用中,gRPC服务端通过SendHeader、SetTrailer方法发送Header、Trailer元数据,这两个函数都需要上下文context作为参数
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// create and send header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
Streaming RPC
流式RPC调用,使用ServerStream中的SendHeader 、SetTrailer方法发送header、trailer元数据
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SendHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
接收元数据
gRPC服务端从context上下文中获取客户端发送的元数据。但是请求-响应RPC、流式RPC接收元数据的方式稍有不同
Unary call
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
Streaming call
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
// do something with metadata
}
Example 代码
完整代码
代码目录
├── README.md
├── client
│ └── main.go
└── server
└── main.go
运行服务端
go run server/main.go
运行客户端
go run client/main.go
Unary RPC收发元数据
客户端核心代码
func unaryCallWithMetadata(c pb.EchoClient, message string) { fmt.Printf("--- unary ---\n") // Create metadata and context. md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) ctx := metadata.NewOutgoingContext(context.Background(), md) //注意此种方式,调用 NewOutgoingContext函数会覆盖 timestamp元数据 //newMD := metadata.Pairs("k3", "v3") //ctx = metadata.NewOutgoingContext(ctx, newMD) // 定义变量 存储服务端header、trailer 元数据 var header, trailer metadata.MD r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer)) if err != nil { log.Fatalf("failed to call UnaryEcho: %v", err) } // 判断response header是否返回timestamp元数据 if t, ok := header["timestamp"]; ok { fmt.Printf("timestamp from header:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in header") } // 判断response header是否返回location元数据 if l, ok := header["location"]; ok { fmt.Printf("location from header:\n") for i, e := range l { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("location expected but doesn't exist in header") } fmt.Printf("------------------------------- response:\n") fmt.Printf(" - %s\n", r.Message) //读取trailer 元数据 if t, ok := trailer["timestamp"]; ok { fmt.Printf("timestamp from trailer:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in trailer") } }
服务端核心代码
func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) { fmt.Printf("--- UnaryEcho ---\n") // 利用defer 最后发送trailer元数据 defer func() { //设置trailer 元数据 trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) grpc.SetTrailer(ctx, trailer) }() // 读取客户端的元数据 md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata") } if t, ok := md["timestamp"]; ok { fmt.Printf("timestamp from metadata:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } // 发送header 元数据 header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) grpc.SendHeader(ctx, header) fmt.Printf("request received: %v, sending echo\n", in) return &pb.EchoResponse{Message: in.Message}, nil }
Wireshark抓包截图
Streaming RPC收发元数据
简单的分析下服务端Streaming RPC调用的代码,其他方式(客户端Streaming RPC、双向流模式RPC)大同小异只是调用函数不一致而已
func (s *server) ServerStreamingEcho(in *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
fmt.Printf("--- ServerStreamingEcho ---\n")
// 利用defer 最后发送trailer元数据
defer func() {
trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
stream.SetTrailer(trailer)
}()
// 读取客户端发送的元数据
md, ok := metadata.FromIncomingContext(stream.Context())
if !ok {
return status.Errorf(codes.DataLoss, "ServerStreamingEcho: failed to get metadata")
}
if t, ok := md["timestamp"]; ok {
fmt.Printf("timestamp from metadata:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
}
// 创建返回的header元数据
header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
stream.SendHeader(header)
fmt.Printf("request received: %v\n", in)
// 模拟多次发送
for i := 0; i < streamingCount; i++ {
fmt.Printf("echo message %v\n", in.Message)
err := stream.Send(&pb.EchoResponse{Message: in.Message})
if err != nil {
return err
}
}
return nil
}
测试结果
如上图,代码里里面设置的trailer信息在最后一个stream流中的header 信息中传输给客户端。