篇章十 消息持久化(二)

发布于:2025-05-22 ⋅ 阅读:(21) ⋅ 点赞:(0)

目录

1.消息持久化-创建MessageFileManger类

1.1 创建一个类

1.2 创建关于路径的方法

1.3 定义内部类

1.4 实现消息统计文件读写

1.5 实现创建消息目录和文件

1.6 实现删除消息目录和文件

1.7 实现消息序列化

1. 消息序列化的一些概念:

2. 方案选择:

3.验证版本

4.序列化代码

 1.8 把消息写入文件

1.写入消息长度

2.线程安全问题

3.加锁代码

1.9 删除文件中的消息

1.随机访问

2.删除代码

1.10 加载文件中的所有消息


1.消息持久化-创建MessageFileManger类

1.1 创建一个类

1.2 创建关于路径的方法

获取消息文件所在的目录的方法

// 预定消息文件所在的目录和文件名
    // 这个方法, 用在获取到 指定队列 对应的 消息文件所在路径
    private String getQueueDir(String queueName) {
        return "./data/" + queueName;
    }

    // 这个方法用来获取 该队列的 消息数据文件路径
    // 注意:二进制文件,使用 txt 作为后缀, 不太合适. txt一般表示文本。
    // .bin .dat
    private String getQueueDataPath(String queueName) {
        return getQueueDir(queueName) + "/queue_data.txt";
    }

    // 这个方法用来获取 该队列的 消息统计文件路径
    private String getQueueStatPath(String queueName) {
        return getQueueDir(queueName) + "/queue_stat.txt";
    }

1.3 定义内部类

定义一个内部类, 来表示该队列的统计信息

// 定义一个内部类, 来表示该队列的统计信息
// 优先考虑使用 static, 静态内部类.
static public class Stat {
// 此处直接定义成 public, 就不再搞 get set方法
// 对于这样的简单类,直接使用成员,类似于 C的结构体
public int totalCount; // 总消息数量
public int validCount; // 有效消息数量
}

1.此处采用静态内部类:

静态内部类 不会依赖上面外部类的 this ,这样解耦更彻底

如果确实需要访问外部类的属性,这时不使用static。这种想在内部类使用外部类非静态属性的,只有外部类有 this 的情况下才行。

此处很显然不会使用外部类的非静态属性,所以使用static,限制更少,更方便。

3.使用public:

虽然此类只在这里使用,但是为了测试方便,所以使用public,更方便访问。

1.4 实现消息统计文件读写

1.由于 消息统计文件是文本格式,所以可以直接使用Scanner来读取文件

2.需要注意的是写文件时,使用的时FileOutputStream,它的第二个参数可以控制:

(默认)false:直接覆盖原文

true:追加文本

private Stat readStat(String queueName) {
        // 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容
        Stat stat = new Stat();
        try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
            Scanner scanner = new Scanner(inputStream);
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
            return stat;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    private void writeStat(String queueName, Stat stat) {
        // 使用 PrintWriter 来写文件
        // OutputStream 打开文件, 默认情况下会直接把原文件清空.此时相当于新的数据覆盖了旧的.
        try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount + "\t" + stat.validCount);
            printWriter.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

1.5 实现创建消息目录和文件

此方法 创建对应的文件和目录

包括:

        队列对应的消息目录 testQueue1

        消息数据文件 queue_data.txt

        消息统计文件 queue_stat.txt

并且为消息统计文件设置初始值 0\t0

// 创建队列对应的文件和目录
    private void createQueueFiles(String queueName) throws IOException {
        // 创建队列对应的消息目录
        File baseDir = new File(getQueueDir(queueName));
        if(!baseDir.exists()) {
            // 不存在就创建这个目录
            boolean ok = baseDir.mkdirs();
            if (!ok) {
                throw new IOException("创建目录失败! baseDir = " + baseDir.getAbsolutePath());
            }
        }

        // 创建消息数据文件
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()) {
            boolean ok = queueDataFile.createNewFile();
            if (!ok) {
                throw new IOException("创建文件失败! queueDataFile = " + queueDataFile.getAbsolutePath());
            }
        }

        // 创建消息统计文件
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()) {
            boolean ok = queueStatFile.createNewFile();
            if (!ok) {
                throw new IOException("创建文件失败! queueStatFile = " + queueStatFile.getAbsolutePath());
            }
        }

        // 给消息统计文件 设定初始值 0\t0
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        writeStat(queueName, stat);
    }

1.6 实现删除消息目录和文件

此方法用于删除 1.5 方法创建的消息目录和文件

注意:

1. 先删除文件,在删除目录

2. 一个删除失败就整体失败

3.并且给出一个检查目录文件是否存在的方法:

        后续有生产者给 Broker Server 生产消息,这个消息可能需要记录到文件上,要保证对应的目录文件是存在的。

// 删除队列的目录和文件
    // 队列也是可以被删除的, 当队列删除目录之后, 对应的消息文件自然而然也要删除
    public void destroyQueueFiles(String queueName) throws IOException {
        // 先删除文件 再删除目录
        File queueStatFile = new File(getQueueStatPath(queueName));
        boolean ok1 = queueStatFile.delete();
        File queueDataFile = new File(getQueueDataPath(queueName));
        boolean ok2 = queueDataFile.delete();
        File baseDir = new File(getQueueDir(queueName));
        boolean ok3 = baseDir.delete();
        if (!ok1 || !ok2 || !ok3) {
            // 有任意一个删除失败 算整体删除失败
            throw new IOException("删除队列目录和文件失败!baseDir = " + baseDir.getAbsolutePath());
        }
    }

    // 检查队列的目录和文件是否存在
    // 比如后续有生产者给 Broker Server 生产消息了 这个消息可能需要记录到文件上(取决于消息是否需要持久化)
    public boolean checkFilesExist(String queueName) {
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()) {
            return false;
        }

        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()) {
            return false;
        }

        return true;
    }

1.7 实现消息序列化

此处抽出写一篇文章,主要介绍 Java中的序列化和反序列化是什么?

1. 消息序列化的一些概念:

消息序列化:

        是什么: 简单来说就是 -> 把一个对象(结构化数据)转成一个字符串/字节数组(可以和 降维打击联想在一起)

        为什么:序列化之后,方便存储和传输。

                        存储:一般就是存储在文件中(就像此处消息存到文件里),因为文件只能存字符串/二进制数据,不能直接存对象

                        传输:通过网络传输。

2. 方案选择:

方案一:

        使用 JSON 来完成序列化,反序列化:jackson,ObjectMapper(就像把arguments字段存到数据库中一样,把(集合)结构化数据转成字符串)

    不可行:因为 Message,里面存储的 body 部分,是二进制数据,不太方便使用 JSON 进行序列化。JSON序列化得到的结果是文本数据,无法存储二进制。

有关JSON序列化的一些小记:

1.为什么JSON无法存储二进制

        JSON 格式中有很多特殊符号, : " {} 这些符号会影响 JSON 格式的解析。如果存文本,你的键值对中不会包含上述特殊符号。如果存二进制,某个二进制的字节正好就和上述特殊符号ASCII对应,此时可能会引起JSON解析格式错误。

 2.如果就想使用 JSON序列化,而且以二进制存储怎么办?

        可以针对二进制数据进行base64编码,base64作用就是用 4 个字节,表示三个字节的信息,会保证4个字节 都是使用 文本字符。

        (像HTML 中如果嵌入一个图片,图片是二进制数据,就可以把图片的二进制 base64 编码然后就可以 直接以文本的形式 嵌入 HTML中)

        但是这种方法效率底,有额外的转码开销,同时,还会使空间变大。

关于base64的使用,我会写一篇博文

针对二进制序列化有很多种解决方案

方案二 Java标准库就提供了序列化的方案 ObjectInputStream 和 ObjectOutputStream

方案三 Hessian也是一个解决方案

方案四 protobuffer

方案五 thrift

此处使用第二个方案,好处:不必引入额外的依赖。

3.验证版本

此处 private static final long serialVersionUID = 1L;

是用来验证版本的。实际中开发代码是不断修改更新的。

1. 将Message信息序列化存储到文件中

2.更新了Message类结构

3.针对以前的旧数据进行反序列化,大概率失败

4.所以,通过此验证版本,一旦发现版本不一致,直接报错,不允许反序列化,提醒程序员数据有问题。

4.序列化代码

package com.xj.mq.common;

import java.io.*;

/**
 * Created with IntelliJ IDEA
 * Description 序列化工具类
 *              并不仅仅是 Message 其他的Java对象 也是可以通过这样的逻辑 进行序列化和反序列化
 *              如果要让这个对象能序列化和反序列化 需要让它的类实现 Serializable 接口
 * User: 王杰
 * Date: 2025-05-20
 * Time: 9:56
 */
public class BinaryTool {
    // 把 一个对象 序列化 成一个字节数组
    public static byte[] toBytes(Object object) throws IOException {
        // 这个流对象相当于一个变长的字节数组
        // 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中 再统一转成 byte[]
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
                // 此处的 writeObject 就会把该对象进行序列化 生成的二进制字节数据 就会写入到
                // ObjectOutputStream 中
                // 由于 ObjectOutputStream 又关联到了 ByteArrayOutputStream 最终结果就写入到 ByteArrayOutputStream 里了
                objectOutputStream.writeObject(object);
            }
            // 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来 转成 byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }

    // 把 一个字节数组 反序列化 成一个对象
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        Object object = null;
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream()){
            try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
                // 此处的 readObject 就是从 data 这个 byte[] 中读取数据并且反序列化
                object = objectInputStream.readObject();
            }
            return object;
        }
    }
}

 1.8 把消息写入文件

实现把消息写入文件

1.写入消息长度

我们需要往这个文件中,先写入消息的长度(四个字节),是需要把 长度这个 int 的四个字节,一次写入到文件中:

如果选择 outputStream.write(messageBinary.length)会出现一些问题

虽然这个 write的参数类型是 int类型,但是实际上只能写一个字节

在流对象中,经常会涉及到,使用 int 表示 byte 的情况,具体该怎么做呢?

可以把 int 的四个字节分别取出来,一个字节一个字节的写:

很显然不用我们亲自写这些,Java标准库已经提供了现成的类,帮我们封装好了上述操作(让我们感受Java的魅力吧!)

DataInputStream/DataOutputStream

// 写入消息到数据文件 注意 是追加写入到数据文件末尾
try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
     try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
           // 接下来要先写出来 当前消息的长度 占据 4个字节的
           dataOutputStream.writeInt(messageBinary.length);
           // 写入消息本体
           dataOutputStream.write(messageBinary);
      }
}

2.线程安全问题

问题如下图所示:

怎么办?加锁!

确定锁对象

写到 synchronized() 里的对象,当前就以 queue 队列对象 进行加锁即可。

如果两个线程,是往同一个队列中写消息,此时就阻塞等待

如果两个线程,往不同的队列里写消息,此时不需要阻塞等待

3.加锁代码

// 把一个新的消息 放到队列对应的文件中
    // queue 表示要把消息写入的队列, massage 表示要写的消息
    public void sendMessage(MessageQueue queue, Message message) throws MqException, IOException {
        // 检查一下 当前要写入的队列 对应的文件 是否存在
        if (!checkFilesExist(queue.getName())) {
            throw new MqException("[MessageFileManager] 队列对应的文件不存在!queueName = " + queue.getName());
        }

        // 把Message对象 进行序列化 转成二进制的字节数组
        byte[] messageBinary = BinaryTool.toBytes(message);

        synchronized (queue) {
            // 先获取到当前的队列数据文件的长度 用这个来计算 该Message 对象的 offsetBeg 和 offsetEnd
            // 把新的 Message 数据 写入到队列数据文件的末尾 此时Message对象的 offsetBeg 就是文件长度 + 4
            // offsetEnd 就是当前文件长度 + 4 + message自身长度
            File queueDataFile = new File(getQueueDataPath(queue.getName()));
            // 通过 queueDataFile.length() 就能获取到文件的长度 单位字节
            message.setOffsetBeg(queueDataFile.length() + 4);
            message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
            // 写入消息到数据文件 注意 是追加写入到数据文件末尾
            try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)){
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                    // 接下来要先写出来 当前消息的长度 占据 4个字节的
                    dataOutputStream.writeInt(messageBinary.length);
                    // 写入消息本体
                    dataOutputStream.write(messageBinary);
                }
            }

            // 更新 消息统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount += 1;
            stat.validCount += 1;
            writeStat(queue.getName(), stat);
        }
    }

1.9 删除文件中的消息

1.随机访问

之前用的 FileInputStream 和 FileOutputStream 都是从文件头读写的。而此处我们需要的是,在文件中的指定位置进行读写 ,随机访问。此处用到的类是 RandomAccessFile

三个方法:

read:读取时会移动光标

write:写入时会移动光标

seek:调整当前的文件光标(当前要读写的位置)

2.删除代码

// 删除消息的方法
    // 这里的删除时 逻辑删除 也就是把硬盘上存储这个数据里边的哪个 isValid 属性 设置成 0
    // 先把文件中的这一段数据 读出来 还原回 Message 对象
    // 把 isValid 改成 0
    // 把上述数据重新写回到文件
    // 此处这个参数中的 message 对象 必须得包含有效的 offsetBeg 和 offsetEnd
    public void deleteMessage(MessageQueue queue, Message message) throws IOException, ClassNotFoundException {
        synchronized (queue) {
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")){
                // 先从文件中读取对应的 Message 数据
                byte[] bufferSrc = new byte[(int)(message.getOffsetEnd() - message.getOffsetBeg())];
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.read(bufferSrc);
                // 把当前读出来的二进制数据 转换回 Message 对象
                Message diskMessage  = (Message) BinaryTool.fromBytes(bufferSrc);
                // 把 isValid 设置为无效
                message.setIsValid((byte) 0x0);
                // 此处不需要 给参数 message的 isValid 设为0 因为这个参数代表的是内存中管理的 Message 对象
                // 而这个对象 马上也要被从内存中销毁
                // 重新写入文件
                byte[] bufferDest = BinaryTool.toBytes(diskMessage);
                // 虽然上面已经 seek过了 但是上面 seek完了 进行了读操作 这导致 文件光标往后移动
                // 移动到下一个消息的位置 因此想让接下来的写入 能够刚好写回到之前的位置 就需要重新调整光标位置
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.write(bufferDest);
                // 通过上述代码 只是改变了一个重要的有效标记位 (一个字节)
            }

            // 不要忘了 更新统计文件 把一个消息设为无效 此时有效消息个数要 -1
            Stat stat = readStat(queue.getName());
            if (stat.validCount > 0) {
                stat.validCount -= 1;
            }
            writeStat(queue.getName(), stat);
        }
    }

1.10 加载文件中的所有消息

读取文件中所有的消息内容:

        此处注意这个方法是在程序启动时调用 此时服务器还不能处理请求 不涉及多线程操作文件

// 使用这个方法 从文件中 读取出所有的消息内容 加载到内存中 (具体来说就是放在一个链表里)
    // 使用这个方法 准备在程序启动的时候 进行调用
    // 这里使用了一个 LinkedList 主要目的是为了后续进行头删操作
    // 这个方法参数 只是一个queueName 而不是 MessageQueue对象 因为这个方法不需要加锁 只使用 queueName 就够了
    // 由于该方法是在程序启动时调用 此时服务器还不能处理请求 不涉及多线程操作文件
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        LinkedList<Message> messages = new LinkedList<>();
        try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
            try (DataInputStream dataInputStream = new DataInputStream(inputStream)){
                // 这个变量记录当前文件光标
                long currentOffset = 0;
                // 一个文件 包含了很多信息 此处势必 循环读取
                    while (true) {
                    // 读取当前消息的长度
                    // readInt 方法读到文件末尾 会抛出 EOFException 异常 这一点和以前的流对象不同
                    int messageSize = dataInputStream.readInt();
                    // 按照这个长度 读取消息内容
                    byte[] buffer = new byte[messageSize];
                    int actualSize = dataInputStream.read(buffer);
                    if (messageSize != actualSize) {
                        // 如果不匹配 说明文件有问题 格式错乱了
                        throw new MqException("[MessageFileManager] 文件格式错误!" + queueName);
                    }
                    // 当读到这个二进制数据 反序列化回 Message 对象
                    Message message = (Message)BinaryTool.fromBytes(buffer);
                    // 判断一下看看这个消息对象 是不是无效对象
                    if (message.getIsValid() != 0x1) {
                        // 无效数据 直接跳过
                        // 虽然消息是无效数据 但是 offset 不要忘记更新
                        currentOffset += (4 + messageSize);
                        continue;
                    }
                    // 有效数据 则需要把这个 Message对象加入到链表中 加入之前还需要填写 offsetBeg 和 offsetEnd
                    // 进行计算 offset的时候 需要知道当前文件光标的位置 由于当前使用的是 dataInputStream 并不方便直接获取文件光标
                    // 因此就需要手动计算下文件光标
                    message.setOffsetBeg(currentOffset + 4);
                    message.setOffsetEnd(currentOffset + 4 + messageSize);
                    currentOffset += (4 + messageSize);
                    messages.add(message);
                }
            }catch (EOFException e) {
                // 这个 catch 并非真是处理 “异常” 而是处理 “正常” 的业务逻辑 文件读到末尾 会被 readInt 抛出该异常
                // 这个 catch 语句中也不需要做什么
                System.out.println("[MessageFileManager] 恢复 Message 数据完成");
            }
        }
        return messages;
    }


网站公告

今日签到

点亮在社区的每一天
去签到