Python开发者经常需要在并发场景下提升程序效率。理解线程(Thread)和进程(Process)的区别、用法及底层机制,是写出高效代码的关键。本文基于Python 3.13,通过可运行的代码示例,详细讲解多线程与多进程的核心概念、GIL的影响、线程同步、线程安全通信、线程池、进程池以及进程间通信方式。
一、进程与线程的基本概念
进程(Process)是资源分配的基本单位,拥有独立的内存空间,进程间隔离性强,但创建开销大。线程(Thread)是CPU调度的基本单位,共享进程的内存空间,创建开销小,但一个线程崩溃可能影响整个进程。一个进程至少包含一个主线程。
二、Python多线程实战
2.1 threading模块创建线程
创建线程有两种方式:直接构造Thread对象或继承Thread类。- import threading
- import time
- def task(name, seconds):
- print(f"[{name}] 开始执行")
- time.sleep(seconds)
- print(f"[{name}] 执行完成,耗时 {seconds} 秒")
- t1 = threading.Thread(target=task, args=("线程A", 2))
- t2 = threading.Thread(target=task, args=("线程B", 1))
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- print("所有任务执行完毕")
复制代码 输出结果:线程B先完成,线程A后完成,体现并发交替执行。
继承Thread类的方式更灵活,通过重写run()方法定义任务:- class MyThread(threading.Thread):
- def __init__(self, name, count):
- super().__init__()
- self.name = name
- self.count = count
- def run(self):
- for i in range(self.count):
- print(f"{self.name}: 第 {i+1} 次执行")
- threads = [MyThread(f"工人{i+1}", 3) for i in range(3)]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
- print("全部工人收工!")
复制代码
2.2 GIL(全局解释器锁)的影响
GIL是CPython解释器的一个机制,同一时刻只允许一个线程执行Python字节码,目的是保证引用计数的安全性。它对不同类型任务影响不同:
- CPU密集型任务:多线程无法真正并行,性能甚至可能更差。
- I/O密集型任务:线程在等待I/O时会释放GIL,多线程可以显著提升吞吐量。
- import threading, time
- def cpu_task(n):
- count = 0
- for i in range(n):
- count += i ** 2
- return count
- # 单线程
- start = time.time()
- for _ in range(4):
- cpu_task(10000000)
- single_time = time.time() - start
- # 多线程
- start = time.time()
- threads = []
- for _ in range(4):
- t = threading.Thread(target=cpu_task, args=(10000000,))
- threads.append(t)
- t.start()
- for t in threads:
- t.join()
- multi_time = time.time() - start
- print(f"单线程: {single_time:.2f}s, 多线程: {multi_time:.2f}s, 加速比: {single_time/multi_time:.2f}x")
复制代码 输出显示加速比接近1,说明多线程对CPU密集任务无效。
对于I/O密集型任务(如time.sleep模拟等待),多线程加速效果显著:- def io_task(duration):
- time.sleep(duration)
- # 单线程
- start = time.time()
- for _ in range(10):
- io_task(0.5)
- single_time = time.time() - start
- # 多线程
- start = time.time()
- threads = []
- for _ in range(10):
- t = threading.Thread(target=io_task, args=(0.5,))
- threads.append(t)
- t.start()
- for t in threads:
- t.join()
- multi_time = time.time() - start
- print(f"单线程: {single_time:.2f}s, 多线程: {multi_time:.2f}s, 加速比: {single_time/multi_time:.2f}x")
复制代码 输出加速比接近10倍。
2.3 线程同步:锁(Lock)
当多个线程同时修改共享变量时,会出现竞态条件(Race Condition)。例如不加锁:- counter = 0
- def increment():
- global counter
- for _ in range(1000000):
- counter += 1
- t1 = threading.Thread(target=increment)
- t2 = threading.Thread(target=increment)
- t1.start(); t2.start()
- t1.join(); t2.join()
- print(f"counter: {counter}, 期望: 2000000")
复制代码 结果通常小于2000000,因为counter+=1不是原子操作(读-改-写三步)。
使用Lock解决:- lock = threading.Lock()
- counter = 0
- def increment():
- global counter
- for _ in range(1000000):
- with lock:
- counter += 1
- # 启动线程后结果正确:2000000
复制代码
死锁问题:当两个线程互相等待对方持有的锁时,程序永远阻塞。避免死锁的方法包括:
- 统一加锁顺序(推荐)。
- 使用超时机制(lock.acquire(timeout=2))。
- 使用可重入锁threading.RLock(),允许同一线程重复获取。
2.4 线程安全通信:queue.Queue
直接共享变量容易出错,推荐使用queue.Queue进行线程间通信,它内部已加锁,提供阻塞的put/get方法。- import queue, threading, time, random
- task_queue = queue.Queue(maxsize=20)
- def producer(pid):
- for i in range(5):
- task = f"任务-{pid}-{i}"
- task_queue.put(task)
- print(f"[生产者{pid}] 提交: {task}")
- time.sleep(random.uniform(0.1, 0.3))
- print(f"[生产者{pid}] 完成")
- def consumer(cid):
- while True:
- try:
- task = task_queue.get(timeout=1)
- print(f"[消费者{cid}] 处理: {task}")
- time.sleep(random.uniform(0.2, 0.5))
- task_queue.task_done()
- except queue.Empty:
- if task_queue.empty():
- break
- producers = [threading.Thread(target=producer, args=(i,)) for i in range(3)]
- consumers = [threading.Thread(target=consumer, args=(i,)) for i in range(2)]
- for t in producers: t.start()
- for t in consumers: t.start()
- for t in producers: t.join()
- task_queue.join() # 等待所有任务被处理
- print("所有任务处理完毕")
复制代码
2.5 线程池:ThreadPoolExecutor
频繁创建/销毁线程开销大,线程池通过复用线程来控制并发数量。concurrent.futures.ThreadPoolExecutor提供了三种提交方式:- from concurrent.futures import ThreadPoolExecutor, as_completed
- import time
- def task(n):
- time.sleep(1)
- return f"任务{n}完成"
- with ThreadPoolExecutor(max_workers=3) as executor:
- # 方式1:单个提交
- future = executor.submit(task, 1)
- print(future.result())
- # 方式2:批量提交+按完成顺序获取
- futures = [executor.submit(task, i) for i in range(5)]
- for future in as_completed(futures):
- print(future.result())
- # 方式3:map(按输入顺序返回结果,类似pool.map)
- for result in executor.map(task, range(5)):
- print(result)
复制代码 max_workers选择:I/O密集型可设10-50,CPU密集型建议等于CPU核心数(受GIL限制,实际用进程池更好)。
三、Python多进程实战
多进程使用multiprocessing模块,每个进程有独立内存,绕过GIL,适合CPU密集型任务。
3.1 创建进程- import multiprocessing
- def cpu_task(n):
- count = 0
- for i in range(n):
- count += i ** 2
- return count
- if __name__ == '__main__':
- with multiprocessing.Pool(4) as pool:
- results = pool.map(cpu_task, [10000000] * 4)
- print(results)
复制代码
3.2 进程间通信
进程间不能直接共享变量,需使用Queue、Pipe或共享内存(Value/Array)。
Queue示例:- import multiprocessing, time
- def producer(q):
- for i in range(5):
- q.put(f"消息-{i}")
- time.sleep(0.1)
- q.put(None) # 结束信号
- def consumer(q, name):
- while True:
- item = q.get()
- if item is None:
- break
- print(f"[消费者-{name}] 收到: {item}")
- time.sleep(0.2)
- if __name__ == '__main__':
- q = multiprocessing.Queue()
- p = multiprocessing.Process(target=producer, args=(q,))
- c = multiprocessing.Process(target=consumer, args=(q, "A"))
- c.start()
- p.start()
- p.join()
- c.join()
- print("进程通信完成")
复制代码
Pipe示例:- import multiprocessing
- def sender(conn):
- messages = ["你好", "第二条", "结束"]
- for msg in messages:
- conn.send(msg)
- conn.close()
- def receiver(conn):
- while True:
- try:
- msg = conn.recv()
- print(f"收到: {msg}")
- if msg == "结束":
- break
- except EOFError:
- break
- conn.close()
- if __name__ == '__main__':
- parent_conn, child_conn = multiprocessing.Pipe()
- p_sender = multiprocessing.Process(target=sender, args=(parent_conn,))
- p_receiver = multiprocessing.Process(target=receiver, args=(child_conn,))
- p_receiver.start()
- p_sender.start()
- p_sender.join()
- p_receiver.join()
- print("Pipe通信结束")
复制代码
共享内存Value/Array:适用于数值型数据,性能高。- from multiprocessing import Process, Value, Array
- def update(v, a):
- v.value += 1
- for i in range(len(a)):
- a[i] = a[i] * 2
- if __name__ == '__main__':
- v = Value('i', 0) # 整型共享变量
- a = Array('d', [1.0, 2.0, 3.0]) # 双精度浮点数组
- p = Process(target=update, args=(v, a))
- p.start()
- p.join()
- print(v.value, a[:])
复制代码
3.3 进程池ProcessPoolExecutor
类似线程池,但使用多进程:- from concurrent.futures import ProcessPoolExecutor
- def cpu_heavy(n):
- return sum(i*i for i in range(n))
- with ProcessPoolExecutor(max_workers=4) as executor:
- results = list(executor.map(cpu_heavy, [1000000, 2000000, 3000000]))
- print(results)
复制代码
四、线程 vs 进程对比
- 资源开销:进程大(独立内存),线程小(共享内存)。
- 数据隔离:进程强(一个崩溃不影响其他),线程弱(一个崩溃可能拖垮整个进程)。
- 通信难度:进程间通信复杂(Queue/Pipe/共享内存),线程间可共享变量但需加锁。
- 适用场景:CPU密集型用进程(绕过GIL),I/O密集型用线程(GIL释放)。
五、注意事项
- GIL不影响I/O操作,但影响CPU密集型多线程。
- daemon线程:设置为True的线程会在主线程退出时强制结束,慎用于资源回收场景。
- 进程间不能共享全局变量,必须使用上面提到的通信机制。
- 正确关闭方式:线程用join()等待完成,进程池和线程池用with语句自动管理。
通过本文的代码实践,你可以根据任务类型选择合适的并发模型,编写出更高效的Python程序。 |