在云原生和微服务架构中,数据库高可用(HA)是保证业务连续性的关键。传统方案如MHA、Orchestrator部署复杂,而使用Python自研轻量级故障转移(Failover)系统,能灵活适配不同基础设施。本文实现一个生产级别的MySQL自动切换脚本,包含健康检查、分布式锁防脑裂、主从提拔和流量路由更新,全部代码可复用。
## 系统架构与设计原则
采用一主一从标准拓扑:主库`read_only=OFF`,从库`read_only=ON`。高可用控制器核心工作流程:
1. 定时健康检查,连续多次失败后触发切换;
2. 抢占分布式锁(基于Redis),防止多个控制器同时切换(脑裂);
3. 二次确认主库真的挂了,并验证从库健康;
4. 停止从库复制链路,关闭只读模式,提升为主库;
5. 更新流量路由(此处用Redis模拟配置中心)。
设计要点:双向校验(不仅检查TCP连接,还验证数据库实际写入能力);幂等性操作,保证失败后可重入;使用Lua脚本安全释放锁。
## 完整代码实现
首先安装依赖:- pip install pymysql redis
复制代码
创建`db_ha_controller.py`:- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 模块名称: db_ha_controller.py
- 功能描述: 生产级 MySQL 数据库一键高可用与故障自动转移控制器
- """
- import os
- import sys
- import time
- import logging
- import pymysql
- import redis
- from contextlib import contextmanager
- # 1. 日志与全局配置
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d]: %(message)s',
- handlers=[
- logging.StreamHandler(sys.stdout),
- logging.FileHandler("db_ha_controller.log", encoding="utf-8")
- ]
- )
- logger = logging.getLogger("DB-HA")
- class HAConfig:
- MASTER_HOST = os.getenv("HA_MASTER_HOST", "192.168.1.101")
- SLAVE_HOST = os.getenv("HA_SLAVE_HOST", "192.168.1.102")
- DB_PORT = int(os.getenv("HA_DB_PORT", 3306))
- DB_USER = os.getenv("HA_DB_USER", "ha_admin")
- DB_PASSWORD = os.getenv("HA_DB_PASSWORD", "SecureP@ss123")
- CHECK_TIMEOUT = 3 # 连接超时(秒)
- MAX_RETRIES = 3 # 连续失败次数阈值
- CHECK_INTERVAL = 5 # 检查间隔(秒)
- REDIS_HOST = os.getenv("HA_REDIS_HOST", "192.168.1.200")
- REDIS_PORT = int(os.getenv("HA_REDIS_PORT", 6379))
- REDIS_PASSWORD = os.getenv("HA_REDIS_PASSWORD", None)
- LOCK_KEY = "db_ha_failover_lock"
- LOCK_TIMEOUT = 60 # 锁有效期(秒),防死锁
- CONSUL_KV_ROUTE_KEY = "service/mysql/primary"
- @contextmanager
- def get_db_connection(host, user, password, port, timeout=3):
- conn = None
- try:
- conn = pymysql.connect(
- host=host, user=user, password=password,
- port=port, connect_timeout=timeout, autocommit=True
- )
- yield conn
- finally:
- if conn:
- try:
- conn.close()
- except Exception as e:
- logger.debug(f"关闭连接异常: {e}")
- @contextmanager
- def get_redis_client():
- client = None
- try:
- client = redis.Redis(
- host=HAConfig.REDIS_HOST, port=HAConfig.REDIS_PORT,
- password=HAConfig.REDIS_PASSWORD, socket_timeout=3,
- decode_responses=True
- )
- yield client
- finally:
- if client:
- del client
- class DatabaseHAService:
- def __init__(self):
- self.config = HAConfig()
- def check_database_health(self, host):
- """全位健康检查:连接、read_only、写入能力"""
- try:
- with get_db_connection(host=host, user=self.config.DB_USER,
- password=self.config.DB_PASSWORD,
- port=self.config.DB_PORT,
- timeout=self.config.CHECK_TIMEOUT) as conn:
- with conn.cursor() as cursor:
- cursor.execute("SELECT 1;")
- cursor.execute("SHOW VARIABLES LIKE 'read_only';")
- row = cursor.fetchone()
- is_read_only = True if row and row[1].upper() == "ON" else False
- if not is_read_only:
- try:
- cursor.execute("CREATE DATABASE IF NOT EXISTS ha_heartbeat_db;")
- cursor.execute("CREATE TABLE IF NOT EXISTS ha_heartbeat_db.t_check (id INT);")
- cursor.execute("INSERT INTO ha_heartbeat_db.t_check VALUES (1);")
- cursor.execute("DROP TABLE ha_heartbeat_db.t_check;")
- except pymysql.MySQLError as we:
- return False, f"数据库在线但无法写入: {str(we)}", is_read_only
- return True, "健康", is_read_only
- except pymysql.OperationalError as oe:
- return False, f"网络或认证失败: {str(oe)}", None
- except Exception as e:
- return False, f"未知错误: {str(e)}", None
- def acquire_failover_lock(self, redis_cli, identifier):
- """使用SET NX EX原子获取锁"""
- if redis_cli.set(self.config.LOCK_KEY, identifier,
- ex=self.config.LOCK_TIMEOUT, nx=True):
- logger.info("成功获取分布式锁,具备切换资格")
- return True
- return False
- def release_failover_lock(self, redis_cli, identifier):
- """Lua脚本安全释放锁,只释放自己持有的锁"""
- lua_script = """
- if redis.call('get', KEYS[1]) == ARGV[1] then
- return redis.call('del', KEYS[1])
- else
- return 0
- end"""
- try:
- result = redis_cli.eval(lua_script, 1, self.config.LOCK_KEY, identifier)
- if result == 1:
- logger.info("分布式锁释放成功")
- return True
- else:
- logger.warning("锁释放失败,可能已过期或被挪用")
- return False
- except Exception as e:
- logger.error(f"释放锁异常: {e}")
- return False
- def update_traffic_routing(self, redis_cli, new_master_host):
- """更新配置中心(这里用Redis模拟Consul/Etcd)"""
- try:
- redis_cli.set(self.config.CONSUL_KV_ROUTE_KEY,
- f"{new_master_host}:{self.config.DB_PORT}")
- logger.info("流量切换成功,业务路由已同步至最新主库")
- return True
- except Exception as e:
- logger.critical(f"路由更新失败,需人工介入: {e}")
- return False
- def promote_slave_to_master(self, slave_host):
- """停止复制,关闭只读,提升从库为主库"""
- try:
- with get_db_connection(host=slave_host, user=self.config.DB_USER,
- password=self.config.DB_PASSWORD,
- port=self.config.DB_PORT,
- timeout=self.config.CHECK_TIMEOUT) as conn:
- with conn.cursor() as cursor:
- try:
- cursor.execute("STOP SLAVE;")
- except pymysql.InternalError:
- cursor.execute("STOP REPLICA;") # MySQL 8.0+
- try:
- cursor.execute("RESET SLAVE ALL;")
- except pymysql.InternalError:
- cursor.execute("RESET REPLICA ALL;")
- cursor.execute("SET GLOBAL read_only = OFF;")
- cursor.execute("SET GLOBAL super_read_only = OFF;")
- logger.info(f"从库 {slave_host} 提升为主库成功")
- return True
- except Exception as e:
- logger.critical(f"提升从库失败: {e}")
- return False
- def execute_one_click_failover(self, redis_cli, controller_id):
- """一键Failover主逻辑"""
- if not self.acquire_failover_lock(redis_cli, controller_id):
- logger.warning("未抢到分布式锁,其他节点正在执行切换")
- return False
- try:
- # 二次确认主库真挂了
- re_check, reason, _ = self.check_database_health(self.config.MASTER_HOST)
- if re_check:
- logger.warning(f"虚惊一场,主库已恢复 ({reason}),取消切换")
- return False
- # 验证从库健康
- slave_ok, slave_reason, is_ro = self.check_database_health(self.config.SLAVE_HOST)
- if not slave_ok:
- logger.critical(f"从库也不可用: {slave_reason},无法切换")
- return False
- # 提拔从库
- if not self.promote_slave_to_master(self.config.SLAVE_HOST):
- return False
- # 更新路由
- if not self.update_traffic_routing(redis_cli, self.config.SLAVE_HOST):
- logger.critical("从库已升级但路由更新失败,系统存在读写黑洞风险")
- return False
- logger.critical(f"故障转移完成!新主库: {self.config.SLAVE_HOST}")
- return True
- except Exception as e:
- logger.error(f"切换异常: {e}")
- return False
- finally:
- self.release_failover_lock(redis_cli, controller_id)
- def start_guardian_loop(self):
- """守护进程循环"""
- controller_id = f"ha_node_{os.getpid()}_{int(time.time())}"
- logger.info(f"守护进程启动,节点ID: {controller_id}")
- consecutive_failures = 0
- while True:
- try:
- is_ok, reason, is_ro = self.check_database_health(self.config.MASTER_HOST)
- if is_ok:
- if is_ro:
- logger.warning(f"主库在线但被设为只读!")
- consecutive_failures = 0
- logger.info(f"主库 {self.config.MASTER_HOST} 健康")
- else:
- consecutive_failures += 1
- logger.error(f"主库异常 [{consecutive_failures}/{self.config.MAX_RETRIES}]: {reason}")
- if consecutive_failures >= self.config.MAX_RETRIES:
- with get_redis_client() as redis_cli:
- self.execute_one_click_failover(redis_cli, controller_id)
- logger.critical("切换完成,守护进程退出以防脑裂")
- break
- except redis.RedisError as re:
- logger.error(f"Redis异常,降级为纯监控: {re}")
- except Exception as e:
- logger.critical(f"主循环异常: {e}")
- time.sleep(self.config.CHECK_INTERVAL)
- if __name__ == '__main__': # 注意原文章笔误,此处修正为正确写法
- ha_engine = DatabaseHAService()
- try:
- ha_engine.start_guardian_loop()
- except KeyboardInterrupt:
- logger.info("收到终止信号,安全退出")
复制代码
## 核心模块深度解析
### 1. 全位健康检查
常见监控仅检查3306端口是否开放,但生产环境存在“端口开着、数据库假死”的情况。本代码通过`connect_timeout=3`避免连接阻塞,并实际执行写入操作验证主库是否可写。若数据库因磁盘满变为只读,`read_only`检测会告警;若无法创建临时表,则判定为异常。这种深度校验大幅降低误切换概率。
### 2. 防脑裂分布式锁
使用Redis的`SET NX EX`原子命令加锁,锁超时60秒防止控制器崩溃后死锁。释放锁时采用Lua脚本检查锁的持有者标识(`identifier`),避免误删其他控制器的锁。在生产环境,建议替换为Consul或Etcd的强一致性锁,进一步降低脑裂风险。
### 3. 主从提拔与幂等性
`promote_slave_to_master`函数依次执行`STOP SLAVE`、`RESET SLAVE ALL`、`SET GLOBAL read_only=OFF`和`super_read_only=OFF`。代码兼容MySQL 5.7和8.0的语法差异(`SLAVE` vs `REPLICA`)。操作设计为幂等:即使部分步骤失败,重复执行不会破坏数据一致性。
### 4. 流量路由更新
示例中使用Redis存储当前主库地址(`service/mysql/primary`)。实际生产可替换为Consul API、Etcd API或云平台VIP切换。该步骤是业务无感知切换的关键,建议结合配置中心的Watch机制秒级生效。
## 生产部署与测试指南
1. 环境准备:创建MySQL用户`ha_admin`并授予`REPLICATION SLAVE, REPLICATION CLIENT, SUPER, CREATE, DROP, INSERT, SELECT`等权限。确保主从复制已配置好。
2. 启动监控:运行脚本`python3 db_ha_controller.py`,日志将实时打印健康检查结果。
3. 模拟主库故障:使用`systemctl stop mysql`或防火墙阻断3306端口。
4. 观察切换:约15秒(连续3次失败×5秒间隔)后,脚本会抢夺锁,将指定从库提升为主库,并更新路由。守护进程在切换成功后自动退出,防止旧主库恢复后抢写。
5. 验证:连接新主库(原从库IP)执行写操作,确认正常。
## 总结
本方案用不到200行Python代码实现了一个可用的MySQL自动故障转移控制器,覆盖了健康检查、防脑裂、幂等切换和路由更新等生产级特性。读者可根据实际环境调整配置(通过环境变量注入),对接Consul/Nacos或云API即可投入生产。建议结合半同步复制和GTID保证数据零丢失。 |