获取服务列表
ServiceInstanceListSupplier
ServiceInstanceListSupplier
接口是一个提供 ServiceInstance
列表的供应者,返回一个响应式流 Flux<List<ServiceInstance>>
,用于服务发现。
public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {
String getServiceId();
default Flux<List<ServiceInstance>> get(Request request) {
return get();
}
static ServiceInstanceListSupplierBuilder builder() {
return new ServiceInstanceListSupplierBuilder();
}
}
DelegatingServiceInstanceListSupplier
DelegatingServiceInstanceListSupplier
是一个抽象类,继承自 ServiceInstanceListSupplier
,它通过委托给另一个 ServiceInstanceListSupplier
实例来实现其功能,同时支持选定服务实例的回调、初始化和销毁操作。
public abstract class DelegatingServiceInstanceListSupplier implements ServiceInstanceListSupplier, SelectedInstanceCallback, InitializingBean, DisposableBean {
protected final ServiceInstanceListSupplier delegate;
public DelegatingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {
Assert.notNull(delegate, "delegate may not be null");
this.delegate = delegate;
}
public ServiceInstanceListSupplier getDelegate() {
return delegate;
}
@Override
public String getServiceId() {
return delegate.getServiceId();
}
@Override
public void selectedServiceInstance(ServiceInstance serviceInstance) {
if (delegate instanceof SelectedInstanceCallback selectedInstanceCallbackDelegate) {
selectedInstanceCallbackDelegate.selectedServiceInstance(serviceInstance);
}
}
@Override
public void afterPropertiesSet() throws Exception {
if (delegate instanceof InitializingBean) {
((InitializingBean) delegate).afterPropertiesSet();
}
}
@Override
public void destroy() throws Exception {
if (delegate instanceof DisposableBean) {
((DisposableBean) delegate).destroy();
}
}
}
负载均衡实现
ReactorLoadBalancer
ReactorLoadBalancer
是基于 Reactor 实现的响应式负载均衡器,通过 Mono<Response<T>>
异步选择服务实例。
public interface ReactorLoadBalancer<T> extends ReactiveLoadBalancer<T> {
@SuppressWarnings("rawtypes")
Mono<Response<T>> choose(Request request);
default Mono<Response<T>> choose() {
return choose(REQUEST);
}
}
ReactorServiceInstanceLoadBalancer
ReactorServiceInstanceLoadBalancer
是一个标记接口,继承自 ReactorLoadBalancer
,专门用于选择 ServiceInstance
对象的负载均衡器。
// RandomLoadBalancer, RoundRobinLoadBalancer
public interface ReactorServiceInstanceLoadBalancer extends ReactorLoadBalancer<ServiceInstance> {}
核心代码逻辑
推荐阅读:[spring-cloud: @LoadBalanced & @LoadBalancerClient]-源码分析
1. BlockingLoadBalancerInterceptor
// LoadBalancerInterceptor, RetryLoadBalancerInterceptor
public interface BlockingLoadBalancerInterceptor extends ClientHttpRequestInterceptor {}
LoadBalancerInterceptor
public class LoadBalancerInterceptor implements BlockingLoadBalancerInterceptor {
private final LoadBalancerClient loadBalancer;
private final LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
// 重点!
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
}
2. BlockingLoadBalancerClient
ServiceInstanceChooser
ServiceInstanceChooser
接口用于通过负载均衡器选择与指定服务ID对应的服务实例,支持带请求上下文的选择。
public interface ServiceInstanceChooser {
ServiceInstance choose(String serviceId);
<T> ServiceInstance choose(String serviceId, Request<T> request);
}
LoadBalancerClient
LoadBalancerClient
接口用于客户端负载均衡,选择服务实例并执行请求,同时提供将逻辑服务名重构为实际服务实例的 URI 的功能。
public interface LoadBalancerClient extends ServiceInstanceChooser {
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
URI reconstructURI(ServiceInstance instance, URI original);
}
// BlockingLoadBalancerClientAutoConfiguration
@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {
// org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration
// LoadBalancerClientFactory
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
this.loadBalancerClientFactory = loadBalancerClientFactory;
}
// 重点!
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
String hint = getHint(serviceId);
LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request, buildRequestContext(request, hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// 通过 choose 方法来选择一个合适的 ServiceInstance
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
if (serviceInstance == null) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
throw new IllegalStateException("No instances available for " + serviceId);
}
return execute(serviceId, serviceInstance, lbRequest);
}
private <T> TimedRequestContext buildRequestContext(LoadBalancerRequest<T> delegate, String hint) {
if (delegate instanceof HttpRequestLoadBalancerRequest) {
HttpRequest request = ((HttpRequestLoadBalancerRequest) delegate).getHttpRequest();
if (request != null) {
RequestData requestData = new RequestData(request);
return new RequestDataContext(requestData, hint);
}
}
return new DefaultRequestContext(delegate, hint);
}
// 通过生命周期钩子函数来管理负载均衡请求的开始与结束,并处理可能的异常,确保负载均衡的执行过程有序
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
if (serviceInstance == null) {
throw new IllegalArgumentException("Service Instance cannot be null, serviceId: " + serviceId);
}
DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));
try {
T response = request.apply(serviceInstance);
Object clientResponse = getClientResponse(response);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));
return response;
}
catch (IOException iOException) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));
throw iOException;
}
catch (Exception exception) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
ReflectionUtils.rethrowRuntimeException(exception);
}
return null;
}
private <T> Object getClientResponse(T response) {
ClientHttpResponse clientHttpResponse = null;
if (response instanceof ClientHttpResponse) {
clientHttpResponse = (ClientHttpResponse) response;
}
if (clientHttpResponse != null) {
try {
return new ResponseData(clientHttpResponse, null);
}
catch (IOException ignored) {
}
}
return response;
}
private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {
return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
DefaultRequestContext.class, Object.class, ServiceInstance.class);
}
@Override
public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
}
@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, REQUEST);
}
// 重点!通过负载均衡器同步选择一个服务实例并返回
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
private String getHint(String serviceId) {
LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
String defaultHint = properties.getHint().getOrDefault("default", "default");
String hintPropertyValue = properties.getHint().get(serviceId);
return hintPropertyValue != null ? hintPropertyValue : defaultHint;
}
}
3. LoadBalancerRequestFactory
LoadBalancerRequestFactory
类用于创建封装负载均衡请求的 LoadBalancerRequest
实例,支持请求转换器和负载均衡客户端的配置。
public class LoadBalancerRequestFactory {
private final LoadBalancerClient loadBalancer;
private final List<LoadBalancerRequestTransformer> transformers;
public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
List<LoadBalancerRequestTransformer> transformers) {
this.loadBalancer = loadBalancer;
this.transformers = transformers;
}
public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
this.loadBalancer = loadBalancer;
transformers = new ArrayList<>();
}
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return new BlockingLoadBalancerRequest(loadBalancer, transformers,
new BlockingLoadBalancerRequest.ClientHttpRequestData(request, body, execution));
}
}
LoadBalancerRequestTransformer
LoadBalancerRequestTransformer
接口允许在负载均衡过程中根据不同的服务实例自定义转换 HttpRequest
,如修改请求头、URL 等,同时通过 @Order
注解控制其执行顺序。
@Order(LoadBalancerRequestTransformer.DEFAULT_ORDER)
public interface LoadBalancerRequestTransformer {
int DEFAULT_ORDER = 0;
HttpRequest transformRequest(HttpRequest request, ServiceInstance instance);
}
4. BlockingLoadBalancerRequest
BlockingLoadBalancerRequest
类实现了负载均衡请求接口,负责将原始 HTTP 请求封装为负载均衡请求,并支持应用请求转换器和执行负载均衡操作。
class BlockingLoadBalancerRequest implements HttpRequestLoadBalancerRequest<ClientHttpResponse> {
private final LoadBalancerClient loadBalancer;
private final List<LoadBalancerRequestTransformer> transformers;
private final ClientHttpRequestData clientHttpRequestData;
BlockingLoadBalancerRequest(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers,
ClientHttpRequestData clientHttpRequestData) {
this.loadBalancer = loadBalancer;
this.transformers = transformers;
this.clientHttpRequestData = clientHttpRequestData;
}
@Override
public ClientHttpResponse apply(ServiceInstance instance) throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(clientHttpRequestData.request, instance, loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
return clientHttpRequestData.execution.execute(serviceRequest, clientHttpRequestData.body);
}
@Override
public HttpRequest getHttpRequest() {
return clientHttpRequestData.request;
}
static class ClientHttpRequestData {
private final HttpRequest request;
private final byte[] body;
private final ClientHttpRequestExecution execution;
ClientHttpRequestData(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {
this.request = request;
this.body = body;
this.execution = execution;
}
}
}