当前位置: 主页 > Python语言

python 多核并行计算-python并行解析xml文件

发布时间:2023-02-13 09:09   浏览次数:次   作者:佚名

Python多核并行计算

2016年11月2日·蟒蛇

我以前写小程序,根本不关心并行性。 单核跑没问题,我的电脑只有双核和四个超线程(以下统称核心)。 密集型任务)。 然后由于我用的是32核128GB内存,看到htop里面一堆空核,自然就觉得这个并行肯定要折腾了。 后来发现,Python的并行其实很简单。

htop 32cores 128GB RAM

多处理与线程

Python自带的库功能全面,使用方便,这也是我特别喜欢Python的原因之一。 Python中有两个库,multiprocessing和threading,就是用来实现并行的。 使用线程应该是很自然的想法。 毕竟(直觉上)开销小python 多核并行计算,而且有共享内存的好处,线程在其他语言中确实使用频率很高。 不过,我可以很负责任的说,如果你使用的是CPython实现,那么使用线程就相当于告别了并行计算(事实上,它甚至会比单线程更慢),除非是IO密集型任务。

吉尔

参考python.org提供的Python实现。 是的,Python是一种语言,它有各种各样的实现,比如PyPy、Jython、IronPython等等……我们用的最多的就是CPython,几乎可以等同于Python。

在CPython的实现中,使用全局锁来简化解释器的实现,使解释器一次只在一个线程中执行字节码。 也就是说,除非你是在等待一个IO操作,否则CPython的多线程就是彻头彻尾的谎言!

以下两个关于 GIL 的来源写得很好:

多处理。 水池

threading因为GIL不能用了,还是好好研究下multiprocessing吧。 (当然,如果你说你不用CPython,没有GIL问题,那也很好。)

先介绍一个简单粗暴,非常实用的工具,那就是。 如果你的任务可以通过ys = map(f, xs)来解决,大家可能都知道这种形式自然是最容易并行化的,那么用Python并行计算这个任务就真的很容易了。 例如,要对每个数字进行平方:

import multiprocessing
def f(x):
    return x * x
cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)

python 多核并行计算_python 并行 写文件_python并行解析xml文件

xs = range(5) # method 1: map print pool.map(f, xs) # prints [0, 1, 4, 9, 16] # method 2: imap for y in pool.imap(f, xs): print y # 0, 1, 4, 9, 16, respectively # method 3: imap_unordered for y in pool.imap_unordered(f, xs): print(y) # may be in any order

map直接返回列表,i开头的两个函数返回迭代器; imap_unordered 返回无序。

当计算时间比较长的时候,我们可能会想加一个进度条。 这时候i系列的好处就体现出来了。 另外还有一个小技巧,就是输出\r可以让光标回到行首而不换行,这样就可以做一个简单的进度条了。

cnt = 0
for _ in pool.imap_unordered(f, xs):
    sys.stdout.write('done %d/%d\r' % (cnt, len(xs)))
    cnt += 1

python 并行 写文件_python并行解析xml文件_python 多核并行计算

更复杂的操作

对于更复杂的操作,可以直接使用对象。 要在进程之间进行通信,请使用:

其中我强烈推荐Queue,因为其实很多场景都是生产者消费者模型,此时Queue就可以解决问题。 使用的方法也很简单。 现在父进程创建Queue,然后将其作为args或kwargs传递给Process。

使用Theano或Tensorflow等工具时的注意事项

需要注意的是,在导入theano或import tensorflow等调用Cuda的工具时,会出现一些副作用。 这些副作用会原样复制到子进程中python 多核并行计算,然后就会出现错误,比如:

could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED

解决方法是保证父进程不引入这些工具,而是在子进程创建后,让子进程单独导入。

如果您使用 Process,请将其导入目标函数。 例如:

import multiprocessing
def hello(taskq, resultq):
    import tensorflow as tf
    config = tf.ConfigProto()
    config.gpu_options.allow_growth=True
    sess = tf.Session(config=config)
    while True:
        name = taskq.get()
        res = sess.run(tf.constant('hello ' + name))
        resultq.put(res)

python并行解析xml文件_python 并行 写文件_python 多核并行计算

if __name__ == '__main__': taskq = multiprocessing.Queue() resultq = multiprocessing.Queue() p = multiprocessing.Process(target=hello, args=(taskq, resultq)) p.start() taskq.put('world') taskq.put('abcdabcd987') taskq.close() print(resultq.get()) print(resultq.get()) p.terminate() p.join()

如果使用Pool,可以写一个函数,在这个函数中import,把这个函数传入Pool的构造函数中作为初始化器。 例如:

import multiprocessing

python 多核并行计算_python并行解析xml文件_python 并行 写文件

def init(): global tf global sess import tensorflow as tf config = tf.ConfigProto() config.gpu_options.allow_growth=True sess = tf.Session(config=config) def hello(name): return sess.run(tf.constant('hello ' + name)) if __name__ == '__main__': pool = multiprocessing.Pool(processes=2, initializer=init) xs = ['world', 'abcdabcd987', 'Lequn Chen'] print pool.map(hello, xs)

Pool.apply_async:传入不确定的参数

import multiprocessing as mp
import time
def foo_pool(x):

python 并行 写文件_python 多核并行计算_python并行解析xml文件

time.sleep(2) return x*x result_list = [] def log_result(result): # This is called whenever foo_pool(i) returns a result. # result_list is modified only by the main process, not the pool workers. result_list.append(result) def apply_async_with_callback(): pool = mp.Pool() for i in range(10): pool.apply_async(foo_pool, args = (i, ), callback = log_result) pool.close() pool.join() print(result_list) if __name__ == '__main__': apply_async_with_callback()

参考: