查看: 122|回复: 3

Python aiohttp异步实现批量接口健康检查:限流并发与异常处理实战

[复制链接]
发表于 2 小时前 | 显示全部楼层 |阅读模式
在后端开发和运维工作中,批量检测接口存活、响应时间与状态码是日常刚需。传统的 requests 串行循环方案,100个接口即使每个超时3秒,总耗时也要300秒,无法满足高频监控需求。基于 asyncio + aiohttp 的异步方案可将 100 个接口的检查耗时压缩到单个接口的最大响应时间量级,效率提升数十倍。本文从零构建一套可配置、带异常捕获、支持并发限制的异步接口健康检查工具,可直接落地项目监控或定时巡检。

运行环境:Python 3.7+;核心依赖:aiohttp(安装命令:pip install aiohttp)。

一、异步为何适合健康检查?
接口健康检查是纯 IO 密集型场景,程序绝大多数时间在等待服务器响应,几乎不消耗 CPU。同步方案逐个请求必须等上一个完成才能执行下一个,规模越大耗时越长。异步方案利用事件循环在 IO 等待期间切换执行其他请求,实现多接口并发,同时可通过信号量(Semaphore)控制并发数,防止瞬间大量请求压垮目标服务器。

二、极简版:异步批量并发检查
下面的代码实现了最基础的异步健康检查:批量请求接口,记录状态、响应时间与异常信息。全局复用 aiohttp.ClientSession 减少 TCP 连接开销,全异常捕获保证单个接口报错不影响整体巡检。
  1. import asyncio
  2. import aiohttp
  3. import time
  4. API_LIST = [
  5.     {"name": "百度首页", "url": "https://www.baidu.com", "method": "GET"},
  6.     {"name": "HTTPBIN GET", "url": "https://httpbin.org/get", "method": "GET"},
  7.     {"name": "HTTPBIN POST", "url": "https://httpbin.org/post", "method": "POST"},
  8.     {"name": "无效接口", "url": "https://httpbin.org/error", "method": "GET"},
  9. ]
  10. TIMEOUT = aiohttp.ClientTimeout(total=5)
  11. async def check_api(session: aiohttp.ClientSession, api: dict):
  12.     start_time = time.time()
  13.     name = api["name"]
  14.     url = api["url"]
  15.     method = api["method"]
  16.     try:
  17.         if method.upper() == "GET":
  18.             async with session.get(url, timeout=TIMEOUT) as resp:
  19.                 response_time = round((time.time() - start_time) * 1000, 2)
  20.                 return {
  21.                     "接口名称": name,
  22.                     "接口地址": url,
  23.                     "请求方式": method,
  24.                     "状态": "正常",
  25.                     "状态码": resp.status,
  26.                     "响应耗时(ms)": response_time,
  27.                     "异常信息": ""
  28.                 }
  29.         elif method.upper() == "POST":
  30.             async with session.post(url, timeout=TIMEOUT) as resp:
  31.                 response_time = round((time.time() - start_time) * 1000, 2)
  32.                 return {
  33.                     "接口名称": name,
  34.                     "接口地址": url,
  35.                     "请求方式": method,
  36.                     "状态": "正常",
  37.                     "状态码": resp.status,
  38.                     "响应耗时(ms)": response_time,
  39.                     "异常信息": ""
  40.                 }
  41.     except Exception as e:
  42.         response_time = round((time.time() - start_time) * 1000, 2)
  43.         return {
  44.             "接口名称": name,
  45.             "接口地址": url,
  46.             "请求方式": method,
  47.             "状态": "异常",
  48.             "状态码": None,
  49.             "响应耗时(ms)": response_time,
  50.             "异常信息": str(e)
  51.         }
  52. async def main():
  53.     async with aiohttp.ClientSession(timeout=TIMEOUT) as session:
  54.         tasks = [check_api(session, api) for api in API_LIST]
  55.         results = await asyncio.gather(*tasks)
  56.         print("=" * 80)
  57.         print("接口健康巡检结果")
  58.         print("=" * 80)
  59.         for res in results:
  60.             print(f"接口:{res['接口名称']:10} | 状态:{res['状态']} | 状态码:{res['状态码']} | 耗时:{res['响应耗时(ms)']}ms | 异常:{res['异常信息']}")
  61. if __name__ == "__main__":
  62.     start = time.time()
  63.     asyncio.run(main())
  64.     total_time = round(time.time() - start, 2)
  65.     print(f"本次巡检总耗时:{total_time}s")
复制代码

三、进阶优化:限流并发避免压垮服务
无限制并发在检测几百个接口时可能触发服务器限流或防火墙拦截。通过 asyncio.Semaphore 设置最大并发数,配合 aiohttp.TCPConnector 优化连接池,同时细化状态码校验规则。下面示例限制最多3个接口同时请求。
  1. import asyncio
  2. import aiohttp
  3. import time
  4. API_LIST = [
  5.     {"name": "百度首页", "url": "https://www.baidu.com", "method": "GET"},
  6.     {"name": "HTTPBIN GET", "url": "https://httpbin.org/get", "method": "GET"},
  7.     {"name": "HTTPBIN POST", "url": "https://httpbin.org/post", "method": "POST"},
  8.     {"name": "无效接口", "url": "https://httpbin.org/error", "method": "GET"},
  9.     {"name": "测试接口1", "url": "https://httpbin.org/delay/1", "method": "GET"},
  10.     {"name": "测试接口2", "url": "https://httpbin.org/delay/2", "method": "GET"},
  11. ]
  12. MAX_CONCURRENT = 3
  13. TIMEOUT = aiohttp.ClientTimeout(total=5)
  14. semaphore = asyncio.Semaphore(MAX_CONCURRENT)
  15. async def check_api(session: aiohttp.ClientSession, api: dict):
  16.     async with semaphore:
  17.         start_time = time.time()
  18.         name = api["name"]
  19.         url = api["url"]
  20.         method = api["method"]
  21.         try:
  22.             if method.upper() == "GET":
  23.                 async with session.get(url, timeout=TIMEOUT) as resp:
  24.                     response_time = round((time.time() - start_time) * 1000, 2)
  25.                     status = "正常" if resp.status in [200, 201] else "异常"
  26.                     return {
  27.                         "接口名称": name,
  28.                         "接口地址": url,
  29.                         "状态": status,
  30.                         "状态码": resp.status,
  31.                         "响应耗时(ms)": response_time,
  32.                         "异常信息": ""
  33.                     }
  34.             elif method.upper() == "POST":
  35.                 async with session.post(url, timeout=TIMEOUT) as resp:
  36.                     response_time = round((time.time() - start_time) * 1000, 2)
  37.                     status = "正常" if resp.status in [200, 201] else "异常"
  38.                     return {
  39.                         "接口名称": name,
  40.                         "接口地址": url,
  41.                         "状态": status,
  42.                         "状态码": resp.status,
  43.                         "响应耗时(ms)": response_time,
  44.                         "异常信息": ""
  45.                     }
  46.         except Exception as e:
  47.             response_time = round((time.time() - start_time) * 1000, 2)
  48.             return {
  49.                 "接口名称": name,
  50.                 "接口地址": url,
  51.                 "状态": "异常",
  52.                 "状态码": None,
  53.                 "响应耗时(ms)": response_time,
  54.                 "异常信息": str(e)
  55.             }
  56. async def main():
  57.     connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
  58.     async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as session:
  59.         tasks = [check_api(session, api) for api in API_LIST]
  60.         results = await asyncio.gather(*tasks)
  61.         normal_count = len([res for res in results if res["状态"] == "正常"])
  62.         error_count = len(results) - normal_count
  63.         print("=" * 90)
  64.         print("接口健康巡检报告")
  65.         print("=" * 90)
  66.         for res in results:
  67.             print(f"接口:{res['接口名称']:10} | 状态:{res['状态']} | 状态码:{res['状态码']} | 耗时:{res['响应耗时(ms)']}ms | 异常:{res['异常信息']}")
  68.         print(f"正常接口:{normal_count} 个 | 异常接口:{error_count} 个")
  69. if __name__ == "__main__":
  70.     start = time.time()
  71.     asyncio.run(main())
  72.     print(f"本次巡检总耗时:{round(time.time() - start, 2)}s")
复制代码

四、企业级扩展:支持请求头、请求参数与POST传参
实际业务中多数接口需要 Token 鉴权、JSON 参数或表单提交。下面的配置示例支持自定义 headers 和 data(POST 时使用 json 参数发送),兼容 GET 与 POST 方法。
  1. import asyncio
  2. import aiohttp
  3. import time
  4. API_LIST = [
  5.     {
  6.         "name": "带鉴权GET接口",
  7.         "url": "https://httpbin.org/headers",
  8.         "method": "GET",
  9.         "headers": {"Authorization": "Bearer test123456", "User-Agent": "Mozilla/5.0"},
  10.         "data": None
  11.     },
  12.     {
  13.         "name": "带参数POST接口",
  14.         "url": "https://httpbin.org/post",
  15.         "method": "POST",
  16.         "headers": {"Content-Type": "application/json"},
  17.         "data": {"username": "admin", "password": "123456"}
  18.     }
  19. ]
  20. MAX_CONCURRENT = 3
  21. TIMEOUT = aiohttp.ClientTimeout(total=5)
  22. semaphore = asyncio.Semaphore(MAX_CONCURRENT)
  23. async def check_api(session: aiohttp.ClientSession, api: dict):
  24.     async with semaphore:
  25.         start_time = time.time()
  26.         name = api["name"]
  27.         url = api["url"]
  28.         method = api["method"]
  29.         headers = api.get("headers", {})
  30.         data = api.get("data")
  31.         try:
  32.             if method.upper() == "GET":
  33.                 async with session.get(url, headers=headers, timeout=TIMEOUT) as resp:
  34.                     response_time = round((time.time() - start_time) * 1000, 2)
  35.                     return {
  36.                         "接口名称": name, "状态": "正常" if resp.status == 200 else "异常",
  37.                         "状态码": resp.status, "耗时(ms)": response_time, "异常": ""
  38.                     }
  39.             elif method.upper() == "POST":
  40.                 async with session.post(url, headers=headers, json=data, timeout=TIMEOUT) as resp:
  41.                     response_time = round((time.time() - start_time) * 1000, 2)
  42.                     return {
  43.                         "接口名称": name, "状态": "正常" if resp.status == 200 else "异常",
  44.                         "状态码": resp.status, "耗时(ms)": response_time, "异常": ""
  45.                     }
  46.         except Exception as e:
  47.             return {
  48.                 "接口名称": name, "状态": "异常", "状态码": None,
  49.                 "耗时(ms)": round((time.time() - start_time) * 1000, 2), "异常": str(e)
  50.             }
  51. async def main():
  52.     connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
  53.     async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as session:
  54.         tasks = [check_api(session, api) for api in API_LIST]
  55.         results = await asyncio.gather(*tasks)
  56.         for res in results:
  57.             print(res)
  58. if __name__ == "__main__":
  59.     asyncio.run(main())
复制代码

五、常见问题避坑
1. 为什么不用 requests 做异步?
requests 是同步阻塞库,不兼容 asyncio 事件循环,强行在异步中使用会阻塞整个程序导致并发失效。必须使用 aiohttp。

2. 并发数设置多少合适?
内网接口可设置 10-20,公网接口建议 3-5,避免因 IP 封禁。根据目标服务器性能灵活调整。

3. 偶尔报连接超时怎么办?
属于正常网络波动,代码已捕获异常不会影响整体巡检。可适当增大 aiohttp.ClientTimeout 的 total 值。

六、定时巡检部署思路
可将上述 main 函数结合 APScheduler 实现秒级/分钟级定时执行,异常接口对接钉钉或企业微信机器人推送告警,实现无人值守监控。

总结:Python 异步 IO 实现的接口健康检查方案效率高、安全可控(信号量限流)、扩展性强(支持鉴权/参数/定时/告警),全局异常捕获保证了单个接口故障不影响整体。提供的代码可直接集成到个人项目或企业运维监控系统,快速搭建轻量化接口健康监控体系。
回复

使用道具 举报

发表于 2 小时前 | 显示全部楼层

Re: Python aiohttp异步实现批量接口健康检查:限流并发与异常处理实战

感谢分享!异步方案大幅提升批量接口检查效率,这个极简版代码结构很清晰,直接套用 `asyncio` + `aiohttp` 复用 `ClientSession` 的做法也很规范。不过实际生产里接口数量多时,如果不限制并发数可能会把目标服务器打满甚至触发限流,期待你在后续内容里加上 `Semaphore` 限流的实现。另外异常捕获可以再细化一下,比如区分 `asyncio.TimeoutError` 和 `aiohttp.ClientError`,这样排查异常原因会更方便。坐等限流并发与异常处理的完整实战篇!
回复 支持 反对

使用道具 举报

发表于 2 小时前 | 显示全部楼层

Re: Python aiohttp异步实现批量接口健康检查:限流并发与异常处理实战

楼主分享的这个异步健康检查方案太实用了!我之前一直用 requests 串行循环,几十个接口检查下来确实慢得崩溃。aiohttp 配合 asyncio 的思路非常清晰,全局复用 session 减少开销也很关键。不过标题里提到了“限流并发”,代码里好像只看到超时设置和异常捕获,没有信号量控制并发数的部分?如果能加上 Semaphore 限制同时发起的请求数,避免瞬时流量冲击目标服务器,应该会更完善。期待楼主后续把完整版贴出来,包括限流和结果汇总的代码,直接拿来当监控脚本用。
回复 支持 反对

使用道具 举报

发表于 2 小时前 | 显示全部楼层

Re: Python aiohttp异步实现批量接口健康检查:限流并发与异常处理实战

写得非常实用,正好最近在优化定时巡检脚本,之前用 requests 串行跑几十个接口确实慢得受不了。你这个异步方案加上超时和异常处理,直接就能应用到生产环境。 想请教两个细节:一是你用 `aiohttp.ClientTimeout(total=5)` 设置了整体超时,但有些接口可能需要区分连接超时和读取超时,有没有考虑过分别设置 `sock_connect` 和 `sock_read`?二是如果接口列表里混着 HTTP 和 HTTPS,`aiohttp` 是默认复用 Session 对连接池有啥影响吗?先谢过!
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-28 13:56 , Processed in 0.040035 second(s), 18 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部