PostgreSQL High Availability with Patroni

Published: 2025-07-23

1. Environment Preparation

System architecture

Hostname  IP        OS       Deployed software
pg1       11.0.1.4  CentOS7  Python 3.13.5, OpenSSL 3.5.1, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18
pg2       11.0.1.5  CentOS7  Python 3.13.5, OpenSSL 3.5.1, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18
pg3       11.0.1.6  CentOS7  Python 3.13.5, OpenSSL 3.5.1, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18
demo1     11.0.1.7  CentOS7  HAProxy 2.9.6
Configure a local yum repository

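Upload the CentOS-7-x86_64-Everything-2207-02.iso image to the server (mine is a VMware virtual machine, where the image is attached as /dev/sr0), mount it at /mnt/cdrom, and configure it as a yum repository: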

mkdir /mnt/cdrom
mount /dev/sr0 /mnt/cdrom
sudo tee /etc/yum.repos.d/local.repo <<EOF
[local]
name=CentOS-7 Local DVD Repo
baseurl=file:///mnt/cdrom
enabled=1
gpgcheck=0
EOF

# Remove the default online repos and rebuild the cache
rm -rf /etc/yum.repos.d/CentOS-*
sudo yum clean all
sudo yum makecache

# List the available repositories
sudo yum repolist

# Install common tools
yum install -y vim wget

To mount the image automatically at boot, append the following line to /etc/fstab:

/dev/sr0 /mnt/cdrom iso9660 defaults 0 0
Disable the firewall
# Stop the firewall service
sudo systemctl stop firewalld

# Do not start firewalld at boot
sudo systemctl disable firewalld

# Verify: the Active line should show inactive (dead)
sudo systemctl status firewalld

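Disable SELinux (setenforce 0 switches to permissive mode immediately; SELINUX=disabled takes full effect after the next reboot)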
sudo sed -i 's/^SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config && sudo setenforce 0
Download the required packages
# openssl
wget https://github.com/openssl/openssl/releases/download/openssl-3.5.1/openssl-3.5.1.tar.gz
# Python
wget https://www.python.org/ftp/python/3.13.5/Python-3.13.5.tgz
# etcd
wget https://github.com/etcd-io/etcd/releases/download/v3.6.2/etcd-v3.6.2-linux-amd64.tar.gz
# postgresql
wget https://ftp.postgresql.org/pub/source/v14.18/postgresql-14.18.tar.gz
# haproxy
wget https://www.haproxy.org/download/2.9/src/haproxy-2.9.6.tar.gz

2. Install OpenSSL

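The OpenSSL shipped with CentOS 7 (1.0.2k) is too old: Python 3.10 and later require OpenSSL 1.1.1 or newer. So build a newer release from source.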

Build and install
# Extract
tar -zxf openssl-3.5.1.tar.gz
cd openssl-3.5.1

# Make sure the required build tools are installed first
yum install -y perl-IPC-Cmd perl-Data-Dumper gcc make perl wget zlib-devel

# Configure (use the lib directory to avoid lib64 path problems)
./config --prefix=/usr/local/openssl --openssldir=/usr/local/openssl \
         shared zlib no-ssl3-method enable-tls1_3 \
         --libdir=lib  # force lib/ instead of lib64/

make -j $(nproc) && make install
Configure the dynamic linker
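# Create a linker configuration file for the new libraries
echo "/usr/local/openssl/lib" | sudo tee /etc/ld.so.conf.d/openssl.conf

# Refresh the dynamic linker cache
sudo ldconfig

# Verify the configuration
ldconfig -p | grep openssl
# The output should look similar to: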
#         libssl.so.3 (libc6,x86-64) => /usr/local/openssl/lib/libssl.so.3
#         libcrypto.so.3 (libc6,x86-64) => /usr/local/openssl/lib/libcrypto.so.3
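The same check can be scripted. Below is a minimal sketch that assumes the `ldconfig -p` line format shown above (`soname (flags) => path`). `resolved_paths` and `uses_custom_openssl` are hypothetical helper names.

```python
import re

# Matches lines like: "libssl.so.3 (libc6,x86-64) => /usr/local/openssl/lib/libssl.so.3"
LINE_RE = re.compile(r"^\s*(\S+)\s+\(([^)]*)\)\s+=>\s+(\S+)\s*$")


def resolved_paths(ldconfig_output: str) -> dict[str, list[str]]:
    """Map each soname in `ldconfig -p` output to the paths the linker cache lists for it."""
    paths: dict[str, list[str]] = {}
    for line in ldconfig_output.splitlines():
        match = LINE_RE.match(line)
        if match:  # the "N libs found in cache" header line does not match
            soname, _flags, path = match.groups()
            paths.setdefault(soname, []).append(path)
    return paths


def uses_custom_openssl(ldconfig_output: str, prefix: str = "/usr/local/openssl/lib/") -> bool:
    """True if both libssl.so.3 and libcrypto.so.3 resolve to a path under `prefix`."""
    paths = resolved_paths(ldconfig_output)
    return all(
        any(p.startswith(prefix) for p in paths.get(lib, []))
        for lib in ("libssl.so.3", "libcrypto.so.3")
    )
```

Pass it the output of `ldconfig -p`, for example `subprocess.run(["ldconfig", "-p"], capture_output=True, text=True).stdout`.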
Set environment variables
# Add to ~/.bashrc or /etc/profile.d/openssl.sh
echo 'export PATH="/usr/local/openssl/bin:$PATH"' | sudo tee -a /etc/profile.d/openssl.sh
echo 'export LD_LIBRARY_PATH="/usr/local/openssl/lib:$LD_LIBRARY_PATH"' | sudo tee -a /etc/profile.d/openssl.sh

# Apply immediately
source /etc/profile
Check the version
openssl version
# The output should look similar to:
# OpenSSL 3.5.1 1 Jul 2025 (Library: OpenSSL 3.5.1 1 Jul 2025)

3. Build and Install Python 3.13

The server ships with Python 2.7, which is too old for Patroni 4.x.

Build and install
# Extract
tar -zxf Python-3.13.5.tgz 
cd Python-3.13.5
# Install the required development packages
yum install -y zlib-devel bzip2-devel ncurses-devel sqlite-devel readline-devel tk-devel libffi-devel wget
# Configure, build and install
./configure --prefix=/usr/local/python3.13 --with-openssl=/usr/local/openssl --enable-shared
# altinstall installs python3.13/pip3.13 without replacing any existing python3
make -j $(nproc) && make altinstall
Configure the dynamic linker path

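Add Python's library directory to the system's dynamic linker configuration. Because Python was built with --enable-shared, python3.13 needs libpython3.13.so at run time, and this setting keeps it working after a reboot: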

# Create a linker configuration file for Python's shared library
sudo tee /etc/ld.so.conf.d/python313.conf <<EOF
/usr/local/python3.13/lib
EOF

# Refresh the dynamic linker cache
sudo ldconfig
Configure environment variables
echo 'export PATH="/usr/local/python3.13/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
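It is worth confirming that python3.13 actually linked against the new library. A build that cannot find a usable OpenSSL still completes, just without a working `ssl` module, so if the import below fails, the module was not built. This is a minimal sketch: `openssl_at_least` is a hypothetical helper, and (1, 1, 1) is the minimum OpenSSL release that Python 3.10 and later accept.

```python
import ssl


def openssl_at_least(minimum: tuple[int, int, int] = (1, 1, 1)) -> bool:
    """Return True if the OpenSSL linked into this interpreter is at least `minimum`.

    ssl.OPENSSL_VERSION_INFO is a 5-tuple; only the first three fields are compared.
    """
    return tuple(ssl.OPENSSL_VERSION_INFO[:3]) >= minimum


if __name__ == "__main__":
    # With the build above this should name OpenSSL 3.5.1
    print(ssl.OPENSSL_VERSION)
    print("meets the 1.1.1 minimum:", openssl_at_least())
    print("is OpenSSL 3.x or newer:", openssl_at_least((3, 0, 0)))
```

Save it as, for example, check_ssl.py and run it with `python3.13 check_ssl.py`.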

4. Install etcd

Extract and install
tar -zxf etcd-v3.6.2-linux-amd64.tar.gz 
cd etcd-v3.6.2-linux-amd64
cp etcd etcdctl /usr/local/bin/
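Create the etcd configuration file (on all three nodes). The heredoc expands $(hostname) and $(hostname -i) when the file is written. Each node's hostname must therefore resolve to its own 11.0.1.x address (check /etc/hosts), and ETCD_NAME (pg1, pg2, pg3) must match one of the names in ETCD_INITIAL_CLUSTER:
mkdir -p /etc/etcd

cat > /etc/etcd/etcd.conf << EOF
#[Member]
ETCD_NAME=$(hostname)
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"
ETCD_LISTEN_PEER_URLS="http://0.0.0.0:12380"
ETCD_LISTEN_CLIENT_URLS="http://0.0.0.0:12379"

#[Clustering]
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://$(hostname -i):12380"
ETCD_ADVERTISE_CLIENT_URLS="http://$(hostname -i):12379"
ETCD_INITIAL_CLUSTER="pg1=http://11.0.1.4:12380,pg2=http://11.0.1.5:12380,pg3=http://11.0.1.6:12380"
ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
ETCD_INITIAL_CLUSTER_STATE="new"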
EOF
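A common pitfall with this template is a name mismatch. ETCD_NAME (expanded from `$(hostname)`) must appear as a name in ETCD_INITIAL_CLUSTER, and that entry must match the advertised peer URL. If either condition fails, etcd refuses to start the new cluster. Below is a minimal sketch that checks a rendered /etc/etcd/etcd.conf for these mistakes. It assumes the simple `KEY="value"` format written above, and `parse_env_file` and `check_etcd_conf` are hypothetical helpers.

```python
def parse_env_file(text: str) -> dict[str, str]:
    """Parse KEY=value / KEY="value" lines, skipping blank lines and # comments."""
    env: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        env[key.strip()] = value.strip().strip('"')
    return env


def check_etcd_conf(text: str) -> list[str]:
    """Return a list of problems; an empty list means all checks passed."""
    env = parse_env_file(text)
    problems = []
    name = env.get("ETCD_NAME", "")
    # ETCD_INITIAL_CLUSTER has the form "name1=url1,name2=url2,..."
    members = dict(
        entry.split("=", 1)
        for entry in env.get("ETCD_INITIAL_CLUSTER", "").split(",")
        if "=" in entry
    )
    if name not in members:
        problems.append(f"ETCD_NAME={name!r} is not listed in ETCD_INITIAL_CLUSTER {sorted(members)}")
    elif members[name] != env.get("ETCD_INITIAL_ADVERTISE_PEER_URLS"):
        problems.append(
            f"advertised peer URL {env.get('ETCD_INITIAL_ADVERTISE_PEER_URLS')!r} "
            f"does not match the ETCD_INITIAL_CLUSTER entry {members[name]!r}"
        )
    if "127.0.0.1" in env.get("ETCD_ADVERTISE_CLIENT_URLS", ""):
        problems.append("ETCD_ADVERTISE_CLIENT_URLS uses 127.0.0.1; check what `hostname -i` returns")
    return problems
```

For example, `check_etcd_conf(open("/etc/etcd/etcd.conf").read())` should return an empty list on every node before etcd is started.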
Create the systemd service file
cat > /etc/systemd/system/etcd.service << EOF
[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target

[Service]
Type=notify
EnvironmentFile=/etc/etcd/etcd.conf
ExecStart=/usr/local/bin/etcd
Restart=on-failure
RestartSec=5
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
EOF
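Start the etcd service (on all three nodes at about the same time: with Type=notify, the first member does not report ready until a quorum has formed)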
systemctl daemon-reload
systemctl enable etcd
systemctl start etcd
Verify the etcd cluster state
etcdctl --endpoints=http://11.0.1.4:12379,http://11.0.1.5:12379,http://11.0.1.6:12379 member list
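Besides `member list`, each etcd member serves a `/health` endpoint on its client URL. When the member is healthy it returns JSON with `"health": "true"`, for example `{"health":"true","reason":""}`. Below is a minimal polling sketch that uses only the standard library. The endpoint list reuses the addresses above, and `is_healthy` and `check_endpoints` are hypothetical helpers.

```python
import json
import urllib.error
import urllib.request

ENDPOINTS = ["http://11.0.1.4:12379", "http://11.0.1.5:12379", "http://11.0.1.6:12379"]


def is_healthy(body: str) -> bool:
    """Interpret an etcd /health body: the "health" field is the string "true" when healthy."""
    try:
        return json.loads(body).get("health") == "true"
    except (ValueError, AttributeError):
        return False  # not JSON, or JSON that is not an object


def check_endpoints(endpoints: list[str], timeout: float = 3.0) -> dict[str, bool]:
    """GET <endpoint>/health on every member and report which ones answer healthy."""
    results = {}
    for ep in endpoints:
        try:
            with urllib.request.urlopen(f"{ep}/health", timeout=timeout) as resp:
                results[ep] = is_healthy(resp.read().decode())
        except (urllib.error.URLError, OSError):
            # unreachable, refused, timed out, or a non-2xx status such as 503
            results[ep] = False
    return results
```

Call `check_endpoints(ENDPOINTS)` from any host that can reach port 12379; every value should be True.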

5. Build and Install PostgreSQL

Build and install
# Extract
tar -zxf postgresql-14.18.tar.gz 
cd postgresql-14.18
# Install the build tools
yum -y groupinstall "Development Tools" "Legacy UNIX Compatibility"
yum -y install bison flex readline* zlib-devel gcc* make
# Configure, build and install
./configure --prefix=/usr/local/pgsql
make && make install
Create the user and directories
useradd postgres
mkdir -p /data/pgsql
chown -R postgres:postgres /data/pgsql
chown -R postgres:postgres /usr/local/pgsql
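Configure environment variables for the postgres user. The quoted 'EOF' stops root's shell from expanding $PATH and $LD_LIBRARY_PATH while the file is written, and PGDATA points at the same data directory that initdb and Patroni use:
cat >> /home/postgres/.bashrc << 'EOF'
export PATH=/usr/local/pgsql/bin:$PATH
export LD_LIBRARY_PATH=/usr/local/pgsql/lib:$LD_LIBRARY_PATH
export PGDATA=/data/pgsql/data
EOF
# The settings apply to new postgres login shells, for example:
su - postgres -c 'which initdb'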
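Initialization

Initialize the database (on the primary node 11.0.1.4 only). Because the data directory already exists when Patroni starts for the first time, Patroni takes over this existing instance instead of running its own bootstrap. As a result, the bootstrap.initdb options and the users under postgresql.authentication are not applied automatically, which is why the replication role and the postgres password are set manually later.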

su - postgres -c "initdb -D /data/pgsql/data"

6. Install Patroni

Install Patroni on all three nodes.

Choose either the online or the offline method below.

Online installation of Patroni and its dependencies
pip3.13 install patroni[etcd] psycopg2-binary etcd3
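  • With etcd 3.x (here 3.6.2), use the v3 API, that is, the etcd3: section in patroni.yml, as in the configuration below:
    • etcd 3.6.2 serves the v3 API natively, and Patroni's etcd3 module uses it for keys, leases and the leader lock.
    • The v2 API used by Patroni's etcd: section has been disabled by default since etcd 3.4, so a v2 configuration against etcd 3.6 typically fails with HTTP 404 errors.
  • Only an etcd 2.x cluster requires the v2 etcd: section.
    That section relies on the python-etcd client, which is no longer actively maintained. Note that Patroni's etcd and etcd3 extras both install python-etcd as the HTTP client. The standalone etcd3 package in the command above is not used by Patroni's etcd3 module.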
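With the etcd3: section, Patroni reaches etcd's v3 API through etcd's HTTP/JSON gateway. In that interface, keys are base64-encoded. A prefix query is expressed as a range from the prefix up to the prefix with its last byte incremented. Below is a minimal sketch that builds such a request body for everything under the `/postgres/` namespace used in patroni.yml, assuming the gateway's `/v3/kv/range` path. `prefix_range_end` and `prefix_range_body` are hypothetical helpers.

```python
import base64
import json


def prefix_range_end(prefix: bytes) -> bytes:
    """Smallest key greater than every key that starts with `prefix`.

    Increment the last byte that is not 0xFF and drop everything after it; if every
    byte is 0xFF, b"\\x00" means "to the end of the keyspace" in etcd range requests.
    """
    end = bytearray(prefix)
    for i in range(len(end) - 1, -1, -1):
        if end[i] < 0xFF:
            end[i] += 1
            return bytes(end[: i + 1])
    return b"\x00"


def prefix_range_body(prefix: str) -> str:
    """JSON body for POST /v3/kv/range that selects every key under `prefix`."""
    key = prefix.encode()
    return json.dumps({
        "key": base64.b64encode(key).decode(),
        "range_end": base64.b64encode(prefix_range_end(key)).decode(),
    })
```

You would POST the body to, for example, http://11.0.1.4:12379/v3/kv/range. Keys and values in the response are base64-encoded as well. From a shell, `etcdctl get /postgres/ --prefix --keys-only` lists the same keys.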
Offline installation of Patroni and its dependencies

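On a machine with internet access, use pip download to fetch Patroni and all of its dependencies. That machine should have the same CPU architecture and Python version (3.13) as the offline servers, because binary wheels such as psycopg2-binary are platform-specific:

# Create a download directory
mkdir patroni_dependencies && cd patroni_dependencies

# Download Patroni and its dependencies (latest versions by default)
pip3.13 download patroni[etcd] psycopg2-binary etcd3 setuptools wheel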

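Copy all the downloaded .whl and .tar.gz files to a directory on the offline server (for example /tmp/patroni_deps).

On the offline server, install from the local files with pip install:

# Install all downloaded packages; pip resolves the dependency order itself
cd /tmp/patroni_deps
pip3.13 install --no-index --find-links=. ./*

# Check the Patroni version
patroni --version

# Check that the package can be imported
python3.13 -c "from patroni.version import __version__; print(__version__)"
# Expected output: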
4.0.6
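The installed packages can also be checked with the standard `importlib.metadata` module, which is handy when verifying an offline install on several nodes. Below is a minimal sketch. The package list mirrors the pip command above, and `installed_versions` is a hypothetical helper.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

PACKAGES = ["patroni", "psycopg2-binary", "etcd3"]


def installed_versions(names: list[str]) -> dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if it is missing."""
    found: dict[str, Optional[str]] = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions(PACKAGES).items():
        print(f"{name:<16} {ver or 'NOT INSTALLED'}")
```

Run it with python3.13 on each node. A missing package shows up as NOT INSTALLED instead of raising an exception.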
Create the Patroni configuration directory
mkdir -p /etc/patroni
chown -R postgres:postgres /etc/patroni
Create the Patroni configuration file (on all nodes)
cat > /etc/patroni/patroni.yml << EOF
scope: pg-cluster
namespace: /postgres/
name: $(hostname)

restapi:
  listen: 0.0.0.0:8008
  connect_address: $(hostname -i):8008

etcd3:
  hosts:
    - 11.0.1.4:12379
    - 11.0.1.5:12379
    - 11.0.1.6:12379
  protocol: http
  # Optional: credentials (only if etcd authentication is enabled)
  # username: user
  # password: password
bootstrap:
  dcs:
    ttl: 30
    loop_wait: 10
    retry_timeout: 10
    maximum_lag_on_failover: 1048576
    postgresql:
      use_pg_rewind: true
      use_slots: true
      parameters:
        wal_level: replica
        hot_standby: "on"
        max_wal_senders: 10
        max_replication_slots: 10
        wal_keep_size: 512MB  # PostgreSQL 13+ replaced wal_keep_segments (32 segments x 16MB)
        listen_addresses: '*'
        port: 15432
        archive_mode: "on"
        archive_command: 'test ! -f /data/pgsql/archive/%f && cp %p /data/pgsql/archive/%f'

  initdb:
    - encoding: UTF8
    - data-checksums

postgresql:
  listen: 0.0.0.0:15432
  connect_address: $(hostname -i):15432
  data_dir: /data/pgsql/data
  bin_dir: /usr/local/pgsql/bin
  pgpass: /tmp/pgpass
  authentication:
    replication:
      username: replicator
      password: rep-pass
    superuser:
      username: postgres
      password: postgres
  parameters:
    unix_socket_directories: '/tmp'

tags:
  nofailover: false
  noloadbalance: false
  clonefrom: false
  nosync: false
EOF
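Several numbers in the bootstrap.dcs section are related. Patroni expects `loop_wait + 2 * retry_timeout <= ttl` and logs a warning when this rule is violated. `maximum_lag_on_failover` is given in bytes. `wal_keep_size` is the PostgreSQL 13+ replacement for `wal_keep_segments`: the size equals segments × 16 MB with the default WAL segment size. Below is a small sketch that works through these with the values used above. The helper names are hypothetical.

```python
def check_patroni_timing(ttl: int, loop_wait: int, retry_timeout: int) -> bool:
    """Patroni's rule: loop_wait + 2 * retry_timeout <= ttl (all values in seconds)."""
    return loop_wait + 2 * retry_timeout <= ttl


def bytes_to_mb(n: int) -> float:
    """Convert bytes to MB, counting 1 MB as 1024 * 1024 bytes as PostgreSQL does."""
    return n / (1024 * 1024)


def wal_keep_size_mb(wal_keep_segments: int, segment_size_mb: int = 16) -> int:
    """Convert a pre-PG13 wal_keep_segments value to an equivalent wal_keep_size in MB."""
    return wal_keep_segments * segment_size_mb


if __name__ == "__main__":
    # Values from bootstrap.dcs in patroni.yml
    print("timing rule holds:", check_patroni_timing(ttl=30, loop_wait=10, retry_timeout=10))  # 10 + 2*10 = 30 <= 30
    print("maximum_lag_on_failover:", bytes_to_mb(1048576), "MB")  # 1.0 MB
    print("wal_keep_size for 32 segments:", wal_keep_size_mb(32), "MB")  # 512 MB
```

With ttl 30, loop_wait 10 and retry_timeout 10, the rule holds exactly. Raising retry_timeout would therefore also require a larger ttl.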
Create the archive directory (used by archive_command)
mkdir -p /data/pgsql/archive
chown -R postgres:postgres /data/pgsql/archive
Create the systemd service file
cat > /etc/systemd/system/patroni.service << EOF
[Unit]
Description=Patroni high-availability PostgreSQL
After=syslog.target network.target etcd.service
Requires=etcd.service

[Service]
Type=simple
User=postgres
Group=postgres
ExecStart=/usr/local/python3.13/bin/patroni /etc/patroni/patroni.yml
ExecReload=/bin/kill -HUP $MAINPID
KillMode=process
Restart=always
RestartSec=10
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
EOF
Start the Patroni service (on the primary node 11.0.1.4 only)
systemctl daemon-reload
systemctl enable patroni
systemctl start patroni
Verify the Patroni cluster

After starting Patroni on the primary node, check the cluster state:

patronictl -c /etc/patroni/patroni.yml list

The output should look similar to this:

+ Cluster: pg-cluster (7528443352098796254) -+----+-----------+
| Member | Host           | Role   | State   | TL | Lag in MB |
+--------+----------------+--------+---------+----+-----------+
| pg1    | 11.0.1.4:15432 | Leader | running |  2 |           |
+--------+----------------+--------+---------+----+-----------+
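The same information is available over HTTP from Patroni's REST API on port 8008 (restapi.listen above). `GET /cluster` returns JSON with a `members` list, and each member carries fields such as `name`, `role` and `state`. Below is a minimal sketch that fetches this document and finds the leader. `fetch_cluster`, `leader_name` and `summarize` are hypothetical helpers.

```python
import json
import urllib.request
from typing import Optional


def fetch_cluster(api: str = "http://11.0.1.4:8008", timeout: float = 3.0) -> dict:
    """GET /cluster from any member's Patroni REST API and decode the JSON body."""
    with urllib.request.urlopen(f"{api}/cluster", timeout=timeout) as resp:
        return json.loads(resp.read().decode())


def leader_name(cluster: dict) -> Optional[str]:
    """Name of the member whose role is "leader", or None (for example during an election)."""
    for member in cluster.get("members", []):
        if member.get("role") == "leader":
            return member.get("name")
    return None


def summarize(cluster: dict) -> list[str]:
    """One line per member, similar to the Member/Role/State columns of `patronictl list`."""
    return [
        f"{m.get('name')}: {m.get('role')} ({m.get('state')})"
        for m in cluster.get("members", [])
    ]
```

For example, `print(leader_name(fetch_cluster()))` should print pg1 at this point in the setup.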

Create the replication role and set the postgres user's password:

[root@pg1 ~]# su - postgres -c 'psql -p 15432'
psql (14.18)
Type "help" for help.

postgres=# CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'rep-pass';  -- user name and password must match the replication entry in patroni.yml
CREATE ROLE
postgres=# ALTER USER postgres PASSWORD 'postgres';  -- must match the superuser password in patroni.yml
ALTER ROLE
postgres=# \q

Edit /data/pgsql/data/pg_hba.conf and add the following lines so that the three servers can replicate from each other and clients can connect:

# Allow the client machine to connect
host    all             postgres        11.0.1.1/32           md5
# Allow the database servers to connect to each other as postgres
host    all             postgres        11.0.1.4/32           md5
host    all             postgres        11.0.1.5/32           md5
host    all             postgres        11.0.1.6/32           md5
# Allow connections proxied by HAProxy (PostgreSQL sees the HAProxy host as the client address)
host    all             postgres        11.0.1.7/32           md5
# Allow streaming replication between the servers as replicator
host    replication     replicator      11.0.1.4/32           md5
host    replication     replicator      11.0.1.5/32           md5
host    replication     replicator      11.0.1.6/32           md5
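Every entry above is a /32 host rule, so any host or user that is missing is simply rejected. Below is a minimal sketch that reports which rule, if any, covers a given database, user and client address, using the standard `ipaddress` module. It applies pg_hba's first-match order and the fact that `all` does not match physical replication connections. It deliberately ignores features not used here (`local` lines, comma-separated lists, `+group`, `samehost`, host names). `first_match` is a hypothetical helper.

```python
import ipaddress
from typing import Optional

HBA_RULES = """
host    all             postgres        11.0.1.1/32           md5
host    all             postgres        11.0.1.4/32           md5
host    all             postgres        11.0.1.5/32           md5
host    all             postgres        11.0.1.6/32           md5
host    all             postgres        11.0.1.7/32           md5
host    replication     replicator      11.0.1.4/32           md5
host    replication     replicator      11.0.1.5/32           md5
host    replication     replicator      11.0.1.6/32           md5
"""


def first_match(rules: str, database: str, user: str, client_ip: str) -> Optional[str]:
    """Return the auth method of the first matching `host` rule, or None if none matches."""
    addr = ipaddress.ip_address(client_ip)
    for line in rules.splitlines():
        fields = line.split("#", 1)[0].split()
        if len(fields) < 5 or fields[0] != "host":
            continue  # blank lines, comments and rule types this sketch does not handle
        db, usr, network, method = fields[1:5]
        # In pg_hba.conf the keyword "all" does not match physical replication connections
        db_ok = db == database or (db == "all" and database != "replication")
        usr_ok = usr in ("all", user)
        if db_ok and usr_ok and addr in ipaddress.ip_network(network, strict=False):
            return method
    return None
```

For example, `first_match(HBA_RULES, "replication", "replicator", "11.0.1.5")` returns `"md5"`, while the same request as user postgres returns None.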

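After editing, PostgreSQL must re-read pg_hba.conf; otherwise the two replicas cannot connect to the primary for streaming replication. Restarting Patroni restarts the database and takes care of this. A configuration reload such as SELECT pg_reload_conf() would also pick up pg_hba.conf changes.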

systemctl restart patroni

Then start Patroni on the replica nodes 11.0.1.5 and 11.0.1.6:

systemctl daemon-reload
systemctl enable patroni
systemctl start patroni

Check the cluster state again:

[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host           | Role    | State     | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1    | 11.0.1.4:15432 | Leader  | running   |  3 |           |
| pg2    | 11.0.1.5:15432 | Replica | streaming |  3 |         0 |
| pg3    | 11.0.1.6:15432 | Replica | running   |  3 |         0 |
+--------+----------------+---------+-----------+----+-----------+

At this point you can connect to each of the three databases with Navicat and test reads and writes. Only the Leader accepts writes.

Automatic failover test

First stop Patroni on pg1. pg2 becomes the new Leader:

[root@pg1 ~]# systemctl stop patroni
[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host           | Role    | State     | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1    | 11.0.1.4:15432 | Replica | stopped   |    |   unknown |
| pg2    | 11.0.1.5:15432 | Leader  | running   |  4 |           |
| pg3    | 11.0.1.6:15432 | Replica | streaming |  4 |         0 |
+--------+----------------+---------+-----------+----+-----------+

After the test, remember to start the service on pg1 again:

[root@pg1 ~]# systemctl restart patroni
[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host           | Role    | State     | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1    | 11.0.1.4:15432 | Replica | streaming |  4 |         0 |
| pg2    | 11.0.1.5:15432 | Leader  | running   |  4 |           |
| pg3    | 11.0.1.6:15432 | Replica | streaming |  4 |         0 |
+--------+----------------+---------+-----------+----+-----------+
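When scripting failover tests, it helps to extract the current leader from `patronictl list`. Below is a minimal sketch that parses the table format shown above. That format is meant for humans and may change between Patroni versions, so patronictl's JSON output format (`-f json`) is the more robust option. `parse_patronictl_list` and `current_leader` are hypothetical helpers.

```python
from typing import Optional


def parse_patronictl_list(output: str) -> list[dict[str, str]]:
    """Turn the pretty table printed by `patronictl list` into a list of row dicts."""
    rows = [
        [cell.strip() for cell in line.strip().strip("|").split("|")]
        for line in output.splitlines()
        if line.strip().startswith("|")  # header and data rows; skips the +----+ borders
    ]
    if not rows:
        return []
    header, *data = rows
    return [dict(zip(header, row)) for row in data]


def current_leader(output: str) -> Optional[str]:
    """Member name of the row whose Role column is "Leader", or None."""
    for row in parse_patronictl_list(output):
        if row.get("Role") == "Leader":
            return row.get("Member")
    return None
```

For example, feed it `subprocess.run(["patronictl", "-c", "/etc/patroni/patroni.yml", "list"], capture_output=True, text=True).stdout` before and after stopping a node to confirm that the leader changed.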

7. Install HAProxy

Build and install

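(HAProxy is installed only on demo1, 11.0.1.7.) Build it from a current source release instead of using the package shipped with CentOS 7 (HAProxy 1.5.x), which is too old for this configuration.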

# Install dependencies
sudo yum install -y gcc make pcre-devel openssl-devel systemd-devel
# Extract the source
tar -zxvf haproxy-2.9.6.tar.gz
cd haproxy-2.9.6
# Build (with SSL and systemd support)
make TARGET=linux-glibc USE_PCRE=1 USE_OPENSSL=1 USE_SYSTEMD=1 USE_CPU_AFFINITY=1

# Install
sudo make install PREFIX=/usr/local/haproxy

# Create a symlink so haproxy can be called from anywhere
sudo ln -s /usr/local/haproxy/sbin/haproxy /usr/sbin/haproxy

# Create the configuration directory
sudo mkdir -p /etc/haproxy
Configure the systemd service
# Create the systemd unit file
sudo vim /usr/lib/systemd/system/haproxy.service

Add the following content:

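[Unit]
Description=HAProxy Load Balancer
After=network-online.target
Wants=network-online.target

[Service]
# -Ws runs HAProxy in master-worker mode and notifies systemd when it is ready
# (requires the USE_SYSTEMD=1 build above). A forking -D daemon under the default
# Type=simple would be treated by systemd as exited and restarted in a loop.
Type=notify
# Validate the configuration before starting and before reloading
ExecStartPre=/usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -c -q
ExecStart=/usr/sbin/haproxy -Ws -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid
ExecReload=/usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -c -q
# SIGUSR2 tells the master process to reload the configuration
ExecReload=/bin/kill -USR2 $MAINPID
KillMode=mixed
Restart=always

[Install]
WantedBy=multi-user.target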

Edit the HAProxy configuration file /etc/haproxy/haproxy.cfg:

[root@localhost ~]# cat /etc/haproxy/haproxy.cfg
global
    log /dev/log local0 info
    log /dev/log local1 notice
    daemon
    maxconn 4096
    user root

defaults
    log global
    mode tcp
    retries 3
    timeout connect 5s
    timeout client 30s
    timeout server 30s
    option tcplog

# Statistics page
listen stats
    bind *:8080
    mode http
    stats enable
    stats uri /haproxy-stats
    stats auth admin:admin123
    stats refresh 10s

# Write traffic (primary, port 15000)
frontend pg_write
    bind *:15000
    mode tcp
    default_backend pg_primary

# Read traffic (replicas, port 15001)
frontend pg_read
    bind *:15001
    mode tcp
    default_backend pg_replicas

# Primary backend: only the node holding the Patroni leader lock passes the check
backend pg_primary
    mode tcp
    balance roundrobin
    # Health check against Patroni's REST API: GET /primary returns 200 only on the leader
    option httpchk
    http-check send meth GET uri /primary
    http-check expect status 200
    # All nodes are listed; the health check takes non-primary nodes out of rotation
    server pg1 11.0.1.4:15432 check port 8008 inter 2s rise 2 fall 3
    server pg2 11.0.1.5:15432 check port 8008 inter 2s rise 2 fall 3
    server pg3 11.0.1.6:15432 check port 8008 inter 2s rise 2 fall 3

# Replica backend: only running replicas pass the check
backend pg_replicas
    mode tcp
    balance leastconn
    # Health check: GET /replica returns 200 only on a running replica
    option httpchk
    http-check send meth GET uri /replica
    http-check expect status 200
    # All nodes are listed; the health check takes non-replica nodes out of rotation
    server pg1 11.0.1.4:15432 check port 8008 inter 2s rise 2 fall 3
    server pg2 11.0.1.5:15432 check port 8008 inter 2s rise 2 fall 3
    server pg3 11.0.1.6:15432 check port 8008 inter 2s rise 2 fall 3
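With `inter 2s rise 2 fall 3`, HAProxy changes a server's state only after 2 consecutive successful checks (up) or 3 consecutive failed checks (down), with checks every 2 s. Ignoring check timeouts, a newly promoted leader therefore starts receiving write traffic within roughly rise × inter ≈ 4 s, and a failed node is removed within roughly fall × inter ≈ 6 s. Patroni's REST API also offers role-specific endpoints such as `/primary` and `/replica`, which return HTTP 200 only when the node currently has that role. Below is a small sketch of the timing arithmetic and of a manual check against those endpoints. `detection_delays` and `check_role` are hypothetical helpers.

```python
import urllib.error
import urllib.request


def detection_delays(inter_s: float, rise: int, fall: int) -> tuple[float, float]:
    """Approximate upper bounds (seconds, ignoring check timeouts) to mark a server up / down."""
    return rise * inter_s, fall * inter_s


def check_role(host: str, endpoint: str = "primary", port: int = 8008, timeout: float = 2.0) -> bool:
    """True if Patroni on `host` answers GET /<endpoint> with HTTP 200."""
    url = f"http://{host}:{port}/{endpoint}"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except urllib.error.HTTPError:
        return False  # e.g. 503: the node does not currently have that role
    except (urllib.error.URLError, OSError):
        return False  # Patroni unreachable: a health check would count this as a failure
```

For example, `{h: check_role(h) for h in ("11.0.1.4", "11.0.1.5", "11.0.1.6")}` should show exactly one True for the primary endpoint.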

Verify the installation
# Reload systemd unit files
sudo systemctl daemon-reload

# Start HAProxy and enable it at boot
sudo systemctl start haproxy
sudo systemctl enable haproxy

# Check the version and service status
haproxy -v
sudo systemctl status haproxy
Test read/write splitting

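The HAProxy configuration above routes writes to the primary and reads to the read-only replicas. The write port is 15000 and the read port is 15001. Connect with Navicat to 11.0.1.7:15000 (write) and 11.0.1.7:15001 (read).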

Through port 15000, create a table and test writing:


Inserting rows works, and Ctrl+S saves the data normally.

Open the same table through port 15001 and insert more rows:


Saving with Ctrl+S fails with "ERROR: cannot execute INSERT in a read-only transaction", because port 15001 routes to a read-only replica.

8. Test Primary/Replica Switchover

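Write a Python script, haproxy_db_test.py, that repeatedly writes a row through the HAProxy write port (15000) and reads it back through the read port (15001). For each request, it prints which database node actually served it. The script needs psycopg2 (for example the psycopg2-binary package installed earlier). Run it with python3.13 haproxy_db_test.py, and while it is running, stop Patroni on the current leader as in the failover test above to watch the write connection move to the new primary: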

import psycopg2
import time
from psycopg2 import OperationalError, InterfaceError, Error

# Database settings: adjust to your environment
CONFIG = {
    "write_host": "11.0.1.7",  # HAProxy server IP
    "write_port": 15000,  # write port (primary)
    "read_host": "11.0.1.7",  # HAProxy server IP
    "read_port": 15001,  # read port (replicas)
    "user": "postgres",
    "password": "postgres",  # database password
    "dbname": "postgres",
    "retry_attempts": 10,  # maximum connection attempts
    "retry_delay": 2,  # delay between connection attempts (seconds)
    "test_interval": 5,  # delay between test rounds (seconds)
    "sync_retries": 3,  # read attempts while waiting for replication
    "sync_delay": 2  # delay between read attempts (seconds)
}


def get_connection(host, port):
    """Connect through HAProxy and report which database node actually serves the connection."""
    attempts = 0
    while attempts < CONFIG["retry_attempts"]:
        try:
            conn = psycopg2.connect(
                host=host,
                port=port,
                user=CONFIG["user"],
                password=CONFIG["password"],
                dbname=CONFIG["dbname"],
                connect_timeout=5
            )
            conn.autocommit = True  # each statement commits immediately (needed for writes, harmless for reads)

            # Ask the server for its own address and port: the real node behind HAProxy
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT inet_server_addr() AS server_ip,
                           inet_server_port() AS server_port
                """)
                actual_host, actual_port = cur.fetchone()

            print(f"✅ Connected to {host}:{port} (actual node: {actual_host}:{actual_port})")
            return conn, actual_host, actual_port

        except (OperationalError, InterfaceError) as e:
            attempts += 1
            print(f"❌ Connection to {host}:{port} failed (attempt {attempts}/{CONFIG['retry_attempts']}): {e}")
            if attempts < CONFIG["retry_attempts"]:
                time.sleep(CONFIG["retry_delay"])

    print(f"❌ Maximum retries reached, cannot connect to {host}:{port}")
    return None, None, None


def update_table_schema(conn):
    """Add any missing columns to ha_test (run on the write connection only)."""
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_name = 'ha_test'
            """)
            existing_columns = [row[0] for row in cur.fetchall()]

            required_columns = [
                ('write_proxy_host', 'TEXT'),
                ('write_proxy_port', 'INT'),
                ('actual_write_host', 'TEXT'),
                ('actual_write_port', 'INT'),
                ('read_info', 'TEXT'),  # read-side details as a JSON string
                ('read_time', 'TIMESTAMPTZ')
            ]

            for col_name, col_type in required_columns:
                if col_name not in existing_columns:
                    cur.execute(f"ALTER TABLE ha_test ADD COLUMN {col_name} {col_type}")
                    print(f"📝 Added missing column: {col_name}")

        print("✅ Table schema check complete")
        return True
    except Exception as e:
        print(f"❌ Schema update failed: {e}")
        return False


def init_test_table(conn):
    """Create the test table if it does not exist (run on the write connection only)."""
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS ha_test (
                    id                SERIAL PRIMARY KEY,
                    write_time        TIMESTAMPTZ,
                    write_proxy_host  TEXT,
                    write_proxy_port  INT,
                    actual_write_host TEXT,
                    actual_write_port INT,
                    -- read-side details as JSON: {"read_proxy_host": "...", "actual_read_host": "...", ...}
                    read_info         TEXT,
                    read_time         TIMESTAMPTZ
                )
            """)
        print("📋 Test table initialized")
        return True
    except Exception as e:
        print(f"❌ Test table initialization failed: {e}")
        return False


def test_write(conn, actual_write_host, actual_write_port):
    """Insert a row through the write port and record which primary handled it."""
    try:
        with conn.cursor() as cur:
            # Let the SERIAL column assign the id (random ids could collide with existing rows)
            cur.execute("""
                INSERT INTO ha_test (write_time, write_proxy_host, write_proxy_port,
                                     actual_write_host, actual_write_port)
                VALUES (NOW(), %s, %s, %s, %s) RETURNING id
            """, (
                CONFIG["write_host"], CONFIG["write_port"],
                actual_write_host, actual_write_port
            ))
            inserted_id = cur.fetchone()[0]
            print(f"✍️ Wrote row ID {inserted_id} (primary: {actual_write_host}:{actual_write_port})")
            return inserted_id
    except Exception as e:
        print(f"❌ Write failed: {e}")
        return None


def test_read(read_conn, test_id, actual_read_host, actual_read_port):
    """Read-only check: poll the read port until the row written by test_write appears."""
    if not test_id:
        return False

    for attempt in range(CONFIG["sync_retries"]):
        try:
            with read_conn.cursor() as cur:
                # Query only; never modify data on the read connection
                cur.execute("SELECT id FROM ha_test WHERE id = %s", (test_id,))
                result = cur.fetchone()

                if result:
                    print(
                        f"📖 Read row ID {result[0]} (replica: {actual_read_host}:{actual_read_port}, attempt {attempt + 1}/{CONFIG['sync_retries']})")
                    return True
                else:
                    if attempt < CONFIG["sync_retries"] - 1:
                        print(
                            f"⌛ Row not replicated yet, retrying in {CONFIG['sync_delay']} s (attempt {attempt + 1}/{CONFIG['sync_retries']})")
                        time.sleep(CONFIG["sync_delay"])

        except Error as e:
            print(f"⚠️ Read attempt {attempt + 1} failed: {e}")
            if attempt < CONFIG["sync_retries"] - 1:
                time.sleep(CONFIG["sync_delay"])

    print(f"❌ All read attempts failed; row {test_id} not found (replica: {actual_read_host}:{actual_read_port})")
    return False


def update_read_info(write_conn, test_id, actual_read_host, actual_read_port):
    """Record which replica served the read; the UPDATE runs on the write connection (primary)."""
    if not test_id or not write_conn:
        return

    try:
        with write_conn.cursor() as cur:
            # Let PostgreSQL build the JSON so that values are quoted and escaped correctly
            cur.execute("""
                UPDATE ha_test
                SET read_time = NOW(),
                    read_info = json_build_object(
                        'read_proxy_host', %s::text,
                        'read_proxy_port', %s::int,
                        'actual_read_host', %s::text,
                        'actual_read_port', %s::int
                    )::text
                WHERE id = %s
            """, (CONFIG["read_host"], CONFIG["read_port"],
                  actual_read_host, actual_read_port, test_id))
        print(f"📝 Updated read info through the primary, ID {test_id}")
    except Exception as e:
        print(f"❌ Failed to update read info: {e}")


def main():
    # Create the test table and add missing columns (through the write port)
    init_conn, _, _ = get_connection(CONFIG["write_host"], CONFIG["write_port"])
    if init_conn:
        init_test_table(init_conn)
        update_table_schema(init_conn)
        init_conn.close()

    write_conn = None
    read_conn = None
    actual_write_host = None
    actual_write_port = None
    actual_read_host = None
    actual_read_port = None

    try:
        while True:
            # (Re)connect when a connection is missing or was closed, e.g. by a failover
            if not write_conn or write_conn.closed:
                write_conn, actual_write_host, actual_write_port = get_connection(
                    CONFIG["write_host"], CONFIG["write_port"]
                )

            if not read_conn or read_conn.closed:
                read_conn, actual_read_host, actual_read_port = get_connection(
                    CONFIG["read_host"], CONFIG["read_port"]
                )

            # Run one test round
            if write_conn and read_conn and actual_write_host and actual_read_host:
                test_id = test_write(write_conn, actual_write_host, actual_write_port)
                if test_id:
                    time.sleep(1)
                    # Read-only check on the replica connection
                    read_success = test_read(read_conn, test_id, actual_read_host, actual_read_port)
                    # Record the read details through the primary; writes never go to the replica
                    if read_success:
                        update_read_info(write_conn, test_id, actual_read_host, actual_read_port)

            time.sleep(CONFIG["test_interval"])

    except KeyboardInterrupt:
        print("\n⏹️ Stopped by user")
    except Exception as e:
        print(f"\n💥 Unexpected error: {e}")
    finally:
        # Close any open connections
        if write_conn and not write_conn.closed:
            write_conn.close()
        if read_conn and not read_conn.closed:
            read_conn.close()
        print("🔚 Done")


if __name__ == "__main__":
    main()
