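1. Environment Preparation
System architecture
Hostname | IP | OS | Deployed software |
---|---|---|---|
pg1 | 11.0.1.4 | CentOS 7 | Python 3.13.5, OpenSSL 3.5.1, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18 |
pg2 | 11.0.1.5 | CentOS 7 | Python 3.13.5, OpenSSL 3.5.1, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18 |
pg3 | 11.0.1.6 | CentOS 7 | Python 3.13.5, OpenSSL 3.5.1, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18 |
demo1 | 11.0.1.7 | CentOS 7 | HAProxy 2.9.6 |
Configure a local yum repository
Upload the CentOS-7-x86_64-Everything-2207-02.iso image to the server (I am using a VMware virtual machine, where the image appears as /dev/sr0), mount it at /mnt/cdrom, and configure the yum repository: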
mkdir /mnt/cdrom
mount /dev/sr0 /mnt/cdrom
sudo tee /etc/yum.repos.d/local.repo <<EOF
[local]
name=CentOS-7 Local DVD Repo
baseurl=file:///mnt/cdrom
enabled=1
gpgcheck=0
EOF
# Remove the default CentOS repos, then rebuild the cache
rm -rf /etc/yum.repos.d/CentOS-*
sudo yum clean all
sudo yum makecache
# List the available yum repositories
sudo yum repolist
# Install commonly used tools
yum install -y vim wget
Edit /etc/fstab and append the following line so that the image is mounted automatically at boot:
/dev/sr0 /mnt/cdrom iso9660 defaults 0 0
Disable the firewall
# Stop the firewall service
sudo systemctl stop firewalld
# Prevent the firewall from starting at boot
sudo systemctl disable firewalld
# Verify the status (Active should read inactive)
sudo systemctl status firewalld
Disable SELinux
sudo sed -i 's/^SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config && sudo setenforce 0
Download the required packages
# openssl
wget https://github.com/openssl/openssl/releases/download/openssl-3.5.1/openssl-3.5.1.tar.gz
# Python
wget https://www.python.org/ftp/python/3.13.5/Python-3.13.5.tgz
# etcd
wget https://github.com/etcd-io/etcd/releases/download/v3.6.2/etcd-v3.6.2-linux-amd64.tar.gz
# postgresql
wget https://ftp.postgresql.org/pub/source/v14.18/postgresql-14.18.tar.gz
# haproxy
wget https://www.haproxy.org/download/2.9/src/haproxy-2.9.6.tar.gz
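2. Install OpenSSL
The OpenSSL version that ships with the system is too old, so a newer version has to be downloaded and built manually.
Build and install
# Extract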
tar -zxf openssl-3.5.1.tar.gz
cd openssl-3.5.1
# First make sure the required build tools are installed
yum install -y perl-IPC-Cmd perl-Data-Dumper gcc make perl wget zlib-devel
# Configure (use the default lib directory to avoid lib64 path problems)
./config --prefix=/usr/local/openssl --openssldir=/usr/local/openssl \
shared zlib no-ssl3-method enable-tls1_3 \
--libdir=lib # force lib instead of lib64
make -j $(nproc) && make install
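Configure the dynamic linker
# Create the dynamic linker configuration file
echo "/usr/local/openssl/lib" | sudo tee /etc/ld.so.conf.d/openssl.conf
# Refresh the dynamic linker cache
sudo ldconfig
# Verify the configuration
ldconfig -p | grep openssl
# The output should look like: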
# libssl.so.3 (libc6,x86-64) => /usr/local/openssl/lib/libssl.so.3
# libcrypto.so.3 (libc6,x86-64) => /usr/local/openssl/lib/libcrypto.so.3
Declare environment variables
# Add to ~/.bashrc or /etc/profile.d/openssl.sh
echo 'export PATH="/usr/local/openssl/bin:$PATH"' | sudo tee -a /etc/profile.d/openssl.sh
echo 'export LD_LIBRARY_PATH="/usr/local/openssl/lib:$LD_LIBRARY_PATH"' | sudo tee -a /etc/profile.d/openssl.sh
# Apply immediately
source /etc/profile
Check the version
openssl version
# The output should look like:
# OpenSSL 3.5.1 1 Jul 2025 (Library: OpenSSL 3.5.1 1 Jul 2025)
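3. Build and Install Python 3.13
The Python that ships with the server is 2.7, which is too old for the software used here.
Build and install
# Extract
tar -zxf Python-3.13.5.tgz
cd Python-3.13.5
# Install the required development packages
yum install -y zlib-devel bzip2-devel ncurses-devel sqlite-devel readline-devel tk-devel libffi-devel wget
# Build and install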
./configure --prefix=/usr/local/python3.13 --with-openssl=/usr/local/openssl --enable-shared
make -j $(nproc) && make altinstall
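Configure the dynamic linker path
Add Python's library directory to the system dynamic linker configuration (needed because Python was built with --enable-shared), so that Python still runs correctly after a server reboot:
# Create the Python library path configuration file
sudo tee /etc/ld.so.conf.d/python313.conf <<EOF
/usr/local/python3.13/lib
EOF
# Refresh the dynamic linker cache
sudo ldconfig
Configure environment variables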
echo 'export PATH="/usr/local/python3.13/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
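Since Python was configured with --with-openssl=/usr/local/openssl, it can be worth confirming that the resulting interpreter really links against the newly built OpenSSL rather than the system one. The following is a minimal sketch using only the standard ssl module; the helper name openssl_at_least and the 3.0 threshold are illustrative choices, not part of the original procedure.

```python
import ssl
import sys


def openssl_at_least(major: int, minor: int) -> bool:
    """Return True if the ssl module is linked against OpenSSL >= major.minor."""
    return ssl.OPENSSL_VERSION_INFO[:2] >= (major, minor)


if __name__ == "__main__":
    print("Python :", sys.version.split()[0])
    print("OpenSSL:", ssl.OPENSSL_VERSION)
    # 3.0 is an arbitrary threshold picked for this sketch.
    print("OpenSSL >= 3.0:", openssl_at_least(3, 0))
```

Run it with python3.13; if the reported OpenSSL version is the old system one, the --with-openssl path or the linker configuration probably needs another look.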
4. Install etcd
Extract and install
tar -zxf etcd-v3.6.2-linux-amd64.tar.gz
cd etcd-v3.6.2-linux-amd64
cp etcd etcdctl /usr/local/bin/
Create the etcd configuration file
mkdir /etc/etcd
cat > /etc/etcd/etcd.conf << EOF
#[Member]
ETCD_NAME=$(hostname)
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"
ETCD_LISTEN_PEER_URLS="http://0.0.0.0:12380"
ETCD_LISTEN_CLIENT_URLS="http://0.0.0.0:12379"
#[Clustering]
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://$(hostname -i):12380"
ETCD_ADVERTISE_CLIENT_URLS="http://$(hostname -i):12379"
# Member names must match ETCD_NAME (the hostname) on each node
ETCD_INITIAL_CLUSTER="pg1=http://11.0.1.4:12380,pg2=http://11.0.1.5:12380,pg3=http://11.0.1.6:12380"
ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
ETCD_INITIAL_CLUSTER_STATE="new"
EOF
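ETCD_NAME is taken from the hostname, and etcd expects that same name to appear as a key in ETCD_INITIAL_CLUSTER; a mismatch is an easy mistake to make when editing the list by hand. As a rough sketch, the value can be generated from a single node map so the names cannot drift. The NODES dictionary and both helper functions are hypothetical names introduced for this illustration.

```python
# Hostname -> IP map taken from the architecture table (assumed to match `hostname` output).
NODES = {"pg1": "11.0.1.4", "pg2": "11.0.1.5", "pg3": "11.0.1.6"}
PEER_PORT = 12380


def initial_cluster(nodes, peer_port=PEER_PORT):
    """Build the ETCD_INITIAL_CLUSTER value: name=peer-url pairs joined by commas."""
    return ",".join(f"{name}=http://{ip}:{peer_port}" for name, ip in nodes.items())


def member_is_listed(etcd_name, cluster):
    """Check that etcd_name appears as a member name in an ETCD_INITIAL_CLUSTER value."""
    names = [entry.split("=", 1)[0] for entry in cluster.split(",")]
    return etcd_name in names


if __name__ == "__main__":
    value = initial_cluster(NODES)
    print(f'ETCD_INITIAL_CLUSTER="{value}"')
    for name in NODES:
        print(name, "listed:", member_is_listed(name, value))
```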
Create the systemd service file
cat > /etc/systemd/system/etcd.service << EOF
[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target
[Service]
Type=notify
EnvironmentFile=/etc/etcd/etcd.conf
ExecStart=/usr/local/bin/etcd
Restart=on-failure
RestartSec=5
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
EOF
Start the etcd service
systemctl daemon-reload
systemctl enable etcd
systemctl start etcd
Verify the etcd cluster status
etcdctl --endpoints=http://11.0.1.4:12379,http://11.0.1.5:12379,http://11.0.1.6:12379 member list
5. Build and Install PostgreSQL
Build and install
# Extract
tar -zxf postgresql-14.18.tar.gz
cd postgresql-14.18
# Install the development tools
yum -y groupinstall "Development Tools" "Legacy UNIX Compatibility"
yum -y install bison flex readline* zlib-devel gcc* make
# Build and install
./configure --prefix=/usr/local/pgsql
make && make install
Create the user and directories
useradd postgres
mkdir -p /data/pgsql
chown -R postgres:postgres /data/pgsql
chown -R postgres:postgres /usr/local/pgsql
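Configure environment variables
# Quote EOF so that $PATH and $LD_LIBRARY_PATH are expanded at login, not while writing the file
cat >> /home/postgres/.bashrc << 'EOF'
export PATH=/usr/local/pgsql/bin:$PATH
export LD_LIBRARY_PATH=/usr/local/pgsql/lib:$LD_LIBRARY_PATH
export PGDATA=/data/pgsql/data
EOF
source /home/postgres/.bashrc
Initialization
Initialize the database (run on the primary node 11.0.1.4 only)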
su - postgres -c "initdb -D /data/pgsql/data"
6. Install Patroni
Install Patroni on all three nodes:
Choose either the online or the offline method.
Online installation of Patroni and its dependencies
pip3.13 install patroni[etcd] psycopg2-binary etcd3
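- If you run etcd 3.x or later (etcd 3.6.2 in this setup):
  etcd3 is strongly recommended, for the following reasons:
  - etcd 3.6.2 natively supports the v3 API, and etcd3 is a client designed specifically for v3, so it can take full advantage of v3 features (distributed locks, efficient key-value storage, and so on).
  - python-etcd supports only the v2 API, which is disabled by default in etcd 3.4 and later. Relying on it can cause compatibility problems, for example 404 errors when v2 API paths are requested.
- If you must stay compatible with etcd 2.x:
  python-etcd is the only option, not etcd3. Note that it is no longer maintained and may have security issues or missing features.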
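Offline installation of Patroni and its dependencies
In an environment with Internet access, use pip download to fetch Patroni and all of its dependencies:
# Create the download directory
mkdir patroni_dependencies && cd patroni_dependencies
# Download Patroni and its dependencies (the latest versions by default)
pip3.13 download patroni[etcd] psycopg2-binary etcd3 setuptools wheel
Copy all the downloaded .whl and .tar.gz files to a directory on the offline server (for example /tmp/patroni_deps).
In the offline environment, change into that directory and install from the local files with pip install:
# Install all downloaded packages (pip resolves the dependency order)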
pip3.13 install --no-index --find-links=. ./*
# Check the Patroni version
patroni --version
# Check that the package can be imported
python3.13 -c "from patroni.version import __version__; print(__version__)"
# Expected output:
4.0.6
Create the Patroni configuration directory
mkdir -p /etc/patroni
chown -R postgres:postgres /etc/patroni
Create the Patroni configuration file (all nodes)
cat > /etc/patroni/patroni.yml << EOF
scope: pg-cluster
namespace: /postgres/
name: $(hostname)
restapi:
  listen: 0.0.0.0:8008
  connect_address: $(hostname -i):8008
etcd3:
  hosts:
    - 11.0.1.4:12379
    - 11.0.1.5:12379
    - 11.0.1.6:12379
  protocol: http
  # Optional: credentials (if etcd authentication is enabled)
  # username: user
  # password: password
bootstrap:
  dcs:
    ttl: 30
    loop_wait: 10
    retry_timeout: 10
    maximum_lag_on_failover: 1048576
    postgresql:
      use_pg_rewind: true
      use_slots: true
      parameters:
        wal_level: replica
        hot_standby: "on"
        max_wal_senders: 10
        max_replication_slots: 10
        # PostgreSQL 13+ replaced wal_keep_segments with wal_keep_size (32 segments x 16 MB)
        wal_keep_size: 512MB
        listen_addresses: '*'
        port: 15432
        archive_mode: "on"
        archive_command: 'test ! -f /data/pgsql/archive/%f && cp %p /data/pgsql/archive/%f'
  initdb:
    - encoding: UTF8
    - data-checksums
postgresql:
  listen: 0.0.0.0:15432
  connect_address: $(hostname -i):15432
  data_dir: /data/pgsql/data
  bin_dir: /usr/local/pgsql/bin
  pgpass: /tmp/pgpass
  authentication:
    replication:
      username: replicator
      password: rep-pass
    superuser:
      username: postgres
      password: postgres
  parameters:
    unix_socket_directories: '/tmp'
tags:
  nofailover: false
  noloadbalance: false
  clonefrom: false
  nosync: false
EOF
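The three timing values in the dcs section are related: the Patroni documentation asks that loop_wait + 2 * retry_timeout not exceed ttl. The sketch below checks the values used here against that rule and converts two of the size settings into more readable units. The function names are made up for this illustration, and the 16 MB WAL segment size is PostgreSQL's default, which is assumed to be unchanged in this build.

```python
def timing_is_consistent(ttl: int, loop_wait: int, retry_timeout: int) -> bool:
    """Patroni's documented rule: loop_wait + 2 * retry_timeout <= ttl."""
    return loop_wait + 2 * retry_timeout <= ttl


def segments_to_mb(segments: int, segment_mb: int = 16) -> int:
    """Convert a count of WAL segments into megabytes (default segment size assumed)."""
    return segments * segment_mb


def bytes_to_mib(value: int) -> float:
    """Convert a byte count such as maximum_lag_on_failover into MiB."""
    return value / (1024 * 1024)


if __name__ == "__main__":
    print("timing ok:", timing_is_consistent(ttl=30, loop_wait=10, retry_timeout=10))
    print("32 WAL segments =", segments_to_mb(32), "MB")
    print("maximum_lag_on_failover =", bytes_to_mib(1048576), "MiB")
```

With ttl 30, loop_wait 10, and retry_timeout 10 the rule holds with no slack, so raising loop_wait or retry_timeout would also mean raising ttl.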
Create the archive directory
mkdir -p /data/pgsql/archive
chown -R postgres:postgres /data/pgsql/archive
Create the systemd service file
# Quote EOF so the shell does not expand $MAINPID while writing the file
cat > /etc/systemd/system/patroni.service << 'EOF'
[Unit]
Description=Patroni high-availability PostgreSQL
After=syslog.target network.target etcd.service
Requires=etcd.service
[Service]
Type=simple
User=postgres
Group=postgres
ExecStart=/usr/local/python3.13/bin/patroni /etc/patroni/patroni.yml
ExecReload=/bin/kill -HUP $MAINPID
KillMode=process
Restart=always
RestartSec=10
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
EOF
Start the Patroni service (on the primary node 11.0.1.4 only)
systemctl daemon-reload
systemctl enable patroni
systemctl start patroni
Verify the Patroni cluster
After starting Patroni on the primary node, check the cluster status:
patronictl -c /etc/patroni/patroni.yml list
The output should look similar to the following:
+ Cluster: pg-cluster (7528443352098796254) -+----+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+----------------+--------+---------+----+-----------+
| pg1 | 11.0.1.4:15432 | Leader | running | 2 | |
+--------+----------------+--------+---------+----+-----------+
Create the replication role and change the postgres user's password:
[root@pg1 ~]# su - postgres -c 'psql -p 15432'
psql (17.5)
Type "help" for help.
postgres=# CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'rep-pass'; -- the user name and password must match those in patroni.yml
CREATE ROLE
postgres=# ALTER USER postgres PASSWORD 'postgres'; -- the postgres password must match the one in patroni.yml
ALTER ROLE
postgres=# \q
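Edit /data/pgsql/data/pg_hba.conf and add the following entries so that the client machine, the three database servers, and the HAProxy server can connect, and the three nodes can replicate from one another:
# Allow the client machine to connect to the database
host all postgres 11.0.1.1/32 md5
# Allow the nodes to reach one another as the postgres user
host all postgres 11.0.1.4/32 md5
host all postgres 11.0.1.5/32 md5
host all postgres 11.0.1.6/32 md5
# Allow the HAProxy server to run health checks as the postgres user
host all postgres 11.0.1.7/32 md5
# Allow the nodes to replicate from one another as the replicator user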
host replication replicator 11.0.1.4/32 md5
host replication replicator 11.0.1.5/32 md5
host replication replicator 11.0.1.6/32 md5
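The per-node entries follow one pattern, so they can be generated instead of typed, which helps when nodes are added later. This is only a sketch: DB_NODES and hba_lines are hypothetical names, and the standard ipaddress module is used purely to reject malformed addresses.

```python
import ipaddress

# Database node IPs from the architecture table.
DB_NODES = ["11.0.1.4", "11.0.1.5", "11.0.1.6"]


def hba_lines(database, user, hosts, method="md5"):
    """Return one pg_hba.conf 'host' line per address, each as a /32 entry."""
    lines = []
    for host in hosts:
        # ip_network raises ValueError for a malformed address.
        net = ipaddress.ip_network(f"{host}/32")
        lines.append(f"host {database} {user} {net} {method}")
    return lines


if __name__ == "__main__":
    for line in hba_lines("all", "postgres", DB_NODES):
        print(line)
    for line in hba_lines("replication", "replicator", DB_NODES):
        print(line)
```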
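After the change, PostgreSQL has to re-read pg_hba.conf; otherwise the two replicas cannot connect to the primary for replication. The simplest way to do that here is to restart Patroni, which restarts the database as well: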
systemctl restart patroni
Then start the Patroni service on the replica nodes 11.0.1.5 and 11.0.1.6:
systemctl daemon-reload
systemctl enable patroni
systemctl start patroni
Check the cluster status again:
[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1 | 11.0.1.4:15432 | Leader | running | 3 | |
| pg2 | 11.0.1.5:15432 | Replica | streaming | 3 | 0 |
| pg3 | 11.0.1.6:15432 | Replica | running | 3 | 0 |
+--------+----------------+---------+-----------+----+-----------+
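At this point you can use Navicat to connect to each of the three databases in turn and run read/write tests.
Automatic failover test
First stop Patroni on pg1; pg2 then becomes the new leader:
[root@pg1 ~]# systemctl stop patroni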
[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1 | 11.0.1.4:15432 | Replica | stopped | | unknown |
| pg2 | 11.0.1.5:15432 | Leader | running | 4 | |
| pg3 | 11.0.1.6:15432 | Replica | streaming | 4 | 0 |
+--------+----------------+---------+-----------+----+-----------+
After the test, remember to bring the service back up:
[root@pg1 ~]# systemctl restart patroni
[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1 | 11.0.1.4:15432 | Replica | streaming | 4 | 0 |
| pg2 | 11.0.1.5:15432 | Leader | running | 4 | |
| pg3 | 11.0.1.6:15432 | Replica | streaming | 4 | 0 |
+--------+----------------+---------+-----------+----+-----------+
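When repeating the failover test, it can help to pull the current leader out of the patronictl table programmatically. The sketch below parses the table text shown above; parse_members and find_leader are hypothetical helpers, and the SAMPLE string is simply the last output copied in, not live data.

```python
SAMPLE = """\
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host           | Role    | State     | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1    | 11.0.1.4:15432 | Replica | streaming |  4 |         0 |
| pg2    | 11.0.1.5:15432 | Leader  | running   |  4 |           |
| pg3    | 11.0.1.6:15432 | Replica | streaming |  4 |         0 |
+--------+----------------+---------+-----------+----+-----------+
"""


def parse_members(text):
    """Turn the '|'-delimited rows of the table into a list of dicts keyed by header."""
    rows = [line for line in text.splitlines() if line.startswith("|")]
    header = [cell.strip() for cell in rows[0].strip("|").split("|")]
    members = []
    for row in rows[1:]:
        cells = [cell.strip() for cell in row.strip("|").split("|")]
        members.append(dict(zip(header, cells)))
    return members


def find_leader(members):
    """Return the member name whose Role is Leader, or None if there is none."""
    for member in members:
        if member["Role"] == "Leader":
            return member["Member"]
    return None


if __name__ == "__main__":
    members = parse_members(SAMPLE)
    print("leader:", find_leader(members))
    print("replicas:", [m["Member"] for m in members if m["Role"] == "Replica"])
```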
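7. Install HAProxy
Build and install
**(HAProxy is installed only on 11.0.1.7)** Building a recent release from source is recommended here instead of using the package that ships with CentOS, which is too old to meet the requirements.
# Install dependencies
sudo yum install -y gcc make pcre-devel openssl-devel systemd-devel
# Extract
tar -zxvf haproxy-2.9.6.tar.gz
cd haproxy-2.9.6
# Build (with SSL and systemd support)
make TARGET=linux-glibc USE_PCRE=1 USE_OPENSSL=1 USE_SYSTEMD=1 USE_CPU_AFFINITY=1
# Install
sudo make install PREFIX=/usr/local/haproxy
# Create a symlink so the binary can be called from anywhere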
sudo ln -s /usr/local/haproxy/sbin/haproxy /usr/sbin/haproxy
Configure the system service
# Create the systemd service file
sudo vim /usr/lib/systemd/system/haproxy.service
Add the following content:
[Unit]
Description=HAProxy Load Balancer
After=network.target
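[Service]
# -D makes HAProxy fork into the background, so systemd has to track it through the PID file
Type=forking
ExecStart=/usr/sbin/haproxy -D -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid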
ExecReload=/usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid -sf $MAINPID
PIDFile=/run/haproxy.pid
ExecStop=/bin/kill -TERM $MAINPID
Restart=always
User=root
[Install]
WantedBy=multi-user.target
Edit the service configuration file /etc/haproxy/haproxy.cfg:
[root@localhost ~]# cat /etc/haproxy/haproxy.cfg
global
    log /dev/log local0 info
    log /dev/log local1 notice
    daemon
    maxconn 4096
    user root
defaults
    log global
    mode tcp
    retries 3
    timeout connect 5s
    timeout client 30s
    timeout server 30s
    option tcplog
# Statistics page
listen stats
    bind *:8080
    mode http
    stats enable
    stats uri /haproxy-stats
    stats auth admin:admin123
    stats refresh 10s
# Write requests (primary, port 15000)
frontend pg_write
    bind *:15000
    mode tcp
    default_backend pg_primary
# Read requests (replicas, port 15001)
frontend pg_read
    bind *:15001
    mode tcp
    default_backend pg_replicas
# Primary backend (only the node with the primary role is marked up)
backend pg_primary
    mode tcp
    balance roundrobin
    # Health check: call Patroni's /role endpoint and match the "primary" role
    option httpchk GET /role
    http-check send meth GET uri /role
    # Match the role exactly as Patroni returns it (the body is JSON with quoted values, e.g. "primary"; test and adjust if needed)
    http-check expect string '"role": "primary"'
    # Server entries: all three nodes are listed; the health check filters out non-primary nodes
    server pg1 11.0.1.4:15432 check port 8008 inter 2s rise 2 fall 3
    server pg2 11.0.1.5:15432 check port 8008 inter 2s rise 2 fall 3
    server pg3 11.0.1.6:15432 check port 8008 inter 2s rise 2 fall 3
# Replica backend (only nodes with the replica role are marked up)
backend pg_replicas
    mode tcp
    balance leastconn
    # Health check: call Patroni's /role endpoint and match the "replica" role
    option httpchk GET /role
    http-check send meth GET uri /role
    http-check expect string '"role": "replica"'
    # Server entries: all three nodes are listed; the health check filters out non-replica nodes
    server pg1 11.0.1.4:15432 check port 8008 inter 2s rise 2 fall 3
    server pg2 11.0.1.5:15432 check port 8008 inter 2s rise 2 fall 3
    server pg3 11.0.1.6:15432 check port 8008 inter 2s rise 2 fall 3
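The expect string rules are plain substring tests on the response body, so they depend on the exact spacing of the JSON that Patroni emits. The sketch below imitates that test in Python to show why the pattern is worth verifying against a real response. The status dictionary is an invented, abbreviated example rather than a captured Patroni response, and check_passes is a hypothetical helper.

```python
import json

EXPECT_PRIMARY = '"role": "primary"'
EXPECT_REPLICA = '"role": "replica"'


def check_passes(body: str, expected: str) -> bool:
    """Imitate a plain substring health check on the response body."""
    return expected in body


# Invented, abbreviated status payload used only for illustration.
status = {"state": "running", "role": "primary", "timeline": 4}

# json.dumps puts a space after ':' by default.
default_body = json.dumps(status)
# The same data serialized without spaces.
compact_body = json.dumps(status, separators=(",", ":"))

if __name__ == "__main__":
    print(default_body, "->", check_passes(default_body, EXPECT_PRIMARY))
    print(compact_body, "->", check_passes(compact_body, EXPECT_PRIMARY))
```

The first body matches and the second does not, although both carry the same data. Fetching the endpoint once by hand (for example with curl against port 8008) and comparing the raw body with the pattern is a cheap way to confirm the rule before relying on it.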
Verify the installation
# Reload systemd units
sudo systemctl daemon-reload
# Start the service and enable it at boot
sudo systemctl start haproxy
sudo systemctl enable haproxy
# Check the version and status
haproxy -v
sudo systemctl status haproxy
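Test read/write splitting
The HAProxy configuration above makes the primary writable and the replicas read-only; the primary is reached on port 15000 and the replicas on port 15001. Use Navicat to connect to 11.0.1.7:15000 (write) and 11.0.1.7:15001 (read).
Create a table on port 15000 and run a write test:
Inserting data works, and Ctrl+S saves it normally.
Open the same table on port 15001 and try to insert more data:
Saving with Ctrl+S fails with "ERROR: cannot execute INSERT in a read-only transaction".
8. Test Primary/Replica Switchover
Write a Python script, haproxy_db_test, that exercises reads and writes: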
import psycopg2
import time
import random
from psycopg2 import OperationalError, InterfaceError, Error

# Database configuration: adjust to your environment
CONFIG = {
    "write_host": "11.0.1.7",  # HAProxy server IP
    "write_port": 15000,       # write port
    "read_host": "11.0.1.7",   # HAProxy server IP
    "read_port": 15001,        # read port
    "user": "postgres",
    "password": "postgres",    # database password
    "dbname": "postgres",
    "retry_attempts": 10,      # maximum connection retries
    "retry_delay": 2,          # delay between connection retries (seconds)
    "test_interval": 5,        # delay between test rounds (seconds)
    "sync_retries": 3,         # read retries
    "sync_delay": 2            # delay between read retries (seconds)
}
def get_connection(host, port):
    """Open a database connection and report the actual backend node."""
    attempts = 0
    while attempts < CONFIG["retry_attempts"]:
        try:
            conn = psycopg2.connect(
                host=host,
                port=port,
                user=CONFIG["user"],
                password=CONFIG["password"],
                dbname=CONFIG["dbname"],
                connect_timeout=5
            )
            conn.autocommit = True  # autocommit (matters for the write connection; harmless for reads)
            # Ask the server for its own IP and port (the node behind HAProxy)
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT inet_server_addr() AS server_ip,
                           inet_server_port() AS server_port
                """)
                result = cur.fetchone()
                actual_host = result[0]
                actual_port = result[1]
            print(f"✅ Connected to {host}:{port} (actual node: {actual_host}:{actual_port})")
            return conn, actual_host, actual_port
        except (OperationalError, InterfaceError) as e:
            attempts += 1
            print(f"❌ Connection to {host}:{port} failed (attempt {attempts}/{CONFIG['retry_attempts']}): {str(e)}")
            if attempts < CONFIG["retry_attempts"]:
                time.sleep(CONFIG["retry_delay"])
    print(f"❌ Maximum retries reached, cannot connect to {host}:{port}")
    return None, None, None
def update_table_schema(conn):
    """Check the table schema and add missing columns (write connection only)."""
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_name = 'ha_test'
            """)
            existing_columns = [row[0] for row in cur.fetchall()]
            required_columns = [
                ('write_proxy_host', 'TEXT'),
                ('write_proxy_port', 'INT'),
                ('actual_write_host', 'TEXT'),
                ('actual_write_port', 'INT'),
                ('read_info', 'TEXT'),  # read-side information (JSON text)
                ('read_time', 'TIMESTAMPTZ')
            ]
            for col_name, col_type in required_columns:
                if col_name not in existing_columns:
                    cur.execute(f"ALTER TABLE ha_test ADD COLUMN {col_name} {col_type}")
                    print(f"📝 Added missing column: {col_name}")
        print("✅ Table schema check and update finished")
        return True
    except Exception as e:
        print(f"❌ Table schema update failed: {str(e)}")
        return False
def init_test_table(conn):
    """Create the test table if it does not exist (write connection only)."""
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS ha_test
                (
                    id                SERIAL PRIMARY KEY,
                    write_time        TIMESTAMPTZ,
                    write_proxy_host  TEXT,
                    write_proxy_port  INT,
                    actual_write_host TEXT,
                    actual_write_port INT,
                    read_info         TEXT,  -- read-side info as JSON: {"proxy_host": "...", "actual_host": "...", ...}
                    read_time         TIMESTAMPTZ
                )
            """)
        print("📋 Test table initialized")
        return True
    except Exception as e:
        print(f"❌ Test table initialization failed: {str(e)}")
        return False
def test_write(conn, actual_write_host, actual_write_port):
    """Run a write and record which primary handled it."""
    try:
        with conn.cursor() as cur:
            test_id = random.randint(1, 1000000)
            cur.execute("""
                INSERT INTO ha_test (id, write_time, write_proxy_host, write_proxy_port,
                                     actual_write_host, actual_write_port)
                VALUES (%s, NOW(), %s, %s, %s, %s) RETURNING id
            """, (
                test_id,
                CONFIG["write_host"], CONFIG["write_port"],
                actual_write_host, actual_write_port
            ))
            inserted_id = cur.fetchone()[0]
        print(f"✍️ Wrote row, ID: {inserted_id} (primary: {actual_write_host}:{actual_write_port})")
        return test_id
    except Exception as e:
        print(f"❌ Write failed: {str(e)}")
        return None
def test_read(read_conn, test_id, actual_read_host, actual_read_port):
    """Read-only check (performs no writes)."""
    if not test_id:
        return False
    for attempt in range(CONFIG["sync_retries"]):
        try:
            with read_conn.cursor() as cur:
                # Query only, no updates
                cur.execute("SELECT id FROM ha_test WHERE id = %s", (test_id,))
                result = cur.fetchone()
            if result:
                print(
                    f"📖 Read row, ID: {result[0]} (replica: {actual_read_host}:{actual_read_port}, attempt {attempt + 1}/{CONFIG['sync_retries']})")
                return True
            else:
                if attempt < CONFIG["sync_retries"] - 1:
                    print(
                        f"⌛ Row not replicated yet, retrying in {CONFIG['sync_delay']} s (attempt {attempt + 1}/{CONFIG['sync_retries']})")
                    time.sleep(CONFIG["sync_delay"])
        except Error as e:
            print(f"⚠️ Read attempt {attempt + 1} failed: {str(e)}")
            if attempt < CONFIG["sync_retries"] - 1:
                time.sleep(CONFIG["sync_delay"])
    print(f"❌ All read attempts failed, no row with ID {test_id} found (replica: {actual_read_host}:{actual_read_port})")
    return False
def update_read_info(write_conn, test_id, actual_read_host, actual_read_port):
    """Record the read-side information through the write connection (primary only)."""
    if not test_id or not write_conn:
        return
    try:
        with write_conn.cursor() as cur:
            # Build the read-side information as JSON text
            read_info = f"""
            {{
                "read_proxy_host": "{CONFIG['read_host']}",
                "read_proxy_port": {CONFIG['read_port']},
                "actual_read_host": "{actual_read_host}",
                "actual_read_port": {actual_read_port}
            }}
            """
            cur.execute("""
                UPDATE ha_test
                SET read_time = NOW(),
                    read_info = %s
                WHERE id = %s
            """, (read_info, test_id))
        print(f"📝 Read-side information updated through the primary, ID: {test_id}")
    except Exception as e:
        print(f"❌ Updating read-side information failed: {str(e)}")
def main():
    # Initialize the table and its schema (through the write connection)
    init_conn, _, _ = get_connection(CONFIG["write_host"], CONFIG["write_port"])
    if init_conn:
        init_test_table(init_conn)
        update_table_schema(init_conn)
        init_conn.close()
    write_conn = None
    read_conn = None
    actual_write_host = None
    actual_write_port = None
    actual_read_host = None
    actual_read_port = None
    try:
        while True:
            # Re-establish connections that are missing or closed
            if not write_conn or write_conn.closed:
                write_conn, actual_write_host, actual_write_port = get_connection(
                    CONFIG["write_host"], CONFIG["write_port"]
                )
            if not read_conn or read_conn.closed:
                read_conn, actual_read_host, actual_read_port = get_connection(
                    CONFIG["read_host"], CONFIG["read_port"]
                )
            # Run one test round
            if write_conn and read_conn and actual_write_host and actual_read_host:
                test_id = test_write(write_conn, actual_write_host, actual_write_port)
                if test_id:
                    time.sleep(1)
                    # Read-only check (runs on a replica, no writes)
                    read_success = test_read(read_conn, test_id, actual_read_host, actual_read_port)
                    # Record the read-side information through the primary (writes happen on the primary only)
                    if read_success:
                        update_read_info(write_conn, test_id, actual_read_host, actual_read_port)
            time.sleep(CONFIG["test_interval"])
    except KeyboardInterrupt:
        print("\n⏹️ Stopped by user")
    except Exception as e:
        print(f"\n💥 Unexpected error: {str(e)}")
    finally:
        # Clean up connections
        if write_conn and not write_conn.closed:
            write_conn.close()
        if read_conn and not read_conn.closed:
            read_conn.close()
        print("🔚 Program finished")
if __name__ == "__main__":
    main()
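While the script runs and a node is stopped, the interesting number is how long writes were unavailable. As a rough sketch, if each round is recorded as a (timestamp, succeeded) pair, the longest outage can be computed afterwards. The longest_outage helper and the sample data are hypothetical; the script above does not collect such samples itself.

```python
def longest_outage(samples):
    """Return the longest gap in seconds between the first failure of a run and the next success.

    samples: chronological list of (timestamp_seconds, ok) tuples.
    An outage that never recovers within the samples is not counted.
    """
    longest = 0
    outage_start = None
    for ts, ok in samples:
        if not ok and outage_start is None:
            outage_start = ts            # first failure of a new outage
        elif ok and outage_start is not None:
            longest = max(longest, ts - outage_start)
            outage_start = None          # outage ended
    return longest


if __name__ == "__main__":
    # Made-up samples: writes fail at t=10..20 and succeed again at t=25.
    samples = [(0, True), (5, True), (10, False), (15, False),
               (20, False), (25, True), (30, True)]
    print("longest write outage (s):", longest_outage(samples))
```

Because the test loop only samples every few seconds, the result is an upper-bound estimate with a resolution of one test interval.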