Spark on Angel:Spark機(jī)器學(xué)習(xí)的核心加速器

責(zé)任編輯:editor004

2017-08-21 11:11:47

摘自:INFOQ

基于內(nèi)存的計(jì)算過(guò)程,可以加速機(jī)器學(xué)習(xí)算法中計(jì)算梯度過(guò)程的耗時(shí)。如下代碼所示,LBFGS在Spark和Spark on Angel上的實(shí)現(xiàn),二者代碼的整體思路是一樣的,主要的區(qū)別是梯度向量的Aggregate和模型 的pull push。

Spark的核心概念是RDD,而RDD的關(guān)鍵特性之一是其不可變性,來(lái)規(guī)避分布式環(huán)境下復(fù)雜的各種并行問(wèn)題。這個(gè)抽象,在數(shù)據(jù)分析的領(lǐng)域是沒(méi)有問(wèn)題的,它能最大化的解決分布式問(wèn)題,簡(jiǎn)化各種算子的復(fù)雜度,并提供高性能的分布式數(shù)據(jù)處理運(yùn)算能力。

然而在機(jī)器學(xué)習(xí)領(lǐng)域,RDD的弱點(diǎn)很快也暴露了。機(jī)器學(xué)習(xí)的核心是迭代和參數(shù)更新。RDD憑借著邏輯上不落地的內(nèi)存計(jì)算特性,可以很好的解決迭代的問(wèn)題,然而RDD的不可變性,卻非常不適合參數(shù)反復(fù)多次更新的需求。這本質(zhì)上的不匹配性,導(dǎo)致了Spark的MLlib庫(kù),發(fā)展一直非常緩慢,從2015年開(kāi)始就沒(méi)有實(shí)質(zhì)性的創(chuàng)新,性能也不好。

為此,Angel在設(shè)計(jì)生態(tài)圈的時(shí)候,優(yōu)先考慮了Spark。在V1.0.0推出的時(shí)候,就已經(jīng)具備了Spark on Angel的功能,基于Angel為Spark加上了PS功能,在不變中加入了變化的因素,可謂如虎添翼。

我們將以L(fǎng)-BFGS為例,來(lái)分析Spark在機(jī)器學(xué)習(xí)算法的實(shí)現(xiàn)上的問(wèn)題,以及Spark on Angel是如何解決Spark在機(jī)器學(xué)習(xí)任務(wù)中的遇到的瓶頸,讓Spark的機(jī)器學(xué)習(xí)更加強(qiáng)大。

1. L-BFGS算法說(shuō)明

L-BFGS模型參數(shù)更新過(guò)程如下:

計(jì)算pk = Hk-1 gk 偽代碼如下所示,這是人們常說(shuō)的two-loop recursion算法,是Limited-BFGS算法的核心部分?!?br /> 返回值 r 是我們說(shuō)要的pk。

其中,H0-1 是單位陣,yk=gk-gk-1, sk=wk-w k-1k-1,L-BFGS算法將最近 m 輪生成的 yk 和 sk 序列,記做 {yk} 和 {sk}。基于計(jì)算 {yk} 和 {sk} 計(jì)算 pk 。

2.L-BFGS的Spark實(shí)現(xiàn)

2.1 實(shí)現(xiàn)框架

Spark中的driver負(fù)責(zé)協(xié)調(diào)整個(gè)Spark任務(wù)執(zhí)行的同時(shí),需要保存最近 m 輪的 {yk} 和 {sk} 序列,并在driver上執(zhí)行two-loop recursion算法。而executor負(fù)責(zé)分布式地計(jì)算梯度向量。

迭代過(guò)程: 
(1)每輪迭代,將每個(gè)executor計(jì)算的梯度Aggregate到driver 
(2)yk 和 sk 保存在driver上,在driver端執(zhí)行two-loop recursion算法 
(3)driver上更新模型 w,并將 w 廣播到每個(gè)Executor

2.2 性能分析

基于Spark的L-BFGS實(shí)現(xiàn)的算法優(yōu)點(diǎn)比較明顯:

HDFS I/O   
Spark可以快速讀寫(xiě)HDFS上的訓(xùn)練數(shù)據(jù);

細(xì)粒度的負(fù)載均衡   
并行計(jì)算梯度時(shí),Spark具有強(qiáng)大的并行調(diào)度機(jī)制,保證task快速執(zhí)行;

容錯(cuò)機(jī)制   
當(dāng)計(jì)算節(jié)點(diǎn)掛掉、任務(wù)失敗,Spark會(huì)根據(jù)RDD的DAG關(guān)系鏈實(shí)現(xiàn)數(shù)據(jù)的重計(jì)算。但是對(duì)于迭代式算法,每輪迭代要用RDD的action操作,打斷RDD的DAG,避免因?yàn)橹赜?jì)算引起邏輯的錯(cuò)亂;

基于內(nèi)存的計(jì)算   
基于內(nèi)存的計(jì)算過(guò)程,可以加速機(jī)器學(xué)習(xí)算法中計(jì)算梯度過(guò)程的耗時(shí)。

該實(shí)現(xiàn)的缺點(diǎn):

treeAggregate引起的網(wǎng)絡(luò)瓶頸   
Spark用treeAggregate聚合梯度時(shí),如果模型維度達(dá)到億級(jí),每個(gè)梯度向量都可能達(dá)到幾百兆;此時(shí)treeAggregate的shuffle的效率非常低;

driver單點(diǎn)

保存{yk}和{sk}序列需要較大的內(nèi)存空間; two-loop recursion算法是由driver單點(diǎn)執(zhí)行,該過(guò)程是多個(gè)高維度的向量的運(yùn)算; 每輪迭代,driver都需要和executor完成高維度向量的aggregate和broadcast。

3.L-BFGS的Spark on Angel實(shí)現(xiàn)

3.1 實(shí)現(xiàn)框架

Spark on Angel借助Angel PS-Service的功能為Spark引入PS的角色,減輕整個(gè)算法流程對(duì)driver的依賴(lài)。two-loop recursion算法的運(yùn)算交給PS,而driver只負(fù)責(zé)任務(wù)的調(diào)度,大大減輕的對(duì)driver性能的依賴(lài)。

Angel PS由一組分布式節(jié)點(diǎn)組成,每個(gè)vector、matrix被切分成多個(gè)partition保存到不同的節(jié)點(diǎn)上,同時(shí)支持vector和matrix之間的運(yùn)算;

{yk} 和 {sk} 序列分布式地保存到Angel PS上,two-loop recursion算法中高維度的向量計(jì)算也是在PS上完成。Spark executor每輪迭代過(guò)程會(huì)從PS上Pull w 到本地,并將計(jì)算的梯度向量Push到PS。

  迭代過(guò)程:

(1)每輪迭代,executor 將PS上的模型 w pull 到本地,計(jì)算梯度,然后梯度向量push給PS

(2)yk 和 sk 保存在PS上,在PS端執(zhí)行two-loop recursion算法

(3)PS上更新模型 w

3.2 性能分析

整個(gè)算法過(guò)程,driver只負(fù)責(zé)任務(wù)調(diào)度,而復(fù)雜的two-loop recursion運(yùn)算在PS上運(yùn)行,梯度的Aggregate和模型的同步是executor和PS之間進(jìn)行,所有運(yùn)算都變成分布式。在網(wǎng)絡(luò)傳輸中,高維度的PSVector會(huì)被切成小的數(shù)據(jù)塊再發(fā)送到目標(biāo)節(jié)點(diǎn),這種節(jié)點(diǎn)之間多對(duì)多的傳輸大大提高了梯度聚合和模型同步的速度。

這樣Spark on Angel完全避開(kāi)了Spark中driver單點(diǎn)的瓶頸,以及網(wǎng)絡(luò)傳輸高維度向量的問(wèn)題。

4.“輕易強(qiáng)快”的Spark on Angel

Spark on Angel是Angel為解決Spark在機(jī)器學(xué)習(xí)模型訓(xùn)練中的缺陷而設(shè)計(jì)的“插件”,沒(méi)有對(duì)Spark做"侵入式"的修改,是一個(gè)獨(dú)立的框架。可以用 “”、“”、“強(qiáng)”、“” 來(lái)概括Spark on Angel的特點(diǎn)。

4.1 輕——"插件式"的框架

Spark on Angel是Angel為解決Spark在機(jī)器學(xué)習(xí)模型訓(xùn)練中的缺陷而設(shè)計(jì)的“插件”。Spark on Angel沒(méi)有對(duì)Spark中的RDD做侵入式的修改,Spark on Angel是依賴(lài)于Spark和Angel的框架,同時(shí)其邏輯又獨(dú)立于Spark和Angel。

因此,Spark用戶(hù)使用Spark on Angel非常簡(jiǎn)單,只需在Spark的提交腳本里做三處改動(dòng)即可,詳情可見(jiàn)Angel的Github Spark on Angel Quick Start文檔。

可以看到提交的Spark on Angel任務(wù),其本質(zhì)上依然是一個(gè)Spark任務(wù),整個(gè)任務(wù)的執(zhí)行過(guò)程與Spark一樣的。

source ${Angel_HOME}/bin/spark-on-angel-env.sh$SPARK_HOME/bin/spark-submit --master yarn-cluster --conf spark.ps.jars=$SONA_ANGEL_JARS --conf spark.ps.instances=20 --conf spark.ps.cores=4 --conf spark.ps.memory=10g --jars $SONA_SPARK_JARS ....

Spark on Angel能夠成為如此輕量級(jí)的框架,得益于Angel對(duì)PS-Service的封裝,使Spark的driver和executor可以通過(guò)PsAgent、PSClient與Angel PS做數(shù)據(jù)交互。

  4.2 強(qiáng)——功能強(qiáng)大,支持breeze庫(kù)

breeze庫(kù)是scala實(shí)現(xiàn)的面向機(jī)器學(xué)習(xí)的數(shù)值運(yùn)算庫(kù)。Spark MLlib的大部分?jǐn)?shù)值優(yōu)化算法都是通過(guò)調(diào)用breeze來(lái)完成的。如下所示,Spark和Spark on Angel兩種實(shí)現(xiàn)都是通過(guò)調(diào)用breeze.optimize.LBFGS實(shí)現(xiàn)的。Spark的實(shí)現(xiàn)是傳入的類(lèi)型是breeze庫(kù)的DenseVector,而Spark on Angel的實(shí)現(xiàn)是傳入BreezePSVector。

BreezePSVector是指Angel PS上的Vector,該Vector實(shí)現(xiàn)了breeze NumericOps下的方法,如常用的 dot,scale,axpy,add等運(yùn)算,因此在LBFGS[BreezePSVector] two-loop recursion算法中的高維度向量運(yùn)算是BreezePSVector之間的運(yùn)算,而B(niǎo)reezePSVector之間全部在Angel PS上分布式完成。

Spark的L-BFGS實(shí)現(xiàn)import breeze.optimize.LBFGS val lbfgs = new LBFGS[DenseVector](maxIter, m, tol) val states = lbfgs.iterations(Cost(trainData), initWeight)
 
Spark on Angel的L-BFGS實(shí)現(xiàn)  
接口調(diào)用里的Vector泛型從DenseVector變成 BreezePSVector import breeze.optimize.LBFGS val lbfgs = new LBFGS[BreezePSVector](maxIter, m, tol) val states = lbfgs.iterations(PSCost(trainData), initWeightPS)
 

4.3 易——編程接口簡(jiǎn)單

Spark能夠在大數(shù)據(jù)領(lǐng)域這么流行的另外一個(gè)原因是:其編程方式簡(jiǎn)單、容易理解,Spark on Angel同樣繼承了這個(gè)特性。

Spark on Angel本質(zhì)是一個(gè)Spark任務(wù),整個(gè)代碼實(shí)現(xiàn)邏輯跟Spark是一致的;當(dāng)需要與PSVector做運(yùn)算時(shí),調(diào)用相應(yīng)的接口即可。

如下代碼所示,LBFGS在Spark和Spark on Angel上的實(shí)現(xiàn),二者代碼的整體思路是一樣的,主要的區(qū)別是梯度向量的Aggregate和模型 的pull/push。 
因此,如果將Spark的算法改造成Spark on Angel的任務(wù),只需要修改少量的代碼即可。

L-BFGS需要用戶(hù)實(shí)現(xiàn)DiffFunction,DiffFunction的calculte接口輸入?yún)?shù)是 ,遍歷訓(xùn)練數(shù)據(jù)并返回 loss 和 gradient。

其完整代碼,請(qǐng)前往Github SparseLogistic。

Spark的DiffFunction實(shí)現(xiàn) case class Cost(trainData: RDD[Instance]) extends DiffFunction[DenseVector] { def calculate(w: DenseVector): (Double, DenseVector) = { // 廣播 w val bcW = sc.broadcast(w) // 通過(guò)treeAggregate的方式計(jì)算loss和gradient val (cumGradient, cumLoss) = trainData .treeAggregate((new DenseVector(x.length), 0.0)) (seqOp, combOp) val resGradient = new DenseVector(cumGradient.toArray.map(_ / sampleNum)) (cumLoss / sampleNum, resGradient) }
 
Spark on Angel的DiffFunction實(shí)現(xiàn)

calculate接口輸入?yún)?shù)是 w ,遍歷訓(xùn)練數(shù)據(jù)并返回 loss 和 cumGradient。其中 w 和 cumGradient都是BreezePSVector;計(jì)算梯度時(shí),需要將 Pull 到本地,本地的gradient值,需要通過(guò)PSVector的incrementAndFlush方式Push到遠(yuǎn)程PS上的cumGradient向量。

 case class PSCost(trainData: RDD[Instance]) extends DiffFunction[BreezePSVector] { override def calculate(w: BreezePSVector): (Double, BreezePSVector) = { // 初始化gradient向量:cumGradient val cumGradient = pool.createZero().mkBreeze() // 計(jì)算梯度和loss val cumLoss = trainData.mapPartitions { iter => // pull模型 w 到 executor 本地 val localW = w.toRemote.pull() val (gradient, loss) = calculateGradAndLoss(iter, localW) // incement本地的grad到PS的cumGradient cumGradient.toRemote.incrementAndFlush(gradient) Iterator.single(loss) }.sum() cumGradient *= 1.0 / sampleNum (cumLoss / sampleNum, cumGradient) } }

4.4 快——性能強(qiáng)勁

我們分別實(shí)現(xiàn)了SGD、LBFGS、OWLQN三種優(yōu)化方法的LR,并在Spark和Spark on Angel上做了實(shí)驗(yàn)對(duì)比?!?br /> 該實(shí)驗(yàn)代碼請(qǐng)前往Github SparseLRWithX.scala。

數(shù)據(jù)集:騰訊內(nèi)部某業(yè)務(wù)的一份數(shù)據(jù)集,2.3億樣本,5千萬(wàn)維度 實(shí)驗(yàn)設(shè)置:  
說(shuō)明1:三組對(duì)比實(shí)驗(yàn)的資源配置如下,我們盡可能保證所有任務(wù)在資源充足的情況下執(zhí)行,因此配置的資源比實(shí)際需要的偏多;  
說(shuō)明2:執(zhí)行Spark任務(wù)時(shí),需要加大spark.driver.maxResultSize參數(shù);而Spark on Angel就不用配置此參數(shù)。

如上數(shù)據(jù)所示,Spark on Angel相較于Spark在訓(xùn)練LR模型時(shí)有50%以上的加速;對(duì)于越復(fù)雜的模型,其加速的比例越大。

5.結(jié)語(yǔ)

Spark on Angel的出現(xiàn)可以高效、低成本地克服Spark在機(jī)器學(xué)習(xí)領(lǐng)域遇到的瓶頸;我們將繼續(xù)優(yōu)化Spark on Angel,并提高其性能。也歡迎大家在Github上一起參與我們的改進(jìn)。

Angel項(xiàng)目Github:Angel,喜歡的話(huà)到Github上給我們Star。

感謝蔡芳芳對(duì)本文的審校。

鏈接已復(fù)制,快去分享吧

企業(yè)網(wǎng)版權(quán)所有?2010-2024 京ICP備09108050號(hào)-6京公網(wǎng)安備 11010502049343號(hào)