目录
1.消息持久化-创建MessageFileManger类
1.1 创建一个类
1.2 创建关于路径的方法
获取消息文件所在的目录的方法
// 预定消息文件所在的目录和文件名
// 这个方法, 用在获取到 指定队列 对应的 消息文件所在路径
private String getQueueDir(String queueName) {
return "./data/" + queueName;
}
// 这个方法用来获取 该队列的 消息数据文件路径
// 注意:二进制文件,使用 txt 作为后缀, 不太合适. txt一般表示文本。
// .bin .dat
private String getQueueDataPath(String queueName) {
return getQueueDir(queueName) + "/queue_data.txt";
}
// 这个方法用来获取 该队列的 消息统计文件路径
private String getQueueStatPath(String queueName) {
return getQueueDir(queueName) + "/queue_stat.txt";
}
1.3 定义内部类
定义一个内部类, 来表示该队列的统计信息
// 定义一个内部类, 来表示该队列的统计信息
// 优先考虑使用 static, 静态内部类.
static public class Stat {
// 此处直接定义成 public, 就不再搞 get set方法
// 对于这样的简单类,直接使用成员,类似于 C的结构体
public int totalCount; // 总消息数量
public int validCount; // 有效消息数量
}
1.此处采用静态内部类:
静态内部类 不会依赖上面外部类的 this ,这样解耦更彻底
如果确实需要访问外部类的属性,这时不使用static。这种想在内部类使用外部类非静态属性的,只有外部类有 this 的情况下才行。
此处很显然不会使用外部类的非静态属性,所以使用static,限制更少,更方便。
3.使用public:
虽然此类只在这里使用,但是为了测试方便,所以使用public,更方便访问。
1.4 实现消息统计文件读写
1.由于 消息统计文件是文本格式,所以可以直接使用Scanner来读取文件
2.需要注意的是写文件时,使用的时FileOutputStream,它的第二个参数可以控制:
(默认)false:直接覆盖原文
true:追加文本
private Stat readStat(String queueName) {
// 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
private void writeStat(String queueName, Stat stat) {
// 使用 PrintWriter 来写文件
// OutputStream 打开文件, 默认情况下会直接把原文件清空.此时相当于新的数据覆盖了旧的.
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
1.5 实现创建消息目录和文件
此方法 创建对应的文件和目录
包括:
队列对应的消息目录 testQueue1
消息数据文件 queue_data.txt
消息统计文件 queue_stat.txt
并且为消息统计文件设置初始值 0\t0
// 创建队列对应的文件和目录
private void createQueueFiles(String queueName) throws IOException {
// 创建队列对应的消息目录
File baseDir = new File(getQueueDir(queueName));
if(!baseDir.exists()) {
// 不存在就创建这个目录
boolean ok = baseDir.mkdirs();
if (!ok) {
throw new IOException("创建目录失败! baseDir = " + baseDir.getAbsolutePath());
}
}
// 创建消息数据文件
File queueDataFile = new File(getQueueDataPath(queueName));
if (!queueDataFile.exists()) {
boolean ok = queueDataFile.createNewFile();
if (!ok) {
throw new IOException("创建文件失败! queueDataFile = " + queueDataFile.getAbsolutePath());
}
}
// 创建消息统计文件
File queueStatFile = new File(getQueueStatPath(queueName));
if (!queueStatFile.exists()) {
boolean ok = queueStatFile.createNewFile();
if (!ok) {
throw new IOException("创建文件失败! queueStatFile = " + queueStatFile.getAbsolutePath());
}
}
// 给消息统计文件 设定初始值 0\t0
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
writeStat(queueName, stat);
}
1.6 实现删除消息目录和文件
此方法用于删除 1.5 方法创建的消息目录和文件
注意:
1. 先删除文件,在删除目录
2. 一个删除失败就整体失败
3.并且给出一个检查目录文件是否存在的方法:
后续有生产者给 Broker Server 生产消息,这个消息可能需要记录到文件上,要保证对应的目录文件是存在的。
// 删除队列的目录和文件
// 队列也是可以被删除的, 当队列删除目录之后, 对应的消息文件自然而然也要删除
public void destroyQueueFiles(String queueName) throws IOException {
// 先删除文件 再删除目录
File queueStatFile = new File(getQueueStatPath(queueName));
boolean ok1 = queueStatFile.delete();
File queueDataFile = new File(getQueueDataPath(queueName));
boolean ok2 = queueDataFile.delete();
File baseDir = new File(getQueueDir(queueName));
boolean ok3 = baseDir.delete();
if (!ok1 || !ok2 || !ok3) {
// 有任意一个删除失败 算整体删除失败
throw new IOException("删除队列目录和文件失败!baseDir = " + baseDir.getAbsolutePath());
}
}
// 检查队列的目录和文件是否存在
// 比如后续有生产者给 Broker Server 生产消息了 这个消息可能需要记录到文件上(取决于消息是否需要持久化)
public boolean checkFilesExist(String queueName) {
File queueDataFile = new File(getQueueDataPath(queueName));
if (!queueDataFile.exists()) {
return false;
}
File queueStatFile = new File(getQueueStatPath(queueName));
if (!queueStatFile.exists()) {
return false;
}
return true;
}
1.7 实现消息序列化
此处抽出写一篇文章,主要介绍 Java中的序列化和反序列化是什么?
1. 消息序列化的一些概念:
消息序列化:
是什么: 简单来说就是 -> 把一个对象(结构化数据)转成一个字符串/字节数组(可以和 降维打击联想在一起)
为什么:序列化之后,方便存储和传输。
存储:一般就是存储在文件中(就像此处消息存到文件里),因为文件只能存字符串/二进制数据,不能直接存对象
传输:通过网络传输。
2. 方案选择:
方案一:
使用 JSON 来完成序列化,反序列化:jackson,ObjectMapper(就像把arguments字段存到数据库中一样,把(集合)结构化数据转成字符串)
不可行:因为 Message,里面存储的 body 部分,是二进制数据,不太方便使用 JSON 进行序列化。JSON序列化得到的结果是文本数据,无法存储二进制。
有关JSON序列化的一些小记:
1.为什么JSON无法存储二进制
JSON 格式中有很多特殊符号, : " {} 这些符号会影响 JSON 格式的解析。如果存文本,你的键值对中不会包含上述特殊符号。如果存二进制,某个二进制的字节正好就和上述特殊符号ASCII对应,此时可能会引起JSON解析格式错误。
2.如果就想使用 JSON序列化,而且以二进制存储怎么办?
可以针对二进制数据进行base64编码,base64作用就是用 4 个字节,表示三个字节的信息,会保证4个字节 都是使用 文本字符。
(像HTML 中如果嵌入一个图片,图片是二进制数据,就可以把图片的二进制 base64 编码然后就可以 直接以文本的形式 嵌入 HTML中)
但是这种方法效率底,有额外的转码开销,同时,还会使空间变大。
关于base64的使用,我会写一篇博文
针对二进制序列化有很多种解决方案
方案二 Java标准库就提供了序列化的方案 ObjectInputStream 和 ObjectOutputStream
方案三 Hessian也是一个解决方案
方案四 protobuffer
方案五 thrift
此处使用第二个方案,好处:不必引入额外的依赖。
3.验证版本
此处 private static final long serialVersionUID = 1L;
是用来验证版本的。实际中开发代码是不断修改更新的。
1. 将Message信息序列化存储到文件中
2.更新了Message类结构
3.针对以前的旧数据进行反序列化,大概率失败
4.所以,通过此验证版本,一旦发现版本不一致,直接报错,不允许反序列化,提醒程序员数据有问题。
4.序列化代码
package com.xj.mq.common;
import java.io.*;
/**
* Created with IntelliJ IDEA
* Description 序列化工具类
* 并不仅仅是 Message 其他的Java对象 也是可以通过这样的逻辑 进行序列化和反序列化
* 如果要让这个对象能序列化和反序列化 需要让它的类实现 Serializable 接口
* User: 王杰
* Date: 2025-05-20
* Time: 9:56
*/
public class BinaryTool {
// 把 一个对象 序列化 成一个字节数组
public static byte[] toBytes(Object object) throws IOException {
// 这个流对象相当于一个变长的字节数组
// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中 再统一转成 byte[]
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
// 此处的 writeObject 就会把该对象进行序列化 生成的二进制字节数据 就会写入到
// ObjectOutputStream 中
// 由于 ObjectOutputStream 又关联到了 ByteArrayOutputStream 最终结果就写入到 ByteArrayOutputStream 里了
objectOutputStream.writeObject(object);
}
// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来 转成 byte[]
return byteArrayOutputStream.toByteArray();
}
}
// 把 一个字节数组 反序列化 成一个对象
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream()){
try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
// 此处的 readObject 就是从 data 这个 byte[] 中读取数据并且反序列化
object = objectInputStream.readObject();
}
return object;
}
}
}
1.8 把消息写入文件
实现把消息写入文件
1.写入消息长度
我们需要往这个文件中,先写入消息的长度(四个字节),是需要把 长度这个 int 的四个字节,一次写入到文件中:
如果选择 outputStream.write(messageBinary.length)会出现一些问题
虽然这个 write的参数类型是 int类型,但是实际上只能写一个字节
在流对象中,经常会涉及到,使用 int 表示 byte 的情况,具体该怎么做呢?
可以把 int 的四个字节分别取出来,一个字节一个字节的写:
很显然不用我们亲自写这些,Java标准库已经提供了现成的类,帮我们封装好了上述操作(让我们感受Java的魅力吧!)
DataInputStream/DataOutputStream
// 写入消息到数据文件 注意 是追加写入到数据文件末尾
try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
// 接下来要先写出来 当前消息的长度 占据 4个字节的
dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体
dataOutputStream.write(messageBinary);
}
}
2.线程安全问题
问题如下图所示:
怎么办?加锁!
确定锁对象
写到 synchronized() 里的对象,当前就以 queue 队列对象 进行加锁即可。
如果两个线程,是往同一个队列中写消息,此时就阻塞等待
如果两个线程,往不同的队列里写消息,此时不需要阻塞等待
3.加锁代码
// 把一个新的消息 放到队列对应的文件中
// queue 表示要把消息写入的队列, massage 表示要写的消息
public void sendMessage(MessageQueue queue, Message message) throws MqException, IOException {
// 检查一下 当前要写入的队列 对应的文件 是否存在
if (!checkFilesExist(queue.getName())) {
throw new MqException("[MessageFileManager] 队列对应的文件不存在!queueName = " + queue.getName());
}
// 把Message对象 进行序列化 转成二进制的字节数组
byte[] messageBinary = BinaryTool.toBytes(message);
synchronized (queue) {
// 先获取到当前的队列数据文件的长度 用这个来计算 该Message 对象的 offsetBeg 和 offsetEnd
// 把新的 Message 数据 写入到队列数据文件的末尾 此时Message对象的 offsetBeg 就是文件长度 + 4
// offsetEnd 就是当前文件长度 + 4 + message自身长度
File queueDataFile = new File(getQueueDataPath(queue.getName()));
// 通过 queueDataFile.length() 就能获取到文件的长度 单位字节
message.setOffsetBeg(queueDataFile.length() + 4);
message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
// 写入消息到数据文件 注意 是追加写入到数据文件末尾
try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)){
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
// 接下来要先写出来 当前消息的长度 占据 4个字节的
dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体
dataOutputStream.write(messageBinary);
}
}
// 更新 消息统计文件
Stat stat = readStat(queue.getName());
stat.totalCount += 1;
stat.validCount += 1;
writeStat(queue.getName(), stat);
}
}
1.9 删除文件中的消息
1.随机访问
之前用的 FileInputStream 和 FileOutputStream 都是从文件头读写的。而此处我们需要的是,在文件中的指定位置进行读写 ,随机访问。此处用到的类是 RandomAccessFile
三个方法:
read:读取时会移动光标
write:写入时会移动光标
seek:调整当前的文件光标(当前要读写的位置)
2.删除代码
// 删除消息的方法
// 这里的删除时 逻辑删除 也就是把硬盘上存储这个数据里边的哪个 isValid 属性 设置成 0
// 先把文件中的这一段数据 读出来 还原回 Message 对象
// 把 isValid 改成 0
// 把上述数据重新写回到文件
// 此处这个参数中的 message 对象 必须得包含有效的 offsetBeg 和 offsetEnd
public void deleteMessage(MessageQueue queue, Message message) throws IOException, ClassNotFoundException {
synchronized (queue) {
try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")){
// 先从文件中读取对应的 Message 数据
byte[] bufferSrc = new byte[(int)(message.getOffsetEnd() - message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);
// 把当前读出来的二进制数据 转换回 Message 对象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
// 把 isValid 设置为无效
message.setIsValid((byte) 0x0);
// 此处不需要 给参数 message的 isValid 设为0 因为这个参数代表的是内存中管理的 Message 对象
// 而这个对象 马上也要被从内存中销毁
// 重新写入文件
byte[] bufferDest = BinaryTool.toBytes(diskMessage);
// 虽然上面已经 seek过了 但是上面 seek完了 进行了读操作 这导致 文件光标往后移动
// 移动到下一个消息的位置 因此想让接下来的写入 能够刚好写回到之前的位置 就需要重新调整光标位置
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);
// 通过上述代码 只是改变了一个重要的有效标记位 (一个字节)
}
// 不要忘了 更新统计文件 把一个消息设为无效 此时有效消息个数要 -1
Stat stat = readStat(queue.getName());
if (stat.validCount > 0) {
stat.validCount -= 1;
}
writeStat(queue.getName(), stat);
}
}
1.10 加载文件中的所有消息
读取文件中所有的消息内容:
此处注意这个方法是在程序启动时调用 此时服务器还不能处理请求 不涉及多线程操作文件
// 使用这个方法 从文件中 读取出所有的消息内容 加载到内存中 (具体来说就是放在一个链表里)
// 使用这个方法 准备在程序启动的时候 进行调用
// 这里使用了一个 LinkedList 主要目的是为了后续进行头删操作
// 这个方法参数 只是一个queueName 而不是 MessageQueue对象 因为这个方法不需要加锁 只使用 queueName 就够了
// 由于该方法是在程序启动时调用 此时服务器还不能处理请求 不涉及多线程操作文件
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
LinkedList<Message> messages = new LinkedList<>();
try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
try (DataInputStream dataInputStream = new DataInputStream(inputStream)){
// 这个变量记录当前文件光标
long currentOffset = 0;
// 一个文件 包含了很多信息 此处势必 循环读取
while (true) {
// 读取当前消息的长度
// readInt 方法读到文件末尾 会抛出 EOFException 异常 这一点和以前的流对象不同
int messageSize = dataInputStream.readInt();
// 按照这个长度 读取消息内容
byte[] buffer = new byte[messageSize];
int actualSize = dataInputStream.read(buffer);
if (messageSize != actualSize) {
// 如果不匹配 说明文件有问题 格式错乱了
throw new MqException("[MessageFileManager] 文件格式错误!" + queueName);
}
// 当读到这个二进制数据 反序列化回 Message 对象
Message message = (Message)BinaryTool.fromBytes(buffer);
// 判断一下看看这个消息对象 是不是无效对象
if (message.getIsValid() != 0x1) {
// 无效数据 直接跳过
// 虽然消息是无效数据 但是 offset 不要忘记更新
currentOffset += (4 + messageSize);
continue;
}
// 有效数据 则需要把这个 Message对象加入到链表中 加入之前还需要填写 offsetBeg 和 offsetEnd
// 进行计算 offset的时候 需要知道当前文件光标的位置 由于当前使用的是 dataInputStream 并不方便直接获取文件光标
// 因此就需要手动计算下文件光标
message.setOffsetBeg(currentOffset + 4);
message.setOffsetEnd(currentOffset + 4 + messageSize);
currentOffset += (4 + messageSize);
messages.add(message);
}
}catch (EOFException e) {
// 这个 catch 并非真是处理 “异常” 而是处理 “正常” 的业务逻辑 文件读到末尾 会被 readInt 抛出该异常
// 这个 catch 语句中也不需要做什么
System.out.println("[MessageFileManager] 恢复 Message 数据完成");
}
}
return messages;
}