TensorFlow高性能计算提升及分析技巧

这段时间,我师傅交给我一些任务,让我加速TensorFlow模型的计算速度。主要还都是他在提想法,我负责实现就行了。这篇博客主要将学到的知识进行一下总结。由于之前在科研上多数应用的是Pytorch构建模型,现在忽然让用TensorFlow,对其中的一些计算机制的不熟悉,在实现过程中遇到了不少问题
文章主要分为两点:

  • TFRecordDataset的原理和使用
  • Pipeline的原理和使用
  • Profile 性能分析

TFRecordDataset的原理和使用

TensorFlow中有一个tf.data的API,可以根据简单的可重用片段构建复杂的输入管道:比如图片模型的管道可能会汇聚分布式文件系统中的文件中的数据,对每个图片进行随机扰动,并将随机选择的图片合并用于训练的批次。
tf.data在TensorFlow中引入了两个新的抽象类:

  • tf.data.Dataset 表示一系列元素,其中每个元素包含一个或者多个tensor对象。比如在图像管道中,元素可能是单个训练样本,具有一对表示图像数据和标签的张量。可以是通过两种不同方式来创建数据集。
    • 创建来源(Dataset.from_tensor_slice()),以通过一个或者多个tf.Tensor对象构建数据集
    • 应用转换(Dataset.batch()),通过一个或者多个tf.data.Dataset对象构建数据集
  • tf.data.Iterator 提供从数据集中提取元素的主要方法,Iterator.get_next()返回的操作会在执行时生成Dataset的下一个元素,并且此操作通常充当输入管道代码和模型之间的接口。最简单的迭代器是‘单次迭代器’,它和特定的Dataset相关联,并对其进行一次迭代。要实现更复杂的用途,可以通过Iterator.inititalizer操作使用不同的数据集重新初始化和参数化迭代器

如何使用TFRecord数据
tf.dataAPI支持多种文件格式,可以 处理那些不适合在内存中存储的大型数据集,比如,TFRecord文件格式是一种面向记录的二进制格式,很多TensorFlow应用此格式来训练数据。通过tf.data.TFRecordDataset类,可以将一个或者多个TFRecord文件的内容作为输入管道的一部分进行流式传输。

# 创建一个从两个文件读取数据集
filenames = ["/var/file1.tfrecord","/var/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)

TFRecordDataset初始化程序的filenames的参数可以是字符串,字符串列表,也可以是字符串tf.Tensor。因此,面对需要训练和验证的文件,可以使用tf.placeholder(tf.string)来表示文件名,并使用适当的文件名初始化迭代器:

filenames = tf.placeholder(tf.string,shape=[None])
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)  # parse the record into tensors
dataset = dataset.repeat()  # repeat the input indefinitely
dataset = dataset.batch(32)
iterator = dataset.make_initializable_iterator()

##  初始化 训练数据
training_filenames = ["/var/validation1.tf.record",....]
sess.run(iterator.initializer,feed_dict={filenames: validation_filenames})

还有一些函数的使用细节,大家可以在第一个参考文献中看到。

Pipeline的原理和使用

pipeline也叫作数据输入流水线性能优化。这个性能优化策略简单来说就是,并行处理CPU数据读取和GPU计算,让数据读取和GPU计算交替进行缩短整个模型的时间。
太长不看(最佳做法摘要):

  • 使用prefetch转换可将提供方和使用方的工作重叠,将prefetch(n)(其中n是单步训练使用的元素数/批次数)添加到输入流水线的末尾,以便在CPU上执行的转换与加速器上执行的训练重叠
  • 通过设置num_parallel_calls参数并行处理map转换。建议将其值设置为CPU核心数量
  • 使用batch转换将预处理元素组合到一个批次中,建议使用map_and_batch混合转换
  • 如果要处理远程存储的数据并/或需要反序列化,可以使用parallel_interleave转换来重叠从不同文件读取数据的操作
  • 向量化传递给map转换的低开销用户定义函数,来分摊与调度和执行相应函数相关的开销。
  • 如果内存可以容纳数据,就请使用cache转换在第一个周期中将数据缓存在内存中,以便后续周期可以避免与读取、解析和转换该数据的相关开销
  • 如果预处理操作会增加数据大小,建议先应用interleaveprefetchshuffle减少内存的使用量
  • 建议在repeat转换之前先应用shuffle转换,最好使用shuffle_and_repeat混合转换

输入流水线结构

典型的TensorFlow训练输入流水线可以看做是ETL流程:

  • 提取: 从存储上读取数据
  • 转换: 使用CPU核心解析数据并执行预处理操作,比如图像解压缩,数据增强转换,重排和批处理
  • 加载:将转换之后的数据加载到执行机器学习模型的加速器设备上

这种模式就已经能够高效利用CPU,同时预留加速器来完成对模型进行训练的繁重工作。使用tf.estimator.EstimatorAPI时,前两个阶段(提取和转换)是在input_fn(传递给tf.estimator.Estimator,train)中捕获的。代码:

def parse_fn(example):
	"Parse TFExample records and perform simple data augmentation"
	example_fmt ={
	"image":tf.FixedLengthFeature((),tf.string,""),
	"label":tf.FixedLengthFeature((),tf.int64,-1)
	}
	parsed = tf.parse_single_example(example,example_fmt)
	image = tf.image.decord_image(parsed["image"])
	image = _augment_helper(image)  # augments image using slice,reshape,resize_bilinear
	return image,parsed["label"]
def input_fn():
	files = tf.data.Dataset.list_files("/path/train-*.tfrecord")
	dataset = files.interleave(tf.data.TFRecordDataset)
	dataset = dataset.shuffle(buffer_size = FLAGS.shuffle_buffer_size)
	dataset = dataset.map(map_func = parse_fn)
	dataset = dataset.batch(batch_size = FLAGS.batch_size)

由于GPU或者TPU可以不断提升神经网络的训练速度,因此,CPU处理很容易成为瓶颈。tf.dataAPI 为用户提供构建块来设计可高效利用CPU的输入流水线,并优化ETL流程的每个步骤。
在执行训练步骤,必须提取并转换训练数据,然后将其提供给加速器上运行的模型。
TensorFlow高性能计算提升及分析技巧
如上所示,在一个简单的同步实现中,当CPU准备数据时,GPU处于空闲状态,当GPU训练模型的时候,CPU处于空闲的状态。所以训练步的用时是CPU+GPU的和
如果使用流水线可以显著减少空闲时间:
TensorFlow高性能计算提升及分析技巧

  • 流水线:tf.dataAPI 通过tf.data.Dataset.prefetch转换提供了一种软件流水线机制,该机制可用于将生成数据的时间和使用数据的时间分离开,具体来说,该转换使用了后台线程和内部缓冲,以便在请求元素之前从输入数据集中预取这些元素。因此为了实现上面的流水线效果,可以将prefetch(1)作为最终转换添加到数据集流水线中(如果单步训练使用n个元素,就需要添加prefetch(n)
  • 并行处理数据转换:准备批次数据,可能需要预处理输入元素。为此,tf.dataAPI提供了tf.data.Dataset.map转换,将用户定义的函数应用于输入数据集的每个元素。由于输入数据彼此独立,因此可以跨多个CPU核心并行执行处理。为了实现这一点,map转换提供了num_parallel_calls参数来指定并行处理级别。
    TensorFlow高性能计算提升及分析技巧
    设置num_parallel_calls参数选择最佳值取决于硬件,训练数据的特征,映射函数的成本以及同时在CPU上进行的其他处理
  • 并行处理数据提取: 实际设置中,输入数据可能会远程存储,这是因为输入数据不适合本地存储,或是因为训练是分布式训练,因此每台机器上复制输入数据没有意义。非常适合在本地上读取数据的数据集流水线在远程读取数据时可能会遇到IO瓶颈,这是因为本地和远程存储之间存在一些差异:
    • 首字节时间:与本地存储相比,从远程存储读取文件的首字节所用时间可能要多出几个数量级
    • 读取吞吐量:虽然远程存储通常可提供较大的聚合带宽,但读取单个文件可能只能利用此带宽中的一小部分
      为了降低各种数据提取开销的影响,tf.dataAPI提供了tf.contrib.data.parallel_interleave转换。使用此转换可以并行执行其他数据集并交错这些数据集中的内容,通过cycle_length参数指定要重叠的数据集数量
      TensorFlow高性能计算提升及分析技巧
      具体相关的操作可以参看第二个文献。

Profile 性能分析

当程序达到一个性能瓶颈之后,需要按照每步的性能损耗来进行优化,在第三篇的参考文献中,讲述了七款python的性能分析工具,其中我使用的是line_profiler 可以统计每行代码执行的次数和执行时间等,时间单位是微秒。
测试代码:
C:\Python34\test.py

import time
@profile
def fun():
    a = 0
    b = 0
    for i in range(100000):
        a = a + i * i
    for i in range(3):
        b += 1
        time.sleep(0.1)
    return a + b
fun()

使用:
1.在需要测试的函数加上@profile装饰,这里我们把测试代码写在C:\Python34\test.py文件上.
2.运行命令行:kernprof -l -v C:\Python34\test.py
显示效果的说明:

Total Time:测试代码的总运行时间 
Hits:表示每行代码运行的次数  
Time:每行代码运行的总时间  
Per Hits:每行代码运行一次的时间  
% Time:每行代码运行时间的百分比

参考资料

TensorFlow 导入数据
TensorFlow输入流水线性能
Python的7种性能测试工具:timeit、profile、cProfile、line_profiler、memory_profiler、PyCharm图形化性能测试工具、objgraph