python 多核并行计算-python并行解析xml文件
Python多核并行计算
2016年11月2日·蟒蛇
我以前写小程序,根本不关心并行性。 单核跑没问题,我的电脑只有双核和四个超线程(以下统称核心)。 密集型任务)。 然后由于我用的是32核128GB内存,看到htop里面一堆空核,自然就觉得这个并行肯定要折腾了。 后来发现,Python的并行其实很简单。
多处理与线程
Python自带的库功能全面,使用方便,这也是我特别喜欢Python的原因之一。 Python中有两个库,multiprocessing和threading,就是用来实现并行的。 使用线程应该是很自然的想法。 毕竟(直觉上)开销小python 多核并行计算,而且有共享内存的好处,线程在其他语言中确实使用频率很高。 不过,我可以很负责任的说,如果你使用的是CPython实现,那么使用线程就相当于告别了并行计算(事实上,它甚至会比单线程更慢),除非是IO密集型任务。
吉尔
参考python.org提供的Python实现。 是的,Python是一种语言,它有各种各样的实现,比如PyPy、Jython、IronPython等等……我们用的最多的就是CPython,几乎可以等同于Python。
在CPython的实现中,使用全局锁来简化解释器的实现,使解释器一次只在一个线程中执行字节码。 也就是说,除非你是在等待一个IO操作,否则CPython的多线程就是彻头彻尾的谎言!
以下两个关于 GIL 的来源写得很好:
多处理。 水池
threading因为GIL不能用了,还是好好研究下multiprocessing吧。 (当然,如果你说你不用CPython,没有GIL问题,那也很好。)
先介绍一个简单粗暴,非常实用的工具,那就是。 如果你的任务可以通过ys = map(f, xs)来解决,大家可能都知道这种形式自然是最容易并行化的,那么用Python并行计算这个任务就真的很容易了。 例如,要对每个数字进行平方:
import multiprocessing
def f(x):
return x * x
cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)
xs = range(5)
# method 1: map
print pool.map(f, xs) # prints [0, 1, 4, 9, 16]
# method 2: imap
for y in pool.imap(f, xs):
print y # 0, 1, 4, 9, 16, respectively
# method 3: imap_unordered
for y in pool.imap_unordered(f, xs):
print(y) # may be in any order
map直接返回列表,i开头的两个函数返回迭代器; imap_unordered 返回无序。
当计算时间比较长的时候,我们可能会想加一个进度条。 这时候i系列的好处就体现出来了。 另外还有一个小技巧,就是输出\r可以让光标回到行首而不换行,这样就可以做一个简单的进度条了。
cnt = 0
for _ in pool.imap_unordered(f, xs):
sys.stdout.write('done %d/%d\r' % (cnt, len(xs)))
cnt += 1
更复杂的操作
对于更复杂的操作,可以直接使用对象。 要在进程之间进行通信,请使用:
其中我强烈推荐Queue,因为其实很多场景都是生产者消费者模型,此时Queue就可以解决问题。 使用的方法也很简单。 现在父进程创建Queue,然后将其作为args或kwargs传递给Process。
使用Theano或Tensorflow等工具时的注意事项
需要注意的是,在导入theano或import tensorflow等调用Cuda的工具时,会出现一些副作用。 这些副作用会原样复制到子进程中python 多核并行计算,然后就会出现错误,比如:
could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED
解决方法是保证父进程不引入这些工具,而是在子进程创建后,让子进程单独导入。
如果您使用 Process,请将其导入目标函数。 例如:
import multiprocessing
def hello(taskq, resultq):
import tensorflow as tf
config = tf.ConfigProto()
config.gpu_options.allow_growth=True
sess = tf.Session(config=config)
while True:
name = taskq.get()
res = sess.run(tf.constant('hello ' + name))
resultq.put(res)
if __name__ == '__main__':
taskq = multiprocessing.Queue()
resultq = multiprocessing.Queue()
p = multiprocessing.Process(target=hello, args=(taskq, resultq))
p.start()
taskq.put('world')
taskq.put('abcdabcd987')
taskq.close()
print(resultq.get())
print(resultq.get())
p.terminate()
p.join()
如果使用Pool,可以写一个函数,在这个函数中import,把这个函数传入Pool的构造函数中作为初始化器。 例如:
import multiprocessing
def init():
global tf
global sess
import tensorflow as tf
config = tf.ConfigProto()
config.gpu_options.allow_growth=True
sess = tf.Session(config=config)
def hello(name):
return sess.run(tf.constant('hello ' + name))
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=2, initializer=init)
xs = ['world', 'abcdabcd987', 'Lequn Chen']
print pool.map(hello, xs)
Pool.apply_async:传入不确定的参数
import multiprocessing as mp
import time
def foo_pool(x):
time.sleep(2)
return x*x
result_list = []
def log_result(result):
# This is called whenever foo_pool(i) returns a result.
# result_list is modified only by the main process, not the pool workers.
result_list.append(result)
def apply_async_with_callback():
pool = mp.Pool()
for i in range(10):
pool.apply_async(foo_pool, args = (i, ), callback = log_result)
pool.close()
pool.join()
print(result_list)
if __name__ == '__main__':
apply_async_with_callback()
参考: