【如何实现分布式压测中间件】

发布于:2025-06-30 ⋅ 阅读:(20) ⋅ 点赞:(0)

原理

通过大量阅读中间件源码,开源社区调研,得到设计原理:
(1)发起压测链路http请求
(2)通过分布式追踪框架获取URL上影子标识,将其放入上下文Context中
(3)提供者应用发起RPC/MQ调用时,中间件会将压测标识放入中间件的Context上下文中传递。
(4)消费者处理RPC/MQ消息,获取中间件Context上下文。
(5)经过分库分表/缓存数据库中间件,获取当前Context里的影子标识。

打成Maven包,在项目中直接引入

  1. 可插拔,业务代码不感知。
  2. 支持复杂SQL处理,支持全链路测试,且支持全链路追踪。
  3. 极大提高压测工作效率。

全链路追踪框架(Trace)

从HTTP请求链接上识别到特定的key,如:

URL添加压测标识,test = true,将压测标识添加到追踪链路框架中的Context上下文中。

MQ中间件

例如RocketMQ: com.alibaba.rocketmq.client.hook.SendMessageHook
实现接口SendMessageHook进行日志追踪链路埋点,分布式链路组件SOFATrace
也是基于这个接口去埋点,这是MQ官方留给实现者的AOP扩展点。

/**
 * Send-message hook that collects trace data before and after a message is sent.
 *
 * <p>{@link #sendMessageBefore} builds a {@code MetaQTraceContext} holding one
 * {@code MetaQTraceBean}, attaches it to the send context and copies the trace ids
 * between the message's user properties and the bean. {@link #sendMessageAfter}
 * completes that bean with the send result and hands the context to
 * {@code MetaQSendMessageTraceLog}.
 */
public class MetaQSendMessageHookImpl implements SendMessageHook, MetaQTraceConstants {
    public MetaQSendMessageHookImpl() {
    }

    /**
     * @return the fixed name under which this hook is registered
     */
    public String hookName() {
        return "EagleEyeSendMessageHook";
    }

    /**
     * Creates the trace context for an outgoing message and records it on {@code context}.
     *
     * <p>Does nothing when the context or its message is missing, or when trace logging
     * is switched off for the producer group.
     *
     * @param context send context supplied by the client; may be {@code null}
     */
    public void sendMessageBefore(SendMessageContext context) {
        if (context == null || context.getMessage() == null
                || !MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {
            return;
        }

        MetaQTraceContext mqTraceContext = new MetaQTraceContext();
        context.setMqTraceContext(mqTraceContext);
        mqTraceContext.setMetaQType(MetaQType.METAQ);
        mqTraceContext.setGroup(context.getProducerGroup());
        mqTraceContext.setAsync(CommunicationMode.ASYNC.equals(context.getCommunicationMode()));

        Message msg = context.getMessage();
        MetaQTraceBean traceBean = new MetaQTraceBean();
        traceBean.setTopic(msg.getTopic());
        traceBean.setOriginMsgId(MessageAccessor.getOriginMessageId(msg));
        traceBean.setTags(msg.getTags());
        traceBean.setKeys(msg.getKeys());
        traceBean.setBuyerId(msg.getBuyerId());
        traceBean.setTransferFlag(MessageAccessor.getTransferFlag(msg));
        traceBean.setCorrectionFlag(MessageAccessor.getCorrectionFlag(msg));
        // A message without a body is reported as length 0 instead of failing the send.
        byte[] body = msg.getBody();
        traceBean.setBodyLength(body == null ? 0 : body.length);
        traceBean.setBornHost(context.getBornHost());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBrokerName(context.getMq().getBrokerName());
        traceBean.setProps(context.getProps());
        traceBean.setMsgType(context.getMsgType());

        List<MetaQTraceBean> beans = new ArrayList<>();
        beans.add(traceBean);
        mqTraceContext.setTraceBeans(beans);

        // Trace ids already carried by the message take precedence.
        if (StringUtils.isNotBlank(msg.getUserProperty("eagleTraceId"))) {
            traceBean.setTraceId(msg.getUserProperty("eagleTraceId"));
            traceBean.setRpcId(msg.getUserProperty("eagleRpcId"));
            traceBean.setEagleEyeUserData(msg.getUserProperty("eagleData"));
        }

        MetaQSendMessageTraceLog.sendMessageBefore(mqTraceContext);

        // If the message had no trace id but the bean has one after the call above,
        // write it back onto the message so it travels with it.
        if (StringUtils.isBlank(msg.getProperty("eagleTraceId")) && StringUtils.isNotBlank(traceBean.getTraceId())) {
            msg.putUserProperty("eagleTraceId", traceBean.getTraceId());
            msg.putUserProperty("eagleRpcId", traceBean.getRpcId());
            msg.putUserProperty("eagleData", traceBean.getEagleEyeUserData());
        }
    }

    /**
     * Completes the trace bean created in {@link #sendMessageBefore} with the send result
     * and forwards the trace context to the trace log.
     *
     * <p>Does nothing when the context, message or send result is missing, when trace
     * logging is off for the producer group, or when no trace context / trace bean list
     * was attached beforehand.
     *
     * @param context send context supplied by the client; may be {@code null}
     */
    public void sendMessageAfter(SendMessageContext context) {
        if (context == null || context.getMessage() == null || context.getSendResult() == null
                || !MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {
            return;
        }

        MetaQTraceContext mqTraceContext = (MetaQTraceContext) context.getMqTraceContext();
        if (mqTraceContext == null) {
            // No context was attached before the send, so there is nothing to complete.
            return;
        }
        List<?> traceBeans = mqTraceContext.getTraceBeans();
        if (traceBeans == null || traceBeans.isEmpty()) {
            return;
        }

        mqTraceContext.setRegionId(context.getSendResult().getRegionId());
        MetaQTraceBean traceBean = (MetaQTraceBean) traceBeans.get(0);
        if (traceBean != null) {
            traceBean.setQueueId(context.getMq().getQueueId());
            traceBean.setMsgId(context.getSendResult().getOffsetMsgId());
            traceBean.setOriginMsgId(context.getSendResult().getMsgId());
            traceBean.setOffset(context.getSendResult().getQueueOffset());
            mqTraceContext.setSuccess(true);
            mqTraceContext.setStatus(context.getSendResult().getSendStatus().toString());
        } else if (context.getException() != null) {
            String errorMsg = context.getException().getMessage();
            if (errorMsg != null) {
                // Keep only the first line. A message without a newline is kept whole:
                // passing indexOf's -1 as an end index would cut off its last character.
                int newline = errorMsg.indexOf('\n');
                mqTraceContext.setErrorMsg(newline >= 0 ? errorMsg.substring(0, newline) : errorMsg);
            }
        }

        MetaQSendMessageTraceLog.sendMessageAfter(mqTraceContext);
    }
}

数据库

参考数据库Druid连接池方案:
https://github.com/alibaba/druid/wiki/SQL-Parser

import com.alibaba.druid.filter.FilterAdapter;
import com.alibaba.druid.filter.FilterChain;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.proxy.jdbc.DataSourceProxy;
import com.alibaba.druid.proxy.jdbc.StatementProxy;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.util.JdbcConstants;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.List;

/**
 * Druid filter that intercepts statement execution and, when the visitor reports a
 * rewrite, executes the rewritten SQL instead of the original.
 *
 * @author doge
 * @date 2021/10/19
 */
@Slf4j
public class DruidShadowTestFilter extends FilterAdapter {

    /** Visitor applied to every parsed statement; it records whether it rewrote anything. */
    private final DruidShadowTestVisitor visitor = new DruidShadowTestVisitor();

    /**
     * Parses {@code sql} as MySQL, runs the shadow-table visitor over each statement and
     * executes either the rewritten or the original SQL.
     *
     * @param chain     filter chain to continue with
     * @param statement statement proxy being executed
     * @param sql       SQL text as issued by the application
     * @return whatever the next filter in the chain returns
     * @throws SQLException if the underlying execution fails
     */
    @Override
    public boolean statement_execute(FilterChain chain, StatementProxy statement, String sql) throws SQLException {
        try {
            List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL);
            sqlStatements.forEach(sqlStatement -> sqlStatement.accept(visitor));
            if (visitor.getRewriteStatus()) {
                // The visitor changed the AST, so serialize it back and execute that text.
                String newSql = SQLUtils.toSQLString(sqlStatements, JdbcConstants.MYSQL);
                log.debug("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql);
                return super.statement_execute(chain, statement, newSql);
            }
            return super.statement_execute(chain, statement, sql);
        } finally {
            // Always clear the rewrite flag so it cannot leak into the next statement.
            visitor.removeRewriteStatus();
        }
    }

    /**
     * Logs the JDBC URL of the data source this filter is attached to.
     *
     * <p>Only {@link DruidDataSource} is supported; any other proxy is reported and
     * skipped instead of being cast, which would throw {@link ClassCastException}.
     *
     * @param dataSourceProxy data source the filter is being initialized for
     */
    @Override
    public void init(DataSourceProxy dataSourceProxy) {
        if (!(dataSourceProxy instanceof DruidDataSource)) {
            log.error("ConfigLoader only support DruidDataSource");
            return;
        }
        DruidDataSource dataSource = (DruidDataSource) dataSourceProxy;
        log.info("db configuration: url={}", dataSource.getUrl());
    }

}

import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
import com.alibaba.ewtp.test.utils.ShadowTestUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.Optional;

/**
 * AST visitor that redirects unqualified table references to their shadow tables by
 * prepending {@code ShadowTestUtil.PREFIX} to the table name.
 *
 * <p>Whether anything was rewritten is tracked per thread, so callers must invoke
 * {@link #removeRewriteStatus()} once they have read {@link #getRewriteStatus()}.
 *
 * @author doge
 * @date 2021/10/20
 */
public class DruidShadowTestVisitor extends MySqlASTVisitorAdapter {

    /** Per-thread flag: set once this visitor has renamed at least one table. */
    private static final ThreadLocal<Boolean> REWRITE_STATUS_CACHE = new ThreadLocal<>();

    /**
     * Renames an unqualified table reference to its shadow table.
     *
     * <p>An existing alias is kept. When there is none, the original table name becomes
     * the alias, so column references such as {@code c.id} in
     * {@code select c.id, d.name from c left join d on c.id = d.id} still resolve.
     * Names containing a dot (for example {@code db.table}) are left untouched and do
     * not raise the rewrite flag; using such a name as an alias would produce invalid SQL.
     *
     * @param sqlExprTableSource table reference being visited
     * @return {@code true}, so child nodes are visited as well
     */
    @Override
    public boolean visit(SQLExprTableSource sqlExprTableSource) {
        if (sqlExprTableSource.getExpr() == null) {
            return true;
        }
        String tableName = sqlExprTableSource.getExpr().toString();
        if (tableName.contains(".")) {
            return true;
        }

        String alias = StringUtils.isEmpty(sqlExprTableSource.getAlias())
                ? tableName
                : sqlExprTableSource.getAlias();
        sqlExprTableSource.setExpr(ShadowTestUtil.PREFIX + tableName);
        sqlExprTableSource.setAlias(alias);
        REWRITE_STATUS_CACHE.set(true);
        return true;
    }

    /**
     * Reports whether the current thread's statement was rewritten.
     *
     * @return {@code true} if a table was renamed since the last
     *         {@link #removeRewriteStatus()}, {@code false} otherwise
     */
    public boolean getRewriteStatus() {
        // Read-only: the flag stays set until removeRewriteStatus() is called.
        return Boolean.TRUE.equals(REWRITE_STATUS_CACHE.get());
    }

    /**
     * Clears the current thread's rewrite flag.
     */
    public void removeRewriteStatus() {
        REWRITE_STATUS_CACHE.remove();
    }
}

分布式缓存中间件(Redis)

可以参考SofaTrace做法
https://www.sofastack.tech/blog/sofa-channel-15-retrospect/

  1. 新增一个Redis的后置增强器(部分代码)
  2. 实现redis的连接工厂(部分代码)
  3. 实现redis的连接器(会在所有redis key前加上前缀 test)

import com.alibaba.ewtp.test.factory.TracingRedisConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
```java

/**
 * Bean post-processor that swaps every {@link RedisConnectionFactory} bean for a
 * {@code TracingRedisConnectionFactory} wrapping it.
 *
 * @author doge
 * @date 2021/10/14
 */
public class TracingRedisBeanPostProcessor implements BeanPostProcessor {

    public TracingRedisBeanPostProcessor() {
    }

    /**
     * Wraps Redis connection factories; every other bean is returned unchanged.
     *
     * @param bean     the initialized bean
     * @param beanName name of the bean in the container
     * @return a {@code TracingRedisConnectionFactory} around {@code bean} when it is a
     *         {@link RedisConnectionFactory}, otherwise {@code bean} itself
     * @throws BeansException declared by the overridden method; not thrown here
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!(bean instanceof RedisConnectionFactory)) {
            return bean;
        }
        RedisConnectionFactory target = (RedisConnectionFactory) bean;
        return new TracingRedisConnectionFactory(target);
    }
}



import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.*;

/**
 * {@link RedisConnectionFactory} decorator: connections obtained through
 * {@link #getConnection()} are wrapped in a {@code TracingRedisConnection}; every other
 * call is forwarded to the wrapped factory as is.
 *
 * @author doge
 * @date 2021/10/14
 */
public class TracingRedisConnectionFactory implements RedisConnectionFactory {

    /** The factory that actually creates connections. */
    private final RedisConnectionFactory delegate;

    /**
     * @param delegate factory to forward to
     */
    public TracingRedisConnectionFactory(RedisConnectionFactory delegate) {
        this.delegate = delegate;
    }

    /**
     * @return a {@code TracingRedisConnection} around the delegate's connection
     */
    @Override
    public RedisConnection getConnection() {
        return new TracingRedisConnection(this.delegate.getConnection());
    }

    /**
     * @return the delegate's cluster connection, not wrapped
     */
    @Override
    public RedisClusterConnection getClusterConnection() {
        return this.delegate.getClusterConnection();
    }

    /**
     * @return the delegate's setting
     */
    @Override
    public boolean getConvertPipelineAndTxResults() {
        return this.delegate.getConvertPipelineAndTxResults();
    }

    /**
     * @return the delegate's sentinel connection, not wrapped
     */
    @Override
    public RedisSentinelConnection getSentinelConnection() {
        return this.delegate.getSentinelConnection();
    }

    /**
     * @param e exception to translate
     * @return the delegate's translation of {@code e}
     */
    @Override
    public DataAccessException translateExceptionIfPossible(RuntimeException e) {
        return this.delegate.translateExceptionIfPossible(e);
    }
}


import org.springframework.dao.DataAccessException;
import org.springframework.data.geo.Distance;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.geo.Circle;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metric;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.core.types.RedisClientInfo;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * {@link RedisConnection} decorator that redirects keys to their shadow counterparts
 * (key prefixed with {@code ShadowTestUtil.prefix}) while
 * {@code ShadowTestUtil.isShadowTesLink()} reports a shadow-test link.
 *
 * <p>The key helpers return the key(s) to use. Java passes array references by value,
 * so reassigning a parameter inside a helper never reaches the caller; the overridden
 * commands therefore pass the helper's return value on to the wrapped connection.
 *
 * @author doge
 * @date 2021/10/14
 */
public class TracingRedisConnection implements RedisConnection {
    /** The connection that actually talks to Redis. */
    private final RedisConnection connection;

    /**
     * @param connection connection to forward commands to
     */
    public TracingRedisConnection(RedisConnection connection) {
        this.connection = connection;
    }

    @Override
    public Boolean expire(byte[] key, long seconds) {
        return connection.expire(handleByte(key), seconds);
    }

    @Override
    public Boolean set(byte[] key, byte[] value) {
        return connection.set(handleByte(key), value);
    }

    @Override
    public Boolean mSet(Map<byte[], byte[]> tuple) {
        return connection.mSet(handleByteMap(tuple));
    }

    /**
     * Resolves the key to use for the current call.
     *
     * <p>The prefix is prepended at byte level, so keys that are not valid text in any
     * charset are preserved exactly.
     *
     * @param key original key; may be {@code null}
     * @return a new array holding prefix + key on a shadow-test link, otherwise
     *         {@code key} itself
     */
    public byte[] handleByte(byte[] key) {
        if (key == null || !ShadowTestUtil.isShadowTesLink()) {
            return key;
        }
        // NOTE(review): other code in this project refers to ShadowTestUtil.PREFIX -
        // confirm which constant is the intended one.
        byte[] prefixBytes = String.valueOf(ShadowTestUtil.prefix)
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] shadowKey = Arrays.copyOf(prefixBytes, prefixBytes.length + key.length);
        System.arraycopy(key, 0, shadowKey, prefixBytes.length, key.length);
        return shadowKey;
    }

    /**
     * Resolves several keys at once.
     *
     * @param keys original keys; may be {@code null}
     * @return a new array of shadow keys on a shadow-test link, otherwise {@code keys}
     */
    public byte[][] handleBytes(byte[]... keys) {
        if (keys == null || !ShadowTestUtil.isShadowTesLink()) {
            return keys;
        }
        byte[][] shadowKeys = new byte[keys.length][];
        for (int i = 0; i < keys.length; i++) {
            shadowKeys[i] = handleByte(keys[i]);
        }
        return shadowKeys;
    }

    /**
     * Resolves the keys of a key/value map; values are carried over unchanged.
     *
     * @param tuple original entries; may be {@code null}
     * @return a new map with shadow keys (iteration order kept) on a shadow-test link,
     *         otherwise {@code tuple}; the caller's map is never modified
     */
    public Map<byte[], byte[]> handleByteMap(Map<byte[], byte[]> tuple) {
        if (tuple == null || !ShadowTestUtil.isShadowTesLink()) {
            return tuple;
        }
        Map<byte[], byte[]> shadowTuple = new LinkedHashMap<>();
        for (Map.Entry<byte[], byte[]> entry : tuple.entrySet()) {
            shadowTuple.put(handleByte(entry.getKey()), entry.getValue());
        }
        return shadowTuple;
    }

}

分库分表中间件

开源框架的解决方案:
https://shardingsphere.apache.org/document/current/cn/features/shadow

方案&思路: 当获取压测标后,若开启影子链路,将打开Sharding影子库的开关,串通起整个分库分表链路。当然也可以直接用数据库连接池来解决。


网站公告

今日签到

点亮在社区的每一天
去签到