分布式压测中间件的原理及其实现
原理
通过大量阅读中间件源码,开源社区调研,得到设计原理:
(1)发起压测链路http请求
(2)通过分布式追踪框架获取URL上影子标识,将其放入上下文Context中
(3)提供者应用发起PRC/MQ调用时,中间件会将测试标放入中间件的Context上下文中传递。
(4)消费者处理RPC/MQ消息,获取中间件Context上下文。
(5)经过分库分表/缓存数据库中间件,获取当前Context里的影子标识。
打成Maven包,在项目中直接引入
- 可插拔,业务代码不感知。
- 支持复杂SQL处理,支持全链路测试,且支持全链路追踪。
- 极大提高压测工作效率。
全链路追踪框架(Trace)
从HTTP请求链接上识别到特定的key,如:
URL添加压测标识,test = true,将压测标识添加到追踪链路框架中的Context上下文中。
MQ中间件
例如RocketMQ: com.alibaba.rocketmq.client.hook.SendMessageHook
实现接口SendMessageHook进行日志追踪链路埋点, 分布式链路组件SOFA
Trace也是基于这个接口去埋点,这是mq官方留给实现者的AOP。
public class MetaQSendMessageHookImpl implements SendMessageHook, MetaQTraceConstants {
public MetaQSendMessageHookImpl() {
}
public String hookName() {
return "EagleEyeSendMessageHook";
}
public void sendMessageBefore(SendMessageContext context) {
if (context != null && context.getMessage() != null && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {
MetaQTraceContext mqTraceContext = new MetaQTraceContext();
context.setMqTraceContext(mqTraceContext);
mqTraceContext.setMetaQType(MetaQType.METAQ);
mqTraceContext.setGroup(context.getProducerGroup());
mqTraceContext.setAsync(CommunicationMode.ASYNC.equals(context.getCommunicationMode()));
Message msg = context.getMessage();
if (msg != null) {
MetaQTraceBean traceBean = new MetaQTraceBean();
traceBean.setTopic(msg.getTopic());
traceBean.setOriginMsgId(MessageAccessor.getOriginMessageId(msg));
traceBean.setTags(msg.getTags());
traceBean.setKeys(msg.getKeys());
traceBean.setBuyerId(msg.getBuyerId());
traceBean.setTransferFlag(MessageAccessor.getTransferFlag(msg));
traceBean.setCorrectionFlag(MessageAccessor.getCorrectionFlag(msg));
traceBean.setBodyLength(msg.getBody().length);
traceBean.setBornHost(context.getBornHost());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBrokerName(context.getMq().getBrokerName());
traceBean.setProps(context.getProps());
traceBean.setMsgType(context.getMsgType());
List<MetaQTraceBean> beans = new ArrayList();
beans.add(traceBean);
mqTraceContext.setTraceBeans(beans);
if (StringUtils.isNotBlank(msg.getUserProperty("eagleTraceId"))) {
traceBean.setTraceId(msg.getUserProperty("eagleTraceId"));
traceBean.setRpcId(msg.getUserProperty("eagleRpcId"));
traceBean.setEagleEyeUserData(msg.getUserProperty("eagleData"));
}
MetaQSendMessageTraceLog.sendMessageBefore(mqTraceContext);
if (StringUtils.isBlank(msg.getProperty("eagleTraceId")) && StringUtils.isNotBlank(traceBean.getTraceId())) {
msg.putUserProperty("eagleTraceId", traceBean.getTraceId());
msg.putUserProperty("eagleRpcId", traceBean.getRpcId());
msg.putUserProperty("eagleData", traceBean.getEagleEyeUserData());
}
}
}
}
public void sendMessageAfter(SendMessageContext context) {
if (context != null && context.getMessage() != null && context.getSendResult() != null && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {
MetaQTraceContext mqTraceContext = (MetaQTraceContext)context.getMqTraceContext();
mqTraceContext.setRegionId(context.getSendResult().getRegionId());
MetaQTraceBean traceBean = (MetaQTraceBean)mqTraceContext.getTraceBeans().get(0);
if (traceBean != null && context.getSendResult() != null) {
traceBean.setQueueId(context.getMq().getQueueId());
traceBean.setMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setOriginMsgId(context.getSendResult().getMsgId());
traceBean.setOffset(context.getSendResult().getQueueOffset());
mqTraceContext.setSuccess(true);
mqTraceContext.setStatus(context.getSendResult().getSendStatus().toString());
} else if (context.getException() != null) {
String msg = context.getException().getMessage();
mqTraceContext.setErrorMsg(StringUtils.substring(msg, 0, msg.indexOf("\n")));
}
MetaQSendMessageTraceLog.sendMessageAfter(mqTraceContext);
}
}
}
数据库
参考数据库Druid链接池方案:
https://github.com/alibaba/druid/wiki/SQL-Parser
import com.alibaba.druid.filter.FilterAdapter;
import com.alibaba.druid.filter.FilterChain;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.proxy.jdbc.DataSourceProxy;
import com.alibaba.druid.proxy.jdbc.StatementProxy;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.util.JdbcConstants;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.List;
/**
* 拦截druid数据链接池
* @author doge
* @date 2021/10/19
*/
@Slf4j
public class DruidShadowTestFilter extends FilterAdapter {
private DruidShadowTestVisitor visitor = new DruidShadowTestVisitor();
@Override
public boolean statement_execute(FilterChain chain, StatementProxy statement, String sql) throws SQLException {
try {
List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL);
sqlStatements.forEach(sqlStatement -> sqlStatement.accept(visitor));
if (visitor.getRewriteStatus()) {
// 改写了SQL,需要替换
String newSql = SQLUtils.toSQLString(sqlStatements,JdbcConstants.MYSQL);
log.debug("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql);
return super.statement_execute(chain, statement, newSql);
}
return super.statement_execute(chain, statement, sql);
} finally {
visitor.removeRewriteStatus();
}
}
@Override
public void init(DataSourceProxy dataSourceProxy){
if (!(dataSourceProxy instanceof DruidDataSource)) {
log.error("ConfigLoader only support DruidDataSource");
}
DruidDataSource dataSource = (DruidDataSource) dataSourceProxy;
log.info("db configuration: url="+ dataSource.getUrl());
}
}
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
import com.alibaba.ewtp.test.utils.ShadowTestUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.Optional;
/**
* @author doge
* @date 2021/10/20
*/
public class DruidShadowTestVisitor extends MySqlASTVisitorAdapter {
private static final ThreadLocal<Boolean> REWRITE_STATUS_CACHE = new ThreadLocal<>();
@Override
public boolean visit(SQLExprTableSource sqlExprTableSource) {
// 别名,如果有别名,别名保持不变
String alias = StringUtils.isEmpty(sqlExprTableSource.getAlias()) ? sqlExprTableSource.getExpr().toString() : sqlExprTableSource.getAlias();
// 修改表名,不包含点才加 select c.id,d.name from c left join d on c.id = d.id 中的c 和 d
if(!sqlExprTableSource.getExpr().toString().contains(".")) {
sqlExprTableSource.setExpr(ShadowTestUtil.PREFIX + sqlExprTableSource.getExpr());
}
sqlExprTableSource.setAlias(alias);
REWRITE_STATUS_CACHE.set(true);
return true;
}
/**
* 返回重写状态
* @return 重写状态,{@code true}表示已重写,{@code false}表示未重写
*/
public boolean getRewriteStatus() {
// get reset rewrite status
return Optional.ofNullable(REWRITE_STATUS_CACHE.get()).orElse(Boolean.FALSE);
}
/**
* 重置重写状态
*/
public void removeRewriteStatus() {
REWRITE_STATUS_CACHE.remove();
}
}
分布式缓存中间件(Redis)
可以参考SofaTrace做法
https://www.sofastack.tech/blog/sofa-channel-15-retrospect/
- 新增一个Redis的后置增强器(部分代码)
- 实现redis的连接工厂(部分代码)
- 实现redis的连接器(会在所有redis key前加上前缀 test )
import com.alibaba.ewtp.test.factory.TracingRedisConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
```java
/**
* @author doge
* @date 2021/10/14
* redis 后置增强处理
*/
public class TracingRedisBeanPostProcessor implements BeanPostProcessor {
public TracingRedisBeanPostProcessor(){}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RedisConnectionFactory) {
bean = new TracingRedisConnectionFactory((RedisConnectionFactory) bean);
}
return bean;
}
}
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.*;
/**
* @author doge
* @date 2021/10/14
*/
public class TracingRedisConnectionFactory implements RedisConnectionFactory {
private final RedisConnectionFactory delegate;
public TracingRedisConnectionFactory(RedisConnectionFactory delegate) {
this.delegate = delegate;
}
@Override
public RedisConnection getConnection() {
// support cluster connection
RedisConnection connection = this.delegate.getConnection();
return new TracingRedisConnection(connection);
}
@Override
public RedisClusterConnection getClusterConnection() {
return delegate.getClusterConnection();
}
@Override
public boolean getConvertPipelineAndTxResults() {
return delegate.getConvertPipelineAndTxResults();
}
@Override
public RedisSentinelConnection getSentinelConnection() {
return delegate.getSentinelConnection();
}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException e) {
return delegate.translateExceptionIfPossible(e);
}
}
import org.springframework.dao.DataAccessException;
import org.springframework.data.geo.Distance;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.geo.Circle;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metric;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author doge
* @date 2021/10/14
*/
public class TracingRedisConnection implements RedisConnection {
private final RedisConnection connection;
public TracingRedisConnection(RedisConnection connection) {
this.connection = connection;
}
@Override
public Boolean expire(byte[] key, long seconds) {
handleByte(key);
return connection.expire(key, seconds);
}
@Override
public Boolean set(byte[] key, byte[] value) {
handleByte(key);
return connection.set(key, value);
}
@Override
public Boolean mSet(Map<byte[], byte[]> tuple) {
handleByteMap(tuple);
return connection.mSet(tuple);
}
public void handleByte(byte[] key){
if (ShadowTestUtil.isShadowTesLink()){
key = (ShadowTestUtil.prefix + new String(key)).getBytes();
}
}
public void handleBytes(byte[]... keys){
if (ShadowTestUtil.isShadowTesLink()){
for (byte[] bytes : keys){
handleByte(bytes);
}
}
}
public void handleByteMap(Map<byte[], byte[]> tuple){
if (ShadowTestUtil.isShadowTesLink()){
for (Map.Entry<byte[], byte[]> entry : tuple.entrySet()){
handleByte(entry.getKey());
}
}
}
}
分库分表中间件
开源框架的解决方案:
https://shardingsphere.apache.org/document/current/cn/features/shadow
方案&思路: 当获取压测标后,若开启影子链路,将打开Sharding影子库的开关,串通起整个分库分表链路。当然也可以直接用数据库连接池来解决。