multiprocessing 模块
package
本文字数:1.5k 字 | 阅读时长 ≈ 6 min

multiprocessing 模块

package
本文字数:1.5k 字 | 阅读时长 ≈ 6 min

multiprocessing 库是 Python 的一个标准库,已经集成到了 python 中(笔者在 python3.10 版本无需安装),他可以在多个进程之间分配任务,有效的利用多个 CPU 进行计算,通过合理设计也可以用多核 GPU 加速运算。

1. multiprocessing 的基本使用

multiprocessing 中有一个 Process 类,可以通过 process 类来创建一个进程,如下所示,我们创建一个 worker 进程,args 中为他的参数

import multiprocessing import Process

def worker(num):
    print('Worker', num)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Process(target=worker, args=(i,))
        jobs.append(p)
    for i in range(5):
        jobs[4-i].start()

运行上述程序,得到下面结果,可以看出,这里我们每创建一个 process 类,将其放到 jobs 中,接下来我们需要执行 start()方法来执行每个进程,为了方便理解,这里顺序添加进程,倒序执行进程

Worker 4
Worker 3
Worker 2
Worker 1
Worker 0

2. multiprocessing 的进阶用法

2.1 Pool 进程池

1. 非阻塞式运行 apply_async

apply_async(func, [,args,]) 方法,使用非阻塞的方式运行程序,即非顺序执行,args 为传给 func 的参数列表

from multiprocessing import Pool
import os, time, random
 
def work1(id):
    start_time = time.time()  # 开始时间
    print("循环任务%d由进程号%d进程执行" % (id, os.getpid()))
    time.sleep(random.random())  # 随机生产0-1的浮点数
    end_time = time.time()  # 结束时间
    # time.sleep(3)
    print(id, "执行完毕,耗时%0.2f" % (end_time - start_time))
 
if __name__ == '__main__':
    pool = Pool(3)  # 定义一个进程池,最大进程数为3
    for i in range(1, 6):
        pool.apply_async(func=work1, args=(i,))  # 非阻塞方式运行 
    print("------start------")
    pool.close()  # 关闭进程池,关闭后pool不再接收新的请求任务
    pool.join()  # 等待pool进程池中所有的子进程执行完成,必须放在pool.close()之后
    print("-----end------")

输出结果如下

------start------
循环任务1由进程号33377进程执行
循环任务2由进程号33378进程执行
循环任务3由进程号33379进程执行
1 执行完毕,耗时0.08
循环任务4由进程号33377进程执行
2 执行完毕,耗时0.14
循环任务5由进程号33378进程执行
4 执行完毕,耗时0.46
3 执行完毕,耗时0.64
5 执行完毕,耗时0.93
-----end------

我们初始化了 3 个进程池 Pool(3),三个进程同时执行,当其中任何一个执行完毕时,开始执行下一个。从运行逻辑可以看出,for 循环里面先分配每个任务,然后 print 开始,然后 pool.close()关闭进程池,即已经接受了 6 个任务了,不再接受输入了,最后卡在 pool.join()等待着 6 个任务执行完毕,print 结束

下面是一个常用的多进程脚本

import os
from tqdm import tqdm
from multiprocessing import Pool
import json


def split_list_into_parts(lst, num_parts):
    avg = len(lst) / float(num_parts)
    out = []
    last = 0.0

    while last < len(lst):
        out.append(lst[int(last):int(last + avg)])
        last += avg

    return out


def work1(part, new_data):
    pass


train_list = [xxx]
divide = 32
parts = split_list_into_parts(train_list, divide)
pool = Pool(divide)  # 定义一个进程池,最大进程数为divide

results = []
for i in range(args.workers):
    result = pool.apply_async(func=work1, args=(parts[i], i,))  # 非阻塞方式运行 
    results.append(result)
pool.close()

for i, result in enumerate(results):
    error_message = result.get()  # 获取子进程的返回值
    if error_message:
        print(error_message)  # 输出错误信息

pool.join()
print("-----end------")

2. 阻塞式运行 apply

apply(func, [,args,]) 方法,使用阻塞的方式运行程序,即顺序执行,args 为传给 func 的参数列表

阻塞式运行就是说,所有的子进程是顺序执行的,执行完当前的在执行下一个,将上面的程序中 apply_async 改为 apply,得到如下结果

循环任务1由进程号33447进程执行
1 执行完毕,耗时0.58
循环任务2由进程号33446进程执行
2 执行完毕,耗时0.69
循环任务3由进程号33448进程执行
3 执行完毕,耗时0.10
循环任务4由进程号33447进程执行
4 执行完毕,耗时0.05
循环任务5由进程号33446进程执行
5 执行完毕,耗时0.81
------start------
-----end------

可以看到,先执行任务 1,任务 1 结束后执行任务 2,所有的任务并不是真正意义上的多线程的

3. 使用进程池得到结果

get()函数可以得到函数的返回结果

from multiprocessing import Pool
import os, time, random
 
def work1(id):
    return id
 
if __name__ == '__main__':
    pool = Pool(3)  # 定义一个进程池,最大进程数为3
    results = []
    for i in range(1, 6):
        results.append(pool.apply_async(func=work1, args=(i,)))  # 非阻塞方式运行 
    pool.close()
    pool.join()
    for i in results:
        print(i.get())

2.2 在不同进程之间交换结果

1. multiprocessing.Queue()方法

对于 Processor 类,用 multiprocessing.Queue()方法来获取进程的结果

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

输出下面结果。如果我们创建了多个进程,可以最后通过 get()方法来得到多个进程的结果然后一起处理

[42, None, 'hello']

2. multiprocessing.Manager().Queue()方法

对于 Pool 创建的进程,就需要 multiprocessing.Manager().Queue()方法了,下面展示了使用情况

from multiprocessing import Pool, Manager
import os, time, random
 
def writer(q):
    print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in range(1, 11):
        q.put(i)
 
if __name__ == '__main__':
    print("(%s) start" % os.getpid())  # 打印主进程(父进程)
    pool = Pool(5)  # 创建进程池
    q = Manager().Queue()  # 使用Manager中的Queue
    pool.apply_async(func=writer, args=(q,))
    time.sleep(2)  # 如果去掉的话,就直接打印print的结果了,此时writer方法中的q.put还没执行完
    print(q.qsize(), q.empty(), q.get())
    for i in range(1, 10):
        print(q.get())
    pool.close()
    pool.join()

下面是输出结果,在 sleep 的 2s 中,writer 函数已经执行完毕,此时,q 的大小为 10,q.get()可以顺序获取 q.put()的值,我们将其打印,然后用一个循环将剩余的 9 个数字打印出来

writer启动(37252),父进程为(37250)
10 False 1
2
3
4
5
6
7
8
9
10
4月 06, 2025
3月 10, 2025
12月 31, 2024