當(dāng)下,Spark已經(jīng)在國(guó)內(nèi)得到了廣泛的認(rèn)可和支持:2014年,Spark Summit China在北京召開,場(chǎng)面火爆;同年,Spark Meetup在北京、上海、深圳和杭州四個(gè)城市舉辦,其中僅北京就成功舉辦了5次,內(nèi)容更涵蓋Spark Core、Spark Streaming、Spark MLlib、Spark SQL等眾多領(lǐng)域。而作為較早關(guān)注和引入Spark的移動(dòng)互聯(lián)網(wǎng)大數(shù)據(jù)綜合服務(wù)公司,TalkingData也積極地參與到國(guó)內(nèi)Spark社區(qū)的各種活動(dòng),并多次在Meetup中分享公司的Spark使用經(jīng)驗(yàn)。本文則主要介紹TalkingData在大數(shù)據(jù)平臺(tái)建設(shè)過程中,逐漸引入Spark,并且以Hadoop YARN和Spark為基礎(chǔ)來構(gòu)建移動(dòng)大數(shù)據(jù)平臺(tái)的過程。
初識(shí)Spark
作為一家在移動(dòng)互聯(lián)網(wǎng)大數(shù)據(jù)領(lǐng)域創(chuàng)業(yè)的公司,時(shí)刻關(guān)注大數(shù)據(jù)技術(shù)領(lǐng)域的發(fā)展和進(jìn)步是公司技術(shù)團(tuán)隊(duì)必做的功課。而在整理Strata 2013公開的講義時(shí),一篇主題為《An Introduction on the Berkeley Data Analytics Stack_BDAS_Featuring Spark,Spark Streaming,and Shark》的教程引起了整個(gè)技術(shù)團(tuán)隊(duì)的關(guān)注和討論,其中Spark基于內(nèi)存的RDD模型、對(duì)機(jī)器學(xué)習(xí)算法的支持、整個(gè)技術(shù)棧中實(shí)時(shí)處理和離線處理的統(tǒng)一模型以及Shark都讓人眼前一亮。同時(shí)期我們關(guān)注的還有Impala,但對(duì)比Spark,Impala可以理解為對(duì)Hive的升級(jí),而Spark則嘗試圍繞RDD建立一個(gè)用于大數(shù)據(jù)處理的生態(tài)系統(tǒng)。對(duì)于一家數(shù)據(jù)量高速增長(zhǎng),業(yè)務(wù)又是以大數(shù)據(jù)處理為核心并且在不斷變化的創(chuàng)業(yè)公司而言,后者無疑更值得進(jìn)一步關(guān)注和研究。
Spark初探
2013年中期,隨著業(yè)務(wù)高速發(fā)展,越來越多的移動(dòng)設(shè)備側(cè)數(shù)據(jù)被各個(gè)不同的業(yè)務(wù)平臺(tái)收集。那么這些數(shù)據(jù)除了提供不同業(yè)務(wù)所需要的業(yè)務(wù)指標(biāo),是否還蘊(yùn)藏著更多的價(jià)值?為了更好地挖掘數(shù)據(jù)潛在價(jià)值,我們決定建造自己的數(shù)據(jù)中心,將各業(yè)務(wù)平臺(tái)的數(shù)據(jù)匯集到一起,對(duì)覆蓋設(shè)備的相關(guān)數(shù)據(jù)進(jìn)行加工、分析和挖掘,從而探索數(shù)據(jù)的價(jià)值。初期數(shù)據(jù)中心主要功能設(shè)置如下所示:
1. 跨市場(chǎng)聚合的安卓應(yīng)用排名;
2. 基于用戶興趣的應(yīng)用推薦。
基于當(dāng)時(shí)的技術(shù)掌握程度和功能需求,數(shù)據(jù)中心所采用的技術(shù)架構(gòu)如圖1。
整個(gè)系統(tǒng)構(gòu)建基于Hadoop 2.0(Cloudera CDH4.3),采用了最原始的大數(shù)據(jù)計(jì)算架構(gòu)。通過日志匯集程序,將不同業(yè)務(wù)平臺(tái)的日志匯集到數(shù)據(jù)中心,并通過ETL將數(shù)據(jù)進(jìn)行格式化處理,儲(chǔ)存到HDFS。其中,排名和推薦算法的實(shí)現(xiàn)都采用了MapReduce,系統(tǒng)中只存在離線批量計(jì)算,并通過基于Azkaban的調(diào)度系統(tǒng)進(jìn)行離線任務(wù)的調(diào)度。
第一個(gè)版本的數(shù)據(jù)中心架構(gòu)基本上是以滿足“最基本的數(shù)據(jù)利用”這一目的進(jìn)行設(shè)計(jì)的。然而,隨著對(duì)數(shù)據(jù)價(jià)值探索得逐漸加深,越來越多的實(shí)時(shí)分析需求被提出。與此同時(shí),更多的機(jī)器學(xué)習(xí)算法也亟需添加,以便支持不同的數(shù)據(jù)挖掘需求。對(duì)于實(shí)時(shí)數(shù)據(jù)分析,顯然不能通過“對(duì)每個(gè)分析需求單獨(dú)開發(fā)MapReduce任務(wù)”來完成,因此引入Hive 是一個(gè)簡(jiǎn)單而直接的選擇。鑒于傳統(tǒng)的MapReduce模型并不能很好地支持迭代計(jì)算,我們需要一個(gè)更好的并行計(jì)算框架來支持機(jī)器學(xué)習(xí)算法。而這些正是我們一直在密切關(guān)注的Spark所擅長(zhǎng)的領(lǐng)域——憑借其對(duì)迭代計(jì)算的友好支持,Spark理所當(dāng)然地成為了不二之選。2013年9月底,隨著Spark 0.8.0發(fā)布,我們決定對(duì)最初的架構(gòu)進(jìn)行演進(jìn),引入Hive作為即時(shí)查詢的基礎(chǔ),同時(shí)引入Spark計(jì)算框架來支持機(jī)器學(xué)習(xí)類型的計(jì)算,并且驗(yàn)證Spark這個(gè)新的計(jì)算框架是否能夠全面替代傳統(tǒng)的以MapReduce為基礎(chǔ)的計(jì)算框架。圖2為整個(gè)系統(tǒng)的架構(gòu)演變。
在這個(gè)架構(gòu)中,我們將Spark 0.8.1部署在YARN上,通過分Queue,來隔離基于Spark的機(jī)器學(xué)習(xí)任務(wù),計(jì)算排名的日常MapReduce任務(wù)和基于Hive的即時(shí)分析任務(wù)。
想要引入Spark,第一步需要做的就是要取得支持我們Hadoop環(huán)境的Spark包。我們的Hadoop環(huán)境是Cloudera發(fā)布的CDH 4.3,默認(rèn)的Spark發(fā)布包并不包含支持CDH 4.3的版本,因此只能自己編譯。Spark官方文檔推薦用Maven進(jìn)行編譯,可是編譯卻不如想象中順利。各種包依賴由于眾所周知的原因,不能順利地從某些依賴中心庫(kù)下載。于是我們采取了最簡(jiǎn)單直接的繞開辦法,利用AWS云主機(jī)進(jìn)行編譯。需要注意的是,編譯前一定要遵循文檔的建議,設(shè)置:
否則,編譯過程中就會(huì)遇到內(nèi)存溢出的問題。針對(duì)CDH 4.3,mvn build的參數(shù)為:
在編譯成功所需要的Spark包后,部署和在Hadoop環(huán)境中運(yùn)行Spark則是非常簡(jiǎn)單的事情。將編譯好的Spark目錄打包壓縮后,在可以運(yùn)行Hadoop Client的機(jī)器上解壓縮,就可以運(yùn)行Spark了。想要驗(yàn)證Spark是否能夠正常在目標(biāo)Hadoop環(huán)境上運(yùn)行,可以參照Spark的官方文檔,運(yùn)行example中的SparkPi來驗(yàn)證:
完成Spark部署之后,剩下的就是開發(fā)基于Spark的程序了。雖然Spark支持Java、Python,但最合適開發(fā)Spark程序的語言還是Scala。經(jīng)過一段時(shí)間的摸索實(shí)踐,我們掌握了Scala語言的函數(shù)式編程語言特點(diǎn)后,終于體會(huì)了利用Scala開發(fā)Spark應(yīng)用的巨大好處。同樣的功能,用MapReduce幾百行才能實(shí)現(xiàn)的計(jì)算,在Spark中,Scala通過短短的數(shù)十行代碼就能完成。而在運(yùn)行時(shí),同樣的計(jì)算功能,Spark上執(zhí)行則比MapReduce有數(shù)十倍的提高。對(duì)于需要迭代的機(jī)器學(xué)習(xí)算法來講,Spark的RDD模型相比MapReduce的優(yōu)勢(shì)則更是明顯,更何況還有基本的MLlib的支持。經(jīng)過幾個(gè)月的實(shí)踐,數(shù)據(jù)挖掘相關(guān)工作被完全遷移到Spark,并且在Spark上實(shí)現(xiàn)了適合我們數(shù)據(jù)集的更高效的LR等等算法。
全面擁抱Spark
進(jìn)入2014年,公司的業(yè)務(wù)有了長(zhǎng)足的發(fā)展,對(duì)比數(shù)據(jù)中心平臺(tái)建立時(shí),每日處理的數(shù)據(jù)量亦翻了幾番。每日的排名計(jì)算所花的時(shí)間越來越長(zhǎng),而基于Hive的即時(shí)計(jì)算只能支持日尺度的計(jì)算,如果到周這個(gè)尺度,計(jì)算所花的時(shí)間已經(jīng)很難忍受,到月這個(gè)尺度則基本上沒辦法完成計(jì)算?;谠赟park上的認(rèn)知和積累,是時(shí)候?qū)⒄麄€(gè)數(shù)據(jù)中心遷移到Spark上了。
2014年4月,Spark Summit China在北京舉行。抱著學(xué)習(xí)的目的,我們技術(shù)團(tuán)隊(duì)也參加了在中國(guó)舉行的這一次Spark盛會(huì)。通過這次盛會(huì),我們了解到國(guó)內(nèi)的很多同行已經(jīng)開始采用Spark來建造自己的大數(shù)據(jù)平臺(tái),而Spark也變成了在ASF中最為活躍的項(xiàng)目之一。另外,越來越多的大數(shù)據(jù)相關(guān)的產(chǎn)品也逐漸在和Spark相融合或者在向Spark遷移。Spark無疑將會(huì)變?yōu)橐粋€(gè)相比Hadoop MapReduce更好的生態(tài)系統(tǒng)。通過這次大會(huì),我們更加堅(jiān)定了全面擁抱Spark的決心。
基于YARN和Spark,我們開始重新架構(gòu)數(shù)據(jù)中心依賴的大數(shù)據(jù)平臺(tái)。整個(gè)新的數(shù)據(jù)平臺(tái)應(yīng)該能夠承載:
1. 準(zhǔn)實(shí)時(shí)的數(shù)據(jù)匯集和ETL;
2. 支持流式的數(shù)據(jù)加工;
3. 更高效的離線計(jì)算能力;
4. 高速的多維分析能力;
5. 更高效的即時(shí)分析能力;
6. 高效的機(jī)器學(xué)習(xí)能力;
7. 統(tǒng)一的數(shù)據(jù)訪問接口;
8. 統(tǒng)一的數(shù)據(jù)視圖;
9. 靈活的任務(wù)調(diào)度.
整個(gè)新的架構(gòu)充分地利用YARN和Spark,并且融合公司的一些技術(shù)積累,架構(gòu)如圖3所示。
在新的架構(gòu)中,引入了Kafka作為日志匯集的通道。幾個(gè)業(yè)務(wù)系統(tǒng)收集的移動(dòng)設(shè)備側(cè)的日志,實(shí)時(shí)地寫入到Kafka 中,從而方便后續(xù)的數(shù)據(jù)消費(fèi)。
利用Spark Streaming,可以方便地對(duì)Kafka中的數(shù)據(jù)進(jìn)行消費(fèi)處理。在整個(gè)架構(gòu)中,Spark Streaming主要完成了以下工作。
1. 原始日志的保存。將Kafka中的原始日志以JSON格式無損的保存在HDFS中。
2. 數(shù)據(jù)清洗和轉(zhuǎn)換,清洗和標(biāo)準(zhǔn)化之后,轉(zhuǎn)變?yōu)镻arquet格式,存儲(chǔ)在HDFS中,方便后續(xù)的各種數(shù)據(jù)計(jì)算任務(wù)。
3. 定義好的流式計(jì)算任務(wù),比如基于頻次規(guī)則的標(biāo)簽加工等等,計(jì)算結(jié)果直接存儲(chǔ)在MongoDB中。
排名計(jì)算任務(wù)則在Spark上做了重新實(shí)現(xiàn),借力Spark帶來的性能提高,以及Parquet列式存儲(chǔ)帶來的高效數(shù)據(jù)訪問。同樣的計(jì)算任務(wù),在數(shù)據(jù)量提高到原來3倍的情況下,時(shí)間開銷只有原來的1/6。
同時(shí),在利用Spark和Parquet列式存儲(chǔ)帶來的性能提升之外,曾經(jīng)很難滿足業(yè)務(wù)需求的即時(shí)多維度數(shù)據(jù)分析終于成為了可能。曾經(jīng)利用Hive需要小時(shí)級(jí)別才能完成日尺度的多維度即時(shí)分析,在新架構(gòu)上,只需要2分鐘就能夠順利完成。而周尺度上也不過十分鐘就能夠算出結(jié)果。曾經(jīng)在Hive上無法完成的月尺度多維度分析計(jì)算,則在兩個(gè)小時(shí)內(nèi)也可以算出結(jié)果。另外Spark SQL的逐漸完善也降低了開發(fā)的難度。
利用YARN提供的資源管理能力,用于多維度分析,自主研發(fā)的Bitmap引擎也被遷移到了YARN上。對(duì)于已經(jīng)確定好的維度,可以預(yù)先創(chuàng)建Bitmap索引。而多維度的分析,如果所需要的維度已經(jīng)預(yù)先建立了Bitmap索引,則通過Bitmap引擎由Bitmap計(jì)算來實(shí)現(xiàn),從而可以提供實(shí)時(shí)的多維度的分析能力。
在新的架構(gòu)中,為了更方便地管理數(shù)據(jù),我們引入了基于HCatalog的元數(shù)據(jù)管理系統(tǒng),數(shù)據(jù)的定義、存儲(chǔ)、訪問都通過元數(shù)據(jù)管理系統(tǒng),從而實(shí)現(xiàn)了數(shù)據(jù)的統(tǒng)一視圖,方便了數(shù)據(jù)資產(chǎn)的管理。
YARN只提供了資源的調(diào)度能力,在一個(gè)大數(shù)據(jù)平臺(tái),分布式的任務(wù)調(diào)度系統(tǒng)同樣不可或缺。在新的架構(gòu)中,我們自行開發(fā)了一個(gè)支持DAG的分布式任務(wù)調(diào)度系統(tǒng),結(jié)合YARN提供的資源調(diào)度能力,從而實(shí)現(xiàn)定時(shí)任務(wù)、即時(shí)任務(wù)以及不同任務(wù)構(gòu)成的pipeline。
基于圍繞YARN和Spark的新的架構(gòu),一個(gè)針對(duì)數(shù)據(jù)業(yè)務(wù)部門的自服務(wù)大數(shù)據(jù)平臺(tái)得以實(shí)現(xiàn),數(shù)據(jù)業(yè)務(wù)部門可以方便地利用這個(gè)平臺(tái)對(duì)進(jìn)行多維度的分析、數(shù)據(jù)的抽取,以及進(jìn)行自定義的標(biāo)簽加工。自服務(wù)系統(tǒng)提高了數(shù)據(jù)利用的能力,同時(shí)也大大提高了數(shù)據(jù)利用的效率。
使用Spark遇到的一些坑
任何新技術(shù)的引入都會(huì)歷經(jīng)陌生到熟悉,從最初新技術(shù)帶來的驚喜,到后來遇到困難時(shí)的一籌莫展和惆悵,再到問題解決后的愉悅,大數(shù)據(jù)新貴Spark同樣不能免俗。下面就列舉一些我們遇到的坑。
【坑一:跑很大的數(shù)據(jù)集的時(shí)候,會(huì)遇到org.apache.spark.SparkException: Error communicating with MapOutputTracker】
這個(gè)錯(cuò)誤報(bào)得很隱晦,從錯(cuò)誤日志看,是Spark集群partition了,但如果觀察物理機(jī)器的運(yùn)行情況,會(huì)發(fā)現(xiàn)磁盤I/O非常高。進(jìn)一步分析會(huì)發(fā)現(xiàn)原因是Spark在處理大數(shù)據(jù)集時(shí)的shuffle過程中生成了太多的臨時(shí)文件,造成了操作系統(tǒng)磁盤I/O負(fù)載過大。找到原因后,解決起來就很簡(jiǎn)單了,設(shè)置spark.shuffle.consolidateFiles為true。這個(gè)參數(shù)在默認(rèn)的設(shè)置中是false的,對(duì)于linux的ext4文件系統(tǒng),建議大家還是默認(rèn)設(shè)置為true吧。Spark官方文檔的描述也建議ext4文件系統(tǒng)設(shè)置為true來提高性能。
【坑二:運(yùn)行時(shí)報(bào)Fetch failure錯(cuò)】
在大數(shù)據(jù)集上,運(yùn)行Spark程序,在很多情況下會(huì)遇到Fetch failure的錯(cuò)。由于Spark本身設(shè)計(jì)是容錯(cuò)的,大部分的Fetch failure會(huì)經(jīng)過重試后通過,因此整個(gè)Spark任務(wù)會(huì)正常跑完,不過由于重試的影響,執(zhí)行時(shí)間會(huì)顯著增長(zhǎng)。造成Fetch failure的根本原因則不盡相同。從錯(cuò)誤本身看,是由于任務(wù)不能從遠(yuǎn)程的節(jié)點(diǎn)讀取shuffle的數(shù)據(jù),具體原因則需要利用:
查看Spark的運(yùn)行日志,從而找到造成Fetch failure的根本原因。其中大部分的問題都可以通過合理的參數(shù)配置以及對(duì)程序進(jìn)行優(yōu)化來解決。2014年Spark Summit China上陳超的那個(gè)專題,對(duì)于如何對(duì)Spark性能進(jìn)行優(yōu)化,有非常好的建議。
當(dāng)然,在使用Spark過程中還遇到過其他不同的問題,不過由于Spark本身是開源的,通過源代碼的閱讀,以及借助開源社區(qū)的幫助,大部分問題都可以順利解決。
下一步的計(jì)劃
Spark在2014年取得了長(zhǎng)足的發(fā)展,圍繞Spark的大數(shù)據(jù)生態(tài)系統(tǒng)也逐漸的完善。Spark 1.3引入了一個(gè)新的DataFrame API,這個(gè)新的DataFrame API將會(huì)使得Spark對(duì)于數(shù)據(jù)的處理更加友好。同樣出自于AMPLab的分布式緩存系統(tǒng)Tachyon因?yàn)槠渑cSpark的良好集成也逐漸引起了人們的注意。鑒于在業(yè)務(wù)場(chǎng)景中,很多基礎(chǔ)數(shù)據(jù)是需要被多個(gè)不同的Spark任務(wù)重復(fù)使用,下一步,我們將會(huì)在架構(gòu)中引入Tachyon來作為緩存層。另外,隨著SSD的日益普及,我們后續(xù)的計(jì)劃是在集群中每臺(tái)機(jī)器都引入SSD存儲(chǔ),配置Spark的shuffle的輸出到SSD,利用SSD的高速隨機(jī)讀寫能力,進(jìn)一步提高大數(shù)據(jù)處理效率。
在機(jī)器學(xué)習(xí)方面,H2O機(jī)器學(xué)習(xí)引擎也和Spark有了良好的集成從而產(chǎn)生了Sparkling-water。相信利用Sparking-water,作為一家創(chuàng)業(yè)公司,我們也可以利用深度學(xué)習(xí)的力量來進(jìn)一步挖掘數(shù)據(jù)的價(jià)值。
結(jié)語
2004年,Google的MapReduce論文揭開了大數(shù)據(jù)處理的時(shí)代,Hadoop的MapReduce在過去接近10年的時(shí)間成了大數(shù)據(jù)處理的代名詞。而Matei Zaharia 2012年關(guān)于RDD的一篇論文“Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”則揭示了大數(shù)據(jù)處理技術(shù)一個(gè)新時(shí)代的到來。伴隨著新的硬件技術(shù)的發(fā)展、低延遲大數(shù)據(jù)處理的廣泛需求以及數(shù)據(jù)挖掘在大數(shù)據(jù)領(lǐng)域的日益普及,Spark作為一個(gè)嶄新的大數(shù)據(jù)生態(tài)系統(tǒng),逐漸取代傳統(tǒng)的MapReduce而成為新一代大數(shù)據(jù)處理技術(shù)的熱門。我們過去兩年從MapReduce到Spark架構(gòu)的演變過程,也基本上代表了相當(dāng)一部分大數(shù)據(jù)領(lǐng)域從業(yè)者的技術(shù)演進(jìn)的歷程。相信隨著Spark生態(tài)的日益完善,會(huì)有越來越多的企業(yè)將自己的數(shù)據(jù)處理遷移到Spark上來。而伴隨著越來越多的大數(shù)據(jù)工程師熟悉和了解Spark,國(guó)內(nèi)的Spark社區(qū)也會(huì)越來越活躍,Spark作為一個(gè)開源的平臺(tái),相信也會(huì)有越來越多的華人變成Spark相關(guān)項(xiàng)目的Contributor,Spark也會(huì)變得越來越成熟和強(qiáng)大。