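Key steps:
- Spring Boot creates a `LettuceConnectionFactory` at startup.
- The factory initializes the client according to the configuration type (cluster, sentinel, or standalone).
- In cluster mode, it creates a `RedisClusterClient` and calls `setOptions(getClusterClientOptions(configuration))` to apply the configuration.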
2. Node status check mechanism
Loading the topology refresh options:
```java
private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {
    ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();
    // Use FALLBACK_OPTIONS when the client has no cluster client options.
    return clusterClientOptions != null
            ? clusterClientOptions.getTopologyRefreshOptions()
            : FALLBACK_OPTIONS;
}
```
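Condition that decides whether a trigger is active:
```java
private boolean isEnabled(RefreshTrigger refreshTrigger) {
    // A trigger is active only if it is in the configured set of adaptive refresh triggers.
    return getClusterTopologyRefreshOptions()
            .getAdaptiveRefreshTriggers()
            .contains(refreshTrigger);
}
```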
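3. How reconnect after failure works
Handling reconnect events:
```java
@Override
public void onReconnectAttempt(int attempt) {
    // Signal a refresh only when the trigger is enabled and the attempt count has reached the threshold.
    if (isEnabled(RefreshTrigger.PERSISTENT_RECONNECTS)
            && attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}
```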
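Workflow:
- When the network connection drops, Lettuce automatically tries to reconnect.
- Every reconnect attempt calls `onReconnectAttempt`.
- The method checks whether the `PERSISTENT_RECONNECTS` trigger is enabled.
- It then checks whether the number of attempts has reached the threshold (`refreshTriggersReconnectAttempts`).
- If both conditions hold, it triggers a topology refresh event.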
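The decision described in the workflow above can be modeled with plain Java. This is a minimal sketch, not Lettuce code: `ReconnectRefreshModel` and its nested `Trigger` enum are hypothetical stand-ins that only mirror the condition "trigger enabled and attempt >= threshold". It leaves out the rate limiting that `indicateTopologyRefreshSignal()` applies in the real client.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the reconnect check; it does not use or replace any Lettuce class. */
final class ReconnectRefreshModel {

    /** Stand-in for the adaptive refresh triggers listed in this document. */
    enum Trigger { MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNCOVERED_SLOT, UNKNOWN_NODE }

    private final Set<Trigger> enabledTriggers;
    private final int reconnectThreshold;

    ReconnectRefreshModel(Set<Trigger> enabledTriggers, int reconnectThreshold) {
        // Defensive copy; also works when the given set is empty.
        EnumSet<Trigger> copy = EnumSet.noneOf(Trigger.class);
        copy.addAll(enabledTriggers);
        this.enabledTriggers = copy;
        this.reconnectThreshold = reconnectThreshold;
    }

    boolean isEnabled(Trigger trigger) {
        return enabledTriggers.contains(trigger);
    }

    /** Returns true when this attempt should signal a topology refresh. */
    boolean onReconnectAttempt(int attempt) {
        return isEnabled(Trigger.PERSISTENT_RECONNECTS) && attempt >= reconnectThreshold;
    }

    public static void main(String[] args) {
        ReconnectRefreshModel model =
                new ReconnectRefreshModel(EnumSet.of(Trigger.PERSISTENT_RECONNECTS), 3);
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("attempt " + attempt + " -> refresh signal: "
                    + model.onReconnectAttempt(attempt));
        }
    }
}
```

With a threshold of 3, attempts 1 and 2 produce no signal and every attempt from 3 onward does. If `PERSISTENT_RECONNECTS` is not in the enabled set, no attempt ever signals a refresh.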
4. How the topology refresh is executed
Refresh activation:
```java
private void activateTopologyRefreshIfNeeded() {
    if (getOptions() instanceof ClusterClientOptions) {
        ClusterClientOptions options = (ClusterClientOptions) getOptions();
        ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();
        // Nothing to do if periodic refresh is disabled or already activated.
        if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {
            return;
        }
        if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
            // Create the scheduled refresh task.
            ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(
                    clusterTopologyRefreshScheduler,
                    options.getRefreshPeriod().toNanos(),
                    options.getRefreshPeriod().toNanos(),
                    TimeUnit.NANOSECONDS
            );
            clusterTopologyRefreshFuture.set(scheduledFuture);
        }
    }
}
```
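Refresh process:
- Check whether periodic refresh is enabled.
- Make sure only one refresh task is activated (an atomic `compareAndSet`).
- Create a scheduled task that runs at the configured interval.
- On each run, fetch the cluster topology again.
- Update the client's internal node routing table.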
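The "activate at most once" step above relies on an `AtomicBoolean` guard around `scheduleAtFixedRate`. The following is a minimal standalone sketch of that pattern using only the JDK. `PeriodicRefreshActivator` and its members are hypothetical names chosen for illustration, and the refresh task is just a `Runnable`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of one-time activation of a periodic task; not Lettuce code. */
final class PeriodicRefreshActivator {

    private final ScheduledExecutorService pool;
    private final Runnable refreshTask;
    private final long periodNanos;
    private final AtomicBoolean activated = new AtomicBoolean(false);
    private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>();

    PeriodicRefreshActivator(ScheduledExecutorService pool, Runnable refreshTask, long periodNanos) {
        this.pool = pool;
        this.refreshTask = refreshTask;
        this.periodNanos = periodNanos;
    }

    /** Returns true only for the single caller that actually scheduled the task. */
    boolean activateIfNeeded() {
        if (activated.get()) {
            return false; // fast path: already activated
        }
        if (activated.compareAndSet(false, true)) {
            // Only one thread can win the compareAndSet, so the task is scheduled once.
            future.set(pool.scheduleAtFixedRate(refreshTask, periodNanos, periodNanos, TimeUnit.NANOSECONDS));
            return true;
        }
        return false;
    }

    /** Cancels the scheduled task, if any. */
    void cancel() {
        ScheduledFuture<?> scheduled = future.getAndSet(null);
        if (scheduled != null) {
            scheduled.cancel(false);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger refreshes = new AtomicInteger();
        PeriodicRefreshActivator activator = new PeriodicRefreshActivator(
                pool, refreshes::incrementAndGet, TimeUnit.MILLISECONDS.toNanos(50));
        System.out.println(activator.activateIfNeeded()); // true: task scheduled
        System.out.println(activator.activateIfNeeded()); // false: no second task
        Thread.sleep(300);
        activator.cancel();
        pool.shutdownNow();
        System.out.println("refresh runs so far: " + refreshes.get());
    }
}
```

The plain `get()` check is only a shortcut. Correctness comes from `compareAndSet(false, true)`, which guarantees that concurrent callers cannot both schedule the task.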
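5. Other trigger mechanisms
Besides reconnects, several other conditions can trigger a refresh:
```java
public enum RefreshTrigger {
    MOVED_REDIRECT,        // MOVED redirection
    ASK_REDIRECT,          // ASK redirection
    PERSISTENT_RECONNECTS, // persistent reconnect attempts
    UNCOVERED_SLOT,        // slot not covered by any known node
    UNKNOWN_NODE           // node not present in the known topology
}
```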
The corresponding event handler, shown here for `MOVED_REDIRECT`:
```java
@Override
public void onMovedRedirection() {
    if (isEnabled(RefreshTrigger.MOVED_REDIRECT)) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}
```
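6. Key points for the configuration to take effect
To make sure the configuration takes effect in Spring Boot:
- Cluster mode must be enabled: the logic above runs only when a cluster configuration is used.
- Set `ClientOptions` correctly: inject the configuration through a `LettuceClientConfigurationBuilderCustomizer`.
- Topology refresh options: for reconnect-driven refresh, the adaptive triggers must include `PERSISTENT_RECONNECTS`.
- Set a reasonable reconnect threshold with `refreshTriggersReconnectAttempts`.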
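As an illustration of the customizer approach listed above, the sketch below registers a `LettuceClientConfigurationBuilderCustomizer` bean. The class name `RedisTopologyRefreshConfig`, the bean name, and the values (two triggers, threshold 3) are assumptions chosen for the example. It also assumes the Spring Boot Redis auto-configuration and Lettuce are on the classpath.

```java
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions.RefreshTrigger;
import org.springframework.boot.autoconfigure.data.redis.LettuceClientConfigurationBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedisTopologyRefreshConfig { // hypothetical class name

    @Bean
    public LettuceClientConfigurationBuilderCustomizer topologyRefreshCustomizer() {
        // Illustrative values; tune the triggers and threshold for your environment.
        ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder()
                .enableAdaptiveRefreshTrigger(
                        RefreshTrigger.PERSISTENT_RECONNECTS,
                        RefreshTrigger.MOVED_REDIRECT)
                .refreshTriggersReconnectAttempts(3)
                .build();

        // The customizer receives the auto-configured builder and sets the cluster client options on it.
        return builder -> builder.clientOptions(
                ClusterClientOptions.builder()
                        .topologyRefreshOptions(refreshOptions)
                        .build());
    }
}
```

Compared with declaring a custom `LettuceConnectionFactory`, this keeps the auto-configured connection factory in place. Note that calling `clientOptions(...)` here may replace client options that Spring Boot derived from its own properties, so check the resulting options if you rely on those.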
7. Complete workflow
Best practices:
- Combine several triggers: enabling more than one trigger improves the client's awareness of cluster changes.
```java
.enableAdaptiveRefreshTrigger(
        RefreshTrigger.PERSISTENT_RECONNECTS,
        RefreshTrigger.MOVED_REDIRECT,
        RefreshTrigger.UNKNOWN_NODE
)
```
- Set a reasonable reconnect threshold: adjust it to your network environment.
```java
.refreshTriggersReconnectAttempts(3) // 3 to 5 suggested for production
```
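- Configure the adaptive refresh timeout: it rate-limits trigger-initiated refreshes to at most one per timeout window, which prevents refresh storms.
```java
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
```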
- Enable periodic refresh: use it as a complement to trigger-based recovery.
```java
.enablePeriodicRefresh(Duration.ofMinutes(10))
```
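Lettuce documents `adaptiveRefreshTriggersTimeout` as a rate limit: refreshes initiated by triggers are limited to one per timeout. The sketch below models that idea with the JDK only. `RefreshRateLimiter` is a hypothetical class, and the clock is passed in as a parameter so the behavior is deterministic. It is not how Lettuce implements the limit internally.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical rate limiter: at most one refresh signal per timeout window. Not Lettuce code. */
final class RefreshRateLimiter {

    private final long timeoutNanos;
    // Earliest time (in caller-supplied nanoseconds) at which the next signal is allowed.
    private final AtomicLong nextAllowedNanos = new AtomicLong(Long.MIN_VALUE);

    RefreshRateLimiter(Duration timeout) {
        this.timeoutNanos = timeout.toNanos();
    }

    /** Returns true if a refresh may be signaled at the given time, and opens a new window. */
    boolean tryAcquire(long nowNanos) {
        while (true) {
            long next = nextAllowedNanos.get();
            if (nowNanos < next) {
                return false; // still inside the current window
            }
            if (nextAllowedNanos.compareAndSet(next, nowNanos + timeoutNanos)) {
                return true; // this caller opened the new window
            }
            // Another thread updated the window concurrently; re-check.
        }
    }

    public static void main(String[] args) {
        RefreshRateLimiter limiter = new RefreshRateLimiter(Duration.ofSeconds(10));
        long second = TimeUnit.SECONDS.toNanos(1);
        // Simulated trigger events at 0 s, 5 s, 10 s and 19 s.
        for (long t : new long[] {0, 5, 10, 19}) {
            System.out.println("t=" + t + "s -> refresh allowed: " + limiter.tryAcquire(t * second));
        }
    }
}
```

With a 10-second timeout, the events at 0 s and 10 s are allowed and those at 5 s and 19 s are suppressed, so a burst of `MOVED` redirects or reconnect attempts results in a single refresh per window.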
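Through these mechanisms, Lettuce can update its topology information automatically when cluster nodes fail or the network misbehaves, which keeps the client in sync with the cluster and its connections highly available.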
## Reconnect configuration
```java
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Configuration;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Duration;

@Configuration
public class RedisClusterConfig {

    @Bean
    public LettuceClientConfiguration lettuceClientConfiguration() {
        // Configure the cluster topology refresh options.
        ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(true)            // turn on periodic refresh
                .refreshPeriod(Duration.ofSeconds(30))  // refresh every 30 seconds
                .enableAllAdaptiveRefreshTriggers()
                // Alternatively, enable individual triggers instead of all of them:
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.ASK_REDIRECT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNKNOWN_NODE)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
                .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10)) // at most one adaptive refresh per 10 seconds
                .refreshTriggersReconnectAttempts(5)                    // reconnect attempts before a refresh is signaled
                .build();

        return LettuceClientConfiguration.builder()
                .clientOptions(ClusterClientOptions.builder()
                        .topologyRefreshOptions(topologyRefreshOptions)
                        .build())
                .build();
    }

    @Bean
    public LettuceConnectionFactory redisConnectionFactory(RedisProperties redisProperties,
            LettuceClientConfiguration lettuceClientConfiguration) {
        // Build the cluster configuration from the Spring Boot Redis properties.
        RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
        RedisClusterConfiguration config = new RedisClusterConfiguration(clusterProperties.getNodes());
        if (clusterProperties.getMaxRedirects() != null) {
            config.setMaxRedirects(clusterProperties.getMaxRedirects());
        }
        if (redisProperties.getPassword() != null) {
            config.setPassword(RedisPassword.of(redisProperties.getPassword()));
        }
        return new LettuceConnectionFactory(config, lettuceClientConfiguration);
    }

    // Optional: configure a RedisTemplate.
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```
## Source code
```java
// Spring Boot: LettuceConnectionConfiguration
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
LettuceConnectionFactory redisConnectionFactory(
        ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
        ClientResources clientResources) throws UnknownHostException {
    LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
            getProperties().getLettuce().getPool());
    return createLettuceConnectionFactory(clientConfig);
}

private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
    // Sentinel takes precedence, then cluster, then standalone.
    if (getSentinelConfig() != null) {
        return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);
    }
    if (getClusterConfiguration() != null) {
        return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);
    }
    return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);
}
```
```java
// Spring Data Redis: LettuceConnectionFactory
public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,
        LettuceClientConfiguration clientConfig) {
    this(clientConfig);
    Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null!");
    this.configuration = clusterConfiguration;
}

// createClient() creates the underlying Lettuce client.
public void afterPropertiesSet() {
    this.client = createClient();
    this.connectionProvider = createConnectionProvider(client, CODEC);
    this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);
    if (isClusterAware()) {
        this.clusterCommandExecutor = new ClusterCommandExecutor(
                new LettuceClusterTopologyProvider((RedisClusterClient) client),
                new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
                EXCEPTION_TRANSLATION);
    }
    if (getEagerInitialization() && getShareNativeConnection()) {
        initConnection();
    }
}

// In cluster mode the options are applied with
// clusterClient.setOptions(getClusterClientOptions(configuration)).
protected AbstractRedisClient createClient() {
    if (isStaticMasterReplicaAware()) {
        RedisClient redisClient = clientConfiguration.getClientResources() //
                .map(RedisClient::create) //
                .orElseGet(RedisClient::create);
        clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
        return redisClient;
    }
    if (isRedisSentinelAware()) {
        RedisURI redisURI = getSentinelRedisURI();
        RedisClient redisClient = clientConfiguration.getClientResources() //
                .map(clientResources -> RedisClient.create(clientResources, redisURI)) //
                .orElseGet(() -> RedisClient.create(redisURI));
        clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
        return redisClient;
    }
    if (isClusterAware()) {
        List<RedisURI> initialUris = new ArrayList<>();
        ClusterConfiguration configuration = (ClusterConfiguration) this.configuration;
        for (RedisNode node : configuration.getClusterNodes()) {
            initialUris.add(createRedisURIAndApplySettings(node.getHost(), node.getPort()));
        }
        RedisClusterClient clusterClient = clientConfiguration.getClientResources() //
                .map(clientResources -> RedisClusterClient.create(clientResources, initialUris)) //
                .orElseGet(() -> RedisClusterClient.create(initialUris));
        clusterClient.setOptions(getClusterClientOptions(configuration));
        return clusterClient;
    }
    RedisURI uri = isDomainSocketAware()
            ? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket())
            : createRedisURIAndApplySettings(getHostName(), getPort());
    RedisClient redisClient = clientConfiguration.getClientResources() //
            .map(clientResources -> RedisClient.create(clientResources, uri)) //
            .orElseGet(() -> RedisClient.create(uri));
    clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
    return redisClient;
}
```
```java
// Lettuce: ClusterTopologyRefreshScheduler
private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {
    ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();
    if (clusterClientOptions != null) {
        return clusterClientOptions.getTopologyRefreshOptions();
    }
    return FALLBACK_OPTIONS;
}

// Lettuce: ClusterClientOptions
public ClusterTopologyRefreshOptions getTopologyRefreshOptions() {
    return topologyRefreshOptions;
}

// Lettuce: ClusterTopologyRefreshScheduler
private boolean isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) {
    return getClusterTopologyRefreshOptions().getAdaptiveRefreshTriggers().contains(refreshTrigger);
}

@Override
public void onReconnectAttempt(int attempt) {
    if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
            && attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}

@Override
public void onMovedRedirection() {
    if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}
```
```java
// Lettuce: RedisClusterClient
private void activateTopologyRefreshIfNeeded() {
    if (getOptions() instanceof ClusterClientOptions) {
        ClusterClientOptions options = (ClusterClientOptions) getOptions();
        ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();
        if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {
            return;
        }
        if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
            ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(clusterTopologyRefreshScheduler,
                    options.getRefreshPeriod().toNanos(), options.getRefreshPeriod().toNanos(), TimeUnit.NANOSECONDS);
            clusterTopologyRefreshFuture.set(scheduledFuture);
        }
    }
}
```