Spring Cloud Alibaba - Nacos Health Check

Published: 2025-02-10

Health Check

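Since Nacos 2.x, persistent instances work as before: the server still probes them actively. Ephemeral instances, however, are now judged healthy or unhealthy through their long-lived connection.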

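Long-lived connection: a single connection can carry many packets in succession. While the connection is held open, if no packets are being sent, the two sides must exchange link-detection packets. Starting with Nacos 2.x, gRPC replaces HTTP, and the long-lived connection keeps the communication state between client and server. In the source code, ConnectionManager manages the long-lived connections of all clients. Every 3 seconds, ConnectionManager looks for clients that have not communicated for 20 seconds or more and sends each of them a ClientDetectionRequest probe. If the client responds successfully within the allotted time, the check passes; otherwise the unregister method is called to remove the Connection.
As long as a client keeps communicating with the server, the server has no need to probe it. The server probes only when the client has been silent.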
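
The idle rule described above can be sketched in a few lines. This is a minimal illustration, not Nacos code: the class IdleConnectionFilter, the method selectIdle, and the plain map from connection ID to last-active timestamp are all assumptions made for the example. Only the 20-second threshold comes from the text.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Nacos: picks the connections the server would probe.
class IdleConnectionFilter {

    // 20 seconds, the idle threshold described in the text.
    static final long KEEP_ALIVE_MILLIS = 20_000L;

    // Returns the IDs whose last activity is at least KEEP_ALIVE_MILLIS before 'now'.
    static Set<String> selectIdle(Map<String, Long> lastActiveTimes, long now) {
        Set<String> idle = new TreeSet<>();
        for (Map.Entry<String, Long> entry : lastActiveTimes.entrySet()) {
            if (now - entry.getValue() >= KEEP_ALIVE_MILLIS) {
                idle.add(entry.getKey());
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        long now = 100_000L;
        Map<String, Long> lastActive = Map.of(
                "conn-a", now - 5_000L,    // active 5s ago: no probe needed
                "conn-b", now - 25_000L);  // silent for 25s: probe candidate
        System.out.println(selectIdle(lastActive, now));
    }
}
```

A client that keeps talking to the server keeps refreshing its timestamp, so it never enters the returned set and is never probed.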

We start the analysis from the source of the ConnectionManager class:

/**
 * Runs at application startup: the first execution is delayed by 1s, then the task repeats with a fixed 3s delay between runs.
 * Start Task: Expel the connection which active Time expire.
 */
@PostConstruct
public void start() {
    // Initialize runtimeConnectionEjector as a NacosRuntimeConnectionEjector
    initConnectionEjector();
    // Start the task that evicts unhealthy connections
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
        // Calls com.alibaba.nacos.core.remote.NacosRuntimeConnectionEjector.doEject
        runtimeConnectionEjector.doEject();
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);
}
    public void doEject() {
        // remove out dated connection
        ejectOutdatedConnection();
        // remove overload connection
        ejectOverLimitConnection();
    }
private void ejectOutdatedConnection() {
        try {
            
            Loggers.CONNECTION.info("Connection check task start");
            
            Map<String, Connection> connections = connectionManager.connections;
            int totalCount = connections.size();
            int currentSdkClientCount = connectionManager.currentSdkClientCount();
            
            Loggers.CONNECTION.info("Long connection metrics detail ,Total count ={}, sdkCount={},clusterCount={}",
                    totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount));
            
            Set<String> outDatedConnections = new HashSet<>();
            long now = System.currentTimeMillis();
            //outdated connections collect.
            for (Map.Entry<String, Connection> entry : connections.entrySet()) {
                Connection client = entry.getValue();
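                // client.getMetaInfo().getLastActiveTime(): the client's most recent active time.
                // If it is at least 20s (KEEP_ALIVE_TIME) in the past, the server sends a probe request;
                // the connection is evicted if the probe fails or gets no response in time.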
                if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                    outDatedConnections.add(client.getMetaInfo().getConnectionId());
                } else if (client.getMetaInfo().pushQueueBlockTimesLastOver(300 * 1000)) {
                    outDatedConnections.add(client.getMetaInfo().getConnectionId());
                }
            }
            
            // check out date connection
            Loggers.CONNECTION.info("Out dated connection ,size={}", outDatedConnections.size());
            if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                Set<String> successConnections = new HashSet<>();
                final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                for (String outDateConnectionId : outDatedConnections) {
                    try {
                        Connection connection = connectionManager.getConnection(outDateConnectionId);
                        if (connection != null) {
                            ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                            connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                @Override
                                public Executor getExecutor() {
                                    return null;
                                }
                                
                                @Override
                                public long getTimeout() {
                                    return 5000L;
                                }
                                
                                @Override
                                public void onResponse(Response response) {
                                    latch.countDown();
                                    if (response != null && response.isSuccess()) {
                                        // Probe succeeded: refresh the last active time and record the ID as successful
                                        connection.freshActiveTime();
                                        successConnections.add(outDateConnectionId);
                                    }
                                }
                                
                                @Override
                                public void onException(Throwable e) {
                                    latch.countDown();
                                }
                            });
                            
                            Loggers.CONNECTION.info("[{}]send connection active request ", outDateConnectionId);
                        } else {
                            latch.countDown();
                        }
                        
                    } catch (ConnectionAlreadyClosedException e) {
                        latch.countDown();
                    } catch (Exception e) {
                        Loggers.CONNECTION.error("[{}]Error occurs when check client active detection ,error={}",
                                outDateConnectionId, e);
                        latch.countDown();
                    }
                }
                
                latch.await(5000L, TimeUnit.MILLISECONDS);
                Loggers.CONNECTION.info("Out dated connection check successCount={}", successConnections.size());
                
                for (String outDateConnectionId : outDatedConnections) {
                    // Not in the success set, so the probe failed: unregister the connection
                    if (!successConnections.contains(outDateConnectionId)) {
                        Loggers.CONNECTION.info("[{}]Unregister Out dated connection....", outDateConnectionId);
                        connectionManager.unregister(outDateConnectionId);
                    }
                }
            }
            
            Loggers.CONNECTION.info("Connection check task end");
            
        } catch (Throwable e) {
            Loggers.CONNECTION.error("Error occurs during connection check... ", e);
        }
    }
// Unregister the outdated connection
connectionManager.unregister(outDateConnectionId);
 
public synchronized void unregister(String connectionId) {
    // Remove the connection with this connectionId from the connection map
    // Map<String, Connection> connections = new ConcurrentHashMap<>();
    Connection remove = this.connections.remove(connectionId);
    // The connection existed and was removed
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        LOGGER.info("[{}]Connection unregistered successfully. ", connectionId);
 
        // Notify the registered listeners that this connection has been disconnected
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}
public void notifyClientDisConnected(final Connection connection) {
    
    for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
        try {
            clientConnectionEventListener.clientDisConnected(connection);
        } catch (Throwable throwable) {
            Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
                    clientConnectionEventListener.getName(), throwable);
        }
    }
    
}
// ConnectionBasedClientManager
public void clientDisConnected(Connection connect) {
    clientDisconnected(connect.getMetaInfo().getConnectionId());
}
 
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    boolean isResponsible = isResponsibleClient(client);
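    // Publish the client release event
    /**
     * Handled in {@link com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager.onEvent}
     * It mainly does the following:
     * 1. Removes every service's reference to this client from the subscriber list
     * 2. Removes every service's reference to this client from the publisher list
     */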
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientReleaseEvent(client, isResponsible));
 
    // Publish the client disconnect event
    /**
     * Handled in {@link com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager.onEvent}
     * It mainly does the following:
     * 1. Adds the service instance metadata to the expired set
     */
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsible));
    return true;
}

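The code above is fairly easy to follow. The overall logic is:

1. Get all current connections.
2. Loop over the connections and check whether each one's last active time is 20s or more in the past. If so, add its connection ID to a set of outdated connections. The same set also receives connections for which pushQueueBlockTimesLastOver(300 * 1000) returns true.
3. Loop over the outdated set and have the Nacos server send a probe to each connection. If the probe succeeds, add the connection ID to a set of successful probes.
4. Take the difference between the outdated set and the success set. These are the connections whose probe really failed, and they are unregistered.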
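
Steps 3 and 4 above can be sketched as a small, self-contained example. It is an illustration under stated assumptions rather than the Nacos implementation: ProbeAndEvictSketch, the Prober interface, and findConnectionsToEvict are hypothetical names, and the probe is abstracted as a callback that reports true or false. What is taken from the listing is the pattern itself: a CountDownLatch sized to the candidate set, a bounded wait, and a set difference at the end.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ProbeAndEvictSketch {

    // Hypothetical stand-in for an asynchronous probe: an implementation reports
    // true (alive) or false (failed) through 'onResult', or never calls it at all.
    interface Prober {
        void probe(String connectionId, Consumer<Boolean> onResult);
    }

    // Probes every candidate, waits up to timeoutMillis, and returns the IDs to evict.
    static Set<String> findConnectionsToEvict(Set<String> candidates, Prober prober, long timeoutMillis) {
        Set<String> succeeded = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(candidates.size());
        for (String id : candidates) {
            try {
                prober.probe(id, alive -> {
                    if (Boolean.TRUE.equals(alive)) {
                        succeeded.add(id);   // record the success first...
                    }
                    latch.countDown();       // ...then release the waiting thread
                });
            } catch (RuntimeException e) {
                latch.countDown();           // a probe that cannot be sent counts as failed
            }
        }
        try {
            // Bounded wait: probes that have not answered by now are treated as failed.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Set<String> toEvict = new HashSet<>(candidates);
        toEvict.removeAll(succeeded);        // outdated set minus success set
        return toEvict;
    }

    public static void main(String[] args) {
        Set<String> candidates = new HashSet<>(Set.of("conn-ok", "conn-dead"));
        Prober prober = (id, onResult) -> onResult.accept(id.equals("conn-ok"));
        System.out.println(findConnectionsToEvict(candidates, prober, 1_000L));
    }
}
```

The sketch records a success before counting down and uses a concurrent set, so the waiting thread sees every result that arrived in time. A probe that never answers simply runs into the timeout and ends up in the eviction set.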

Service Eviction

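The health check walkthrough above ended in the ConnectionBasedClientManager class (its clientDisconnected method). Attentive readers may have noticed that the ConnectionBasedClientManager constructor also starts a scheduled task, as shown below: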

public ConnectionBasedClientManager() {
    // Starts a scheduled task with no initial delay that runs every 5s
    // Each run executes ExpiredClientCleaner.run()
    GlobalExecutor
            .scheduleExpiredClientCleaner(new ExpiredClientCleaner(this), 0, Constants.DEFAULT_HEART_BEAT_INTERVAL,
                    TimeUnit.MILLISECONDS);
}

This scheduled task runs every 5s, and each run executes the ExpiredClientCleaner.run() method:

private static class ExpiredClientCleaner implements Runnable {
    
    private final ConnectionBasedClientManager clientManager;
    
    public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {
        this.clientManager = clientManager;
    }
    
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            // Check whether the client has expired
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && client.isExpire(currentTime)) {
                // Handle the expired connection
                clientManager.clientDisconnected(each);
            }
        }
    }
}

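The clientManager.clientDisconnected(each) call above, which handles expired connections, was already analyzed earlier, so we will not repeat it here. The key logic is that it publishes two events: the client release event (ClientReleaseEvent) and the client disconnect event (ClientDisconnectEvent).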
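
The shape of this periodic sweep can be sketched as follows. Everything here is an assumption made for illustration: the class ExpiredClientSweepSketch, its renew and sweep methods, and the expiry rule (last renew time older than a configurable timeout) are hypothetical, because the listing does not show how isExpire decides. A plain ScheduledExecutorService stands in for GlobalExecutor.scheduleExpiredClientCleaner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ExpiredClientSweepSketch {

    // clientId -> last renew timestamp in milliseconds (hypothetical bookkeeping).
    private final Map<String, Long> lastRenewTimes = new ConcurrentHashMap<>();
    private final long expireMillis;

    public ExpiredClientSweepSketch(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    public void renew(String clientId, long now) {
        lastRenewTimes.put(clientId, now);
    }

    // One pass of the cleaner: removes and returns every client considered expired.
    public List<String> sweep(long now) {
        List<String> removed = new ArrayList<>();
        for (String clientId : new ArrayList<>(lastRenewTimes.keySet())) {
            Long last = lastRenewTimes.get(clientId);
            // Assumed expiry rule: no renew for longer than expireMillis.
            if (last != null && now - last > expireMillis) {
                lastRenewTimes.remove(clientId);
                removed.add(clientId);
            }
        }
        return removed;
    }

    public static void main(String[] args) throws InterruptedException {
        ExpiredClientSweepSketch sketch = new ExpiredClientSweepSketch(200L);
        sketch.renew("client-1", System.currentTimeMillis());
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // No initial delay, then a fixed delay between runs (100ms here to keep the demo short).
        executor.scheduleWithFixedDelay(
                () -> System.out.println("removed: " + sketch.sweep(System.currentTimeMillis())),
                0L, 100L, TimeUnit.MILLISECONDS);
        Thread.sleep(500L);
        executor.shutdownNow();
    }
}
```

In the real code the removal step is clientDisconnected, which also publishes the two events. The sketch only returns the removed IDs so that the sweep logic stays visible.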

