在后端开发和运维工作中,批量检测接口存活、响应时间与状态码是日常刚需。传统的 requests 串行循环方案,100个接口即使每个超时3秒,总耗时也要300秒,无法满足高频监控需求。基于 asyncio + aiohttp 的异步方案可将 100 个接口的检查耗时压缩到单个接口的最大响应时间量级,效率提升数十倍。本文从零构建一套可配置、带异常捕获、支持并发限制的异步接口健康检查工具,可直接落地项目监控或定时巡检。
运行环境:Python 3.7+;核心依赖:aiohttp(安装命令:pip install aiohttp)。
一、异步为何适合健康检查?
接口健康检查是纯 IO 密集型场景,程序绝大多数时间在等待服务器响应,几乎不消耗 CPU。同步方案逐个请求必须等上一个完成才能执行下一个,规模越大耗时越长。异步方案利用事件循环在 IO 等待期间切换执行其他请求,实现多接口并发,同时可通过信号量(Semaphore)控制并发数,防止瞬间大量请求压垮目标服务器。
二、极简版:异步批量并发检查
下面的代码实现了最基础的异步健康检查:批量请求接口,记录状态、响应时间与异常信息。全局复用 aiohttp.ClientSession 减少 TCP 连接开销,全异常捕获保证单个接口报错不影响整体巡检。
- import asyncio
- import aiohttp
- import time
- API_LIST = [
- {"name": "百度首页", "url": "https://www.baidu.com", "method": "GET"},
- {"name": "HTTPBIN GET", "url": "https://httpbin.org/get", "method": "GET"},
- {"name": "HTTPBIN POST", "url": "https://httpbin.org/post", "method": "POST"},
- {"name": "无效接口", "url": "https://httpbin.org/error", "method": "GET"},
- ]
- TIMEOUT = aiohttp.ClientTimeout(total=5)
- async def check_api(session: aiohttp.ClientSession, api: dict):
- start_time = time.time()
- name = api["name"]
- url = api["url"]
- method = api["method"]
- try:
- if method.upper() == "GET":
- async with session.get(url, timeout=TIMEOUT) as resp:
- response_time = round((time.time() - start_time) * 1000, 2)
- return {
- "接口名称": name,
- "接口地址": url,
- "请求方式": method,
- "状态": "正常",
- "状态码": resp.status,
- "响应耗时(ms)": response_time,
- "异常信息": ""
- }
- elif method.upper() == "POST":
- async with session.post(url, timeout=TIMEOUT) as resp:
- response_time = round((time.time() - start_time) * 1000, 2)
- return {
- "接口名称": name,
- "接口地址": url,
- "请求方式": method,
- "状态": "正常",
- "状态码": resp.status,
- "响应耗时(ms)": response_time,
- "异常信息": ""
- }
- except Exception as e:
- response_time = round((time.time() - start_time) * 1000, 2)
- return {
- "接口名称": name,
- "接口地址": url,
- "请求方式": method,
- "状态": "异常",
- "状态码": None,
- "响应耗时(ms)": response_time,
- "异常信息": str(e)
- }
- async def main():
- async with aiohttp.ClientSession(timeout=TIMEOUT) as session:
- tasks = [check_api(session, api) for api in API_LIST]
- results = await asyncio.gather(*tasks)
- print("=" * 80)
- print("接口健康巡检结果")
- print("=" * 80)
- for res in results:
- print(f"接口:{res['接口名称']:10} | 状态:{res['状态']} | 状态码:{res['状态码']} | 耗时:{res['响应耗时(ms)']}ms | 异常:{res['异常信息']}")
- if __name__ == "__main__":
- start = time.time()
- asyncio.run(main())
- total_time = round(time.time() - start, 2)
- print(f"本次巡检总耗时:{total_time}s")
复制代码
三、进阶优化:限流并发避免压垮服务
无限制并发在检测几百个接口时可能触发服务器限流或防火墙拦截。通过 asyncio.Semaphore 设置最大并发数,配合 aiohttp.TCPConnector 优化连接池,同时细化状态码校验规则。下面示例限制最多3个接口同时请求。
- import asyncio
- import aiohttp
- import time
- API_LIST = [
- {"name": "百度首页", "url": "https://www.baidu.com", "method": "GET"},
- {"name": "HTTPBIN GET", "url": "https://httpbin.org/get", "method": "GET"},
- {"name": "HTTPBIN POST", "url": "https://httpbin.org/post", "method": "POST"},
- {"name": "无效接口", "url": "https://httpbin.org/error", "method": "GET"},
- {"name": "测试接口1", "url": "https://httpbin.org/delay/1", "method": "GET"},
- {"name": "测试接口2", "url": "https://httpbin.org/delay/2", "method": "GET"},
- ]
- MAX_CONCURRENT = 3
- TIMEOUT = aiohttp.ClientTimeout(total=5)
- semaphore = asyncio.Semaphore(MAX_CONCURRENT)
- async def check_api(session: aiohttp.ClientSession, api: dict):
- async with semaphore:
- start_time = time.time()
- name = api["name"]
- url = api["url"]
- method = api["method"]
- try:
- if method.upper() == "GET":
- async with session.get(url, timeout=TIMEOUT) as resp:
- response_time = round((time.time() - start_time) * 1000, 2)
- status = "正常" if resp.status in [200, 201] else "异常"
- return {
- "接口名称": name,
- "接口地址": url,
- "状态": status,
- "状态码": resp.status,
- "响应耗时(ms)": response_time,
- "异常信息": ""
- }
- elif method.upper() == "POST":
- async with session.post(url, timeout=TIMEOUT) as resp:
- response_time = round((time.time() - start_time) * 1000, 2)
- status = "正常" if resp.status in [200, 201] else "异常"
- return {
- "接口名称": name,
- "接口地址": url,
- "状态": status,
- "状态码": resp.status,
- "响应耗时(ms)": response_time,
- "异常信息": ""
- }
- except Exception as e:
- response_time = round((time.time() - start_time) * 1000, 2)
- return {
- "接口名称": name,
- "接口地址": url,
- "状态": "异常",
- "状态码": None,
- "响应耗时(ms)": response_time,
- "异常信息": str(e)
- }
- async def main():
- connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
- async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as session:
- tasks = [check_api(session, api) for api in API_LIST]
- results = await asyncio.gather(*tasks)
- normal_count = len([res for res in results if res["状态"] == "正常"])
- error_count = len(results) - normal_count
- print("=" * 90)
- print("接口健康巡检报告")
- print("=" * 90)
- for res in results:
- print(f"接口:{res['接口名称']:10} | 状态:{res['状态']} | 状态码:{res['状态码']} | 耗时:{res['响应耗时(ms)']}ms | 异常:{res['异常信息']}")
- print(f"正常接口:{normal_count} 个 | 异常接口:{error_count} 个")
- if __name__ == "__main__":
- start = time.time()
- asyncio.run(main())
- print(f"本次巡检总耗时:{round(time.time() - start, 2)}s")
复制代码
四、企业级扩展:支持请求头、请求参数与POST传参
实际业务中多数接口需要 Token 鉴权、JSON 参数或表单提交。下面的配置示例支持自定义 headers 和 data(POST 时使用 json 参数发送),兼容 GET 与 POST 方法。
- import asyncio
- import aiohttp
- import time
- API_LIST = [
- {
- "name": "带鉴权GET接口",
- "url": "https://httpbin.org/headers",
- "method": "GET",
- "headers": {"Authorization": "Bearer test123456", "User-Agent": "Mozilla/5.0"},
- "data": None
- },
- {
- "name": "带参数POST接口",
- "url": "https://httpbin.org/post",
- "method": "POST",
- "headers": {"Content-Type": "application/json"},
- "data": {"username": "admin", "password": "123456"}
- }
- ]
- MAX_CONCURRENT = 3
- TIMEOUT = aiohttp.ClientTimeout(total=5)
- semaphore = asyncio.Semaphore(MAX_CONCURRENT)
- async def check_api(session: aiohttp.ClientSession, api: dict):
- async with semaphore:
- start_time = time.time()
- name = api["name"]
- url = api["url"]
- method = api["method"]
- headers = api.get("headers", {})
- data = api.get("data")
- try:
- if method.upper() == "GET":
- async with session.get(url, headers=headers, timeout=TIMEOUT) as resp:
- response_time = round((time.time() - start_time) * 1000, 2)
- return {
- "接口名称": name, "状态": "正常" if resp.status == 200 else "异常",
- "状态码": resp.status, "耗时(ms)": response_time, "异常": ""
- }
- elif method.upper() == "POST":
- async with session.post(url, headers=headers, json=data, timeout=TIMEOUT) as resp:
- response_time = round((time.time() - start_time) * 1000, 2)
- return {
- "接口名称": name, "状态": "正常" if resp.status == 200 else "异常",
- "状态码": resp.status, "耗时(ms)": response_time, "异常": ""
- }
- except Exception as e:
- return {
- "接口名称": name, "状态": "异常", "状态码": None,
- "耗时(ms)": round((time.time() - start_time) * 1000, 2), "异常": str(e)
- }
- async def main():
- connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
- async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as session:
- tasks = [check_api(session, api) for api in API_LIST]
- results = await asyncio.gather(*tasks)
- for res in results:
- print(res)
- if __name__ == "__main__":
- asyncio.run(main())
复制代码
五、常见问题避坑
1. 为什么不用 requests 做异步?
requests 是同步阻塞库,不兼容 asyncio 事件循环,强行在异步中使用会阻塞整个程序导致并发失效。必须使用 aiohttp。
2. 并发数设置多少合适?
内网接口可设置 10-20,公网接口建议 3-5,避免因 IP 封禁。根据目标服务器性能灵活调整。
3. 偶尔报连接超时怎么办?
属于正常网络波动,代码已捕获异常不会影响整体巡检。可适当增大 aiohttp.ClientTimeout 的 total 值。
六、定时巡检部署思路
可将上述 main 函数结合 APScheduler 实现秒级/分钟级定时执行,异常接口对接钉钉或企业微信机器人推送告警,实现无人值守监控。
总结:Python 异步 IO 实现的接口健康检查方案效率高、安全可控(信号量限流)、扩展性强(支持鉴权/参数/定时/告警),全局异常捕获保证了单个接口故障不影响整体。提供的代码可直接集成到个人项目或企业运维监控系统,快速搭建轻量化接口健康监控体系。 |