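In backend development, concurrency is a key technique for improving system performance. Python's concurrent.futures module offers a concise, high-level API for running work in multiple threads or processes. Drawing on practical development experience, this article walks through the module's core components, usage patterns, advanced techniques, and real-world cases to help you write efficient concurrent code.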
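1. Overview of Core Components
The concurrent.futures module provides two executors:
- ThreadPoolExecutor: a thread-pool executor, suited to I/O-bound tasks.
- ProcessPoolExecutor: a process-pool executor, suited to CPU-bound tasks.
In addition, a Future object represents the result of an asynchronous computation. Through it you can fetch the return value, attach callbacks, or wait with a timeout.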
2. ThreadPoolExecutor in Detail
When creating a thread pool you can set the maximum number of worker threads and a thread-name prefix. Prefer the with statement: used as a context manager, the executor shuts down automatically once its tasks have finished.

    import time
    from concurrent.futures import ThreadPoolExecutor

    def download_file(url):
        # Simulate a slow network download
        time.sleep(1)
        return f"Downloaded: {url}"

    with ThreadPoolExecutor(max_workers=3, thread_name_prefix='worker-') as executor:
        # Submit a single task
        future = executor.submit(download_file, "http://example.com/file1.txt")
        # Fetch the result (a timeout can be set)
        result = future.result(timeout=5)
        print(result)

        # Submit a batch of tasks
        urls = [
            "http://example.com/file1.txt",
            "http://example.com/file2.txt",
            "http://example.com/file3.txt",
        ]
        futures = [executor.submit(download_file, url) for url in urls]
        for future in futures:
            print(future.result())
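3. ProcessPoolExecutor in Detail
A process pool sidesteps Python's Global Interpreter Lock (GIL) by running tasks in separate processes, which makes it suitable for compute-heavy work. It is created the same way as a thread pool. The submitted function and its arguments must be picklable, and the pool should be created under an if __name__ == "__main__": guard so that worker processes can import the module safely.

    from concurrent.futures import ProcessPoolExecutor

    def compute_heavy(n):
        return sum(i * i for i in range(n))

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=4) as executor:
            future = executor.submit(compute_heavy, 1_000_000)
            print(future.result())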
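4. Thread Pool vs. Process Pool: Comparison and Selection Advice
| Feature | ThreadPoolExecutor | ProcessPoolExecutor |
|------|-------------------|-------------------|
| GIL | Constrained by the GIL | Not constrained by the GIL |
| Typical use | I/O-bound (network requests, file reads and writes) | CPU-bound (numeric computation, image processing) |
| Startup cost | Low | High |
| Memory overhead | Low | High |
| Data sharing | Easy | Hard |
Selection advice: prefer ThreadPoolExecutor for I/O-bound tasks, use ProcessPoolExecutor for CPU-bound tasks, and combine the two for mixed workloads.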
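The "data sharing" row can be made concrete with a small sketch. Worker threads receive the very same objects as the caller, whereas ProcessPoolExecutor sends arguments to its workers through pickle, so a worker process only ever sees a copy. To keep the example free of real subprocesses, the pickle round trip below merely imitates that copy; the function name append_marker and the variable names are hypothetical.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

def append_marker(shared):
    # Hypothetical task: mutate the list it was given.
    shared.append("seen by worker")
    return len(shared)

# Threads: the worker mutates the same list object the caller holds.
thread_view = []
with ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(append_marker, thread_view).result()

# Processes (imitated): arguments are pickled, so the worker gets a copy.
process_view = []
worker_copy = pickle.loads(pickle.dumps(process_view))
append_marker(worker_copy)

print(thread_view)   # the caller sees the worker's change
print(process_view)  # the caller's list is untouched
```

With a real process pool, the usual way to get data back is through the task's return value rather than through mutation of an argument.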
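5. The Future Object in Detail
A Future is in one of three broad states: pending, running, or done, where done means it either finished (with a result or an exception) or was cancelled. Check the state with done(), running(), and cancelled().

    from concurrent.futures import Future

    # Futures normally come from executor.submit(); creating one by hand and
    # calling set_result() is meant for tests and demonstrations like this one.
    future = Future()
    print(future.done())       # False
    print(future.running())    # False
    print(future.cancelled())  # False
    future.set_result(42)
    print(future.done())       # True
    print(future.result())     # 42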
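Adding a callback: the callback runs automatically when the task finishes and receives the Future as its only argument.

    from concurrent.futures import ThreadPoolExecutor

    def callback(future):
        print(f"Task completed: {future.result()}")

    # download_file is the function from the ThreadPoolExecutor example above
    with ThreadPoolExecutor() as executor:
        future = executor.submit(download_file, "url")
        future.add_done_callback(callback)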
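Timeout handling: if the task does not finish within the given time, result() raises concurrent.futures.TimeoutError.

    import concurrent.futures
    import time

    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(time.sleep, 3)
        try:
            result = future.result(timeout=2)
        except concurrent.futures.TimeoutError:
            print("Task timed out")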
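One detail worth seeing in code: a timeout on result() only ends the wait, it does not stop the task. The sketch below assumes a hypothetical slow_task and deliberately short durations so that it runs quickly. After the timeout fires, the future is still not done, and a second result() call without a timeout eventually returns the value.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError

def slow_task():
    # Hypothetical task that takes longer than the caller is willing to wait.
    time.sleep(0.3)
    return "finished late"

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.05)
        timed_out = False
    except FutureTimeoutError:
        timed_out = True
    # The timeout only ended the wait; the task itself keeps running.
    still_running = not future.done()
    # Waiting again without a timeout eventually yields the result.
    late_result = future.result()

print(timed_out, still_running, late_result)
```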
6. Advanced Usage
6.1 as_completed: iterate over futures in the order in which they finish.

    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def task(task_id):
        time.sleep(task_id)
        return f"Task {task_id} completed"

    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(1, 4)]
        for future in as_completed(futures):
            print(future.result())
6.2 map: submits a batch of tasks and returns an iterator that yields the results in input order.

    # task is the function defined in 6.1
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(task, [1, 2, 3, 4, 5])
        for result in results:
            print(result)
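The difference between as_completed and map shows up most clearly when the slowest task is submitted first. The following is a minimal sketch under that assumption; the function sleepy and the delay values are hypothetical and chosen only to keep the run short.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def sleepy(delay):
    # Hypothetical task: sleep for `delay` seconds, then report the delay.
    time.sleep(delay)
    return delay

delays = [0.3, 0.15, 0.0]  # the first input is the slowest

with ThreadPoolExecutor(max_workers=3) as executor:
    # map yields results in input order, so it waits on the slow first task.
    map_order = list(executor.map(sleepy, delays))

    # as_completed yields each future as soon as it finishes.
    futures = [executor.submit(sleepy, d) for d in delays]
    completion_order = [f.result() for f in as_completed(futures)]

print(map_order)
print(completion_order)
```

map_order always matches the input order, while completion_order will typically run from the shortest delay to the longest. Which one to pick depends on whether results need to line up with their inputs or should be handled as early as possible.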
6.3 wait: blocks until the given condition is met, then returns two sets: the futures that are done and those that are not.

    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

    # task is the function defined in 6.1
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(1, 4)]
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        print(f"Completed: {len(done)}")
        print(f"Not completed: {len(not_done)}")
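Besides FIRST_COMPLETED, the return_when argument also accepts FIRST_EXCEPTION and ALL_COMPLETED (the default). As a hedged illustration of FIRST_EXCEPTION, the sketch below uses a hypothetical flaky task where one call fails quickly while two others are still sleeping; the delays are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION

def flaky(delay, fail):
    # Hypothetical task: sleep, then either raise or return the delay.
    time.sleep(delay)
    if fail:
        raise ValueError("simulated failure")
    return delay

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(flaky, 0.05, True),   # fails quickly
        executor.submit(flaky, 0.5, False),
        executor.submit(flaky, 0.5, False),
    ]
    # Return as soon as any future raises (or once all have finished).
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    failed = [f for f in done if f.exception() is not None]
    pending_count = len(not_done)

print(f"Failed: {len(failed)}, still pending: {pending_count}")
```

Note that wait itself never raises the task's exception; the caller inspects each future with exception() or result().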
7. Practical Cases
7.1 Downloading files in parallel (I/O-bound)

    import os
    import requests
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def download_file(url, save_path):
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # surface HTTP errors instead of saving an error page
        with open(save_path, 'wb') as f:
            f.write(response.content)
        return save_path

    urls = [("https://example.com/image1.jpg", "images/image1.jpg"),
            ("https://example.com/image2.jpg", "images/image2.jpg"),
            ("https://example.com/image3.jpg", "images/image3.jpg")]

    os.makedirs("images", exist_ok=True)  # open() does not create missing directories
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(download_file, url, path) for url, path in urls]
        for future in as_completed(futures):
            print(f"Downloaded: {future.result()}")
7.2 Running database queries in parallel

    import psycopg2
    from concurrent.futures import ThreadPoolExecutor

    def query_user(user_id):
        # Each call opens its own connection; try/finally closes it even if the query fails
        conn = psycopg2.connect("dbname=example user=postgres")
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return cursor.fetchone()
        finally:
            conn.close()

    user_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = executor.map(query_user, user_ids)
        for user_id, user in zip(user_ids, results):
            print(f"User {user_id}: {user}")
7.3 Mixing I/O and CPU work: fetch the data with a thread pool first, then process it with a process pool.

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import requests

    def fetch_data(url):
        return requests.get(url, timeout=10).json()

    def process_data(data):
        return sum(item['value'] for item in data)

    urls = ["https://api.example.com/data1", "https://api.example.com/data2"]

    if __name__ == "__main__":
        # I/O phase: threads wait on the network concurrently
        with ThreadPoolExecutor(max_workers=4) as io_executor:
            futures = [io_executor.submit(fetch_data, url) for url in urls]
            raw_data = [f.result() for f in futures]

        # CPU phase: processes compute in parallel, outside the GIL
        with ProcessPoolExecutor(max_workers=4) as cpu_executor:
            results = list(cpu_executor.map(process_data, raw_data))
        print(results)
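8. Best Practices
8.1 Choose the worker count sensibly
The right number usually depends on the task type: for CPU-bound work use os.cpu_count(); for I/O-bound work a value such as min(32, os.cpu_count() * 5) is a reasonable starting point.

    import os

    # os.cpu_count() can return None, hence the fallback to 4
    cpu_workers = os.cpu_count() or 4
    io_workers = min(32, (os.cpu_count() or 4) * 5)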
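8.2 Avoid shared state
Sharing mutable variables between threads can lead to data races; use thread-safe data structures or a lock.

    from threading import Lock

    class ThreadSafeCounter:
        def __init__(self):
            self._count = 0
            self._lock = Lock()

        def increment(self):
            # The lock makes the read-modify-write step atomic
            with self._lock:
                self._count += 1

        @property
        def value(self):
            with self._lock:
                return self._count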
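Besides guarding shared data with a lock, it is often possible to avoid sharing altogether: each task returns its partial result, and only the calling thread combines them. A minimal sketch of that pattern follows; count_words and the sample lines are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def count_words(line):
    # Pure function: reads its argument and returns a value, touching no shared state.
    return len(line.split())

lines = ["to be or not to be", "that is the question", "", "whether tis nobler"]

with ThreadPoolExecutor(max_workers=4) as executor:
    per_line = list(executor.map(count_words, lines))

# Only the calling thread combines the partial results.
total = sum(per_line)
print(per_line, total)
```

Because no worker writes to a shared variable, no lock is needed, and the same function could later be moved to a process pool without changes.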
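8.3 Shut down gracefully
Use try/finally or a context manager to make sure the executor's shutdown(wait=True) is called, so that all tasks are allowed to finish.

    from concurrent.futures import ThreadPoolExecutor

    # task is the function defined in 6.1
    executor = ThreadPoolExecutor(max_workers=4)
    try:
        futures = [executor.submit(task, i) for i in range(10)]
        for future in futures:
            print(future.result())
    finally:
        executor.shutdown(wait=True)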
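9. Performance Comparison
9.1 Synchronous vs. asynchronous (I/O-bound)

    import time
    from concurrent.futures import ThreadPoolExecutor

    def download_file(url):
        # Stand-in for a real download: one second of simulated network wait
        time.sleep(1)
        return f"Downloaded: {url}"

    def sync_download(urls):
        for url in urls:
            download_file(url)

    def async_download(urls):
        # Leaving the with block waits for every submitted task to finish
        with ThreadPoolExecutor(max_workers=5) as executor:
            executor.map(download_file, urls)

    urls = ["https://example.com/file{}.txt".format(i) for i in range(10)]

    start = time.perf_counter()
    sync_download(urls)
    print(f"Sync time: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    async_download(urls)
    print(f"Async time: {time.perf_counter() - start:.2f}s")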
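9.2 Thread pool vs. process pool (CPU-bound)

    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    def cpu_intensive(n):
        return sum(i * i for i in range(n))

    if __name__ == "__main__":
        with ThreadPoolExecutor(max_workers=4) as executor:
            start = time.perf_counter()
            # executor.map returns at once; list() waits for every result
            # so that the measured time covers the actual work
            list(executor.map(cpu_intensive, [10_000_000] * 4))
            print(f"ThreadPool time: {time.perf_counter() - start:.2f}s")

        with ProcessPoolExecutor(max_workers=4) as executor:
            start = time.perf_counter()
            list(executor.map(cpu_intensive, [10_000_000] * 4))
            print(f"ProcessPool time: {time.perf_counter() - start:.2f}s")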
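Summary
Moving from the basic components to advanced usage, and drawing on practical cases and performance comparisons, this article has surveyed concurrent programming with Python's concurrent.futures. The key points: use ThreadPoolExecutor for I/O-bound tasks and ProcessPoolExecutor for CPU-bound tasks; choose the worker count sensibly; avoid shared state; and prefer context managers so that resources are always released. With these in hand, you can make effective use of Python's concurrency support to build high-performance systems.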