一、grpc的搭建
一、安装依赖工具
1. 基础工具安装
sudo apt-get install autoconf automake libtool
2. 检查工具版本
# 检查 CMake 版本 cmake -version # 检查 GCC/G++ 版本 gcc -v g++ -v
二、安装 CMake(≥3.15)
1. 卸载旧版本
sudo apt-get autoremove cmake
2. 下载并解压新版
# 下载(以 3.23.0 为例) wget https://cmake.org/files/v3.23/cmake-3.23.0-linux-x86_64.tar.gz # 解压 tar zxf cmake-3.23.0-linux-x86_64.tar.gz # 查看目录结构 tree -L 2 cmake-3.23.0-linux-x86_64
3. 创建软链接(推荐
/opt
路径)sudo mv cmake-3.23.0-linux-x86_64 /opt/cmake-3.23.0 sudo ln -sf /opt/cmake-3.23.0/bin/* /usr/bin/
4. 验证安装
cmake -version # 应显示 3.23.0 或更高
三、安装 GCC/G++(≥7.0)
1. 添加软件源
sudo apt-get install -y software-properties-common sudo add-apt-repository ppa:ubuntu-toolchain-r/test sudo apt update
2. 安装 GCC 7
sudo apt install g++-7 -y
3. 建立软链接并验证
# 配置多版本切换 sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-7 60 \ --slave /usr/bin/g++ g++ /usr/bin/g++-7 # 切换版本(根据提示选择 gcc-7) sudo update-alternatives --config gcc # 验证版本 gcc -v # 应显示 7.x.x g++ -v # 应显示 7.x.x
四、编译 GRPC
方案一:通过源码编译(需科学上网)
# 1. 克隆源码 git clone https://github.com/grpc/grpc cd grpc # 2. 切换版本(如 v1.45.2) git tag # 查看可用版本 git checkout v1.45.2 # 3. 下载第三方依赖 git submodule update --init # 此步骤耗时较长,需耐心等待 # 4. 编译安装 mkdir -p cmake/build cd cmake/build cmake ../.. make sudo make install
方案二:使用预下载压缩包(900+MB,无需翻墙)
# 1. 解压压缩包 tar -jxf grpc-v1.45.2.tar.bz2 cd grpc-v1.45.2 # 2. 直接编译安装(跳过源码下载步骤) mkdir -p cmake/build cd cmake/build cmake ../.. make sudo make install
五、安装 Protobuf(与 GRPC 绑定版本)
# 进入 GRPC 自带的 Protobuf 目录 cd grpc/third_party/protobuf/ # 若使用压缩包,路径为 grpc-v1.45.2/third_party/protobuf/ # 配置编译 ./autogen.sh ./configure --prefix=/usr/local make sudo make install sudo ldconfig # 更新动态库缓存 # 验证版本 protoc --version # 应显示 3.19.4 或与 GRPC 匹配的版本
六、测试环境:编译运行 HelloWorld
1. 编译示例代码
cd grpc/examples/cpp/helloworld/ # 或压缩包对应路径 mkdir build cd build cmake .. make
2. 启动服务端
./greeter_server # 监听 50051 端口 # 输出:Server listening on 0.0.0.0:50051
3. 启动客户端
./greeter_client # 向服务端发送请求 # 输出:Greeter received: Hello world
七、参考资料
- Ubuntu 搭建 GRPC for C++ 开发环境:提供修改第三方库下载地址的方法。
八、辅助工具:SCP 远程文件传输
1. 从服务器下载文件
scp username@serverIP:/remote/path/file.txt /local/path
2. 上传文件到服务器
scp /local/path/file.txt username@serverIP:/remote/path
3. 下载 / 上传目录(需加
-r
参数)# 下载目录 scp -r username@serverIP:/remote/dir /local/dir # 上传目录 scp -r /local/dir username@serverIP:/remote/dir
二、grpc的原理
1.什么是grpc
RPC 即远程过程调用协议(Remote Procedure Call Protocol),可以让我们像调用本地对象一样发起 远程调用。RPC 凭借其强大的治理功能,成为解决分布式系统通信问题的一大利器。 gRPC是一个现代的、高性能、开源的和语言无关的通用 RPC 框架,基于 HTTP2 协议设计,序列化使用 PB(Protocol Buffer),PB 是一种语言无关的高性能序列化框架,基于 HTTP2+PB 保证了的高性能。
2. gRPC 核心特性
2.1 基于服务的架构
- 定义服务接口:明确方法、入参、出参
- 服务器端:实现具体服务逻辑
- 客户端:通过存根(Stub)调用服务,与服务端接口一致
2.2 跨语言通信(Protocol Buffer 驱动)
- 默认 IDL:使用 Protocol Buffer 定义接口
- 数据序列化:基于 Protocol Buffer 实现跨语言数据传输
- 例:Java 服务可被 Go 语言客户端调用
- 语言无关性:支持多语言客户端与服务端混合部署
2.3 同步与异步调用
- 同步调用:客户端阻塞等待服务端响应
- 异步调用:服务端处理完成后主动回调客户端,无需阻塞
2.4 基于 HTTP/2 协议
- 性能优势:利用 HTTP/2 的多路复用、二进制分帧等特性
- 协议兼容性:基于 HTTP/2 标准,支持现代网络优化
2.5 可扩展的负载均衡与服务发现
- 未内置实现:提供命名解析、负载均衡接口供扩展
- 生态集成:可与服务网格(如 Istio)结合实现高级路由
2.6 四类服务方法(基于 HTTP/2 流特性)
- 一元 RPC
- 单次请求 - 响应,类似普通函数调用
- 服务端流式 RPC
- 客户端单次请求,服务端返回流式响应(多条消息)
- 客户端流式 RPC
- 客户端发送流式请求,服务端单次响应
- 双向流式 RPC
- 客户端与服务端双向流式通信,支持任意顺序读写
2.7. 支持的编程语言(GA 状态)
- C++
- Java(含 Android 支持)
- Objective-C(iOS 支持)
- Python
- Ruby
- Go
- C#
- Node.js
- 特性:遵循语义版本控制,跨平台兼容
2.8 使用场景
- 低延迟、高扩展性的分布式系统
- 移动客户端与云服务器通信(如 iOS/Android 后端)
- 设计跨语言、高效的新协议
- 分层扩展场景(认证、负载均衡、监控等)
2.9 典型用户
- 企业级应用:Google 云产品、Square、Netflix、CoreOS、Docker、Cisco 等
- 技术场景:微服务架构、API 网关、实时数据同步
2.10 设计动机与原则(10 大核心)
- 自由开放:开源、跨平台、跨语言,支持全平台使用
- 协议可插拔:支持 JSON/XML/Thrift 等数据格式,可扩展负载均衡、日志等机制
- 阻塞与非阻塞支持:同步 / 异步消息处理,适配不同平台扩展需求
- 取消与超时机制:允许客户端取消 RPC 或设置超时,避免资源浪费
- 优雅拒绝请求:服务器可拒绝新请求并平滑关闭,保障稳定性
- 流处理能力:支持大规模数据集、实时消息(如语音识别、股票行情)
- 流控制:平衡客户端与服务器资源,防御 DoS 攻击
- 元数据交换:支持认证、跟踪等非业务数据传输,解耦通用功能
- 标准化状态码:定义清晰的错误码体系,简化客户端错误处理
- 互通性:基于 HTTP/2WireFormat 协议,兼容互联网基础设施
信息来源
- gRPC 官方 FAQ
- 内容基于 Protocol Buffer、HTTP/2 及微服务架构最佳实践整理
3. 数据封装与传输问题
3.1 网络传输中的内容封装数据体积问题
网络传输中,数据封装的核心矛盾是可读性与传输效率的平衡。早期 RPC 多使用 JSON,现代 RPC(如 gRPC)更倾向二进制序列化(如 Protobuf)。
3.1.1 JSON 与 Protobuf 对比
特性 | JSON | Protobuf(Protocol Buffers) |
---|---|---|
设计目标 | 人类可读,通用文本格式 | 机器高效处理,二进制序列化 |
跨语言支持 | 无需额外约定,天然跨语言 | 需共享.proto 文件,通过中间代码实现跨语言 |
版本兼容性 | 友好(可灵活扩展字段) | 友好(支持字段保留、默认值,兼容旧版本) |
数据表达能力 | 仅支持基础类型(字符串、数字、对象、数组) | 支持复杂类型(结构体、数组、Map、嵌套对象等) |
传输体积 | 文本冗余大(如键名重复、类型标记),压缩率低 | 二进制编码,体积小(通常为 JSON 的 1/3~1/10),压缩率高 |
编解码效率 | 文本解析(需逐行解析),效率较低 | 二进制解析(基于字段标号快速定位),效率高(通常比 JSON 快 2~10 倍) |
3.1.2 Protobuf 的核心机制(以 gRPC 为例)
gRPC 选择 Protobuf 作为默认序列化方案,其关键特性如下:
接口定义(.proto 文件)
通过.proto
文件明确定义服务、方法、入参和出参,示例:// 定义服务 service Test { rpc HowRpcDefine(Request) returns (Response); // 一元RPC方法 } // 定义消息(入参) message Request { int64 user_id = 1; // 字段标号(关键编解码标识) string name = 2; } // 定义消息(出参) message Response { repeated int64 ids = 1; // 数组(repeated) Value info = 2; // 嵌套对象 map<int, Value> values = 3; // Map映射 } // 嵌套消息类型 message Value { bool is_man = 1; int age = 2; }
跨语言支持
- 依赖双方需共享同一份
.proto
文件,通过 Protobuf 编译器(protoc
)生成各语言的中间代码(如 C++ 的class
、Go 的struct
)。 - 中间代码包含对象序列化 / 反序列化逻辑,确保不同语言间数据的正确传输与解析。
- 依赖双方需共享同一份
3.2 网络传输效率问题(HTTP/2 的优化)
gRPC 基于 HTTP/2 协议,通过以下机制解决 HTTP/1.1 的传输效率瓶颈:
3.2.1 HTTP/1.1 的核心问题
- 串行请求阻塞:同一 TCP 连接中,请求需按顺序发送,响应需按顺序接收。若某个请求响应延迟,后续请求会被阻塞(队头阻塞,Head-of-Line Blocking)。
- 连接资源浪费:为避免阻塞,浏览器需建立多个 TCP 连接(如 Chrome 默认 6 个),增加了握手、内存和 CPU 开销。
3.2.2 HTTP/2 的关键改进
HTTP/2 未改变 HTTP 语义(如 GET/POST),但通过以下机制优化传输效率:
二进制分帧(Binary Framing)
- 将请求 / 响应数据拆分为最小单位的帧(Frame)(如 HEADERS 帧、DATA 帧),以二进制格式传输。
- 帧包含流 ID(Stream ID),标识所属的请求 / 响应流。
多路复用(Multiplexing)
- 同一 TCP 连接中可并发传输多个请求 / 响应的帧,通过流 ID 区分不同流。
- 示例:浏览器同时发送 HTML、CSS、JS、图片的请求,所有请求的帧混合传输,客户端按流 ID 重组为完整响应。
头部压缩(HPACK)
- 客户端与服务器共享头部字段缓存表,重复字段通过索引引用,减少冗余传输(头部体积可压缩至原大小的 10%~20%)。
2.2.3 效果总结
- 降低资源消耗:仅需 1 个 TCP 连接即可处理所有请求,减少握手和连接管理开销。
- 消除队头阻塞:请求响应并行传输,避免因单个请求延迟导致整体阻塞。
- 提升传输效率:二进制帧和头部压缩减少了网络带宽占用,加速数据传输。
三、grpc的4中模式
1. 一元 RPC 模式(Unary RPC)
一元 RPC 模式也被称为简单 RPC 模式。在该模式中,当客户端调用服务器端的远程方法时,客户端发 送请求至服务器端并获得一个响应,与响应一起发送的还有状态细节以及 trailer 元数据。
具体使用方式:
1. 服务定义(Protobuf 文件 my_service.proto
)
syntax = "proto3";
package mypackage;
message Request {
string data = 1;
}
message Response {
string result = 1;
}
service MyService {
rpc UnaryMethod(Request) returns (Response); // 一元 RPC 方法
}
2. 服务器端实现(C++)
#include <grpcpp/grpcpp.h>
#include "my_service.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using mypackage::Request;
using mypackage::Response;
using mypackage::MyService;
// 实现服务接口
class MyServiceImpl final : public MyService::Service {
Status UnaryMethod(ServerContext* context, const Request* request, Response* response) override {
// 处理请求(示例:返回请求数据的大写形式)
std::string input = request->data();
/*
std::transform(input.begin(), input.end(), input.begin(), ::toupper);
功能:将输入字符串的所有字符转换为大写。
细节:
std::transform 是 C++ STL 中的算法函数,用于对容器(如字符串)的元素进行批量转换。
参数说明:
input.begin() 和 input.end():字符串的起始和结束迭代器(表示要处理的字符范围)。
第三个 input.begin():转换后的结果存储的起始位置(这里直接覆盖原字符串,实现 “原地修改”)。
::toupper:转换函数(C 标准库函数,将单个字符转换为大写)。
示例:若 input 是 "hello",执行后会变为 "HELLO"。
*/
std::transform(input.begin(), input.end(), input.begin(), ::toupper);
response->set_result(input);
return Status::OK;
}
};
// 启动服务器
void RunServer() {
std::string server_address("0.0.0.0:50051");
MyServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main() {
RunServer();
return 0;
}
3. 客户端实现(C++)
#include <grpcpp/grpcpp.h>
#include "my_service.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using mypackage::Request;
using mypackage::Response;
using mypackage::MyService;
class MyServiceClient {
public:
MyServiceClient(std::shared_ptr<Channel> channel) : stub_(MyService::NewStub(channel)) {}
std::string UnaryCall(const std::string& data) {
Request request;
request.set_data(data);
Response response;
ClientContext context;
// 发送请求并获取响应(阻塞式)
Status status = stub_->UnaryMethod(&context, request, &response);
if (status.ok()) {
return response.result();
} else {
std::cerr << "RPC failed: " << status.error_message() << std::endl;
return "";
}
}
private:
std::unique_ptr<MyService::Stub> stub_;
};
int main() {
MyServiceClient client(grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()));
std::string response = client.UnaryCall("hello");
std::cout << "Response: " << response << std::endl; // 输出 "HELLO"
return 0;
}
2. 服务器端流 RPC 模式(Server Streaming RPC)
在一元 RPC 模式中,gRPC 服务器端和 gRPC 客户端在通信时始终只有一个请求和一个响应。在服务器 端流 RPC 模式中,服务器端在接收到客户端的请求消息后,会发回一个响应的序列。这种多个响应所组 成的序列也被称为“流”。在将所有的服务器端响应发送完毕之后,服务器端会以 trailer 元数据的形式将 其状态发送给客户端,从而标记流的结束。
具体使用方式:
1. 服务定义(Protobuf)
syntax = "proto3";
package mypackage;
// 请求消息(客户端发送的单个请求)
message StreamRequest {
int32 count = 1; // 客户端要求服务器发送的数据流长度(示例字段)
}
// 响应消息(服务器发送的流数据单元)
message StreamResponse {
string data = 1; // 服务器返回的单个数据块(示例:递增的字符串)
}
// 服务定义(包含服务器端流方法)
service MyService {
// 服务器端流方法:客户端发送单个 StreamRequest,服务器返回 stream StreamResponse
rpc ServerStream(StreamRequest) returns (stream StreamResponse);
}
2. 服务器端实现(C++)
// server_stream_server.cc
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "my_service.grpc.pb.h" // 由 protoc 生成的头文件
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerWriter; // 服务器端流写工具
using mypackage::StreamRequest;
using mypackage::StreamResponse;
using mypackage::MyService;
// 服务实现类
class MyServiceImpl final : public MyService::Service {
public:
// 重写服务器端流方法(必须使用 override)
Status ServerStream(ServerContext* context,
const StreamRequest* request,
ServerWriter<StreamResponse>* writer) override {
// 1. 从客户端请求中获取要发送的数据流长度(count)
int32_t total = request->count();
std::cout << "Received request to send " << total << " chunks" << std::endl;
// 2. 循环发送指定数量的 StreamResponse(模拟流数据)
for (int i = 1; i <= total; ++i) {
StreamResponse response;
response.set_data("Chunk-" + std::to_string(i)); // 设置数据块内容(如 "Chunk-1")
// 发送流数据(阻塞直到发送完成或出错)
if (!writer->Write(response)) {
std::cerr << "Failed to write chunk " << i << std::endl;
return Status(grpc::StatusCode::INTERNAL, "Stream write failed");
}
// 模拟耗时操作(间隔 1 秒)
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return Status::OK; // 流发送完毕,返回成功状态
}
};
// 启动服务器
void RunServer() {
const std::string server_address("0.0.0.0:50051");
MyServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main() {
RunServer();
return 0;
}
3. 客户端实现(C++)
// server_stream_client.cc
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "my_service.grpc.pb.h" // 由 protoc 生成的头文件
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using grpc::ClientReader; // 客户端流读工具
using mypackage::StreamRequest;
using mypackage::StreamResponse;
using mypackage::MyService;
// 客户端类
class MyServiceClient {
public:
MyServiceClient(std::shared_ptr<Channel> channel)
: stub_(MyService::NewStub(channel)) {}
// 调用服务器端流方法(返回是否成功)
bool CallServerStream(int32_t count) {
StreamRequest request;
request.set_count(count); // 设置要请求的数据流长度
ClientContext context;
StreamResponse response;
// 1. 创建客户端流读取器(关联请求和上下文)
std::unique_ptr<ClientReader<StreamResponse>> reader(
stub_->ServerStream(&context, request));
// 2. 循环读取服务器发送的流数据
std::cout << "Receiving server stream..." << std::endl;
while (reader->Read(&response)) {
std::cout << "Received: " << response.data() << std::endl; // 输出每个数据块
}
// 3. 检查流结束状态
Status status = reader->Finish();
if (status.ok()) {
std::cout << "Server stream completed successfully" << std::endl;
return true;
} else {
std::cerr << "Server stream failed: " << status.error_message() << std::endl;
return false;
}
}
private:
std::unique_ptr<MyService::Stub> stub_;
};
int main() {
MyServiceClient client(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// 请求服务器发送 5 个数据块(模拟实时数据推送)
client.CallServerStream(5);
return 0;
}
3. 客户端流 RPC(Client Streaming RPC)
在客户端流 RPC 模式中,客户端会发送多个请求给服务器端,而不再是单个请求。服务器端则会发送一 个响应给客户端。但是,服务器端不一定要等到从客户端接收到所有消息后才发送响应。基于这样的逻辑我们可以在接收到流中的一条消息或几条消息之后就发送响应,也可以在读取完流中的所有消息之 后再发送响应。
1. .proto 文件(my_service.proto
)
syntax = "proto3";
package mypackage;
// 请求消息(客户端发送的流数据单元)
message UploadRequest {
string log_entry = 1; // 示例字段:客户端上传的单条日志
}
// 响应消息(服务器返回的汇总结果)
message UploadResponse {
int32 total_logs = 1; // 示例字段:服务器统计的日志总数
string summary = 2; // 示例字段:日志内容汇总
}
// 服务定义(包含客户端流方法)
service MyService {
// 客户端流方法:客户端发送 stream UploadRequest,服务器返回单个 UploadResponse
rpc ClientStream(stream UploadRequest) returns (UploadResponse);
}
2. 服务端实现(C++)
// client_stream_server.cc
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "my_service.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerReader; // 客户端流读工具
using mypackage::UploadRequest;
using mypackage::UploadResponse;
using mypackage::MyService;
// 服务实现类
class MyServiceImpl final : public MyService::Service {
public:
// 重写客户端流方法
Status ClientStream(ServerContext* context,
ServerReader<UploadRequest>* reader,
UploadResponse* response) override {
// 1. 初始化变量,累积客户端流数据
int32_t total = 0;
std::string all_logs;
UploadRequest request;
// 2. 循环读取客户端发送的流数据(阻塞直到流结束)
while (reader->Read(&request)) {
all_logs += request.log_entry() + "\n"; // 拼接日志内容
total++; // 统计日志数量
}
// 3. 生成响应(汇总日志信息)
response->set_total_logs(total);
response->set_summary("Logs received:\n" + all_logs);
return Status::OK;
}
};
// 启动服务器
void RunServer() {
const std::string server_address("0.0.0.0:50051");
MyServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main() {
RunServer();
return 0;
}
3. 客户端实现(C++)
// client_stream_client.cc
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <thread>
#include <grpcpp/grpcpp.h>
#include "my_service.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using grpc::ClientWriter; // 客户端流写工具
using mypackage::UploadRequest;
using mypackage::UploadResponse;
using mypackage::MyService;
// 客户端类
class MyServiceClient {
public:
MyServiceClient(std::shared_ptr<Channel> channel)
: stub_(MyService::NewStub(channel)) {}
// 调用客户端流方法(返回服务器的汇总结果)
std::string CallClientStream(const std::vector<std::string>& logs) {
ClientContext context;
UploadResponse response;
// 1. 创建客户端流写入器(关联上下文和响应)
//当客户端调用 stub_->ClientStream 时,服务器端的 ClientStream 方法会被 gRPC 框架立即触发,进入等待读取客户端流数据的状态(通过 ServerReader<UploadRequest>* reader)。
std::unique_ptr<ClientWriter<UploadRequest>> writer(
stub_->ClientStream(&context, &response));
// 2. 发送流数据(循环发送每条日志)
for (const auto& log : logs) {
UploadRequest request;
request.set_log_entry(log); // 设置单条日志内容
if (!writer->Write(request)) { // 发送数据(失败时退出)
std::cerr << "Failed to write log: " << log << std::endl;
writer->WritesDone();
return "";
}
// 模拟分批发送(间隔 0.5 秒)
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
// 3. 结束流并等待服务器响应
writer->WritesDone(); // 通知服务器:客户端流发送完毕
Status status = writer->Finish(); // 等待服务器处理
if (status.ok()) {
// 拼接响应结果(日志总数 + 汇总内容)
return "Total logs: " + std::to_string(response.total_logs())
+ "\nSummary:\n" + response.summary();
} else {
std::cerr << "Client stream failed: " << status.error_message() << std::endl;
return "";
}
}
private:
std::unique_ptr<MyService::Stub> stub_;
};
int main() {
MyServiceClient client(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// 模拟客户端要上传的日志列表
std::vector<std::string> logs = {
"[2023-10-01] Server started",
"[2023-10-01] User login: alice",
"[2023-10-01] Error: disk full",
"[2023-10-01] Backup completed",
"[2023-10-01] Server stopped"
};
// 调用客户端流方法并打印结果
std::string result = client.CallClientStream(logs);
if (!result.empty()) {
std::cout << "Server response:\n" << result << std::endl;
}
return 0;
}
4. 双向流 RPC(Bidirectional Streaming RPC)
在双向流 RPC 模式中,客户端以消息流的形式发送请求到服务器端,服务器端也以消息流的形式进行响 应。调用必须由客户端发起,但在此之后,通信完全基于 gRPC 客户端和服务器端的应用程序逻辑。
1. .proto 文件(my_service.proto
)
syntax = "proto3";
package mypackage;
// 消息(客户端和服务器双向传递的数据单元)
message ChatMessage {
string sender = 1; // 发送者(如 "Client" 或 "Server")
string content = 2; // 消息内容
}
// 服务定义(包含双向流方法)
service MyService {
// 双向流方法:客户端发送 stream ChatMessage,服务器返回 stream ChatMessage
rpc BidirectionalStream(stream ChatMessage) returns (stream ChatMessage);
}
2. 服务端实现(C++)
// bidirectional_stream_server.cc
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "my_service.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerReaderWriter; // 双向流读写工具
using mypackage::ChatMessage;
using mypackage::MyService;
// 服务实现类
class MyServiceImpl final : public MyService::Service {
public:
// 重写双向流方法
Status BidirectionalStream(ServerContext* context,
ServerReaderWriter<ChatMessage, ChatMessage>* stream) override {
ChatMessage request;
ChatMessage response;
// 循环读取客户端消息并回复(非阻塞,双方可独立发送)
while (stream->Read(&request)) {
std::cout << "Received from " << request.sender() << ": " << request.content() << std::endl;
// 构造回复消息(服务器作为发送者)
response.set_sender("Server");
response.set_content("ACK: " + request.content()); // 回复内容为原消息的确认
// 发送回复(非阻塞,可与客户端发送同时进行)
if (!stream->Write(response)) {
std::cerr << "Failed to write response" << std::endl;
return Status(grpc::StatusCode::INTERNAL, "Stream write failed");
}
}
return Status::OK;
}
};
// 启动服务器
void RunServer() {
const std::string server_address("0.0.0.0:50051");
MyServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main() {
RunServer();
return 0;
}
3. 客户端实现(C++)
// bidirectional_stream_client.cc
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <grpcpp/grpcpp.h>
#include "my_service.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using grpc::ClientReaderWriter; // 双向流读写工具
using mypackage::ChatMessage;
using mypackage::MyService;
// 客户端类
class MyServiceClient {
public:
MyServiceClient(std::shared_ptr<Channel> channel)
: stub_(MyService::NewStub(channel)) {}
// 启动双向流通信(模拟实时聊天)
void StartBidirectionalStream() {
ClientContext context;
auto stream = stub_->BidirectionalStream(&context); // 双向流读写器
// 1. 启动发送线程(向服务器发送消息)
std::thread sender([stream]() {
ChatMessage request;
request.set_sender("Client");
// 模拟发送 3 条消息(间隔 1 秒)
for (int i = 1; i <= 3; ++i) {
request.set_content("Hello-" + std::to_string(i));
if (!stream->Write(request)) {
std::cerr << "Failed to send message " << i << std::endl;
return;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
stream->WritesDone(); // 发送完毕,通知服务器
});
// 2. 启动接收线程(读取服务器回复)
std::thread receiver([stream]() {
ChatMessage response;
while (stream->Read(&response)) {
std::cout << "Received from " << response.sender() << ": "
<< response.content() << std::endl;
}
});
// 等待线程结束
sender.join();
receiver.join();
// 检查流结束状态
Status status = stream->Finish();
if (!status.ok()) {
std::cerr << "Bidirectional stream failed: " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<MyService::Stub> stub_;
};
int main() {
MyServiceClient client(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
std::cout << "Starting bidirectional chat..." << std::endl;
client.StartBidirectionalStream();
return 0;
}
四、grpc的同步与异步
一、前置准备:定义
.proto
文件gRPC 基于
.proto
文件定义服务接口和消息格式。我们使用官方helloworld.proto
示例:// helloworld.proto syntax = "proto3"; package helloworld; // 服务定义(包含一个一元 RPC 方法) service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} } // 请求消息:包含用户姓名 message HelloRequest { string name = 1; } // 响应消息:包含问候语 message HelloReply { string message = 1; }
二、生成 gRPC 代码
通过
protoc
编译器和gRPC C++ 插件
生成服务接口和消息类的 C++ 代码。2.1 安装依赖
- 安装
protoc
(Protocol Buffers 编译器):下载地址。- 安装
gRPC C++ 库
:参考 gRPC C++ 安装指南。2.2 生成代码命令
在终端执行以下命令(确保
protoc
和grpc_cpp_plugin
在 PATH 中):protoc --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=$(which grpc_cpp_plugin) helloworld.proto
生成以下文件:
helloworld.pb.h
:消息类头文件(如HelloRequest
、HelloReply
)。helloworld.pb.cc
:消息类实现。helloworld.grpc.pb.h
:服务接口头文件(如Greeter::Service
、Greeter::Stub
)。helloworld.grpc.pb.cc
:服务接口实现。三、同步调用实现
3.1 同步客户端(Client)
同步客户端直接调用 RPC 方法并阻塞等待响应。
代码(
greeter_sync_client.cc
):#include <iostream> #include <memory> #include <string> #include <grpcpp/grpcpp.h> #include "helloworld.pb.h" #include "helloworld.grpc.pb.h" using grpc::Channel; using grpc::ClientContext; using grpc::Status; using helloworld::HelloRequest; using helloworld::HelloReply; using helloworld::Greeter; // 同步客户端类 class GreeterClient { public: explicit GreeterClient(std::shared_ptr<Channel> channel) : stub_(Greeter::NewStub(channel)) {} // 调用 SayHello RPC(同步阻塞) std::string SayHello(const std::string& user) { HelloRequest request; request.set_name(user); // 设置请求参数 HelloReply reply; ClientContext context; // RPC 上下文(可设置超时、元数据等) // 同步调用:阻塞直到服务端返回响应 Status status = stub_->SayHello(&context, request, &reply); if (status.ok()) { return reply.message(); // 返回成功响应 } else { std::cerr << "RPC failed: " << status.error_message() << std::endl; return "Error"; } } private: std::unique_ptr<Greeter::Stub> stub_; // gRPC 服务存根 }; int main(int argc, char** argv) { // 连接服务端(地址:端口) std::string target = "localhost:50051"; auto channel = grpc::CreateChannel( target, grpc::InsecureChannelCredentials()); // 非安全连接(测试用) GreeterClient client(channel); std::string user = "World"; std::string response = client.SayHello(user); std::cout << "Sync Client Received: " << response << std::endl; return 0; }
3.2 同步服务器(Server)
同步服务器为每个请求分配一个线程处理,代码简单易维护。
代码(
greeter_sync_server.cc
):#include <iostream> #include <memory> #include <string> #include <grpcpp/grpcpp.h> #include "helloworld.pb.h" #include "helloworld.grpc.pb.h" using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::Status; using helloworld::HelloRequest; using helloworld::HelloReply; using helloworld::Greeter; // 服务实现类(继承 Greeter::Service) class GreeterServiceImpl final : public Greeter::Service { // 重写 SayHello 方法(处理请求并返回响应) Status SayHello(ServerContext* context, const HelloRequest* request, HelloReply* reply) override { std::string greeting = "Hello " + request->name(); reply->set_message(greeting); // 设置响应内容 return Status::OK; // 返回成功状态 } }; void RunServer() { std::string server_address = "0.0.0.0:50051"; // 监听所有网卡的 50051 端口 GreeterServiceImpl service; ServerBuilder builder; // 配置非安全连接(测试用,生产环境需使用 TLS) builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); // 注册服务实现 std::unique_ptr<Server> server(builder.BuildAndStart()); std::cout << "Sync Server listening on: " << server_address << std::endl; server->Wait(); // 阻塞等待,直到服务器终止 } int main(int argc, char** argv) { RunServer(); return 0; }
四、异步调用实现
异步调用通过
CompletionQueue
处理事件,避免线程阻塞,适合高并发场景。4.1 异步客户端(Client)
异步客户端发送请求后不阻塞,通过
CompletionQueue
异步接收响应。代码(
greeter_async_client.cc
):#include <iostream> #include <memory> #include <string> #include <grpcpp/grpcpp.h> #include "helloworld.pb.h" #include "helloworld.grpc.pb.h" using grpc::Channel; using grpc::ClientAsyncResponseReader; using grpc::ClientContext; using grpc::CompletionQueue; using grpc::Status; using helloworld::HelloRequest; using helloworld::HelloReply; using helloworld::Greeter; // 异步客户端类 class GreeterAsyncClient { public: explicit GreeterAsyncClient(std::shared_ptr<Channel> channel) : stub_(Greeter::NewStub(channel)) {} // 异步发送请求并注册回调 void AsyncSayHello(const std::string& user) { // 动态分配资源(避免被提前释放) HelloRequest* request = new HelloRequest(); request->set_name(user); HelloReply* reply = new HelloReply(); ClientContext* context = new ClientContext(); CompletionQueue* cq = &cq_; // 初始化异步 RPC 对象 auto rpc = stub_->PrepareAsyncSayHello(context, *request, cq); // 启动请求(异步操作) rpc->StartCall(); // 注册完成事件(响应到达时触发) void* tag = static_cast<void*>(rpc.get()); // 用 RPC 对象地址作为唯一标识 rpc->Finish(reply, &status_, tag); // 保存关联数据(用于后续清理) pending_rpcs_[tag] = {request, reply, context, rpc.get()}; } // 处理完成队列中的事件 void HandleResponses() { void* tag; bool ok = false; // 循环等待事件(阻塞直到有事件到达) while (cq_.Next(&tag, &ok)) { auto it = pending_rpcs_.find(tag); if (it == pending_rpcs_.end()) { std::cerr << "Unknown tag: " << tag << std::endl; continue; } // 提取关联数据 auto& data = it->second; HelloRequest* request = data.request; HelloReply* reply = data.reply; ClientContext* context = data.context; auto* rpc = data.rpc; if (ok && status_.ok()) { std::cout << "Async Client Received: " << reply->message() << std::endl; } else { std::cerr << "Async RPC failed: " << status_.error_message() << std::endl; } // 清理资源 delete request; delete reply; delete context; pending_rpcs_.erase(it); } } private: struct PendingRpc { HelloRequest* request; HelloReply* reply; ClientContext* context; void* rpc; }; std::unique_ptr<Greeter::Stub> stub_; CompletionQueue cq_; Status status_; std::map<void*, PendingRpc> pending_rpcs_; // 保存未完成的 RPC 关联数据 }; int main() { std::string target = "localhost:50051"; auto channel = grpc::CreateChannel( target, grpc::InsecureChannelCredentials()); GreeterAsyncClient client(channel); client.AsyncSayHello("World"); // 发送异步请求 client.HandleResponses(); // 处理响应(需在循环或单独线程中调用) return 0; }
4.2 异步服务器(Server)
异步服务器通过
CallData
封装请求状态,利用CompletionQueue
处理事件。代码(
greeter_async_server.cc
):#include <iostream> #include <memory> #include <string> #include <grpcpp/grpcpp.h> #include "helloworld.pb.h" #include "helloworld.grpc.pb.h" using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::ServerAsyncResponseWriter; using grpc::CompletionQueue; using grpc::Status; using helloworld::HelloRequest; using helloworld::HelloReply; using helloworld::Greeter; // 封装单个 RPC 请求的状态和数据 class CallData { public: enum State { CREATE, PROCESS, FINISH }; // 状态机(控制请求处理流程) CallData(Greeter::AsyncService* service, CompletionQueue* cq) : service_(service), cq_(cq), responder_(&ctx_), state_(CREATE) { Proceed(); // 初始状态为 CREATE,触发第一次处理 } void Proceed() { switch (state_) { case CREATE: { // 注册请求监听:当新请求到达时,通过 cq_ 触发事件 state_ = PROCESS; service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this); break; } case PROCESS: { // 创建新的 CallData 对象,持续监听后续请求(关键!) new CallData(service_, cq_); // 处理当前请求(构造响应) std::string greeting = "Hello " + request_.name(); reply_.set_message(greeting); // 发送响应并切换状态到 FINISH state_ = FINISH; responder_.Finish(reply_, Status::OK, this); break; } case FINISH: { // 清理当前请求资源 delete this; break; } } } private: Greeter::AsyncService* service_; // 异步服务接口 CompletionQueue* cq_; // 事件队列 ServerContext ctx_; // 请求上下文 HelloRequest request_; // 请求消息 HelloReply reply_; // 响应消息 ServerAsyncResponseWriter<HelloReply> responder_; // 响应写入器 State state_; // 当前状态 }; // 异步服务器类 class AsyncServer { public: ~AsyncServer() { server_->Shutdown(); cq_->Shutdown(); } void Run() { std::string server_address = "0.0.0.0:50051"; service_ = std::make_unique<Greeter::AsyncService>(); ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(service_.get()); // 注册异步服务 cq_ = builder.AddCompletionQueue(); // 创建事件队列 server_ = builder.BuildAndStart(); std::cout << "Async Server listening on: " << server_address << std::endl; // 初始化第一个 CallData,触发请求监听 new CallData(service_.get(), cq_.get()); // 循环处理事件(阻塞) void* tag; bool ok; while (cq_->Next(&tag, &ok)) { if (!ok) { std::cerr << "CompletionQueue event failed" << std::endl; continue; } static_cast<CallData*>(tag)->Proceed(); // 通过 Tag 驱动状态机 } } private: std::unique_ptr<Greeter::AsyncService> service_; // 异步服务实例 std::unique_ptr<Server> server_; // 服务器实例 std::unique_ptr<CompletionQueue> cq_; // 事件队列 }; int main() { AsyncServer server; server.Run(); return 0; }
gRPC 异步调用运行流程
1、核心组件与术语
- CompletionQueue(CQ):事件队列,用于存放异步操作的完成事件(如请求到达、响应发送)。
- CallData:服务器端封装单个请求的状态机对象(
CREATE
→PROCESS
→FINISH
)。- Tag:事件的唯一标识(通常是
CallData
或 RPC 对象的地址),用于关联事件和处理逻辑。2、异步服务器运行流程(字符图)
+---------------------+ +---------------------+ +---------------------+ | Server启动阶段 | | 等待请求阶段 | | 处理请求阶段 | +---------------------+ +---------------------+ +---------------------+ 1. 服务器初始化: ┌───────────────────────────────────────────────────────────────────────────────────┐ │ AsyncServer::Run() │ │ - 创建 Greeter::AsyncService 实例 │ │ - ServerBuilder 配置监听地址(0.0.0.0:50051)和 CQ │ │ - 启动服务器(BuildAndStart()) │ │ - 创建第一个 CallData 对象(new CallData(service, cq)) │ └───────────────────────────────────────────────────────────────────────────────────┘ ▼ 2. CallData 初始状态(CREATE): ┌───────────────────────────────────────────────────────────────────────────────────┐ │ CallData::Proceed() │ │ state_ = CREATE → 调用 service_->RequestSayHello() │ │ - 注册请求监听:当新请求到达时,将事件(Tag=CallData地址)加入 CQ │ │ - 状态切换为 PROCESS │ └───────────────────────────────────────────────────────────────────────────────────┘ ▼ 3. 等待客户端请求(CQ 阻塞): ┌───────────────────────────────────────────────────────────────────────────────────┐ │ AsyncServer::Run() 循环: │ │ while (cq_->Next(&tag, &ok)) { │ │ static_cast<CallData*>(tag)->Proceed(); // 取出 Tag(CallData地址) │ │ } │ └───────────────────────────────────────────────────────────────────────────────────┘ ▼ 4. 客户端发送请求(假设客户端已启动): ┌───────────────────────────────────────────────────────────────────────────────────┐ │ Client::AsyncSayHello() │ │ - 调用 rpc->StartCall() 启动请求 │ │ - rpc->Finish() 将完成事件(Tag=RPC地址)加入客户端 CQ │ └───────────────────────────────────────────────────────────────────────────────────┘ ▼ 5. 服务器 CQ 收到请求事件(Tag=CallData地址): ┌───────────────────────────────────────────────────────────────────────────────────┐ │ CallData::Proceed()(state_=PROCESS) │ │ - 创建新 CallData 对象(new CallData(...))→ 持续监听后续请求(关键!) │ │ - 处理当前请求:reply.set_message("Hello " + request.name()) │ │ - 调用 responder_.Finish() 发送响应,将完成事件(Tag=CallData地址)加入 CQ │ │ - 状态切换为 FINISH │ └───────────────────────────────────────────────────────────────────────────────────┘ ▼ 6. 服务器 CQ 收到响应完成事件(Tag=CallData地址): ┌───────────────────────────────────────────────────────────────────────────────────┐ │ CallData::Proceed()(state_=FINISH) │ │ - delete this → 清理当前 CallData 对象 │ └───────────────────────────────────────────────────────────────────────────────────┘
3、异步客户端运行流程(字符图)
+---------------------+ +---------------------+ +---------------------+ | 客户端初始化 | | 发送请求阶段 | | 接收响应阶段 | +---------------------+ +---------------------+ +---------------------+ 1. 客户端初始化: ┌───────────────────────────────────────────────────────────────────────────────────┐ │ GreeterAsyncClient 构造函数 │ │ - 通过 Channel 创建 Greeter::Stub 实例 │ └───────────────────────────────────────────────────────────────────────────────────┘ ▼ 2. 发送异步请求: ┌───────────────────────────────────────────────────────────────────────────────────┐ │ GreeterAsyncClient::AsyncSayHello("World") │ │ - 动态分配 request、reply、context │ │ - 调用 stub_->PrepareAsyncSayHello() 初始化 RPC 对象 │ │ - rpc->StartCall() 启动请求(异步操作) │ │ - rpc->Finish() 注册完成事件(Tag=RPC地址)到客户端 CQ │ │ - 保存关联数据到 pending_rpcs_(避免资源提前释放) │ └───────────────────────────────────────────────────────────────────────────────────┘ ▼ 3. 等待响应(客户端 CQ 阻塞): ┌───────────────────────────────────────────────────────────────────────────────────┐ │ GreeterAsyncClient::HandleResponses() │ │ while (cq_.Next(&tag, &ok)) { │ │ 从 pending_rpcs_ 中查找 Tag 对应的关联数据 │ │ 处理响应(打印 reply.message()) │ │ 清理 request、reply、context 资源 │ │ } │ └───────────────────────────────────────────────────────────────────────────────────┘ ▼ 4. 响应完成,流程结束: ┌───────────────────────────────────────────────────────────────────────────────────┐ │ pending_rpcs_ 中删除当前 Tag → 所有资源释放 │ └───────────────────────────────────────────────────────────────────────────────────┘
3、关键步骤总结(服务器与客户端交互)
步骤 角色 操作 状态 / 事件 1 服务器 启动并创建第一个 CallData
(CREATE
状态)CallData
注册请求监听2 客户端 调用 AsyncSayHello
发送请求RPC 请求启动( StartCall
)3 服务器 CQ 收到请求事件( Tag=CallData地址
)CallData
进入PROCESS
4 服务器 处理请求,创建新 CallData
(持续监听),发送响应(Finish
)CallData
进入FINISH
5 服务器 CQ 收到响应完成事件( Tag=CallData地址
)CallData
被销毁6 客户端 CQ 收到响应完成事件( Tag=RPC地址
)处理响应并清理资源 五、编译与运行
5.1 编译命令(Linux/macOS)
假设所有代码和生成的
.pb.cc
、.grpc.pb.cc
文件在同一目录,使用g++
编译:# 编译同步客户端 g++ greeter_sync_client.cc helloworld.pb.cc helloworld.grpc.pb.cc -o sync_client \ -std=c++17 -lgrpc++ -lprotobuf -lpthread # 编译同步服务器 g++ greeter_sync_server.cc helloworld.pb.cc helloworld.grpc.pb.cc -o sync_server \ -std=c++17 -lgrpc++ -lprotobuf -lpthread # 编译异步客户端 g++ greeter_async_client.cc helloworld.pb.cc helloworld.grpc.pb.cc -o async_client \ -std=c++17 -lgrpc++ -lprotobuf -lpthread # 编译异步服务器 g++ greeter_async_server.cc helloworld.pb.cc helloworld.grpc.pb.cc -o async_server \ -std=c++17 -lgrpc++ -lprotobuf -lpthread
5.2 运行测试
启动服务器(先启动服务器,再启动客户端):
# 启动同步服务器 ./sync_server # 或启动异步服务器(单独终端) ./async_server
启动客户端(另一个终端):
# 测试同步客户端 ./sync_client # 测试异步客户端 ./async_client
预期输出(客户端):
Sync Client Received: Hello World # 或 Async Client Received: Hello World