大數據處理肯定是分布式的了,那就面臨著幾個核心問題:可擴展性,負載均衡,容錯處理。Spark是如何處理這些問題的呢?接著上一篇的“動手寫WordCount”,今天要做的就是透過這個大數據界的HelloWorld來看看Spark隱藏了哪些魔法。
請各位看官,帶著分布式的問題往下看。
分布式架構
大數據時代,單機裝下PB級的數據,然后在可接受的時間內處理完,不可能,所以一定是分布式的。
分布式存儲
HDFS(Hadoop Distributed File System)是最常見的,和Spark配合的分布式存儲系統(tǒng)。HDFS的存儲結構如下圖
每個文件被分成固定大小的塊,而塊作為最小的存儲單位放到眾多服務器上。那一旦存某個塊的機器掛了,不是整個文件就洗白了嗎?HDFS當然不會這么傻,文件的每個塊都有備份,默認情況下一個塊會存3份,分到不同的服務器。這樣一來,除非某個塊涉及的三臺服務器全掛,否則不用擔心。在合理分布3個塊的情況下,三臺服務器全掛的可能性比中500萬還低。下面是/file.txt有三個文件塊的情況。
NN是Name Node,存儲文件塊放在哪兒等元信息。DN是Data Node,用來存放具體的文件塊。
分布式處理
有一類系統(tǒng)數據是分布式存儲,但是處理卻集中在一起。比如Mysql分庫分表存數據,然后在某個服務器上,挨個獲取所有庫所有表的數據進行處理,這種系統(tǒng)的本質還是“數據分發(fā)到計算邏輯側”,它的性能瓶頸就在于做數據處理的那臺服務器。
而分布式處理的核心觀念在于“把計算邏輯分發(fā)到數據側”,有兩大優(yōu)點:
計算邏輯分發(fā)明顯比數據分發(fā)節(jié)省網絡帶寬,而網絡帶寬是分布式系統(tǒng)中最寶貴的資源
計算邏輯在數據側執(zhí)行,消除了集中式處理中計算邏輯側的性能瓶頸
Spark + HDFS的運行架構如下:
Driver是程序開始運行的地方,也是總控,它把計算邏輯(閉包的實例)發(fā)送到有數據塊的Slave上去執(zhí)行,結果再收回去匯總。
是不是看出來了?
數據更多了,加機器唄,機器多了磁盤多,磁盤多了存的多。
跑的慢了,加機器唄,機器多了磁盤多,并行加載起來,數據吐吞量大。機器多了,內存CPU也多,并行處理起來,數據吞吐量大。
提示: 分布式處理系統(tǒng)會把計算邏輯分發(fā)到數據側,極大提高系統(tǒng)的水平擴展性。
WordCount運行機制
講了一堆理論知識,為了讓各位看官透徹理解,也為Spark程序算法優(yōu)化打下堅實的基礎,我們拿WordCount來舉例說明,順便說說負載均衡。
額。。。還沒看“動手寫WordCount”的兄弟姐妹們,建議先去看看。
數據位置感知
下面是WordCount的業(yè)務邏輯代碼:
val file = "hdfs://127.0.0.1:9000/file.txt"
val lines = sc.textFile(file)
val words = lines.flatMap(line => line.split("\s+"))
val wordCount = words.countByValue()
lines是Spark的RDD,它包含了在哪些機器上有file文件的塊,信息是從HDFS來的。每文件塊映射到RDD上就是一個分區(qū),對的,沒看錯。如果一個文件塊128MB,那么HDFS上一個1GB大小的文件就有8個文件塊,由這個文件創(chuàng)建的RDD就會有8個分區(qū)。
之前說了,在HDFS上每個文件塊默認會有3份,那RDD的分區(qū)選擇了那一份呢?對滴,根據負載選擇服務器負載最低的那一份。負載自動均衡了吧。
計算邏輯分發(fā)
有了這些信息,我們就知道把后續(xù)的計算邏輯該分發(fā)到哪兒去。
首先,我們得說清楚什么是計算邏輯,各位看官們想一下,類方法里面的代碼是如何運行的。充分必要條件:方法代碼 + 類實例(對象)的狀態(tài)。似成相識吧,程序 = 算法 + 數據。算法在代碼中,數據在對象的狀態(tài)中。
Spark要分發(fā)計算邏輯,也是分了兩部分。
第一部分是代碼。為什么spark-submit執(zhí)行一開始,總是一堆jar包被分發(fā),原因就在這兒。
第二部分是類實例。類在哪兒?作為RDD各API參數的閉包。
val words = lines.flatMap(line => line.split("\s+"))
flatMap的參數 **_.split("s+")** 是閉包,閉包是引用了外部自由變量的函數,在Scala中是由匿名類實現的。更多信息,請小伙伴們GFSOSO哈。
上面的一行代碼中,Spark要分發(fā)的實例就是 **_.split("s+")** 的實例。
val wordCount = words.countByValue()
實際上RDD的API countByValue 也有需要分發(fā)的閉包實例,只是都在Spark的源碼中,讓一碼給大家整理到明面上來哈。
val wordCount = words
.mapPartitions(convertWordsInPartitionToWordCountMap)
.reduce(mergeMaps)
前面我們提到了RDD的分區(qū),mapPartitions會方法中的邏輯放到RDD的每個分區(qū)上執(zhí)行,注意是遠程在Slave上執(zhí)行的哈。而reduce是在把每個分區(qū)的結果拿到Driver后,對結果進行兩兩合并,最終得到結果。
WordCount分布式運行原理
先仔細看圖,相信不用下面的解釋,各位看官也能看懂了。(上面的圖是張巨高清的圖,手機上看不清,建議轉發(fā)文章到郵箱,然后到電腦上看,看懂這張圖,就真的把WordCount分布式運行的機制搞懂了。)
對于WordCount而言,分布式在每個Slave的每個分區(qū)上,統(tǒng)計本分區(qū)內的單詞計數,生成一個Map,然后將它傳回給Driver,再由Driver兩兩合并來自各個分區(qū)的所有Map,形成最終的單詞計數。
今天我們不僅說清楚了WordCount背后的分布式運行機制,而且解釋了Spark的水平擴展能力,以及負載均衡。
下一篇將透過WordCount來看重中之重的容錯處理,這涉及到Spark的應用場景與RDD的設計來源,可以毫不夸張地說,這才是Spark的精髓。
提示匯總
分布式處理系統(tǒng)會把計算邏輯分發(fā)到數據側,極大提高系統(tǒng)的水平擴展性。