GRPC(3):四种通信模式‍

发布于:2023-02-07 ⋅ 阅读:(633) ⋅ 点赞:(0)

上篇文章我们介绍了gRPC 的基本概念,今天实现一个完整的 gRPC 服务,包括 proto 文件的定义,客户端和服务端代码的生成以及业务逻辑代码的补充。

GRPC 四种通信模式‍

普通模式(unary RPC)

假如我们需要构建一个订单管理系统,这个系统为用户提供了订单查询的接口,每次用户输入订单号,便会返回对应的订单信息。每个请求独立,且响应和请求一一对应,这就是简单的 RPC 模式,对于大多数业务场景均可以适用。

图片

service MysqlService {
  ...
  rpc SelectRecord(MysqlReq) returns (MysqlRes) {};
  ...
}

服务端流模式(server-streaming RPC)

假如我们要创建一个订单的信息的缓存池,实现订单的高效查询,缓存服务启动的时候就需要请求服务端,同步服务端的订单信息。这种情况下,我们只需要请求一次,服务端便会持续的为我们提供同步订单信息,这就是服务端流 RPC 的场景。

和简单的 RPC 模式不同,服务端流 RPC 模式,客户端发起一个 RPC 请求,服务端会返回一系列的响应结果,发送完所有的响应后,服务端在流结尾标记服务状态详情作为结束的元数据。

图片

service MysqlService {
...
 rpc DeleteRecord(MysqlReq) returns (stream MysqlRes) {}
...
}

客户端流模式(client-streaming RPC)

假设以下场景,有一个服务每天需要定时和订单管理系统更新订单的最终状态,服务端只需要在最后告诉该服务,最终的处理结果(哪些更新成功,哪些失败)。不需要频繁和服务端建立和断开连接。从而可以降低服务端的并发节省连接资源。

与服务端流模式类似,客户端流会发送连续的更新请求给服务端,服务端在收到请求后不会立马给到客户端响应结果,直到请求结束了,才会返回一个单独的响应。

图片

service MysqlService {
...
rpc InsertRecord(stream MysqlReq) returns (MysqlRes) {};
...
}

双向流模式(bidirectional-streaming RPC)

同样以上面的订单更新的服务为例子,每天定时向管理系统推送一批订单进行更新,假如这个批订单有上千万的量,而且订单更新服务之后还需要和其他系统进行数据同步,这个时候使用客户端流模式显然不是那么合适,我们可以采用双向流模式,不断地推送订单请求给管理系统,服务端在收到请求,处理完立马返回处理结果给客户端,直到所有的请求处理结束。这种场景就是双向流模式。

图片

service MysqlService {
...
 rpc UpdateRecord(stream MysqlReq) returns (stream MysqlRes) {};
...
}

一个简单的 GRPC 服务实现

这个服务为客户端提供 MYSQL 增删改查的 gRPC 接口。

图片

项目目录结构:

  _01_grpc_demo:    
  -> conf          # 数据库连接配置    
  -> cmd           # 测试样例    
  -> pb            # proto && pb.go 文件    
  -> client.go     # grpc 客户端代码    
  -> server.go     # grpc 服务端代码    
  -> Makefile      # 命令脚本

代码实现

mysql.proto

syntax = "proto3";

package _01_grpc_demo;

option go_package = "./;pb";

message MysqlReq {
  string sql = 1;
}

message MysqlRes {
  uint32 code = 1;
  string msg = 2;
  string data = 3;
}

service MysqlService {
  rpc SelectRecord(MysqlReq) returns (MysqlRes) {};
  rpc InsertRecord(stream MysqlReq) returns (MysqlRes) {};
  rpc DeleteRecord(MysqlReq) returns (stream MysqlRes) {}
  rpc UpdateRecord(stream MysqlReq) returns (stream MysqlRes) {};
}

server.go

package _01_grpc_demo

import (
  "context"
  "encoding/json"
  "errors"
  "fmt"
  "io"
  "log"

  "_01_grpc_demo/conf"
  "_01_grpc_demo/pb"
)

type MysqlServer struct {}

func NewMysqlServer() *MysqlServer {
  return &MysqlServer{}
}

func (*MysqlServer) SelectRecord(ctx context.Context, req *pb.MysqlReq) (*pb.MysqlRes, error) {

  sql := req.GetSql()
  log.Printf("receives a select record request with sql : %s", sql)

  // test sleep
  // time.Sleep(time.Second * 5)

  if err := ctx.Err(); err != nil {
    return nil, err
  }

  users, res := make([]*conf.User, 0), &pb.MysqlRes{}
  if err := conf.DB.Raw(sql).Scan(&users).Error; err != nil {
    return nil, err
  }

  rows, err := json.Marshal(users)
  if err != nil {
    return nil, err
  }

  res.Code = 200
  res.Msg = "select success"
  res.Data = string(rows)

  return res, nil
}

func (*MysqlServer) InsertRecord(stream pb.MysqlService_InsertRecordServer) error {

  var sql string
  for {
    req, err := stream.Recv()
    if err == io.EOF || req == nil{
      log.Printf("read ends")
      break
    }

    if len(sql) == 0 {
      sql = req.Sql
    } else {
      sql = fmt.Sprintf("%s;%s", sql, req.Sql)
    }
  }

  log.Printf("receives a insert record request with sql : %s", sql)

  // test sleep
  // time.Sleep(time.Second * 5)

  res := &pb.MysqlRes{
    Code: 200,
    Msg:  fmt.Sprintf("insert records sql [%s] succ", sql),
    Data: _empty,
  }

  if err := conf.DB.Exec(sql).Error; err != nil {
    res.Code = 500
    res.Msg = fmt.Sprintf("exec %s err [%s]", sql, err)
    return err
  }

  // send res
  if err := stream.SendAndClose(res); err != nil {
    errs := fmt.Sprintf("send res [%s] err [%s]", res.Msg, err)
    return errors.New(errs)
  }

  return nil
}

func (*MysqlServer) DeleteRecord(
  req *pb.MysqlReq,
  stream pb.MysqlService_DeleteRecordServer,
  ) error {

  sql := req.GetSql()
  log.Printf("receives a delete record request with sql : %s", sql)

  // test sleep
  // time.Sleep(time.Second * 5)

  res := &pb.MysqlRes{
    Code: 200,
    Msg:  fmt.Sprintf("delete records sql [%s] succ", sql),
    Data: _empty,
  }

  if err := conf.DB.Exec(sql).Error; err != nil {
    res.Code = 500
    res.Msg = fmt.Sprintf("exec %s err [%s]", sql, err)
  }

  // send res
  if err := stream.Send(res); err != nil {
    resBytes, _ := json.Marshal(res)
    return fmt.Errorf("send res [%s] to stream err [%s]", string(resBytes), err)
  }

  return nil
}

func (*MysqlServer) UpdateRecord(stream pb.MysqlService_UpdateRecordServer) error {
  var sql string
  for {
    req, err := stream.Recv()
    if err == io.EOF || req == nil{
      log.Printf("read ends\n")
      break
    }

    if len(sql) == 0 {
      sql = req.Sql
    } else {
      sql = fmt.Sprintf("%s;%s", sql, req.Sql)
    }
  }

  log.Printf("receives a update record request with sql : %s\n", sql)

  res := &pb.MysqlRes{
    Code: 200,
    Msg:  fmt.Sprintf("update records sql [%s] succ", sql),
    Data: _empty,
  }

  if err := conf.DB.Exec(sql).Error; err != nil {
    res.Code = 500
    res.Msg = fmt.Sprintf("exec %s err [%s]", sql, err)
  }

  if err := stream.Send(res); err != nil {
    resBytes, _ := json.Marshal(res)
    return fmt.Errorf("send res [%s] to stream err [%s]", string(resBytes), err)
  }

  return nil
}

client.go

package _01_grpc_demo

import (
  "context"
  "encoding/json"
  "fmt"
  "io"
  "log"
  "time"

  "google.golang.org/grpc"

  "_01_grpc_demo/pb"
)

const (
  _empty = ""
)

type MysqlClient struct {
  service pb.MysqlServiceClient
}

func NewMysqlClient(cc *grpc.ClientConn) *MysqlClient {
  service := pb.NewMysqlServiceClient(cc)

  return &MysqlClient{service}
}

func (mysqlClient *MysqlClient) SelectRecord(sql string) string {
  req := &pb.MysqlReq{
    Sql: sql,
  }

  // set timeout
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  res, err := mysqlClient.service.SelectRecord(ctx, req)
  if err != nil {
    log.Fatal("select record err ", err)
    return _empty
  }

  return fmt.Sprintf("select record success %s", res.GetData())
}

func (mysqlClient *MysqlClient) InsertRecord(sql string) string {

  req := &pb.MysqlReq{
    Sql: sql,
  }

  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  stream, err := mysqlClient.service.InsertRecord(ctx)
  if err != nil {
    log.Fatal("insert record err ", err)
    return _empty
  }

  if err = stream.Send(req); err != nil {
    log.Fatal("send req err ", err)
    return _empty
  }

  res, err := stream.CloseAndRecv()
  if err != nil {
    log.Fatal("receive res err ", err)
    return _empty
  }

  resBytes, _ := json.Marshal(res)
  return string(resBytes)
}

func (mysqlClient *MysqlClient) DeleteRecord(sql string) string {

  req := &pb.MysqlReq{
    Sql: sql,
  }

  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  stream, err := mysqlClient.service.DeleteRecord(ctx, req)
  if err != nil {
    log.Fatal("delete record err ", err)
    return _empty
  }

  resBytes := make([]byte, 0)
  for {
    res, err := stream.Recv()
    if err == io.EOF {
      break
    }
    if err != nil {
      log.Fatal("cannot receive response: ", err)
      return _empty
    }

    resBytes, _ = json.Marshal(res)
  }

  return string(resBytes)
}

func (mysqlClient *MysqlClient) UpdateRecord(sql string) string {

  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  stream, err := mysqlClient.service.UpdateRecord(ctx)
  if err != nil {
    log.Fatal("update record err ", err)
    return _empty
  }

  waitResponse := make(chan error)
  // go routine to receive responses
  go func() {
    for {
      res, err := stream.Recv()
      if err == io.EOF {
        log.Print("no more responses")
        waitResponse <- nil
        return
      }
      if err != nil {
        waitResponse <- fmt.Errorf("cannot receive stream response: %v", err)
        return
      }

      log.Print("received response: ", res)
    }
  }()

  req := &pb.MysqlReq{
    Sql: sql,
  }

  err = stream.Send(req)
  if err != nil {
    return fmt.Sprintf("send stream err: %s", err)
  }
  err = stream.CloseSend()
  if err != nil {
    return fmt.Sprintf("cannot close send: %s", err)
  }

  err = <-waitResponse
  return fmt.Sprintf("%s0000", err)
}

makeFile

clean:
  rm pb/*.go

gen:
  protoc --plugin=protoc-gen-go=/d/workspace/golang/bin/protoc-gen-go.exe --go_out=plugins=grpc:pb --proto_path=pb pb/*.proto

server:
  go run cmd/server/main.go -port 8080

client:
  go run cmd/client/main.go -address 0.0.0.0:8080

参考资料

  • gRPC Up and Running

  • https://blog.csdn.net/weixin_39678525/article/details/110570486

  • private_projectshttps://developers.google.com/protocol-buffers/docs/gotutorial

  • https://github.com/unendlichkeiten/private_projects

  • https://www.bilibili.com/video/BV1Xv411t7h5?spm_id_from=333.999.0.0

关注公众号一起学习——无涯的计算机笔记

图片


网站公告

今日签到

点亮在社区的每一天
去签到