目录
为MSGQueue类新增加二个属性和方法,用于管理订阅队列的消费者集合:
十.整合数据库和文件数据
上面的代码中,使用数据库存储了Exchange,Queue,Binding,使用文件存储了Message,
下面对数据库和文件中的数据进行整合.进行统一管理.
创建DiskDataManager类
/**
* 对数据库中的Exchange,Queue,Binding和文件中的Message数据进行整合
* 统一管理,后续上层代码直接调用该类中的方法即可,无需再向下层数据结构调用
*/
public class DiskDataManager {
private DataBaseManager dataBaseManager = new DataBaseManager();
private MessageFileManager messageFileManager = new MessageFileManager();
public void init() throws JsonProcessingException {
dataBaseManager.init();
messageFileManager.init();
}
//交换机:
//添加交换机
public void insertExchange(Exchange exchange){
dataBaseManager.insertExchange(exchange);
}
//删除交换机
public void deleteExchange(String exchangeName){
dataBaseManager.deleteExchange(exchangeName);
}
//查找交换机
public List<Exchange> selectAllExchanges(){
return dataBaseManager.selectAllExchanges();
}
//队列
//添加队列
public void insertQueue(MSGQueue queue) throws IOException, MqException {
dataBaseManager.insertQueue(queue);
//创建队列后,不仅要将队列写入到数据库中,还要创建出对应的目录和文件
messageFileManager.createQueueFile(queue.getName());
}
//删除队列
public void deleteQueue(String queueName) throws IOException {
dataBaseManager.deleteQueue(queueName);
//删除队列后,还要讲对应的目录和文件删除
messageFileManager.destoryQueueFile(queueName);
}
//查找队列
public List<MSGQueue> selectAllQueues(){
return dataBaseManager.selectAllQueues();
}
//绑定关系
//添加绑定关系
public void insertBinding(Binding binding){
dataBaseManager.insertBinding(binding);
}
//删除绑定关系
public void deleteBinding(String bingingKey){
dataBaseManager.deleteBindings(bingingKey);
}
//查找绑定
public List<Binding> selectAllBindings(){
return dataBaseManager.selectAllBindings();
}
//消息
//发送消息
public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue,message);
}
//删除消息
public void deleteMessageFromQueue(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {
messageFileManager.deleteMessageFromFile(queue,message);
//删除消息后,查看是否需要进行GC
if(messageFileManager.checkGC(queue.getName())){
messageFileManager.GC(queue);
}
}
//加载所有的消息到内存中
public List<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadAllMessage(queueName);
}
}
十一.内存结构设计
将数据存储到数据库和文件,是为了实现其持久性,但数据还是要存储在内存上的,这样才能更快的访问到数据.
创建MeneryDataCenter类:
这里通过设计不同的数据集合来存储数据在内存中.
/**
* 将数据存储在内存中,创建不同的数据集合来管理
* 要管理的数据有:
* 交换机
* 队列
* 绑定关系
* 消息
* 队列中的消息集合
* 待确认消息队列中的消息集合
*/
public class MemoryDataCenter {
//key:exchangeName
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
//key:queueName
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
// key1:exchangeName key2:queueName
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
//key: messageId
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
// key:queueName List:message
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
//存储在手动确认模式下,管理待确认的消息和队列,在未收到确认消息时,要先将数据存储到这个数据集合中,
// key1:queueName key2:messageId
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> WaitAckQueueMessageMap = new ConcurrentHashMap<>();
}
实现集合操作:
//交换机:
//插入
public void insertExchange(Exchange exchange){
exchangeMap.put(exchange.getName(),exchange);
System.out.println("[MemoryDataCenter] 新增交换机成功 exchangeName:"+exchange.getName());
}
//删除
public void deleteExchange(String exchangeName){
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 删除交换机成功 exchangeName:"+exchangeName);
}
//查找
public Exchange getExchange(String exchangeName){
Exchange exchange = exchangeMap.get(exchangeName);
return exchange;
}
//队列
//插入
public void insertQueue(MSGQueue queue){
queueMap.put(queue.getName(),queue);
System.out.println("[MemoryDataCenter] 新增队列成功! queueName: "+queue.getName());
}
//删除
public void deleteQueue(String queueName){
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 队列删除成功! queueName: "+queueName);
}
//查找
public MSGQueue getQueue(String queueName){
return queueMap.get(queueName);
}
//绑定关系:
//新增
public void insertBinding(Binding binding) throws MqException {
// //绑定关系不存在时,创建一个,存在时,进行覆盖
// ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getQueueName());
// if(bindingMap==null){
// bindingMap = new ConcurrentHashMap<>();
// }
// bindingMap.put(binding.getQueueName(),binding);
// bindingsMap.put(binding.getExchangeName(),bindingMap);
//这个方法是ConcurrentMap方法用来判断对应的哈希表是否存在,不存在就执行第二个参数,存在就直接赋值,和上面的逻辑是一样的
//且该方法是原子的,不存在线程安全问题
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
f -> new ConcurrentHashMap<>());
//此处可能会存在线程安全问题,以绑定关系为基准进行上锁
synchronized (binding){
Binding binding1 = bindingMap.get(binding.getQueueName());
//当绑定关系已经存在时,抛出异常,只有新的绑定插入时,才会成功
if(binding1!=null){
throw new MqException("[MemoryDataCenter] 绑定已存在 exchangeName: "+binding.getExchangeName()
+" ,queueName: "+binding.getQueueName());
}
bindingMap.put(binding.getQueueName(),binding);
}
System.out.println("[MemoryDataCenter] 新的绑定创建成功! +exchangeName:"+binding.getExchangeName()
+" ,queueName: "+binding.getQueueName()+" ,bindingKey: "+binding.getBindingKey());
}
//删除
public void deleteBinding(Binding binding) throws MqException {
//先判断交换机是否存在绑定,不存在时,无法删除.抛出异常
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
if(bindingMap==null){
throw new MqException("[MemoryDataCenter] 无绑定关系,删除失败 exchangeName: "+binding.getExchangeName()
+" ,queueName: "+binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
System.out.println("[MemoryDataCenter] 绑定删除成功 exchangeName: "+binding.getExchangeName()
+" ,queueName: "+binding.getQueueName()+" ,bindingKey:"+binding.getBindingKey());
}
//查找
public Binding getBinding(String exchangeName,String queueName) throws MqException {
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
if(bindingMap==null){
return null;
}
Binding binding = bindingMap.get(queueName);
return binding;
}
//消息
//插入
public void insertMessage(Message message){
messageMap.put(message.getMessageId(),message);
System.out.println("[MemoryDataCenter] 新增消息成功 messageID:"+message.getMessageId());
}
//删除
public void deleteMessage(String messageId){
messageMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息删除陈功 messageId: "+messageId);
}
//查找
public Message getMessage(String messageId){
return messageMap.get(messageId);
}
//队列消息集合
//发送消息到指定队列
public void sendMessage(MSGQueue queue,Message message){
//1.先查找队列对应的集合是否存在,不存在时创建消息集合
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), f -> new LinkedList<>());
//这里当多个线程同时执行插入操作时,可能会覆盖消息,要以集合为维度进行上锁
synchronized(messages){
messages.add(message);
}
//将消息也存入到消息集合中
messageMap.put(message.getMessageId(),message);
System.out.println("[MemoryDataCenter] 发送消息到队列成功 queueName:"+queue.getName()+
" ,messageId:"+message.getMessageId());
}
//从队列中取消息
public Message pollMessage(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages==null || messages.isEmpty()){
return null;
}
Message message = messages.remove(0);
System.out.println("[MemoryDataCenter] 从队列中取消息成功 queueName:"+queueName+
" ,messageId:"+message.getMessageId());
return message;
}
//获取队列中的消息个数
public int getMessageCountFromQueue(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages==null) return 0;
//此处获取集合中元素个数可能存在线程安全问题,对集合进行上锁
synchronized(messages){
return messages.size();
}
}
//待确认消息集合
//发送消息到待确认消息集合
public void sendWaitMessage(String queueName,Message message){
ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.computeIfAbsent(queueName, f -> new ConcurrentHashMap<>());
//此处向待确认消息集合中插入数据时,也可能存在线程安全问题,以集合为维度加锁
synchronized (waitMessagesMap){
waitMessagesMap.put(message.getMessageId(),message);
}
System.out.println("[MemoryDataCenter] 发送待确认消息到队列成功 queueName:"+queueName+
" ,messageId:"+message.getMessageId());
}
//从队列中取待确认消息
public Message pollWaitMessage(String queueName,String messageId){
ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.get(queueName);
if(waitMessagesMap==null){
return null;
}
Message message = waitMessagesMap.get(messageId);
if(message==null){
return null;
}
System.out.println("[MemoryDataCenter] 从队列中取代确认消息成功 messageId:"+messageId+
" ,queueName:"+queueName);
return message;
}
//从队列中删除待确认消息
public void deleteWaitMessage(String queueName,String messageId){
ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.get(queueName);
if(waitMessagesMap==null){
System.out.println("[MemoryDataCenter] 待确认消息队列不存在,消息删除失败 messageId:"+messageId+
" ,queueName:"+queueName);
}
waitMessagesMap.remove(messageId);
System.out.println("[MemoryDataCenter] 待确认消息删除成功 messageId:"+messageId+
" ,queueName:"+queueName);
}
//恢复所有硬盘中的数据
//当服务器重启后,内存中的数据都不存在了,要从磁盘中获取数据
public void recovery(DiskDataManager diskDataManager) throws IOException, MqException, ClassNotFoundException {
//先将内存中的集合都清空,防止存在残留数据
exchangeMap.clear();
queueMap.clear();
bindingsMap.clear();
messageMap.clear();
queueMessageMap.clear();
queueMessageWaitAckMap.clear();
//恢复交换机数据
List<Exchange> exchanges = diskDataManager.selectAllExchanges();
for(Exchange e:exchanges){
String exchangeName = e.getName();
exchangeMap.put(exchangeName,e);
}
//恢复队列数据
List<MSGQueue> queues = diskDataManager.selectAllQueues();
for(MSGQueue q:queues){
String queueName = q.getName();
queueMap.put(queueName,q);
}
//恢复绑定关系
List<Binding> bindings = diskDataManager.selectAllBindings();
for(Binding b:bindings){
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(b.getExchangeName(), f -> new ConcurrentHashMap<>());
bindingMap.put(b.getQueueName(),b);
}
//恢复消息
for(MSGQueue q:queueMap.values()){
List<Message> messages = diskDataManager.loadAllMessageFromQueue(q.getName());
for(Message m:messages){
messageMap.put(m.getMessageId(),m);
}
}
//对于未确认消息,当服务器重启后,服务器中所有的消息都要重新发送,未被确认的消息就都成了未被取走的消息了,
//对于未确认的消息, 就不需要回复这些数据了
}
对MemoryDataCenter类功能测试:
@SpringBootTest
public class MemoryDataCenter {
private MemoryDataCenter memoryDataCenter;
@BeforeEach
public void setUp(){
memoryDataCenter = new MemoryDataCenter();
System.out.println("前置工作已经准备后!");
}
@AfterEach
public void tearDown(){
memoryDataCenter = null;
System.out.println("收尾工作以完成!");
}
}
测试功能:
//测试交换机相关操作
private Exchange createExchange(String exchangeName){
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
exchange.setAutoDelete(false);
return exchange;
}
@Test
void testExchange(){
Exchange exchange = createExchange("exchangeTest");
memoryDataCenter.insertExchange(exchange);
Exchange act = memoryDataCenter.getExchange(exchange.getName());
Assertions.assertEquals(exchange.getName(),act.getName());
Assertions.assertEquals(exchange.getType(),act.getType());
Assertions.assertEquals(exchange.isDurable(),act.isDurable());
Assertions.assertEquals(exchange.isAutoDelete(),act.isAutoDelete());
memoryDataCenter.deleteExchange(exchange.getName());
act = memoryDataCenter.getExchange(exchange.getName());
Assertions.assertNull(act);
}
//测试队列相关操作
private MSGQueue createQueue(String queueName){
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(true);
queue.setAutoDelete(false);
return queue;
}
@Test
void testQueue(){
MSGQueue queue = createQueue("queueTest");
memoryDataCenter.insertQueue(queue);
MSGQueue act = memoryDataCenter.getQueue(queue.getName());
Assertions.assertEquals(queue.getName(),act.getName());
Assertions.assertEquals(queue.isDurable(),act.isDurable());
Assertions.assertEquals(queue.isAutoDelete(),act.isAutoDelete());
memoryDataCenter.deleteQueue(queue.getName());
act = memoryDataCenter.getQueue(queue.getName());
Assertions.assertNull(act);
}
//测试绑定关系相关操作
private Binding createBinding(String exchangeName,String queueName,String bindingKey){
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
return binding;
}
@Test
void testBinding() throws MqException {
//要先创建队列和交换机
Exchange exchange = createExchange("exchangeTest");
MSGQueue queue = createQueue("queueTest");
Binding binding = createBinding(exchange.getName(), queue.getName(), "bindingKeyTest");
memoryDataCenter.insertBinding(binding);
Binding act = memoryDataCenter.getBinding(exchange.getName(), queue.getName());
Assertions.assertEquals(binding.getExchangeName(),act.getExchangeName());
Assertions.assertEquals(binding.getQueueName(),act.getQueueName());
Assertions.assertEquals(binding.getBindingKey(),act.getBindingKey());
memoryDataCenter.deleteBinding(binding);
act = memoryDataCenter.getBinding(act.getExchangeName(),act.getQueueName());
Assertions.assertNull(act);
}
//测试消息操作
private Message createMessage(String body){
Message message = new Message();
return message.createMessageById("routingKeyTest",null,body.getBytes());
}
@Test
public void testMessage(){
Message expectedMessage = createMessage("testMessage");
memoryDataCenter.insertMessage(expectedMessage);
Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
Assertions.assertEquals(expectedMessage,actualMessage);
//删除消息
memoryDataCenter.deleteMessage(expectedMessage.getMessageId());
actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
Assertions.assertNull(actualMessage);
}
//测试队列中的消息集合
@Test
void testQueueMessage(){
MSGQueue queue = createQueue("queueTest");
Message message = createMessage("hello");
memoryDataCenter.sendMessage(queue,message);
int n = memoryDataCenter.getMessageCountFromQueue(queue.getName());
Assertions.assertEquals(1,n);
Message act = memoryDataCenter.pollMessage(queue.getName());
n = memoryDataCenter.getMessageCountFromQueue(queue.getName());
Assertions.assertEquals(0,n);
Assertions.assertEquals(message.getMessageId(),act.getMessageId());
Assertions.assertArrayEquals(message.getBody(),act.getBody());
Assertions.assertEquals(message.getIsVaild(),act.getIsVaild());
Assertions.assertEquals(message.getDeliveryMode(),act.getDeliveryMode());
}
//测试待确认队列集合
@Test
void testWaitMessageQueue(){
MSGQueue queue = createQueue("queueTest");
Message message = createMessage("hello");
memoryDataCenter.sendWaitMessage(queue.getName(), message);
Message act = memoryDataCenter.pollWaitMessage(queue.getName(), message.getMessageId());
Assertions.assertEquals(message.getMessageId(),act.getMessageId());
Assertions.assertArrayEquals(message.getBody(),act.getBody());
Assertions.assertEquals(message.getIsVaild(),act.getIsVaild());
Assertions.assertEquals(message.getDeliveryMode(),act.getDeliveryMode());
memoryDataCenter.deleteWaitMessage(queue.getName(), message.getMessageId());
act = memoryDataCenter.pollWaitMessage(queue.getName(), message.getMessageId());
Assertions.assertNull(act);
}
//测试加载磁盘所有数据到内存
@Test
void testRecovery() throws IOException, MqException, ClassNotFoundException {
//这里需要使用到mybatis,需要进行了类加载,先启动SpringApplication
Mq02Application.context = SpringApplication.run(Mq02Application.class);
//在磁盘上构造好数据:
DiskDataManager diskDataCenter = new DiskDataManager();
diskDataCenter.init("");
//创建交换机:
Exchange exchange = createExchange("testExchange");
diskDataCenter.insertExchange(exchange);
//创建队列:
MSGQueue queue = createQueue("testQueue");
diskDataCenter.insertQueue(queue);
//创建绑定
Binding binding = new Binding();
binding.setExchangeName(exchange.getName());
binding.setQueueName(queue.getName());
binding.setBindingKey("bindingKey");
diskDataCenter.insertBinding(binding);
//创建消息
Message message = createMessage("testContext");
diskDataCenter.sendMessage(queue,message);
//执行恢复:
memoryDataCenter.recovery(diskDataCenter);
//结果比对:
//交换机:
Exchange actualExchange = memoryDataCenter.getExchange(exchange.getName());
Assertions.assertEquals(exchange.getName(),actualExchange.getName());
Assertions.assertEquals(exchange.getType(),actualExchange.getType());
Assertions.assertEquals(exchange.isDurable(),actualExchange.isDurable());
Assertions.assertEquals(exchange.isAutoDelete(),actualExchange.isAutoDelete());
//队列:
MSGQueue actualQueue = memoryDataCenter.getQueue(queue.getName());
Assertions.assertEquals(queue.getName(),actualQueue.getName());
Assertions.assertEquals(queue.isDurable(),actualQueue.isDurable());
Assertions.assertEquals(queue.isAutoDelete(),actualQueue.isAutoDelete());
//绑定:
Binding actulaBinding = memoryDataCenter.getBinding(exchange.getName(), queue.getName());
Assertions.assertEquals(binding.getExchangeName(),actulaBinding.getExchangeName());
Assertions.assertEquals(binding.getQueueName(),actulaBinding.getQueueName());
Assertions.assertEquals(binding.getBindingKey(),actulaBinding.getBindingKey());
//消息:
Message actualMessage = memoryDataCenter.getMessage(message.getMessageId());
Assertions.assertEquals(message.getMessageId(),actualMessage.getMessageId());
Assertions.assertEquals(message.getDeliveryMode(),actualMessage.getDeliveryMode());
Assertions.assertEquals(message.getRoutingKey(),actualMessage.getRoutingKey());
Assertions.assertArrayEquals(message.getBody(),actualMessage.getBody());
// 清除文件
//清理之前要先关闭文件
Mq02Application.context.close();
File file = new File("./data");
FileUtils.deleteDirectory(file);
}
十二.整合内存和磁盘数据
将内存和磁盘上的数据进行整合,用"虚拟机"这个概念将其整合起来. 不同虚拟机中的交换机 队列,绑定关系,消息都是不互通的. 此处为了简化,仅实现单台虚拟主机,但在数据结构上设置不同虚拟主句名
为区分不同的虚拟主机上的设备,通过配置设备名区别:(以虚拟机名为前缀)
规定:
* exchangeName = virtualHostName+exchangeName;
* queueName = virtualHostName+queueName;
并且将调用的方法抛出的异常都在这个类中进行处理,不再向上抛出
创建VirtualHost类:
@Data
public class VirtualHost {
private String virtualHostName;
private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
private DiskDataManager diskDataManager = new DiskDataManager();
public VirtualHost(String virtualHostName){
this.virtualHostName = virtualHostName;
//初始化磁盘数据:
diskDataManager.init();
//初始化内存数据
try {
memoryDataCenter.recovery(diskDataManager);
} catch (IOException | MqException | ClassNotFoundException e) {
System.out.println("[VirtualHost] 内存数据恢复失败");
e.printStackTrace();
}
}
}
Exchange的声明和删除:
//在对交换机在内存和磁盘上插入和删除数据时,可能存在线程安全问题,要以交换机为维度对其上锁
//交换机锁对象:
private final Object exchangeLocker = new Object();
//交换机操作:
//创建交换机,
//创建后,将其保存到内存和磁盘上
public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable,
boolean autoDelete, Map<String,Object> args){
// 先根据约定 设置交换机名
exchangeName = virtualHostName + exchangeName;
synchronized(exchangeLocker){
//先在内存上查找,若已存在,则直接返回
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange!=null){
System.out.println("[VirtualHost] 交换机已经存在,不再创建 exchangeName:"+exchangeName);
return true;
}
exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(type);
exchange.setDurable(durable);
exchange.setAutoDelete(autoDelete);
//这里对args参数的设置.要在Exchange类中再为args关于Map参数添加set和get方法
exchange.setArgs(args);
//先存入数据库,再存入内存中,
//这个顺序是:插入数据库操作比较容易出现异常,存内存出现异常的可能小较小
// 若插入数据库失败,则不再存入内存中;
// 若是转换顺序,当存数据库出现异常时,还要将内存中的数据再删了,比较麻烦
if(durable){
//当交换机设置为持久化时,将其存入内存:
diskDataManager.insertExchange(exchange);
}
//存入内存
memoryDataCenter.insertExchange(exchange);
System.out.println("[VirtualHost] 交换机创建成功 exchangeName:"+exchangeName);
return true;
}
}
//删除交换机
//在内存和磁盘上将数据删除
public boolean exchangeDelete(String exchangeName){
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange==null){
throw new MqException("[VirtualHost] 要删除的交换机不存在 exchangeName:"+exchangeName);
}
//删除内存数据:
memoryDataCenter.deleteExchange(exchangeName);
//删除磁盘数据:
boolean durable = exchange.isDurable();
if(durable){
diskDataManager.deleteExchange(exchangeName);
}
System.out.println("[VirtualHost] 交换机删除成功 exchangeName:"+exchangeName);
return true;
}
} catch (MqException e) {
System.out.println("[VirtualHost] 交换机删除失败 exchangeName:"+exchangeName);
e.printStackTrace();
}
return false;
}
在Exchange类中关于args属性上,再增加关于Map参数类型的set方法:
public void setArgs(Map<String,Object> args){
this.args = args;
}
MSGQueue的声明和删除:
//在对队列在内存和磁盘上插入和删除数据时,可能存在线程安全问题,要以队列为维度对其上锁
//创建 队列锁对象:
private final Object queueLocker = new Object();
/**队列
* 创建队列:创建队列并将其存入到磁盘和内存中
*/
public boolean queueDeclare(String queueName,boolean isDurable,boolean autoDelete, Map<String,Object> args){
queueName = virtualHostName+queueName;
try {
synchronized(queueLocker){
MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
if(existsQueue!=null){
System.out.println("[VirtualHost] 队列已经存在 queueName:"+queueName);
return true;
}
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(isDurable);
queue.setAutoDelete(autoDelete);
//此处在MSGQueue类中,针对args属性,要实现关于Map类型的set方法
queue.setArgs(args);
//存入磁盘
if(isDurable) {
diskDataManager.insertQueue(queue);
}
//存入内存
memoryDataCenter.insertQueue(queue);
System.out.println("[VirtualHost] 创建队列成功 !");
}
return true;
} catch (IOException | MqException e) {
System.out.println("[VirtualHost] 创建队列失败 queueName:"+queueName);
e.printStackTrace();
return false;
}
}
//删除队列:从磁盘和内存中 删除队列
public boolean queueDelete(String queueName){
queueName = virtualHostName+queueName;
try{
synchronized(queueLocker){
MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
if(existsQueue==null){
throw new MqException("[VirtualHost] 队列不存在,删除队列失败 queueName:"+queueName);
}
if(existsQueue.isDurable()){
diskDataManager.deleteQueue(queueName);
}
memoryDataCenter.deleteQueue(queueName);
System.out.println("[VirtualHost] 删除队列成功 queueName:"+queueName);
}
return true;
}catch (Exception e) {
System.out.println("[VirtualHost] 队列删除失败! queueName:" +queueName);
e.printStackTrace();
return false;
}
}
同样,在MSGQueue类中关于args属性上,再增加关于Map参数类型的set方法:
public void setArgs(Map<String,Object> args){
this.args = args;
}
Binding的创建和删除:
// 该类实现和绑定相关的操作
private Router router = new Router();
//绑定的插入和删除
//插入绑定
public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey){
exchangeName = virtualHostName + exchangeName;
queueName = virtualHostName + queueName;
try{
//1.验证绑定是否存在,不存在再创建
Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
if(binding!=null){
throw new MqException("[VirtualHost] 绑定已存在 exchangeName:"+exchangeName+
" ,queueName:"+queueName);
}
//这里再创建一个类router,实现关于绑定相关的操作
//2.判断bindingKey格式是否正确
boolean ok = router.checkBindingKey(bindingKey);
if(!ok){
throw new MqException("[VirtualHost] 绑定格式有误 bindingKey:"+bindingKey);
}
//3.创建绑
binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
//4.验证绑定的队列和交换机是否存在
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue==null){
throw new MqException("[VirtualHost] 要绑定的队列不存在 queueName:"+queueName+
" ,bindingKey:"+bindingKey);
}
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange==null){
throw new MqException("[VirtualHost] 要绑定的交换机不存在 exchangeName:"+exchangeName+
" ,bindingKey:"+bindingKey);
}
//5,存入磁盘
//当队列和交换机同时设置持久化时,将该绑定关系存入磁盘
if(queue.isDurable() && exchange.isDurable()){
diskDataManager.insertBinding(binding);
}
//6.存入内存
memoryDataCenter.insertBinding(binding);
System.out.println("[VirtualHost] 创建绑定成功 bindingKey: "+bindingKey);
return true;
}catch (MqException e) {
System.out.println("[VirtualHost] 创建绑定失败 bindingKey:"+bindingKey);
e.printStackTrace();
}
return false;
}
//删除绑定
public boolean bindingDelete(String exchangeName,String queueName){
exchangeName = virtualHostName + exchangeName;
queueName = virtualHostName + queueName;
try {
Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
if(binding==null){
throw new MqException("[VirtualHost] 绑定不存在 queueName:"+queueName+
" ,exchangeName:"+exchangeName);
}
//从内存删除
memoryDataCenter.deleteBinding(binding);
//从磁盘删除
//此处可能绑定没有保存在磁盘上,删除失败,但没有关系,没有影响
diskDataManager.deleteBinding(binding.getBindingKey());
System.out.println("[VirtualHost] 删除绑定成功 exchangeName:"+exchangeName+
" ,queueName:"+queueName +" , bindingKey:"+binding.getBindingKey());
return true;
}catch(Exception e){
System.out.println("[VirtualHost] 删除绑定失败 exchangeName:"+exchangeName+
" ,queueName:"+queueName);
e.printStackTrace();
return false;
}
}
创建Router类:
实现匹配判断功能:
/**
* 该类实现和绑定相关的操作
//路由规定:
//routingKey: 只能由 数字 字母(大小写) 下划线 构成,使用.作为分割
//bindingKey:只能包含 数字 字母 下划线 * #,以 . 作为分割,* #只能作为独立的分段
*/
public class Router {
//判断消息携带的绑定格式是否正确
public boolean checkRoutingKey(String routingKey){
char[] ch = routingKey.toCharArray();
for(char i:ch){
if(i>='a' && i<='z') continue;
if(i>='A' && i<='Z') continue;
if(i>='0' && i<='9') continue;
if(i=='.' || i=='_') continue;
else return false;
}
return true;
}
//判断绑定格式是否正确
public boolean checkBindingKey(String bindingKey){
char[] ch = bindingKey.toCharArray();
for(char c:ch){
if(c>='A' && c<='Z') continue;
if(c>='a' && c<='z') continue;
if(c>='0' && c<='9') continue;
if(c=='_' || c=='*' || c=='#' || c=='.') continue;
else return false;
}
//规定不能让* #相连,即出现以下情况规定不成立:
// *.# #.* #.#
//以 . 对字符串进行分隔,判断
String[] s = bindingKey.split("\\.");
for(int i=0;i<s.length-1;i++){
if (s[i].equals("*") && s[i+1].equals("#") ||
s[i].equals("#") && s[i+1].equals("*") ||
s[i].equals("#") && s[i+1].equals("#")) {
return false;
}
}
return true;
}
//判断bindingKey与routingKey是否匹配成功
public boolean isRouting(ExchangeType type,String routingKey,String bindingKey) throws MqException {
//判断当前交换机类型:fanout/topic
if(type==ExchangeType.FANOUT){
//匹配到绑定交换机的所有队列
//直接返回即可
return true;
}else if(type==ExchangeType.TOPIC){
//进行routingKey和BindingKey的匹配判断
return routingTopic(routingKey,bindingKey);
}else{
throw new MqException("[Router] 交换机类型有误 type:"+type);
}
}
/**
* 规定:rotingKey匹配bindingKey
* *:匹配任意单个字符串
* #:匹配任意个任意字符串
* @param routingKey 消息携带的匹配字符串
* @param bindingKey 交换机和队列的绑定关系
* @return
*/
private boolean routingTopic(String routingKey, String bindingKey) {
String[] b = bindingKey.split("\\.");
String[] r = routingKey.split("\\.");
int n1 = b.length;
int n2 = r.length;
int i = 0;
int j = 0;
while(i<n1 && j<n2){
if(b[i].equals("*")){
//可以匹配routingKey的任意单个字符
//直接向后走就行:
i++;
j++;
}else if(b[i].equals("#")){
//匹配routingKey的任意个任意字符
if(i==n1-1){
//当bindingKey的最后一个字符为#时,可以匹配routingKey后面的所有字符串,直接返回true即可:
return true;
}else{
//当b的#不是最后一个字符时,就找r之后的字符串中是否有b的下一个字符串的下标,当找不到时,就返回-1:
i++;
j = checkNext(b[i],r,j);
//当在r中找不到b的下一个字符串时,一定匹配失败,直接返回
if(j==-1) return false;
else{
i++;
j++;
}
}
}else{
//b为普通字符串时
if(!b[i].equals(r[j])) return false;
else {
i++;
j++;
}
}
}
//b / r有一个已经匹配到结尾了,只有两个都完全匹配完,才算匹配成功
if(i!=n1 || j!=n2){
return false;
}
return true;
}
private int checkNext(String next, String[] r, int j) {
for(int k=j;k<r.length;k++){
if(r[k].equals(next)) return k;
}
return -1;
}
}
对Router类的TOPIC匹配进行测试:
@SpringBootTest
public class RouterTopicTest {
private Router router = new Router();
// [测试用例]
// binding key routing key result
// aaa aaa true
// aaa.bbb aaa.bbb true
// aaa.bbb aaa.bbb.ccc false
// aaa.bbb aaa.ccc false
// aaa.bbb.ccc aaa.bbb.ccc true
// aaa.* aaa.bbb true
// aaa.*.bbb aaa.bbb.ccc false
// *.aaa.bbb aaa.bbb false
// # aaa.bbb.ccc true
// aaa.# aaa.bbb true
// aaa.# aaa.bbb.ccc true
// aaa.#.ccc aaa.ccc true
// aaa.#.ccc aaa.bbb.ccc true
// aaa.#.ccc aaa.aaa.bbb.ccc true
// #.ccc ccc true
// #.ccc aaa.bbb.ccc true
@Test
void test01() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC, "aaa", "aaa");
Assertions.assertTrue(ok);
}
@Test
void test02() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bbb", "aaa.bbb");
Assertions.assertTrue(ok);
}
@Test
void test03() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bbb.ccc", "aaa.bbb");
Assertions.assertFalse(ok);
}
@Test
void test04() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.ccc", "aaa.bbb");
Assertions.assertFalse(ok);
}
@Test
void test05() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb.cc", "aaa.bb.cc");
Assertions.assertTrue(ok);
}
@Test
void test06() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb", "aaa.*");
Assertions.assertTrue(ok);
}
@Test
void test07() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb.cc", "aaa.*.bb");
Assertions.assertFalse(ok);
}
@Test
void test08() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb", "*.aaa.bb");
Assertions.assertFalse(ok);
}
@Test
void test09() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb.cc", "#");
Assertions.assertTrue(ok);
}
@Test
void test10() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb", "aaa.#");
Assertions.assertTrue(ok);
}
@Test
void test11() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb.cc", "aaa.#");
Assertions.assertTrue(ok);
}
@Test
void test12() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.cc", "aaa.#.cc");
Assertions.assertTrue(ok);
}
@Test
void test13() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb.cc", "aaa.#.cc");
Assertions.assertTrue(ok);
}
@Test
void test14() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.aaa.bb.cc", "aaa.#.cc");
Assertions.assertTrue(ok);
}
@Test
void test15() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"cc", "#.cc");
Assertions.assertTrue(ok);
} @Test
void test16() throws MqException {
boolean ok = router.isRouting(ExchangeType.TOPIC,
"aaa.bb.cc", "#.cc");
Assertions.assertTrue(ok);
}
发送消息:
//发送消息到队列
public boolean basicPublish(String exchangeName,String routingKey,
BasicProperties basicProperties,byte[] body){
exchangeName = virtualHostName + exchangeName;
try {
//1.判断交换机是否存在
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange==null){
throw new MqException("[VirtualHost] 交换机不存在 exchangeName:"+exchangeName);
}
//2.判断routingKey格式是否正确
boolean ok = router.checkRoutingKey(routingKey);
if(!ok) {
throw new MqException("[VirtualHost] routingKey格式有误 routingKey:"+routingKey);
}
//3.根据交换机的类型进行路由匹配,分发消息
if(exchange.getType()==ExchangeType.DIRECT){
//直接交换机,routingKey就是队列名,bindingKey无用,将消息路由到指定的队列上
//获取到指定队列
String queueName = virtualHostName + routingKey;
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue==null){
throw new MqException("[VirtualHost] 队列不存在 queueName:"+queueName);
}
//构造消息对象
Message message = new Message();
message = message.createMessageById(null,basicProperties,body);
//发送消息到队列,再构造一个方法实现
sendMessage(queue,message);
}else{
//当交换机类型为fanout/topic时:
//遍历交换机所有的绑定
ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);
for(Binding b:bindings.values()){
MSGQueue queue = memoryDataCenter.getQueue(b.getQueueName());
//判断交换机绑定的队列是否存在:
if(queue==null){
System.out.println("[VirtualHost] 队列不存在 queueName:"+b.getQueueName());
continue;
}
//构造消息对象
Message message = new Message().createMessageById(routingKey, basicProperties, body);
//判断routingKey与binding是否成功
if(!router.isRouting(exchange.getType(),message.getRoutingKey(),b.getBindingKey())){
//匹配失败:
System.out.println("[VirtualHost] routingKey和BindingKey不匹配 routingKey:"+routingKey+
" , bindingKey:"+b.getBindingKey());
continue;
}
//匹配成功时,就将消息转发
sendMessage(queue,message);
System.out.println("[VirtualHost] 消息发送成功 queueName:"+queue.getName()+
" ,messageId:"+message.getMessageId());
}
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost]消息发送失败 ");
e.printStackTrace();
return false;
}
}
//消费者管理对象:
private ConsumerManager consumerManager = new ConsumerManager(this);
private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
//存入磁盘
//是否持久化
//1:持久化 0:非持久化
if(message.getDeliveryMode()==1){
diskDataManager.sendMessage(queue,message);
}
//存入内存:
memoryDataCenter.sendMessage(queue,message);
//消息已经到达队列,通知订阅队列的消费者消费消息
consumerManager.notifyConsumer(queue.getName());
System.out.println("[VirtualHost] 发送消息成功");
}
创建ConsumerManager类:
对消费者进行管理:
/**
* 消费者管理类
*/
public class ConsumerManager {
//持有上层的VirtualHost对象的引用,用来操作数据
private VirtualHost virtualHost;
// 使⽤⼀个线程池⽤来执⾏消息回调
private ExecutorService workerPool = Executors.newFixedThreadPool(4);
//存放令牌(队列名)的队列:那个队列当前有消息了,就将队列名加入到阻塞队列中
//然后扫描线程通过该队列中存放的队列名找到对应的消息和订阅者,将信息打包放到线程池中进行消费
private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
//扫描线程
private Thread scannerThread = null;
//通知消费者消费消息:
//调用时机:发送方发送消息成功后,
//当队列中有消息了,就将其放到阻塞队列中,然后就要通知消费者消费消息了
public void notifyConsumer(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
}
订阅消息:
/**
//订阅消息
//添加一个订阅者:
* @param consumerTag 消费者身份标识
* @param queueName 队列名
* @param autoAck 是否自动确认消息
* @param consumer 回调函数
* @return
*/
public boolean basicConsume(String consumerTag, String queueName,
boolean autoAck, Consumer consumer){
queueName = virtualHostName + queueName;
try {
//通过消费者管理类实现添加消费者功能
consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);
System.out.println("[VirtualHost] basicConsumer 成功 queueName:"+queueName);
return true;
} catch (MqException e) {
System.out.println("[VirtualHost] basicConsumer 失败 queueName:"+queueName);
e.printStackTrace();
return false;
}
}
创建ConsumerEnv类:
消费者完整环境类:
/**
* 表示消费者(完整的执行环境)
*/
@Data
public class ConsumerEnv {
//消费者唯一标识
private String consumerTag;
//订阅队列的队列名字
private String queueName;
//是否自动确认消息
private boolean autoAck;
//要执行的具体功能,通过一个接口,由调用者自己实现其方法体
private Consumer consumer;
}
创建Consumer接口:
实现消费者的回调函数接口:通过lambda表达式,让消费者自己实现对消息的处理
/**
* 函数式接口,回调函数,当消费者收到消息后,要处理消息,调用者通过这个接口实现具体的功能
*/
@FunctionalInterface
public interface Consumer {
//deliver:投递的意思,这个方法在每次服务器收到发送来的消息后,调用
//通过这个方法把消息推送给对应的消费者
void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}
为MSGQueue类新增加二个属性和方法,用于管理订阅队列的消费者集合:
//此处再添加一个属性:订阅该队列的消费者集合
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//当订阅队列的消费者不止一个时 , 规定以轮训的方式消费消息
//再添加一个属性,记录当前轮到哪个消费者消费消息了
//这里使用AtomicInteger类来实现,目的是不让手动修改,且要实现自增的功能
private AtomicInteger atomicInteger = new AtomicInteger(0);
//添加一个新的订阅者(消费者)
public void addConsumerEnv(ConsumerEnv consumerEnv){
consumerEnvList.add(consumerEnv);
}
//挑选一个订阅者,消费当前消息,按照轮训的方式
public ConsumerEnv chooseConsumerEnv(){
if(consumerEnvList.isEmpty()){
//当前该队列还没有消费者订阅
System.out.println("[MSGQueue] 当前该队列没有订阅者");
return null;
}
//按照轮训的方式获取一个要消费消息的订阅者下标
int index = atomicInteger.get()%consumerEnvList.size();
//让轮训值 自增
atomicInteger.getAndIncrement();
return consumerEnvList.get(index);
}
在ConsumerManager类中实现添加消费者方法:
//添加新的消费者,并消费队列中当前存在的消息
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
//1.找到对应的队列
MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);
if(queue==null){
throw new MqException("[ConsumerManager] 队列不存在 queueName:"+queueName);
}
//2.创建一个消费者
ConsumerEnv consumerEnv = new ConsumerEnv();
consumerEnv.setConsumerTag(consumerTag);
consumerEnv.setQueueName(queueName);
consumerEnv.setAutoAck(autoAck);
consumerEnv.setConsumer(consumer);
//3.将订阅者加入到队列的订阅者队列中
queue.addConsumerEnv(consumerEnv);
//4.当队列中已经有一些消息时,要将其消费掉
synchronized (queue){
int n = virtualHost.getMemoryDataCenter().getMessageCountFromQueue(queueName);
for(int i=0;i<n;i++){
//这个方法调用一次就消费一条消息
consumerMessage(queue);
}
}
}
// 消费消息:调用消息的回调函数,并将消息从队列中删除
//从队列中获取一个消息,并让消费者消费,
// 当消费者不止一个时,按照轮训的方式让消费者依次消费消息
private void consumerMessage(MSGQueue queue) throws MqException {
//1.从队列的订阅者中挑选一个订阅者
ConsumerEnv consumerEnv = queue.chooseConsumerEnv();
if(consumerEnv==null){
//当前队列号没有订阅者,无法消费消息
System.out.println("[ConsumerManager] 当前队列中还没有订阅者");
return;
}
//2.消费消息
Message message = virtualHost.getMemoryDataCenter().pollMessage(queue.getName());
if(message==null){
//当前队列中还没有消息,不需要消费
System.out.println("当前队列中还没有消息");
return;
}
//将消息带到消费者的回调方法中,给线程池执行
workerPool.submit(()->{
try{
//1.在执行回调之前,先将消息放到待确认队列集合中,一旦消息被消费失败了.就重新发送消息
virtualHost.getMemoryDataCenter().sendWaitMessage(queue.getName(),message);
//2.执行订阅者的回调方法
consumerEnv.getConsumer().handlerDeliver(consumerEnv.getConsumerTag(),message.getBasicProperties(),message.getBody());
//3.根据消费者的确认消息方式及消费者消费消息的情况,执行删除消息操作
// 这里完成为自动确认模式下的操作,手动模式下,在basicAck方法中实现
if(consumerEnv.isAutoAck()){
//4.删除磁盘中的数据
// 是否持久化
// 1:非持久化 0:持久化
if(message.getDeliveryMode()==0) {
virtualHost.getDiskDataManager().deleteMessageFromQueue(queue, message);
}
//5.删除未确认消息队列中的消息
virtualHost.getMemoryDataCenter().deleteWaitMessage(queue.getName(), message.getMessageId());
//6.删除消息集合中的消息
virtualHost.getMemoryDataCenter().deleteMessage(message.getMessageId());
System.out.println("[ConsumerManager] 消息被成功消费 ");
}
}catch (Exception e){
System.out.println("[ConsumerManager] 消费消息失败");
e.printStackTrace();
}
});
}
在ConsumerManager类中,添加扫描线程,不停扫描阻塞令牌队列,查看是否有新的消息到来,需要消费者及时消费:
//先获取到令牌,根据令牌找到指定的队列,从队列中获取消息进行消费
public ConsumerManager(VirtualHost parent){
virtualHost = parent;
//为推的模式.不断的扫描令牌队列,一但有消息进入队列,就将其推送给消费者
Thread t = new Thread(()->
{
while(true){
try {
//1.获取令牌
String queueName = tokenQueue.take();
//2.根据令牌,找到指定的队列
MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);
if(queue==null){
throw new MqException("[ConsumerManager] 获取令牌时,发现队列不存在");
}
synchronized (queue){
//3.从队列中获取一个消息并进行消费
consumerMessage(queue);
}
} catch (InterruptedException | MqException e) {
throw new RuntimeException(e);
}
}
});
//将线程设为后台线程
//当前台线程执行结束了,后台线程也就结束了,
//若设为前台线程,那么只有当前台线程执行完了,整个进程才会结束,
// 这里的循环是while(true)会一直卡着执行结束不了,因此要设成后台线程
t.setDaemon(true);
//启动线程
t.start();
}
十三.网络通信协议设计
生产者和消费者都是客户端,需要通过网络和消息队列服务器进行通信.
此处我们使⽤TCP协议,来作为通信的底层协议.同时在这个基础上⾃定义应⽤层协议,完成客⼾端对服 务器这边功能的远程调⽤.
设计应用层协议:
使⽤⼆进制的⽅式设定协议.
请求数据格式:
响应数据格式:
其中 type 表⽰请求响应不同的功能. 取值如下:
• 0x1 创建 channel
• 0x2 关闭 channel
• 0x3 创建 exchange
• 0x4 销毁 exchange
• 0x5 创建 queue
• 0x6 销毁 queue
• 0x7 创建 binding
• 0x8 销毁 binding
• 0x9 发送 message
• 0xa 订阅 message
• 0xb 返回 ack
• 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的
对于请求来说,payload是各种请求方法的参数信息
对响应来说,payload是方法的返回数据信息.
创建request类:
/**
* 表示一个网络通信中的请求对象
*/
@Data
public class Request {
/** type 表⽰请求响应不同的功能. 取值如下
* 0x1 创建 channel
* • 0x2 关闭 channel
* • 0x3 创建 exchange
* • 0x4 销毁 exchange
* • 0x5 创建 queue
* • 0x6 销毁 queue
* • 0x7 创建 binding
* • 0x8 销毁 binding
* • 0x9 发送 message
* • 0xa 订阅 message
* • 0xb 返回 ack
* • 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的
*/
//请求类型,设定占4字节
private int type;
//请求的数据长度,占4字节
private int length;
//请求体 payload 表⽰这次⽅法调⽤的各种参数信息
private byte[] payload;
}
创建response类:
/**
* 表示一个响应对象
*/
@Data
public class Response {
//按照自己的定义,响应类型,4字节
private int type;
//响应的数据长度,4字节
private int length;
//响应体
private byte[] payload;
}
创建参数父类:
//定义参数⽗类
//构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.
//不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再
//通过继承的⽅式体现
@Data
public class BasicArgs implements Serializable {
//表示一次请求的身份标识,用来和该请求 对应的返回的响应相对照
protected String rid;
//每一次请求需要建立连接,通过TCP建立连接,一个连接可以发送多次消息,每条消息通过信道传送
//一条信道可以发送多条消息
//这次通信的信道channel的身份标识
protected String channelId;
}
创建响应父类:
/**
* 定义payload的返回数据
*/
@Data
public class BasicReturns implements Serializable {
//一次请求或相应的身份标识
protected String rid;
//标识一个channel
protected String channelId;
//表示方法的执行结果 payload 表⽰这次⽅法调⽤的返回值.
protected boolean ok;
}
创建设备功能的参数类:
exchangeDeclareArgs:
/**
* 这个类表示调用声明交换机方法的参数
*/
@Data
public class ExchangeDeclareArgs extends BasicArgs implements Serializable {
private String exchangeName;
private ExchangeType type;
private boolean isDurable;
private boolean autoDelete;
private Map<String,Object> args;
}
exchangeDeleteArgs:
@Data
public class ExchangeDeleteArgs extends BasicArgs implements Serializable {
private String exchangeName;
}
queueDeclareArgs:
@Data
public class QueueDeclareArgs extends BasicArgs implements Serializable {
private String queueName;
private boolean isDurable;
private boolean autoDelete;
private Map<String,Object> args;
}
queueDeleteArgs:
@Data
public class QueueDeleteArgs extends BasicArgs implements Serializable {
private String queueName;
}
bindingDeclareArgs:
@Data
public class BindingDeclareArgs extends BasicArgs implements Serializable {
private String ExchangeName;
private String queueName;
private String bindingKey;
}
bindingDeleteArgs:
@Data
public class BindingDeleteArgs extends BasicArgs implements Serializable {
private String exchangeName;
private String queueName;
}
basicPublishArgs:
@Data
public class BasicPublishArgs extends BasicArgs implements Serializable {
private String exchangeName;
private String routingKey;
private BasicProperties basicProperties;
private byte[] body;
}
basicConsumerArgs:
@Data
public class BasicConsumerArgs extends BasicArgs implements Serializable {
private String consumerTag;
private String queueName;
private boolean autoAck;
//这个类对应的BasicConsumer方法还有一个参数 consumer,是一个回到参数
//消费者客户端收到服务器发送的消息后,针对自己的业务,实现这个回调接口就行了,
//无需再将回调参数传给服务器,因此解救不需要在这里写这个参数了
//并且,这个 回调参数也无法通过网络传输给服务器
}
basicAckArgs:
/**
* 手动响应数据
*/
@Data
public class BasicAckArgs extends BasicArgs implements Serializable {
private String queueName;
private String messageId;
}
subscribeReturns:
/**
* 这里类表示返回数据的具体参数
* 是服务器给消费者提供的订阅消息
* consumerTag其实是channelId.
* basicProperties和body共同构成了Message.
*/
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {
private String consumerTag;
private BasicProperties basicProperties;
private byte[] body;
}
十四.实现BrokerServer
public class BrokerServer {
//调用相关数据
private VirtualHost virtualHost = new VirtualHost("default");
//服务器⾃⾝的 socket
private ServerSocket serverSocket = null;
//引入线程池,处理多个客户端的请求
private ExecutorService executorService = null;
//引入一个哈希表,存储所有的会话对象
//key: channelId, val:socket对象
private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();
//引入一个布尔变量,表示当前服务器是否要停止,
//要对所有线程是立即可见的,用volatile修饰
private volatile boolean runnable = true;
public BrokerServer(int port) throws IOException {
serverSocket = new ServerSocket(9090);
}
//启动服务
public void start() throws IOException {
System.out.println("[BrokerServer] 启动服务");
executorService = Executors.newCachedThreadPool();
try {
while (runnable) {
//accept:不断接收客户端发来的请求:
Socket clientSocket = serverSocket.accept();
executorService.submit(() -> {
processConnection(clientSocket);
});
}
} catch (SocketException e) {
//正常结束
System.out.println("[BrokerServer] 服务器停止运行!");
}
}
//停止服务器
public void stop() throws IOException {
runnable = false;
executorService.shutdown();
serverSocket.close();
}
//处理一个客户端的连接
//一个个连接可能有多次的请求和相应
//要读取数据,处理数据,然后将结果返回给客户端
private void processConnection(Socket clientSocket) {
try (InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream()) {
// 这里需要按照特定格式进行读取和解析数据
try (DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
while (true) {
//1.读取请求
Request request = readRequest(dataInputStream);
//2.根据请求计算相应
Response response = process(request, clientSocket);
//3.将结果返回给客户端
writeResponse(dataOutputStream,response);
}
} catch (EOFException | SocketException e) {
//当出现这两种异常时,是正常的异常,是请求读取结束了,读到了空字符串抛出的异常,
// 正常结束循环就可以了
System.out.println("[BrokerServer] connection 连接关闭 ,客户端地址: " + clientSocket.getInetAddress().toString()
+ " : " + clientSocket.getPort());
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} catch (MqException e) {
throw new RuntimeException(e);
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
//关闭资源
try {
//当前连接处理完之后,需要关闭Socket
clientSocket.close();
//把当前socket对应的所有channel也删除了
clearCloseSessions(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
//删除sessions中客户端和服务器建立的连接
}
}
private Request readRequest(DataInputStream dataInputStream) throws IOException {
Request request = new Request();
request.setType(dataInputStream.readInt());
request.setLength(dataInputStream.readInt());
byte[] payload = new byte[request.getLength()];
int n = dataInputStream.read(payload);
if (n != request.getLength()) {
throw new IOException("读取请求格式出错!");
}
request.setPayload(payload);
return request;
}
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayload());
// 这个刷新缓冲区也是重要的操作!!
dataOutputStream.flush();
}
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
// 1. 把 request 中的 payload 做一个初步的解析.
BasicArgs BasicArgs = (BasicArgs) BinaryTool.fromBytes(request.getPayload());
System.out.println("[Request] rid=" + BasicArgs.getRid() + ", channelId=" + BasicArgs.getChannelId()
+ ", type=" + request.getType() + ", length=" + request.getLength());
// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.
boolean ok = true;
if (request.getType() == 0x1) {
// 创建 channel
sessions.put(BasicArgs.getChannelId(), clientSocket);
System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + BasicArgs.getChannelId());
} else if (request.getType() == 0x2) {
// 销毁 channel
sessions.remove(BasicArgs.getChannelId());
System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + BasicArgs.getChannelId());
} else if (request.getType() == 0x3) {
// 创建交换机. 此时 payload 就是 ExchangeDeclareArgs 对象了.
ExchangeDeclareArgs Args = (ExchangeDeclareArgs) BasicArgs;
ok = virtualHost.exchangeDeclare(Args.getExchangeName(), Args.getType(),
Args.isDurable(), Args.isAutoDelete(), Args.getArgs());
} else if (request.getType() == 0x4) {
ExchangeDeleteArgs Args = (ExchangeDeleteArgs) BasicArgs;
ok = virtualHost.exchangeDelete(Args.getExchangeName());
} else if (request.getType() == 0x5) {
QueueDeclareArgs Args = (QueueDeclareArgs) BasicArgs;
ok = virtualHost.queueDeclare(Args.getQueueName(), Args.isDurable(), Args.isAutoDelete(), Args.getArgs());
} else if (request.getType() == 0x6) {
QueueDeleteArgs Args = (QueueDeleteArgs) BasicArgs;
ok = virtualHost.queueDelete((Args.getQueueName()));
} else if (request.getType() == 0x7) {
BindingDeclareArgs Args = (BindingDeclareArgs) BasicArgs;
ok = virtualHost.bindingDeclare(Args.getQueueName(), Args.getExchangeName(), Args.getBindingKey());
} else if (request.getType() == 0x8) {
BindingDeleteArgs Args = (BindingDeleteArgs) BasicArgs;
ok = virtualHost.bindingDelete(Args.getQueueName(), Args.getExchangeName());
} else if (request.getType() == 0x9) {
BasicPublishArgs Args = (BasicPublishArgs) BasicArgs;
ok = virtualHost.basicPublish(Args.getExchangeName(), Args.getRoutingKey(),
Args.getBasicProperties(), Args.getBody());
} else if (request.getType() == 0xa) {
BasicConsumerArgs Args = (BasicConsumerArgs) BasicArgs;
ok = virtualHost.basicConsume(Args.getConsumerTag(), Args.getQueueName(), Args.isAutoAck(),
new Consumer() {
//这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端
//此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询,
// 就可以得到对应的socket 对象了, 从而可以往里面发送数据了
@Override
public void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
// 先知道当前这个收到的消息, 要发给哪个客户端.
// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的
// socket 对象了, 从而可以往里面发送数据了
// 1. 根据 channelId 找到 socket 对象
Socket clientSocket = sessions.get(consumerTag);
if (clientSocket == null || clientSocket.isClosed()) {
throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
}
// 2. 构造响应数据
SubScribeReturns subScribeReturns = new SubScribeReturns();
subScribeReturns.setChannelId(consumerTag);
subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.
subScribeReturns.setOk(true);
subScribeReturns.setConsumerTag(consumerTag);
subScribeReturns.setBasicProperties(basicProperties);
subScribeReturns.setBody(body);
byte[] payload = BinaryTool.toByte(subScribeReturns);
Response response = new Response();
// 0xc 表示服务器给消费者客户端推送的消息数据.
response.setType(0xc);
// response 的 payload 就是一个 SubScribeReturns
response.setLength(payload.length);
response.setPayload(payload);
// 3. 把数据写回给客户端.
// 注意! 此处的 dataOutputStream 这个对象不能 close !!!
// 如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.
// 此时就无法继续往 socket 中写入后续数据了.
DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
writeResponse(dataOutputStream, response);
}
});
} else if (request.getType() == 0xb) {
// 调用 basicAck 确认消息.
BasicAckArgs Args = (BasicAckArgs) BasicArgs;
ok = virtualHost.basicAck(Args.getQueueName(), Args.getMessageId());
} else {
// 当前的 type 是非法的.
throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
}
// 3. 构造响应
BasicReturns basicReturns = new BasicReturns();
basicReturns.setChannelId(BasicArgs.getChannelId());
basicReturns.setRid(BasicArgs.getRid());
basicReturns.setOk(ok);
byte[] payload = BinaryTool.toByte(basicReturns);
Response response = new Response();
response.setType(request.getType());
response.setLength(payload.length);
response.setPayload(payload);
System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
+ ", type=" + response.getType() + ", length=" + response.getLength());
return response;
}
private void clearCloseSessions(Socket clientSocket) {
// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.
List<String> toDeleteChannelId = new ArrayList<>();
for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
if (entry.getValue() == clientSocket) {
// 不能在这里直接删除!!!
// 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!
// sessions.remove(entry.getKey());
toDeleteChannelId.add(entry.getKey());
}
}
for (String channelId : toDeleteChannelId) {
sessions.remove(channelId);
}
System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
}
}
十五.实现客户端
创建ConnectionFactory.
表示用来创建连接的工厂类:
/**
*连接工厂
*/
@Data
public class ConnectionFactory {
// broker server 的 ip 地址
private String host;
// broker server 的端口号
private int port;
public Connection newConnection() throws IOException {
Connection connection = new Connection(host, port);
return connection;
}
}
创建Connection类:
一个Connection对应一个TCP,一个连接可以包含多个channel.
public class Connection {
private Socket socket = null;
private InputStream inputStream;
private OutputStream outputStream;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
//创建线程池,用来处理客户端这边执行用户回调的线程池
private ExecutorService callbackPool = null;
// 创建一个hash.来管理多个channel
ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();
//这个方法在客户端构造好请求后,调用,用来发送请求到服务器:
public void writeRequest(Request request) throws IOException {
dataOutputStream.writeInt(request.getType());
dataOutputStream.writeInt(request.getLength());
dataOutputStream.write(request.getPayload());
dataOutputStream.flush();
System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());
}
// 和服务器建立连接,接收服务器返回的响应,并处理响应
public Connection(String host,int port) throws IOException {
socket = new Socket(host,port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
dataInputStream = new DataInputStream(inputStream);
dataOutputStream = new DataOutputStream(outputStream);
callbackPool = Executors.newFixedThreadPool(4);
// 创建一个扫描线程,不断的从socket中读取响应,交给对应的channel进行处理
Thread t = new Thread(()->{
try{
while (!socket.isClosed()){
Response response = readResponse();
//处理响应
dispatchResponse(response);
}
} catch (SocketException e){
//连接正常断开
System.out.println("[Connection] 连接正常断开");
}catch (IOException | ClassNotFoundException | MqException e) {
System.out.println("[Connection] 连接异常断开");
e.printStackTrace();
}
});
t.start();
}
public void close(){
try{
//关闭Connection ,释放资源
callbackPool.shutdownNow();
channelMap.clear();
outputStream.close();
inputStream.close();
socket.close();;
}catch (IOException e){
e.printStackTrace();
}
}
// 读取服务器返回的响应
public Response readResponse() throws IOException {
Response response = new Response();
response.setType(dataInputStream.readInt());
response.setLength(dataInputStream.readInt());
byte[] payload = new byte[response.getLength()];
int n = dataInputStream.read(payload);
if (n != response.getLength()) {
throw new IOException("读取的响应数据不完整!");
}
response.setPayload(payload);
System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());
return response;
}
// 使用这个方法来分别处理响应, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.
private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
if (response.getType() == 0xc) {
// 服务器推送给消费者客户端的消息数据
SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
// 根据 channelId 找到对应的 channel 对象
Channel channel = channelMap.get(subScribeReturns.getChannelId());
if (channel == null) {
throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
}
// 执行该 channel 对象内部的回调.
callbackPool.submit(() -> {
try {
channel.getConsumer().handlerDeliver(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
subScribeReturns.getBody());
} catch (MqException | IOException e) {
e.printStackTrace();
}
});
} else {
// 当前响应是针对刚才的控制请求的响应
BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
// 把这个结果放到对应的 channel 的 hash 表中.
Channel channel = channelMap.get(basicReturns.getChannelId());
if (channel == null) {
throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
}
//获取到响应后,将其放到响应的集合中,让客户端从集合中取走对应的响应.
channel.putReturns(basicReturns);
}
}
// 通过这个方法, 在 Connection 中能够创建出一个 Channel
public Channel createChannel() throws IOException {
String channelId = "C-" + UUID.randomUUID().toString();
Channel channel = new Channel(channelId, this);
// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.
channelMap.put(channelId, channel);
// 同时也需要把 "创建 channel" 的这个消息也告诉服务器.
boolean ok = channel.createChannel();
if (!ok) {
// 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!
// 把刚才已经加入 hash 表的键值对, 再删了.
channelMap.remove(channelId);
return null;
}
return channel;
}
}
创建Channel类:
用于客户端发送请求调用的相关的API:
@Data
public class Channel {
private String channelId;
// 当前这个 channel 属于哪个连接.
private Connection connection;
// 用来存储后续客户端收到的服务器的响应.
private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.
// 此处约定一个 Channel 中只能有一个回调.
private Consumer consumer = null;
public Channel(String channelId, Connection connection) {
this.channelId = channelId;
this.connection = connection;
}
/** type 表⽰请求响应不同的功能. 取值如下
* 0x1 创建 channel
* • 0x2 关闭 channel
* • 0x3 创建 exchange
* • 0x4 销毁 exchange
* • 0x5 创建 queue
* • 0x6 销毁 queue
* • 0x7 创建 binding
* • 0x8 销毁 binding
* • 0x9 发送 message
* • 0xa 订阅 message
* • 0xb 返回 ack
* • 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的
*/
// 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.
public boolean createChannel() throws IOException {
// 对于创建 Channel 操作来说, payload 就是一个 basicArgs 对象
BasicArgs basicArgs = new BasicArgs();
basicArgs.setChannelId(channelId);
basicArgs.setRid(generateRid());
byte[] payload = BinaryTool.toByte(basicArgs);
Request request = new Request();
request.setType(0x1);
request.setLength(payload.length);
request.setPayload(payload);
// 构造出完整请求之后, 就可以发送这个请求了.
connection.writeRequest(request);
// 等待服务器的响应
//服务器对根据请求处理并返回响应,对请求的处理时间不确定,
// 该步骤可能会发生阻塞
BasicReturns basicReturns = waitResult(basicArgs.getRid());
return basicReturns.isOk();
}
// 通过UUID,生成唯一rid
private String generateRid() {
return "R-" + UUID.randomUUID().toString();
}
private BasicReturns waitResult(String rid) {
BasicReturns basicReturns = null;
while ((basicReturns = basicReturnsMap.get(rid)) == null) {
// 如果查询结果为 null, 说明包裹还没回来.
// 此时就需要阻塞等待.
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 读取成功之后, 还需要把这个消息从哈希表中删除掉.
basicReturnsMap.remove(rid);
return basicReturns;
}
public void putReturns(BasicReturns basicReturns) {
basicReturnsMap.put(basicReturns.getRid(), basicReturns);
synchronized (this) {
// 当前也不知道有多少个线程在等待上述的这个响应.
// 把所有的等待的线程都唤醒.
notifyAll();
}
}
// 关闭 channel, 给服务器发送一个 type = 0x2 的请求
public boolean close() throws IOException {
BasicArgs basicArgs = new BasicArgs();
basicArgs.setRid(generateRid());
basicArgs.setChannelId(channelId);
byte[] payload = BinaryTool.toByte(basicArgs);
Request request = new Request();
request.setType(0x2);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(basicArgs.getRid());
return basicReturns.isOk();
}
// 创建交换机
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType,
boolean durable, boolean autoDelete, Map<String, Object> Args) throws IOException {
ExchangeDeclareArgs exchangeDeclareArgs = new ExchangeDeclareArgs();
exchangeDeclareArgs.setRid(generateRid());
exchangeDeclareArgs.setChannelId(channelId);
exchangeDeclareArgs.setExchangeName(exchangeName);
exchangeDeclareArgs.setType(exchangeType);
exchangeDeclareArgs.setDurable(durable);
exchangeDeclareArgs.setAutoDelete(autoDelete);
exchangeDeclareArgs.setArgs(Args);
byte[] payload = BinaryTool.toByte(exchangeDeclareArgs);
Request request = new Request();
request.setType(0x3);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(exchangeDeclareArgs.getRid());
return basicReturns.isOk();
}
// 删除交换机
public boolean exchangeDelete(String exchangeName) throws IOException {
ExchangeDeleteArgs Args = new ExchangeDeleteArgs();
Args.setRid(generateRid());
Args.setChannelId(channelId);
Args.setExchangeName(exchangeName);
byte[] payload = BinaryTool.toByte(Args);
Request request = new Request();
request.setType(0x4);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(Args.getRid());
return basicReturns.isOk();
}
// 创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean autoDelete,
Map<String, Object> Args) throws IOException {
QueueDeclareArgs queueDeclareArgs = new QueueDeclareArgs();
queueDeclareArgs.setRid(generateRid());
queueDeclareArgs.setChannelId(channelId);
queueDeclareArgs.setQueueName(queueName);
queueDeclareArgs.setDurable(durable);
queueDeclareArgs.setAutoDelete(autoDelete);
queueDeclareArgs.setArgs(Args);
byte[] payload = BinaryTool.toByte(queueDeclareArgs);
Request request = new Request();
request.setType(0x5);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(queueDeclareArgs.getRid());
return basicReturns.isOk();
}
// 删除队列
public boolean queueDelete(String queueName) throws IOException {
QueueDeleteArgs Args = new QueueDeleteArgs();
Args.setRid(generateRid());
Args.setChannelId(channelId);
Args.setQueueName(queueName);
byte[] payload = BinaryTool.toByte(Args);
Request request = new Request();
request.setType(0x6);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(Args.getRid());
return basicReturns.isOk();
}
// 创建绑定
public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
BindingDeclareArgs Args = new BindingDeclareArgs();
Args.setRid(generateRid());
Args.setChannelId(channelId);
Args.setQueueName(queueName);
Args.setExchangeName(exchangeName);
Args.setBindingKey(bindingKey);
byte[] payload = BinaryTool.toByte(Args);
Request request = new Request();
request.setType(0x7);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(Args.getRid());
return basicReturns.isOk();
}
// 解除绑定
public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
BindingDeleteArgs Args = new BindingDeleteArgs();
Args.setRid(generateRid());
Args.setChannelId(channelId);
Args.setQueueName(queueName);
Args.setExchangeName(exchangeName);
byte[] payload = BinaryTool.toByte(Args);
Request request = new Request();
request.setType(0x8);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(Args.getRid());
return basicReturns.isOk();
}
// 发送消息
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
BasicPublishArgs Args = new BasicPublishArgs();
Args.setRid(generateRid());
Args.setChannelId(channelId);
Args.setExchangeName(exchangeName);
Args.setRoutingKey(routingKey);
Args.setBasicProperties(basicProperties);
Args.setBody(body);
byte[] payload = BinaryTool.toByte(Args);
Request request = new Request();
request.setType(0x9);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(Args.getRid());
return basicReturns.isOk();
}
// 订阅消息
public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
// 先设置回调.
if (this.consumer != null) {
throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");
}
this.consumer = consumer;
BasicConsumerArgs Args = new BasicConsumerArgs();
Args.setRid(generateRid());
Args.setChannelId(channelId);
Args.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来表示了.
Args.setQueueName(queueName);
Args.setAutoAck(autoAck);
byte[] payload = BinaryTool.toByte(Args);
Request request = new Request();
request.setType(0xa);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(Args.getRid());
return basicReturns.isOk();
}
// 确认消息
public boolean basicAck(String queueName, String messageId) throws IOException {
BasicAckArgs Args = new BasicAckArgs();
Args.setRid(generateRid());
Args.setChannelId(channelId);
Args.setQueueName(queueName);
Args.setMessageId(messageId);
byte[] payload = BinaryTool.toByte(Args);
Request request = new Request();
request.setType(0xb);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(Args.getRid());
return basicReturns.isOk();
}
}
客户端代码测试:
@SpringBootTest
public class MqClientTest {
private BrokerServer brokerServer = null;
private ConnectionFactory factory = null;
private Thread t = null;
@BeforeEach
public void setUp() throws IOException {
// 1. 先启动服务器
Mq02Application.context = SpringApplication.run(Mq02Application.class);
brokerServer = new BrokerServer(9090);
t = new Thread(() -> {
// 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!
try {
brokerServer.start();
} catch (IOException e) {
e.printStackTrace();
}
});
t.start();
// 2. 配置 ConnectionFactory
factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
}
@AfterEach
public void tearDown() throws IOException {
// 停止服务器
brokerServer.stop();
// t.join();
Mq02Application.context.close();
// 删除必要的文件
File file = new File("./data");
FileUtils.deleteDirectory(file);
factory = null;
}
@Test
public void testConnection() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
}
@Test
public void testChannel() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
}
@Test
public void testExchange() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
Assertions.assertTrue(ok);
ok = channel.exchangeDelete("testExchange");
Assertions.assertTrue(ok);
// 此处稳妥起见, 把改关闭的要进行关闭.
channel.close();
connection.close();
}
@Test
public void testQueue() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok = channel.queueDeclare("testQueue", true, false, null);
Assertions.assertTrue(ok);
ok = channel.queueDelete("testQueue");
Assertions.assertTrue(ok);
channel.close();
connection.close();
}
@Test
public void testBinding() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
Assertions.assertTrue(ok);
ok = channel.queueDeclare("testQueue", true, false, null);
Assertions.assertTrue(ok);
ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");
Assertions.assertTrue(ok);
ok = channel.queueUnbind("testQueue", "testExchange");
Assertions.assertTrue(ok);
channel.close();
connection.close();
}
@Test
public void testMessage() throws IOException, MqException, InterruptedException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
Assertions.assertTrue(ok);
ok = channel.queueDeclare("testQueue", true, false, null);
Assertions.assertTrue(ok);
byte[] requestBody = "hello".getBytes();
ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);
Assertions.assertTrue(ok);
ok = channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag=" + consumerTag);
System.out.println("basicProperties=" + basicProperties);
Assertions.assertArrayEquals(requestBody, body);
System.out.println("[消费数据] 结束!");
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
完成
成果测试:
启动消息队列服务器:
//启动服务器:
BrokerServer brokerServer = new BrokerServer(9090);
brokerServer.start();
创建生产者 发送消息:
/**
* 模拟生产者
*/
public class producer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
System.out.println("启动生产者");
factory.setHost("127.0.0.1");
factory.setPort(9090);
//创建连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建交换机 队列 绑定
channel.exchangeDeclare("exchange", ExchangeType.DIRECT,true,false,null);
channel.queueDeclare("queue",true,false,null);
//发送消息
boolean ok = channel.basicPublish("exchange", "queue",null,"hello".getBytes());
System.out.println("消息发送成功: ok:"+ok);
Thread.sleep(1000);
//关闭资源
channel.close();
connection.createChannel();
}
}
创建消费者消费消息:
/**
* 模拟消费者
*/
public class consumer {
public static void main(String[] args) throws IOException, MqException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
System.out.println("消费者启动");
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchange", ExchangeType.DIRECT,true,false,null);
channel.queueDeclare("queue",true,false,null);
//接收消息
boolean ok = channel.basicConsume("queue", true, new org.rabbitmq.mq02.common.Consumer() {
@Override
public void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("处理消息开始");
System.out.println("consumerTag:"+consumerTag);
System.out.println("basicProperties:"+basicProperties);
System.out.println("body:"+body.toString());
System.out.println("处理消息结束");
}
});
System.out.println("消费一条消息成功 ok:"+ok);
// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
while (true) {
Thread.sleep(500);
}
}
}
完结.