gRPC-go 元数据

发布于:2022-11-13 ⋅ 阅读:(1065) ⋅ 点赞:(0)

gRPC元数据

基本定义

gRPC元数据用于传输特定RPC调用的额外信息(如身份验证详细信息),其表现形式为键值对列表。其中键是字符串,值通常是字符串,但可以是二进制数据。

键通常由字符串、数字、特殊字符(-, _, .)组成,且大小写不敏感。但是不能用grpc-开头,框架内部预留。二进制Key使用-bin结尾。允许客户端向服务端发送元数据,同样也允许服务端向客户端发送元数据。

构建元数据

type MD map[string][]string

可以将元数据看成普通的map结构,key 是字符串,value是字符串数组,所以可以给同一个key添加多个value值。创建元数据有两种方式

  • 使用New函数从map结构中创建

    md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})
    
  • 使用Pairs函数创建,相同的key自动合并为list

    md := metadata.Pairs(
        "key1", "val1",
        "key1", "val1-2", // "key1" will have map value []string{"val1", "val1-2"}
        "key2", "val2",
    )
    

注意:所有的key将自动转变为小写,因此 key1 和 kEy1被认为是同一个Key,它们的value将被合并为list

二进制元数据

元数据中的key一直是字符串,但是value值可以是字符串或二进制数据。gRPC在创建二进制元数据时,要求key必须以 “-bin”结尾, 对value值进行编码

md := metadata.Pairs(
    "key", "string value",
    "key-bin", string([]byte{96, 102}), // 使用 base64编码
)

上下文获取元数据

可以从上下文对象context中调用FromIncomingContext方法获取元数据

func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
    md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}

客户端

发送元数据

有两种方式可以将元数据发送到服务器。建议使用AppendToOutgoingContext方法将key-value键值对添加到上下文。该方法可以与上下文现有的元数据一起使用。如果之前没有元数据,则添加元数据;如果存在元数据,则合并key-value键值对。

AppendToOutgoingContext

// 在上下文中创建 元数据
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")

// 在元数据中添加新的数据 (例如:在另一个拦截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")

// 简单 RPC 调用
response, err := client.SomeRPC(ctx, someRequest)

// 流式 RPC 调用
stream, err := client.SomeStreamingRPC(ctx)

NewOutgoingContext

此外,使用使用NewOutgoingContext方法将元数据添加到context上下文,需要注意: 这将替换上下文已经存在的元数据

// 在上下文中创建 元数据
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)

// 在元数据中添加新的数据 (例如:在另一个拦截器中)
send, _ := metadata.FromOutgoingContext(ctx)
newMD := metadata.Pairs("k3", "v3")
ctx = metadata.NewOutgoingContext(ctx, metadata.Join(send, newMD))

// 简单RPC调用
response, err := client.SomeRPC(ctx, someRequest)

// 流式RPC调用
stream, err := client.SomeStreamingRPC(ctx)

获取元数据

客户端通过gRPC中 header、trailer 获取元数据。gRPC 在HTTP2协议的基础上构建数据传送通道,通常而言,都会先入为主的认为元数据会存放在HTTP header中,思考一下为什么还需要从trailer获取元数据?

gRPC使用HTTP trailers 由以下两个目的:

  • 内容发送后,需要使用 trailer 发送最终的状态
    • RPC调用过程中出现应用程序运行时错误时,状态、状态消息在尾部传送
    • Response响应结束流,在最后trailer frame中使用END_STREAM 标记结束
  • 支持流式RPC用例,流式RPC通常比正常HTTP请求长,HTTP trailer 尾部需要标记请求/响应的处理结果。例如,流式数据处理过程中出现错误,可以在trailer 中发送错误代码,这种情况HTTP Header 是不可预知的,因此Header无法满足此类需求。

Unary RPC

使用CallOption中的Header、Trailer函数获取 unary RPC中的元数据

var header, trailer metadata.MD // 定义变量存储 header、trailer 中的元数据
r, err := client.SomeRPC(
    ctx,
    someRequest,
    grpc.Header(&header),    // 获取 header 元数据
    grpc.Trailer(&trailer),  // 获取 trailer 元数据
)

Streaming RPC

可以使用ClientStream接口中的Header、Trailer函数获取响应流中的元数据。流式RPC调用包含:

  • 服务端流式RPC
  • 客户端流式RPC
  • 双向流式RPC调用
stream, err := client.SomeStreamingRPC(ctx)

// retrieve header
header, err := stream.Header()

// retrieve trailer
trailer := stream.Trailer()

服务端

发送元数据

Unary RPC

在请求-响应RPC调用中,gRPC服务端通过SendHeader、SetTrailer方法发送Header、Trailer元数据,这两个函数都需要上下文context作为参数

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
    // create and send header
    header := metadata.Pairs("header-key", "val")
    grpc.SendHeader(ctx, header)
    // create and set trailer
    trailer := metadata.Pairs("trailer-key", "val")
    grpc.SetTrailer(ctx, trailer)
}

Streaming RPC

流式RPC调用,使用ServerStream中的SendHeader 、SetTrailer方法发送header、trailer元数据

func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
    // create and send header
    header := metadata.Pairs("header-key", "val")
    stream.SendHeader(header)
    // create and set trailer
    trailer := metadata.Pairs("trailer-key", "val")
    stream.SetTrailer(trailer)
}

接收元数据

gRPC服务端从context上下文中获取客户端发送的元数据。但是请求-响应RPC、流式RPC接收元数据的方式稍有不同

Unary call

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}

Streaming call

func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
    md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
    // do something with metadata
}

Example 代码

完整代码

├── README.md
├── client
│   └── main.go
└── server
    └── main.go
  • 运行服务端

    go run server/main.go
    
  • 运行客户端

    go run client/main.go
    

Unary RPC收发元数据

  • 客户端核心代码

    func unaryCallWithMetadata(c pb.EchoClient, message string) {
    	fmt.Printf("--- unary ---\n")
    	// Create metadata and context.
    	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
    	ctx := metadata.NewOutgoingContext(context.Background(), md)
    
    	//注意此种方式,调用 NewOutgoingContext函数会覆盖 timestamp元数据
    	//newMD := metadata.Pairs("k3", "v3")
    	//ctx = metadata.NewOutgoingContext(ctx, newMD)
    
    	// 定义变量 存储服务端header、trailer 元数据
    	var header, trailer metadata.MD
    	r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer))
    	if err != nil {
    		log.Fatalf("failed to call UnaryEcho: %v", err)
    	}
    
    	// 判断response header是否返回timestamp元数据
    	if t, ok := header["timestamp"]; ok {
    		fmt.Printf("timestamp from header:\n")
    		for i, e := range t {
    			fmt.Printf(" %d. %s\n", i, e)
    		}
    	} else {
    		log.Fatal("timestamp expected but doesn't exist in header")
    	}
    	// 判断response header是否返回location元数据
    	if l, ok := header["location"]; ok {
    		fmt.Printf("location from header:\n")
    		for i, e := range l {
    			fmt.Printf(" %d. %s\n", i, e)
    		}
    	} else {
    		log.Fatal("location expected but doesn't exist in header")
    	}
    	fmt.Printf("------------------------------- response:\n")
    	fmt.Printf(" - %s\n", r.Message)
    
    	//读取trailer 元数据
    	if t, ok := trailer["timestamp"]; ok {
    		fmt.Printf("timestamp from trailer:\n")
    		for i, e := range t {
    			fmt.Printf(" %d. %s\n", i, e)
    		}
    	} else {
    		log.Fatal("timestamp expected but doesn't exist in trailer")
    	}
    }
    
  • 服务端核心代码

    func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
    	fmt.Printf("--- UnaryEcho ---\n")
    	// 利用defer 最后发送trailer元数据
    	defer func() {
    		//设置trailer 元数据
    		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
    		grpc.SetTrailer(ctx, trailer)
    	}()
    
    	// 读取客户端的元数据
    	md, ok := metadata.FromIncomingContext(ctx)
    	if !ok {
    		return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")
    	}
    	if t, ok := md["timestamp"]; ok {
    		fmt.Printf("timestamp from metadata:\n")
    		for i, e := range t {
    			fmt.Printf(" %d. %s\n", i, e)
    		}
    	}
    
    	// 发送header 元数据
    	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
    	grpc.SendHeader(ctx, header)
    
    	fmt.Printf("request received: %v, sending echo\n", in)
    
    	return &pb.EchoResponse{Message: in.Message}, nil
    }
    
  • Wireshark抓包截图

    在这里插入图片描述

    img

Streaming RPC收发元数据

简单的分析下服务端Streaming RPC调用的代码,其他方式(客户端Streaming RPC、双向流模式RPC)大同小异只是调用函数不一致而已

func (s *server) ServerStreamingEcho(in *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
	fmt.Printf("--- ServerStreamingEcho ---\n")
	// 利用defer 最后发送trailer元数据
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		stream.SetTrailer(trailer)
	}()

	// 读取客户端发送的元数据
	md, ok := metadata.FromIncomingContext(stream.Context())
	if !ok {
		return status.Errorf(codes.DataLoss, "ServerStreamingEcho: failed to get metadata")
	}
	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// 创建返回的header元数据
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	stream.SendHeader(header)

	fmt.Printf("request received: %v\n", in)

	// 模拟多次发送
	for i := 0; i < streamingCount; i++ {
		fmt.Printf("echo message %v\n", in.Message)
		err := stream.Send(&pb.EchoResponse{Message: in.Message})
		if err != nil {
			return err
		}
	}
	return nil
}
  • 测试结果

    在这里插入图片描述

    如上图,代码里里面设置的trailer信息在最后一个stream流中的header 信息中传输给客户端。


网站公告

今日签到

点亮在社区的每一天
去签到