Nacos1.x Raft算法实现

发布于:2025-02-11 ⋅ 阅读:(65) ⋅ 点赞:(0)

Nacos1.x Raft算法实现

Nacos作为一款分布式注册中心,在一致性的构架设计上支持了AP以及CP,其中CP使用了Raft来作为其强一致性的算法实现,本文基于Nacos 1.x版本分析其部分功能的源码实现

Raft算法的论文介绍参考:https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md

RaftCore

初始化方法中主要有几个任务

  1. 从持久化文件中加载元数据到内存
  2. 注册Leader选举任务
  3. 注册Leader心跳任务

这里元数据和非常核心的数据结构,数据以k-v形式存在,key一般由数据类型限定符+服务标识组成,value则为具体的数据,以服务发现场景为例

// 服务实例列表(Instances)
com.alibaba.nacos.naming.iplist.public##MyService
{
    "instanceList": [
        {
            "ip": "192.168.1.100",
            "port": 8080,
            "weight": 1.0,
            "healthy": true
        },
        {
            "ip": "192.168.1.101",
            "port": 8081,
            "weight": 1.0,
            "healthy": true
        }
    ]
}
// 服务元数据(Service)
com.alibaba.nacos.naming.service.meta.public##MyService
{
    "name": "MyService",
    "protectThreshold": 0.5,
    "metadata": {
        "version": "1.0.0",
        "owner": "teamA"
    }
}

初始化#init()

public void init() throws Exception {

	...
	// 从持久化文件中加载元数据到内存
    datums = raftStore.loadDatums(notifier);

    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    // 注册Leader选举任务
    GlobalExecutor.registerMasterElection(new MasterElection());
    // 注册Leader心跳任务
    GlobalExecutor.registerHeartbeat(new HeartBeat());
	...
}

Leader选举#MasterElection

在Raft算法中Leader是一个非常重要的角色,因为数据的写操作都只能提到给Leader节点完成,所以确定一个Leader是一件非常重要的事情,在上面init方法中启动了一个周期选举任务,周期间隔是TICK_PERIOD_MS(500ms),从节点一旦在设定的LEADER_TIMEOUT_MS没有接收到心跳,则认为Leader节点可能出现故障,将重新发起Leader的选举。

public static final long LEADER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15L);
// Leader超时时间
public volatile long leaderDueMs = RandomUtils.nextLong(0, GlobalExecutor.LEADER_TIMEOUT_MS);

这里的超时时间之所以是一个0~15s的随机值,是为了出现,Leader节点一挂,所有从节点出现并发选举的情况

public void run() {
    try {

        if (!peers.isReady()) {
            return;
        }

        RaftPeer local = peers.local();
        local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

        if (local.leaderDueMs > 0) {
            return;
        }

        // reset timeout
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        // 发起投票重新选举Leader
        sendVote();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while master election {}", e);
    }

}
发起投票#sendVote()

发起投票时,发起方将成为候选者,总是投票给直接,并向所有除自身节点外的节点发起投票请求,并根据综合投票结果决定出Leader

public void sendVote() {

    RaftPeer local = peers.get(NetUtils.localServer());
    Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
        JSON.toJSONString(getLeader()), local.term);

    peers.reset();

    local.term.incrementAndGet();
    local.voteFor = local.ip;
    local.state = RaftPeer.State.CANDIDATE;

 	...
    for (final String server : peers.allServersWithoutMySelf()) {
        final String url = buildURL(server, API_VOTE);
        try {
            HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    ...
                    peers.decideLeader(peer);
                    return 0;
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.warn("error while sending vote to server: {}", server);
        }
    }
}
收到投票#receivedVote()

当节点接收到投票以后,秉承着先到先得的原则,如果当前任期内已经投票过了,将直接告知发起者自己的认同的节点ip,否则无条件的支持

public RaftPeer receivedVote(RaftPeer remote) {
	...
    RaftPeer local = peers.get(NetUtils.localServer());
    // 候选者请求的任期小于等于当前任务,返回自己的认同的节点ip
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" +
            ", voter-term:" + remote.term + ", votee-term:" + local.term;

        Loggers.RAFT.info(msg);
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }

        return local;
    }

    local.resetLeaderDue();
	// 无条件的支持候选者的请求
    local.state = RaftPeer.State.FOLLOWER;
    local.voteFor = remote.ip;
    local.term.set(remote.term.get());

    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);

    return local;
}
确定Leader#decideLeader()

多数人原则,即超过半数

public int majorityCount() {
    return peers.size() / 2 + 1;
}

最终拥有票数超过总节点一半的节点将成为Leader

public RaftPeer decideLeader(RaftPeer candidate) {
    peers.put(candidate.ip, candidate);

    SortedBag ips = new TreeBag();
    int maxApproveCount = 0;
    String maxApprovePeer = null;
    for (RaftPeer peer : peers.values()) {
        if (StringUtils.isEmpty(peer.voteFor)) {
            continue;
        }

        ips.add(peer.voteFor);
        if (ips.getCount(peer.voteFor) > maxApproveCount) {
            maxApproveCount = ips.getCount(peer.voteFor);
            maxApprovePeer = peer.voteFor;
        }
    }
	// 投票数大于
    if (maxApproveCount >= majorityCount()) {
        RaftPeer peer = peers.get(maxApprovePeer);
        peer.state = RaftPeer.State.LEADER;

        if (!Objects.equals(leader, peer)) {
            leader = peer;
            Loggers.RAFT.info("{} has become the LEADER", leader.ip);
        }
    }

    return leader;
}

心跳同样是在init启动的周期性的定时任务,任务间隔同样是500ms,但只有超过了心跳超时时间才会真正的发送心跳

// 心跳超时时间 0~5s
public static final long HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L);
public volatile long heartbeatDueMs = RandomUtils.nextLong(0, GlobalExecutor.HEARTBEAT_INTERVAL_MS);

Leader心跳#HeartBeat

public void run() {
    try {

        if (!peers.isReady()) {
            return;
        }

        RaftPeer local = peers.local();
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.heartbeatDueMs > 0) {
            return;
        }

        local.resetHeartbeatDue();

        sendBeat();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
    }

}
发送心跳#sendBeat()

首先需要明确,只有领导则能够发送心跳,发送心跳主要主要有以下几个目的

  1. 维护Leader权威:发送心跳会重置选举超时时间,并且告知从节点自己成为了Leader
  2. 同步元数据到从节点:元数据统一由Leader节点写入,但需要把这些数据都同步给从节点学习
  3. 同步从从节点数据:主节点需要知道从节点的基本数据

datums就是是所有元数据的集合,从这里我们可以看到,每份元数据都包含timesatmp,这其实相当于数据的版本信息,作为数据有效性的一依据。

// 原始key
public static final String SERVICE_META_KEY_PREFIX = "com.alibaba.nacos.naming.domains.meta.";
public static final String BRIEF_SERVICE_META_KEY_PREFIX = "meta.";
// 缩略,只保留meta.
public static String briefServiceMetaKey(String key) {
	// com.alibaba.nacos.naming.domains.meta.##MyService=>meta.##MyService
    return BRIEF_SERVICE_META_KEY_PREFIX + key.split(SERVICE_META_KEY_PREFIX)[1];
}

并且我们可以看到key在这里被缩略化,也是体现了开发者对数据传输减少数据大小的优化之处

public void sendBeat() throws IOException, InterruptedException {
    RaftPeer local = peers.local();
    if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
        return;
    }
	// 重置选举超时时间
    local.resetLeaderDue();
    
    // 压缩
    String content = JSON.toJSONString(params);
    GZIPOutputStream gzip = new GZIPOutputStream(out);
    gzip.write(content.getBytes("UTF-8"));
    gzip.close();
	...
    if (!switchDomain.isSendBeatOnly()) {
        for (Datum datum : datums.values()) {
            JSONObject element = new JSONObject();
            if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
            } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
            }
            element.put("timestamp", datum.timestamp);
            array.add(element);
        }
    } else {
        Loggers.RAFT.info("[RAFT] send beat only.");
    }

    packet.put("datums", array);
    ...
    // 同步元数据到从节点
    for (final String server : peers.allServersWithoutMySelf()) {
         HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
         @Override
        public Integer onCompleted(Response response) throws Exception {
			// 同步从从节点数据
            peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
            	return 0;
       		}
        });
    }

}
接收心跳receivedBeat()

从节点接收到心跳如果自身标记为从节点,即可标记,并且重置自身的选举超时时间和心跳超时时间,并且重新确定集群中所有节点的ip和身份。

根据datums的key累计到一定数量后批量获取到元数据的的值,更新内存datums值并做持久化

public RaftPeer receivedBeat(JSONObject beat) throws Exception {
		...
      
    // 接收到信息,如果自身未标记为从节点,即刻标记为从节点
    if (local.state != RaftPeer.State.FOLLOWER) {

        Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
        // mk follower
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
    }

    final JSONArray beatDatums = beat.getJSONArray("datums");
  	// 重置超时时间
    local.resetLeaderDue();
    local.resetHeartbeatDue();
		// 重新确定集群中所有节点的ip和身份
    peers.makeLeader(remote);

    Map<String, Integer> receivedKeysMap = new HashMap<String, Integer>(datums.size());

    for (Map.Entry<String, Datum> entry : datums.entrySet()) {
        receivedKeysMap.put(entry.getKey(), 0);
    }

    // now check datums
    List<String> batch = new ArrayList<String>();
    if (!switchDomain.isSendBeatOnly()) {
        int processedCount = 0;
        for (Object object : beatDatums) {
            processedCount = processedCount + 1;

            JSONObject entry = (JSONObject) object;
            String key = entry.getString("key");
            final String datumKey;

            if (KeyBuilder.matchServiceMetaKey(key)) {
                datumKey = KeyBuilder.detailServiceMetaKey(key);
            } else if (KeyBuilder.matchInstanceListKey(key)) {
                datumKey = KeyBuilder.detailInstanceListkey(key);
            } else {
                // ignore corrupted key:
                continue;
            }

            long timestamp = entry.getLong("timestamp");

            receivedKeysMap.put(datumKey, 1);

            try {
              	// 根据timestamp检查datum的有效期
                if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                    continue;
                }

                if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                    batch.add(datumKey);
                }
								// 累计50处理
                if (batch.size() < 50 && processedCount < beatDatums.size()) {
                    continue;
                }

                String keys = StringUtils.join(batch, ",");

                if (batch.size() <= 0) {
                    continue;
                }

                // 根据datum key 获取到实际的datum value,同步到自身的datums中,并做持久化
                String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
                HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
              					...
                        for (JSONObject datumJson : datumList) {
                            OPERATE_LOCK.lock();
                            Datum newDatum = null;
                            try {

                                Datum oldDatum = getDatum(datumJson.getString("key"));
																...
                                // 持久化元数据
                                raftStore.write(newDatum);

                                datums.put(newDatum.key, newDatum);
                                notifier.addTask(newDatum.key, ApplyAction.CHANGE);

                                local.resetLeaderDue();

                              	// 尝试更新自身任期
                                if (local.term.get() + 100 > remote.term.get()) {
                                    getLeader().term.set(remote.term.get());
                                    local.term.set(getLeader().term.get());
                                } else {
                                    local.term.addAndGet(100);
                                }

                                raftStore.updateTerm(local.term.get());

                            } catch (Throwable e) {
                                Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                            } finally {
                                OPERATE_LOCK.unlock();
                            }
                        }
                    }
                });

                batch.clear();

            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
            }

        }
				...
       	// 移除无效的datum
        for (String deadKey : deadKeys) {
            try {
                deleteDatum(deadKey);
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
            }
        }

    }

    return local;
}

元数据发布#signalPublish()

所有的元数据都由Leader节点写入,并同步给所有从节点学习,这里使用了CountDownLatch,得到超过半数节点响应成功即视为写入成功

public void signalPublish(String key, Record value) throws Exception {

    if (!isLeader()) {
        JSONObject params = new JSONObject();
        params.put("key", key);
        params.put("value", value);
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
				// 转发给Leader
        raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
        return;
    }

    try {
        OPERATE_LOCK.lock();
        long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }

        JSONObject json = new JSONObject();
        json.put("datum", datum);
        json.put("source", peers.local());
				// Leader节点自身写入元数据
        onPublish(datum, peers.local());

        final String content = JSON.toJSONString(json);
				// 等待数同样设置为总节点的半数
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildURL(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                            datum.key, server, response.getStatusCode());
                        return 1;
                    }
                    latch.countDown();
                    return 0;
                }

                @Override
                public STATE onContentWriteCompleted() {
                    return STATE.CONTINUE;
                }
            });

        }
				// 如果没有超过半数响应成功,则抛出异常
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}

元数据确认#onPublish()

从节点接收到元数据会把元数据同步到内存,但只有在元数据具有持久化标识时才会做持久化。

public static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";

public static boolean matchEphemeralInstanceListKey(String key) {
    return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
}
public static boolean matchPersistentKey(String key) {
	return !matchEphemeralKey(key);
}

从这里可以看到只有iplist是非持久化的,如实例本身元数据始终是需要持久化化。

而后Leader更新任期,从节点同步Leader的任期数据(这点上也标准的Raft也有所不同,标准的Raft只会再选举时更新任期)

public void onPublish(Datum datum, RaftPeer source) throws Exception {
		...
    // 发布节点的任期小于当前节点记录的任期直接拒绝
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
            JSON.toJSONString(source), JSON.toJSONString(local));
        throw new IllegalStateException("out of date publish, pub-term:"
            + source.term.get() + ", cur-term: " + local.term.get());
    }

    local.resetLeaderDue();

    // if data should be persistent, usually this is always true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }

    datums.put(datum.key, datum);

  	// Leader更新任期,从节点同步Leader的任期数据
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());

    notifier.addTask(datum.key, ApplyAction.CHANGE);

    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

总结

Nacos 1.x中对标准Raft算法做了一个简单的实现,并没有严格遵守它。而在Nacos2.x中废弃了这个实现,改用蚂蚁通用的JRaft。


网站公告

今日签到

点亮在社区的每一天
去签到