查看: 165|回复: 3

Python实现MySQL高可用自动故障转移:健康检查+防脑裂锁+主从切换

[复制链接]
发表于 3 小时前 | 显示全部楼层 |阅读模式
在云原生和微服务架构中,数据库高可用(HA)是保证业务连续性的关键。传统方案如MHA、Orchestrator部署复杂,而使用Python自研轻量级故障转移(Failover)系统,能灵活适配不同基础设施。本文实现一个生产级别的MySQL自动切换脚本,包含健康检查、分布式锁防脑裂、主从提拔和流量路由更新,全部代码可复用。

## 系统架构与设计原则

采用一主一从标准拓扑:主库`read_only=OFF`,从库`read_only=ON`。高可用控制器核心工作流程:

1. 定时健康检查,连续多次失败后触发切换;
2. 抢占分布式锁(基于Redis),防止多个控制器同时切换(脑裂);
3. 二次确认主库真的挂了,并验证从库健康;
4. 停止从库复制链路,关闭只读模式,提升为主库;
5. 更新流量路由(此处用Redis模拟配置中心)。

设计要点:双向校验(不仅检查TCP连接,还验证数据库实际写入能力);幂等性操作,保证失败后可重入;使用Lua脚本安全释放锁。

## 完整代码实现

首先安装依赖:
  1. pip install pymysql redis
复制代码

创建`db_ha_controller.py`:
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 模块名称: db_ha_controller.py
  5. 功能描述: 生产级 MySQL 数据库一键高可用与故障自动转移控制器
  6. """
  7. import os
  8. import sys
  9. import time
  10. import logging
  11. import pymysql
  12. import redis
  13. from contextlib import contextmanager
  14. # 1. 日志与全局配置
  15. logging.basicConfig(
  16.     level=logging.INFO,
  17.     format='%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d]: %(message)s',
  18.     handlers=[
  19.         logging.StreamHandler(sys.stdout),
  20.         logging.FileHandler("db_ha_controller.log", encoding="utf-8")
  21.     ]
  22. )
  23. logger = logging.getLogger("DB-HA")
  24. class HAConfig:
  25.     MASTER_HOST = os.getenv("HA_MASTER_HOST", "192.168.1.101")
  26.     SLAVE_HOST = os.getenv("HA_SLAVE_HOST", "192.168.1.102")
  27.     DB_PORT = int(os.getenv("HA_DB_PORT", 3306))
  28.     DB_USER = os.getenv("HA_DB_USER", "ha_admin")
  29.     DB_PASSWORD = os.getenv("HA_DB_PASSWORD", "SecureP@ss123")
  30.     CHECK_TIMEOUT = 3                # 连接超时(秒)
  31.     MAX_RETRIES = 3                  # 连续失败次数阈值
  32.     CHECK_INTERVAL = 5               # 检查间隔(秒)
  33.     REDIS_HOST = os.getenv("HA_REDIS_HOST", "192.168.1.200")
  34.     REDIS_PORT = int(os.getenv("HA_REDIS_PORT", 6379))
  35.     REDIS_PASSWORD = os.getenv("HA_REDIS_PASSWORD", None)
  36.     LOCK_KEY = "db_ha_failover_lock"
  37.     LOCK_TIMEOUT = 60                # 锁有效期(秒),防死锁
  38.     CONSUL_KV_ROUTE_KEY = "service/mysql/primary"
  39. @contextmanager
  40. def get_db_connection(host, user, password, port, timeout=3):
  41.     conn = None
  42.     try:
  43.         conn = pymysql.connect(
  44.             host=host, user=user, password=password,
  45.             port=port, connect_timeout=timeout, autocommit=True
  46.         )
  47.         yield conn
  48.     finally:
  49.         if conn:
  50.             try:
  51.                 conn.close()
  52.             except Exception as e:
  53.                 logger.debug(f"关闭连接异常: {e}")
  54. @contextmanager
  55. def get_redis_client():
  56.     client = None
  57.     try:
  58.         client = redis.Redis(
  59.             host=HAConfig.REDIS_HOST, port=HAConfig.REDIS_PORT,
  60.             password=HAConfig.REDIS_PASSWORD, socket_timeout=3,
  61.             decode_responses=True
  62.         )
  63.         yield client
  64.     finally:
  65.         if client:
  66.             del client
  67. class DatabaseHAService:
  68.     def __init__(self):
  69.         self.config = HAConfig()
  70.     def check_database_health(self, host):
  71.         """全位健康检查:连接、read_only、写入能力"""
  72.         try:
  73.             with get_db_connection(host=host, user=self.config.DB_USER,
  74.                                    password=self.config.DB_PASSWORD,
  75.                                    port=self.config.DB_PORT,
  76.                                    timeout=self.config.CHECK_TIMEOUT) as conn:
  77.                 with conn.cursor() as cursor:
  78.                     cursor.execute("SELECT 1;")
  79.                     cursor.execute("SHOW VARIABLES LIKE 'read_only';")
  80.                     row = cursor.fetchone()
  81.                     is_read_only = True if row and row[1].upper() == "ON" else False
  82.                     if not is_read_only:
  83.                         try:
  84.                             cursor.execute("CREATE DATABASE IF NOT EXISTS ha_heartbeat_db;")
  85.                             cursor.execute("CREATE TABLE IF NOT EXISTS ha_heartbeat_db.t_check (id INT);")
  86.                             cursor.execute("INSERT INTO ha_heartbeat_db.t_check VALUES (1);")
  87.                             cursor.execute("DROP TABLE ha_heartbeat_db.t_check;")
  88.                         except pymysql.MySQLError as we:
  89.                             return False, f"数据库在线但无法写入: {str(we)}", is_read_only
  90.                     return True, "健康", is_read_only
  91.         except pymysql.OperationalError as oe:
  92.             return False, f"网络或认证失败: {str(oe)}", None
  93.         except Exception as e:
  94.             return False, f"未知错误: {str(e)}", None
  95.     def acquire_failover_lock(self, redis_cli, identifier):
  96.         """使用SET NX EX原子获取锁"""
  97.         if redis_cli.set(self.config.LOCK_KEY, identifier,
  98.                          ex=self.config.LOCK_TIMEOUT, nx=True):
  99.             logger.info("成功获取分布式锁,具备切换资格")
  100.             return True
  101.         return False
  102.     def release_failover_lock(self, redis_cli, identifier):
  103.         """Lua脚本安全释放锁,只释放自己持有的锁"""
  104.         lua_script = """
  105. if redis.call('get', KEYS[1]) == ARGV[1] then
  106.     return redis.call('del', KEYS[1])
  107. else
  108.     return 0
  109. end"""
  110.         try:
  111.             result = redis_cli.eval(lua_script, 1, self.config.LOCK_KEY, identifier)
  112.             if result == 1:
  113.                 logger.info("分布式锁释放成功")
  114.                 return True
  115.             else:
  116.                 logger.warning("锁释放失败,可能已过期或被挪用")
  117.                 return False
  118.         except Exception as e:
  119.             logger.error(f"释放锁异常: {e}")
  120.             return False
  121.     def update_traffic_routing(self, redis_cli, new_master_host):
  122.         """更新配置中心(这里用Redis模拟Consul/Etcd)"""
  123.         try:
  124.             redis_cli.set(self.config.CONSUL_KV_ROUTE_KEY,
  125.                           f"{new_master_host}:{self.config.DB_PORT}")
  126.             logger.info("流量切换成功,业务路由已同步至最新主库")
  127.             return True
  128.         except Exception as e:
  129.             logger.critical(f"路由更新失败,需人工介入: {e}")
  130.             return False
  131.     def promote_slave_to_master(self, slave_host):
  132.         """停止复制,关闭只读,提升从库为主库"""
  133.         try:
  134.             with get_db_connection(host=slave_host, user=self.config.DB_USER,
  135.                                    password=self.config.DB_PASSWORD,
  136.                                    port=self.config.DB_PORT,
  137.                                    timeout=self.config.CHECK_TIMEOUT) as conn:
  138.                 with conn.cursor() as cursor:
  139.                     try:
  140.                         cursor.execute("STOP SLAVE;")
  141.                     except pymysql.InternalError:
  142.                         cursor.execute("STOP REPLICA;")  # MySQL 8.0+
  143.                     try:
  144.                         cursor.execute("RESET SLAVE ALL;")
  145.                     except pymysql.InternalError:
  146.                         cursor.execute("RESET REPLICA ALL;")
  147.                     cursor.execute("SET GLOBAL read_only = OFF;")
  148.                     cursor.execute("SET GLOBAL super_read_only = OFF;")
  149.                     logger.info(f"从库 {slave_host} 提升为主库成功")
  150.                     return True
  151.         except Exception as e:
  152.             logger.critical(f"提升从库失败: {e}")
  153.             return False
  154.     def execute_one_click_failover(self, redis_cli, controller_id):
  155.         """一键Failover主逻辑"""
  156.         if not self.acquire_failover_lock(redis_cli, controller_id):
  157.             logger.warning("未抢到分布式锁,其他节点正在执行切换")
  158.             return False
  159.         try:
  160.             # 二次确认主库真挂了
  161.             re_check, reason, _ = self.check_database_health(self.config.MASTER_HOST)
  162.             if re_check:
  163.                 logger.warning(f"虚惊一场,主库已恢复 ({reason}),取消切换")
  164.                 return False
  165.             # 验证从库健康
  166.             slave_ok, slave_reason, is_ro = self.check_database_health(self.config.SLAVE_HOST)
  167.             if not slave_ok:
  168.                 logger.critical(f"从库也不可用: {slave_reason},无法切换")
  169.                 return False
  170.             # 提拔从库
  171.             if not self.promote_slave_to_master(self.config.SLAVE_HOST):
  172.                 return False
  173.             # 更新路由
  174.             if not self.update_traffic_routing(redis_cli, self.config.SLAVE_HOST):
  175.                 logger.critical("从库已升级但路由更新失败,系统存在读写黑洞风险")
  176.                 return False
  177.             logger.critical(f"故障转移完成!新主库: {self.config.SLAVE_HOST}")
  178.             return True
  179.         except Exception as e:
  180.             logger.error(f"切换异常: {e}")
  181.             return False
  182.         finally:
  183.             self.release_failover_lock(redis_cli, controller_id)
  184.     def start_guardian_loop(self):
  185.         """守护进程循环"""
  186.         controller_id = f"ha_node_{os.getpid()}_{int(time.time())}"
  187.         logger.info(f"守护进程启动,节点ID: {controller_id}")
  188.         consecutive_failures = 0
  189.         while True:
  190.             try:
  191.                 is_ok, reason, is_ro = self.check_database_health(self.config.MASTER_HOST)
  192.                 if is_ok:
  193.                     if is_ro:
  194.                         logger.warning(f"主库在线但被设为只读!")
  195.                     consecutive_failures = 0
  196.                     logger.info(f"主库 {self.config.MASTER_HOST} 健康")
  197.                 else:
  198.                     consecutive_failures += 1
  199.                     logger.error(f"主库异常 [{consecutive_failures}/{self.config.MAX_RETRIES}]: {reason}")
  200.                     if consecutive_failures >= self.config.MAX_RETRIES:
  201.                         with get_redis_client() as redis_cli:
  202.                             self.execute_one_click_failover(redis_cli, controller_id)
  203.                         logger.critical("切换完成,守护进程退出以防脑裂")
  204.                         break
  205.             except redis.RedisError as re:
  206.                 logger.error(f"Redis异常,降级为纯监控: {re}")
  207.             except Exception as e:
  208.                 logger.critical(f"主循环异常: {e}")
  209.             time.sleep(self.config.CHECK_INTERVAL)
  210. if __name__ == '__main__':  # 注意原文章笔误,此处修正为正确写法
  211.     ha_engine = DatabaseHAService()
  212.     try:
  213.         ha_engine.start_guardian_loop()
  214.     except KeyboardInterrupt:
  215.         logger.info("收到终止信号,安全退出")
复制代码

## 核心模块深度解析

### 1. 全位健康检查

常见监控仅检查3306端口是否开放,但生产环境存在“端口开着、数据库假死”的情况。本代码通过`connect_timeout=3`避免连接阻塞,并实际执行写入操作验证主库是否可写。若数据库因磁盘满变为只读,`read_only`检测会告警;若无法创建临时表,则判定为异常。这种深度校验大幅降低误切换概率。

### 2. 防脑裂分布式锁

使用Redis的`SET NX EX`原子命令加锁,锁超时60秒防止控制器崩溃后死锁。释放锁时采用Lua脚本检查锁的持有者标识(`identifier`),避免误删其他控制器的锁。在生产环境,建议替换为Consul或Etcd的强一致性锁,进一步降低脑裂风险。

### 3. 主从提拔与幂等性

`promote_slave_to_master`函数依次执行`STOP SLAVE`、`RESET SLAVE ALL`、`SET GLOBAL read_only=OFF`和`super_read_only=OFF`。代码兼容MySQL 5.7和8.0的语法差异(`SLAVE` vs `REPLICA`)。操作设计为幂等:即使部分步骤失败,重复执行不会破坏数据一致性。

### 4. 流量路由更新

示例中使用Redis存储当前主库地址(`service/mysql/primary`)。实际生产可替换为Consul API、Etcd API或云平台VIP切换。该步骤是业务无感知切换的关键,建议结合配置中心的Watch机制秒级生效。

## 生产部署与测试指南

1. 环境准备:创建MySQL用户`ha_admin`并授予`REPLICATION SLAVE, REPLICATION CLIENT, SUPER, CREATE, DROP, INSERT, SELECT`等权限。确保主从复制已配置好。
2. 启动监控:运行脚本`python3 db_ha_controller.py`,日志将实时打印健康检查结果。
3. 模拟主库故障:使用`systemctl stop mysql`或防火墙阻断3306端口。
4. 观察切换:约15秒(连续3次失败×5秒间隔)后,脚本会抢夺锁,将指定从库提升为主库,并更新路由。守护进程在切换成功后自动退出,防止旧主库恢复后抢写。
5. 验证:连接新主库(原从库IP)执行写操作,确认正常。

## 总结

本方案用不到200行Python代码实现了一个可用的MySQL自动故障转移控制器,覆盖了健康检查、防脑裂、幂等切换和路由更新等生产级特性。读者可根据实际环境调整配置(通过环境变量注入),对接Consul/Nacos或云API即可投入生产。建议结合半同步复制和GTID保证数据零丢失。
回复

使用道具 举报

发表于 2 小时前 | 显示全部楼层

Re: Python实现MySQL高可用自动故障转移:健康检查+防脑裂锁+主从切换

感谢分享这么详细的实战代码!一主一从+Redis锁防脑裂的设计思路很清晰,而且用Lua脚本释放锁保证了原子性,很靠谱。有个问题想请教:检查到主库故障后,提升从库之前,有没有考虑过从库的复制延迟或binlog位置校验?如果从库落后主库几秒甚至更多,直接提升可能会丢数据。另外,生产环境建议加上半同步复制或GTID校验,让切换更安全。期待后续能补充这部分逻辑!
回复 支持 反对

使用道具 举报

发表于 2 小时前 | 显示全部楼层

Re: Python实现MySQL高可用自动故障转移:健康检查+防脑裂锁+主从切换

楼主这个实现很完整,把生产环境里几个关键点都考虑到了:健康检查的双向校验、Redis分布式锁防脑裂、主从切换的幂等性,还有用Lua脚本释放锁,细节处理得很扎实。特别是用环境变量来做配置,方便容器化部署,很符合云原生习惯。 想请教一个问题:如果主库宕机后,从库的复制延迟比较大,楼主的脚本是直接提升从库为主,还是有什么机制来判断数据一致性?另外,切换后旧主库恢复时,有没有考虑自动把它重新加入集群作为新从库?这部分在代码里好像没看到,但实际运维中挺常见的。
回复 支持 反对

使用道具 举报

发表于 2 小时前 | 显示全部楼层

Re: Python实现MySQL高可用自动故障转移:健康检查+防脑裂锁+主从切换

楼主这个实现很实用,健康检查+Redis锁防脑裂的设计思路清晰,代码结构也很规范。我比较关心在多节点部署时,Redis锁的持有者如果意外宕机,`LOCK_TIMEOUT` 到期后其他节点是否能安全接管?另外,切换后通知配置中心更新路由的地方,用Consul KV会更好吗?期待后续能分享更多生产环境的踩坑经验。
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-28 18:06 , Processed in 0.042107 second(s), 18 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部