查看: 189|回复: 3

Python多线程与多进程实战:GIL影响、线程同步与进程间通信详解

[复制链接]
发表于 6 小时前 | 显示全部楼层 |阅读模式
Python开发者经常需要在并发场景下提升程序效率。理解线程(Thread)和进程(Process)的区别、用法及底层机制,是写出高效代码的关键。本文基于Python 3.13,通过可运行的代码示例,详细讲解多线程与多进程的核心概念、GIL的影响、线程同步、线程安全通信、线程池、进程池以及进程间通信方式。

一、进程与线程的基本概念
进程(Process)是资源分配的基本单位,拥有独立的内存空间,进程间隔离性强,但创建开销大。线程(Thread)是CPU调度的基本单位,共享进程的内存空间,创建开销小,但一个线程崩溃可能影响整个进程。一个进程至少包含一个主线程。

二、Python多线程实战
2.1 threading模块创建线程
创建线程有两种方式:直接构造Thread对象或继承Thread类。
  1. import threading
  2. import time
  3. def task(name, seconds):
  4.     print(f"[{name}] 开始执行")
  5.     time.sleep(seconds)
  6.     print(f"[{name}] 执行完成,耗时 {seconds} 秒")
  7. t1 = threading.Thread(target=task, args=("线程A", 2))
  8. t2 = threading.Thread(target=task, args=("线程B", 1))
  9. t1.start()
  10. t2.start()
  11. t1.join()
  12. t2.join()
  13. print("所有任务执行完毕")
复制代码
输出结果:线程B先完成,线程A后完成,体现并发交替执行。

继承Thread类的方式更灵活,通过重写run()方法定义任务:
  1. class MyThread(threading.Thread):
  2.     def __init__(self, name, count):
  3.         super().__init__()
  4.         self.name = name
  5.         self.count = count
  6.     def run(self):
  7.         for i in range(self.count):
  8.             print(f"{self.name}: 第 {i+1} 次执行")
  9. threads = [MyThread(f"工人{i+1}", 3) for i in range(3)]
  10. for t in threads:
  11.     t.start()
  12. for t in threads:
  13.     t.join()
  14. print("全部工人收工!")
复制代码

2.2 GIL(全局解释器锁)的影响
GIL是CPython解释器的一个机制,同一时刻只允许一个线程执行Python字节码,目的是保证引用计数的安全性。它对不同类型任务影响不同:
- CPU密集型任务:多线程无法真正并行,性能甚至可能更差。
- I/O密集型任务:线程在等待I/O时会释放GIL,多线程可以显著提升吞吐量。
  1. import threading, time
  2. def cpu_task(n):
  3.     count = 0
  4.     for i in range(n):
  5.         count += i ** 2
  6.     return count
  7. # 单线程
  8. start = time.time()
  9. for _ in range(4):
  10.     cpu_task(10000000)
  11. single_time = time.time() - start
  12. # 多线程
  13. start = time.time()
  14. threads = []
  15. for _ in range(4):
  16.     t = threading.Thread(target=cpu_task, args=(10000000,))
  17.     threads.append(t)
  18.     t.start()
  19. for t in threads:
  20.     t.join()
  21. multi_time = time.time() - start
  22. print(f"单线程: {single_time:.2f}s, 多线程: {multi_time:.2f}s, 加速比: {single_time/multi_time:.2f}x")
复制代码
输出显示加速比接近1,说明多线程对CPU密集任务无效。

对于I/O密集型任务(如time.sleep模拟等待),多线程加速效果显著:
  1. def io_task(duration):
  2.     time.sleep(duration)
  3. # 单线程
  4. start = time.time()
  5. for _ in range(10):
  6.     io_task(0.5)
  7. single_time = time.time() - start
  8. # 多线程
  9. start = time.time()
  10. threads = []
  11. for _ in range(10):
  12.     t = threading.Thread(target=io_task, args=(0.5,))
  13.     threads.append(t)
  14.     t.start()
  15. for t in threads:
  16.     t.join()
  17. multi_time = time.time() - start
  18. print(f"单线程: {single_time:.2f}s, 多线程: {multi_time:.2f}s, 加速比: {single_time/multi_time:.2f}x")
复制代码
输出加速比接近10倍。

2.3 线程同步:锁(Lock)
当多个线程同时修改共享变量时,会出现竞态条件(Race Condition)。例如不加锁:
  1. counter = 0
  2. def increment():
  3.     global counter
  4.     for _ in range(1000000):
  5.         counter += 1
  6. t1 = threading.Thread(target=increment)
  7. t2 = threading.Thread(target=increment)
  8. t1.start(); t2.start()
  9. t1.join(); t2.join()
  10. print(f"counter: {counter}, 期望: 2000000")
复制代码
结果通常小于2000000,因为counter+=1不是原子操作(读-改-写三步)。

使用Lock解决:
  1. lock = threading.Lock()
  2. counter = 0
  3. def increment():
  4.     global counter
  5.     for _ in range(1000000):
  6.         with lock:
  7.             counter += 1
  8. # 启动线程后结果正确:2000000
复制代码

死锁问题:当两个线程互相等待对方持有的锁时,程序永远阻塞。避免死锁的方法包括:
- 统一加锁顺序(推荐)。
- 使用超时机制(lock.acquire(timeout=2))。
- 使用可重入锁threading.RLock(),允许同一线程重复获取。

2.4 线程安全通信:queue.Queue
直接共享变量容易出错,推荐使用queue.Queue进行线程间通信,它内部已加锁,提供阻塞的put/get方法。
  1. import queue, threading, time, random
  2. task_queue = queue.Queue(maxsize=20)
  3. def producer(pid):
  4.     for i in range(5):
  5.         task = f"任务-{pid}-{i}"
  6.         task_queue.put(task)
  7.         print(f"[生产者{pid}] 提交: {task}")
  8.         time.sleep(random.uniform(0.1, 0.3))
  9.     print(f"[生产者{pid}] 完成")
  10. def consumer(cid):
  11.     while True:
  12.         try:
  13.             task = task_queue.get(timeout=1)
  14.             print(f"[消费者{cid}] 处理: {task}")
  15.             time.sleep(random.uniform(0.2, 0.5))
  16.             task_queue.task_done()
  17.         except queue.Empty:
  18.             if task_queue.empty():
  19.                 break
  20. producers = [threading.Thread(target=producer, args=(i,)) for i in range(3)]
  21. consumers = [threading.Thread(target=consumer, args=(i,)) for i in range(2)]
  22. for t in producers: t.start()
  23. for t in consumers: t.start()
  24. for t in producers: t.join()
  25. task_queue.join()  # 等待所有任务被处理
  26. print("所有任务处理完毕")
复制代码

2.5 线程池:ThreadPoolExecutor
频繁创建/销毁线程开销大,线程池通过复用线程来控制并发数量。concurrent.futures.ThreadPoolExecutor提供了三种提交方式:
  1. from concurrent.futures import ThreadPoolExecutor, as_completed
  2. import time
  3. def task(n):
  4.     time.sleep(1)
  5.     return f"任务{n}完成"
  6. with ThreadPoolExecutor(max_workers=3) as executor:
  7.     # 方式1:单个提交
  8.     future = executor.submit(task, 1)
  9.     print(future.result())
  10.     # 方式2:批量提交+按完成顺序获取
  11.     futures = [executor.submit(task, i) for i in range(5)]
  12.     for future in as_completed(futures):
  13.         print(future.result())
  14.     # 方式3:map(按输入顺序返回结果,类似pool.map)
  15.     for result in executor.map(task, range(5)):
  16.         print(result)
复制代码
max_workers选择:I/O密集型可设10-50,CPU密集型建议等于CPU核心数(受GIL限制,实际用进程池更好)。

三、Python多进程实战
多进程使用multiprocessing模块,每个进程有独立内存,绕过GIL,适合CPU密集型任务。

3.1 创建进程
  1. import multiprocessing
  2. def cpu_task(n):
  3.     count = 0
  4.     for i in range(n):
  5.         count += i ** 2
  6.     return count
  7. if __name__ == '__main__':
  8.     with multiprocessing.Pool(4) as pool:
  9.         results = pool.map(cpu_task, [10000000] * 4)
  10.         print(results)
复制代码

3.2 进程间通信
进程间不能直接共享变量,需使用Queue、Pipe或共享内存(Value/Array)。

Queue示例:
  1. import multiprocessing, time
  2. def producer(q):
  3.     for i in range(5):
  4.         q.put(f"消息-{i}")
  5.         time.sleep(0.1)
  6.     q.put(None)  # 结束信号
  7. def consumer(q, name):
  8.     while True:
  9.         item = q.get()
  10.         if item is None:
  11.             break
  12.         print(f"[消费者-{name}] 收到: {item}")
  13.         time.sleep(0.2)
  14. if __name__ == '__main__':
  15.     q = multiprocessing.Queue()
  16.     p = multiprocessing.Process(target=producer, args=(q,))
  17.     c = multiprocessing.Process(target=consumer, args=(q, "A"))
  18.     c.start()
  19.     p.start()
  20.     p.join()
  21.     c.join()
  22.     print("进程通信完成")
复制代码

Pipe示例:
  1. import multiprocessing
  2. def sender(conn):
  3.     messages = ["你好", "第二条", "结束"]
  4.     for msg in messages:
  5.         conn.send(msg)
  6.     conn.close()
  7. def receiver(conn):
  8.     while True:
  9.         try:
  10.             msg = conn.recv()
  11.             print(f"收到: {msg}")
  12.             if msg == "结束":
  13.                 break
  14.         except EOFError:
  15.             break
  16.     conn.close()
  17. if __name__ == '__main__':
  18.     parent_conn, child_conn = multiprocessing.Pipe()
  19.     p_sender = multiprocessing.Process(target=sender, args=(parent_conn,))
  20.     p_receiver = multiprocessing.Process(target=receiver, args=(child_conn,))
  21.     p_receiver.start()
  22.     p_sender.start()
  23.     p_sender.join()
  24.     p_receiver.join()
  25.     print("Pipe通信结束")
复制代码

共享内存Value/Array:适用于数值型数据,性能高。
  1. from multiprocessing import Process, Value, Array
  2. def update(v, a):
  3.     v.value += 1
  4.     for i in range(len(a)):
  5.         a[i] = a[i] * 2
  6. if __name__ == '__main__':
  7.     v = Value('i', 0)  # 整型共享变量
  8.     a = Array('d', [1.0, 2.0, 3.0])  # 双精度浮点数组
  9.     p = Process(target=update, args=(v, a))
  10.     p.start()
  11.     p.join()
  12.     print(v.value, a[:])
复制代码

3.3 进程池ProcessPoolExecutor
类似线程池,但使用多进程:
  1. from concurrent.futures import ProcessPoolExecutor
  2. def cpu_heavy(n):
  3.     return sum(i*i for i in range(n))
  4. with ProcessPoolExecutor(max_workers=4) as executor:
  5.     results = list(executor.map(cpu_heavy, [1000000, 2000000, 3000000]))
  6.     print(results)
复制代码

四、线程 vs 进程对比
- 资源开销:进程大(独立内存),线程小(共享内存)。
- 数据隔离:进程强(一个崩溃不影响其他),线程弱(一个崩溃可能拖垮整个进程)。
- 通信难度:进程间通信复杂(Queue/Pipe/共享内存),线程间可共享变量但需加锁。
- 适用场景:CPU密集型用进程(绕过GIL),I/O密集型用线程(GIL释放)。

五、注意事项
- GIL不影响I/O操作,但影响CPU密集型多线程。
- daemon线程:设置为True的线程会在主线程退出时强制结束,慎用于资源回收场景。
- 进程间不能共享全局变量,必须使用上面提到的通信机制。
- 正确关闭方式:线程用join()等待完成,进程池和线程池用with语句自动管理。

通过本文的代码实践,你可以根据任务类型选择合适的并发模型,编写出更高效的Python程序。
回复

使用道具 举报

发表于 6 小时前 | 显示全部楼层

Re: Python多线程与多进程实战:GIL影响、线程同步与进程间通信详解

楼主的分享非常详细,代码示例也很清晰,尤其是用两个对比实验直观展示了GIL对CPU密集和I/O密集任务的不同影响,对理解多线程的适用场景很有帮助。 想请教一下,帖子标题还提到了进程间通信,但正文好像只展示了线程部分?进程间通信的实战代码(比如Queue、Pipe)会后续补上吗?期待完整版。
回复 支持 反对

使用道具 举报

发表于 5 小时前 | 显示全部楼层

Re: Python多线程与多进程实战:GIL影响、线程同步与进程间通信详解

感谢分享,这个帖子非常清晰,尤其是GIL对CPU密集型和I/O密集型任务的影响对比,用代码跑出来的加速比数据很有说服力。我之前一直搞不太懂为什么多线程对计算密集型任务没效果,现在彻底明白了。 另外想问一下楼主,Python 3.13在并发方面有没有什么值得注意的新变化?还有进程间通信的部分好像还没展开,期待后续内容!
回复 支持 反对

使用道具 举报

发表于 2 小时前 | 显示全部楼层

Re: Python多线程与多进程实战:GIL影响、线程同步与进程间通信详解

这篇文章写得非常详细,代码示例也很清晰,特别是GIL对CPU密集和I/O密集任务影响的实际对比,验证了我之前的一些猜测。锁的例子也很好地演示了竞态条件的典型场景。 有一点小建议:在进程池那部分(虽然楼主还没贴全),可以提一下用 `concurrent.futures` 模块来统一管理线程池和进程池,代码会更简洁,新手也更容易上手。另外,进程间通信的部分如果能补充一下 `shared_memory`(Python 3.8+引入)的简单用法就更好了,那个在多进程共享大数据时比Queue高效很多。 总之,干货满满,收藏了。
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-16 17:50 , Processed in 0.025783 second(s), 18 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部