查看: 673|回复: 1

Python多进程编程实战:Process、Pool、IPC与同步锁详解

[复制链接]
发表于 前天 15:00 | 显示全部楼层 |阅读模式
Python 的 multiprocessing 模块提供了跨平台的多进程支持,每个进程拥有独立的内存空间和资源,能绕过 GIL 实现真正的并行计算,特别适合 CPU 密集型任务。本文从进程创建、进程池复用、进程间通信到同步锁,结合可运行代码逐一解析常见用法。

一、进程的本质与线程对比
进程是操作系统资源分配的基本单位,每个进程拥有独立的地址空间、文件描述符和代码数据。线程则共享同一进程的内存空间。多进程天然数据隔离,无需担心共享变量冲突,但进程间通信(IPC)需要额外的管道、队列或共享内存机制。

简单代码演示进程内存隔离:
  1. from multiprocessing import Process
  2. data = []
  3. def worker():
  4.     data.append(1)
  5.     print(f"{id(data)}: {data}")
  6. p1 = Process(target=worker)
  7. p2 = Process(target=worker)
  8. p1.start()
  9. p2.start()
  10. p1.join()
  11. p2.join()
复制代码
输出显示两个进程的 data 列表位于不同内存地址,互不干扰。

二、创建进程的两种方式
1. 通过 Process 类直接指定目标函数
  1. from multiprocessing import Process
  2. import os
  3. def worker(name, num):
  4.     print(f"进程 {name}: PID={os.getpid()}, 父进程={os.getppid()}")
  5.     return num * 2
  6. if __name__ == '__main__':
  7.     p = Process(target=worker, args=("worker1", 100))
  8.     p.start()
  9.     p.join()
  10.     print(f"exit code: {p.exitcode}")
复制代码
2. 自定义 Process 子类,重写 run 方法
  1. class MyProcess(Process):
  2.     def __init__(self, name):
  3.         super().__init__()
  4.         self.name = name
  5.     def run(self):
  6.         print(f"运行进程: {self.name}")
  7. if __name__ == '__main__':
  8.     p = MyProcess("my_worker")
  9.     p.start()
  10.     p.join()
复制代码
三、进程启动方式:spawn vs fork
Python 支持三种启动方式(需在 if __name__ == '__main__' 内设置):
- spawn:全新启动 Python 解释器,不继承父进程资源,Windows 默认,macOS 可选。
- fork:复制父进程,继承内存和资源,Linux 默认,但可能因锁产生死锁。
- forkserver:先启动一个服务进程,再由其 fork,相对安全。

设置示例:
  1. import multiprocessing as mp
  2. if __name__ == '__main__':
  3.     mp.set_start_method('spawn')  # Windows 默认;Linux 可用 'fork'
复制代码
四、进程池(Pool)复用进程
进程创建开销大,实际开发中常用 Pool 复用多个进程。以下演示五种常用提交方式:
  1. from multiprocessing import Pool
  2. import time
  3. def cpu_intensive(n):
  4.     return sum(i * i for i in range(n))
  5. def io_bound(n):
  6.     time.sleep(n)
  7.     return n
  8. if __name__ == '__main__':
  9.     with Pool(processes=4) as pool:
  10.         # map - 阻塞保持顺序
  11.         results = pool.map(cpu_intensive, [10**7] * 4)
  12.         # map_async - 非阻塞
  13.         result = pool.map_async(cpu_intensive, [10**7] * 4)
  14.         print("异步提交完成")
  15.         res = result.get()
  16.         # apply - 同步
  17.         single = pool.apply(cpu_intensive, (10**7,))
  18.         # apply_async - 异步并设超时
  19.         async_result = pool.apply_async(cpu_intensive, (10**7,))
  20.         res_single = async_result.get(timeout=30)
  21.         # starmap - 多参数
  22.         args = [(1,2), (3,4), (5,6)]
  23.         sums = pool.starmap(lambda a,b: a+b, args)
复制代码
五、进程间通信(IPC)
1. Queue - 线程安全队列
  1. from multiprocessing import Process, Queue
  2. import time
  3. def producer(q):
  4.     for i in range(5):
  5.         q.put(f"消息{i}")
  6.         print(f"生产: {i}")
  7.         time.sleep(0.5)
  8.     q.put(None)
  9. def consumer(q):
  10.     while True:
  11.         msg = q.get()
  12.         if msg is None:
  13.             break
  14.         print(f"消费: {msg}")
  15.         time.sleep(1)
  16. if __name__ == '__main__':
  17.     q = Queue(maxsize=10)
  18.     p1 = Process(target=producer, args=(q,))
  19.     p2 = Process(target=consumer, args=(q,))
  20.     p1.start(); p2.start()
  21.     p1.join(); p2.join()
复制代码
2. Pipe - 双向管道
  1. from multiprocessing import Process, Pipe
  2. def worker(conn):
  3.     conn.send([1,2,3])
  4.     print(f"收到父进程: {conn.recv()}")
  5.     conn.close()
  6. if __name__ == '__main__':
  7.     parent_conn, child_conn = Pipe()
  8.     p = Process(target=worker, args=(child_conn,))
  9.     p.start()
  10.     print(f"子进程数据: {parent_conn.recv()}")
  11.     parent_conn.send("来自父进程的问候")
  12.     p.join()
复制代码
3. Shared Memory - 共享内存
使用 Value 和 Array 创建共享变量:
  1. from multiprocessing import Process, Value, Array
  2. def worker(val, arr):
  3.     val.value += 1
  4.     for i in range(len(arr)):
  5.         arr[i] += i
  6. if __name__ == '__main__':
  7.     num = Value('i', 0)       # 有符号整数
  8.     arr = Array('d', [0.0, 1.0, 2.0])  # double 数组
  9.     processes = []
  10.     for _ in range(4):
  11.         p = Process(target=worker, args=(num, arr))
  12.         processes.append(p)
  13.         p.start()
  14.     for p in processes:
  15.         p.join()
  16.     print(f"num: {num.value}")
  17.     print(f"arr: {arr[:]}")
复制代码
类型码速查:'i'(int)、'I'(unsigned int)、'l'(long)、'f'(float)、'd'(double)、'c'(char)。
4. Manager - 管理器代理
Manager 提供 dict、list 等共享容器,底层自动处理序列化:
  1. from multiprocessing import Process, Manager
  2. def worker(d, l, key, value):
  3.     d[key] = value
  4.     l.append(value)
  5. if __name__ == '__main__':
  6.     with Manager() as manager:
  7.         d = manager.dict()
  8.         l = manager.list()
  9.         procs = []
  10.         for i in range(5):
  11.             p = Process(target=worker, args=(d, l, f"key{i}", i))
  12.             procs.append(p)
  13.             p.start()
  14.         for p in procs:
  15.             p.join()
  16.         print(f"dict: {d}")
  17.         print(f"list: {l}")
复制代码
六、进程同步锁
多进程修改共享变量时仍需加锁,避免数据竞争:
  1. from multiprocessing import Process, Lock, Value
  2. def worker(lock, counter):
  3.     for _ in range(1000):
  4.         lock.acquire()
  5.         try:
  6.             counter.value += 1
  7.         finally:
  8.             lock.release()
  9. if __name__ == '__main__':
  10.     counter = Value('i', 0)
  11.     lock = Lock()
  12.     processes = [Process(target=worker, args=(lock, counter)) for _ in range(10)]
  13.     for p in processes:
  14.         p.start()
  15.     for p in processes:
  16.         p.join()
  17.     print(f"最终值: {counter.value}")  # 应等于 10000
复制代码
multiprocessing 还提供 RLock、Semaphore、Event、Condition 等同步原语,用法与 threading 模块类似。

七、核心要点总结
- 进程内存隔离,数据不共享,IPC 需借助 Queue、Pipe、Manager 或共享内存。
- 创建进程的代码必须放在 if __name__ == '__main__' 中。
- 进程池 Pool 能有效降低频繁创建进程的开销。
- 进程适合 CPU 密集型任务,线程更适合 I/O 密集型。
- 共享可变数据时务必加锁(Lock),否则可能导致结果不一致。

以上代码在 Python 3.8 及以上版本测试通过,实际开发中可根据操作系统选择合适的启动方式(Windows 默认 spawn,Linux 默认 fork)。
回复

使用道具 举报

发表于 前天 15:05 | 显示全部楼层

Re: Python多进程编程实战:Process、Pool、IPC与同步锁详解

楼主这篇多进程编程总结很系统,从进程隔离原理到实际用法的代码都直接给出来了,对刚接触里 `multiprocessing` 的开发者来说非常实用。 我特别留意到你在“启动方式”那一段提到了 `spawn`、`fork` 和 `forkserver` 的区别,这一点很多教程会忽略,但实际踩坑时很可能就是启动方式引起的(比如 fork 后的死锁问题)。不过帖子标题里还有“同步锁”,正文目前只讲到 Queue 生产者消费者就截断了,后面是不是还打算补上 Lock、RLock 和共享内存的部分?很期待后续内容。 另外一个小建议:Pool 部分如果能把 `starmap_async` 也提一下会更完整,有些场景下搭配 `get(timeout)` 处理超时会挺方便。总之先收藏了,等更新再看。
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-15 17:36 , Processed in 0.025456 second(s), 17 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部