2017年6月8日分布式深度學習的時代開始了。 在這一天,Facebook發(fā)表了一篇論文,展示了他們使用32臺服務器上的256塊GPU,將卷積神經(jīng)網(wǎng)絡(ImageNet上的RESNET-50)的訓練時間從兩周減少到一個小時。 在軟件實現(xiàn)上,他們引入了一種技術,在Mini-Batch樣本數(shù)量極大的情況下,訓練卷積神經(jīng)網(wǎng)絡(ConvNets):讓學習率跟Mini-Batch大小成比例。這意味著,任何人現(xiàn)在都可以使用TensorFlow,將分布式訓練擴展到數(shù)百個GPU。 不過,這不是分布式TensorFlow的唯一優(yōu)勢:通過在許多GPU上并行運行多個實驗,您還可以大量減少實驗時間。 這樣可以減少為神經(jīng)網(wǎng)絡尋找超參數(shù)所需的時間。
AI的未來,是彈性可擴展的計算方法。
-Rich Sutton,強化學習之父
在本教程中,我們將探索使用TensorFlow的兩種不同的分布式方法:
1. 在許多GPU(和服務器)上運行并行實驗來搜索好的超參數(shù)
2. 通過多個GPU(和服務器)分布式訓練單個網(wǎng)絡,減少訓練時間
我們將在這篇文章中,提供方法(1)和(2)的樣例代碼,但首先,我們需要澄清我們將要討論的分布式深度學習的類型。
模型并行與數(shù)據(jù)并行
一些神經(jīng)網(wǎng)絡模型非常龐大,單個設備(GPU)的內(nèi)存是存不下的。 Google的神經(jīng)機器翻譯(Neural Machine Translation,NMT)系統(tǒng)就是這樣的一個例子。 這些模型需要在許多設備(TensorFlow文檔中的所謂的workers)上分開,進行并行訓練。 例如,神經(jīng)網(wǎng)絡中的不同層(layer)可以在不同的GPU上并行訓練。 這種訓練過程通常稱為『模型并行』(TensorFlow文檔中,也稱為『計算圖內(nèi)復制』)。 取得良好的表現(xiàn)是很具有挑戰(zhàn)性的,我們對于這種方法在這里不再贅述。
在『數(shù)據(jù)并行』方法(TensorFlow文檔中,也稱為『計算圖間復制』)中,每個設備上都有相同的模型,但是每個設備都使用不同的訓練樣本進行模型訓練。 這與『模型并行』在設備之間劃分模型、存儲相同數(shù)據(jù)形成了鮮明對比。 每個設備將獨立地計算其訓練樣本預測值與樣本標記的輸出(這些訓練樣本的真值)之間的誤差。 由于每個設備都使用不同的樣本進行訓練,因此它為模型計算的變化(即『梯度』)也有所不同。然而,對每個新迭代,算法都依賴于將所有結果整合進行處理,就像算法在單個處理器上運行一樣。 因此,每個設備都必須將所有變化發(fā)送到所有其他設備上的模型中。
在本文中,我們關心的是『數(shù)據(jù)并行』。圖1描繪了典型的數(shù)據(jù)并行方法,為256卡GPU集群中的每一卡分配32張不同的圖像。總計,一次迭代的mini-batch大小是8,092張圖(32 x 256)。

圖1.在數(shù)據(jù)并行中,設備使用不同的訓練數(shù)據(jù)子集進行訓練。感謝Jim Dowling提供圖片
同步與異步分布式訓練
隨機梯度下降(SGD)是一種尋找最優(yōu)值的迭代算法,在訓練AI的算法中是眾多最受歡迎的選擇之一。 它涉及多輪訓練,每一輪的結果都納入模型中,產(chǎn)生結果,為下一輪的訓練做好準備。多輪迭代可以在多個設備上同步或異步執(zhí)行。
每次SGD迭代運行在被稱為mini-batch的一小批訓練樣本上(Facebook使用了8092張圖的大型mini-batch)。 在同步訓練中,所有設備使用單個(大)mini-batch的不同部分數(shù)據(jù)來訓練其本地的模型。然后,它們將本地計算好的梯度(直接或間接)與其他所有設備進行通信。只有在所有設備成功計算并發(fā)送了梯度后,模型才會更新。 然后,更新的模型與下一個被均分的mini-batch一起,被發(fā)送到所有節(jié)點。 也就是說,計算設備在小批量的非重復分割(子集)上進行訓練。
雖然并行有很大的加速訓練的潛力,但它自然而然會引入額外的計算開銷。大型模型和/或慢速網(wǎng)絡會增加訓練時間。 如果存在一個拖后腿的設備(計算速度慢或者網(wǎng)絡連接速度慢),訓練過程會嚴重停滯。我們還希望減少訓練模型所需的迭代次數(shù),因為每次迭代中,都需要將更新的模型廣播到所有節(jié)點。
因此,這這意味著,我們在不會讓訓練模型精度降低的前提下,盡可能增大mini-batch的數(shù)據(jù)量。
在他們的論文中,F(xiàn)acebook引入了一種線性縮放規(guī)則來提高學習速度,從而實現(xiàn)在巨大的mini-batch上進行訓練的目的。這一規(guī)則闡述了,『Mini-batch放大k倍時,學習率也要放大k倍』,不過先決條件是,在達到目標學習速率之前,在前面的幾個epoch上緩慢的增加這個放大系數(shù)。
在異步訓練中,任何設備都不會等待來自任何其他設備的模型更新。這些設備可以獨立運行,在對等設備(peer) 之間共享結果,或者依靠一個或多個中心化的服務器『參數(shù)服務器(Parameter Server)』進行通信。 在對等體系結構(Peer Architecture)中,每個設備都執(zhí)行這種循環(huán):讀取數(shù)據(jù),計算梯度,將它們(直接或間接)發(fā)送到所有設備,并將模型更新為最新版本。 在更中心化的體系結構中,計算設備將輸出以梯度的形式發(fā)送到參數(shù)服務器。 這些服務器對梯度進行收集和聚合。 在同步訓練中,參數(shù)服務器計算模型的最新版本,并將其發(fā)送回設備。 在異步訓練中,參數(shù)服務器將梯度發(fā)送到本地設備以計算新模型。 在兩種體系結構中,循環(huán)都會一直重復到訓練。 圖2說明了異步和同步訓練之間的區(qū)別。

圖2.隨機梯度下降(SGD)的異步和同步訓練。 感謝Jim Dowling提供圖片
參數(shù)服務器架構
當并行SGD使用參數(shù)服務器時,算法首先把模型廣播給worker(計算設備)。 在每個迭代中,每個worker從mini-batch中讀取自己的那部分,計算屬于自己的梯度,并將這些梯度發(fā)送到一個或多個參數(shù)服務器。 參數(shù)服務器會聚合來自設備的所有梯度,并等待所有設備完成,然后在下一次迭代中計算新模型,再廣播給所有設備。 數(shù)據(jù)流如圖3所示。

圖3.服務于同步SGD的參數(shù)服務器體系結構 感謝Jim Dowling提供圖片
Ring-allreduce架構
在ring-allreduce體系結構中,不存在給worker提供聚合梯度計算的中心化服務器。 相反,在迭代中,每個工作設備讀取mini-batch中屬于自己的那一部分,計算其梯度,將梯度發(fā)送到環(huán)上的后繼近鄰節(jié)點,并從環(huán)上的上一個近鄰節(jié)點接收梯度。對于具有N個worker的環(huán),所有worker都需要收集到經(jīng)過其他worker的N-1個梯度信息之后,才足夠計算能夠計算新模型的梯度。
Ring-allreduce針對帶寬優(yōu)化的,因為它確保了每個主機上可用的上行和下載網(wǎng)絡帶寬得到充分利用(這一點與參數(shù)服務器模型相反)。 Ring-allreduce還可以將深層神經(jīng)網(wǎng)絡中較低層的梯度計算與高層梯度的傳輸重疊,從而進一步減少訓練時間。 數(shù)據(jù)流如圖4所示。

圖4.用于SGD的ring-allreduce體系結構 感謝Jim Dowling提供的圖片
并行實驗
至此,到目前為止,我們已經(jīng)覆蓋了分布式訓練的命題。 不過,利用很多GPU,也可以用于并行化的超參數(shù)優(yōu)化。也就是說,當我們想要找到適當?shù)膶W習率,或mini-batch的大小,我們可以使用不同的超參數(shù)組合,并行運行許多實驗。 在所有實驗完成后,我們可以使用結果來確定,是否還需要進行更多實驗,或者當前的超參數(shù)是否足夠好。如果現(xiàn)有的超參數(shù)是可接受的,你可以用這組超參數(shù)在許多GPU上訓練模型。
在TensorFlow中分布式GPU的兩種用途
以下部分說明如何使用TensorFlow進行并行實驗和分布式訓練。
并行實驗
在許多GPU上,并行參數(shù)掃描是很容易的,因為我們只需要一個中心點來安排實驗。 TensorFlow不提供內(nèi)置的啟停TensorFlow服務器的功能,因此我們將使用Apache Spark,在PySpark中的mapper函數(shù)中,運行每個TensorFlow Python程序。 下面,我們定義一個啟動函數(shù),接受三個參數(shù):(1)Spark session的對象(2)一個map_fun ,指定要在每個Spark executor中執(zhí)行的TensorFlow函數(shù),以及(3)包含超參數(shù)的字典args_dict。 Spark能夠在Spark executor中,并行運行許多TensorFlow服務器。 Spark executor是執(zhí)行分布式任務的分布式服務。在這個例子中,每個executor都會從args_dict讀取數(shù)據(jù),利用executor_num作為索引,計算出它該使用的的參數(shù)值param_val ,然后用所提供的訓練函數(shù),結合這些超參數(shù)進行訓練。
def launch ( spark_session , map_fun , args_dict ):
“”” Execute a ‘map_fun’ for each hyperparameter combination from the dictionary ‘args_dict’
Args:
:spark_session: SparkSession object
:map_fun: The TensorFlow function to run (wrapped inside a Spark mapper function)
:args_dict: hyperparameters to insert as arguments for each TensorFlow function
“””
sc = spark_session . sparkContext
# Length of the list of the first list of arguments represents the number of Spark tasks
num_tasks = len ( args_dict ()[ 0 ])
# Create a number of partitions (tasks)
nodeRDD = sc . parallelize ( range ( num_tasks ), num_tasks )
# Execute each of the hyperparameter arguments as a task
nodeRDD . foreachPartition ( _do_search ( map_fun , args_dict ))
def _do_search ( map_fun , args_dict ):
def _wrapper_fun ( iter ):
for i in iter :
executor_num = i
arg_count = map_fun . func_code . co_argcount
names = map_fun . func_code . co_varnames
args = []
arg_index = 0
while arg_count > 0 :
# Get arguments for hyperparameter combination
param_name = names [ arg_index ]
param_val = args_dict [ param_name ][ executor_num ]
args . append ( param_val )
arg_count -= 1
arg_index += 1
map_fun ( * args )
return _wrapper_fun
現(xiàn)在可以在Spark中,調用mnist 數(shù)據(jù)集適合的TensorFlow訓練函數(shù)。 請注意,我們只調用啟動函數(shù)一次,但是對于每種超參數(shù)組合,都有一個不同的executor執(zhí)行任務(總共四個):
args_dict = { ‘learning_rate’ : [ 0.001 ], ‘dropout’ args_dict ‘learning_rate’ : [ args_dict ]}
def mnist ( learning_rate , mnist ):
“””
An implementation of FashionMNIST should go here
“””
launch ( spark , mnist , args_dict ):
分布式訓練
我們將簡要介紹TensorFlow分布式訓練的三種框架:原生的分布式TensorFlow,TensorFlowOnSpark,以及Horovod。
分布式TensorFlow
分布式TensorFlow應用包含一個集群,其中有一個或多個參數(shù)服務器,以及許多worker。因為Worker在訓練期間計算梯度,通常將其在GPU上執(zhí)行。 參數(shù)服務器只需要聚合梯度,把更新廣播出去,因此它們通常被置于CPU而不是GPU上。
其中一個worker,被稱為『主worker』,它負責協(xié)調模型訓練,模型初始化,已完成訓練步驟的統(tǒng)計,會話監(jiān)控,TensorBoard的日志保存,為從故障中恢復進行模型斷點的保存和恢復。主worker也會管理故障,在一個worker或者參數(shù)服務器失效的情況下確保容錯能力。如果主worker自己宕機,那么需要從最近的模型斷點開始恢復訓練過程。
作為TensorFlow核心的一部分,分布式TensorFlow的一個缺點是,必須顯式地管理服務器的啟停。 這意味著,要跟蹤程序中所有TensorFlow服務器的IP地址和端口,并且手動啟停這些服務器。一般來說,這會導致代碼中有很多switch語句,來確定哪些代碼應該在當前服務器上執(zhí)行。 因此,通過使用集群管理器和Spark,生活輕松加愉快。希望你永遠不必像這樣編寫代碼,手動定義一個
ClusterSpec(集群配置參數(shù)):
tf . train . ClusterSpec ({ “local” : [ “localhost:2222” , “localhost:2223” ]})
tf . train . ClusterSpec ({
“worker” : [
“worker0.example.com:2222” ,
“worker1.example.com:2222” ,
“worker2.example.com:2222”
],
“ps” : [
“ps0.example.com:2222” ,
“ps1.example.com:2222”
]})
…
if FLAGS . job_name == “ps” :
server . join ()
elif FLAGS . job_name == “worker” :
…
使用主機的IP地址和端口號創(chuàng)建ClusterSpec,是易于出錯,不切實際的。作為替代品,您應該使用YARN,Kubernetes或Mesos等集群管理器,降低配置與啟動TensorFlow應用的復雜性。主流的選擇,要么是云端管理解決方案(如Google Cloud ML或Databrick的深度學習數(shù)據(jù)管線),要么是像Mesos或YARN那樣的通用資源管理器。
TensorFlowOnSpark
TensorFlowOnSpark是一個允許從Spark程序中啟動分布式TensorFlow應用的框架。它可以在獨立的Spark集群上運行,也可以在YARN集群上運行。下面的TensorFlowOnSpark程序使用ImageNet數(shù)據(jù)集,執(zhí)行Inception模型的分布式訓練。
它引入的新思路,是用一個名為TFCluster對象進行集群啟動,執(zhí)行訓練和模型推斷。集群可以以SPARK模式或TENSORFLOW模式啟動。 SPARK模式使用RDD向TensorFlow的worker們提供數(shù)據(jù)。 這對于構建從Spark到TensorFlow的集成數(shù)據(jù)管線而言非常使用,但是這里存在性能瓶頸,因為Python是用單線程將RDD序列化為TensorFlow worker所需要的字典,feed_dict 。 TENSORFLOW輸入模式通常是首選,因為數(shù)據(jù)可以更高效地通過分布式文件系統(tǒng)(比如HDFS)從多線程輸入隊列中讀取。 當一個集群啟動時,它啟動TensorFlow worker和參數(shù)服務器(它們可能位于不同的主機)。 參數(shù)服務器只執(zhí)行server.join()命令,而worker讀取ImageNet數(shù)據(jù),并執(zhí)行分布式訓練。主worker擁有任務編號task_id ‘0’ 。
以下的程序用于收集Spark所需的啟動/管理參數(shù)服務器/worker的信息。
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from tensorflowonspark import TFCluster , TFNode
from datetime import datetime
import os
import sys
import tensorflow import as tf
import time
def main_fun ( argv , ctx ):
# extract node metadata from ctx
worker_num = ctx . worker_num
job_name = ctx . job_name
task_index = ctx . task_index
in [ ‘ps’ , ‘worker’ ], assert job_name ], ‘job_name must be ps or worker’
from inception import inception_distributed_train
from inception.imagenet_data import ImagenetData
import tensorflow import as tf
# instantiate FLAGS on workers using argv from driver and add job_name and task_id
print ( “argv:” , argv )
sys . argv = argv
FLAGS = tf . app flags . FLAGS
FLAGS . job_name = job_name
FLAGS . task_id = task_index
print ( “FLAGS:” , FLAGS ‘__flags’ [ ‘__flags’ ])
# Get TF cluster and server instances
cluster_spec , server = TFNode . start_cluster_server ( ctx , 4 , start_cluster_server )
if FLAGS . job_name == ‘ps’ :
# `ps` jobs wait for incoming connections from the workers.
server . join ()
else :
# `worker` jobs will actually do the work.
dataset = ImagenetData ( subset = ImagenetData )
assert dataset . data_files ()
# Only the chief checks for or creates train_dir.
if FLAGS . task_id == 0 :
if not tf . gfile . Exists ( train_dir ):
tf . gfile . MakeDirs ( train_dir )
inception_distributed_train . train ( server target , dataset , cluster_spec , ctx )
# parse arguments needed by the Spark driver
import argparse
parser = argparse . ArgumentParser ()
parser . add_argument ( “–epochs” , help = “number of epochs” , type = int , default = 5 )
parser . add_argument ( “–steps” , help = “number of steps” , type = int , default = 500000 )
parser . add_argument ( “–input_mode” , help = “method to ingest data: (spark|tf)” , choices = [ “spark” , “tf” ], default = “tf” )
parser . add_argument ( “–tensorboard” , help = “launch tensorboard process” , action = “store_true” )
( args , rem ) = parser . parse_known_args ()
input_mode = TFCluster . InputMode . SPARK if args . input_mode == ‘spark’ TFCluster . InputMode . TENSORFLOW
print ( “{0} ===== Start” ( datetime () . isoformat ()))
sc = spark . sparkContext
num_executors = int ( _conf ( “spark.executor.instances” ))
num_ps = int ( _conf ( “spark.tensorflow.num.ps” ))
cluster = TFCluster . run ( sc , main_fun , num_executors , num_ps , tensorboard , input_mode , input_mode )
if input_mode == TFCluster . InputMode . SPARK :
dataRDD = sc . newAPIHadoopFile ( newAPIHadoopFile ,
“org.tensorflow.hadoop.io.TFRecordFileInputFormat” ,
keyClass = “org.apache.hadoop.io.BytesWritable” ,
valueClass = “org.apache.hadoop.io.NullWritable” )
cluster . train ( dataRDD , dataRDD )
cluster . shutdown ()
請注意,Apache YARN尚不支持GPU作為資源,而TensorFlowOnSpark使用YARN節(jié)點標簽來調度那些擁有GPU的TensorFlow worker主機。 前述的例子也可以在支持GPU作為資源的Hops YARN上運行,從而實現(xiàn)CPU和GPU資源更細粒度的共享。
容錯性
可以創(chuàng)建一個MonitoredTrainingSession對象,以便在發(fā)生故障時從最新斷點(checkpoint)自動恢復之前session的訓練狀態(tài)。
saver = tf . train . Saver ( sharded = True )
is_chief = True if FLAGS . task_id == 0 else False
with tf . Session ( server . target ) as sess :
# sess.run(init_op)
# re-initialze from checkpoint, if there is one.
saver . restore ( sess , … )
while True :
if is_chief and step % 1000 == 0 :
saver . save ( sess , “hdfs://….” )
with tf . train . MonitoredTrainingSession ( is_chief , is_chief ) as sess :
while not sess . should_stop ():
sess . run ( train_op )
Spark將重啟宕機的executor。 如果executor不是主worker,它將聯(lián)系參數(shù)服務器,并繼續(xù)像以前一樣進行訓練,因為worker實際上是無狀態(tài)(stateless)的。 如果參數(shù)服務器宕機了,主worker可以在新的參數(shù)服務器加入系統(tǒng)后,從最新的斷點恢復。 主worker每1000步迭代就保存一個模型副本,作為斷點。 如果主worker本身宕機了,那么訓練停止,一個新的訓練任務被啟動,但是仍然可以最新的完整斷點恢復訓練。
Horovod
TensorFlow有兩個可用的ring-allreduce框架: tensorflow.contrib.mpi_collectives (由百度貢獻)和來自Uber的Horovod,構建在Nvidia的NCCL 2庫之上。 我們將研究Horovod的原因是,它在Nvidia GPU上具有更簡單的API以及良好的性能,如圖5所示。Horovod使用pip進行安裝,并且需要事先安裝Open MPI和NCCL-2庫。 對于改造TensorFlow代碼而言,Horovod比分布式TensorFlow或TensorFlowOnSpark需要更少的代碼改動。 它引入了必須被初始化的hvd對象,并且必須對優(yōu)化器進行封裝(hvd使用allreduce或allgather進行梯度的均值計算)。 它使用本地優(yōu)先級的機制把GPU和進程綁定,并且在初始化期間,以優(yōu)先級為0把變量廣播到所有其他進程。
使用mpirun命令來啟動Horovod Python程序。 它將每臺服務器的主機名稱和要使用的GPU數(shù)量作為參數(shù)。 mpirun的一個備選方案,是使用Hops Hadoop平臺,從Spark中運行Horovod,該平臺使用HopsYARN為Horovod進程自動化管理GPU分配。 目前,Horovod對容錯性操作并不支持,模型應該定期保存斷點,以便故障發(fā)生后訓練可以從最新的斷點恢復回來。
import horovod.tensorflow as hvd ; import tensorflow import as tf
def main ( _ ):
hvd . init ()
loss = …
tf . ConfigProto () . gpu_options . visible_device_list = str ( local_rank ())
opt = tf . train . AdagradOptimizer ( 0.01 )
opt = hvd . DistributedOptimizer ( opt )
hooks = [ hvd . BroadcastGlobalVariablesHook ( 0 )]
train_op = opt . minimize ( loss )

圖5. Horovod/TensorFlow在ImageNet數(shù)據(jù)集上使用ResNet-101進行訓練時,在DeepLearning11服務器上,利用至多10個GPU,幾乎能達到線性加速比。(成本:$ 15,000美元)。 感謝Jim Dowling提供圖片
深度學習可擴展性的層次結構
在看過許多TensorFlow和大型mini-batch 隨機梯度下降(SGD)的分布式訓練架構之后,我們現(xiàn)在可以定義如下的『可擴展性的層次結構』。 金字塔的頂端是當前TensorFlow上最可擴展的算法,allreduce系列(包括ring-allreduce)。最底層是可擴展性最低的算法(因此訓練網(wǎng)絡最慢)。 雖然并行實驗是分布式訓練的一種補充,但正如我們所展示的,它們最普通的并行化(具有很弱的可擴展性),因此在金字塔上結構上處于較低的位置。

圖6.對于同步SGD的深度學習而言可擴展性的層次結構。感謝Jim Dowling提供的圖片
結論
做得很棒! 您現(xiàn)在已經(jīng)知道,分布式TensorFlow能夠做什么,以及如何修改您的TensorFlow程序來進行分布式訓練,或者運行并行實驗了。全部代碼和樣例可以在這里找到。
這篇文章是O’Reilly和TensorFlow合作的一部分。在這里查看我們的編輯獨立性聲明。
Jim Downling
Jim Dowling是瑞典信息通信技術SICS學院(SICS-Swedish ICT)的高級研究員,以及斯德哥爾摩瑞典皇家理工學院(KTH Stockholm)的副教授。他是分布式系統(tǒng),機器學習和大型計算機系統(tǒng)領域的研究員。 他還曾擔任MySQL AB的高級顧問。 他是Hadoop Open Platform-as-a-Service(Hops, www.hops.io),這是一個更具擴展性、高可用的Hadoop發(fā)行版。


更多人工智能內(nèi)容請關注2018年4月10-13日人工智能北京大會。