多进程(multiprocessing)
徐静
python的os模块封装了常见的系统调用,其中包括了fork,可以在Python程序中轻松创建子进程:
import os
print('Process(%s) starting......' %os.getpid())
#only works on Unix/Linux/Mac
pid = os.fork()
if pid ==0:
print('I am child process(%s) and my parent is %s.' %(os.getpid(),os.getppid()))
else:
print('I (%s) just created a child processing (%s).' %(os.getpid(),pid))
Windows 没有fork调用,上面的代码在Windows上无法运行,Mac是基于BSD(Unix的一种),常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程处理新的http请求。
multiprocessing
Windows没有fork调用,如何在Windows上编写多进程程序,multiprocessing模块就是跨平台版本的多进程模块。
它提供了一个Process类来代表一个进程对象,小面的例子演示了启动一个子进程并等待其结束
from multiprocessing import Process
import os
#子进程套执行程序代码
def run_proc(name):
print('Run child Process %s (%s)...' %(name,osgetpid()))
if __name__ == '__main__':
print('Parent process %s.' %os.getpid())
p = Process(target=run_proc,args=('test',))
p.start()
p.join()
print('Child process end.')
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。 join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
Pool
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
from multiprocessing import Pool
import os,time,random
def long_time_task(name):
print('Run task %s(%s)...' %(name,os.getpid()))
start = time.time()
time.sleep(random.random()*3)
end = time.time()
print('task %s'runs %0.2f)seconds.' %(name,(end-start)))
if __name__ == '__main__':
print('Oarent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task,arg=(i,))
print('Waiting fpr all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
进程间的通信
Process之间肯定是需要通信的,Python的multiprocessing模块包装了底层的机制,提供了Queue,Pipes等多种方式来交换数据。我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()