1. Metrics收集原理
Metrics类用来管理kafka运行产生的各种埋点数据,内部管理两个关键类:Metric和Sensor.
1.1 Metric
程序运行时会产生各种数据,Metric封装了获取这些数据的细节,提供给外界使用(Facade Pattern)。每一个Metric代表一种类型的数据,一系列Metic组成所有维度的统计数据,通过Measurable方法获取具体的数据。如下图所示:
图1 Metric收集原理
- 程序运行的过程中不断产生各种数据,这些数据可以是当前的某个状态,比如访问次数总和,也可以是历史记录,比如每分钟的统计次数。这些指标数据可以是内存中的一个变量,也可以保存到数据库中。
- 需要统计的指标为一个Metric,用来供外界查看。
- 我们需要建立一个从Metric到数据的映射关系,这就通过Measurable来建立。
比如:
// 数据
int count = 0;
// metrics “api”
addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
//metrics到数据的映射方式
new Measurable() {
@Override
public double measure(MetricConfig config, long now) {
return count;
}
});
复制代码
Map<Long, Integer> counts1 = new HashMap<Long, Integer>();
Map<Long, Integer> counts2 = new HashMap<Long, Integer>();
addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
new Measurable() {
@Override
public double measure(MetricConfig config, long now) {
return count1.get(now) + count2.get(now);
}
});
复制代码
1.2 Sensor
A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set of metrics about request sizes such as the average or max.
我们可以通过 生成数据 + Metrics+Measurable 来统计任何维度的数据。Sensor其实就是一种特殊实现,帮我们实现了这样一种常见场景:采集数据,然后将之前采集的所有数据映射成各种聚合的结果,再通过Metrics来提供给外界。如下图所示:
图2 Sensor采集原理
- 首先在Sensor中注册Metric和对应的Stat。
- 在程序运行的过程中,通过 Sensor.record 采集数据。
- Sensor会将采集的数据分发给所有的Stat,同时检查其对应的所有的Metric是否超过配置中的限额。
- Stat对数据进行集成操作(如count、avg、max)
- Metric通过调用Stat的measure方法获取数据
其中的关键就是:SampledStat,可以看到继承了Measurable。即既保存数据,又提供数据到Metic的映射关系。
1.3 总体结构
通过上面的内容,已经大致了解采集的具体结构。再来看一下Metrics的总体结构:
总体结构
2. 外界获取Metrics数据
可以通过多种方式将数据暴露给外部使用。如JMX、yamml等。
2.1 JMX
JmxReporter实现了MetricsReporter方法,将Metrics提供给JMX。MetricsReporter定义了一些钩子函数,会在注册Repoter、更新Metric、移除Metric时触发。
具体的原理也很简单,就是将Metric的值保存在MBean中,通过JMX Agent暴露出去:
public class JmxReporter implements MetricsReporter {
private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
//创建时将metric封装进KafkaMbean,然后注册KafkaMbean到JMX Agent(更新、移除时同理)
@Override
public void init(List<KafkaMetric> metrics) {
synchronized (LOCK) {
for (KafkaMetric metric : metrics)
addAttribute(metric);
for (KafkaMbean mbean : mbeans.values())
reregister(mbean);
}
}
// 注册KafkaMbean到JMX Agent
private void reregister(KafkaMbean mbean) {
unregister(mbean);
try {
ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
} catch (JMException e) {
throw new KafkaException("Error registering mbean " + mbean.name(), e);
}
}
}