Source Code Series--Nacos Service--Part 2 (7): Four Scenarios of Nacos Client Reconnection

Published: 2024-04-30


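Preface

This article walks through four scenarios in which the Nacos client reconnects to the Nacos server. Environment: Nacos client 2.2.1; the client application runs on Spring Boot 3.0.13.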


I. Client reconnection:

1.1 StreamObserver detects that the channel is broken:

The bidirectional-stream StreamObserver detects the disconnection in onError and handles it:

StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

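onError in detail: if the client is still running and the connection has not been abandoned, the client status is switched from RUNNING to UNHEALTHY by CAS, and an element with an empty (null) server address is put into the reconnect queue.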

@Override
public void onError(Throwable throwable) {
    boolean isRunning = isRunning();
    boolean isAbandon = grpcConn.isAbandon();
    if (isRunning && !isAbandon) {
        LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}",
                grpcConn.getConnectionId(), throwable);
        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
            // Put a reconnect signal into the reconnect queue
            switchServerAsync();
        }

    } else {
        LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}",
                grpcConn.getConnectionId(), isRunning, isAbandon);
    }

}
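The compareAndSet guard in onError matters because several error callbacks can fire for the same broken channel. Below is a minimal sketch of that idea, not the Nacos implementation: StatusGuardSketch, its Status enum, and the String signal are hypothetical stand-ins for RpcClient, RpcClientStatus, and ReconnectContext.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the status guard in onError; not Nacos code.
class StatusGuardSketch {
    enum Status { RUNNING, UNHEALTHY }

    final AtomicReference<Status> status = new AtomicReference<>(Status.RUNNING);
    final BlockingQueue<String> signals = new LinkedBlockingQueue<>();

    // Enqueue a reconnect signal only if this call wins the RUNNING -> UNHEALTHY transition.
    boolean onStreamError() {
        if (status.compareAndSet(Status.RUNNING, Status.UNHEALTHY)) {
            return signals.offer("reconnect");
        }
        return false;
    }

    public static void main(String[] args) {
        StatusGuardSketch guard = new StatusGuardSketch();
        System.out.println(guard.onStreamError());  // true: first error enqueues a signal
        System.out.println(guard.onStreamError());  // false: status is already UNHEALTHY
        System.out.println(guard.signals.size());   // 1
    }
}
```

Only the caller that flips the status enqueues a signal, so repeated errors on one dead channel do not pile up reconnect requests.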

switchServerAsync wraps its arguments in a ReconnectContext and offers it to the reconnectionSignal queue:

public void switchServerAsync() {
    switchServerAsync(null, false);
}

protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    reconnectionSignal.offer(new ReconnectContext(recommendServerInfo, onRequestFail));
}

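The start method submits a task that polls reconnectionSignal and handles reconnection. If the poll times out without an element, no reconnect has been requested and the loop only runs the keep-alive health check; if an element is retrieved, the client reconnects.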

clientEventExecutor.submit(() -> {
    while (true) {
        try {
            if (isShutdown()) {
                break;
            }
            ReconnectContext reconnectContext = reconnectionSignal
                    .poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
            if (reconnectContext == null) {
                // check alive time.
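                // No reconnect signal: only check when the client-server channel was last active (keep-alive defaults to 5s)
                if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
                    // Idle for at least the keep-alive interval, so run a health check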
                    boolean isHealthy = healthCheck();
                    if (!isHealthy) {
                        if (currentConnection == null) {
                            continue;
                        }
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Server healthy check fail, currentConnection = {}",
                                rpcClientConfig.name(), currentConnection.getConnectionId());
                        
                        RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                        if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                            break;
                        }
                        // Health check failed: CAS the client status to UNHEALTHY
                        boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                        if (statusFLowSuccess) {
                            // Build a ReconnectContext. null: no server address specified; false: not triggered by a failed request
                            reconnectContext = new ReconnectContext(null, false);
                        } else {
                            continue;
                        }
                        
                    } else {
                        lastActiveTimeStamp = System.currentTimeMillis();
                        continue;
                    }
                } else {
                    continue;
                }
                
            }
            
            if (reconnectContext.serverInfo != null) {
                // clear recommend server if server is not in server list.
                // Connection-reset path: validate the server recommended by the server side (covered in section 1.4)
                boolean serverExist = false;
                for (String server : getServerListFactory().getServerList()) {
                    ServerInfo serverInfo = resolveServerInfo(server);
                    if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                        serverExist = true;
                        reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                        break;
                    }
                }
                if (!serverExist) {
                    LoggerUtils.printIfInfoEnabled(LOGGER,
                            "[{}] Recommend server is not in server list, ignore recommend server {}",
                            rpcClientConfig.name(), reconnectContext.serverInfo.getAddress());
                    
                    reconnectContext.serverInfo = null;
                    
                }
            }
            // Enter the reconnect method
            reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
        } catch (Throwable throwable) {
            // Do nothing
        }
    }
});
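The branching in the loop above can be easier to follow as a pure decision function. The sketch below is a simplified model under stated assumptions, not Nacos code: KeepAliveLoopSketch, Action, and decide are hypothetical names, the signal is a plain String, the queue capacity of 1 is my own choice, and the currentConnection == null and SHUTDOWN checks are left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of one iteration of the loop above; not Nacos code.
class KeepAliveLoopSketch {
    enum Action { RECONNECT, REFRESH_ACTIVE_TIME, WAIT }

    static Action decide(BlockingQueue<String> signals, long pollMillis,
                         long idleMillis, long keepAliveMillis, boolean healthy) {
        String signal;
        try {
            // Block for at most pollMillis waiting for a reconnect signal.
            signal = signals.poll(pollMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Action.WAIT;
        }
        if (signal != null) {
            return Action.RECONNECT;           // an explicit signal was queued
        }
        if (idleMillis < keepAliveMillis) {
            return Action.WAIT;                // channel was active recently
        }
        // Idle for too long: the health check decides.
        return healthy ? Action.REFRESH_ACTIVE_TIME : Action.RECONNECT;
    }

    public static void main(String[] args) {
        BlockingQueue<String> signals = new ArrayBlockingQueue<>(1);
        signals.offer("reconnect");
        System.out.println(decide(signals, 10, 0, 5000, true));     // RECONNECT
        System.out.println(decide(signals, 10, 1000, 5000, true));  // WAIT
        System.out.println(decide(signals, 10, 6000, 5000, true));  // REFRESH_ACTIVE_TIME
        System.out.println(decide(signals, 10, 6000, 5000, false)); // RECONNECT
    }
}
```

Because the poll timeout and the idle threshold both come from connectionKeepAlive() in the original, a quiet channel gets a health check roughly once per keep-alive interval.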

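reconnect, as the name suggests, re-establishes the connection between the client and the server. It connects through connectToServer; on success it obtains a new connection with a new connectionId, releases the old connection (marking it abandoned and closing it, which publishes a disconnect event), and stores the new connection (publishing a connection-established event).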

/**
 * switch server .
 */
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    
    try {
        
        AtomicReference<ServerInfo> recommendServer = new AtomicReference<>(recommendServerInfo);
        if (onRequestFail && healthCheck()) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success, currentServer is {} ",
                    rpcClientConfig.name(), currentConnection.serverInfo.getAddress());
            rpcClientStatus.set(RpcClientStatus.RUNNING);
            return;
        }
        
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to reconnect to a new server, server is {}",
                rpcClientConfig.name(), recommendServerInfo == null ? " not appointed, will choose a random server."
                        : (recommendServerInfo.getAddress() + ", will try it once."));
        
        // loop until start client success.
        boolean switchSuccess = false;
        
        int reConnectTimes = 0;
        int retryTurns = 0;
        Exception lastException;
        while (!switchSuccess && !isShutdown()) {
            
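            // 1.get a new server. If recommendServer.get() is null, nextRpcServer picks the next server address from the server list.
            // If it is not null, this is a connection reset (the IP and port were pushed to the client by the Nacos server over the channel; see section 1.4)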
            ServerInfo serverInfo = null;
            try {
                serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
                // 2.create a new channel to new server
                Connection connectionNew = connectToServer(serverInfo);
                if (connectionNew != null) {
                    LoggerUtils
                            .printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
                                    rpcClientConfig.name(), serverInfo.getAddress(),
                                    connectionNew.getConnectionId());
                    // successfully create a new connect.
                    // The new connection is established; release the resources of the old one
                    if (currentConnection != null) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Abandon prev connection, server is {}, connectionId is {}",
                                rpcClientConfig.name(), currentConnection.serverInfo.getAddress(),
                                currentConnection.getConnectionId());
                        // set current connection to enable connection event.
                        // Mark the old connection as abandoned and close it
                        currentConnection.setAbandon(true);
                        closeConnection(currentConnection);
                    }
                    currentConnection = connectionNew;
                    rpcClientStatus.set(RpcClientStatus.RUNNING);
                    switchSuccess = true;
                    // Publish the connection-established event
                    eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection));
                    return;
                }
                
                // close connection if client is already shutdown.
                if (isShutdown()) {
                    closeConnection(currentConnection);
                }
                
                lastException = null;
                
            } catch (Exception e) {
                lastException = e;
            } finally {
                recommendServer.set(null);
            }
            
            if (CollectionUtils.isEmpty(RpcClient.this.serverListFactory.getServerList())) {
                throw new Exception("server list is empty");
            }
            // Back off before retrying; the maximum delay is 5s
            if (reConnectTimes > 0
                    && reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
                LoggerUtils.printIfInfoEnabled(LOGGER,
                        "[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}",
                        rpcClientConfig.name(), reConnectTimes, serverInfo,
                        lastException == null ? "unknown" : lastException);
                if (Integer.MAX_VALUE == retryTurns) {
                    retryTurns = 50;
                } else {
                    retryTurns++;
                }
            }
            
            reConnectTimes++;
            
            try {
                // sleep x milliseconds to switch next server.
                if (!isRunning()) {
                    // first round, try servers at a delay 100ms;second round, 200ms; max delays 5s. to be reconsidered.
                    Thread.sleep(Math.min(retryTurns + 1, 50) * 100L);
                }
            } catch (InterruptedException e) {
                // Do nothing.
                // set the interrupted flag
                Thread.currentThread().interrupt();
            }
        }
        
        if (isShutdown()) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown, stop reconnect to server",
                    rpcClientConfig.name());
        }
        
    } catch (Exception e) {
        LoggerUtils
                .printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}", rpcClientConfig.name(),
                        e);
    }
}
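To make the retry delay concrete, the sketch below replays the two counters from reconnect() for a run of failed attempts. It is an illustration only: ReconnectBackoffSketch and its methods are hypothetical names, it assumes the client is not in RUNNING state (so the sleep always happens), and it omits the Integer.MAX_VALUE guard.

```java
import java.util.Arrays;

// Hypothetical replay of the backoff arithmetic in reconnect(); not Nacos code.
class ReconnectBackoffSketch {
    // Same expression as in reconnect(): 100ms per turn, capped at 50 turns (5s).
    static long delayMillis(int retryTurns) {
        return Math.min(retryTurns + 1, 50) * 100L;
    }

    // Returns the delay slept after each failed attempt.
    static long[] delays(int failedAttempts, int serverCount) {
        long[] result = new long[failedAttempts];
        int reConnectTimes = 0;
        int retryTurns = 0;
        for (int i = 0; i < failedAttempts; i++) {
            // retryTurns grows once per full pass over the server list.
            if (reConnectTimes > 0 && reConnectTimes % serverCount == 0) {
                retryTurns++;
            }
            reConnectTimes++;
            result[i] = delayMillis(retryTurns);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three servers, seven failed attempts.
        System.out.println(Arrays.toString(delays(7, 3)));
        // [100, 100, 100, 200, 200, 200, 300]
        System.out.println(delayMillis(49));   // 5000
        System.out.println(delayMillis(1000)); // 5000 (capped)
    }
}
```

The delay therefore grows per round over the whole server list, not per attempt, so every server in the list is tried at the same pace before the client slows down.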

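1.2 A failed client request triggers reconnection:

When a plain request sent by the client fails, switchServerAsyncOnRequestFail is called. It puts an element with an empty server address and onRequestFail set to true into the reconnectionSignal queue; when the worker thread later consumes that element, it enters the reconnect logic.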

public void switchServerAsyncOnRequestFail() {
    switchServerAsync(null, true);
}

public void switchServerAsync() {
    switchServerAsync(null, false);
}

protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    reconnectionSignal.offer(new ReconnectContext(recommendServerInfo, onRequestFail));
}
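The onRequestFail flag is what separates this scenario from 1.1: in reconnect(), a signal raised by a failed request first runs healthCheck() and skips the switch if the current server still answers. A minimal sketch of that shortcut, where RequestFailShortcutSketch and shouldSwitchServer are hypothetical names and the health check is passed in as a supplier:

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of the early-return check at the top of reconnect(); not Nacos code.
class RequestFailShortcutSketch {
    static boolean shouldSwitchServer(boolean onRequestFail, BooleanSupplier healthCheck) {
        // The health check is evaluated only when the signal came from a failed request.
        return !(onRequestFail && healthCheck.getAsBoolean());
    }

    public static void main(String[] args) {
        System.out.println(shouldSwitchServer(true, () -> true));   // false: server is fine, keep it
        System.out.println(shouldSwitchServer(true, () -> false));  // true: server is down, switch
        System.out.println(shouldSwitchServer(false, () -> true));  // true: check is skipped
    }
}
```

This keeps a single failed request, for example a timeout, from forcing a server switch when the connection itself is still healthy.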

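1.3 A failed health check triggers reconnection:

When boolean isHealthy = healthCheck(); returns false, the client status is set to UNHEALTHY by CAS. Unlike scenarios 1.1 and 1.2, nothing is put into the reconnectionSignal queue here: the loop builds a ReconnectContext with an empty server address in place and falls through to the reconnect logic in the same iteration.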

boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
        .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
    reconnectContext = new ReconnectContext(null, false);
} else {
    continue;
}
// Reconnect
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);

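1.4 Connection reset:

When the server finds that too many clients are connected to the current node, it sends the client a connection-reset request that carries the IP and port of the server to connect to:

The Nacos client handles the connection-reset request from the server: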

class ConnectResetRequestHandler implements ServerRequestHandler {
    
    @Override
    public Response requestReply(Request request, Connection connection) {
        
        if (request instanceof ConnectResetRequest) {
            
            try {
                synchronized (RpcClient.this) {
                    if (isRunning()) {
                        ConnectResetRequest connectResetRequest = (ConnectResetRequest) request;
                        if (StringUtils.isNotBlank(connectResetRequest.getServerIp())) {
                            ServerInfo serverInfo = resolveServerInfo(
                                    connectResetRequest.getServerIp() + Constants.COLON + connectResetRequest
                                            .getServerPort());
                            // Take the address and port recommended by the server and put the server info into the reconnectionSignal queue
                            switchServerAsync(serverInfo, false);
                        } else {
                            switchServerAsync();
                        }
                        afterReset(connectResetRequest);
                    }
                }
            } catch (Exception e) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Switch server error, {}", rpcClientConfig.name(), e);
            }
            return new ConnectResetResponse();
        }
        return null;
    }
}
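Once the recommended server reaches the worker loop, it is checked against the client's own server list: the IP must appear in the list, and the port is taken from the list entry rather than from the request. A minimal sketch of that check, assuming list entries are plain "ip:port" strings (RecommendServerSketch and resolve are hypothetical names, and the real resolveServerInfo handles more formats):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the recommended-server check; not Nacos code.
class RecommendServerSketch {
    // Returns the "ip:port" entry to connect to, or null if the recommended IP is not in the list.
    static String resolve(String recommendedIp, List<String> serverList) {
        for (String server : serverList) {
            String ip = server.split(":")[0];
            if (ip.equals(recommendedIp)) {
                return server;  // IP matches; the port comes from the local list
            }
        }
        return null;            // unknown server: fall back to choosing from the list
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8849");
        System.out.println(resolve("10.0.0.2", servers)); // 10.0.0.2:8849
        System.out.println(resolve("10.0.0.9", servers)); // null
    }
}
```

A null result corresponds to reconnectContext.serverInfo = null in the original, after which reconnect picks a server through nextRpcServer.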

II. The server detects a client disconnect:

On the server, transportTerminated(Attributes transportAttrs) in AddressTransportFilter detects that the connection has been closed and handles it:

@Override
public void transportTerminated(Attributes transportAttrs) {
    String connectionId = null;
    try {
        // Get the connection id
        connectionId = transportAttrs.get(ATTR_TRANS_KEY_CONN_ID);
    } catch (Exception e) {
        // Ignore
    }
    if (StringUtils.isNotBlank(connectionId)) {
        Loggers.REMOTE_DIGEST
                .info("Connection transportTerminated,connectionId = {} ", connectionId);
        // Remove this client connection's information on the server side
        connectionManager.unregister(connectionId);
    }
}

/**
 * unregister a connection .
 *
 * @param connectionId connectionId.
 */
public synchronized void unregister(String connectionId) {
    // Map<String, Connection> connections = new ConcurrentHashMap<>();
    // Remove the connection by its connection id
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        // Get the client IP of this connection
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            // Decrement the number of connections registered from this client IP
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        // Close the connection
        remove.close();
        LOGGER.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}
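The bookkeeping in unregister pairs a connection map with a per-IP counter. The sketch below shows both sides of that bookkeeping under stated assumptions: ConnectionRegistrySketch is a hypothetical class, the connection is reduced to its client IP, and the register method is my own illustration since the article only shows unregister.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the server-side connection bookkeeping; not Nacos code.
class ConnectionRegistrySketch {
    final Map<String, String> connections = new ConcurrentHashMap<>();           // connectionId -> clientIp
    final Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>();

    // Illustrative counterpart to unregister; assumed, not shown in the article.
    synchronized void register(String connectionId, String clientIp) {
        if (connections.putIfAbsent(connectionId, clientIp) == null) {
            connectionForClientIp.computeIfAbsent(clientIp, ip -> new AtomicInteger()).incrementAndGet();
        }
    }

    // Removes the connection and drops the per-IP counter when it reaches zero.
    synchronized boolean unregister(String connectionId) {
        String clientIp = connections.remove(connectionId);
        if (clientIp == null) {
            return false;  // already removed, nothing to do
        }
        AtomicInteger counter = connectionForClientIp.get(clientIp);
        if (counter != null && counter.decrementAndGet() <= 0) {
            connectionForClientIp.remove(clientIp);
        }
        return true;
    }

    public static void main(String[] args) {
        ConnectionRegistrySketch registry = new ConnectionRegistrySketch();
        registry.register("c1", "192.168.0.5");
        registry.register("c2", "192.168.0.5");
        System.out.println(registry.unregister("c1"));  // true
        System.out.println(registry.connectionForClientIp.get("192.168.0.5").get());  // 1
        System.out.println(registry.unregister("c1"));  // false: second call is a no-op
        System.out.println(registry.unregister("c2"));  // true
        System.out.println(registry.connectionForClientIp.containsKey("192.168.0.5")); // false
    }
}
```

Checking the result of remove before touching the counter is what makes a repeated unregister for the same connectionId harmless.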

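Summary

This article covered the four scenarios in which the Nacos client reconnects, along with how the server handles a client disconnect.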

