接kafka4.0部署  https://blog.51cto.com/mapengfei/14081048

本文适用于:

  • 需要在生产环境或测试环境下实现 Kafka 全链路 SSL 加密通信的团队和个人
  • 希望通过证书实现不同客户端精细化权限控制(ACL),可以精确到每个客户端能操作哪些topic已经具体操作,提升安全等级
  • 需要一键自动化、批量证书签发、权限分离、运维规范的 Kafka 用户

文章内容涵盖:

  • Kafka 服务端与客户端证书链全流程(含 SAN、CA、自签、PKCS12、Truststore)
  • Kafka 端到端 SSL 配置、server.properties 推荐模板
  • 每个客户端独立证书、基于证书 CN 的 ACL 权限精细化控制
  • 一键自动化与可视化流程图

适合 Kafka 安全运维、DevOps、数据平台、微服务架构等场景,助你快速落地企业级安全实践。

文章共分为2部分,kafka的server配置和客户端认证ACL控制部分

一、Kafka Server的配置部分

1. 生成服务端证书相关目录和 openssl 配置

mkdir -p /etc/kafka/server

cat > /etc/kafka/server/san.cnf <<EOF

[ req ]

default_bits = 2048

distinguished_name = req_distinguished_name

req_extensions = v3_req

prompt = no

[ req_distinguished_name ]

C = CN

ST = BJ

L = BJ

O = test

OU = test

CN = 10.0.70.189

[ v3_req ]

subjectAltName = @alt_names

[ alt_names ]

IP.1 = 10.0.70.189

DNS.1 = 10.0.70.189

EOF

2. 生成服务端私钥和 CSR

cd /etc/kafka/server

openssl req -new -nodes -out server.csr -keyout server.key -config san.cnf

3. 生成 CA

openssl genrsa -out ca.key 2048

openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt -subj "/C=CN/ST=BJ/L=BJ/O=test/OU=test/CN=Kafka-CA"

4. 用 CA 签发服务端证书

openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 3650 -extensions v3_req -extfile san.cnf

5. 生成 PKCS12 证书

openssl pkcs12 -export -in server.crt -inkey server.key -out kafka.server.p12 -name kafka-server -CAfile ca.crt -caname root -password pass:mafei2025

6. 生成 Truststore

rm -f kafka.server.truststore.p12

keytool -importcert -trustcacerts -alias CARoot -file ca.crt -keystore kafka.server.truststore.p12 -storetype PKCS12 -storepass mafei2025 -noprompt

7. 生成 server-ssl.properties

cat > /etc/kafka/server/server-ssl.properties <<EOF

security.protocol=SSL

ssl.truststore.location=/etc/kafka/server/kafka.server.truststore.p12

ssl.truststore.password=mafei2025

ssl.keystore.location=/etc/kafka/server/kafka.server.p12

ssl.keystore.password=mafei2025

ssl.key.password=mafei2025

EOF

8.生成/覆盖 Kafka server.properties 主要SSL配置

cat > /opt/kafka/config/server.properties <<EOF

process.roles=broker,controller

node.id=1

controller.quorum.bootstrap.servers=10.0.70.189:9093

listeners=CONTROLLER://10.0.70.189:9092,BROKER://10.0.70.189:9093

inter.broker.listener.name=BROKER

advertised.listeners=CONTROLLER://10.0.70.189:9092,BROKER://10.0.70.189:9093

controller.listener.names=CONTROLLER

listener.security.protocol.map=CONTROLLER:SSL,BROKER:SSL

ssl.keystore.type=PKCS12

ssl.keystore.location=/etc/kafka/server/kafka.server.p12

ssl.keystore.password=mafei2025

ssl.key.password=mafei2025

ssl.truststore.type=PKCS12

ssl.truststore.location=/etc/kafka/server/kafka.server.truststore.p12

ssl.truststore.password=mafei2025

ssl.client.auth=required

authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

super.users=User:root;User:CN=10.0.70.189,OU=test,O=test,L=BJ,ST=BJ,C=CN

log.dirs=/data/kafka

num.partitions=1

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

EOF

echo "已自动生成 /opt/kafka/config/server.properties,包含SSL和KRaft主要配置。"

9.启动kafka Server

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

二、客户端A配置

1.生成 clientA 证书

mkdir -p /etc/kafka/clientA

cd /etc/kafka/clientA

openssl genrsa -out clientA.key 2048

openssl req -new -key clientA.key -out clientA.csr -subj "/CN=clientA"

openssl x509 -req -in clientA.csr -CA /etc/kafka/server/ca.crt -CAkey /etc/kafka/server/ca.key -CAcreateserial -out clientA.crt -days 3650 -sha256

openssl pkcs12 -export -in clientA.crt -inkey clientA.key -certfile /etc/kafka/server/server.crt -name clientA -out clientA.p12 -password pass:mafei2025

2. 创建 topic

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.70.189:9093 --create --topic t_mafei --partitions 1 --replication-factor 1 --command-config /etc/kafka/server/server-ssl.properties

3. 添加 ACL,限制clientA只能操作t_mafei 这个topic

/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.70.189:9093 --add --allow-principal "User:CN=clientA" --operation All --topic t_mafei --command-config /etc/kafka/server/server-ssl.properties

4.测试生产消息( /root/kafka_test_amd64 工具就是kafka producer客户端,文章最后有贴)

/root/kafka_test_amd64 --ca=/etc/kafka/server/ca.crt --cert=/etc/kafka/clientA/clientA.crt --key=/etc/kafka/clientA/clientA.key --topic t_mafei --broker 10.0.70.189:9093

Kafka SSL 认证与 ACL 权限最佳实践(实战流程)_kafka

5.换一个topic应该就能看到报错信息了, 发送消息失败: kafka server: The client is not authorized to access this topic

/root/kafka_test_amd64 --ca=/etc/kafka/server/ca.crt --cert=/etc/kafka/clientA/clientA.crt --key=/etc/kafka/clientA/clientA.key --topic test1 --broker 10.0.70.189:9093

Kafka SSL 认证与 ACL 权限最佳实践(实战流程)_SSL_02

6.其他命令

1).删除其他多余的acl

/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.70.189:9093 --remove --allow-principal "User:CN=clientA" --operation Create --topic test2 --command-config /etc/kafka/server/server-ssl.properties

2).查看当前所有acl

/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.70.189:9093 --list --command-config /etc/kafka/server/server-ssl.properties

Kafka SSL 认证与 ACL 权限最佳实践(实战流程)_客户端_03

3).为每个客户端生成 PKCS12 keystore(如需)

cd /etc/kafka/clientA

openssl pkcs12 -export -in clientA.crt -inkey clientA.key -certfile /etc/kafka/server/server.crt -name clientA -out clientA.p12 -password pass:mafei2025

每个客户端用自己的证书和私钥(或 p12 文件)连接 Kafka,配置如下:

● clientA 用 clientA.crt + clientA.key(或 clientA.p12)

● clientB 用 clientB.crt + clientB.key(或 clientB.p12)

● 都用同一个 ca.crt 作为 truststore

4)ACL授权以t_mafei开头的所有topic

/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.70.189:9093  --add --allow-principal "User:CN=clientA" --operation Create --topic t_mafei --resource-pattern-type PREFIXED --command-config /etc/kafka/server-ssl.properties

这样,User:CN=clientA 将拥有所有以 t_mafei 开头的 topic 的 Create 权限(如 t_mafei1、t_mafei_test 等)。

说明:

  • --resource-pattern-type PREFIXED 是 Kafka 2.0 及以上版本支持的功能。
  • 你也可以用 --operation All 授权所有操作。

三、总结

● 每个客户端独立生成私钥和证书请求,由同一个 CA 签发。

● 服务端只需信任 CA。

● 客户端用自己的证书和私钥连接。

● 可通过证书 CN 字段实现精细化权限控制。

附 golang写的客户端测试工具

package main

import (
	"crypto/tls"
	"crypto/x509"
	"flag"
	"log"
	"os"

	"github.com/IBM/sarama"
)

func main() {
	caPath := flag.String("ca", "ca.crt", "服务器CA证书路径")
	certPath := flag.String("cert", "client.crt", "客户端证书路径")
	keyPath := flag.String("key", "client.key", "客户端私钥路径")
	topic := flag.String("topic", "test_mafei", "Kafka topic名称")             // 新增topic参数
	brokerAddr := flag.String("broker", "10.0.70.189:9093", "Kafka服务器地址") // 新增broker参数
	flag.Parse()

	// 检查文件是否存在,否则报错
	if _, err := os.Stat(*caPath); os.IsNotExist(err) {
		log.Fatalf("CA证书文件不存在: %s", *caPath)
	}
	if _, err := os.Stat(*certPath); os.IsNotExist(err) {
		log.Fatalf("客户端证书文件不存在: %s", *certPath)
	}
	if _, err := os.Stat(*keyPath); os.IsNotExist(err) {
		log.Fatalf("客户端私钥文件不存在: %s", *keyPath)
	}

	//brokers := []string{"10.0.70.187:9093"}
	brokers := []string{*brokerAddr}
	value := `{"sip":"1.1.1.1"}`

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Net.TLS.Enable = true

	// 加载CA证书
	caCert, err := os.ReadFile(*caPath)
	if err != nil {
		log.Fatalf("读取CA证书失败: %v", err)
	}
	caCertPool := x509.NewCertPool()
	caCertPool.AppendCertsFromPEM(caCert)

	// 加载客户端证书和私钥
	cert, err := tls.LoadX509KeyPair(*certPath, *keyPath)
	if err != nil {
		log.Fatalf("加载客户端证书/私钥失败: %v", err)
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
		MinVersion:   tls.VersionTLS12,
	}
	config.Net.TLS.Config = tlsConfig

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("创建生产者失败: %v", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: *topic, // 使用动态topic
		Value: sarama.StringEncoder(value),
	}

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatalf("发送消息失败: %v", err)
	}
	log.Printf("消息发送成功,分区: %d, 偏移: %d", partition, offset)
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.