【分布式】基于Dubbo实现对远程调用接口的封装

发布于:2025-06-21 ⋅ 阅读:(19) ⋅ 点赞:(0)

服务调用者

调用

在调用服务中 给被调服务添加@DubboReference(version = "1.0.0")注解

@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("auth")
public class AuthController {

    @DubboReference(version = "1.0.0")
    private UserFacadeService userFacadeService;

    @GetMapping("/get")
    public String get(){
        UserQueryResponse response = userFacadeService.query(new UserQueryRequest());
        return response.getResponseMessage();
    }
}

统一包装工具 RemoteCallWrapper

public class RemoteCallWrapper {

    private static Logger logger = LoggerFactory.getLogger(RemoteCallWrapper.class);

    private static ImmutableSet<String> SUCCESS_CHECK_METHOD = ImmutableSet.of("isSuccess", "isSucceeded",
            "getSuccess");

    private static ImmutableSet<String> SUCCESS_CODE_METHOD = ImmutableSet.of("getResponseCode");

    private static ImmutableSet<String> SUCCESS_CODE = ImmutableSet.of("SUCCESS", "DUPLICATE",
            "DUPLICATED_REQUEST");

    public static <T, R> R call(Function<T, R> function, T request, boolean checkResponse) {
        return call(function, request, request.getClass().getSimpleName(), checkResponse, false);
    }

    public static <T, R> R call(Function<T, R> function, T request) {
        return call(function, request, request.getClass().getSimpleName(), true, false);
    }

    public static <T, R> R call(Function<T, R> function, T request, String requestName) {
        return call(function, request, requestName, true, false);
    }

    public static <T, R> R call(Function<T, R> function, T request, String requestName,boolean checkResponse) {
        return call(function, request, requestName, checkResponse, false);
    }

    public static <T, R> R call(Function<T, R> function, T request, boolean checkResponse, boolean checkResponseCode) {
        return call(function, request, request.getClass().getSimpleName(), checkResponse, checkResponseCode);
    }

    public static <T, R> R call(Function<T, R> function, T request, String requestName, boolean checkResponse,
                                boolean checkResponseCode) {
        StopWatch stopWatch = new StopWatch();
        R response = null;
        try {
            //计时器
            stopWatch.start();
            //实际的远程调用方法
            response = function.apply(request);
            stopWatch.stop();

            //响应有效性检验
            if (checkResponse) {
                //fail-fast 非空校验
                Assert.notNull(response, REMOTE_CALL_RESPONSE_IS_NULL.name());
                if (!isResponseValid(response)) {
                    logger.error("Response Invalid on Remote Call request {} , response {}",
                            JSON.toJSONString(request),
                            JSON.toJSONString(response));

                    throw new RemoteCallException(JSON.toJSONString(response), REMOTE_CALL_RESPONSE_IS_FAILED);
                }
            }
            //响应状态码有效性检验
            if (checkResponseCode) {
                Assert.notNull(response, REMOTE_CALL_RESPONSE_IS_NULL.name());
                if (!isResponseCodeValid(response)) {
                    logger.error("Response code Invalid on Remote Call request {} , response {}",
                            JSON.toJSONString(request),
                            JSON.toJSONString(response));

                    throw new RemoteCallException(JSON.toJSONString(response), REMOTE_CALL_RESPONSE_IS_FAILED);
                }
            }

        } catch (IllegalAccessException | InvocationTargetException e) {
            logger.error("Catch Exception on Remote Call :" + e.getMessage(), e);
            throw new IllegalArgumentException("Catch Exception on Remote Call " + e.getMessage(), e);
        } catch (Throwable e) {
            logger.error("request exception {}", JSON.toJSONString(request));
            logger.error("Catch Exception on Remote Call :" + e.getMessage(), e);
            throw e;
        } finally {
            if (logger.isInfoEnabled()) {

                logger.info("## Method={} ,## 耗时={}ms ,## [请求报文]:{},## [响应报文]:{}", requestName,
                        stopWatch.getTotalTimeMillis(),
                        JSON.toJSONString(request), JSON.toJSONString(response));
            }
        }

        return response;
    }

    private static <R> boolean isResponseValid(R response)
            throws IllegalAccessException, InvocationTargetException {
        Method successMethod = null;
        Method[] methods = response.getClass().getMethods();
        for (Method method : methods) {
            String methodName = method.getName();
            if (SUCCESS_CHECK_METHOD.contains(methodName)) {
                successMethod = method;
                break;
            }
        }
        if (successMethod == null) {
            return true;
        }

        return (Boolean) successMethod.invoke(response);
    }

    private static <R> boolean isResponseCodeValid(R response)
            throws IllegalAccessException, InvocationTargetException {
        Method successMethod = null;
        Method[] methods = response.getClass().getMethods();
        for (Method method : methods) {
            String methodName = method.getName();
            if (SUCCESS_CODE_METHOD.contains(methodName)) {
                successMethod = method;
                break;
            }
        }
        if (successMethod == null) {
            return true;
        }

        return SUCCESS_CODE.contains(successMethod.invoke(response));
    }
}

使用举例

//支付服务接口调用货物服务接口
GoodsSaleResponse goodsSaleResponse = RemoteCallWrapper.call(req -> goodsFacadeService.paySuccess(req), goodsSaleRequest, "goodsFacadeService.confirmSale");

服务提供者

提供

在服务类上添加 @DubboService(version = "1.0.0")注解提供RPC服务

@DubboService(version = "1.0.0")
public class UserFacadeServiceImpl implements UserFacadeService {

    public UserQueryResponse<UserInfo> query(UserQueryRequest userLoginRequest) {
        UserQueryResponse response = new UserQueryResponse();
        response.setResponseMessage("hehaha");
        return response;
    }
}

Facade注解实现统一RPC结果包装

定义Facade注解

public @interface Facade {
}

定义注解的切面处理类

功能:

  1. 方法参数校验
  2. 捕获处理业务异常
  3. 封装返回结果
  4. 记录完整的调用日志
package cn.hollis.nft.turbo.rpc.facade;

import cn.hollis.nft.turbo.base.exception.BizException;
import cn.hollis.nft.turbo.base.exception.SystemException;
import cn.hollis.nft.turbo.base.response.BaseResponse;
import cn.hollis.nft.turbo.base.response.ResponseCode;
import cn.hollis.nft.turbo.base.utils.BeanValidator;
import com.alibaba.fastjson2.JSON;
import jakarta.validation.ValidationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;


@Aspect
@Component
@Order(Integer.MIN_VALUE)
public class FacadeAspect {

    private static final Logger LOGGER = LoggerFactory.getLogger(FacadeAspect.class);

    //核心方法 拦截所有添加Facade注解的方法
    @Around("@annotation(cn.hollis.nft.turbo.rpc.facade.Facade)")
    public Object facade(ProceedingJoinPoint pjp) throws Exception {

        //计时器
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        //通过反射拿到方法名和参数
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        Object[] args = pjp.getArgs();
        LOGGER.info("start to execute , method = " + method.getName() + " , args = " + JSON.toJSONString(args));

        //获取返回值类型
        Class returnType = ((MethodSignature) pjp.getSignature()).getMethod().getReturnType();

        //循环遍历所有参数,进行参数校验
        //如果参数上添加了限制性注解 通过这个过程进行参数校验
        /**
         * 实体类中的类似这种注解的校验
         * @NotNull(message = "userId不能为空")
         * private Long userId;
         */
        for (Object parameter : args) {
            try {
                BeanValidator.validateObject(parameter);
            } catch (ValidationException e) {
                printLog(stopWatch, method, args, "failed to validate", null, e);
                return getFailedResponse(returnType, e);
            }
        }


        try {
            // 目标方法执行
            Object response = pjp.proceed();
            //参数补全 补全响应状态信息code和message
            enrichObject(response);
            printLog(stopWatch, method, args, "end to execute", response, null);
            return response;
        } catch (Throwable throwable) {
            // 如果执行异常,则返回一个失败的response
            printLog(stopWatch, method, args, "failed to execute", null, throwable);
            return getFailedResponse(returnType, throwable);
        }
    }

    /**
     * 日志打印
     *
     * @param stopWatch
     * @param method
     * @param args
     * @param action
     * @param response
     */
    private void printLog(StopWatch stopWatch, Method method, Object[] args, String action, Object response,
                          Throwable throwable) {
        try {
            //因为此处有JSON.toJSONString,可能会有异常,需要进行捕获,避免影响主干流程
            LOGGER.info(getInfoMessage(action, stopWatch, method, args, response, throwable), throwable);
            // 如果校验失败,则返回一个失败的response
        } catch (Exception e1) {
            LOGGER.error("log failed", e1);
        }
    }

    /**
     * 统一格式输出,方便做日志统计
     * <p>
     * *** 如果调整此处的格式,需要同步调整日志监控 ***
     *
     * @param action    行为
     * @param stopWatch 耗时
     * @param method    方法
     * @param args      参数
     * @param response  响应
     * @return 拼接后的字符串
     */
    private String getInfoMessage(String action, StopWatch stopWatch, Method method, Object[] args, Object response,
                                  Throwable exception) {

        StringBuilder stringBuilder = new StringBuilder(action);
        stringBuilder.append(" ,method = ");
        stringBuilder.append(method.getName());
        stringBuilder.append(" ,cost = ");
        stringBuilder.append(stopWatch.getTime()).append(" ms");
        if (response instanceof BaseResponse) {
            stringBuilder.append(" ,success = ");
            stringBuilder.append(((BaseResponse) response).getSuccess());
        }
        if (exception != null) {
            stringBuilder.append(" ,success = ");
            stringBuilder.append(false);
        }
        stringBuilder.append(" ,args = ");
        stringBuilder.append(JSON.toJSONString(Arrays.toString(args)));

        if (response != null) {
            stringBuilder.append(" ,resp = ");
            stringBuilder.append(JSON.toJSONString(response));
        }

        if (exception != null) {
            stringBuilder.append(" ,exception = ");
            stringBuilder.append(exception.getMessage());
        }

        if (response instanceof BaseResponse) {
            BaseResponse baseResponse = (BaseResponse) response;
            if (!baseResponse.getSuccess()) {
                stringBuilder.append(" , execute_failed");
            }
        }

        return stringBuilder.toString();
    }

    /**
     * 将response的信息补全,主要是code和message
     *
     * @param response
     */
    private void enrichObject(Object response) {
        if (response instanceof BaseResponse) {
            if (((BaseResponse) response).getSuccess()) {
                //如果状态是成功的,需要将未设置的responseCode设置成SUCCESS
                if (StringUtils.isEmpty(((BaseResponse) response).getResponseCode())) {
                    ((BaseResponse) response).setResponseCode(ResponseCode.SUCCESS.name());
                }
            } else {
                //如果状态是成功的,需要将未设置的responseCode设置成BIZ_ERROR
                if (StringUtils.isEmpty(((BaseResponse) response).getResponseCode())) {
                    ((BaseResponse) response).setResponseCode(ResponseCode.BIZ_ERROR.name());
                }
            }
        }
    }

    /**
     * 定义并返回一个通用的失败响应
     */
    private Object getFailedResponse(Class returnType, Throwable throwable)
            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {

        //如果返回值的类型为BaseResponse 的子类,则创建一个通用的失败响应
        if (returnType.getDeclaredConstructor().newInstance() instanceof BaseResponse) {
            BaseResponse response = (BaseResponse) returnType.getDeclaredConstructor().newInstance();
            response.setSuccess(false);
            if (throwable instanceof BizException bizException) {
                response.setResponseMessage(bizException.getErrorCode().getMessage());
                response.setResponseCode(bizException.getErrorCode().getCode());
            } else if (throwable instanceof SystemException systemException) {
                response.setResponseMessage(systemException.getErrorCode().getMessage());
                response.setResponseCode(systemException.getErrorCode().getCode());
            } else {
                response.setResponseMessage(throwable.toString());
                response.setResponseCode(ResponseCode.BIZ_ERROR.name());
            }

            return response;
        }

        LOGGER.error(
                "failed to getFailedResponse , returnType (" + returnType + ") is not instanceof BaseResponse");
        return null;
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到