Debezium日常分享系列之:在 Kubernetes 上部署 Debezium

发布于:2025-07-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

Debezium 可以无缝部署在 Kubernetes(一个用于容器编排的开源平台)上。此部署利用了 Strimzi 项目,该项目通过自定义资源简化了 Kubernetes 上 Kafka Connect 和连接器的部署。

先决条件

  • 一个正在运行的 Kubernetes 集群(本演示将使用 minikube)
  • kubectl
  • helm

步骤

使用 registry 启动 minikube 集群

minikube start --insecure-registry "10.0.0.0/24"

启用注册表插件

minikube addons enable registry

部署 OLM(操作员生命周期管理器)

git clone https://github.com/operator-framework/operator-lifecycle-manager.git

cd operator-lifecycle-manager/deploy/chart

helm install olm .

检查operator-lifecycle-manager命名空间中的所有pod是否处于运行状态。
在这里插入图片描述
应用以下清单来部署 strimzi kafka 操作员

cat > strimzi-kafka-operator.yaml <<EOF
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: strimzi-kafka-operator
  namespace: operators
spec:
  channel: stable
  name: strimzi-kafka-operator
  source: operatorhubio-catalog
  sourceNamespace: operator-lifecycle-manager # this should be same as the namespace in which olm is deployed
EOF
  • kind: 指定资源类型,这里是 SubscriptionSubscription 资源用于订阅一个 Operator,以便在集群中自动更新和管理该 Operator。
  • spec:
    • channel: 订阅的频道,这里是 stable。频道通常用于指定不同版本的 Operator。
    • name: 要订阅的 Operator 的名称,这里是 strimzi-kafka-operator
    • source: Operator Catalog 的名称,这里是 operatorhubio-catalog。Catalog 是一个包含多个 Operator 的集合。
    • sourceNamespace: Catalog 所在的命名空间,这里是 operator-lifecycle-manager。这个命名空间通常是 Operator Lifecycle Manager (OLM) 安装的命名空间。
kubectl apply -f strimzi-kafka-operator.yaml

检查 Operator Pod 在 Operator 命名空间中是否处于运行状态。启动并运行 Pod 可能需要几分钟时间。

部署数据源 (MySQL)

helm repo add bitnami https://charts.bitnami.com/bitnami

helm repo update # required if above repo is already added

kubectl create ns db

cat > mysql-values.yaml <<EOF
auth:
  rootPassword: "root"
  database: "debezium_db"
  username: "mysql_usr"
  password: "mysql_pwd"
EOF

helm install -n db mysql bitnami/mysql --version 12.2.2 -f mysql-values.yaml
  • helm repo add bitnami https://charts.bitnami.com/bitnami
    命令解释:这个命令将Bitnami Helm仓库添加到你的Helm客户端中。Bitnami是一个提供高质量、预构建的Kubernetes应用包的公司

注意:此图表中已启用 bin 日志,这是 Debezium 所必需的。

检查 mysql pod 是否在 db 命名空间中启动并运行。

登录 MySQL db

MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace db mysql -o jsonpath="{.data.mysql-root-password}" | base64 -d)

kubectl run mysql-client --rm --tty -i --restart='Never' --image  docker.io/bitnami/mysql:8.4.4-debian-12-r0 --namespace db --env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD --command -- bash

mysql -h mysql.db.svc.cluster.local -uroot -p"$MYSQL_ROOT_PASSWORD"

# giving permissions to mysql_usr to allow reading the bin_logs properly
GRANT RELOAD, FLUSH_TABLES ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;

GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;
  • kubectl get secret --namespace db mysql:从Kubernetes的db命名空间中获取名为mysql的Secret对象。
  • -o jsonpath="{.data.mysql-root-password}":使用JSONPath表达式从Secret对象中提取mysql-root-password字段的数据。
  • | base64 -d:将提取出的Base64编码的字符串解码为明文。
  • kubectl run mysql-client:创建一个名为mysql-client的Pod。
  • --rm:Pod运行结束后自动删除。
  • --tty -i:分配一个伪终端并保持输入交互。
  • --restart='Never':设置Pod的重启策略为不重启。
  • --image docker.io/bitnami/mysql:8.4.4-debian-12-r0:使用指定的Docker镜像。
  • --namespace db:在db命名空间中创建Pod。
  • --env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD:将环境变量MYSQL_ROOT_PASSWORD设置为之前获取的root密码。
  • --command -- bash:在Pod中启动一个Bash shell。

将数据插入其中

-- Create the database if it doesn't exist
CREATE DATABASE IF NOT EXISTS debezium_db;

-- Use the database
USE debezium_db;

-- Create a sample table
CREATE TABLE employees (
    id INT AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    salary DECIMAL(10,2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Insert sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('John', 'Doe', 'john.doe@example.com', 60000.00),
('Jane', 'Smith', 'jane.smith@example.com', 75000.00),
('Alice', 'Johnson', 'alice.johnson@example.com', 82000.00),
('Bob', 'Williams', 'bob.williams@example.com', 50000.00);

部署 Kafka

kubectl create ns kafka

cat > kafka-values.yaml <<EOF
listeners:
  client:
    protocol: PLAINTEXT
  controller:
    protocol: PLAINTEXT
  interbroker:
    protocol: PLAINTEXT
  external:
    protocol: PLAINTEXT
sasl:
  enabledMechanisms: PLAIN
broker:
  replicaCount: 1
controller:
  replicaCount: 1
EOF

helm install kafka -n kafka bitnami/kafka --version 31.3.1 -f kafka-values.yaml

注意:上述 Kafka 配置是为了简化演示,禁用了 Kafka Broker 和 Controller 的所有身份验证机制。不建议在生产环境中禁用身份验证运行 Kafka。

请检查 Kafka Broker 和 Controller Pod 是否在 Kafka 命名空间中启动并运行。

部署 kafdrop

安装 Kafka UI,用于查看 Kafka 主题和浏览消费者组。

helm repo add lsst-sqre https://lsst-sqre.github.io/charts/

helm repo update # required if above repo is already added

cat > kafdrop-values.yaml <<EOF
kafka:
  brokerConnect: kafka:9092
EOF


helm install -n kafka kafdrop lsst-sqre/kafdrop --version 0.1.3 -f kafdrop-values.yaml

# port forward kafdrop to localhost:9000
kubectl port-forward -n kafka svc/kafdrop 9000:9000

部署 Debezium 连接器

要部署 Debezium 连接器,您需要先部署一个包含所需连接器插件的 Kafka Connect 集群,然后再实例化实际的连接器本身。第一步,需要创建一个包含该插件的 Kafka Connect 容器镜像。

创建 Kafka Connect 集群

运行以下命令获取 minikube 注册表的 IP:kubectl -n kube-system get svc registry -o jsonpath=‘{.spec.clusterIP}’,并将其替换到下面的 kafkaconnect 清单中。

  • get svc registry: 这部分命令用来获取名为 registry 的服务的信息。svcservice 的缩写,代表服务资源。
  • -o jsonpath='{.spec.clusterIP}': 这个选项指定输出格式为 JSONPath 表达式的结果。JSONPath 是一种查询 JSON 数据的语法,类似于 XPath 用于 XML。这里的 {.spec.clusterIP} 表示从返回的服务对象中提取 spec.clusterIP 字段的值,即该服务的 ClusterIP 地址。
kubectl create ns debezium

cat > kafka-connect.yaml <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  namespace: debezium
spec:
  version: 3.8.0
  replicas: 1
  bootstrapServers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpoint
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: <TO_BE_REPLACED_BY_ABOVE_CLUSTER_IP>/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.7.Final/debezium-connector-mysql-3.0.7.Final-plugin.tar.gz
EOF

kubectl apply -f kafka-connect.yaml
  • build: 配置了构建过程:
  • output.typeoutput.image: 指定了 Docker 镜像的输出类型和名称。
  • plugins: 定义了要安装的插件,这里是 Debezium MySQL 连接器,指定了插件的下载 URL。

注意:在上述配置中,如果您希望将镜像推送到 ECR/GCR 或任何其他镜像仓库,请将镜像端点替换为相应的端点,并且集群应具有将镜像推送到该仓库的权限。

检查 Kafka 连接是否已就绪。可能需要几分钟(4-5 分钟)才能进入就绪状态。

kubectl get kafkaconnect -n debezium

它应该返回状态 Ready: True

NAME                       DESIRED REPLICAS   READY
debezium-connect-cluster   1                  True

在 Kafdrop 中,您应该能够看到 3 个主题:

connect-cluster-configs
connect-cluster-offsets
connect-cluster-status

创建 Debezium 连接器

在创建连接器之前,我们需要创建一个 k8s secret(用于存储数据库凭据)和 rbac。

cat > mysql-creds.yaml <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: mysql-creds
  namespace: debezium
type: Opaque
stringData:
  username: mysql_usr
  password: mysql_pwd
EOF

kubectl apply -f mysql-creds.yaml

cat > role.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: debezium
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["mysql-creds"]
    verbs: ["get"]
EOF

kubectl apply -f role.yaml

cat > role-binding.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: debezium
subjects:
  - kind: ServiceAccount
    name: debezium-connect-cluster-connect
    namespace: debezium
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF

kubectl apply -f role-binding.yaml

通过登录 MySQL 获取 MySQL server_id(按照与上述相同的步骤)

SELECT @@server_id; # it is expected to be 1

部署 kafka 连接器

cat > kafka-connector.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  namespace: debezium
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql.db.svc.cluster.local # mysql db hostname
    database.port: 3306
    database.user: ${secrets:debezium/mysql-creds:username}
    database.password: ${secrets:debezium/mysql-creds:password}
    database.server.id: 1 # SELECT @@server_id;
    topic.prefix: mysql
    database.include.list: debezium_db
    schema.history.internal.kafka.bootstrap.servers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpoint
    schema.history.internal.kafka.topic: schema-changes.debezium_db
EOF

kubectl apply -f kafka-connector.yaml

检查 kafka 连接是否已就绪。可能需要几分钟才能进入就绪状态。

kubectl get kafkaconnector -n debezium

它应该返回状态 Ready: True

NAME                       CLUSTER                    CONNECTOR CLASS                              MAX TASKS   READY
debezium-connector-mysql   debezium-connect-cluster   io.debezium.connector.mysql.MySqlConnector   1           True

现在,您应该能够在 kafdrop 中看到更多主题,例如 mysql.debezium_db.employees。

此主题将包含上面创建员工表时插入的所有数据。

为了测试 Debezium 连接器,请向表中添加更多数据。

-- Insert more sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('Charlie', 'Brown', 'charlie.brown@example.com', 72000.00),
('David', 'Miller', 'david.miller@example.com', 68000.00),
('Emma', 'Wilson', 'emma.wilson@example.com', 79000.00),
('Frank', 'Anderson', 'frank.anderson@example.com', 55000.00),
('Grace', 'Thomas', 'grace.thomas@example.com', 87000.00),
('Henry', 'Taylor', 'henry.taylor@example.com', 62000.00);

它应该反映在主题mysql.debezium_db.employees中。
在这里插入图片描述
这表明 MySQL 表员工中所做的更改已反映在 Kafka 中。