#include <functional>
#include <string>

#include <gflags/gflags.h>
#include <json2pb/pb_to_json.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include "butil/endpoint.h"
#include "echo.pb.h"

// Command-line flags used to configure the server.
DEFINE_bool(echo_attachment, true, "Echo attachment as well");
DEFINE_int32(port, 8000, "TCP Port of this server");
DEFINE_string(listen_addr, "",
              "Server listen address, may be IPV4/IPV6/UDS."
              " If this is set, the flag port will be ignored");
DEFINE_int32(idle_timeout_s, -1,
             "Connection will be closed if there is no "
             "read/write operations during the last `idle_timeout_s'");

// Implementation of the example::EchoService RPC service: replies to every
// request with "Echo: <message>" and optionally mirrors the attachment.
class EchoServiceImpl : public example::EchoService {
public:
    EchoServiceImpl() = default;
    ~EchoServiceImpl() override = default;

    // Callback registered through Controller::set_after_rpc_resp_fn().
    // Serializes the request and response to JSON and logs them.
    // (Original author's note: the controller, request and response have not
    // been destroyed yet when this is invoked.)
    //
    // controller: controller of the finished RPC (unused here).
    // req / res:  request and response messages of the finished RPC.
    static void CallAfterRpc(brpc::Controller* controller,
                             const google::protobuf::Message* req,
                             const google::protobuf::Message* res) {
        (void)controller;  // not needed for logging
        std::string req_str;
        std::string res_str;
        json2pb::ProtoMessageToJson(*req, &req_str, nullptr);
        json2pb::ProtoMessageToJson(*res, &res_str, nullptr);
        LOG(INFO) << "Got "
                  << "req:" << req_str
                  << "and res:" << res_str;
    }

    // Handles one Echo RPC.
    //
    // controller: RPC controller; downcast to brpc::Controller to reach the
    //             attachments, log id and peer addresses.
    // request:    incoming message.
    // response:   message to fill in.
    // done:       completion closure; run when done_guard leaves scope.
    void Echo(google::protobuf::RpcController* controller,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) override {
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
        // Runs `done` automatically when this function returns.
        brpc::ClosureGuard done_guard(done);

        LOG(INFO) << "Received request[log_id=" << cntl->log_id()
                  << "] from " << cntl->remote_side()
                  << " to " << cntl->local_side()
                  << ": " << request->message()
                  << " (attached=" << cntl->request_attachment() << ")";

        // Build the response.
        response->set_message("Echo: " + request->message());

        // Send the attachment back as well when enabled.
        if (FLAGS_echo_attachment) {
            cntl->response_attachment().append(cntl->request_attachment());
        }

        // Register the callback to run after the response has been handled.
        cntl->set_after_rpc_resp_fn(std::bind(EchoServiceImpl::CallAfterRpc,
                                              std::placeholders::_1,
                                              std::placeholders::_2,
                                              std::placeholders::_3));
    }
};

// Server entry point: parses flags, registers the echo service, starts
// listening and blocks until asked to quit. Returns 0 on clean shutdown and
// -1 on any setup failure.
int main(int argc, char* argv[]) {
    // Initialize gflags.
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);

    brpc::Server server;
    EchoServiceImpl service_impl;

    // service_impl lives on the stack, so the server must NOT take ownership
    // of it: with SERVER_OWNS_SERVICE the server would delete a stack object.
    if (server.AddService(&service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }

    // Endpoint to listen on; listens on all addresses unless --listen_addr
    // is given.
    butil::EndPoint point;
    if (FLAGS_listen_addr.empty()) {
        point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
    } else if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) != 0) {
        LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
        return -1;
    }

    // Server options.
    brpc::ServerOptions options;
    options.idle_timeout_sec = FLAGS_idle_timeout_s;

    // Start the server (Start() returns without blocking on requests).
    if (server.Start(point, &options) != 0) {
        LOG(ERROR) << "Fail to start EchoServer";
        return -1;
    }

    // Block until the process is asked to quit.
    server.RunUntilAskedToQuit();
    return 0;
}
client 端(客户端代码,与上面的 server 端分别编译为独立程序)
#include<cstdio>#include<gflags/gflags.h>#include<butil/logging.h>#include<butil/time.h>#include<brpc/channel.h>#include"echo.pb.h"DEFINE_string(attachment,"attach","Carry this along with requests");DEFINE_string(protocol,"baidu_std","Protocol type. Defined in src/brpc/options.proto");DEFINE_string(connection_type,"","Connection type. Available values: single, pooled, short");DEFINE_string(server,"0.0.0.0:8000","IP Address of server");DEFINE_string(load_balancer,"","The algorithm for load balancing");DEFINE_int32(timeout_ms,100,"RPC timeout in milliseconds");DEFINE_int32(max_retry,3,"Max retries(not including the first RPC)");DEFINE_int32(interval_ms,1000,"Milliseconds between consecutive requests");intmain(int argc,char* argv[]){GFLAGS_NS::ParseCommandLineFlags(&argc,&argv,true);// 配置
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;// 初始化channel
brpc::Channel channel;if((channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(),&options))!=0){LOG(ERROR)<<"Fail to initialize channel";return-1;}// channel的封装类,线程间共享
example::EchoService_Stub stub(&channel);// 准备请求响应
example::EchoRequest request;
example::EchoResponse response;
brpc::Controller cntl;char buf[128];printf("请输入:");scanf("%s",buf);
request.set_message(buf);// 捎带数据
cntl.request_attachment().append(FLAGS_attachment);// Cluster 设置为空,表示同步执行,函数会阻塞,直到结果返回,或者超时
stub.Echo(&cntl,&request,&response,NULL);if(cntl.Failed()){LOG(WARNING)<< cntl.ErrorText();// 通常这只是WARNING,为了演示才直接返回return-1;}// 正确输出LOG(INFO)<<"Received response from "<< cntl.remote_side()<<" to "<< cntl.local_side()<<": "<< response.message()<<" (attached="<< cntl.response_attachment()<<")"<<" latency="<< cntl.latency_us()<<"us";}