文章目录
前言
本文介绍 Nacos 客户端重新连接 Nacos 服务端的 4 种场景。环境：Nacos 客户端版本 2.2.1，客户端基于 Spring Boot 3.0.13。
一、客户端重连:
1.1 StreamObserver 感知通道的断开:
双向流 StreamObserver 在 onError 回调中感知到连接断开，并进行处理：
// Bind the bi-directional request stream for grpcConn; stream errors are reported to the observer's onError callback (shown next), which triggers the reconnect.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
onError 的细节：如果发现客户端仍在运行，且当前连接没有被废弃（abandon），则通过 CAS 将客户端状态从 RUNNING 改为 UNHEALTHY；修改成功后，向重连队列中放入一个服务端地址为空的元素：
/**
 * Stream error callback. If the client is still running and this connection has not been abandoned,
 * the client is moved from RUNNING to UNHEALTHY and an asynchronous server switch is enqueued;
 * otherwise the error is only logged as ignored.
 */
@Override
public void onError(Throwable throwable) {
    final boolean running = isRunning();
    final boolean abandoned = grpcConn.isAbandon();
    if (!running || abandoned) {
        LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}",
                grpcConn.getConnectionId(), running, abandoned);
        return;
    }
    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}",
            grpcConn.getConnectionId(), throwable);
    // Only the caller that wins the RUNNING -> UNHEALTHY transition enqueues the reconnect request.
    final boolean becameUnhealthy =
            rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY);
    if (becameUnhealthy) {
        switchServerAsync();
    }
}
switchServerAsync 将推荐的服务端地址和 onRequestFail 标识包装成 ReconnectContext，然后放入 reconnectionSignal 队列：
/**
 * Asynchronously request a reconnect with no recommended server; not triggered by a failed request.
 */
public void switchServerAsync() {
    final ServerInfo noRecommendedServer = null;
    final boolean triggeredByRequestFailure = false;
    switchServerAsync(noRecommendedServer, triggeredByRequestFailure);
}
/**
 * Enqueue a reconnect request into {@code reconnectionSignal}.
 *
 * @param recommendServerInfo server to try first, or null when no server is recommended
 * @param onRequestFail       true when the reconnect is triggered by a failed request
 */
protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    ReconnectContext signal = new ReconnectContext(recommendServerInfo, onRequestFail);
    reconnectionSignal.offer(signal);
}
start 方法中提交的后台任务会不断从 reconnectionSignal 队列中获取元素并进行重连处理：如果在 connectionKeepAlive 时间内没有获取到元素，说明暂时不需要重连，只检查连接是否空闲过久并做心跳检测；获取到元素则进行重连。
clientEventExecutor.submit(() -> {
    // Reconnect event loop: runs until the client is shut down.
    while (true) {
        try {
            if (isShutdown()) {
                break;
            }
            // Wait up to connectionKeepAlive ms for a reconnect request.
            ReconnectContext reconnectContext = reconnectionSignal
                    .poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
            if (reconnectContext == null) {
                // check alive time.
                // No reconnect was requested: only check how long the channel has been idle
                // (connectionKeepAlive; the article states the default is 5s).
                if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
                    // Idle for at least connectionKeepAlive: actively probe the server.
                    boolean isHealthy = healthCheck();
                    if (!isHealthy) {
                        if (currentConnection == null) {
                            continue;
                        }
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Server healthy check fail, currentConnection = {}",
                                rpcClientConfig.name(), currentConnection.getConnectionId());
                        RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                        if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                            break;
                        }
                        // Unhealthy: CAS the client status to UNHEALTHY; if the status changed
                        // concurrently, skip this round.
                        boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                        if (statusFLowSuccess) {
                            // null: no recommended server; false: not triggered by a failed request.
                            reconnectContext = new ReconnectContext(null, false);
                        } else {
                            continue;
                        }
                    } else {
                        lastActiveTimeStamp = System.currentTimeMillis();
                        continue;
                    }
                } else {
                    continue;
                }
            }
            if (reconnectContext.serverInfo != null) {
                // clear recommend server if server is not in server list.
                // Connection-reset path (server-recommended address, see section 1.4): keep the
                // recommendation only if its IP is in the server list, and use that entry's port.
                boolean serverExist = false;
                for (String server : getServerListFactory().getServerList()) {
                    ServerInfo serverInfo = resolveServerInfo(server);
                    if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                        serverExist = true;
                        reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                        break;
                    }
                }
                if (!serverExist) {
                    LoggerUtils.printIfInfoEnabled(LOGGER,
                            "[{}] Recommend server is not in server list, ignore recommend server {}",
                            rpcClientConfig.name(), reconnectContext.serverInfo.getAddress());
                    reconnectContext.serverInfo = null;
                }
            }
            // Perform the actual reconnect.
            reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
        } catch (Throwable throwable) {
            // Keep the loop alive so reconnection keeps working, but do not hide the failure:
            // the original swallowed it silently, which made broken reconnects undiagnosable.
            LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Error in reconnect event loop, error = {}",
                    rpcClientConfig.name(), throwable);
        }
    }
});
reconnect 顾名思义，就是让客户端与服务端重新建立连接：通过 connectToServer 建立新连接，连接成功后获得新的 connectionId，然后释放旧的连接（发送连接断开事件），并保存新的连接（发送连接建立事件）。
/**
* Switch server: loop until a new connection is established or the client shuts down.
*
* <p>When {@code onRequestFail} is true and a health check succeeds, the client status is simply set back
* to RUNNING and no reconnect happens. Otherwise the recommended server (if any) is tried once, and every
* later attempt uses {@code nextRpcServer()}. On success the previous connection is marked abandoned and
* closed, the new connection becomes current, the status becomes RUNNING and a CONNECTED event is queued.
* Exceptions thrown out of the retry loop (e.g. "server list is empty") are logged at warn level.
*
* @param recommendServerInfo server to try first, or null when no server is recommended
* @param onRequestFail       true when the reconnect was triggered by a failed request
*/
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
try {
AtomicReference<ServerInfo> recommendServer = new AtomicReference<>(recommendServerInfo);
// Triggered by a failed request but the current connection still passes a health check: just restore RUNNING.
if (onRequestFail && healthCheck()) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success, currentServer is {} ",
rpcClientConfig.name(), currentConnection.serverInfo.getAddress());
rpcClientStatus.set(RpcClientStatus.RUNNING);
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to reconnect to a new server, server is {}",
rpcClientConfig.name(), recommendServerInfo == null ? " not appointed, will choose a random server."
: (recommendServerInfo.getAddress() + ", will try it once."));
// loop until start client success.
boolean switchSuccess = false;
int reConnectTimes = 0;
int retryTurns = 0;
Exception lastException;
while (!switchSuccess && !isShutdown()) {
// 1.get a new server: if recommendServer.get() is null, nextRpcServer() supplies the next server address
// (the article says it picks a random next index). Otherwise this is a connection reset: the ip and port
// were pushed to the client by the Nacos server over the channel (see section 1.4).
ServerInfo serverInfo = null;
try {
serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
// 2.create a new channel to new server
Connection connectionNew = connectToServer(serverInfo);
if (connectionNew != null) {
LoggerUtils
.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
rpcClientConfig.name(), serverInfo.getAddress(),
connectionNew.getConnectionId());
// successfully create a new connect.
// A new connection was established; release the previous one first.
if (currentConnection != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Abandon prev connection, server is {}, connectionId is {}",
rpcClientConfig.name(), currentConnection.serverInfo.getAddress(),
currentConnection.getConnectionId());
// set current connection to enable connection event.
// Mark the previous connection abandoned (presumably so its late stream errors are ignored, cf. the
// isAbandon check in onError) and close it.
currentConnection.setAbandon(true);
closeConnection(currentConnection);
}
currentConnection = connectionNew;
rpcClientStatus.set(RpcClientStatus.RUNNING);
switchSuccess = true;
// Queue a CONNECTED event for the new connection.
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection));
return;
}
// close connection if client is already shutdown.
if (isShutdown()) {
closeConnection(currentConnection);
}
lastException = null;
} catch (Exception e) {
lastException = e;
} finally {
// The recommended server is tried only once; later attempts use nextRpcServer().
recommendServer.set(null);
}
// NOTE(review): the server list is read again below; if getServerList() can change concurrently and becomes
// empty in between, the modulo throws ArithmeticException (caught by the outer catch) — confirm.
if (CollectionUtils.isEmpty(RpcClient.this.serverListFactory.getServerList())) {
throw new Exception("server list is empty");
}
// Back-off: retryTurns grows after each full pass over the server list; the sleep below is
// (retryTurns + 1) * 100ms, capped at 50 * 100ms = 5s.
if (reConnectTimes > 0
&& reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}",
rpcClientConfig.name(), reConnectTimes, serverInfo,
lastException == null ? "unknown" : lastException);
if (Integer.MAX_VALUE == retryTurns) {
retryTurns = 50;
} else {
retryTurns++;
}
}
reConnectTimes++;
try {
// sleep x milliseconds to switch next server.
// NOTE(review): the delay is applied only while the client is not RUNNING.
if (!isRunning()) {
// first round, try servers at a delay 100ms;second round, 200ms; max delays 5s. to be reconsidered.
Thread.sleep(Math.min(retryTurns + 1, 50) * 100L);
}
} catch (InterruptedException e) {
// Do nothing.
// set the interrupted flag
// NOTE(review): with the flag re-set, every later Thread.sleep throws immediately, so retries then run without delay.
Thread.currentThread().interrupt();
}
}
if (isShutdown()) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown, stop reconnect to server",
rpcClientConfig.name());
}
} catch (Exception e) {
LoggerUtils
.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}", rpcClientConfig.name(),
e);
}
}
1.2 客户端发送request 请求失败触发重连:
客户端发送 request 请求失败时，会调用 switchServerAsyncOnRequestFail 方法，向 reconnectionSignal 队列放入一个服务端地址为空、onRequestFail 为 true 的元素。后续线程在消费 reconnectionSignal 队列元素时进入重连逻辑：由于 onRequestFail 为 true，reconnect 会先做一次健康检查，如果检查通过，只把客户端状态恢复为 RUNNING 而不真正切换服务端。
/**
 * Asynchronously request a reconnect because a request failed; no server is recommended.
 */
public void switchServerAsyncOnRequestFail() {
    final boolean triggeredByRequestFailure = true;
    switchServerAsync(null, triggeredByRequestFailure);
}
/**
 * Asynchronously request a reconnect with no recommended server; not triggered by a failed request.
 */
public void switchServerAsync() {
    final ServerInfo noRecommendation = null;
    final boolean requestFailed = false;
    switchServerAsync(noRecommendation, requestFailed);
}
/**
 * Enqueue a reconnect request into {@code reconnectionSignal}.
 *
 * @param recommendServerInfo server to try first, or null when no server is recommended
 * @param onRequestFail       true when the reconnect is triggered by a failed request
 */
protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    ReconnectContext pending = new ReconnectContext(recommendServerInfo, onRequestFail);
    reconnectionSignal.offer(pending);
}
1.3 心跳检测失败触发重连:
当 boolean isHealthy = healthCheck(); 返回 false 时，会通过 CAS 将客户端自身的状态（rpcClientStatus）设置为 UNHEALTHY。CAS 成功后，直接在当前循环中创建一个服务端地址为空的 ReconnectContext，并不经过 reconnectionSignal 队列，随后进入重连逻辑：
// Heartbeat failed: CAS the client status to UNHEALTHY; if the status changed concurrently, skip this round.
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
// null: no recommended server; false: not triggered by a failed request.
reconnectContext = new ReconnectContext(null, false);
} else {
continue;
}
// Reconnect (excerpt: in the full loop the recommended-server check runs between these statements).
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
1.4 连接重置:
当服务端发现当前节点连接的客户端过多时，会向客户端发送连接重置请求（ConnectResetRequest），并附带建议客户端连接的服务端 IP 和端口：
Nacos 客户端处理服务端发来的连接重置请求（如果请求中没有携带 IP，则不指定推荐服务端，由客户端自行选择）：
/**
 * Handles a server-pushed {@link ConnectResetRequest}. While the client is running, it enqueues a
 * reconnect, to the server-recommended address when one is given and otherwise without a
 * recommendation, and then calls {@code afterReset}. Any other request type yields null.
 */
class ConnectResetRequestHandler implements ServerRequestHandler {

    @Override
    public Response requestReply(Request request, Connection connection) {
        if (!(request instanceof ConnectResetRequest)) {
            // Not a reset request: this handler does not answer it.
            return null;
        }
        ConnectResetRequest resetRequest = (ConnectResetRequest) request;
        try {
            synchronized (RpcClient.this) {
                if (isRunning()) {
                    applyReset(resetRequest);
                }
            }
        } catch (Exception e) {
            LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Switch server error, {}", rpcClientConfig.name(), e);
        }
        return new ConnectResetResponse();
    }

    /** Enqueue the reconnect described by a reset request, then run the post-reset hook. */
    private void applyReset(ConnectResetRequest resetRequest) {
        String recommendedIp = resetRequest.getServerIp();
        if (StringUtils.isNotBlank(recommendedIp)) {
            // Server suggested a target: queue it as the recommended server.
            String recommendedAddress = recommendedIp + Constants.COLON + resetRequest.getServerPort();
            switchServerAsync(resolveServerInfo(recommendedAddress), false);
        } else {
            switchServerAsync();
        }
        afterReset(resetRequest);
    }
}
二、服务端感知客户端的断线:
服务端通过 AddressTransportFilter 的 transportTerminated(Attributes transportAttrs) 方法感知到连接断开，并进行处理：
/**
 * Called when a client transport terminates. If a connection id was stored in the transport
 * attributes, it is logged and the connection is unregistered from the connection manager.
 */
@Override
public void transportTerminated(Attributes transportAttrs) {
    String terminatedConnId = null;
    try {
        terminatedConnId = transportAttrs.get(ATTR_TRANS_KEY_CONN_ID);
    } catch (Exception ignored) {
        // Best effort: without a connection id there is nothing to unregister.
    }
    if (!StringUtils.isNotBlank(terminatedConnId)) {
        return;
    }
    Loggers.REMOTE_DIGEST.info("Connection transportTerminated,connectionId = {} ", terminatedConnId);
    // Drop the server-side bookkeeping for this client connection.
    connectionManager.unregister(terminatedConnId);
}
/**
 * Unregister a connection. The connection is removed from {@code connections}, and the per-client-IP
 * connection counter is decremented; the counter entry is dropped once it reaches zero. The
 * connection is then closed and listeners are notified of the disconnect. Unknown ids are ignored.
 *
 * @param connectionId connectionId.
 */
public synchronized void unregister(String connectionId) {
    // connections maps connection id -> Connection (a ConcurrentHashMap<String, Connection> per the article).
    Connection removed = this.connections.remove(connectionId);
    if (removed == null) {
        return;
    }
    String clientIp = removed.getMetaInfo().clientIp;
    // Number of connections from the same client IP (e.g. several applications on one machine).
    AtomicInteger perIpCount = connectionForClientIp.get(clientIp);
    if (perIpCount != null && perIpCount.decrementAndGet() <= 0) {
        connectionForClientIp.remove(clientIp);
    }
    removed.close();
    LOGGER.info("[{}]Connection unregistered successfully. ", connectionId);
    clientConnectionEventListenerRegistry.notifyClientDisConnected(removed);
}
总结
本文介绍了 Nacos 客户端重连的 4 种场景，以及服务端感知客户端断线后的处理。