【Java微服务组件】分布式协调P1-数据共享中心简单设计与实现

发布于:2025-05-17 ⋅ 阅读:(17) ⋅ 点赞:(0)

欢迎来到啾啾的博客🐱。
记录学习点滴。分享工作思考和实用技巧,偶尔也分享一些杂谈💬。
欢迎评论交流,感谢您的阅读😄。

引言

分布式微服务架构中,有一些常见的分布式协调问题,如配置管理、命名服务(共享服务实例的地址信息)、分布式锁、集群管理、Master选举等。
这些协调问题可以简单归纳为数据的共享与状态监控,我们需要解决这些问题来保障架构的可用、性能,同时降低耦合、开放拓展等。

为此,我们的框架需要一个可靠的、一致的、可观察的共享数据存储。
ZooKeeper就是这样的一个存在。

不过在深入了解ZooKeeper之前,我们先就这些特性来简单实现一下共享数据存储。
在上篇注册中心中,我们设计了主从读写分离、数据安全结构与读写锁,实现了简单的注册中心。其数据共享主要是集群内数据共享。
本篇共享数据存储是面对整个架构的所有服务的。

设计一个共享数据中心

首先,我们需要考虑数据以什么样的形式、什么样的结构存在。
是仅存在于内存还是需要持久化,持久化是否需要进行事务特性实现以确保一致性……等等。

需要考虑的问题很多,首先,让我们来进行数据结构的选型、决定数据以什么样的形式存在。

选择数据模型

数据模型可以简单分为以下几种:

数据模型类型 说明
键值对(Key-Value) 这是最简单和最常见的数据模型。
比如Redis。
key一般为String类型、value的值灵活多变。

这样的结构易于实现,但查询能力优先,只能按键查找。
文档型(Document-Oriented) 比如MongoDB。
数据以类似JSON或BSON的文档形式存储。

适合存储半结构化数据(不完全符合严格的表结构,但包含一些内部组织标记或元数据的数据),结构灵活、具备层次结构、可解析、字描述。

这样的结构易于解析和传输大量数据,但查询更难。
列式(Columnar) 比如HBase或Cassandra。

适合大规模数据分析。

关系型(Relational) 比如MySQL。

适合数据间有复杂关系,有事务需求时的选择。

实现一个完整的关系型数据库非常困难。
图(Graph) 比如 Neo4j。

适合数据之前关系非常重要复杂的情况。

显然实现起来更复杂。

对于一个共享数据存储中心,它需要能存储各式各样的值,且用户服务可以很快速地获取共享的值。
那么选择“键值对”作为数据存储的结构是个很合理的选择。

键值对设计

键值对应该如何设计呢?
了解基本数据结构的我们很容易想到使用Hash计算的方式映射,用数组进行存储。
在Java中简单来说就是使用HashMap。共享数据存储中心需要考虑多个用户服务同时调用的情况,因此,我们的结构应当是线程安全的,即使用ConcurrentHashMap,或者对普通的HashMap使用锁,如读写锁。

数据可靠性设计

数据是仅内存(In-Memory Only)还是需要持久化(Persistence)呢?

  • 仅内存
    仅内存性能高,但共享数据存储中心一旦重启或崩溃,数据容易丢失。

  • 持久化
    持久化虽然更可靠,但是复杂度显著提升。
    需要以什么形式持久化?
    持久化是否需要事务来确保一致性(内存和持久化存储的一致性、并发访问的一致性)?
    以什么样的方式持久化?
    等,有很多需要考虑的地方,越可靠越复杂。
    因此必须在复杂度和可靠中做取舍,这往往也取决于对数据丢失的容忍度和数据的重要性。

持久化

持久化设计需要考虑持久化方式(Durability)与崩溃恢复(Crash Recovery)。从内存到磁盘的持久化有3种方式。

快照 (Snapshotting/RDB-like)

定期将内存中的整个数据结构完整地序列化到磁盘(分布式的“状态转移”)。
快照时,系统会创建一个当前内存数据结构的副本,对副本进行序列化操作。快照文件通常是二进制格式,考虑存储效率和加载速度。例如Redis的RDB文件。
快照操作后台线程执行,对正在运行的操作影响小。
但如果两次快照之间发生故障,快照之间的部分数据会丢失。且快照过程比较耗时且消耗I/O,尤其数据量大时。

快照可以设置的触发策略如下:

  • 基于时间的策略 (Time-based)
  • 基于操作数量\数据变化的策略(Change-based)
  • 基于日志文件大小(Log-size-based - 通常与WAL/AOF结合
  • 手动触发 (Manual Trigger)
  • 系统关闭 (On Shutdown)
操作日志(Write-Ahead Log -WAL /Append-Only File AOF-like)

数据操作日志 是一种按时间顺序记录所有对数据产生修改的“操作”的日志文件。 它记录的是如何达到当前数据状态的过程,而不是数据状态本身(分布式的“操作转移”)。
数据需要先记录到日志,然后再更新到内存中。

  • 追加写入(Append-Only)
    新的操作日志条目总是被添加到日志文件的末尾。顺序写入的方式通常比随机写入磁盘效率高。
  • 预写(Write-Ahead)
    在数据真正被修改到内存中的持久化结构(在数据被刷到数据文件)之前,描述该修改的日志条目必须首先被安全地写入到持久化的操作日志中并刷盘 (fsync)。

日志条目内容一般包含:操作类型、操作参数、事务信息、时间戳或其他序列号(LSN)。

操作日志恢复数据时,会从日志头开始读取文件并按顺序重新执行,从而在内存中重建数据状态。

这种方式数据持久性更好,崩溃时只丢失最后未刷盘的少量操作。缺点是恢复时需要重放所有日志,可能较慢,日志文件会不断增长,需要定期进行压缩或与快照结合。

快照与日志融合使用(Snapshot + WAL)

快照只能恢复数据的最终状态,且两次快照之间数据会丢失,虽然效率高,但是数据量大时I/O消耗大。
而日志模式虽然不会丢失太多数据,但是重放日志效率更低,日志数量多时恢复效率会显著下降。
因此,生产级方案往往是快照与日志融合。

  • 融合方案
    定期做快照,同时记录快照之后的WAL。恢复时先加载最近的快照,再重放后续的WAL。

比如Redis和MySQL(InnoDB)就是用的融合方案。

  • MySQL
    MySQL的InnoDB设计有重做日志Redo Log,在数据被刷入磁盘之前,数据修改的记录(Redo Log)必须先被写入到Redo Log Buffer,并从Buffer刷到磁盘的Redo Log文件中。
    在InnoDB中,Redo Log有一个概念为检查点(Checkpoint),它记录一个LSN(Log Sequence Number)。
    数据恢复时不需要重放所有的Redo Log,只需要从最近的Checkpoint开始重放。
    Checkpoint 确保了其记录点之前的所有脏页都已刷盘,因此Checkpoint之前的Redo Log文件都可以被覆盖。。这个机制解决了日志文件不断增长的问题。

  • Redis
    Redis有两种持久化方案:RDB与AOF,且可以同时开启。
    RDB对应快照,AOF对应操作日志。
    Redis应对操作日志不断增大的机制是 AOF重写(AOF Rewrite)。AOF重写在不中断服务的情况下,创建一个新的更小的AOF文件,新文件包含达到当前数据集状态所需的最小命令集。即去掉数据状态变更过程,只保留最新数据状态。
    Redis从4.0开始RDB-AOF混合持久化。AOF重写时,新的AOF文件可以配置为以RDB格式开头,后跟增量的AOF命令(aof-use-rdb-preamble yes)。
    新的 AOF 文件首先包含一个 RDB 快照部分(记录重写开始时的数据状态),然后是重写期间发生的增量写命令。
    这使得恢复时可以先加载 RDB 部分,然后只重放少量的 AOF 命令,大大加快了恢复速度,同时保留了 AOF 的高持久性。

比如我们计划使用ConcurrentHashMap,那么快照时就需要将这个ConcurrentHashMap序列化。
常见的方式是快照方式是非阻塞式的后台复制——写时复制(Copy-on-Write COW)。快照策略选择按数据量进行触发。
快照与日志融合方案使用MySQL的更为简单。

一致性

在通过持久化的方式来保证数据的可靠后,我们的共享数据存储中心有了一定的可用性保障。这时我们需要开始考虑数据的一致性。
但一个完整的ACID事务系统是极其困难的,设计到并发控制、恢复管理、日志管理等多个复杂的子系统。
因此,简单实现可以暂不追求完整的ACID事务。
仅先考虑基本的一致性,如简单原子性,批量操作提交视为一个整体。集群数据同步的一致性。

监控数据状态

状态通知机制

当数据发生变更时,我们需要一个机制能可靠地通知所有相关节点,并保证它们获取到的最新的、一致性的数据。
我们很容易想到观察者模式。当数据被修改时,告诉共享数据的订阅者数据已更改。
因此通知机制的前提是需要有一张注册表。
在上篇的注册中心我们已经知道,注册表可以通过心跳来维持其有效性。
但还有另一种做法,就是ZooKeeper的的Watcher机制。
每次修改数据通知完订阅者后,删除其在注册表中的信息,每次getData()时再重新注册。即一次性触发 (One-time Trigger)。
这样可以精简设计,省去心跳机制。

集群间数据同步

简单追求集群数据同步的强一致性,共享数据做读写分离处理提升性能。

实现一个共享数据中心

简单实现两个model类,用于存储在内存的共享数据对象ShareData

  
import lombok.Data;

/**
 * 内存中的共享数据
 *
 * @author crayon
 * @version 1.0
 * @date 2025/5/14
 */
@Data
public class ShareData {


    private String id;

    /**
     * 数据更新时间戳
     */
    private Long lsn;


    /**
     * 数据
     */
    private Object data;

    /**
     * 数据版本
     */
    private int version;

    public ShareData(String id1, String initialValueForKey1, int version) {
        this.id = id1;
        this.data = initialValueForKey1;
        this.version = version;
        this.lsn = System.currentTimeMillis();
    }


    public void incrementVersion() {
        this.version++;
    }
}

与用于序列化的PersistenceData

package com.crayon.datashare.model;

import lombok.Data;

import java.io.Serializable;

/**
 * 内存共享数据的序列化对象
 *
 * @author crayon
 * @version 1.0
 * @date 2025/5/15
 */
@Data
public class PersistenceData implements Serializable {

    /**
     * 数据序列化时间
     */
    private Long serialDateTime;

    /**
     * 操作类型
     */
    private String operaType;

    /**
     * 数据key
     */
    private String key;

    /**
     * 共享数据
     */
    private ShareData shareData;

    public PersistenceData(Builder builder) {
        this.key = builder.key;
        this.shareData = builder.shareData;
        this.operaType = builder.operaType;
        this.serialDateTime = System.currentTimeMillis();
    }

    @Override
    public String toString() {
        return "PersistenceData{" +
                "serialDateTime=" + serialDateTime +
                ", operaType='" + operaType + '\'' +
                ", key='" + key + '\'' +
                ", shareData=" + shareData +
                '}';
    }

    // 建造者模式
    public static class Builder {
        private String key;
        private ShareData shareData;

        private String operaType;

        public Builder key(String key) {
            this.key = key;
            return this;
        }

        public Builder shareData(ShareData shareData) {
            this.shareData = shareData;
            return this;
        }

        public Builder operaType(String operaType) {
            this.operaType = operaType;
            return this;
        }

        public PersistenceData build() {
            return new PersistenceData(this);
        }
    }

}

实现最重要的数据共享中心服务端


import com.crayon.datashare.model.ShareData;

import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 简单的共享数据存储中心
 *
 * <p>
 * 功能如下:
 * <p>
 * API-获取数据
 * API-存储数据
 * API-注册信息
 * <p>
 * 数据量到了应规模时进行序列化快照,存储数据时日志追加
 *
 * @author crayon
 * @version 1.0
 * @date 2025/5/14
 */
public class ShareDataServer {

    /**
     * 使用ConcurrentHashMap存储共享数据
     * <p>
     * keyName -> ShareData
     * </p>
     * <p>
     * 集群做读写分离设计,Leader-Follower 模型 。
     * master写同步到slave,slave节点读,负载均衡采用随机策略。
     * </p>
     * <p>
     * 数据容量暂不设置上限与对应清理机制。
     * </p>
     * <p>
     * 没有选举机制,也没有逻辑时钟
     * </p>
     */
    private static ConcurrentHashMap<String, ShareData> shareDataMaster = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, ShareData> shareDataSlave1 = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, ShareData> shareDataSlave2 = new ConcurrentHashMap<>();

    /**
     * 用于随机获取从节点
     */
    private static Random random = new Random();

    /**
     * <p>
     * keyName -> ReentrantReadWriteLock
     * </p>
     * 线程安全方案一:
     * <p>
     * 使用读写锁控制共享数据安全。
     * ConcurrentHashMap 操作数据是安全的,但是共享数据内容是可变的(Mutable)。
     * 当需要组合多个ConcurrentHashMap操作时,其是不安全的。
     * 其他线程可能在ConcurrentHashMap多个操作之间,对可变对象进行更改。
     * <p>
     * 因此需要读写锁来保证写入时候数据安全。
     * 在共享数据中,因为原子操作为:写数据+日志追加,所以更需要使用锁来控制。
     * <p>
     * 在分布式系统中,共享数据中心本身常被作为分布式锁使用。
     * <p>
     * 如果不是需要WAL,其实可以通过不可变对象(Immutable Objects)来消除数据共享来简化并发问题
     * </p>
     */
    private static ConcurrentHashMap<String, ReentrantReadWriteLock> readWriteLocks = new ConcurrentHashMap<>();


    /**
     * 订阅者集合
     * <p>
     * 采取一次性触发机制(One-time Trigger),省去心跳检测的麻烦
     * 每次通知订阅者时,会从集合中移除订阅者,订阅者每次需要重新注册
     * 比如在调用get时重新注册
     * <p>
     * 订阅者也可以封装成一个对象,这里简单一点=ip:port
     * <p>
     * keyName -> Set<ip:port>
     * 使用线程安全的Set,如 ConcurrentHashMap.newKeySet()
     * </p>
     */
    private static ConcurrentHashMap<String, Set<String>> subscribers = new ConcurrentHashMap<>();


    /**
     * Watcher
     */
    private static Notifier notifier = new Notifier();


    /**
     * 序列化服务
     * 日志操作,数据恢复(暂无)等
     */
    private static SerializableService serializableService = new SerializableService();

    /**
     * 获取共享数据
     * 采取一次性触发机制(One-time Trigger)由Server完成
     *
     * @param key
     * @param ipPort (可选) 客户端标识,用于重新注册Watcher
     * @param watch  (可选) 是否要设置Watcher
     * @return
     */
    public ShareData get(String key, String ipPort, boolean watch) {
        ReentrantReadWriteLock readWriteLock = readWriteLocks.computeIfAbsent(key, k -> new ReentrantReadWriteLock());
        readWriteLock.readLock().lock();
        try {
            ConcurrentHashMap<String, ShareData> readNode = getReadNode();
            ShareData shareData = readNode.get(key);
            if (watch && null != ipPort && !"".equals(ipPort) && null != shareData) {
                register(key, ipPort);
            }
            return shareData;
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

    /**
     * 注册订阅者
     *
     * @param key
     * @param ipPort
     */
    public void register(String key, String ipPort) {
        // 使用ConcurrentHashMap.newKeySet() 创建一个线程安全的Set
        subscribers.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(ipPort);
    }

    /**
     * 添加共享数据
     * 组合 日志追加 + 添加 + 集群同步
     * <p>
     * 原子操作设计:
     * 一般这种带集群同步的标准方案是共识算法(Consensus Algorithm)。太复杂了,搞不来。
     *
     * </p>
     *
     * @param key
     * @param value
     */
    public boolean set(String key, ShareData value) {
        ReentrantReadWriteLock readWriteLock = readWriteLocks.computeIfAbsent(key, k -> new ReentrantReadWriteLock());
        readWriteLock.writeLock().lock();
        try {
            // 1、写入日志 WAL
            boolean logSuccess = serializableService.appendLog(OperaTypeEnum.SET.getType(), key, value);
            if (!logSuccess) {
                return false;
            }
            // 2、写入内存Master
            shareDataMaster.put(key, value);
            /**
             * 3、集群同步
             * 简单模拟,没有处理网络失败、异步、其他复杂ack机制等
             */
            syncToSlave(key, value);
            // 4、通知订阅者,从注册表移除
            // 获取并移除,实现一次性触发
            Set<String> currentSubscribers = subscribers.remove(key);
            if (currentSubscribers != null && !currentSubscribers.isEmpty()) {
                for (String subscriberIpPort : currentSubscribers) {
                    // 实际应用中,这里会通过网络连接向客户端发送通知
                    notifier.notify(subscriberIpPort, key, OperaTypeEnum.CHANGE.getType());
                }
            }
        } catch (Exception e) {
            // 实际生产需要回滚等事务操作、日志记录等
            return false;
        } finally {
            readWriteLock.writeLock().unlock();
        }
        return true;
    }

    /**
     * 集群同步
     * <p>
     * 只在set操作中调用
     *
     * @param key
     * @param value
     */
    private void syncToSlave(String key, ShareData value) {
        shareDataSlave1.put(key, value); // 模拟同步到slave1
        shareDataSlave2.put(key, value); // 模拟同步到slave2
    }


    /**
     * 50%概率随机取节点
     *
     * @return
     */
    private ConcurrentHashMap<String, ShareData> getReadNode() {
        return random.nextBoolean() ? shareDataSlave1 : shareDataSlave2;
    }

}

封装操作类型枚举


/**
 * 操作类型枚举
 */
public enum OperaTypeEnum {

    GET("GET"),
    SET("SET"),

    CHANGE("CHANGE");


    private String type;

    OperaTypeEnum(String type) {
        this.type = type;
    }

    public String getType() {
        return type;
    }

}

实现序列化方法


import com.crayon.datashare.model.PersistenceData;
import com.crayon.datashare.model.ShareData;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

/**
 * 序列化服务
 *
 * <p>
 * 序列化:采用快照+日志方式
 * 按数据量策略进行快照
 * <p>
 * 文件内容:类Redis融合方案
 * 快照内容+日志内容。文件前面是RDB格式,后面是AOF格式
 * <p>
 * RDB格式:
 * AOF格式
 * <p>
 * 文件大小处理:采用保留数据最终状态的压缩方案
 *
 * <p>
 * 恢复机制:
 *
 * @author crayon
 * @version 1.0
 * @date 2025/5/15
 */
public class SerializableService {

    /**
     * 假设的日志文件名
     */
    private static final String MASTER_LOG_FILE = System.getProperty("user.dir") + "/wal/master_wal.log";

    /**
     * 日志追加
     * <p>
     * 简化的日志格式,实际应该至少有操作类型、时间戳、序列号、状态码,
     * 数据库的话会有数据库的一些信息,如数据库名字、server id等
     * </p>
     * 生产日志会有压缩、刷盘等操作,这里简化了
     */
    public boolean appendLog(String operaType, String key, ShareData value) {
        PersistenceData persistenceData = new PersistenceData.Builder()
                .operaType(operaType)
                .key(key)
                .shareData(value)
                .build();
        try (PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(MASTER_LOG_FILE, true)))) {
            // out.flush(); // 可以考虑更频繁的flush或根据策略fsync
            out.println(persistenceData.toString());
            return true;
        } catch (IOException e) {
            System.err.println("Error writing to WAL: " + e.getMessage());
            return false;
        }
    }
}

封装一个用于通知的监视者Watcher


/**
 * @author crayon
 * @version 1.0
 * @date 2025/5/16
 */
public class Notifier {

    /**
     * 通知订阅者
     *
     * @param subscriberIpPort
     * @param key
     * @param operaType
     */
    public void notify(String subscriberIpPort, String key, String operaType) {
        // 调用订阅者的接口,让订阅者进行相应的处理
    }
}

封装一个客户端

/**
 * 简单模拟客户端
 * <p>
 *     订阅和获取、更新数据等操作
 * </p>
 * 
 * @author crayon
 * @version 1.0
 * @date 2025/5/16
 */
public class SubscriberClient {


    private String ipPort;
    private ShareDataServer shareDataServer = new ShareDataServer();

    public SubscriberClient(String ipPort) {
        this.ipPort = ipPort;
    }

    public void subscribe(String key) {
        shareDataServer.register(key, ipPort);
    }

    public ShareData get(String key, String ipPort) {
        return shareDataServer.get(key, ipPort, true);
    }

}

test


import com.crayon.datashare.client.SubscriberClient;
import com.crayon.datashare.model.ShareData;
import com.crayon.datashare.server.ShareDataServer;

/**
 * 简单数据共享中心演示
 *
 * @author crayon
 * @version 1.0
 * @date 2025/5/14
 */
public class Demo {

    public static void main(String[] args) {
        ShareDataServer shareDataServer = new ShareDataServer();

        // 模拟客户端1注册对 key1 的订阅
        SubscriberClient subscriberClient8080 = new SubscriberClient("127.0.0.1:8080");
        subscriberClient8080.subscribe("key1");
        SubscriberClient subscriberClient8081 = new SubscriberClient("127.0.0.1:8081");
        subscriberClient8081.subscribe("key1");

        // 模拟客户端2注册对 key2 的订阅
        subscriberClient8081.subscribe("key2");


        System.out.println("\n Setting data for key1...");
        shareDataServer.set("key1", new ShareData("id1", "Initial Value for key1", 1));
        System.out.println("\n Getting data for key1 by client1 (will re-register watcher)...");
        ShareData data1 = shareDataServer.get("key1", "client1_ip:port", true);
        System.out.println("Client1 got: " + data1);

        System.out.println("\n Setting data for key1 again (client1 should be notified)...");
        shareDataServer.set("key1", new ShareData("id1", "Updated Value for key1", 2));

        System.out.println("\n Setting data for key2...");
        shareDataServer.set("key2", new ShareData("id2", "Value for key2", 1));


        System.out.println("\n Client2 getting data for key1 (not subscribed initially, but sets a watch now)...");
        ShareData data1_by_client2 = shareDataServer.get("key1", "client2_ip:port", true);
        System.out.println("Client2 got (for key1): " + data1_by_client2);


        System.out.println("\n Simulating a read from a random slave for key1:");
        ShareData slaveData = shareDataServer.get("key1", null, false); // No re-register
        System.out.println("Read from slave for key1: " + slaveData);
    }

}

结果展示如下
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到