目录
一,项目介绍
首先介绍一下阻塞队列(Blocking Queue),在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)。
而阻塞队列是应用于同一主机的不同线程之上。
所谓的消息队列,就是把阻塞队列这样的数据结构,单独提取成一个程序,进行独立部署。
分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求。
因此, 我们通常会把阻塞队列封装成一个独立的服务器程序, 并且赋予其更丰富的功能。 这样的服务程序我们就称为 消息队列 (Message Queue, MQ)。也是跨主机实现生产者消费者模型。
生产者消费者模型的作用:
解耦合:使数据的生产与数据的处理分离开来,使程序更加模块化。
支持并发:根据实际的生产速率与消费速率,控制创建不同 数量生产者的线程或消费者的线程。
支持忙闲不均/削峰填谷:当生产的数据量剧增时,消息队列可以作为缓冲区,将数据缓存起来,消费者根据自己的处理能力来获取数据。
市面上成熟的消息队列非常多:
RabbitMQ
Kafka
RocketMQ
ActiveMQ
......
其中 RabbitMQ 是一个非常知名、功能强大且广泛使用的消息队列。这个项目就是仿照RabbitMQ,实现一个简单的消息队列。
二,开发环境
操作系统:Linux(Ubuntu 20.04)
编辑器:Vscode/vim
编译器/调试工具 :g++/gdb
项目自动化构建工具:Makefile
三,技术选型
开发语言:C++
序列化框架:protobuf序列化框架
网络通信:自定义应用层协议+muduo库
数据持久化:sqlite3数据库
单元测试框架:Gtest
四,第三方库介绍
1,Protobuf
ProtoBuf(全称 Protocol Buffer)是数据结构序列化和反序列化框架,它具有以下特点:
语言无关、平台无关:即 ProtoBuf 支持 Java、C++、Python 等多种语言,支持多个平台。
高效:即比 XML 更小、更快、更为简单。
扩展性、兼容性好:你可以更新数据结构,而不影响和破坏原有的旧程序。
protobuf使用流程介绍:
编写 .proto 文件,目的是为了定义结构对象(message)及属性内容。
使用 protoc 编译器编译 .proto 文件,生成一系列接口代码,存放在新生成头文件和源文件中。
依赖生成的接口,将编译生成的头文件包含进我们的代码中,实现对 .proto 文件中定义的字段进行设置和获取,和对 message 对象进行序列化和反序列化。
示例:
1,编写contacts.proto文件
//通过protobuf生成代码
//声明语法版本
syntax="proto3";
//声明代码的命名空间
package contacts;
//结构化对象的描述
message contact
{
//各个字段的描述:子段类型 子段名=字段编号
uint64 sn=1;
string name=2;
float score=3;
};
2,编译contacts.proto文件
//编译命令格式
protoc --cpp_out=./ contacts.proto
3,生成的文件
contacts.pb.cc 和contacts.pb.h
4,序列化和反序列的使用
#include <iostream>
#include <string>
#include "contacts.pb.h"
int main()
{
// 序列化
contacts::contact conn;
conn.set_sn(1001);
conn.set_name("小明");
conn.set_score(90);
std::string str = conn.SerializeAsString();
// std::cout<<str<<std::endl;
// 反序列化
contacts::contact stu;
bool ret = stu.ParseFromString(str);
if (ret == false)
{
std::cout << "反序列化失败" << std::endl;
}
std::cout << stu.sn() << std::endl;
std::cout << stu.score() << std::endl;
std::cout << stu.name() << std::endl;
return 0;
}
2,muduo库
2.1,muduo库的介绍
muduo是由陈硕大佬开发,是一个基于非阻塞IO事件驱动的C++高并发TCP网络编程库。它是一个基于主从Reactor模型的网络库,其用的线程模型是one thread one loop,所谓的one thread one loop指的是:
一个线程只能有一个事件循环(EventLoop),用来响应IO事件。
一个文件描述符,只能由一个线程进行读写,也就是一个TCP连接必须归属于某个EventLoop管理。
在搭建服务器和客户端时,会使用到muduo库中的相关接口。
3,SQLite
SQLite是一个较轻量级的数据库,它是一个零配置的数据库,这意味着与其他数据库不一样,我们不需要在系统中配置。
像其他数据库,SQLite 引擎不是一个独立的进程,可以按应用程序需求进行静态或动态连接,SQLite 直接访问其存储文件。
SQLite3 C/C++ API介绍
SQLite3 官方文档:https://www.sqlite.org/c3ref/funclist.html
//sqlite3有 三种安全等级
//1,非线程安全模式:不加任何的锁,此时的效率肯定是最高的
//2,线程安全模式(不同的连接在不同的线程/进程下使用是安全的,即同一个操作句柄不能用于多线程间)
//3,串行化模式(可以在不同的线程/进程间使用同一个句柄):对所有的操作都 加锁保护起来
//下面的所有函数,返回值为SQLITE_OK表示函数执行成功,否则失败。
//1,创建/打开数据库文件,并返回操作句柄(以输出型参数的形式),该函数执行成功返回SQLITE_OK
int sqlite3_open(const char *filename, sqlite3 **ppDb)
//2,若在编译阶段启动了线程安全,则在程序运行阶段可以通过参数选择线程安全等级
int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );
falg参数可以设置为:SQLITE_OPEN_READWRITE-- 以可读可写方式打开数据库文件
SQLITE_OPEN_CREATE-- 不存在数据库文件则创建
SQLITE_OPEN_NOMUTEX--多线程模式,只要不同的线程使用不同的连接即可保证线程安全
SQLITE_OPEN_FULLMUTEX--串行化模式
//3,执行语句
//对于执行语句sql,他可能是一个查询语句,那么对于结果如何处理?
//此时就需要我们传入一个回调函数来对结果进行处理
//如果这个sql语句是一个插入语句或者删除语句等等,那么回调函数设置为nullptr即可
//在该函数的执行过程中,会将arg传给回调函数的第一个参数
//最后一个参数表示,如果该执行出错,错误信息会设置到err中,通过这个我们可以查看错误原因
int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),
void* arg, char **err)
callback回调函数的参数:
void* : 原函数中的arg参数
int:一行中数据的列数
char**:存储一行数据的字符指针数组
char**:每一列的字段名称
注意:这个回调函数有一个int类型的返回值,这个回调函数如果成功执行必须返回0,返回非0就会触发abort退出程序
//4,销毁语句
int sqlite3_close(sqlite3* db); 成功返回 SQLITE_OK
int sqlite3_close_v2(sqlite3*); 推荐使用--无论如何都会返回SQLITE_OK
//5,获取错误信息
const char *sqlite3_errmsg(sqlite3* db);
4,GTest
GTest 是一个跨平台的 C++单元测试框架,由 google 公司发布。它提供了丰富的断言、致命和非致命判断、参数化等等。
TEST(test_case_name, test_name)
TEST_F(test_fixture,test_name)
TEST:主要用来创建一个简单测试, 它定义了一个测试函数, 在这个函数中可以使用任何 C++代码并且使用框架提供的断言进行检查。
TEST_F:主要用来进行多样测试,适用于多个测试场景如果需要相同的数据配置的情况, 即相同的数据测不同的行为。
GTest 中的断言的宏可以分为两类:
ASSERT_系列:如果当前点检测失败则退出当前函数。
EXPECT_系列:如果当前点检测失败则继续往下执行。
//断言宏的使用
ASSERT_GT(age,18) //age是否大于18
ASSERT_LT(age,18) //age是否小于18
//同理也存在EXPECT_GT EXPECT_LT
ASSERT_ 断言失败停止运行
EXPECT_ 断言失败继续运行
示例:
#include <iostream>
#include <gtest/gtest.h>
//第一个参数表示单元测试的总名称,第二个表示具体的名称
TEST(test,great_than)
{
int age=20;
//GT大于
ASSERT_GT(age,18);
std::cout<<"OK!\n";
}
TEST(test,less_than1)
{
int age=20;
//LT小于
EXPECT_GT(age,18);
std::cout<<"OK!\n";
}
int main(int argc,char* argv[])
{
testing::InitGoogleTest(&argc,argv);
//启动所有的单元测试
int ret=RUN_ALL_TESTS();
return 0;
}
五,项目需求分析
1,核心概念
在前面提到过,消息队列,就是把阻塞队列这样的数据结构,单独提取成一个程序,进行独立部署。
会有很多个生产者客户端向服务器放入数据,服务器在将消息推送给对应的客户端进行处理。
Broker Server服务器内部涉及到一些关键的概念: 1,队列(Queue):真正用来存储消息的实体,后续消费者也是从队列中获取数据。
2,交换机(Exchange):生产者将消息投递到Broker Server上,是先将消息船体给某个交换机,再由交换机决定,应该将消息传递给哪些绑定的队列。
3,绑定(Binding):交换机与队列的绑定关系,一个交换机可以绑定一个或者多个队列。
4,虚拟机(Virtual Host):虚拟机可以理解为数据库中的"database"。一个Broker Server上可以组织不同类别的数据,此时就可以使用虚拟机进行区分。但是在本项目的实现中,虚拟机只实现了一个。
5,消息(Message):可以认为是A给B发的请求,通过MQ转发,就是一个消息。同样B给A发出响应,经过MQ转发,也是一个消息。
当生产者发布一条消息到服务器后,先交给交换机,再由交换机决定交给哪个队列,这个过程是由交换机类型决定的。交换机有三种类型:Direct(直接交换),Fanout(广播交换),Topic(主题交换)。
生产者发送消息的时候,会携带一个routing_key,就是一个字符串,而交换机与队列的绑定关系中也包含一个字符串binding_key
Direct(直接交换):生产者发送消息的时候,交换机先拿到消息,交换机再根据绑定的所有队列的binding_key进行比较,只有routing_key=binding_key,才将消息发送给该 队列。
Fanout(广播交换):同样生产者 发送消息先到交换机,此时交换机会将这条消息发送给所有绑定的队列。
Topic(主题交换):可以理解为如果此时routing_key与binding_key对上"暗号"了,此时就可以把消息发送给指定的队列。这里的"暗号"实际 上就是指一种匹配规则,当routing_key与binding_key匹配成功时,就可以把消息 发送给指定的队列中。
以上的这些内容,都是AMQP协议——高级消息队列协议中规定出来的。
持久化操作:对于上述的数据:交换机,队列,绑定关系,消息等等,需要在内存中存储,也需要在硬盘中存储。
在内存中存储:使用/访问更加方便开快速。
在硬盘存储:保证服务器重启后数据不丢失。
至此 ,本项目需要实现的内容包括:Broker Server服务器,客户端(包括发布客户端和订阅客户端)。
2,服务器需要提供的核心API
创建交换机(exchangeDeclare)
移除交换机(exchangeDelete)
创建队列(queueDeclare)
移除队列(queueDelete)
创建绑定关系(queueBind)
解除绑定关系(queueUnBind)
发布消息(basicPublish)
订阅消息(basicConsume)
确认消息(basicAck):消费客户端拿到消息,处理完毕后,向服务器发送确认响应,然后就可以从服务器上删除该消息了。
取消订阅(basicCancel)
3,网络通信
生产者/消费者都是通过网络,和Broker Server进行交互的。
此处设定,使用Tcp+自定义应用层协议实现生产者/消费者和Broker Server的交互。
在客户端这边,也需要提供上述服务器的方法,只不过服务器上的方法是真正做事的,而客户端这边实现的方法,实际上只是发送请求/接受响应。
由于在Tcp中,建立/断开一个连接(Connection)的成本其实挺高的,因此,很多时候,不希望频繁的建立/断开Tcp连接。
为此,RabbitMQ引入一个新的概念——信道(channel),一个连接(Connection)可以创建多个信道,信道(Channel)只是逻辑上的一个概念,它比连接(Connection)轻量很多。
每个信道上的数据传输都是互不相干的,这是它的核心价值。它允许在同一个Tcp连接上,并发地执行多个独立的操作(如发布消息,消费消息,声明队列等等),这些操作都是在不同信道上进行的,互不干扰。
Tcp连接和信道的关系就比如高速公路和车道的关系,Tcp是铺路,信道是划车道。
如上图所示,在发布客户端中,不同的线程使用不同的信道来发送请求,这几个信道是互不影响的,一个信道上操作失败通常是不会影响其他信道。在服务器这边,也有信道,来接受客户端发送来的请求,并进行处理(这么多的请求,到时候肯定会选择使用线程池来处理)。
在这里我有一个问题,就是为什么客户端也要使用信道这个概念,发送请求时,为什么不直接使用多线程,而是要使用信道。也就是下面的这种方式:
如果使用 这种方案:会存在以下问题。
因为对于客户端发送的一些请求来说,是有严格顺序的,比如客户端发送了声明队列和订阅队列消息这两个请求,而服务器可能先接受到的请求是订阅队列消息,但是此时队列根本还没有创建,就无法订阅,程序混乱。
当服务端处理完客户端发送的请求后,会给客户端发送一个响应,如果使用这种方式,我们无法知道这个响应是哪个线程对应的请求。
4,消息应答模式
被消费的消息,需要进行应答,有两种应答方式:
自动应答:消息只要消费了,就算应答完毕了,Broker Server直接删除掉这条消息。
手动应答:消费者消费完消息后,需要收到调用接口,像服务器发送响应,然后Broker Server再删除掉这条消息。
六,项目模块化划分
有了上面对消息队列的简单认识,接下来就要细化一下具体要做哪些工作。
需要实现的内容包括:
服务器(Broker Server)
发布客户端(生产者)
订阅客户端(消费者)
1,服务端模块设计
交换机模块
队列模块
绑定关系模块
消息模块
虚拟机模块:该模块是对以上四个模块的整合,向外提供操作接口。
路由匹配模块:根据交换机的类型,判断一条消息应该路由给绑定的哪些队列。
消费者管理模块:消费者就是订阅者,当消费者调用API订阅了某条消息,其实本质是消费者订阅了某个队列。然后当这个队列有消息时,就会查找这个队列有哪些消费者订阅了,将消息推送给其中一个消费者进行处理,所以在服务器这边,我们需要将消费者的消息保存起来,以便于推送消息时可以找到消费者。
信道管理模块:一个TCP连接可能会有多个信道,当某个客户端想要关闭通信,关闭的不是连接,而是自己的信道,关闭信道的同时,我们需要将该客户端的订阅信息给删除。
连接管理模块:我们的服务器可能会收到多个客户端的连接,我们需要将这些连接保存起来。
Broker Server模块(搭建服务器):对以上模块的整合,向客户端提供服务。
2,服务端模块汇总图
首先介绍服务器模块,当 用户创建交换机和队列,以及绑定交换机和队列时,这些信息应该被持久化存储,这里使用的是sqlite数据库进行存储,以便于主机重启/程序重启后,这些信息可以恢复。同样,消息也需要持久化存储,因为一条消息被放入到消息队列后,可能还没有被消费,服务器就可能因为某些原因停止运行了,为了使服务器重启后,消息还存在,这里使用文件进行存储。
当将一条消息发布给消息队列时,这个消息首先是交给交换机,再根据交换机的类型以及路由匹配,将消息推送给绑定的队列,然后这条消息就会被保存下来,内存中保存一份,文件中也会保存。当这个消息推送给某个消费者客户端,并且收到确认响应后,才会从内存和文件中将这条消息删除掉。
3,客户端模块设计
消费者模块:一个客户端如果订阅了消息,那么他就是消费客户端,在向服务器发送订阅消息请求的时候,要包含本身的消费者信息。
信道管理模块:同样我们在发布消息/获取消息的时候,是通过信道来完成的。
连接管理模块:该模块提供各种API接口,比如创建交换机,创建队列等等所有的接口。
最后基于以上三个模块:实现发布客户端和订阅客户端。
4,客户端模块汇总图
七,工具类的实现
在本项目中,会使用到一些工具类,比如线程池,对数据库的操作,随机ID的生成等等。
所以,在这里先实现这些工具类,之后在项目中直接使用。
1,线程池的实现
1,前置知识
使用C++11中的异步操作,实现一个线程池。
std::future是C++11标准库中的一个模板类,它表示一个异步操作的执行结果。当我们在多线程编程中,使用异步操作来执行任务时,std::future可以帮助我们获取任务的执行结果。std::future会阻塞当前线程,直到异步任务执行完。
std::future提供了一种安全的方式来获取异步任务的执行结果,我们可以通过std::future::get()
函数来获取任务的结果,此函数会阻塞当前线程,直到异步任务执行完。
std::future通常要与其他组件(如
async/promise/packaged_task
)搭配使用。1,std::async是一个函数模板
作用:异步启动任务,该函数会返回一个future对象,用来获取执行结果。
异步任务的执行方式:
std::launch::async
:强制在新线程执行。
std::launch::deferred
:延迟执行(调用get()
时执行)。2,std::promise是一个类模板
调用这个类的成员函数get_future(),返回一个std::future对象。
3,std::packaged_task是一个类模板
同样也可以调用成员函数get_future(),返回一个std::future对象。
可以对可调用对象进行二次封装,将其结果绑定到std::future中
std::async与std::future搭配使用示例:
//使用future来获取异步线程的执行结果
#include <iostream>
#include <future>
#include <chrono>
int Add(int num1,int num2)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout<<"开始执行!"<<std::endl;
int resul=num1+num2;
return resul;
}
int main()
{
//std::launch::deferred 当调用get获取执行结果时,异步线程才会执行任务
//std::launch::async 异步线程会立即执行任务
// std::future<int> ret=std::async(std::launch::deferred,Add,11,22);
std::future<int> ret=std::async(std::launch::async,Add,11,22);
std::this_thread::sleep_for(std::chrono::seconds(1));
int sum=ret.get();//获取执行结果
std::cout<<sum<<std::endl;
return 0;
}
std::promise与std::future搭配使用示例:
//promise模板类的使用
#include <iostream>
#include <future>
#include <thread>
void Add(int num1,int num2,std::promise<int>& pro)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
int result=num1+num2;
pro.set_value(result);
}
//通过promise对象使两个线程之间的结果得到同步
int main()
{
std::promise<int> pro;
std::future<int> fut=pro.get_future();
std::thread thr(Add,11,22,std::ref(pro));
int ret=fut.get();//会在这里阻塞住,直到pro中结果被设置了
std::cout<<"sum:"<<ret<<std::endl;
thr.join();
return 0;
}
std::packaged_task与std::future搭配使用案例:
// packaged_task的使用
#include <iostream>
#include <future>
#include <memory>
int Add(int num1, int num2)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "开始执行!" << std::endl;
int resul = num1 + num2;
return resul;
}
int main()
{
// 可以通过packaged_task对象交给智能指针来管理,也就是封装成一个指针
// 然后将该指针传入线程中,再解引用执行
// 但是此时的packaged_task对象就需要从堆上new一个,而不是在栈上开辟
// 因为线程的执行顺序不确定,可能packaged_task对象销毁了,才会执行线程函数
// std::shared_ptr<std::packaged_task<int(int, int)>> ptask(new std::packaged_task<int(int, int)>(Add));
auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);
std::thread thr([ptask]()
{
std::this_thread::sleep_for(std::chrono::seconds(3));
(*ptask)(11,22); });
std::future<int> fut = ptask->get_future();
int ret = fut.get(); // 同样会在这里阻塞
std::cout << ret << std::endl;
thr.join();
// ————————————————————————————————————————————————————————————————————————————————————
// 定义一个packaged_task对象,它可以用来管理一个可调用对象
// std::packaged_task<int(int,int)> task(Add);
// std::future<int> fut=task.get_future();
// //可以把该对象当作使一个可调用对象来执行
// task(11,22);
// int ret=fut.get();
// std::cout<<ret<<std::endl;
// ——————————————————————————————————————————————————————————————————————————————————
// 不能将task对象传入async,让异步线程去执行
// std::packaged_task<int(int,int)> task(Add);
// std::future<int> fut=task.get_future();
// std::async(std::launch::async,task,11,22);
// std::cout<<fut.get()<<std::endl;
// ——————————————————————————————————————————————————————————————————————————————————
// 也不能将task对象作为线程的入口函数
// std::packaged_task<int(int,int)> task(Add);
// std::future<int> fut=task.get_future();
// std::thread thr(task,11,22);
// std::cout<<fut.get()<<std::endl;
// thr.join();
return 0;
}
2,线程池的实现
由于使用std::promise,需要将std::promise对象设置到任务的执行参数中,还需要将运行结果设置到该对象中,也就是我们要改变用户传入的函数 ,实现难度较高,不采用这种方式。而std::async,使用的是自己内部的工作线程,那我们创建的线程用来干什么呢?
所以,最后决定使用std::packaged_task
与std::future
来实现线程池。
线程池的实现思想:
用户传入要执行的任务(函数),以及要处理的数据(函数的参数),由线程池中的工作线程来完成异步执行。
要提供的操作:
push操作:用户向线程池中放入一个任务。
stop操作:停止线程池的运行。
要管理的成员:
任务池:保存用户传入的任务
工作线程池:保存一定数量的线程
互斥锁&条件变量:实现同步互斥
结束运行标志:以便于控制线程池的结束
// 线程池
#ifndef __M_THRPOOL_H__
#define __M_THRPOOL_H__
#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <thread>
#include <functional>
#include <future>
#include <memory>
namespace xgmq
{
class ThreadPool
{
public:
using Funct = std::function<void()>;
using ptr=std::shared_ptr<ThreadPool>;
ThreadPool(const int thr_count = 3)
: _stop(false)
{
for (int i = 0; i < thr_count; i++)
{
_thr_pool.emplace_back(&entry, this);
}
}
// 向线程池中加一个任务
template <typename F, typename... Args>
auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
{
// 获取该函数的返回类型
auto return_type = decltype(func(args...));
// 将该函数的参数进行绑定
auto func_t = std::bind(std::forward(func), std::forward(args)...);
auto task = std::make_shared < std::packaged_task<return_type()>(func_t);
std::future<return_type> fut = task->get_future();
{
std::unique_lock<std::mutex> lock(_mutex);
_task_pool.emplace_back([task]()
{ (*task)(); });
_cond.notify_one();
}
return fut;
}
// 停止线程池的运行
void stop()
{
if (_stop == true)
{
return;
}
_stop = true;
_cond.notify_all(); // 唤醒所有进程
for (int i = 0; i < _thr_pool.size(); i++)
{
_thr_pool[i].join();
}
}
~ThreadPool()
{
stop();
}
private:
// 线程入口函数
void entry()
{
while (!_stop)
{
std::vector<Funct> _tmp_pool;
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [this]()
{ return !_stop || _task_pool.size() > 0; });
_tmp_pool.swap(_task_pool);
}
for (auto &task : _tmp_pool)
{
task();
}
}
}
private:
std::atomic<bool> _stop; // 线程池是否停止运行
std::mutex _mutex;
std::condition_variable _cond;
std::vector<Funct> _task_pool; // 任务池
std::vector<std::thread> _thr_pool; // 线程池
};
}
#endif
2,日志打印
在项目中 ,会涉及到一些地方的日志信息打印,来观察程序的运行状态。
在这里,要求进行日志打印的时候,要带上行号和文件名。同时日志打印也有等级的划分,有调试等级的日志打印,有警告等级的日志打印,也有错误等级的日志打印。
用到的关键技术:C语言库中的strftime函数。
size_t strftime(char *str, size_t maxsize, const char *format, const struct tm *timeptr)
这个函数可以将传入的时间,按照指定格式,组织成字符串。
str标识最终形成的字符串,maxsize表示str的最大字符个数,format表示生成的字符串的格式,最后一个参数是一个tm类型的结构体,这个结构体内部就包含了时间,比如年月日,时分秒,微秒......
使用示例:
//获取当前时间
time_t t=time(nullptr);
//使用当前时间构造一个struct tm类型的结构体
struct tm* ptm=localtime(&t);
char str[32];
strftime(str,31,"%H:%M:%S",ptm);//时间格式为 时:分:秒
// 日志打印工具
#ifndef __M_LOGGER_H__
#define __M_LOGGER_H__
#include <iostream>
#include <cstdio>
#include <ctime>
#define DEBUG_LEVEL 0
#define INFO_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DEUBG_LEVEL
#define LOG(leve_str, level, format, ...) \
{ \
if (level >= DEFAULT_LEVEL) \
{ \
time_t t = time(nullptr); \
struct tm *ptm = localtime(&t); \
char time_str[32]; \
strftime(time_str, 31, "%H:%M:%S", ptm); \
printf("[%s][%s][%s:%d]\t" format "\n", leve_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__); \
} \
}
#define DLOG(format,...) LOG("DEBUG",DEBUG_LEVEL,format,##__VA_ARGS__)
#define ILOG(format,...) LOG("INFO",INFO_LEVEL,format,##__VA_ARGS__)
#define ELOG(format,...) LOG("ERR",ERR_LEVEL,format,##__VA_ARGS__)
#endif
3,sqlite数据库操作的封装
class SqliteHelper
{
public:
typedef int (*callback)(void *, int, char **, char **);
SqliteHelper(const std::string &dbfile)
: _dbfile(dbfile), pb(nullptr)
{
}
// 打开数据库文件
// 传入打开时的安全等级,默认是串行化
void open(int safe = SQLITE_OPEN_FULLMUTEX)
{
// 以可读可写方式,不存在则创建的方式打开
int ret = sqlite3_open_v2(_dbfile.c_str(), &pb,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe, nullptr);
if (ret != SQLITE_OK)
{
DLOG("打开数据文件失败:%s", _dbfile.c_str());
return;
}
}
// 执行sql语句
bool exec(const std::string &sql, callback cb, void *arg)
{
int ret = sqlite3_exec(pb, sql.c_str(), cb, arg, nullptr);
if (ret != SQLITE_OK)
{
DLOG("执行语句失败:%s", sql.c_str());
return false;
}
return true;
}
// 关闭数据库文件
void close()
{
if (pb != nullptr)
{
sqlite3_close_v2(pb);
}
}
private:
std::string _dbfile;
sqlite3 *pb;
};
4,字符串分割方法的实现
前面提到过,消息是先交给交换机,再根据交换机的类型和路由匹配,决定将这条消息该交给哪个队列。
如果交换机的类型是主题交换模式(Topic),那么就要进行路由匹配(如何匹配在后面模块处详解),只有当binding_key与routing_key匹配成功时,才将该消息交给这个队列。
在这部分会涉及到将一个字符串按照指定字符进行分割的操作,所以在这里先实现。
class StrSplit
{
public:
static size_t split(const std::string& str,const std::string& sep,std::vector<std::string>& result)
{
size_t pos=0,idx=0;
while(idx<str.size())
{
pos=str.find(sep);
if(pos==std::string::npos)
{
result.push_back(str.substr(idx));
return result.size();
}
if(idx==pos)
{
idx=pos+sep.size();
continue;
}
std::string tmp=str.substr(idx,pos-idx);
result.push_back(tmp);
idx=pos+sep.size();
}
return result.size();
}
};
5,uuid生成器
UUID,也叫通用唯一识别码,通常由32位16进制数字字符组成。
在进行消息的发布和消息的确认到的时候,每个消息是有一个唯一ID的,来标识消息的唯一性。
在这里不使用一个自增的整数来表示,是因为如果消息数量太多的话,这个数字是有可能会越界的。
所以,我们决定使用一个字符串来表示这个ID,那么现在我们就需要保证整个字符串不能出现重复。
uuid的标准形式为:8-4-4-4-12的32个字符,如550e8400-e29b-41d4-a716-446655440000.
实现思路:首先生成8个随机数字,范围在[0,255]之间,因为1个字节可以表示的数据范围就是[0,255],这样就可以生成8字节的数字,再加上8字节的序号,构成16字节的数字,然后将这16字节的数字转化成32位16进制字符。
1,
std::random_device
这是c++11为我们提供生成随机数的方式,通过这个类定义对象生成的随机数是一种机器随机数 ,也就是通过硬件来生成的,效率较低。2,标准库中还提供了一种生成随机数的算法:
std::mt19937_64
,通过这个类生成的随机数范围是在[0,2^19937]之间,不过他生成的随机数是一个伪随机数。3,所以,我们可以将这两者结合,先使用
std::random_device
生成一个随机数,将这个随机数设置为std::mt19937_64
的随机数种子。之后我们使用std::mt19937_64
生成随机数即可。4,此时生成的随机数很大,而我们要的是[0,255]之间的。
我们可以使用标准库中的std::uniform_int_distribution来限定生成随机数的数字区间。
class UUID
{
public:
static std::string uuid()
{
// 先生成随机数种子
std::random_device rd;
std::mt19937_64 genorator(rd()); // 生成伪随机数
// 生成0-255之间的数字
std::uniform_int_distribution<int> distribution(0, 255);
std::stringstream ss;
for (int i = 0; i < 8; i++)
{
ss << std::setw(2) << std::setfill('0') << std::hex << distribution(genorator);
if (i == 3 || i == 5 || i == 7)
{
ss << "-";
}
}
// 生成序号
static std::atomic<int> seq(1);
size_t num = seq.fetch_add(1);
// 将序号添加到字符串后
// 将序号的每一字节转化为16进制字符
// size_t 在64位下是8字节
for (int i = 7; i >= 0; i--)
{
ss << std::setw(2) << std::setfill('0') << std::hex << ((num >> i * 8) & 0xff);
if (i == 6)
{
ss << "-";
}
}
return ss.str();
}
};
6,文件操作类的实现
前面服务器模块设计中提到过,我们使用文件来保存消息,在这里肯定就会涉及到很多文件相关的操作。
需要封装的操作:
判断文件是否存在
获取文件大小
文件读/写
文件的创建/删除
目录的创建/删除
使用C语言库中提供的函数int stat(const char* path,struct stat* buf),可以获取一个文件的信息,比如文件的大小,文件的创建时间,文件的inode编号等等。返回值等于0就表示函数执行成功。
文件的信息就存放于struct stat结构体中。
class FileHelper
{
public:
FileHelper(const std::string& filename)
:_filename(filename)
{}
//判断问价是否存在
bool exists()
{
struct stat st;
return (stat(_filename.c_str(),&st)==0);
}
//获取文件大小
size_t size()
{
struct stat st;
int n=stat(_filename.c_str(),&st);
if(n!=0)
{
DLOG("获取文件信息失败:%s",_filename.c_str());
return 0;
}
return st.st_size;
}
//文件的读取
//从指定位置开始,读取指定长度的数据
bool read(char* buff,size_t offset,size_t len)
{
std::ifstream ifs(_filename.c_str(),std::ios::binary|std::ios::in);
if(ifs.is_open()==false)
{
ELOG("打开文件失败:%s",_filename.c_str());
return false;
}
//移动到指定位置
ifs.seekg(offset,std::ios::beg);
//开始读取
ifs.read(buff,len);
if(ifs.good()==false)
{
ELOG("读取文件失败:%s",_filename.c_str());
return false;
}
return true;
}
//文件的读取,读取全部数据
bool read(std::string& buff)
{
buff.resize(size());
return read(&buff[0],0,size());
}
//向文件指定位置写入指定长度的数据
bool write(const char* data,size_t offset,size_t len)
{
std::fstream fs(_filename,std::ios::binary|std::ios::in|std::ios::out);
if(fs.is_open()==false)
{
ELOG("打开文件失败:%s",_filename.c_str());
return false;
}
//移动到指定位置
fs.seekg(offset,std::ios::beg);
fs.write(data,len);
if(fs.good()==false)
{
ELOG("写入文件失败:%s",_filename.c_str());
return false;
}
return true;
}
//向文件开始处写入数据
bool write(const std::string& data)
{
return write(data.c_str(),0,size());
}
//创建文件
static bool createFile(const std::string& filename)
{
std::ifstream ifs(filename,std::ios::binary);
if(ifs.is_open()==false)
{
ELOG("创建文件失败:%s",filename.c_str());
return false;
}
ifs.close();
return true;
}
//删除文件
static void removeFile(const std::string& filename)
{
::remove(filename.c_str());
}
//获取文件的父路径
static std::string parentDirectory(const std::string& path)
{
//aaa/bbb/ccc/a.txt
size_t pos=path.find_last_of("/");
if(pos==std::string::npos)
{
return "./";
}
std::string parent=path.substr(0,pos);
return parent;
}
//创建路径
static bool createDirectory(const std::string& path)
{
//aaa/bbb/ccc 可能存在多级目录
size_t pos=0,idx=0;
while(idx<path.size())
{
pos=path.find("/");
if(pos==std::string::npos)
{
return mkdir(path.c_str(),0775)==0;
}
std::string parent=path.substr(0,pos);
int ret=mkdir(parent.c_str(),0775);
//目录如果已经存在,在创建的时候返回置为非0,并且错误码被设置
if(ret!=0&&errno!=EEXIST)
{
ELOG("创建目录失败:%s",parent.c_str());
return false;
}
idx=pos+1;
}
return true;
}
//删除路径
static bool removeDirectory(const std::string& path)
{
std::string cmd="rm -rf "+path;
return ::system(cmd.c_str());
}
//修改文件名称
bool rename(const std::string& nname)
{
return (::rename(_filename.c_str(),nname.c_str())==0);
}
private:
std::string _filename;
};
八,项目实现
项目实现部分不展示代码,代码仓库附在最后。
1,服务器模块
1,交换机模块
首先是交换机数据结构的定义:
对于一个交换机,它一个具有以下属性:
交换机的名称(交换机唯一标识)
交换机的类型(主题交换OR直接交换OR广播交换)
交换机的存储方式(是否需要将交换机的信息进行持久化存储)
接着是交换机信息持久化类的实现:使用sqlite数据库来保存
最后是对交换机管理类的实现:向外提供声明交换机,移除交换机,获取指定交换机的方法。
2,队列模块
该模块与交换机模块类似:
队列结构的定义:
队列的名称(唯一标识)
存储方式(是否持久化)
是否独占标志(只能有一个客户端来订阅该队列)
是否自动删除标志 (当创建该队列的客户端退出后,是否自动 删除该队列)
还有队列信息持久化类的实现:使用sqlite数据库来存储。
最后是队列管理类的实现:向外提供声明队列,移除队列,获取指定队列的方法。
3,绑定关系
描述交换机与队列之间的关系,这个模块的设计与前面两个相似。
首先是绑定关系结构的定义:
应该包含的成员:交换机名称,队列名称,还有binding_key(交换机进行消息路由的时候会使用到)
还有绑定关系的持久化类,交换机和哪些队列绑定了也要持久化存储,这样在服务器重启,仍然可以恢复。
最后是一个管理类,向外提供建立一组绑定关系,解除一组绑定关系,解除指定交换机的所有绑定关系,解除指定队列的所有绑定关系,获取一组绑定关系,获取指定交换机的所有绑定关系等操作。
4,消息模块
消息在进行持久化存储的时候,使用的是文件存储,这就会涉及到序列化和反序列工作,所以这里使用protobuf来描述消息的结构,生成消息的结构定义以及序列化和反序列化功能的实现。
编写的message.proto文件内容(在这里顺便也定义了交换机的类型):
一个完整的消息包含有效载荷和其他参数
消息的有效载荷包括:这个消息的属性,消息的正文,以及这个消息是否有效。
消息的属性包括消息的id,消息的routing_key,以及消息的投递模式(是否持久化存储)。
其他参数:这个消息在文件中存储的位置(偏移量)和这个消息的长度。
syntax="proto3";
package xgmq;
//交换机类型
enum ExchangeType
{
UNKNOWN_TYPE=0;
DIRECT=1;
TOPIC=2;
FANOUT=3;
};
enum DeliveryMode
{
UNKNOWN=0;
UNDURABLE=1;
DURABLE=2;
};
message basicProperties
{
string id=1;
DeliveryMode delivery_mode=2;
string routing_key=3;
};
message Message
{
message Payload
{
basicProperties properties=1;
string body=2;
string valid=3;
};
Payload payload=1;
uint32 offset=2;
uint32 length=3;
};
消息的持久化实现:
在这里,使用文件来存储消息,但是这里如果使用一个文件来存储所有的消息,那么在访问的时候,就需要加锁,锁竞争会非常频繁,所以不适用这种方式。消息的载体是队列,消息最终都是要交给队列的,所以在消息进行持久化时,以队列为单位,每个队列一个文件,来存储该队列上的消息,这样当访问不同队列的消息时,就不存锁竞争问题时,访问同一个队列的消息时,还是需要加锁的,这样就可以大大降低锁竞争的概率。
在实现消息的持久化存储,就必然会涉及到消息的读和写,那么此时就会涉及到怎么写,怎么读的问题,也就是协议。这里设定,在写文件的时候 ,按照格式:【8个字节的消息长度 消息内容】这样的方式进行写入。那么在读取的时候,就先读前8个字节,知道消息的长度之后,就可以读取后面消息的内容 。这样做就可以解决类似Tcp粘包问题。
新增一个消息,我们直接在文件后面追加写这个消息即可,但如果时删除消息,此时我们的做法是,找到这条消息在文件中的位置(偏移量offset),将这条消息的是否有效标志为设为无效,然后写入文件,覆盖掉之前的数据,这样就算一次删除消息的操作。
但是删除完一条消息之后,文件的大小没变,如果此时文件中无效消息数量超过总消息数量的一半,那么此时就需要进行垃圾回收,此时我们创建一个临时文件,然后读取 源文件,将有效消息写入新文件,最后在讲源文件删除掉,临时文件重命名为源文件名,这样一个删除操作才算完成。
5,虚拟机模块
虚拟机模块是对以上四个模块的整合,使用上述四个模块的管理句柄,向外提供服务:
声明/销毁交换机,声明/删除队列,绑定/解绑,发布一条消息,订阅一条消息,确认一条消息,还有获取所有队列,获取指定交换机的所有绑定 信息等操作。
6,路由匹配模块
当一条消息被发布到交换机上之后,需要判断交换机的类型,如果是主题交换(Topic),需要进行路由匹配,决定这条消息应该交给哪些队列。
这个模块只用来提供功能:
判断routing_key是否合法 。
判断binding_key是否合法。
判断binding_key与routing_key是否匹配成功(路由匹配功能)。
routing_key和binding_key都是以'.'作为分割的字符串。
对于routing_key来说:合法字符包含(a~z,A~Z,0~9, '.' , '_')等字符。
对于binding_key来说:合法字符包含(a~z,A~Z,0~9, '.' , '_'),除了这些外,还包含通配符'*'和'#'。但是这两个通配符不能连续出现,并且单独出现的,不能和其他字符一起出现。'*'可以匹配任意的一个字符,'#'可以匹配0个或多个任意字符。
路由匹配规则:
先将routing_key和binding_key按照'.'进行分割,使用数组保存分割后的字符串(单词)。也就是说现在有两个数组:rout_arr保存routing_key分割 后得到的每个单词,bind_arr保存binding_key分割后的每个单词。
然后让rout_arr的每个单词与bind_arr的每个单词进行比对。(使用动态规划算法)
定义状态表示:dp[i][j]表示rout_arr的第i个单词与bind_arr的第j个单词是否匹配成功。0表示匹配失败,1表示匹配成功 。
结果:dp表最后一个位置的值表示最终是否匹配成功。
状态转移方程的推导:
示例1:binding_key="bbb.ddd",routing_key="aaa.ddd",dp表结果如下:
aaa |
ddd |
|
bbb |
0 |
0 |
ddd |
0 |
0 |
当binding_key与routing_key的第一个单词进行比较的时候,bbb与aaa匹配失败。当binding_key与routing_key的第二个单词进行比较的时候,本来是匹配成功的,但由于前一个单词匹配失败,所以这里也就失败了。所以可以得出一个状态转移:如果dp[i][j]是匹配成功的,那么还需要看上一个单词的匹配结果,dp[i][j]=dp[i-1][j-1]
示例2:binding_key="#",routing_key="aaa.bbb",下面的dp表多开一行一列,可以避免越界,但是dp[0][0]要置为 1,否则就会影响后面结果判断的正确性。
aaa |
bbb |
||
1 |
0 |
0 |
|
# |
0 |
1 |
0->1 |
#与aaa匹配成功,dp[1][1]=1,同时继承与dp[i-1][j-1]=1,所以dp[1][1]=1。
#与bbb匹配成功,dp[2][2]=1,但由于dp[i-1][j-1]=0,所以dp[2][2]=0。
但是按理来说,#是匹配任意多个字符,是可以匹配成功 aaa.bbb的,所以这里还需要从当前位置的左边继承结果。可以得出结论,如果是通配符"#",dp[i][j]不仅是要从dp[i-1][j-1]继承结果,还要从dp[i][j-1]位置继承结果。
所以,如果出现"#",dp[i][j]=dp[i-1][j-1] | dp[i][j-1]。
示例3:binding_key="aaa.#",routing_key="aaa"
aaa |
||
1 |
0 |
|
aaa |
0 |
1 |
# |
0 |
0->1 |
同理,上面的示例可以看出,当出现"#"时,还可以从上方继承结果。
所以dp[i][j]=dp[i-1][j-1] | dp[i][j-1] | dp[i-1][j]。
"#"还有 一个特殊之处,就是如果binding_key是以"#"开头,那么dp表第一列需置为1,否则会影响后续填表的正确性。
总结:
当两个单词匹配成功时,从左上方继承结果:dp[i-1][j-1]。
当 遇到通配符"#"时:dp[i][j]=dp[i-1][j-1] | dp[i][j-1] | dp[i-1][j]。
当binding_key以 "#"开始时:需要将对应行的第一列位置初始化为1.
当遇到通配符"*"时,"*"是用来匹配任意一个单词的,所以可以当作第一种情况。
7,消费者管理模块
当有消费者订阅了服务器上的某个队列时,就需要将该消费者的信息保存起来,当这个队列上有了消息之后 ,就会把消息推送给这个队列,因此需要将消费者的信息记录下来。
注意:一个队列可能会被多个消费者订阅,当队列中有消息时,会采用RR轮转的方式,选择一个消费者,然后将消息推送给消费者,从而实现负载均衡。
在前面提到过,客户端与服务器在及逆行交互的时候,使用的是信道,信道给我们提供服务的。当客户端想要订阅一个队列的消息时,是通过信道发送对应的请求,然后在服务器这边,就会创建一个消费者对象,然后将这个消费者队列"绑定"在一起,当队列中有消息的时候,就找到该消费者,然后找到对应的信道,将响应发送给客户端。
一个 消费者属于某个信道的,当信道关闭或者连接关闭时,都要删除这个消费者。
定义消费者结构:
消费者名称:消费者唯一标识
队列名称:该消费者所订阅的队列
是否自动应答标志:该消费者获取到消息后,是否需要收到对这个消息进行确认。
一个回调函数:当该队列有消息后,先找对应的消费者,再调用消费者内部的回调函数,将这条消息发送给客户端。
8,信道管理模块
信道是针对连接更细粒度的一个通信通道,多个信道可以使用同一个连接来进行通信,但是同一个连接的信道之间是相互独立的。
信道建立在TCP连接之上的,是抽象出来的概念,用户通过信道向服务器发送请求,同时服务器也通过信道向客户端发送响应。
信道应该提供以下的服务:
创建/删除交换机
创建/删除队列
绑定/解绑
发布消息
订阅消息
对消息进行确认
取消订阅
信道结构的定义:
信道ID:信道的唯一标识
信道关联的消费者:信道提供了订阅消息的服务 ,当某个用户订阅了队列的消息,此时就会产生一个消费者。当队列中有消息的时候,就会把消息推送给消费者。注意:如果信道关闭了,就需要将这条信道上的消费者也删除掉,避免造成内存泄漏的问题。
信道关联的连接:毕竟真正进行通信的时候,还是要使用到底层TCP连接的。
虚拟机句柄:完成上述各种操作,创建/删除交换机......
线程池句柄:当一个消息被推送到某个队列上后,需要将这条消息推送给订阅的消费者,这个各种交给线程池来完成。
消费者管理句柄:用于删除消费者信息。信道关闭/取消订阅的时候 ,要删除消费者信息 。(防止内存泄漏)
protobuf协议处理句柄:网络通信中的协议处理。
接下来还需要对信道进行管理:
用哈希表来管理信道——信道ID和信道对象的映射。
同时提供打开信道,关闭信道,获取信道的功能。
9,应用层协议设计
应用层协议格式 如下:
len:4个字节,表示整个报文的长度。
namelen:表示typeName的长度。
typeName:表示请求/响应报文的类型名称。(比如创建交换机请求的类型,创建队列请求的类型)
protobufData:表示请求/响应数据通过protobuf序列化之后的二进制数据。
checkSum:校验和。
上述的应用层协议,其实就是muduo库中,陈硕大佬定义的一种应用层协议。我们使用muduo库搭建服务器,所以也就使用这个应用层协议来实现网络通信。
接下来需要定义请求/响应参数(格式):
syntax="proto3";
package xgmq;
//需要用到消息的属性,所以引入了消息模块
import "message.proto";
//信道的打开与关闭请求
message openChannelRequest
{
string rid=1;//请求id
string cid=2;//信道id
};
message closeChannelRequest
{
string rid=1;
string cid=2;
};
//交换机的创建与销毁请求
message declareExchangeRequest
{
string rid=1;
string cid=2;
string exchange_name=3;//交换机名称
ExchangeType exchange_type=4;//交换机类型
bool durable=5;//是否持久化
};
message deleteExchangeRequest
{
string rid=1;
string cid=2;
string exchange_name=3;
};
//队列的创建与销毁请求
message declareQueueRequest
{
string rid=1;
string cid=2;
string queue_name=3;
bool exclusive=4;
bool durable=5;
bool auto_delete=6;
};
message deleteQueueRequest
{
string rid=1;
string cid=2;
string queue_name=3;
};
//队列的绑定与解绑请求
message queueBindRequest
{
string rid=1;
string cid=2;
string exchange_name=3;
string queue_name=4;
string binding_key=5;
};
message queueUnBindRequest
{
string rid=1;
string cid=2;
string exchange_name=3;
string queue_name=4;
};
//消息的发布
message basicPublishRequest
{
string rid=1;
string cid=2;
string exchange_name=3;
string body=4;
basicProperties perties=5;
};
//消息的确认请求
message basicAckRequest
{
string rid=1;
string cid=2;
string msg_id=3;
string queue_name=4;
};
//队列订阅请求
message basicConsumeRequest
{
string rid=1;
string cid=2;
string consume_tag=3;//消费者名称
bool auto_ack=4;//是否自动应答
string queue_name=5;//队列名
};
//队列的取消订阅
message basicCancelRequest
{
string rid=1;
string cid=2;
string queue_name=3;
string consum_tag=4;
};
//消息的推送响应
message basicConsumeResponse
{
string cid=1;
string consum_tag=2;//消费者名称
string body=3;//消息内容
basicProperties properties=4;//消息的属性
};
//通用响应
message basicCommonResponse
{
string rid=1;
string cid=2;
bool ok=3;
};
10,连接管理模块
服务器可能会收到很多客户端的连接,所以先将这些连接管理起来,每个TCP连接都对应有一个信道管理句柄,这样就实现了一个TCP连接,可以创建多个信道。
而muduo中的连接connection对象,是没有这个功能的,使用这个connection对象只是来完成网络通信的。
所以,这里我们需要再将muduo库中的connection连接封装一次,提供创建信道/关闭信道的功能。
提供的操作:
创建/关闭信道。
成员信息:
我们通过连接创建信道,一个连接的成员就是创建信道时所需的参数,比如connection连接,虚拟机句柄等等。
信道管理句柄(实现信道的增删查):用来打开/关闭/获取信道。当某个连接断开时,相关的信道信息也应该被释放掉,避免内存泄漏。(信道释放了,那么这条信道上的消费者也应该被释放掉,避免内存泄漏,不过这个工作已经在信道模块处理过了,当一个信道对象析构时,其内部会通过消费者管理句柄删除该消费者对象)
连接关联的实际用于通信的 muduo::net::Connection 连接
protobuf 协议处理的句柄
消费者管理句柄
虚拟机句柄
异步工作线程池句柄
连接管理的实现:
使用哈希表来管理连接,使用muduo::net::TcpConnectionPtr作为key,而我们封装的连接对象作为value。
当一条Tcp连接建立成功之后,我们需要为这个Tcp连接建立一个我们封装的连接对象,通过这个对象,进行打开信道,关闭信道,获取信道的操作,这样通过一个Tcp连接,就可以建立多个信道了。
11,brokerserver模块
该模块使用muduo库进行服务器的搭建,只需设置请求的处理回调函数即可。
服务器一旦启动,需要完成对消费者管理结构的初始化,因为当由客户端订阅队列消息时,需要将消费者信息添加到消费者管理结构中,如果不初始化,则会出现对空对象访问的错误。
2,客户端模块
1,订阅者模块(消费者模块)
这个模块并不直接向用户展示,这个模块只起到一个角色描述的作用,表示这是一个消费者。如果是生产者发布消息,那它就不需要这个。
2,信道模块
在客户端这边同时也是有信道的,用户通过信道向服务器发送请求,所以,信道同样也要提供服务器模块的相关服务:创建/删除交换机,创建/删除队列......,不过这些服务的内部,只是构建出一个请求,然后发送给服务器,服务器收到后,进行解析处理,所以真正做事的是服务器。
信道的管理:向用户提供创建信道,关闭信道,获取信道的操作。当用户调用创建信道的操作时,同时其内部会向服务器发送一个创建信道的请求,至此服务器 和客户端都存在信道了,而客户端这边的信道是为用户提供服务的,而服务器这边的信道是为客户端提供服务的。
注意:当向服务器发送一个请求后,需要等待这个请求被服务器处理后,并接收到服务器发送来的响应,才能进行下一步操作。由于muduo库中的操作都是异步执行的,当要执行如下操作 时:创建交换机1,创建队列1,绑定交换机1和队列1,结果可能是:创建队列1和绑定请求先发送到了服务器,交换机1还没有创建,此时就会出现绑定失败的问题。所以 ,我们需要等待创建交换机1的请求接收到对应的响应之后,再发送下一个请求。
这里使用哈希表来存储响应id和响应的映射。
3,异步工作线程模块
这个模块是实现客户端异步工作的,防止下面的操作阻塞住当前的工作线程。
使用EventLoopThread来监控底层IO事件是否就绪。
当接收到服务器推送过来的消息,使用线程池来处理。
4,连接模块
这是进行网络通信的模块,也是使用muduo来实现底层的网络通信,一个连接可以创建出多个信道来进行网络通信。向用户提供打开信道/关闭信道的功能。
最后通过客户端模块,就可以分别搭建出发布客户端和订阅客户端。
九,项目总结
1,在订阅客户端启动时,程序异常退出了,问题在于订阅客户端在创建信道时,调用连接对象conn接口,而conn中的信道管理对象没有初始化,造成了空指针的解引用错误。
未初始化ChannelManagger对象的智能指针。
2,没有关闭信道,造成资源泄漏。
3,在服务器将消息推送给客户端时,由于判断条件写反->!bp,导致消费者的属性没有发送,这时客户端通过消息的id进行消息确认的时候,就无法完成确认,也就无法从服务器的队列中删除这条消息,从而一直占有内存资源,会造成内存泄漏问题。
十,仓库链接