【深入解析spring cloud gateway】13 Reactive Feign的使用

发布于:2024-04-19 ⋅ 阅读:(32) ⋅ 点赞:(0)

问题引入

在gateway中如果使用feignClient的话,会报如下错误

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.15.jar:3.4.15]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP GET "/get" [ExceptionHandlingWebHandler]


其报错的原因是:网关的reactive线程模型,并不支持像openfeign这样的阻塞IO的操作。
网上给出了一种解决方案

解决方案之一

方案1:自定义一个BlockingLoadBalancerClient.java Bean覆盖原有Bean

step1:创建自定义类CustomBlockingLoadBalancerClient.java
CustomBlockingLoadBalancerClient.java继承BlockingLoadBalancerClient.java,并重写方法BlockingLoadBalancerClient#choose(java.lang.String, org.springframework.cloud.client.loadbalancer.Request)

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerProperties;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer;
import org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletableFuture;

/**
 * @Author: ekko
 * @Description: 自定义CustomBlockingLoadBalancerClient.java
 * @Date: 2023/9/20 16:16
 */
public class CustomBlockingLoadBalancerClient extends BlockingLoadBalancerClient {
    private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

    public CustomBlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory, LoadBalancerProperties properties) {
        super(loadBalancerClientFactory, properties);
        this.loadBalancerClientFactory = loadBalancerClientFactory;
    }

    @Override
    public <T> ServiceInstance choose(String serviceId, Request<T> request) {
        ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
        if (loadBalancer == null) {
            return null;
        }
        CompletableFuture<Response<ServiceInstance>> f = CompletableFuture.supplyAsync(() -> {
            Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
            return loadBalancerResponse;
        });
        Response<ServiceInstance> loadBalancerResponse = null;


        try {
            loadBalancerResponse = f.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (loadBalancerResponse == null) {
            return null;
        }
        return loadBalancerResponse.getServer();
    }
}

step2: 创建BlockingLoadBalancerClient类
将自定义CustomBlockingLoadBalancerClient注入到容器中

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerProperties;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: ekko
 * @Description: BlockingLoadBalancerClient配置类,创建自定义的BlockingLoadBalancerClient Bean
 * @Date: 2023/9/20 16:31
 */
@Configuration
public class BlockingLoadBalancerClientConfig {

    @Autowired
    LoadBalancerClientFactory loadBalancerClientFactory;

    @Autowired
    LoadBalancerProperties properties;

    @Bean
    public LoadBalancerClient BlockingLoadBalancerClient() {
        return new CustomBlockingLoadBalancerClient(loadBalancerClientFactory, properties);
    }
}

这种方式可以解决报错的问题。BUT,在gateway网关中强行使用feignClient,同步调用,其实是有风险的一个事情。假设feignClient的下游服务,由于某些原因导致性能变慢。而gateway是同步阻塞式的调用。那么gateway的主线程也会被阻塞。由于gateway底层实际上就是netty的线程池,有两个线程池(主从多线程模型)这种模型使用一个独立的线程池来处理连接请求(Acceptor),而I/O操作则由另一个线程池处理。这种方式可以减少连接请求处理对I/O操作的干扰,提高系统并发性能。
在这里插入图片描述

更优雅的解决方案-feign-reactive

好了,切入正题,优雅的解决方案,当然还是回归到使用Reactive 异步调用的feign了。
看看官网是怎么说的:
https://docs.spring.io/spring-cloud-openfeign/reference/spring-cloud-openfeign.html

As the OpenFeign project does not currently support reactive clients, such as Spring WebClient, neither does Spring Cloud OpenFeign.

SinceSpring Cloud OpenFeign project is now considered feature-complete, we’re not planning on adding support even if it becomes available in the upstream project. We suggest migrating over to Spring Interface Clients instead. Both blocking and reactive stacks are supported there.

Until that is done, we recommend using feign-reactive for Spring WebClient support.

翻译一下就是,open-feign并不支持reactive异步客户端。如果要使用reactive 异步客户端,请移步 feign-reactive
网上关于feign-reactive这方面的介绍实在太少,这里做一下介绍吧。

引入依赖

 <artifactId>gateway</artifactId>
    <properties>
        <reactor.feign.version>3.2.6</reactor.feign.version>
    </properties>
    <dependencies>
        <!--reactivefeign-->
        <dependency>
            <groupId>com.playtika.reactivefeign</groupId>
            <artifactId>feign-reactor-spring-configuration</artifactId>
            <version>${reactor.feign.version}</version>
        </dependency>
        <dependency>
            <groupId>com.playtika.reactivefeign</groupId>
            <artifactId>feign-reactor-webclient</artifactId>
            <version>${reactor.feign.version}</version>
        </dependency>
        <dependency>
            <groupId>com.playtika.reactivefeign</groupId>
            <artifactId>feign-reactor-cloud</artifactId>
            <version>${reactor.feign.version}</version>
        </dependency>
        <!--reactivefeign-->

定义reactivefeign的client接口

@ReactiveFeignClient(name = "hello-service")
public interface TestFeignClient {
    @RequestMapping(value = "/hello1", method = RequestMethod.GET)
    Mono<String> hello(@RequestParam("name") String name);
}

返回结果一定要用Mono包装。

其下游对应着一个hello-service的接口,如下所示

    @RequestMapping(value = "/hello1", method = RequestMethod.GET)
    public String hello(@RequestParam("name") String name) {
        return helloService.hello(name);
    }

启用reactivefeign的client接口

@Configuration(proxyBeanMethods = false)
@EnableReactiveFeignClients(clients = {TestFeignClient.class})
public class FeignClientsConfig {

}

在filter中调用feignClient

@Slf4j
public class FeignTestFilter implements GlobalFilter, Ordered {
    private final TestFeignClient testFeignClient;

    public FeignTestFilter(TestFeignClient testFeignClient) {
        this.testFeignClient = testFeignClient;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Mono<String> mono = testFeignClient.hello("zhangsan");
        return mono.doOnNext(e -> {
            log.info("feignClient请求结果是:" + e);
        }).then(chain.filter(exchange));
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

这里由于是reactive的写法。如果想先处理返回值 ,处理完后,再执行后续的filter,那么写法就如上面代码。
如果不太会这种写法,请参考Reactive 官网相关的文档

调用效果

通过网关访问任意下游的接口,日志输出如下
在这里插入图片描述

总结

  • 从使用方式来看,其实reactivefeign和openFeign很相似,RestFull的接口和我们平常写的注解一样。
  • 注意返回值需要用Mono封装。在处理请求结果时,也要用reactive的写法。
  • 通过这种reactive的写法,当我们下游feign调用的微服务变慢,并不会影响gateway的主线程,并不会拖垮网关