多进程(multiprocessing)

多进程(multiprocessing)

徐静

python的os模块封装了常见的系统调用,其中包括了fork,可以在Python程序中轻松创建子进程:


import os

print('Process(%s) starting......' %os.getpid())
#only works on Unix/Linux/Mac

pid = os.fork()

if pid ==0:
	print('I am child process(%s) and my parent is %s.' %(os.getpid(),os.getppid()))
else:
	print('I (%s) just created a child processing (%s).' %(os.getpid(),pid))

Windows 没有fork调用,上面的代码在Windows上无法运行,Mac是基于BSD(Unix的一种),常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程处理新的http请求。

multiprocessing

Windows没有fork调用,如何在Windows上编写多进程程序,multiprocessing模块就是跨平台版本的多进程模块。

它提供了一个Process类来代表一个进程对象,小面的例子演示了启动一个子进程并等待其结束

from multiprocessing import Process
import os

#子进程套执行程序代码

def run_proc(name):
	print('Run child Process %s (%s)...' %(nameosgetpid()))


if __name__ == '__main__':
	print('Parent process %s.' %os.getpid())
	p = Process(target=run_proc,args=('test',))
	p.start()
	p.join()
	print('Child process end.')

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。 join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:


from multiprocessing import Pool
import os,time,random

def long_time_task(name):
	print('Run task %s(%s)...' %(name,os.getpid()))
	start = time.time()
	time.sleep(random.random()*3)
	end = time.time()
	print('task %s'runs %0.2f)seconds.' %(name,(end-start)))

if __name__ == '__main__':
	print('Oarent process %s.' % os.getpid())
	p = Pool(4)
	for i in range(5):
		p.apply_async(long_time_task,arg=(i,))
	print('Waiting fpr all subprocesses done...')
	p.close()
	p.join()
	print('All subprocesses done.')

进程间的通信

Process之间肯定是需要通信的,Python的multiprocessing模块包装了底层的机制,提供了Queue,Pipes等多种方式来交换数据。我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:


from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
	print('Process to write: %s' % os.getpid())
	for value in ['A', 'B', 'C']:
		print('Put %s to queue...' % value)
		q.put(value)
		time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
	print('Process to read: %s' % os.getpid())
	while True:
	value = q.get(True)
	print('Get %s from queue.' % value)
if __name__=='__main__':
	# 父进程创建Queue,并传给各个子进程:
	q = Queue()
	pw = Process(target=write, args=(q,))
	pr = Process(target=read, args=(q,))
	# 启动子进程pw,写入:
	pw.start()
	# 启动子进程pr,读取:
	pr.start()
	# 等待pw结束:
	pw.join()
	# pr进程里是死循环,无法等待其结束,只能强行终止:
	pr.terminate()

Author face

徐静

数据科学从业者,算法工程师. 善于用数据科学的工具透析业务,模型的线上化部署,网络爬虫及前端可视化. 喜欢研究机器学习,深度学习及相关软件实现.目前自己还是小白一个,希望多多学习.

最近发表的文章