Install OpenJDK 11
- CentOS/RHEL
yum install -y java-11-openjdk-devel
- Ubuntu/Debian
apt install -y openjdk-11-jdk
Download the installation package
wget https://mirrors.aliyun.com/apache/kafka/3.9.1/kafka_2.12-3.9.1.tgz
tar -zxvf kafka_2.12-3.9.1.tgz -C /usr/local
Create the authentication file
vim /usr/local/kafka_2.12-3.9.1/config/kraft/kafka_server_jaas.conf
Write the following content:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
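Note: username and password are the credentials a node presents when joining the cluster; only nodes that pass this authentication can form the cluster. user_admin="admin-secret" defines a user named admin whose password is admin-secret. user_alice works the same way.
Modify the startup script
vim /usr/local/kafka_2.12-3.9.1/bin/kafka-server-start.sh
Write the following content:
# Point the JVM at the server JAAS file unless KAFKA_OPTS is already set
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-3.9.1/config/kraft/kafka_server_jaas.conf"
fi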
Modify the configuration
vim /usr/local/kafka_2.12-3.9.1/config/kraft/server.properties
Add or modify the following settings:
# Enable the PLAIN authentication mechanism
sasl.enabled.mechanisms=PLAIN
# Use the PLAIN mechanism for inter-broker communication as well
sasl.mechanism.inter.broker.protocol=PLAIN
# Do not make topics visible to all users when no ACL is found
allow.everyone.if.no.acl.found=false
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
# Use SASL for inter-broker communication
inter.broker.listener.name=SASL_PLAINTEXT
### Replace x.x.x.x with the public IP
advertised.listeners=SASL_PLAINTEXT://x.x.x.x:9092,CONTROLLER://localhost:9093
Generate a unique cluster ID
/usr/local/kafka_2.12-3.9.1/bin/kafka-storage.sh random-uuid
Format the storage directory (note: in cluster mode, run this on every node, and the ID must be the same on all of them)
/usr/local/kafka_2.12-3.9.1/bin/kafka-storage.sh format -t IAtPev4fQu6b_OkrXGpciw -c /usr/local/kafka_2.12-3.9.1/config/kraft/server.properties
Start the Kafka cluster
/usr/local/kafka_2.12-3.9.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.9.1/config/kraft/server.properties
View the Kafka service log
cat /usr/local/kafka_2.12-3.9.1/logs/server.log
Stop the Kafka cluster
/usr/local/kafka_2.12-3.9.1/bin/kafka-server-stop.sh
Create the client authentication file
vim /usr/local/kafka_2.12-3.9.1/config/kraft/kafka_client_jaas.conf
Write the following content:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice-secret";
};
Modify the client producer configuration producer.properties to add the authentication mechanism
vim /usr/local/kafka_2.12-3.9.1/config/producer.properties
Write the following content:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
Modify the client consumer configuration consumer.properties to add the authentication mechanism
vim /usr/local/kafka_2.12-3.9.1/config/consumer.properties
Write the following content:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
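Modify the client producer startup script kafka-console-producer.sh to point at the authentication file kafka_client_jaas.conf
vim /usr/local/kafka_2.12-3.9.1/bin/kafka-console-producer.sh
Change the content as follows:
# Set the heap size and the client JAAS file unless KAFKA_OPTS is already set
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.12-3.9.1/config/kraft/kafka_client_jaas.conf"
fi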
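Modify the client consumer startup script kafka-console-consumer.sh to point at the authentication file kafka_client_jaas.conf
vim /usr/local/kafka_2.12-3.9.1/bin/kafka-console-consumer.sh
Change the content as follows:
# Set the heap size and the client JAAS file unless KAFKA_OPTS is already set
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.12-3.9.1/config/kraft/kafka_client_jaas.conf"
fi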
Create the client authentication properties file
vim /usr/local/kafka_2.12-3.9.1/config/kraft/kafka_client_jaas.properties
Write the following content:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
Start the client producer
/usr/local/kafka_2.12-3.9.1/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --producer.config /usr/local/kafka_2.12-3.9.1/config/producer.properties --topic test
Start the client consumer
/usr/local/kafka_2.12-3.9.1/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer.config /usr/local/kafka_2.12-3.9.1/config/consumer.properties --topic test --from-beginning
Create a topic
/usr/local/kafka_2.12-3.9.1/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --command-config /usr/local/kafka_2.12-3.9.1/config/kraft/kafka_client_jaas.properties --create --topic test-topic
List topics
/usr/local/kafka_2.12-3.9.1/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --command-config /usr/local/kafka_2.12-3.9.1/config/kraft/kafka_client_jaas.properties --list
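The check below is an added example, not part of the original walkthrough or of Kafka. It parses a broker or client properties file and reports settings that weaken a SASL/PLAIN setup like the one above: PLAIN on a listener without TLS (the password crosses the network unencrypted), an ACL setting with no authorizer.class.name to enforce it, an inter-broker mechanism that is not enabled, and a JAAS login module that does not match sasl.mechanism. The function names, finding codes and sample files are assumptions constructed for illustration.

```python
import re

_PKG = "org.apache.kafka.common.security."
# JAAS login module that belongs to each SASL mechanism.
LOGIN_MODULE = {"PLAIN": _PKG + "plain.PlainLoginModule",
                "SCRAM-SHA-256": _PKG + "scram.ScramLoginModule",
                "SCRAM-SHA-512": _PKG + "scram.ScramLoginModule"}

def parse_properties(text):
    """Parse Java .properties text; 'key=value' and 'key: value' are both valid."""
    props = {}
    for line in text.splitlines():
        m = re.match(r"\s*([^#!=:\s][^=:\s]*)\s*[=:\s]\s*(.*)", line)
        if m:
            props[m.group(1)] = m.group(2).strip()
    return props

def audit(text):
    """Return finding codes for a broker or client properties file."""
    p, findings = parse_properties(text), []
    mech_list = p.get("sasl.enabled.mechanisms") or p.get("sasl.mechanism", "")
    mechs = {m.strip() for m in mech_list.split(",") if m.strip()}
    # Resolve listener names via listener.security.protocol.map (else name = protocol).
    pmap = dict(e.strip().split(":", 1)
                for e in p.get("listener.security.protocol.map", "").split(",") if ":" in e)
    names = re.findall(r"(\w+)://", p.get("listeners", "") + "," + p.get("advertised.listeners", ""))
    protocols = {pmap.get(n, n) for n in names} | {p.get("security.protocol", "")}
    if "SASL_PLAINTEXT" in protocols and "PLAIN" in mechs:
        findings.append("PLAIN_PASSWORD_IN_CLEARTEXT")      # no TLS; use SASL_SSL
    if "allow.everyone.if.no.acl.found" in p and not p.get("authorizer.class.name"):
        findings.append("ACL_SETTING_WITHOUT_AUTHORIZER")   # nothing enforces ACLs
    inter = p.get("sasl.mechanism.inter.broker.protocol")
    if inter and inter not in mechs:
        findings.append("INTER_BROKER_MECHANISM_NOT_ENABLED")
    mech, jaas = p.get("sasl.mechanism"), p.get("sasl.jaas.config", "")
    if jaas and mech in LOGIN_MODULE and not jaas.startswith(LOGIN_MODULE[mech]):
        findings.append("LOGIN_MODULE_MISMATCH")
    return findings

# Constructed samples (not taken from a real deployment).
BROKER = """sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=false
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
"""
CLIENT = """security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="example-only";
"""
```

Calling audit() on the text of server.properties or a client properties file returns an empty list when none of these conditions apply.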