Python 的 multiprocessing 模块提供了跨平台的多进程支持,每个进程拥有独立的内存空间和资源,能绕过 GIL 实现真正的并行计算,特别适合 CPU 密集型任务。本文从进程创建、进程池复用、进程间通信到同步锁,结合可运行代码逐一解析常见用法。
一、进程的本质与线程对比
进程是操作系统资源分配的基本单位,每个进程拥有独立的地址空间、文件描述符和代码数据。线程则共享同一进程的内存空间。多进程天然数据隔离,无需担心共享变量冲突,但进程间通信(IPC)需要额外的管道、队列或共享内存机制。
简单代码演示进程内存隔离:- from multiprocessing import Process
- data = []
- def worker():
- data.append(1)
- print(f"{id(data)}: {data}")
- p1 = Process(target=worker)
- p2 = Process(target=worker)
- p1.start()
- p2.start()
- p1.join()
- p2.join()
复制代码 输出显示两个进程的 data 列表位于不同内存地址,互不干扰。
二、创建进程的两种方式
1. 通过 Process 类直接指定目标函数- from multiprocessing import Process
- import os
- def worker(name, num):
- print(f"进程 {name}: PID={os.getpid()}, 父进程={os.getppid()}")
- return num * 2
- if __name__ == '__main__':
- p = Process(target=worker, args=("worker1", 100))
- p.start()
- p.join()
- print(f"exit code: {p.exitcode}")
复制代码 2. 自定义 Process 子类,重写 run 方法- class MyProcess(Process):
- def __init__(self, name):
- super().__init__()
- self.name = name
- def run(self):
- print(f"运行进程: {self.name}")
- if __name__ == '__main__':
- p = MyProcess("my_worker")
- p.start()
- p.join()
复制代码 三、进程启动方式:spawn vs fork
Python 支持三种启动方式(需在 if __name__ == '__main__' 内设置):
- spawn:全新启动 Python 解释器,不继承父进程资源,Windows 默认,macOS 可选。
- fork:复制父进程,继承内存和资源,Linux 默认,但可能因锁产生死锁。
- forkserver:先启动一个服务进程,再由其 fork,相对安全。
设置示例:- import multiprocessing as mp
- if __name__ == '__main__':
- mp.set_start_method('spawn') # Windows 默认;Linux 可用 'fork'
复制代码 四、进程池(Pool)复用进程
进程创建开销大,实际开发中常用 Pool 复用多个进程。以下演示五种常用提交方式:- from multiprocessing import Pool
- import time
- def cpu_intensive(n):
- return sum(i * i for i in range(n))
- def io_bound(n):
- time.sleep(n)
- return n
- if __name__ == '__main__':
- with Pool(processes=4) as pool:
- # map - 阻塞保持顺序
- results = pool.map(cpu_intensive, [10**7] * 4)
- # map_async - 非阻塞
- result = pool.map_async(cpu_intensive, [10**7] * 4)
- print("异步提交完成")
- res = result.get()
- # apply - 同步
- single = pool.apply(cpu_intensive, (10**7,))
- # apply_async - 异步并设超时
- async_result = pool.apply_async(cpu_intensive, (10**7,))
- res_single = async_result.get(timeout=30)
- # starmap - 多参数
- args = [(1,2), (3,4), (5,6)]
- sums = pool.starmap(lambda a,b: a+b, args)
复制代码 五、进程间通信(IPC)
1. Queue - 线程安全队列- from multiprocessing import Process, Queue
- import time
- def producer(q):
- for i in range(5):
- q.put(f"消息{i}")
- print(f"生产: {i}")
- time.sleep(0.5)
- q.put(None)
- def consumer(q):
- while True:
- msg = q.get()
- if msg is None:
- break
- print(f"消费: {msg}")
- time.sleep(1)
- if __name__ == '__main__':
- q = Queue(maxsize=10)
- p1 = Process(target=producer, args=(q,))
- p2 = Process(target=consumer, args=(q,))
- p1.start(); p2.start()
- p1.join(); p2.join()
复制代码 2. Pipe - 双向管道- from multiprocessing import Process, Pipe
- def worker(conn):
- conn.send([1,2,3])
- print(f"收到父进程: {conn.recv()}")
- conn.close()
- if __name__ == '__main__':
- parent_conn, child_conn = Pipe()
- p = Process(target=worker, args=(child_conn,))
- p.start()
- print(f"子进程数据: {parent_conn.recv()}")
- parent_conn.send("来自父进程的问候")
- p.join()
复制代码 3. Shared Memory - 共享内存
使用 Value 和 Array 创建共享变量:- from multiprocessing import Process, Value, Array
- def worker(val, arr):
- val.value += 1
- for i in range(len(arr)):
- arr[i] += i
- if __name__ == '__main__':
- num = Value('i', 0) # 有符号整数
- arr = Array('d', [0.0, 1.0, 2.0]) # double 数组
- processes = []
- for _ in range(4):
- p = Process(target=worker, args=(num, arr))
- processes.append(p)
- p.start()
- for p in processes:
- p.join()
- print(f"num: {num.value}")
- print(f"arr: {arr[:]}")
复制代码 类型码速查:'i'(int)、'I'(unsigned int)、'l'(long)、'f'(float)、'd'(double)、'c'(char)。
4. Manager - 管理器代理
Manager 提供 dict、list 等共享容器,底层自动处理序列化:- from multiprocessing import Process, Manager
- def worker(d, l, key, value):
- d[key] = value
- l.append(value)
- if __name__ == '__main__':
- with Manager() as manager:
- d = manager.dict()
- l = manager.list()
- procs = []
- for i in range(5):
- p = Process(target=worker, args=(d, l, f"key{i}", i))
- procs.append(p)
- p.start()
- for p in procs:
- p.join()
- print(f"dict: {d}")
- print(f"list: {l}")
复制代码 六、进程同步锁
多进程修改共享变量时仍需加锁,避免数据竞争:- from multiprocessing import Process, Lock, Value
- def worker(lock, counter):
- for _ in range(1000):
- lock.acquire()
- try:
- counter.value += 1
- finally:
- lock.release()
- if __name__ == '__main__':
- counter = Value('i', 0)
- lock = Lock()
- processes = [Process(target=worker, args=(lock, counter)) for _ in range(10)]
- for p in processes:
- p.start()
- for p in processes:
- p.join()
- print(f"最终值: {counter.value}") # 应等于 10000
复制代码 multiprocessing 还提供 RLock、Semaphore、Event、Condition 等同步原语,用法与 threading 模块类似。
七、核心要点总结
- 进程内存隔离,数据不共享,IPC 需借助 Queue、Pipe、Manager 或共享内存。
- 创建进程的代码必须放在 if __name__ == '__main__' 中。
- 进程池 Pool 能有效降低频繁创建进程的开销。
- 进程适合 CPU 密集型任务,线程更适合 I/O 密集型。
- 共享可变数据时务必加锁(Lock),否则可能导致结果不一致。
以上代码在 Python 3.8 及以上版本测试通过,实际开发中可根据操作系统选择合适的启动方式(Windows 默认 spawn,Linux 默认 fork)。 |