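SkyWalking + Logstash End-to-End Tracing System: Detailed Implementation Plan
I. System Architecture and Data Flow
Core flow:
- Data collection: the SkyWalking Agent instruments the application and collects call-chain data
- Log enrichment: the application injects the TraceID into its log lines through the MDC
- Log shipping: Logstash reads the application logs and sends them to Elasticsearch
- Data storage: SkyWalking trace and metric data and the log data are stored separately
- Visualization: SkyWalking UI displays the traces; Kibana is used to analyze the logs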
II. Environment Preparation and Deployment
1. SkyWalking deployment (Docker)
version: '3.8'
services:
  elasticsearch:
    image: elasticsearch:7.14.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    networks:
      - skywalking-network
  skywalking-oap:
    image: apache/skywalking-oap-server:9.7.0
    container_name: skywalking-oap
    depends_on:
      - elasticsearch
    environment:
      - SW_STORAGE=elasticsearch
      - SW_STORAGE_ES_CLUSTER_NODES=elasticsearch:9200
      - SW_CORE_REST_PORT=12800
      - SW_CORE_GRPC_PORT=11800
    ports:
      - "12800:12800"
      - "11800:11800"
    networks:
      - skywalking-network
  skywalking-ui:
    image: apache/skywalking-ui:9.7.0
    container_name: skywalking-ui
    depends_on:
      - skywalking-oap
    environment:
      - SW_OAP_ADDRESS=skywalking-oap:12800
    ports:
      - "8080:8080"
    networks:
      - skywalking-network
  logstash:
    image: logstash:7.14.0
    container_name: logstash
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./logs:/usr/share/logstash/logs
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch
    networks:
      - skywalking-network
networks:
  skywalking-network:
    driver: bridge
volumes:
  es-data:
2. Logstash configuration
logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
path.config: /usr/share/logstash/pipeline
Log processing pipeline configuration
input {
file {
path => ["/usr/share/logstash/logs/*.log"]
start_position => "beginning"
sincedb_path => "/dev/null"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_time}\(%{DATA:thread}\)%{LOGLEVEL:level}\s+TID:%{DATA:traceId} \[%{DATA:bizTraceId}\] %{DATA:logger}-%{GREEDYDATA:msg}"
}
}
date {
match => ["log_time", "yyyy-MM-dd HH:mm:ss.SSS"]
target => "@timestamp"
}
mutate {
add_field => {
"service_name" => "iotdata-back"
"host_ip" => "%{host}"
}
remove_field => ["host", "log_time"]
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "iotdata-log-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
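To make the field extraction concrete, here is a minimal sketch that approximates the grok expression with `java.util.regex` and applies it to one log line in the Logback layout used later in this document. It is an illustration only: the class name `LogLineParser` and the sample line are assumptions, and real grok patterns such as `TIMESTAMP_ISO8601` accept more variants than this regex does.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Logstash: a rough regex equivalent of the grok filter.
class LogLineParser {
    private static final String[] FIELDS =
        {"logTime", "thread", "level", "traceId", "bizTraceId", "logger", "msg"};

    private static final Pattern LINE = Pattern.compile(
        "^(?<logTime>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3})" // %d{yyyy-MM-dd HH:mm:ss.SSS}
        + "\\((?<thread>[^)]*)\\)"                                        // (%thread)
        + "(?<level>[A-Z]+)\\s+"                                          // %-5level pads with spaces
        + "TID:(?<traceId>\\S+) "                                         // %tid prints "TID:<id>"
        + "\\[(?<bizTraceId>[^\\]]*)\\] "                                 // [%X{X-BIZ-TRACE-ID}]
        + "(?<logger>[^-\\s]+)-"                                          // %logger{50}-
        + "(?<msg>.*)$",                                                  // %msg, may span lines
        Pattern.DOTALL);

    // Returns the extracted fields, or an empty map when the line does not match.
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = LINE.matcher(line);
        if (m.matches()) {
            for (String name : FIELDS) {
                fields.put(name, m.group(name));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        String sample = "2023-11-15 14:30:45.123(main)INFO  TID:N/A "
            + "[req12345_user67890_act5678] com.iotdata.admin.Demo-order accepted";
        parse(sample).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Lines that do not match come back as an empty map here; in Logstash the corresponding event would instead be tagged `_grokparsefailure`, which is worth watching for in Kibana after any change to the log pattern.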
III. Application Integration Steps
1. Maven dependency configuration
Parent pom.xml
<dependencyManagement>
<dependencies>
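<!-- SkyWalking dependency management. The apm-toolkit-* artifacts are released with the SkyWalking Java agent, which is versioned separately from the OAP server image, so confirm that this artifact and version exist in your repository before relying on them. -->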
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-bom</artifactId>
<version>9.7.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
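<!-- Logback dependencies. Logback 1.4.x requires Java 11+ and SLF4J 2.x; on Spring Boot 2.x, prefer the Logback version managed by Spring Boot. -->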
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.4.0</version>
</dependency>
</dependencies>
</dependencyManagement>
iotdata-common/pom.xml
<dependencies>
<!-- SkyWalking toolkit -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
</dependency>
<!-- AOP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
</dependencies>
2. Logging configuration
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="30 seconds">
<!-- Context name -->
<contextName>iotdata-back</contextName>
<!-- Log output pattern -->
<property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}(%thread)%-5level %tid [%X{X-BIZ-TRACE-ID}] %logger{50}-%msg%n"/>
<!-- Log directory -->
<property name="LOG_PATH" value="d:/shuiwu/iotdata-back/logs"/>
<!-- Console output -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
<pattern>${LOG_PATTERN}</pattern>
</layout>
</encoder>
</appender>
<!-- File output (rolls by date and by size) -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/app-%d{yyyy-MM-dd}-%i.log</fileNamePattern>
<maxFileSize>200MB</maxFileSize>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
<pattern>${LOG_PATTERN}</pattern>
</layout>
</encoder>
</appender>
<!-- Asynchronous output -->
<appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0</discardingThreshold>
<queueSize>512</queueSize>
<appender-ref ref="FILE"/>
</appender>
<!-- Root log level -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="ASYNC_FILE"/>
</root>
<!-- Log level for a specific package -->
<logger name="com.iotdata" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="ASYNC_FILE"/>
</logger>
</configuration>
3. Core utility class
TraceContextUtil.java - context management utility
package com.iotdata.common.util;
import org.apache.skywalking.apm.toolkit.trace.TraceContext;
import org.slf4j.MDC;
/**
* Trace context utility.
* Manages the SkyWalking TraceID and the custom business trace ID.
*/
public class TraceContextUtil {
// MDC key for the custom business trace ID
public static final String BIZ_TRACE_ID_KEY = "X-BIZ-TRACE-ID";
/**
* Sets the business trace context.
* @param requestId request ID
* @param userId user ID
* @param activityId activity ID
*/
public static void setTraceContext(String requestId, String userId, String activityId) {
// Build the composite business trace ID
String bizTraceId = String.format("%s_%s_%s", requestId, userId, activityId);
MDC.put(BIZ_TRACE_ID_KEY, bizTraceId);
// Optional: also store the native SkyWalking TraceID in the MDC to simplify log analysis
String swTraceId = TraceContext.traceId();
MDC.put("X-SW-TRACE-ID", swTraceId);
}
/**
* Returns the current business trace ID.
* @return business trace ID
*/
public static String getBizTraceId() {
return MDC.get(BIZ_TRACE_ID_KEY);
}
/**
* Clears the trace context.
* Must be called in a finally block so that values do not leak into the next task that reuses a pooled thread.
*/
public static void clearTraceContext() {
MDC.remove(BIZ_TRACE_ID_KEY);
MDC.remove("X-SW-TRACE-ID");
}
/**
* Captures the current context so that it can be copied to another thread.
* @return context snapshot
*/
public static ContextSnapshot captureContext() {
return new ContextSnapshot(
MDC.getCopyOfContextMap()
);
}
/**
* Restores the context on a new thread.
* @param snapshot context snapshot
*/
public static void restoreContext(ContextSnapshot snapshot) {
if (snapshot != null && snapshot.getContextMap() != null) {
MDC.setContextMap(snapshot.getContextMap());
}
}
/**
* Context snapshot.
* Carries the MDC context between threads.
*/
public static class ContextSnapshot {
private final java.util.Map<String, String> contextMap;
public ContextSnapshot(java.util.Map<String, String> contextMap) {
this.contextMap = contextMap;
}
public java.util.Map<String, String> getContextMap() {
return contextMap;
}
}
}
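One detail of the composite ID is worth illustrating: `String.format("%s_%s_%s", ...)` renders a missing value as the literal text `null`, and a value that itself contains `_` makes the ID ambiguous when it is split again on the consumer side. The sketch below shows one possible null-safe composition and a bounded split. The class `BizTraceIds` and the `unknown` placeholder are assumptions for illustration, not part of the utility above.

```java
// Hypothetical helper illustrating null-safe composition of requestId_userId_activityId.
class BizTraceIds {
    static final String UNKNOWN = "unknown"; // assumed placeholder for missing parts

    // Joins the three parts, replacing null or blank values with the placeholder.
    static String compose(String requestId, String userId, String activityId) {
        return part(requestId) + "_" + part(userId) + "_" + part(activityId);
    }

    // Splits into at most three parts; any further underscores stay in the last part.
    static String[] split(String bizTraceId) {
        return bizTraceId.split("_", 3);
    }

    private static String part(String value) {
        return (value == null || value.trim().isEmpty()) ? UNKNOWN : value.trim();
    }

    public static void main(String[] args) {
        String id = compose("req12345", null, "act5678");
        System.out.println(id);
        System.out.println(String.join(" | ", split(id)));
    }
}
```

The bounded split only protects the last part. If request or user IDs can contain underscores, a separator that cannot occur in the IDs is the safer design.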
4. Automatic instrumentation with AOP
1. Custom annotation
package com.iotdata.common.annotation;
import java.lang.annotation.*;
/**
* Business tracing annotation.
* Marks methods whose trace context should be injected automatically.
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BizTrace {
/**
* Name of the request ID parameter
*/
String requestIdParam() default "requestId";
/**
* Name of the user ID parameter
*/
String userIdParam() default "userId";
/**
* Name of the activity ID parameter
*/
String activityIdParam() default "activityId";
/**
* Whether to create a new span in SkyWalking
*/
boolean createSpan() default true;
/**
* Span name
*/
String spanName() default "";
}
2. AOP aspect
package com.iotdata.common.aspect;
import com.iotdata.common.annotation.BizTrace;
import com.iotdata.common.util.TraceContextUtil;
import org.apache.skywalking.apm.toolkit.trace.Trace;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* Business tracing aspect.
* Injects the trace context automatically for methods annotated with @BizTrace.
*/
@Aspect
@Component
public class BizTraceAspect {
private static final Logger logger = LoggerFactory.getLogger(BizTraceAspect.class);
// LocalVariableTableParameterNameDiscoverer is deprecated in Spring Framework 6; the default discoverer also reads -parameters metadata
private final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
/**
* Around advice that manages the business trace context.
*/
@Around("@annotation(bizTrace)")
public Object around(ProceedingJoinPoint joinPoint, BizTrace bizTrace) throws Throwable {
// 1. Extract the method parameters
Map<String, Object> paramMap = getParamMap(joinPoint);
// 2. Resolve the business IDs
String requestId = getParamValue(paramMap, bizTrace.requestIdParam());
String userId = getParamValue(paramMap, bizTrace.userIdParam());
String activityId = getParamValue(paramMap, bizTrace.activityIdParam());
// 3. Generate a default requestId if none was supplied
if (requestId == null || requestId.trim().isEmpty()) {
requestId = generateRequestId();
logger.warn("No requestId supplied, generated: {}", requestId);
}
// Remember any context set earlier on this thread (for example by TraceWebInterceptor)
TraceContextUtil.ContextSnapshot previous = TraceContextUtil.captureContext();
try {
// 4. Set the trace context
TraceContextUtil.setTraceContext(requestId, userId, activityId);
logger.info("Business trace context set, bizTraceId: {}", TraceContextUtil.getBizTraceId());
// 5. Decide whether to create a SkyWalking span
if (bizTrace.createSpan()) {
String spanName = bizTrace.spanName();
if (spanName.isEmpty()) {
spanName = joinPoint.getSignature().getName();
}
return traceWithSpan(joinPoint, spanName);
} else {
// 6. No span requested, run the method directly
return joinPoint.proceed();
}
} finally {
// 7. Clear this method's context (must run in finally), then restore the outer context
TraceContextUtil.clearTraceContext();
TraceContextUtil.restoreContext(previous);
logger.info("Business trace context cleared");
}
}
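/**
* Runs the join point inside a SkyWalking local span created through @Trace.
*/
private Object traceWithSpan(ProceedingJoinPoint joinPoint, String spanName) throws Throwable {
// @Trace targets methods only, so it is placed on the helper method rather than on the class.
// The agent creates the span when the annotated method is entered.
class TraceHelper {
@Trace
Object proceed(ProceedingJoinPoint pjp) throws Throwable {
// @Trace(operationName = ...) must be a compile-time constant, so the runtime name is set here
org.apache.skywalking.apm.toolkit.trace.ActiveSpan.setOperationName(spanName);
return pjp.proceed();
}
}
return new TraceHelper().proceed(joinPoint);
}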
/**
* Builds a map from parameter names to argument values.
*/
private Map<String, Object> getParamMap(ProceedingJoinPoint joinPoint) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
String[] paramNames = parameterNameDiscoverer.getParameterNames(method);
Object[] paramValues = joinPoint.getArgs();
Map<String, Object> paramMap = new HashMap<>();
if (paramNames != null && paramValues != null) {
for (int i = 0; i < paramNames.length; i++) {
paramMap.put(paramNames[i], paramValues[i]);
}
}
return paramMap;
}
/**
* Returns a parameter value as a string, or null if it is absent.
*/
private String getParamValue(Map<String, Object> paramMap, String paramName) {
if (paramMap.containsKey(paramName)) {
Object value = paramMap.get(paramName);
return value != null ? value.toString() : null;
}
return null;
}
/**
* Generates a default requestId (UUID without hyphens).
*/
private String generateRequestId() {
return java.util.UUID.randomUUID().toString().replace("-", "");
}
}
5. Web request interceptor
TraceWebInterceptor.java - sets up the context for every HTTP request
package com.iotdata.admin.interceptor;
import com.iotdata.common.util.TraceContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.UUID;
/**
* Web request tracing interceptor.
* Sets the trace context for every HTTP request.
*/
public class TraceWebInterceptor implements HandlerInterceptor {
private static final Logger logger = LoggerFactory.getLogger(TraceWebInterceptor.class);
// Request header and parameter names that carry the IDs
private static final String TRACE_ID_HEADER = "X-TRACE-ID";
private static final String USER_ID_HEADER = "X-USER-ID";
private static final String ACTIVITY_ID_PARAM = "activityId";
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
try {
// 1. Read the IDs from the request headers (preferred) or parameters
String requestId = request.getHeader(TRACE_ID_HEADER);
String userId = request.getHeader(USER_ID_HEADER);
String activityId = request.getParameter(ACTIVITY_ID_PARAM);
// 2. Generate a default requestId if none was supplied
if (requestId == null || requestId.trim().isEmpty()) {
requestId = UUID.randomUUID().toString().replace("-", "");
}
// 3. Echo the ID in a response header so the front end can read it
response.setHeader(TRACE_ID_HEADER, requestId);
// 4. Set the trace context
TraceContextUtil.setTraceContext(requestId, userId, activityId);
logger.info("Web request trace context set, URL: {}", request.getRequestURI());
return true;
} catch (Exception e) {
logger.error("Failed to set the web request trace context", e);
// Continue handling the request even if tracing setup fails
return true;
}
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
// Clear the context
TraceContextUtil.clearTraceContext();
logger.info("Web request trace context cleared, URL: {}", request.getRequestURI());
}
}
Registering the interceptor
package com.iotdata.admin.config;
import com.iotdata.admin.interceptor.TraceWebInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class WebConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
// Register the tracing interceptor for all requests
registry.addInterceptor(new TraceWebInterceptor())
.addPathPatterns("/**")
.excludePathPatterns("/static/**", "/error");
}
}
IV. Handling Special Scenarios
1. Asynchronous methods
1. Annotation for asynchronous methods
package com.iotdata.common.annotation;
import java.lang.annotation.*;
/**
* Asynchronous business tracing annotation.
* Marks asynchronous methods that should be traced.
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncBizTrace {
/**
* Name of the asynchronous task
*/
String taskName() default "asyncTask";
}
2. Aspect for asynchronous methods
package com.iotdata.common.aspect;
import com.iotdata.common.annotation.AsyncBizTrace;
import com.iotdata.common.util.TraceContextUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* Tracing aspect for asynchronous methods.
* Sets up and clears the trace context around the asynchronous method body.
*/
@Aspect
@Component
public class AsyncBizTraceAspect {
private static final Logger logger = LoggerFactory.getLogger(AsyncBizTraceAspect.class);
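@Around("@annotation(asyncBizTrace)")
public Object aroundAsyncMethod(ProceedingJoinPoint joinPoint, AsyncBizTrace asyncBizTrace) throws Throwable {
// 1. Snapshot the context visible on the thread that runs this advice.
// Caveat: this advice does not switch threads by itself. If Spring applies the @Async
// interceptor outside this aspect, the advice already runs on the worker thread, where the
// caller's MDC is not visible. In that case the MDC has to be copied when the task is
// submitted, for example with a TaskDecorator on the executor.
TraceContextUtil.ContextSnapshot snapshot = TraceContextUtil.captureContext();
try {
// 2. Restore the context (does nothing when the snapshot is empty)
TraceContextUtil.restoreContext(snapshot);
logger.info("Async task [{}] trace context restored, bizTraceId: {}",
asyncBizTrace.taskName(), TraceContextUtil.getBizTraceId());
// 3. Run the task. proceed() throws Throwable, so it cannot be wrapped in a Callable lambda
return joinPoint.proceed();
} finally {
// 4. Clear the context
TraceContextUtil.clearTraceContext();
logger.info("Async task [{}] trace context cleared", asyncBizTrace.taskName());
}
}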
}
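3. Usage example
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private AsyncTaskService asyncTaskService;
@BizTrace
public void createOrder(String requestId, String userId, String activityId) {
// Synchronous business logic
// ...
// Call the asynchronous method
asyncTaskService.notifyUser(requestId, userId);
}
}
@Service
public class AsyncTaskService {
private static final Logger logger = LoggerFactory.getLogger(AsyncTaskService.class);
@Async
@AsyncBizTrace(taskName = "notifyUser")
public CompletableFuture<Void> notifyUser(String requestId, String userId) {
// Asynchronous business logic. @Async already runs this body on the executor thread,
// so no additional CompletableFuture.runAsync call is needed.
logger.info("Sending order notification to user: {}", userId);
// ...
return CompletableFuture.completedFuture(null);
}
}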
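Context propagation only works when the snapshot is taken on the submitting thread and restored on the worker thread. The following sketch shows that pattern with plain JDK classes. It is a simplified stand-in: the `CONTEXT` thread-local plays the role of the SLF4J MDC, and the class and method names are hypothetical. In a Spring application the same wrapping would typically live in a `TaskDecorator` set on the `ThreadPoolTaskExecutor` used by `@Async`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: CONTEXT stands in for the MDC so the example needs no dependencies.
class ContextPropagationDemo {
    static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    // Captures the caller's context now; restores it on whichever thread runs the task.
    static <T> Callable<T> wrap(Callable<T> task) {
        Map<String, String> snapshot = new HashMap<>(CONTEXT.get()); // runs on the submitting thread
        return () -> {
            Map<String, String> previous = CONTEXT.get();            // runs on the worker thread
            CONTEXT.set(new HashMap<>(snapshot));
            try {
                return task.call();
            } finally {
                CONTEXT.set(previous);                                // do not leak into the next task
            }
        };
    }

    // Sets an ID on the calling thread and reads it back from a worker thread.
    static String readOnWorker(String bizTraceId, boolean propagate) {
        CONTEXT.get().put("X-BIZ-TRACE-ID", bizTraceId);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> CONTEXT.get().get("X-BIZ-TRACE-ID");
            return pool.submit(propagate ? wrap(task) : task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("wrapped:   " + readOnWorker("req12345_user67890_act5678", true));
        System.out.println("unwrapped: " + readOnWorker("req12345_user67890_act5678", false));
    }
}
```

Without the wrapper the worker thread sees an empty context, which is the "lost context" symptom listed in the troubleshooting table.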
2. MQ message tracing
1. RabbitMQ producer interceptor
package com.iotdata.infrastructure.mq;
import com.iotdata.common.util.TraceContextUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Component;
/**
* RabbitMQ producer interceptor.
* Injects the trace context into outgoing messages.
*/
@Component
public class RabbitMqProducerInterceptor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) {
// Read the current trace context
String bizTraceId = TraceContextUtil.getBizTraceId();
String swTraceId = org.apache.skywalking.apm.toolkit.trace.TraceContext.traceId();
// Store the context in the message headers
if (bizTraceId != null) {
message.getMessageProperties().setHeader("X-BIZ-TRACE-ID", bizTraceId);
}
if (swTraceId != null && !"N/A".equals(swTraceId)) {
message.getMessageProperties().setHeader("X-SW-TRACE-ID", swTraceId);
}
return message;
}
}
2. RabbitMQ consumer interceptor
package com.iotdata.infrastructure.mq;
import com.iotdata.common.util.TraceContextUtil;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.stereotype.Component;
/**
* RabbitMQ consumer interceptor.
* Restores the trace context when a message is consumed.
*/
@Component
public class RabbitMqConsumerInterceptor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// Find the Message argument
Message message = null;
for (Object arg : invocation.getArguments()) {
if (arg instanceof Message) {
message = (Message) arg;
break;
}
}
// Restore the context from the message headers
if (message != null) {
MessageProperties properties = message.getMessageProperties();
String bizTraceId = properties.getHeader("X-BIZ-TRACE-ID");
String swTraceId = properties.getHeader("X-SW-TRACE-ID");
if (bizTraceId != null) {
// Parse the business trace ID (requestId_userId_activityId)
String[] parts = bizTraceId.split("_");
if (parts.length >= 3) {
TraceContextUtil.setTraceContext(parts[0], parts[1], parts[2]);
} else {
TraceContextUtil.setTraceContext(bizTraceId, "unknown", "unknown");
}
}
}
try {
// Invoke the consumer method
return invocation.proceed();
} finally {
// Clear the context
TraceContextUtil.clearTraceContext();
}
}
}
3. MQ configuration class
package com.iotdata.infrastructure.config;
import com.iotdata.infrastructure.mq.RabbitMqConsumerInterceptor;
import com.iotdata.infrastructure.mq.RabbitMqProducerInterceptor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.aop.target.SingletonTargetSource;
@Configuration
public class RabbitMqConfig {
@Autowired
private RabbitMqProducerInterceptor producerInterceptor;
@Autowired
private RabbitMqConsumerInterceptor consumerInterceptor;
/**
* Configures the RabbitTemplate and registers the producer interceptor.
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// Add the message post-processor that injects the trace context
rabbitTemplate.setBeforePublishPostProcessors(producerInterceptor);
return rabbitTemplate;
}
/**
* Configures the listener container factory and registers the consumer interceptor.
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// Apply the interceptor as container advice so that it wraps every listener invocation
factory.setAdviceChain(consumerInterceptor);
return factory;
}
}
V. Application Startup Configuration
1. Startup script
Windows startup script
@echo off
setlocal enabledelayedexpansion
REM SkyWalking Agent settings
set SW_AGENT_HOME=d:/skywalking-agent
set SW_AGENT_SERVICE_NAME=iotdata-back
set SW_AGENT_COLLECTOR_BACKEND_SERVICES=localhost:11800
REM Number of traces sampled per 3 seconds; zero or a negative value keeps every trace
set SW_AGENT_SAMPLE_N_PER_3_SECS=-1
set SW_AGENT_LOG_OUTPUT=FILE
set SW_AGENT_LOG_DIR=d:/shuiwu/iotdata-back/logs/agent
REM JVM options
set JAVA_OPTS=-Xms512m -Xmx1024m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
set JAVA_OPTS=!JAVA_OPTS! -javaagent:!SW_AGENT_HOME!/skywalking-agent.jar
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.agent.service_name=!SW_AGENT_SERVICE_NAME!
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.collector.backend_service=!SW_AGENT_COLLECTOR_BACKEND_SERVICES!
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.agent.sample_n_per_3_secs=!SW_AGENT_SAMPLE_N_PER_3_SECS!
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.logging.output=!SW_AGENT_LOG_OUTPUT!
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.logging.dir=!SW_AGENT_LOG_DIR!
REM Start the application
java !JAVA_OPTS! -jar iotdata-admin/target/iotdata-admin.jar
endlocal
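When SkyWalking shows no data, a quick first check is whether the JVM was actually started with the agent. The sketch below inspects the JVM input arguments using the standard `java.lang.management` API. The class name `AgentCheck` and the assumption that the agent jar path contains `skywalking-agent` are illustrative only.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

// Hypothetical startup check: looks for a -javaagent argument that points at the SkyWalking agent.
class AgentCheck {
    static Optional<String> findAgentArg(List<String> jvmArgs) {
        return jvmArgs.stream()
            .filter(arg -> arg.startsWith("-javaagent:") && arg.contains("skywalking-agent"))
            .findFirst();
    }

    public static void main(String[] args) {
        // The arguments the running JVM was launched with (for example those from JAVA_OPTS)
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println(findAgentArg(jvmArgs)
            .map(arg -> "SkyWalking agent attached: " + arg)
            .orElse("No SkyWalking -javaagent argument found"));
    }
}
```

This confirms only that the flag was passed. Whether the agent reached the collector is reported in the agent's own log files under the configured logging directory.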
2. Verification and testing
1. Example business method
package com.iotdata.admin.service.impl;
import com.iotdata.admin.service.GroupOrderService;
import com.iotdata.admin.service.InventoryService;
import com.iotdata.admin.service.MQProducerService;
import com.iotdata.common.annotation.BizTrace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class GroupOrderServiceImpl implements GroupOrderService {
private static final Logger logger = LoggerFactory.getLogger(GroupOrderServiceImpl.class);
@Autowired
private InventoryService inventoryService;
@Autowired
private MQProducerService mqProducerService;
/**
* Creates a group-buy order (core business method).
* The @BizTrace annotation injects the trace context automatically.
*/
@Override
@BizTrace(requestIdParam = "orderId", userIdParam = "userId", activityIdParam = "activityId")
public void createGroupOrder(String orderId, String userId, String activityId, int productId, int quantity) {
logger.info("Start processing group-buy order: orderId={}, productId={}, quantity={}", orderId, productId, quantity);
try {
// 1. Deduct stock
boolean deductSuccess = inventoryService.deductStock(productId, quantity);
if (!deductSuccess) {
logger.error("Insufficient stock, order creation failed: productId={}, quantity={}", productId, quantity);
throw new RuntimeException("Insufficient stock");
}
logger.info("Stock deducted: productId={}, quantity={}", productId, quantity);
// 2. Update the group-buy record
updateGroupRecord(orderId, userId, activityId);
logger.info("Group-buy record updated: orderId={}", orderId);
// 3. Send the MQ notification
mqProducerService.sendGroupSuccessMessage(orderId, userId, activityId);
logger.info("Group-buy success notification sent: orderId={}", orderId);
} catch (Exception e) {
logger.error("Failed to create group-buy order: orderId={}", orderId, e);
// Exception handling...
}
}
private void updateGroupRecord(String orderId, String userId, String activityId) {
// Group-buy record update logic
logger.info("Updating group-buy record: orderId={}, activityId={}", orderId, activityId);
// ...
}
}
2. Sample log output
2023-11-15 14:30:45.123(main)INFO  TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-Start processing group-buy order: orderId=req12345, productId=1001, quantity=2
2023-11-15 14:30:45.234(main)INFO  TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.InventoryServiceImpl-Stock deducted: productId=1001, quantity=2
2023-11-15 14:30:45.345(main)INFO  TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-Stock deducted: productId=1001, quantity=2
2023-11-15 14:30:45.456(main)INFO  TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-Updating group-buy record: orderId=req12345, activityId=act5678
2023-11-15 14:30:45.567(main)INFO  TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-Group-buy record updated: orderId=req12345
2023-11-15 14:30:45.678(main)INFO  TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.MQProducerServiceImpl-Sending group-buy success message: orderId=req12345
2023-11-15 14:30:45.789(main)INFO  TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-Group-buy success notification sent: orderId=req12345
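3. Verification in SkyWalking UI
- Open SkyWalking UI: http://localhost:8080
- In the "Trace" menu, search by the SkyWalking TraceID (the TID value in the log line). The business trace ID exists only in the MDC and the logs, so it can be searched here only if it is also attached to the span as a tag and that tag key is configured as searchable on the OAP server
- Review the full call chain and the time spent in each node
4. Kibana log query
- Open Kibana: http://localhost:5601 (the Docker Compose file above does not define a Kibana service, so one has to be running against the same Elasticsearch)
- Create the index pattern: iotdata-log-*
- On the Discover page, search for a specific bizTraceId: req12345_user67890_act5678
- Review the complete business-flow logs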
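For scripted checks it can help to compute the daily index name and the Discover query string instead of typing them. The sketch below assumes the `iotdata-log-%{+YYYY.MM.dd}` index naming and the `bizTraceId` field produced by the pipeline in this document. The class `LogQueryHelper` is hypothetical. Logstash derives the date suffix from `@timestamp` in UTC, so the formatter is pinned to UTC as well.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for building the index name and a KQL phrase query.
class LogQueryHelper {
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy.MM.dd").withZone(ZoneOffset.UTC);

    // Name of the daily index that holds events with the given @timestamp.
    static String indexFor(Instant eventTime) {
        return "iotdata-log-" + DAY.format(eventTime);
    }

    // KQL phrase query on the bizTraceId field, with backslashes and quotes escaped.
    static String kqlFor(String bizTraceId) {
        String escaped = bizTraceId.replace("\\", "\\\\").replace("\"", "\\\"");
        return "bizTraceId:\"" + escaped + "\"";
    }

    public static void main(String[] args) {
        System.out.println(indexFor(Instant.parse("2023-11-15T14:30:45Z")));
        System.out.println(kqlFor("req12345_user67890_act5678"));
    }
}
```

Because the index date is in UTC, a log line written shortly after local midnight can land in the previous day's index. Searching the `iotdata-log-*` pattern avoids that pitfall.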
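VI. Common Problems and Solutions
Problem | Solution |
---|---|
Context lost in asynchronous threads | Pass the context with TraceContextUtil.captureContext() and restoreContext() |
Trace broken across MQ messages | Pass the TraceID in message headers and restore the context on the consumer side |
No TraceID in the logs | Check that the logback configuration uses TraceIdPatternLogbackLayout |
No data in SkyWalking | Check that the agent path is correct and the collector address is reachable |
Performance problems under high concurrency | Tune the SkyWalking sampling setting and use asynchronous log output |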
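VII. Summary
This plan uses SkyWalking for distributed tracing, Logstash and Elasticsearch for centralized log management, and AOP plus interceptors to inject and propagate the trace context automatically. Together these provide end-to-end tracing of the flow from group-buy order placement → stock deduction → group-buy record update → MQ notification → payment callback. Key characteristics:
- Automatic instrumentation: business methods are instrumented through AOP annotations
- Full-chain coverage: synchronous, asynchronous, and MQ scenarios are supported
- Business correlation: a custom composite TraceID links business attributes to the technical trace
- Performance: asynchronous logging and a configurable sampling setting keep overhead under control
- Extensibility: the modular design allows tracing adapters for additional middleware
With this plan, problems in a distributed system can be located quickly, performance bottlenecks in business flows can be analyzed, and the system's observability and maintainability improve.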