Python concurrent.futures in Practice: A Guide to Using Thread Pools and Process Pools Efficiently

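In backend development, concurrency is a key technique for improving system performance. Python's concurrent.futures module provides a concise high-level API that makes multithreaded and multiprocess concurrency easy to use. Drawing on hands-on development experience, this post covers the module's core components, usage patterns, advanced techniques, and practical examples to help you write efficient concurrent code.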

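1. Core Components
The concurrent.futures module provides two executors:
- ThreadPoolExecutor: a thread pool executor, suited to I/O-bound tasks.
- ProcessPoolExecutor: a process pool executor, suited to CPU-bound tasks.

In addition, a Future object represents the result of an asynchronous computation. Through it you can retrieve the return value, attach callbacks, or wait with a timeout.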

2. Using ThreadPoolExecutor
When creating a thread pool you can set the maximum number of worker threads and a thread name prefix. Prefer the with statement (context manager), which shuts the executor down automatically once the submitted tasks have finished.
    import time
    from concurrent.futures import ThreadPoolExecutor
    def download_file(url):
        # Simulate a slow download
        time.sleep(1)
        return f"Downloaded: {url}"
    # Worker threads are named worker_0, worker_1, ...
    with ThreadPoolExecutor(max_workers=3, thread_name_prefix='worker') as executor:
        # Submit a single task
        future = executor.submit(download_file, "http://example.com/file1.txt")
        # Fetch the result (a timeout can be set)
        result = future.result(timeout=5)
        print(result)
        # Submit a batch of tasks
        urls = ["http://example.com/file1.txt", "http://example.com/file2.txt", "http://example.com/file3.txt"]
        futures = [executor.submit(download_file, url) for url in urls]
        for future in futures:
            print(future.result())
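
To see what the thread name prefix does, here is a minimal sketch that records which worker thread ran each task. The helper `whoami` and the prefix `worker` are illustrative assumptions, not part of the example above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def whoami(task_id):
    # Hypothetical helper: report which worker thread ran this task
    time.sleep(0.05)  # simulate a little I/O so the work spreads across threads
    return task_id, threading.current_thread().name

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="worker") as executor:
    # map() yields (task_id, thread_name) pairs in input order
    thread_names = dict(executor.map(whoami, range(6)))

for task_id, name in thread_names.items():
    print(f"task {task_id} ran on {name}")
```

Named threads make log lines and debugger output much easier to attribute to a particular pool.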

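3. Using ProcessPoolExecutor
A process pool sidesteps Python's global interpreter lock (GIL) because every worker process has its own interpreter, which makes it a good fit for compute-heavy tasks. It is created the same way as a thread pool. The task function, its arguments, and its return value must be picklable, and the function should be defined at module top level.
    from concurrent.futures import ProcessPoolExecutor
    def compute_heavy(n):
        return sum(i * i for i in range(n))
    # The guard is required where workers start via "spawn" (Windows, macOS),
    # because each worker re-imports this module.
    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=4) as executor:
            future = executor.submit(compute_heavy, 1_000_000)
            print(future.result())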

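4. Thread Pool vs. Process Pool: Comparison and Recommendations
| Feature | ThreadPoolExecutor | ProcessPoolExecutor |
|------|-------------------|-------------------|
| GIL | Limited by the GIL | Not limited by the GIL |
| Best for | I/O-bound work (network requests, file reads and writes) | CPU-bound work (numeric computation, image processing) |
| Startup cost | Low | High |
| Memory cost | Low | High |
| Sharing data | Easy | Hard |

Recommendation: prefer ThreadPoolExecutor for I/O-bound tasks, use ProcessPoolExecutor for CPU-bound tasks, and combine the two for mixed workloads.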

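5. The Future Object
A Future is always in one of three broad states: pending, running, or done (finished normally, finished with an exception, or cancelled). Use done(), running(), and cancelled() to check.
    from concurrent.futures import Future
    # Creating a Future by hand is only for demonstration and tests;
    # in real code executor.submit() creates it for you.
    future = Future()
    print(future.done())          # False
    print(future.running())       # False
    print(future.cancelled())     # False
    future.set_result(42)
    print(future.done())          # True
    print(future.result())        # 42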
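
The same state checks can be observed on futures created by a real executor. The sketch below is one way to do it, assuming a single-worker pool and two `threading.Event` objects (`started`, `release`) that exist only to make the timing deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set once the first task is actually running
release = threading.Event()   # set to let the first task finish

def blocker():
    started.set()
    release.wait()
    return "first"

with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocker)
    second = executor.submit(lambda: "second")  # queued behind the first task
    started.wait()

    first_running = first.running()             # the only worker is executing it
    second_cancelled = second.cancel()          # pending futures can be cancelled
    first_cancel_refused = not first.cancel()   # running futures cannot

    release.set()
    first_result = first.result()

print(first_running, second_cancelled, first_cancel_refused, first_result)
```

Note that cancel() only succeeds while a task is still waiting in the queue; it never interrupts code that has already started.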

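Adding a callback: the callback runs automatically when the task completes and receives the Future as its only argument. With a thread pool it normally runs in the worker thread that finished the task (or immediately in the calling thread if the future is already done), so keep it short.
    from concurrent.futures import ThreadPoolExecutor
    def callback(future):
        print(f"Task completed: {future.result()}")
    with ThreadPoolExecutor() as executor:
        # download_file is the single-argument function from section 2
        future = executor.submit(download_file, "http://example.com/file1.txt")
        future.add_done_callback(callback)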
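
A callback also fires when the task fails, in which case calling result() inside it re-raises the task's exception. One possible pattern, sketched here with the hypothetical helpers `record_outcome` and `parse_int`, is to check exception() first:

```python
from concurrent.futures import ThreadPoolExecutor

outcomes = []

def record_outcome(future):
    # Check the exception first; result() on a failed future would re-raise it
    error = future.exception()
    if error is not None:
        outcomes.append(f"failed: {error}")
    else:
        outcomes.append(f"ok: {future.result()}")

def parse_int(text):
    return int(text)

with ThreadPoolExecutor(max_workers=2) as executor:
    for text in ["42", "not-a-number"]:
        executor.submit(parse_int, text).add_done_callback(record_outcome)

# Leaving the with block waits for the workers, so both callbacks have run by now
for line in sorted(outcomes):
    print(line)
```

The order in which the two callbacks run is not guaranteed, which is why the output is sorted before printing.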

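Timeouts: if the result is not available within the given time, result() raises concurrent.futures.TimeoutError (an alias of the built-in TimeoutError since Python 3.11). The timeout only bounds the wait; the task itself keeps running.
    from concurrent.futures import TimeoutError
    # `future` is a Future returned by executor.submit()
    try:
        result = future.result(timeout=2)
    except TimeoutError:
        print("Task timed out")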
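
The following self-contained sketch shows that a timed-out wait does not stop the task. The function `slow_task` and the `release` event are assumptions made for the demonstration; the event stands in for slow work so the example does not depend on sleep durations.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError

release = threading.Event()

def slow_task():
    release.wait()          # stands in for slow work
    return "finished late"

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.1)
        timed_out = False
    except TimeoutError:
        timed_out = True

    still_running = not future.done()   # the task was not cancelled by the timeout
    release.set()                       # let it finish
    late_result = future.result()       # a later call can still collect the result

print(timed_out, still_running, late_result)
```

If the work really must stop on timeout, the task has to cooperate, for example by checking a flag or by using timeouts on its own I/O calls.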

6. Advanced Usage
6.1 as_completed: iterate over futures in the order they finish, not the order they were submitted.
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed
    def task(task_id):
        # Sleep for task_id seconds so that tasks finish at different times
        time.sleep(task_id)
        return f"Task {task_id} completed"
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(1, 4)]
        for future in as_completed(futures):
            print(future.result())
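
Because as_completed yields futures in completion order, you usually need a way to tell which input each one belongs to. A common approach, sketched here with a hypothetical `reciprocal` function, is a dict that maps each future back to its input, combined with per-task error handling:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def reciprocal(x):
    return 1 / x

inputs = [4, 0, 2]
results, errors = {}, {}

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to the input that produced it
    future_to_input = {executor.submit(reciprocal, x): x for x in inputs}
    for future in as_completed(future_to_input):
        x = future_to_input[future]
        try:
            results[x] = future.result()
        except ZeroDivisionError as exc:
            errors[x] = str(exc)   # one failure does not stop the other tasks

print(results)
print(errors)
```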

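6.2 map: submit a batch of calls and get the results back in input order. It returns an iterator, and each result is produced as you iterate.
    # Reuses task() from 6.1
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(task, [1, 2, 3, 4, 5])
        for result in results:
            print(result)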
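
One behavior of map worth knowing: an exception raised inside a task is not raised by the map() call itself, only when iteration reaches that task's result. The sketch below illustrates this with a hypothetical `parse_port` function.

```python
from concurrent.futures import ThreadPoolExecutor

def parse_port(text):
    port = int(text)
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return port

collected = []
error_message = None

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(parse_port, ["80", "443", "70000", "8080"])  # nothing raised here
    try:
        for port in results:
            collected.append(port)
    except ValueError as exc:
        # Raised when iteration reaches the failing item; the iterator ends here,
        # so the result for "8080" is no longer reachable through it
        error_message = str(exc)

print(collected, error_message)
```

When every result matters even if some tasks fail, submit() with as_completed gives per-task control that map does not.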

6.3 wait: block until the given condition is met, then return two sets of futures, those that are done and those that are not.
    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
    # Reuses task() from 6.1
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(1, 4)]
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        print(f"Completed: {len(done)}")
        print(f"Not completed: {len(not_done)}")
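
wait also accepts a timeout. Unlike Future.result(), it does not raise when the timeout expires; it simply returns whatever has finished so far. A minimal sketch, assuming two quick tasks and one task (`stuck`) that is held back by an event:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

release = threading.Event()

def quick(n):
    return n * 2

def stuck():
    release.wait()   # held back until the main thread releases it
    return "late"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(quick, 1), executor.submit(quick, 2), executor.submit(stuck)]
    # Ask for all futures, but give up waiting after half a second
    done, not_done = wait(futures, timeout=0.5, return_when=ALL_COMPLETED)
    finished_values = sorted(f.result() for f in done)
    release.set()    # let the remaining task finish so the pool can shut down

print(finished_values, len(not_done))
```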

7. Practical Examples
7.1 Downloading files in parallel (I/O-bound)
    import os
    import requests
    from concurrent.futures import ThreadPoolExecutor, as_completed
    def download_file(url, save_path):
        # A timeout keeps one stalled connection from blocking a worker forever
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # fail on HTTP errors instead of saving an error page
        with open(save_path, 'wb') as f:
            f.write(response.content)
        return save_path
    urls = [("https://example.com/image1.jpg", "images/image1.jpg"),
            ("https://example.com/image2.jpg", "images/image2.jpg"),
            ("https://example.com/image3.jpg", "images/image3.jpg")]
    os.makedirs("images", exist_ok=True)  # open() does not create missing directories
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(download_file, url, path) for url, path in urls]
        for future in as_completed(futures):
            print(f"Downloaded: {future.result()}")

7.2 Running database queries in parallel
    import psycopg2
    from concurrent.futures import ThreadPoolExecutor
    def query_user(user_id):
        # Each call opens its own connection, so no connection is shared between threads
        conn = psycopg2.connect("dbname=example user=postgres")
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return cursor.fetchone()
        finally:
            conn.close()  # close the connection even if the query fails
    user_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = executor.map(query_user, user_ids)
        for user_id, user in zip(user_ids, results):
            print(f"User {user_id}: {user}")

7.3 Mixed I/O and CPU work: fetch the data with a thread pool first, then process it with a process pool.
    import requests
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    def fetch_data(url):
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    def process_data(data):
        return sum(item['value'] for item in data)
    # The guard is needed because ProcessPoolExecutor workers may re-import this module
    if __name__ == "__main__":
        urls = ["https://api.example.com/data1", "https://api.example.com/data2"]
        # Stage 1: I/O-bound fetching in threads
        with ThreadPoolExecutor(max_workers=4) as io_executor:
            futures = [io_executor.submit(fetch_data, url) for url in urls]
            raw_data = [f.result() for f in futures]
        # Stage 2: CPU-bound processing in separate processes
        with ProcessPoolExecutor(max_workers=4) as cpu_executor:
            results = list(cpu_executor.map(process_data, raw_data))
        print(results)

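8. Best Practices
8.1 Choose a sensible number of workers
The right number depends on the kind of work. For CPU-bound tasks, start with os.cpu_count(); for I/O-bound tasks, min(32, os.cpu_count() * 5) is a reasonable starting point. For reference, since Python 3.8 ThreadPoolExecutor's own default is min(32, os.cpu_count() + 4). Treat all of these as starting values and tune them by measuring.
    import os
    # os.cpu_count() can return None, so fall back to 4
    cpu_workers = os.cpu_count() or 4
    io_workers = min(32, (os.cpu_count() or 4) * 5)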
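
If several parts of a codebase need this rule, it can be wrapped in a small helper. The function name `suggest_workers` and the "cpu"/"io" labels are assumptions for this sketch, and the numbers are only the starting points given above.

```python
import os

def suggest_workers(kind):
    """Return a starting worker count for 'cpu' or 'io' workloads (hypothetical helper)."""
    cores = os.cpu_count() or 4   # cpu_count() can return None
    if kind == "cpu":
        return cores
    if kind == "io":
        return min(32, cores * 5)
    raise ValueError(f"unknown workload kind: {kind!r}")

print(suggest_workers("cpu"), suggest_workers("io"))
```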

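8.2 Avoid shared state
Sharing mutable variables between threads can cause data races. Use thread-safe data structures or protect the data with a lock.
    from threading import Lock
    class ThreadSafeCounter:
        def __init__(self):
            self._count = 0
            self._lock = Lock()
        def increment(self):
            # += is a read-modify-write sequence, so it must happen under the lock
            with self._lock:
                self._count += 1
        @property
        def value(self):
            with self._lock:
                return self._count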
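
To check that a lock-protected counter holds up under concurrent updates, a short test like the following can be run. The class is repeated so the snippet is self-contained; the `value` property and the `bump_many` helper are additions made for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

class ThreadSafeCounter:
    def __init__(self):
        self._count = 0
        self._lock = Lock()

    def increment(self):
        with self._lock:
            self._count += 1

    @property
    def value(self):
        with self._lock:
            return self._count

counter = ThreadSafeCounter()

def bump_many(times):
    # Hypothetical workload: each task increments the shared counter many times
    for _ in range(times):
        counter.increment()

with ThreadPoolExecutor(max_workers=8) as executor:
    list(executor.map(bump_many, [10_000] * 8))

print(counter.value)  # 80000: 8 tasks x 10,000 increments, none lost
```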

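8.3 Shut down gracefully
Use try/finally or a context manager so that the executor's shutdown(wait=True) is always called, which waits for all submitted tasks to finish.
    from concurrent.futures import ThreadPoolExecutor
    # Reuses task() from 6.1
    executor = ThreadPoolExecutor(max_workers=4)
    try:
        futures = [executor.submit(task, i) for i in range(10)]
        for future in futures:
            print(future.result())
    finally:
        executor.shutdown(wait=True)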
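
Sometimes the goal is the opposite: stop quickly and drop work that has not started yet. On Python 3.9 and later, shutdown() accepts cancel_futures=True for this. The sketch below assumes a single-worker pool, a hypothetical `long_job`, and two events that make the timing deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def long_job():
    started.set()
    release.wait()   # stands in for long-running work
    return "ran"

executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(long_job)
queued = [executor.submit(long_job) for _ in range(3)]
started.wait()       # make sure the first job is running before shutting down

# Python 3.9+: cancel everything still waiting in the queue and return immediately
executor.shutdown(wait=False, cancel_futures=True)

release.set()                       # the job that was already running is allowed to finish
running_result = running.result()
cancelled_count = sum(f.cancelled() for f in queued)

print(running_result, cancelled_count)
```

Tasks that are already running are never interrupted by shutdown; only queued ones are cancelled.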

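9. Performance Comparison
9.1 Sequential vs. thread pool (I/O-bound)
With the simulated 1-second download below, the sequential version takes roughly 10 seconds, while the pool of 5 workers handles the 10 URLs in two rounds, roughly 2 seconds.
    import time
    from concurrent.futures import ThreadPoolExecutor
    def download_file(url):
        time.sleep(1)  # simulated download, as in section 2
        return f"Downloaded: {url}"
    def sync_download(urls):
        for url in urls:
            download_file(url)
    def threaded_download(urls):
        with ThreadPoolExecutor(max_workers=5) as executor:
            # list() consumes the results so that any task exception surfaces here
            list(executor.map(download_file, urls))
    urls = ["https://example.com/file{}.txt".format(i) for i in range(10)]
    start = time.perf_counter()
    sync_download(urls)
    print(f"Sync time: {time.perf_counter() - start:.2f}s")
    start = time.perf_counter()
    threaded_download(urls)
    print(f"Thread pool time: {time.perf_counter() - start:.2f}s")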

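9.2 Thread pool vs. process pool (CPU-bound)
On standard CPython the GIL lets only one thread execute Python bytecode at a time, so the thread pool gains little here, while the process pool can use several cores. Note that map() returns immediately, so the results must be consumed before the timer is read.
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    def cpu_intensive(n):
        return sum(i * i for i in range(n))
    if __name__ == "__main__":
        with ThreadPoolExecutor(max_workers=4) as executor:
            start = time.perf_counter()
            # list() waits for every result; without it only the submission is timed
            list(executor.map(cpu_intensive, [10_000_000] * 4))
            print(f"ThreadPool time: {time.perf_counter() - start:.2f}s")
        with ProcessPoolExecutor(max_workers=4) as executor:
            start = time.perf_counter()
            list(executor.map(cpu_intensive, [10_000_000] * 4))
            print(f"ProcessPool time: {time.perf_counter() - start:.2f}s")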

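Summary
This post walked through Python's concurrent.futures from the basic components to advanced usage, with practical examples and performance comparisons. The key points: use ThreadPoolExecutor for I/O-bound tasks and ProcessPoolExecutor for CPU-bound tasks; choose the number of workers deliberately; avoid shared state; and prefer the context manager so that resources are always released. With these in hand you can make good use of Python's concurrency support when building high-performance systems.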

Re: Python concurrent.futures in Practice: A Guide to Using Thread Pools and Process Pools Efficiently

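Very thorough write-up. It explains both the core concepts and the practical usage of `concurrent.futures` clearly, and the comparison table in particular is a real help for newcomers deciding which executor to pick. I use `ThreadPoolExecutor` for I/O-bound work in my own projects all the time, and pairing it with `as_completed` is much more convenient than managing threads by hand. One small reminder: with `map()`, an exception raised by a task is deferred until you iterate over the results, so make sure your exception handling accounts for that. Thanks for sharing!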