05月10 Tensorflow学习笔记
Tensorflow简介
Tensorflow的定位
TensorFlow是一个采用数据流图,用于数值计算的开源软件库。被开发用于机器学习和深度神经网络方面的研究,但这个系统的通用性使其也可广泛用于其他计算领域。
通用计算框架、算法引擎,提供了多种机器学习的基础计算单元op(operator,如自动反向传播求解梯度、卷积、LSTM),用户可以方便的设计实现神经网络结构。
- 低于深度学习框架(如Caffe、Paddle),用户除了配置网络结构参数外,需要自己使用计算单元op实现具体的算法,可自定义计算逻辑,算法实现的自由度更高。
- 高于编程框架(如ELF、目前的Hippo),在计算引擎上层封装了包含多种计算单元op的高阶机器学习库,用户主要关注算法逻辑,而不是基本计算单元(ELF的map, Hippo的node)的实现
- 类似数学分析领域的MatLab,提供强大的运算接口,上层的应用怎么实现看用户编程。
「hadoop是大数据时代的infrastructure, 而tensorflow会是人工智能时代的infrastructure」
Tensorflow的发展和现状
Tensorflow是由Jeff Dean领头的Google Brain团队基于Google内部第一代深度学习系统DistBelief改进而来的通用计算框架。
- DistBelief是Google2011年开发的内部深度学习工具,在内部取得了巨大成功,在Google的图片搜索和语音识别业务上都有突破性的进展(开创了图片搜索功能,语音识别错误率降低了25%,相当于前十年的总和)。
- 2015年11月发布基于Apache2.0开源协议的Tensorflow,去除了DistBelif对Google内部的系统依赖,计算模型更加通用、计算速度更快、支持计算平台更多、支持深度学习算法更广,系统稳定性更高。
- 2016年5月发布专门为机器学习和Tensorflow定制的张量处理单元TPU,可编程的AI加速器,提供高吞吐量的低精度计算,面向使用或运行模型而不是训练模型。
- 2017年2月发布大版本Tensorflow 1.0,更快、更灵活的新特性,Keras正式成为Tensorflow默认API,提供高级别的神经网络框架API。
Tensorflow在Google内部得到广泛的应用,在排序系统的上千种排序算法中,基于Tensorflow实现的RankBrain是第三重要的排序算法。包括网页搜索在内,Tensorflow已经成功应用到Google内部语音搜索、广告、电商、图片、翻译、YouTube、街景图等各款产品中。Deepmind在尝试半年后正式宣布之后所有的研究都将使用Tensorflow作为实现深度学习算法的工具。
包括Uber、Snapchat、Twitter、京东、小米国内外多家科技公司都在使用Tensorflow,Google I/O 2016大会上提到已经有1500个GitHub代码库在使用Tensorflow,其中只有5个是Google官方提供的。
各种开源深度学习框架和工具层出不穷,各有优点。Tensorflow坐拥巨头Google的全力支持,以及自身设计神经网络结构的代码的简洁度、分布式深度学习算法的执行效率、部署的便利性等亮点,在开源社区活跃度上占有绝对优势,大有一统江湖之势。
左图为不同深度学习工具社区流行度指标比较, 截止到2016年11月github的star和fork数量;右图不同深度学习工具社区参与度指标比较,2016年11月单月讨论和提交代码数量。tensorflow各个指标都远超其他,并且领先优势还在扩大。
Tensorflow使用
编程模型
Tensorflow是基于计算图Graph的计算框架,要求用户将自己需要实现的机器学习算法都实现为数据以张量Tensor的形式在多个计算单元Operator之间计算流动的过程,即张量的数据流Tensor-Flow。
主要概念
张量Tensor:
- 基本数据单位,Tensorflow里所有的数据皆为Tensor。
- Tensor形式为多维数组,在Tensorflow Python API里以Numpy.NDArray的形式。
- 常用Tensor:0阶Tensor为标量,1阶Tensor为向量、数组,2阶Tensor为矩阵
- Tensor只是对数据结果的引用,没有真正的保存数据,保存的是如何得到这些数据的计算过程。在Session中运行时才能得到真正的数据。
计算单元Operator:计算图中的每一个节点,单个计算过程;每个计算单元的输入可以是0或多个Tensor,输出可以是0或多个Tensor。
计算图Graph:由多个计算单元Operator通过边连接形成的图,计算单元之间的边表示了计算之间的Tensor数据传递关系;特殊的边控制依赖controll dependencies,控制计算节点之间的依赖关系。
变量Variable:需要保存的参数状态。而参数是在图中有其固定的位置的,不能像普通数据那样正常流动。因而,Tensroflow中将Variables实现为一个特殊的算子,该算子会返回它所保存的可变tensor的句柄。
会话Session:拥有并管理Tensorflow程序运行时的所有资源,用户定义完的计算图Graph必须再Session中执行才能得到真正的计算结果;可以配置并行线程数、CPU/GPU分配策略等运行参数。
占位符Placeholder、数据Feed/Fetch:数据占位符Placeholder可以为计算单元op预留指定格式的Tensor输入,在Session运行中使用Feed机制使用新的指定格式Tensor临时替换作为输入,使用Fetch机制可以取回Session运行中的Tensor结果。
MNIST DNN Demo
网络结构:
模型结果:
W1、B1、W2、B2四个tensor
W1.shape = [784, 4] ,B1.shape = [4] , W2.shape = [4, 10] , B2.shape = [10]
计算图模型:
前向传播:
mul1 = X * W1 add1 = mul1 + b1 tanh1 = tanh(add1) mul2 = tanh1 * W2 add2 = mul2 + b2 tanh2 = tanh(add2) loss = softmax_loss(tanh2) |
后向传播:
dloss = 1 dtanh2 = softmax_loss_diff(tanh2) * dloss dadd2 = tanh_diff(add2) * dtanh2 db2 = 1 * dadd2 dmul2 = 1 * dadd2 dW2 = tanh1 * dmul2 dtanh1 = W2 * dmul2 dadd1 = tanh_diff(add1) * dtanh1 db1 = 1 * dadd1 dmul1 = 1 * dadd1 dW1 = X * dmul1 |
TensorFlow代码:
# -*- coding=utf-8 -*-
"""
MNIST DNN demo by tobycc
"""
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
#------------------------DNN训练配置-------------------------------------
# 全链接网络结构配置
# 输入层784个节点为MNIST输入样本的特征,一层4个节点的隐藏层
# 一层4个节点的隐藏层
# 输出层SoftMax10个节点为MNIST样本的label
NET_CONFIG = [784, 4, 10]
# 单次batch样本数
BATCH_SIZE = 100
# 最大训练轮数(batch)
MAX_ITER = 10000
# 隐藏层激活函数,支持tanh、sigmoid、relu、linear
ACTIVATION = 'tanh'
# 学习率
LEARNING_RATE = 0.8
#------------------------------------------------------------------------
#--------------------构造DNN训练计算图-----------------------------------
# 存储模型训练结果
model_layer_weight = []
model_layer_bias = []
# 输入训练数据Tensor的占位符,shape=[BATCH_SIZE, 特征数]
input_feature = tf.placeholder(tf.float32, shape=(BATCH_SIZE, NET_CONFIG[0]))
# 输入真实label Tensor的占位符,shape=[BATCH_SIZE, label数]
input_label = tf.placeholder(tf.float32, shape=(BATCH_SIZE, NET_CONFIG[-1]))
# 前向传播过程
# 初始化当前层输入节点数
in_dim = NET_CONFIG[0]
# 从输入层开始,循环往下层进行前向传播
cur_layer = input_feature
for i in range(1, len(NET_CONFIG)):
# 当前层输出接点数
out_dim = NET_CONFIG[i]
# 当前层weight Tensor,shape=[in_dim, out_dim]
cur_layer_weight = tf.Variable(tf.random_normal([in_dim, out_dim]), dtype = tf.float32)
# 当前层bias Tensor,shape=[out_dim]
cur_layer_bias = tf.Variable(tf.random_normal([out_dim]), dtype = tf.float32)
# 添加到模型存储结果
model_layer_weight.append(cur_layer_weight)
model_layer_bias.append(cur_layer_bias)
# 前向传播,进行线性矩阵变换
# 当前层输入矩阵 X 当前层weight + 当前层bias
cur_layer = tf.matmul(cur_layer, cur_layer_weight) + cur_layer_bias
# 根据设置的激活函数,进行非线性变化
if ACTIVATION == 'tanh':
cur_layer = tf.nn.tanh(cur_layer)
elif ACTIVATION == 'sigmoid':
cur_layer = tf.nn.sigmoid(cur_layer)
elif ACTIVATION == 'relu':
cur_layer = tf.nn.relu(cur_layer)
# 进入下一层,更新输入节点数
in_dim = NET_CONFIG[i]
# 得到前向传播计算结果,cur_layer为batch数据的预测结果Tensor, shape=[BATCH_SIZE, label数(NET_CONFIG[-1])]
# 进行softmax变换
predict_label = tf.nn.softmax(cur_layer)
# 计算当前batch的损失函数值
# 预测结果Tensor和当前batch实际结果Tensor的交叉熵
loss = -tf.reduce_mean(input_label * tf.log(predict_label))
#loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits = cur_layer, labels = tf.argmax(input_label, 1)))
# 后向传播过程
# 计算每个节点相对损失函数的梯度
# 根据后向传播结果,对model_layer_weight和model_layer_bias进行梯度下降,更新权重
train_step = tf.train.GradientDescentOptimizer(LEARNING_RATE).minimize(loss)
#------------------------------------------------------------------------
#--------------------执行DNN训练计算图-----------------------------------
# 加载MNIST数据集,包括train、validation、test三部分
mnist_dataset = input_data.read_data_sets("./MNIST_data", one_hot=True)
with tf.Session() as sess:
# 初始化计算图中的所有Variables
sess.run(tf.global_variables_initializer())
for i in range(MAX_ITER):
# 读取一个batch的训练数据
train_batch_feature, train_batch_label = mnist_dataset.train.next_batch(BATCH_SIZE)
# 执行DNN训练计算图,完成在当前batch数据上的一次前向传播+后向传播过程
# 通过Feed机制指定当前batch数据作为输入Tensor,通过Fetch机制获取当前损失函数值
_train, loss_value = sess.run([train_step, loss],
feed_dict={input_feature: train_batch_feature, input_label: train_batch_label})
# 每过1000个batch输出在当前训练batch上的损失函数值
if i % 1000 == 0:
print '''After %d training step(s), loss on training batch is %g.''' % (i, loss_value)
# 模型训练结束,输出模型结果
print '''Finish Model Train:'''
for i in range(0, len(model_layer_weight)):
print '''Layer-Weight-%d:''' % i
print model_layer_weight[i].eval()
print '''Layer-Bias-%d:''' % i
print model_layer_bias[i].eval()
print '''MNIST DNN All Finish'''
#------------------------------------------------------------------------
LR Demo
# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf
import input_data
def init_weights(shape):
return tf.Variable(tf.random_normal(shape, stddev = 0.01))
def model(X, w):
return tf.matmul(X, w)
# 设置占位符
X = tf.placeholder("float", [None, 784])
Y = tf.placeholder("float", [None, 10])
# 初始化权重
w = init_weights([784, 10])
# 构建模型
py_x = model(X, w)
# 构建损失函数,我们采用softmax和交叉熵来训练模型
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits = py_x, labels = Y))
learning_rate = 0.01
train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
predict_op = tf.argmax(py_x, 1)
# 导入数据
mnist_dataset = input_data.read_data_sets("./MNIST_data", one_hot=True)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in xrange(100):
trX, trY = mnist_dataset.train.images, mnist_dataset.train.labels
teX, teY = mnist_dataset.test.images, mnist_dataset.test.labels
sess.run(train_op, feed_dict = {X: trX, Y: trY})
print i, np.mean(np.argmax(teY, axis = 1) == sess.run(predict_op, feed_dict = {X: teX, Y: teY}))
KMeans Demo
import numpy as np
import time
N=10000
K=4
MAX_ITERS = 1000
start = time.time()
points = tf.Variable(tf.random_uniform([N,2]))
cluster_assignments = tf.Variable(tf.zeros([N], dtype=tf.int64))
# Silly initialization: Use the first K points as the starting
# centroids. In the real world, do this better.
centroids = tf.Variable(tf.slice(points.initialized_value(), [0,0], [K,2]))
# Replicate to N copies of each centroid and K copies of each
# point, then subtract and compute the sum of squared distances.
rep_centroids = tf.reshape(tf.tile(centroids, [N, 1]), [N, K, 2])
rep_points = tf.reshape(tf.tile(points, [1, K]), [N, K, 2])
sum_squares = tf.reduce_sum(tf.square(rep_points - rep_centroids),
reduction_indices=2)
# Use argmin to select the lowest-distance point
best_centroids = tf.argmin(sum_squares, 1)
did_assignments_change = tf.reduce_any(tf.not_equal(best_centroids,
cluster_assignments))
def bucket_mean(data, bucket_ids, num_buckets):
total = tf.unsorted_segment_sum(data, bucket_ids, num_buckets)
count = tf.unsorted_segment_sum(tf.ones_like(data), bucket_ids, num_buckets)
return total / count
means = bucket_mean(points, best_centroids, K)
# Do not write to the assigned clusters variable until after
# computing whether the assignments have changed - hence with_dependencies
with tf.control_dependencies([did_assignments_change]):
do_updates = tf.group(
centroids.assign(means),
cluster_assignments.assign(best_centroids))
init = tf.initialize_all_variables()
sess = tf.Session()
sess.run(init)
changed = True
iters = 0
while changed and iters < MAX_ITERS:
iters += 1
[changed, _] = sess.run([did_assignments_change, do_updates])
[centers, assignments] = sess.run([centroids, cluster_assignments])
end = time.time()
print ("Found in %.2f seconds" % (end-start)), iters, "iterations"
print "Centroids:"
print centers
print "Cluster assignments:", assignments
支持的数值计算
Tensorflow架构实现
整体架构
Tensorflow的系统架构图如下所,从底向上分为设备管理和通信层、数据操作层、图计算层、API接口层、应用层。
其中设备管理和通信层、数据操作层、图计算层是TF的核心层。
底层设备通信层负责网络通信和设备管理。设备管理可以实现Tensorflow设备异构的特性,支持CPU、GPU、Mobile等不同设备。网络通信依赖gRPC通信协议实现不同设备间的数据传输和更新。
第二层是Tensor的OpKernels实现。这些OpKernels以Tensor为处理对象,依赖网络通信和设备内存分配,实现了各种Tensor操作或计算。Opkernels不仅包含MatMul等计算操作,还包含Queue等非计算操作。
第三层是图计算层(Graph),包含本地计算流图和分布式计算流图的实现。Graph模块包含Graph的创建、编译、优化和执行等部分,Graph中每个节点都是OpKernels类型表示。
第四层是API接口层。Tensor C API是对Tensorflow功能模块的接口封装,便于其他语言平台调用。
第四层以上是应用层。不同编程语言在应用层通过API接口层调用TF核心功能实现相关实验和应用。
主要实现
TensorFlow Large-Scale Machine Learning on Heterogeneous Distributed Systems.pdf
在一个 TensorFlow 系统中,用户通过 Session 和 TensorFlow 的 master 进程交互,master 进程将任务分配给不同的 worker 进程,而每个 worker 进程负责在一个或多个设备上执行运算。
单设备执行
TensorFlow 的最简单的执行模型是:
- 一个worker进程在一个设备上进行计算。使用拓扑算法来决定执行哪一个节点,数据流图中 node按照定义的相互依赖关系执行。
- TensorFlow 会保存每个 node 所依赖的,并且没有执行完毕的 node 的个数,当所有依赖的 node 执行完毕之后,该 node 会被加入一个「就绪队列」中。
多设备执行
在一套标准系统上通常有多个计算设备. TensorFlow 支持 CPU 和 GPU 这两种设备. 我们用指定字符串 strings 来标识这些设备. 比如:
- “/cpu:0”: 机器中的 CPU
- “/gpu:0”: 机器中的 GPU, 如果你有一个的话.
- “/gpu:1”: 机器中的第二个 GPU, 以此类推…
如果一个 TensorFlow 的 operation 中兼有 CPU 和 GPU 的实现, 当这个算子被指派设备时, GPU 有优先权. 比如matmul中 CPU 和 GPU kernel 函数都存在. 那么在 cpu:0 和 gpu:0 中, matmul operation 会被指派给 gpu:0
如果你不想使用系统来为 operation 指派设备, 而是手工指派设备, 你可以用 with tf.device 创建一个设备环境, 这个环境下的 operation 都统一运行在环境指定的设备上.
with tf.device('/cpu:0'):
a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
c = tf.matmul(a, b)
# 新建session with log_device_placement并设置为True.
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
# 运行这个op.
print sess.run(c)
/job:localhost/replica:0/task:0/gpu:0 -> device: 0, name: Tesla K40c, pci bus
id: 0000:05:00.0
b: /job:localhost/replica:0/task:0/cpu:0
a: /job:localhost/replica:0/task:0/cpu:0
MatMul: /job:localhost/replica:0/task:0/gpu:0
[[ 22. 28.]
[ 49. 64.]]
决定设备
对于一个给定的数据流图,TensorFlow 会使用设备分配算法(placement algorithm)负责将计算任务映射到可用的设备上。设备分配分配算法需要将成本模型(cost model)作为参数,它包含了每个 node 中计算操作的输入和输入张量的大小(以字节为单位)和该 node 估计的计算时间。
- 设备分配算法模拟数据流图的计算过程并使用贪心策略(greedy heuristic)来为每个 node 分配运算设备。
- 设备分配算法首先从数据流图的源头开始对每个 node 的计算过程进行模拟。当某个 node 需要计算资源时,设备分配算法会将运行该计算的预计时间最短的可用设备分配给该节点。对于需要多个计算设备的 node,分配算法会使用贪心策略考虑将计算分配到不同设备后所需要的计算时间,并会考虑设备间数据通信的成本。总之,分配算法会将执行某计算操作最快的可用设备分配给 node。
预估时间有两种方法:
- 使用启发式的算法,通过把输入和输出的类型以及tensor的大小输入进去,得到时间的预估。
- 使用模拟的方法,对图的计算进行一个模拟,得到各个计算在其可用的设备上的时间。
寻找合适设备是Tensorflow区分与之前很多系统的地方,之前的系统比如Parameter Server,是参数分离出来,运算在一起,同时使用数据切分来达到分布式。而Tensorflow是把每个op都映射到某个机器上,意味着每个op可能在不同的机器上,这是对系统的进一步剖离,因而可以达到更高的可扩展性。
- Tensorflow的分布式是op粒度的分布式,是把计算逻辑在各节点进行了分布式,一个计算图执行过程中数据流是一致的;在ELF/Hippo系统中,是数据流的分布式,每个节点执行的计算逻辑是一致的。
- Tensorflow中模型参数的分布式,也依赖于存储参数的Variable op的分布式设置,若一个Variable op的参数超内存,需要在应用程序里来进行拆分Varaible op实现参数分布式存储。
- Tensorflow中一个计算单元op无法被设置到多个设备上来执行,即计算逻辑无法并行;若需要并行计算,需要在应用程序里调用Sesson会话执行计算图的过程并行化(见后面并行执行部分)。
跨设备通信
当设备分配算法结束时,数据流图会被划分为多个子图,每个子图对应一个计算设备。位于不同运算设备的任务 x 和 y 之间的通讯被新建的 Send 和 Receive node所接管。
插入 Receive 和 Send 节点后,TensorFlow 使依赖于特定张量的操作使用同一个 Receive node,而不是每个操作都拥有一个 Receive node,这样可以避免不必要的内存分配,并可以解决数据同步问题。
这样处理设备通讯的方法可以使得管理分配在不同设备上的 node 实现去中心化。因为 Send 和 Receive nodes 解决了数据同步问题,所以 master 进程仅仅需要对每个 worker 进程发出运行指令,而不需要管理位于不同运算设备上计算任务之间的通信。
分布式系统执行
TensorFlow 在分布式系统上的执行和在多个设备上的执行方式类似,没有类似Yarn的单独集群调度系统。
在设备分配算法运行完后,每个子数据流图被分配到某个设备上,Send 和 Receive node 使用远程连接协议,比如:TCP 和 RDMA,在不同系统间传输数据。
容错
在分布式系统的运行过程中,错误可能在许多地方被检测到。TensorFlow 主要依赖于:
- 在 Send 和 Receive 之间的通信错误
- master 进程对每个 worker 进程的周期性检查
当某个错误被检测到时,整个数据流图的计算将被中断并且从头开始。注意,因为 Variable 保存在计算过程中持续存在的张量,所以 TensorFlow 将每个 Variable 与一个 Save 节点连接,Save 节点会定义保存 Variable 的状态。当错误发生时,TensorFlow 可以从 Save 保存的最近的状态恢复。
计算图在batch数据粒度的Failover,一旦检测到容错,计算图在所有设备所有worker进程上的状态恢复到当前batch开始前,重新计算当前batch的所有计算过程。
数值计算实现
Tensor
Tensorflow的Tensor定义和运算主要是调用Eigen矩阵计算库完成的,Eigen是高效易用的C++开源库,有效支持线性代数,矩阵和矢量运算,数值分析及其相关的算法,支持CPU和GPU加速计算。
Tensor的定义在Tensorflow源码中的第三方依赖/third_party/eigen3下,UML定义如图,其中TensorBuffer指针指向Eigen::Tensor类型,Eigen::Tensor不属于Eigen官方维护的程序,由贡献者提供文档和维护,所以Tensor定义在Eigen unsupported模块中。
Tensor主要包含两个变量m_data和m_dimension,m_data保存了Tensor的数据块,T是泛化的数据类型,m_dimensions保存了Tensor的维度信息。
Eigen::Tensor的成员变量很简单,却支持非常多的基本运算,再借助Eigen的加速机制实现快速计算,Eigen::Tensor主要包含了
- 一元运算(Unary),如sqrt、square、exp、abs等。
- 二元运算(Binary),如add,sub,mul,div等
- 选择运算(Selection),即if / else条件运算
- 归纳运算(Reduce),如reduce_sum, reduce_mean等
- 几何运算(Geometry),如reshape,slice,shuffle,chip,reverse,pad,concatenate,extract_patches,extract_image_patches等
- 张量积(Contract)
- 卷积运算(Convolve)
Operator Kernel
OpKernel类(core/framework/op_kernel.h)是所有Op类的基类。继承OpKernel还可以自定义新的Op类。用的较多的Op如(MatMul, Conv2D, SoftMax, AvgPooling, Argmax等)。
所有Op包含注册(Register Op)和实现(正向计算、梯度定义)两部分。所有Op类的实现需要overide抽象基函数 void Compute(OpKernelContext* context),实现自身Op功能。
若Op需要支持多种类型设备上的异构计算(CPU/GPU,Tensorflow目前不支持FPGA),需要在实现OpKernel类时根据Device类型实现不同的正向计算过程。
用户可以根据需要自定义新的Op操作,下面以最常用的矩阵乘Op MatMul为例说明Operator Kernel的实现。
op的属性和定义
/core/ops/ops.pbtxt
name: "MatMul"
input_arg {
name: "a"
type_attr: "T"
}
input_arg {
name: "b"
type_attr: "T"
}
output_arg {
name: "product"
type_attr: "T"
}
attr {
name: "transpose_a"
type: "bool"
default_value {
b: false
}
description: "If true, "a" is transposed before multiplication."
}
attr {
name: "transpose_b"
type: "bool"
default_value {
b: false
}
description: "If true, "b" is transposed before multiplication."
}
attr {
name: "T"
type: "type"
allowed_values {
list {
type: DT_HALF
type: DT_FLOAT
type: DT_DOUBLE
type: DT_INT32
type: DT_COMPLEX64
type: DT_COMPLEX128
}
}
}
summary: "Multiply the matrix "a" by the matrix "b"."
description: "The inputs must be two-dimensional matrices and the inner dimension of\n"a" (after being transposed if transpose_a is true) must match the\nouter dimension of "b" (after being transposed if transposed_b is\ntrue).\n\n*Note*: The default kernel implementation for MatMul on GPUs uses\ncublas."
}
Register
/core/ops/math_ops.cc
.Input("a: T")
.Input("b: T")
.Output("product: T")
.Attr("transpose_a: bool = false")
.Attr("transpose_b: bool = false")
.Attr("T: {half, float, double, int32, complex64, complex128}")
.SetShapeFn(shape_inference::MatMulShape)
.Doc(R"doc(
Multiply the matrix "a" by the matrix "b".
The inputs must be two-dimensional matrices and the inner dimension of
"a" (after being transposed if transpose_a is true) must match the
outer dimension of "b" (after being transposed if transposed_b is
true).
*Note*: The default kernel implementation for MatMul on GPUs uses
cublas.
transpose_a: If true, "a" is transposed before multiplication.
transpose_b: If true, "b" is transposed before multiplication.
)doc");
正向计算
MatMul的实现部分在core/kernels/matmul_op.cc文件中,类MatMulOp继承于OpKernel,成员函数Compute完成计算操作。
class MatMulOp : public OpKernel {
public:
explicit MatMulOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_a", &transpose_a_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("transpose_b", &transpose_b_));
}
void Compute(OpKernelContext* ctx) override {
const Tensor& a = ctx->input(0);
const Tensor& b = ctx->input(1);
// Check that the dimensions of the two matrices are valid.
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(a.shape()),
errors::InvalidArgument("In[0] is not a matrix"));
OP_REQUIRES(ctx, TensorShapeUtils::IsMatrix(b.shape()),
errors::InvalidArgument("In[1] is not a matrix"));
Eigen::array<eigen::IndexPair<eigen::DenseIndex>, 1> dim_pair;
dim_pair[0].first = transpose_a_ ? 0 : 1;
dim_pair[0].second = transpose_b_ ? 1 : 0;
OP_REQUIRES(
ctx, a.dim_size(dim_pair[0].first) == b.dim_size(dim_pair[0].second),
errors::InvalidArgument(
"Matrix size-incompatible: In[0]: ", a.shape().DebugString(),
", In[1]: ", b.shape().DebugString()));
int a_dim_remaining = 1 - dim_pair[0].first;
int b_dim_remaining = 1 - dim_pair[0].second;
TensorShape out_shape(
{a.dim_size(a_dim_remaining), b.dim_size(b_dim_remaining)});
Tensor* out = nullptr;
OP_REQUIRES_OK(ctx, ctx->allocate_output(0, out_shape, &out));
if (out->NumElements() == 0) {
// If a has shape [0, x] or b has shape [x, 0], the output shape
// is a 0-element matrix, so there is nothing to do.
return;
}
if (a.NumElements() == 0 || b.NumElements() == 0) {
// If a has shape [x, 0] and b has shape [0, y], the
// output shape is [x, y] where x and y are non-zero, so we fill
// the output with zeros.
functor::SetZeroFunctor<device, T> f;
f(ctx->eigen_device<device>(), out->flat<t>());
return;
}
LaunchMatMul<device, T, USE_CUBLAS>::launch(ctx, this, a, b, dim_pair, out);
}
private:
bool transpose_a_;
bool transpose_b_;
};
MatMul实现了CPU和GPU两个版本,其中CPU版本使用Eigen库,GPU版本使用cuBLAS库。
CPU版的MatMul使用Eigen库,调用方式如下:
struct MatMulFunctor<cpudevice, T> {
void operator()(
const CPUDevice& d, typename MatMulTypes<t>::out_type out,
typename MatMulTypes<t>::in_type in0,
typename MatMulTypes<t>::in_type in1,
const Eigen::array<eigen::IndexPair<eigen::DenseIndex>, 1>& dim_pair) {
MatMul<cpudevice>(d, out, in0, in1, dim_pair);
}
};
GPU版的MatMul使用cuBLAS库,准确而言是基于cuBLAS的stream_executor库。Stream executor是google开发的开源并行计算库,调用方式如下:
其中stream类似于设备句柄,可以调用stream executor中的cuda模块完成运算。
struct LaunchMatMul<gpudevice, T, true /* USE_CUBLAS */> {
static void launch(
OpKernelContext* ctx, OpKernel* kernel, const Tensor& a, const Tensor& b,
const Eigen::array<eigen::IndexPair<eigen::DenseIndex>, 1>& dim_pair,
Tensor* out) {
perftools::gputools::blas::Transpose trans[] = {
perftools::gputools::blas::Transpose::kNoTranspose,
perftools::gputools::blas::Transpose::kTranspose};
const uint64 m = a.dim_size(1 - dim_pair[0].first);
const uint64 k = a.dim_size(dim_pair[0].first);
const uint64 n = b.dim_size(1 - dim_pair[0].second);
bool transpose_a = dim_pair[0].first == 0;
bool transpose_b = dim_pair[0].second == 1;
auto blas_transpose_a = trans[transpose_a];
auto blas_transpose_b = trans[transpose_b];
auto* stream = ctx->op_device_context()->stream();
OP_REQUIRES(ctx, stream, errors::Internal("No GPU stream available."));
auto a_ptr = AsDeviceMemory(a.template flat<t>().data());
auto b_ptr = AsDeviceMemory(b.template flat<t>().data());
auto c_ptr = AsDeviceMemory(out->template flat<t>().data());
// Cublas does
// C = A x B
// where A, B and C are assumed to be in column major.
// We want the output to be in row-major, so we can compute
// C' = B' x A' (' stands for transpose)
if (LaunchBlasGemv<t>::IsSupported() && n == 1) {
// This is a matrix*vector multiply so use GEMV to compute A * b.
// Here we are multiplying in the natural order, so we have to flip
// the transposition flag to compensate for the tensor being stored
// row-major.
LaunchBlasGemv<t>::Compute(ctx, stream, !transpose_a, transpose_a ? m : k,
transpose_a ? k : m, a_ptr, b_ptr, &c_ptr);
} else {
bool blas_launch_status =
stream
->ThenBlasGemm(blas_transpose_b, blas_transpose_a, n, m, k, 1.0f,
b_ptr, transpose_b ? k : n, a_ptr,
transpose_a ? m : k, 0.0f, &c_ptr, n)
.ok();
if (!blas_launch_status) {
ctx->SetStatus(errors::Internal(
"Blas GEMM launch failed : a.shape=(", a.dim_size(0), ", ",
a.dim_size(1), "), b.shape=(", b.dim_size(0), ", ", b.dim_size(1),
"), m=", m, ", n=", n, ", k=", k));
}
}
}
};
梯度定义
MatMul的梯度计算本质上也是一种kernel ops,描述为MatMulGrad。MatMulgrad操作是定义在grad_ops工厂中,类似于ops工厂。
在/core/ops/math_grad.cc中由Function Define Helper完成定义:
const string& attr_adj_x,
const string& attr_adj_y, const string& x0,
bool ax0, const string& x1, bool ax1,
const string& y0, bool ay0, const string& y1,
bool ay1) {
*g = FDH::Define(
// Arg defs
{"x: T", "y: T", "dz: T"},
// Ret val defs
{"dx: T", "dy: T"},
// Attr defs
{{"T: {half, float, double}"}},
// Nodes
{
{{"dx"},
opname,
{x0, x1},
{{"T", "$T"}, {attr_adj_x, ax0}, {attr_adj_y, ax1}}},
{{"dy"},
opname,
{y0, y1},
{{"T", "$T"}, {attr_adj_x, ay0}, {attr_adj_y, ay1}}},
});
return Status::OK();
}
Status MatMulGradCommon(const string& opname, const string& attr_adj_x,
const string& attr_adj_y, const AttrSlice& attrs,
FunctionDef* g) {
DataType T;
TF_RETURN_IF_ERROR(GetNodeAttr(attrs, "T", &T));
if (T == DT_COMPLEX64 || T == DT_COMPLEX128) {
return errors::Unimplemented(
"MatMul gradient for complex is not supported yet.");
}
bool ta;
bool tb;
TF_RETURN_IF_ERROR(GetNodeAttr(attrs, attr_adj_x, &ta));
TF_RETURN_IF_ERROR(GetNodeAttr(attrs, attr_adj_y, &tb));
if (!ta && !tb) {
return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "dz", false, "y",
true, "x", true, "dz", false);
}
if (!ta && tb) {
return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "dz", false, "y",
false, "dz", true, "x", false);
}
if (ta && !tb) {
return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "y", false, "dz",
true, "x", false, "dz", false);
}
CHECK(ta && tb);
return MatMulGradHelper(g, opname, attr_adj_x, attr_adj_y, "y", true, "dz",
true, "dz", true, "x", true);
}
Status MatMulGrad(const AttrSlice& attrs, FunctionDef* g) {
return MatMulGradCommon("MatMul", "transpose_a", "transpose_b", attrs, g);
}
REGISTER_OP_GRADIENT("MatMul", MatMulGrad);
// clang-format off
return GradForUnaryCwise(g, {
{{"y"}, "Tanh", {"x"}},
{{"y2"}, "Square", {"y"}, {}, {"dy"}},
FDH::Const("const", 1.0f),
{{"one"}, "Cast", {"const"}, {{"SrcT", DT_FLOAT}, {"DstT", "$T"}}},
{{"a"}, "Sub", {"one", "y2"}},
{{"dx"}, "Mul", {"dy", "a"}}, // dy * (1 - y*y)
});
// clang-format on
}
REGISTER_OP_GRADIENT("Tanh", TanhGrad);
高级特性
梯度计算
TensorFlow通过扩展图的方式实现了自动求导:
对于每张计算图,得到从输入I到输出C的路径,并从C到I回溯,回溯过程中对于路径上的每个节点A,添加另一个节点来计算A’来计算偏导,在计算偏导的过程中,A’不仅仅将上一层传下来的反向导数作为输入,还可能将A的输入和输出也作为其输入。
部分执行
TensorFlow 支持部分子图的运行。当用户将整个数据流图构建完毕之后,可以调用 Run 方法来确定要运行的任意子图,并且可以向数据流图的任意边输入数据,或从任意边读取数据。数据流图中的每个节点都有一个名字,该节点的每个输出都由节点名和输出端口确定,如 bar:0 表示 bar 节点的第一个输出。Run 方法的两个参数就可以确定唯一的子图。
当计算一个子图时,会在为子图创建一个 feed 节点作为输入,fetch 节点用来接收输出。
并行执行
在多设备执行中提到,Tensorflow中一个计算单元op无法被设置到多个设备上来执行,即计算逻辑无法并行;若需要并行计算,需要在应用程序里调用Sesson会话执行计算图的过程并行化。
可以通过数据并行、模型并行和并发步(concurrent steps)两种机制实现Tensorflow程序,来加速计算密集型的大规模神经网络的训练。
数据并行
数据并行分为同步数据并行和异步数据并行。数据并行的意思是,用很多的模型副本,每个副本运行在不同的数据上,然后同时训练,更新模型。通过更新模型的方式不同,可以分为同步和异步,同步的方式是一个用户线程驱动整个大循环,如图上部,等到所有的Δp(要更新的参数)都算出来后,将它们相加去更新模型。而异步的方式不同,每个模型副本自己异步地对模型参数进行更新,不用等到所有的梯度算出来再更新,每个模型副本有一个用户线程。见图下部。
模型并行和并发步
模型并行的意思是,对于同样一批数据,模型计算的不同部分分散在不同的计算设备上同时进行。如图是一个循环深度LSTM用来做序列到序列学习的例子。
并发步是另一种通常的做法,即通过在同样的设备集合中运行少数的并发步来将同一设备中的模型计算流水线化
并行执行是Tensorflow区分与Parameter Server的一个重要区别 ,对于Tensorflow来说,计算节点是分离的,参数(variable op, 特殊的计算节点)也是分离的,因而其ps也是分离的。每个设备上可能分配了计算节点,然后其对应的ps也在该设备上。
ParameterServer中,计算和参数是分开的,但计算和计算、参数和参数本身分别是在一起的;而Tensorflow中,计算本身是分离的,参数也是分离的,而计算和参数是在一起的。
设备限制
TensorFlow 的用户可以通过对 nodes 添加对计算设备的限制来控制 nodes 的运算设备分配。用户可以设置某 node 仅可以在 GPU 上运算,或仅可以在设备的某进程中计算。
控制流
TensorFlow 中添加了一些基本的控制流操作,可以处理环状的数据流图。switch 和 merge 操作使我们可以跳过整个子图的执行;enter,leave 和 nextIteration 操作使我们可以表达迭代。更高阶的 if 和 while 语句可以使用这些基本的原语来实现。
TensorFlow 实现了 tags 和 frames 标记,循环中的每一次迭代都被赋予唯一的 tag,循环的执行状态用 frame 来分割。一个可用的输入可以再任意时候进入迭代,这样,多次可以被并行执行。
TensorFlow 使用分布式定位技术(Distributed Coordination Mechanism)来执行带有控制流的数据流图。一般来说,一个循环可能包含被分贝在多个运算设备的 node。所以,管理循环的状态就变成了一个分布式的终止探测问题。TensorFlow 通过使用图重写(graph rewriting)来解决这个问题。在数据流图分割过程中,TensorFlow 会在每个子图中添加控制节点。这些节点实现了一个状态机,可以检测每次迭代的开始和结束,并决定是否终止循环。
我们常常使用梯度下降来训练机器学习模型,并且将梯度的计算过程作为数据流图的一部分。当一个模型包含了控制流时,我们必须判断控制流分支的方向,再计算梯度;同样的,当一个模型拥有一个 while 循环时,我们需要依赖于循环的中间值来进行计算。TensorFlow 尝试重写子图来保存计算梯度所需要的信息。
输入操作
TensorFlow 处理支持通过使用 feed node 来为计算提供数据外,也支持添加用于输入数据的 input node,它们使用文件名来配置,并可以每次执行时产生包含了一个或多个存储来文件中的数据的 tensor。
队列
TensorFlow 中的队列允许数据流图中的不同部分异步地执行,并且通过 enqueue 和 dequeue 要求或提供数据。enqueue 可以将队列阻塞,直到有足够的可用空间,之后将数据放入队列;dequeue 将队列阻塞直到队列中有足够的可用元素。队列使得 TensorFlow 可以实现并行计算,当上一组数据仍在被用于计算时,下一组数据可直接被取出用于计算。
除了 FIFO 队列外,TensorFlow 还实现了 Shuffling Queue,它可以随机地“打乱”队列中的元素,并输出其中一组。这样的队列对于需要 shuffling 功能的机器学习算法十分关键。
容器
容器用于管理长期存在的可变状态。一个 Variable 被存储在一个容器内,默认的容器知道进程结束后才会被销毁。通过使用容器,可以实现在与多个不同 session 想关联的数据流图之间共享状态。
优化工作
TensorFlow 中做了许多重要的性能优化,主要有:
- Common Subexpression Elimination。TensorFlow 可以删除多余的计算操作,如同一个计算操作的多个具有相同输入输出的拷贝。
- Controlling Data Communication and Memory Usage。TensorFlow 通过规划计算操作的执行对系统的性能实现了优化,尤其是在数据转移和内存占用方面。
- Asynchronous Kernels。TensorFlow 拥有非阻塞的内核,该内核函数被传入一个计算任务,在之前的计算任务结束后,被传入的任务继续执行。
- Optimized Libraries for Kernel Implementations。TensorFlow 使用高度优化的数学运算库来实现操作。
- Lossy Compression。当在不同设备间传输数据时,TensorFlow 使用 lossy compression 来压缩高精度数据,从而加快传输效率。
Tensorflow的特性
Tensorflow官方给出的特性:
非PServer架构
计算本身分离、参数本身分离、计算和参数在一起。
用户可自行分配Variable Op到固定设备或节点来实现类ParameterServer架构。
- 对系统的进一步剖离,因而可以达到更高的可扩展性
- 用户自己实现并行化,计算之间带来额外网络开销,影响性能
- 非Key-Value存储架构,对稀疏的训练数据支持差,对于稀疏特征的样本进行计算时,无法按Key取得需要的参数,只能获取所有参数的Tensor参与计算,带来很大的额外通信和内存消耗。
相比ParameterServer架构可以只取出现的Key-Value参数,极大减少稀疏样本下计算时的参数量,这也是ParameterServer架构的一个重要优点。
符号式编程模式
编程模式通常分为命令式编程(imperative style programs)和符号式编程(symbolic style programs)。caffe、mxnet和Tensorflow都使用了符号式编程。其中caffe、mxnet采用了两种编程模式混合的方法,而Tensorflow是完全采用了符号式编程。
命令式编程是常见的编程模式,编程语言如python/C++都采用命令式编程。命令式编程明确输入变量,并根据程序逻辑逐步运算,这种模式非常在调试程序时进行单步跟踪,分析中间变量。
符号式编程将计算过程抽象为计算图,计算流图可以方便的描述计算过程,所有输入节点、运算节点、输出节点均符号化处理。计算图通过建立输入节点到输出节点的传递闭包,从输入节点出发,沿着传递闭包完成数值计算和数据流动,直到达到输出节点。这个过程经过计算图优化,以数据(计算)流方式完成,节省内存空间使用,计算速度快,但不适合程序调试。
主要优点
- 灵活性,可扩展性:用户自己通过计算单元op构造数据流图,计算逻辑实现灵活。方便实现特殊结构的神经网络。
- 可移植性/异构性:支持CPU/GPU,比较方便地部署在个人电脑、服务器、移动设备、Docker容器。
- 可视化:提供强大的Tensorboad可视化工具。
- 社区活跃度:最高的社区活跃度,资料最多,发展最快。
主要缺点
- 性能:发布初期性能一直比其他深度学习工具差(tensorslow),Tensorflow1.0有了较大的性能提升,官方发布的性能测试benchmark已经持平或优于其他深度学习工具;性能依赖用户自己的具体实现。
- 易用性:使用门槛较较高,需要自己编写代码进行算法实现,要求用户有较好算法理论基础,API多、学习成本高;符号式编程调试相对困难;Tensorflow1.0引入Keras易用性有提升。
- 数据只支持Tensor,对稀疏格式和复杂参数支持较差,稀疏格式性能差。
- 新增加计算单元困难:如需要增加新的激活函数,需要基于C++源码,在现有的OpKenerl机制上进行扩展,从源码重新编译安装Tensorflow;开源社区的活跃弥补了一些。
- *Failover方案不够健壮。