如何知道kafka是否正常生产消费?看看客户端指标采集

发布于:2022-12-06 ⋅ 阅读:(374) ⋅ 点赞:(0)

1. Metrics收集原理

Metrics类用来管理kafka运行产生的各种埋点数据,内部管理两个关键类:MetricSensor.

1.1 Metric

程序运行时会产生各种数据,Metric封装了获取这些数据的细节,提供给外界使用(Facade Pattern)。每一个Metric代表一种类型的数据,一系列Metic组成所有维度的统计数据,通过Measurable方法获取具体的数据。如下图所示:

图1 Metric收集原理

  1. 程序运行的过程中不断产生各种数据,这些数据可以是当前的某个状态,比如访问次数总和,也可以是历史记录,比如每分钟的统计次数。这些指标数据可以是内存中的一个变量,也可以保存到数据库中。
  2. 需要统计的指标为一个Metric,用来供外界查看。
  3. 我们需要建立一个从Metric到数据的映射关系,这就通过Measurable来建立。

比如:

// 数据
int count = 0;

// metrics “api”
addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
          //metrics到数据的映射方式
      new Measurable() {
               @Override
           public double measure(MetricConfig config, long now) {
                 return count;
           }
      });
复制代码
Map<Long, Integer> counts1 = new HashMap<Long, Integer>();
Map<Long, Integer> counts2 = new HashMap<Long, Integer>();
addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
    new Measurable() {
                @Override
         public double measure(MetricConfig config, long now) {
               return count1.get(now) + count2.get(now);
         }
    });
复制代码

1.2 Sensor

A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set of metrics about request sizes such as the average or max.

我们可以通过 生成数据 + Metrics+Measurable 来统计任何维度的数据。Sensor其实就是一种特殊实现,帮我们实现了这样一种常见场景:采集数据,然后将之前采集的所有数据映射成各种聚合的结果,再通过Metrics来提供给外界。如下图所示:

图2 Sensor采集原理

  1. 首先在Sensor中注册Metric和对应的Stat。
  2. 在程序运行的过程中,通过 Sensor.record 采集数据。
  3. Sensor会将采集的数据分发给所有的Stat,同时检查其对应的所有的Metric是否超过配置中的限额。
  4. Stat对数据进行集成操作(如count、avg、max)
  5. Metric通过调用Stat的measure方法获取数据

其中的关键就是:SampledStat,可以看到继承了Measurable。即既保存数据,又提供数据到Metic的映射关系。

1.3 总体结构

通过上面的内容,已经大致了解采集的具体结构。再来看一下Metrics的总体结构:

总体结构

2. 外界获取Metrics数据

可以通过多种方式将数据暴露给外部使用。如JMX、yamml等。

2.1 JMX

Basic Introduction to JMX

JmxReporter实现了MetricsReporter方法,将Metrics提供给JMX。MetricsReporter定义了一些钩子函数,会在注册Repoter、更新Metric、移除Metric时触发。

具体的原理也很简单,就是将Metric的值保存在MBean中,通过JMX Agent暴露出去:

public class JmxReporter implements MetricsReporter {
  private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
  
  //创建时将metric封装进KafkaMbean,然后注册KafkaMbean到JMX Agent(更新、移除时同理)
  @Override
  public void init(List<KafkaMetric> metrics) {
        synchronized (LOCK) {
            for (KafkaMetric metric : metrics)
                addAttribute(metric);
            for (KafkaMbean mbean : mbeans.values())
                reregister(mbean);
        }
    }
  
  // 注册KafkaMbean到JMX Agent
   private void reregister(KafkaMbean mbean) {
        unregister(mbean);
        try {
            ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
        } catch (JMException e) {
            throw new KafkaException("Error registering mbean " + mbean.name(), e);
        }
    }
  
}