[自研]分布式日志追踪 - dubbo

发布于:2024-01-01 ⋅ 阅读:(62) ⋅ 点赞:(0)

 微服务的一次调用的日志信息分布在不同的机器上 ,当需要看一条链路调用所有的日志信息时, 通过生产一个本次唯一的链路Id(traceId)把一次请求的信息串起来。

为减少日志量 :切面日志可按需要添加

一 、 分布式追踪系统的思想模型:

二、选型

方案一:集成中间产品,如: Zipkin、SkyWalking等产品

优点:

    无需编码
    业务无入侵
    图形化界面中使用该ID快速定位各种接口的调用关系

缺点:                   

    强耦合中间件服务才能生
    效必须添加相应 javaagent
    必须部署服务端

 方案二  集成 Spring Cloud sleuth 

优点:业务无入侵,有丰富的插件进行扩展包括定时任务、MQ等。

缺点:brave-instrumentation-dubbo-rpc 不支持 dubbo 2.7.x 需要自行开发插件

方案三:自研, 集成  Logback 的 MDC 机制 (优先使用)

优点:
    业务无入侵,最小依赖
    自研,扩展灵活,适配性强
    修改程序即可,无需部署其它中间件
缺点:
    维护、升级需要有研发成本

  三、这里推荐选择 方案三  

思路及编码介绍(注意:本人用的  com.alibaba.dubbo ):
       一次请求唯一 traceId 从网关层 到  服务(dubbo\springcloud)的消费者端 再到 服务的生产者端 透传,调用结束,清空traceId 

1.公共类:

线程传递--ThreadTraceIdUtil  

/**
 * InheritableThreadLocal 线程传递
 * @author Sjh
 */
public class ThreadTraceIdUtil {

    /**使用InheritableThreadLocal便于在主子线程间传递参数*/
    private static final ThreadLocal<String> TRACE_ID = new InheritableThreadLocal<>();

    public ThreadTraceIdUtil() {
    }

    /**
     * 从当前线程局部变量获取TraceId
     * 首次调用该方法会生成traceId,后续每次都从线程上下文获取
     * @return
     */
    public static String getTraceId() {
        return TRACE_ID.get();
    }

    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }

    public static void removeTraceId() {
        TRACE_ID.remove();
    }
}

生成 TraceId - TraceIdGenerator  


/**
 * @author Sjh
 * 
 * 生成  TraceId
 */
@Slf4j
public class TraceIdGenerator {


    /**
     * 消费端创建TraceId,并设置到线程上下文中
     * 该方法只调用一次
     *
     * @return
     */
    public static String createTraceId() {
        // 创建的同时就设置到上下文中
        String traceId = getTraceId();
        ThreadTraceIdUtil.setTraceId(traceId);
        return traceId;
    }

    /**
     * 生成32位traceId
     *
     * @return
     */
    private static String getTraceId() {
        String result = "";
        String ip = "";

        // 获取本地ipv4地址
        try {
            InetAddress address = InetAddress.getLocalHost();
            ip = address.getHostAddress();
        } catch (Exception var5) {
            return result;
        }

        // 根据.截取为String数组
        String[] ipAddressInArray = ip.split("\\.");
        // 拼装为字符串,将每一个元素转换为16进制
        for (int i = 3; i >= 0; --i) {
            Integer id = Integer.parseInt(ipAddressInArray[3 - i]);
            result = result + String.format("%02x", id);
        }
        // 拼装时间戳及随机数
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        result = result + simpleDateFormat.format(new Date()) + UUID.randomUUID().toString().substring(0, 7);
        return result;
    }

    /**
     * TraceId默认第一个为空,如果没值则分配一个
     *
     * @param traceId
     * @return
     */
    public static String validateTraceId(String traceId) {
        if (null == traceId) {
            traceId = createTraceId();
            if (log.isDebugEnabled()) {
                log.debug("[TraceInterceptor]首次请求未分配TraceId,生成首次TraceId={}", traceId);
            }
        }
        return traceId;
    }
}

RpcContext传递 - RpcTraceIdFilter  

/**
 * dubbo 的Filter 重写 invoke,利用RpcContext传递 TraceId
 * 消费者织入  TraceId
 * 生产者取出  TraceId
 * @author Sjh
 */
@Slf4j
@Activate(group = {"provider", "consumer"}, order = -10000)
public class RpcTraceIdFilter implements Filter {

    /**
     * RpcContext 加入 traceId
     *
     * @param invoker
     * @param invocation
     * @return
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();

        String traceId;

        //消费者
        if (rpcContext.isConsumerSide()) {

            traceId = MDC.get(Constants.TRACE_ID);
            if (traceId == null) {
                traceId = TraceIdGenerator.createTraceId();
            }
            rpcContext.setAttachment(Constants.TRACE_ID, traceId);
        }

        //生产者
        if (rpcContext.isProviderSide()) {
            traceId = rpcContext.getAttachment(Constants.TRACE_ID);
            MDC.put(Constants.TRACE_ID, traceId);
        }

        Result result = invoker.invoke(invocation);

        // after
        if (rpcContext.isProviderSide()) {
            // clear traceId from MDC
            MDC.remove(Constants.TRACE_ID);
        }
        return result;

    }

}

2.消费者端实现(http服务调用dubbo服务)

拦截web请求,生成TraceId ,并放入log4j的MDC和dubbo的RpcContext中, 通过  attachMent 把TraceId传递给生产者 

InterceptorConfig 


/**
* MVC  Interceptor
* traceId的拦截器LogInterceptor到容器
* @author sjh
*/
@Configuration
//@EnableMvc 会引发其它重写冲突,注释掉
public class InterceptorConfig implements WebMvcConfigurer {

    /**
     * 注解LogInterceptor类到IOC容器中
     */
    @Bean
    public LogInterceptor logInterceptor() {
        return new LogInterceptor();
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        //注册日志拦截器
        registry.addInterceptor(logInterceptor());
    }

    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        /**
         * 解决 webmvc 与 swagger 页面冲突的问题
         * https://blog.csdn.net/Kerwin_luo/article/details/114266444
         * @param registry
         */
        registry.addResourceHandler("/webjars/**")
                .addResourceLocations("classpath:/META-INF/resources/webjars/");

        //添加静态页面资源,文件下载资源等
        registry.addResourceHandler("/**").addResourceLocations("classpath:/META-INF/resources/",
                "classpath:/resources/", "classpath:/static/", "classpath:/public/","file:/E:/","file:/");
    }


}

  LogInterceptor 

 
/**
 * 拦截web请求
 *
 * @author Sjh
 */
@Slf4j
public class LogInterceptor implements HandlerInterceptor {

    /**
     * controller方法前调用
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //如果有上层调用就用上层的ID
        String traceId = request.getHeader(Constants.TRACE_ID);

        traceId = TraceIdGenerator.validateTraceId(traceId);

        MDC.put(Constants.TRACE_ID, traceId);
        return true;
    }


    /**
     * preHandle方法返回true之后
     * 在DispatcherServlet进行视图的渲染之后调用
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
            throws Exception {
        //controller结束之后删除对应的唯一值
        MDC.remove(Constants.TRACE_ID);
    }
}

dubbo的SPI扩展RpcTraceIdFilter

/**
 * dubbo 的Filter 重写 invoke,利用RpcContext传递 TraceId
 * 消费者织入  TraceId
 * 生产者取出  TraceId
 * @author Sjh
 */
@Slf4j
@Activate(group = {"provider", "consumer"}, order = -10000)
public class RpcTraceIdFilter implements Filter {

    /**
     * RpcContext 加入 traceId
     *
     * @param invoker
     * @param invocation
     * @return
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();

        String traceId;

        //消费者
        if (rpcContext.isConsumerSide()) {

            traceId = MDC.get(Constants.TRACE_ID);
            if (traceId == null) {
                traceId = TraceIdGenerator.createTraceId();
            }
            rpcContext.setAttachment(Constants.TRACE_ID, traceId);
        }

        //生产者
        if (rpcContext.isProviderSide()) {
            traceId = rpcContext.getAttachment(Constants.TRACE_ID);
            MDC.put(Constants.TRACE_ID, traceId);
        }

        Result result = invoker.invoke(invocation);

        // after
        if (rpcContext.isProviderSide()) {
            // clear traceId from MDC
            MDC.remove(Constants.TRACE_ID);
        }
        return result;

    }

}

logback-spring.xml 加入 [TraceId:\%X\{traceId\}]  

<!-- CONSOLE_LOG_PATTERN属性会在console-appender.xml文件中引用 -->
<property name="CONSOLE_LOG_PATTERN" value="%clr(${spring_application_name}){cyan}[TraceId:%X{traceId}]|%clr{blue}|%clr(%d{ISO8601}){faint}|%clr(%p)|${server_port}|${PID}|%clr(%t){faint}|%clr(%.40logger{39}){cyan}.%clr(%method){cyan}:%L|%m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

<!-- FILE_LOG_PATTERN属性会在logback-defaults.xml文件中引用 -->
<property name="FILE_LOG_PATTERN" value="${spring_application_name}[TraceId:%X{traceId}]|%d{ISO8601}|%p|${server_port}|${PID}|%t|%.40logger{39}.%method:%L|%m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

wishbetter


yml配置调整  

consumer:
  ......
  filter: RpcTraceIdFilter

SPI扩展RpcTraceIdFilter 增加纯文本文件

路径 :

|-resources

        |-META-INF

            |-dubbo

                |-com.alibaba.dubbo.rpc.Filter  (注意:区分是alibaba dubbo 不是 apache dubbo)

内容:

rpcTraceIdFilter=com.xxx.xxx.xxx.RpcTraceIdFilter()

3.消费者端实现(dubbo)

 拦截Rpc请求,attachMent里面传递过来的TraceId放到MDC内容中 

dubbo的SPI扩展RpcTraceIdFilter  

logback-spring.xml 加入 [TraceId:\%X\{traceId\}] 

yml配置调整 

provider:
 ......
 filter: RpcTraceIdFilter

4.多线程实现

多线程ThreadPoolTaskExecutor实现 


/**
 * 线程池包装类
 *
 * @author hetiantian
 * @version 1.0
 * @Date 2020/03/18 15:29
 */
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

SPI扩展 RpcTraceIdFilter

路径 :

|-resources

        |-META-INF

            |-dubbo

                |-com.alibaba.dubbo.rpc.Filter  (注意:区分是alibaba dubbo 不是 apache dubbo)

内容:

rpcTraceIdFilter=com.xxx.xxx.xxx.RpcTraceIdFilter(全限定名)

实现中我是封装成一个jar包,给dubbo应用引用

POM:


潦倒新停浊酒杯


网站公告

今日签到

点亮在社区的每一天
去签到