TensorFlow多GPU并行

TensorFlow多GPU并行

1. TensorFlow使用GPU

TensorFlow程序通过tf.device函数来指定运行每一个操作的设备,这个设备可以是CPU或GPU,也可以是某一台远程的服务器,设备名称:CPU:/cup:0,GPU: /gpu:n。在配置好GPU的环境中TF会优先选择GPU。 注意不是所有的操作都可以放在GPU上,如果将无法放在GPU上的操作指定到GPU上,程序将会报错,并且不同版本的TF对GPU的支持也是不一样的。

TF提供了一个快捷的方式查看运行每一个运算的设备,在生成会话时,可以通过设置log_device_placement参数来打印运行每一个运算的设备。

import tensorflow as tf

a = tf.constant([1.0,2.0,3.0],shape=[3],name="a")
b = tf.constant([1.0,2.0,3.0],shape=[3],name="b")

c = a + b

# 通过log_device_placement 参数来输出运行每一个运算的设备
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
sess = tf.Session(config = tf.ConfigProto(log_device_placement=True))
print(sess.run(c))

'''
a2dd: (Add): /job:localhost/replica:0/task:0/device:CPU:0

add: (Add)b/job:localhost/replica:0/task:0/device:CPU:0
/job:localhost/replica:0/task:0/device:CPU:0

b: (Const)/job:localhost/replica:0/task:0/device:CPU:0
a: (Const)/job:localhost/replica:0/task:0/device:CPU:0
[2. 4. 6.]
'''

如果将某些运算放在不同的GPU中或CPU中,需要通过tf.device来指定

import tensorflow as tf

# 通过tf.device将运算指定到特定的设备上

with tf.device('/cpu:0'):
	a = tf.constant([1.0,2.0,3.0],shape=[3],name='a')
	b = tf.constant([1.0,2.0,3.0],shape=[3],name='b')

with tf.device('/cpu:1'):
	c = a+b

sess = tf.Session(config = tf.ConfigProto(log_device_placement=True))

print(sess.run(c))

为了避免指定GPU的操作而不支持GPU,TF在生成会话时可以指定allow_soft_placement参数,当allow_soft_placement参数设置为True时,如果运算无法由GPU执行,会自动放在CPU中,例子如下:

import tensorflow as tf

a_cpu = tf.Variable(0,name="a_cpu")
with tf.device("/gpu:0"):
	a_gpu = tf.Variable(0,name="a_gpu")

# 通过allow_soft_placement参数自动将GPU上的操作放回CPU

sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True,
	log_device_placement=True))
sess.run(tf.global_variables_initializer())

2.多GPU并行

a. 深度学习并行的模式:同步模式和异步模式

异步模式(模型并行): 在每一轮迭代时,不同设备会读取参数的最新取值,但因为不同设备在读取参数取值时的时间不一样,所以得到的值也有可能不一样。根据当前参数的取值和随机获取的一小部分训练数据,不同设备各自运行反向传播的过程并独立的更新参数。可以简单的认为异步模式就是单机模式复制了多份,每一份使用不同的训练数据进行训练。在异步模式下,不同的设备之间是完全独立的。

同步模式(数据并行): 为了避免更新不同步的问题,可以使用同步模式,所有设备同时读取参数的取值,并且当反向传播算法完成之后同步更新参数的取值。单个设备不会单独对参数更新,而会·等待所有设备都完成反向传播后再统一更新参数。注意虽然所有设备使用的参数是一致的,但是因为训练数据不同,当所有设备完成反向传播的计算后,需要计算出不同设备上参数梯度的平均值,最后通过平均值对参数进行更新。但是同步模式低于异步模式,如果设备的运行速率不一致,那么每一轮训练都需要等待最慢的设备结束才能开始更新参数。

同步模式几乎适用于所有深度学习模型。使用数据并行时最好GPU的型号,速度最好一致,这样效率最高,而异步的数据并行,则不等待所有GPU都完成一次训练,而是哪个GPU完成了训练,就立即将梯度更新到共享的模型参数中,通常来说数据并行比异步并行的模式收敛速度更快,模型的精度更高。

3.Example

import os.path
import re
import time
import numpy as np
import tensorflow as tf
import cifar10 # 训练的CNN模型

batch_size = 128
max_step  = 1000000
num_gpus = 4 # GPU个数

# 计算不同设备训练的损失
def tower_loss(scope):
	images,labels = cifar10.distorted_inputs()
	logits = cifar10.inference(images)
	_ = cifar10.loss(logits,labels)
	losses = tf.get_collection("lossess",scope)
	total_loss = tf.add_n(losses,name='total_loss')

	return total_loss


# 计算不同GPU上相同变量的梯度的平均值
# 用来更新该变量
def average_gradients(tower_grads):
	average_grads = []
	for grad_and_vars in zip(*tower_grads):
		grads = []
		for g,_ in grad_and_vars:
			expanded_g = tf.expand_dims(g,0)
			grads.append(expand_g)

		grad = tf.concat(grads, 0)
		grad = tf.reduce_mean(grad,0)

		v = grad_and_vars[0][1]
		grad_and_var = (grad,v)
		average_grads.append(grad_and_var)

	return average_grads

def train():
	with tf.Graph().as_default(),tf.device("/cpu:0"):
		global_step = tf,get_variables('global_step',[],
			initializer=tf.constant_initializer(0),
			trainable=False)

		num_batches_per_epoch = xxx
		deacy_steps = int(num_batches_per_epoch*xxxx)

		# 创建随训练步数衰减的学习率
		lr = tf.train.exponential_deacy("初始学习率",
			"全局训练的步数","每次衰减需要的步数",
			"衰减率",starcase=True)# 代表阶梯衰减

		opt = tf.train.GradientDescentOptimizer(lr)

		# 储存GPU计算结果的列表
		tower_grads = []

		for i in range(num_gpus):
			with tf.device("/gpu:%d" %i):
				with tf.name_scope("%s_%d" %(name1,i)) as scope:
					loss = tower_loss(scope)
					# 重用参数,让所有GPU公用一个
					# 模型及完全相同的参数
					tf.get_variable_scope().reuse_variables()
					# 计算梯度
					grads = opt.compute_gradient(loss)
					tower_grads.append(grads)

			grads = average_gradients(tower_grads)
			# 更新模型参数,global_step计数功能,从1开始
			apply_gradient_op = opt.apply_gradients(grads,
				global_step=global_step)


		server = tf.train.Saver(tf.all_variables())
		init = tf.global_variables_initializer()
		sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True))

		sess.run(init)
		tf.train.start_queue_runners(sess=sess)

		for step in range(max_steps):
			start_time = time.time()
			_,loss_value = sess.run([apply_gradient_op,loss])
			duration = time.time() - start_time

			if step % 10 == 0:
				# print something
			
			if step % 1000 == 0 or (step+1) == max_steps:
				# global_step计数作用
				saver.save(sess,"path/to/model/model.ckpt",
					global_step=step)
Author face

徐静

数据科学从业者,算法工程师. 善于用数据科学的工具透析业务,模型的线上化部署,网络爬虫及前端可视化. 喜欢研究机器学习,深度学习及相关软件实现.目前自己还是小白一个,希望多多学习.

最近发表的文章