數(shù)據(jù)科學(xué)家們早已熟悉的R和Pandas等傳統(tǒng)數(shù)據(jù)分析框架 雖然提供了直觀易用的API,卻局限于單機(jī),無法覆蓋分布式大數(shù)據(jù)場景。在Spark 1.3.0以Spark SQL原有的SchemaRDD為藍(lán)本,引入了Spark DataFrame API,不僅為Scala、Python、Java三種語言環(huán)境提供了形如R和Pandas的API,而且自然而然地繼承了Spark SQL的分布式處理能力。此外,Spark 1.2.0中引入的外部數(shù)據(jù)源API也得到了進(jìn)一步的完善,集成了完整的數(shù)據(jù)寫入支持,從而補(bǔ)全了Spark SQL多數(shù)據(jù)源互操作的最后一塊拼圖。借小數(shù)據(jù)分析之力,撼大數(shù)據(jù)分析之巨石;四兩撥千斤,不亦樂乎!
圖1:飛速增長中的Spark
Spark SQL是Spark的核心組件之一,于2014年4月隨Spark 1.0版一同面世。上圖左側(cè)展示了自去年4月份Spark 1.0發(fā)布至今開源貢獻(xiàn)者數(shù)量的增長情況,基本上呈現(xiàn)了一個(gè)線性增長的態(tài)勢。右側(cè)所展示的每月PR數(shù)量的增長情況也同樣迅猛。值得一提的是,在Spark 1.3當(dāng)中,Spark SQL終于從alpha階段畢業(yè),除了部分developer API以外,所有的公共API都已經(jīng)穩(wěn)定,可以放心使用了。
作為Shark的繼任者,Spark SQL的主要功能之一便是訪問現(xiàn)存的Hive數(shù)據(jù)。在與Hive進(jìn)行集成的同時(shí),Spark SQL也提供了JDBC/ODBC接口。Tableau、Qlik等第三方工具可以通過該接口接入Spark SQL,借助Spark進(jìn)行數(shù)據(jù)處理。
然而,Spark SQL的應(yīng)用并不局限于SQL。實(shí)際上“Spark SQL”這個(gè)名字并不恰當(dāng)。根據(jù)Spark官方文檔的定義:Spark SQL是一個(gè)用于處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件——該定義強(qiáng)調(diào)的是“結(jié)構(gòu)化數(shù)據(jù)”,而非“SQL”。新近發(fā)布的Spark 1.3更加完整的表達(dá)了Spark SQL的愿景:讓開發(fā)者用更精簡的代碼處理盡量少的數(shù)據(jù),同時(shí)讓Spark SQL自動優(yōu)化執(zhí)行過程,以達(dá)到降低開發(fā)成本,提升數(shù)據(jù)分析執(zhí)行效率的目的。為此,我們在Spark 1.3中引入了與R和Python Pandas接口類似的DataFrame API,延續(xù)了傳統(tǒng)單機(jī)數(shù)據(jù)分析的開發(fā)體驗(yàn),并將之推廣到了分布式大數(shù)據(jù)場景。
DataFrame
與RDD類似,DataFrame也是一個(gè)分布式數(shù)據(jù)容器。然而DataFrame更像傳統(tǒng)數(shù)據(jù)庫的二維表格,除了數(shù)據(jù)以外,還掌握數(shù)據(jù)的結(jié)構(gòu)信息,即schema。同時(shí),與Hive類似,DataFrame也支持嵌套數(shù)據(jù)類型(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的是一套高層的關(guān)系操作,比函數(shù)式的RDD API要更加友好,門檻更低。由于與R和Pandas的DataFrame類似,Spark DataFrame很好地繼承了傳統(tǒng)單機(jī)數(shù)據(jù)分析的開發(fā)體驗(yàn)。
圖2:DataFrame和 RDD的區(qū)別
上圖直觀地體現(xiàn)了DataFrame和RDD的區(qū)別。左側(cè)的RDD[Person]雖然以Person為類型參數(shù),但Spark框架本身不了解Person`類的內(nèi)部結(jié)構(gòu)。而右側(cè)的DataFrame卻提供了詳細(xì)的結(jié)構(gòu)信息,使得Spark SQL可以清楚地知道該數(shù)據(jù)集中包含哪些列,每列的名稱和類型各是什么。了解了這些信息之后,Spark SQL的查詢優(yōu)化器就可以進(jìn)行針對性的優(yōu)化。舉一個(gè)不太恰當(dāng)?shù)睦樱渲械牟顒e有些類似于動態(tài)類型的Python與靜態(tài)類型的C++之間的區(qū)別。后者由于在編譯期有詳盡的類型信息,編譯期就可以編譯出更加有針對性、更加優(yōu)化的可執(zhí)行代碼。
外部數(shù)據(jù)源API
然而對于用戶來說,只有一個(gè)結(jié)構(gòu)化的數(shù)據(jù)抽象還是不夠的。數(shù)據(jù)往往會以各種各樣的格式存儲在各種各樣的系統(tǒng)之上,而用戶會希望方便地從不同的數(shù)據(jù)源獲取數(shù)據(jù),進(jìn)行混合處理,再將結(jié)果以特定的格式寫回?cái)?shù)據(jù)源或直接予以某種形式的展現(xiàn)。Spark 1.2引入的外部數(shù)據(jù)源API正是為了解決這一問題而產(chǎn)生的。Spark SQL外部數(shù)據(jù)源API的一大優(yōu)勢在于,可以將查詢中的各種信息下推至數(shù)據(jù)源處,從而充分利用數(shù)據(jù)源自身的優(yōu)化能力來完成列剪枝、過濾條件下推等優(yōu)化,實(shí)現(xiàn)減少IO、提高執(zhí)行效率的目的。自1.2發(fā)布以來,社區(qū)內(nèi)涌現(xiàn)出了多種多樣的外部數(shù)據(jù)源。下圖是Spark 1.3支持的各種數(shù)據(jù)源的一個(gè)概覽(左側(cè)是Spark SQL內(nèi)置支持的數(shù)據(jù)源,右側(cè)為社區(qū)開發(fā)者貢獻(xiàn)的數(shù)據(jù)源)。在外部數(shù)據(jù)源API的幫助下,DataFrame實(shí)際上成為了各種數(shù)據(jù)格式和存儲系統(tǒng)進(jìn)行數(shù)據(jù)交換的中間媒介:在Spark SQL內(nèi),來自各處的數(shù)據(jù)都被加載為DataFrame混合、統(tǒng)一成單一形態(tài),再以之基礎(chǔ)進(jìn)行數(shù)據(jù)分析和價(jià)值提取。
圖3:DataFrame支持 的各種外部數(shù)據(jù)源
Spark SQL助力大數(shù)據(jù)分析
精簡代碼
DataFrame帶來的最明 顯的優(yōu)點(diǎn)之一就是幫助用戶進(jìn)一步精簡代碼。下圖展示了分別用Hadoop MR、Python RDD API和Python DataFrame API實(shí)現(xiàn)同一業(yè)務(wù)邏輯的三段代碼片段。顯然Hadoop MR的代碼量最大,而且并不容易看明白業(yè)務(wù)邏輯到底是什么。Python RDD API的版本精簡了許多,但仍然不容易看出到底是在干什么。Python DataFrame API的版本相較Python RDD API的版本又更精進(jìn)了一步;更重要的是,凡是略懂SQL的人,都可以一眼看出它在做什么——可見,taFrame API不僅可以令代碼更加精簡,而且顯著提升了可讀性。Spark 1.3提供了Python、Scala、Java三種語言的DataFrame API binding,供用戶按需選用。
圖4:Hadoop MR、Python RDD API、Python DataFrame API代碼示例
除此以外,Spark SQL還針對大數(shù)據(jù)處理中的一些常見場景和模式提供了一些便利的工具,使得用戶在處理不同項(xiàng)目中重復(fù)出現(xiàn)的模式時(shí)可以避免編寫重復(fù)或高度類似的代碼:
JSON schema自動推導(dǎo)
JSON 是一種可讀性良好的重要結(jié)構(gòu)化數(shù)據(jù)格式,許多原始數(shù)據(jù)往往以JSON的形式存在。然而JSON數(shù)據(jù)的體積卻過于龐大,不利于批量數(shù)據(jù)分析。因此一個(gè)常見的數(shù)據(jù)處理步驟就是將JSON轉(zhuǎn)換為ORC、Parquet等高效的列式存儲格式。然而,不同版本的JSON數(shù)據(jù)往往具有不同的schema(例如新版本的Twitter API返回的數(shù)據(jù)可能比老版本的API返回的數(shù)據(jù)多出若干列)。人工合并整個(gè)JSON數(shù)據(jù)集所有記錄的schema是一件十分枯燥繁瑣的任務(wù)。Spark SQL在處理JSON數(shù)據(jù)時(shí)可以自動掃描整個(gè)數(shù)據(jù)集,得到所有記錄中出現(xiàn)的數(shù)據(jù)列的全集,推導(dǎo)出完整的schema。(對于同名但不同類型的列,Spark SQL會嘗試規(guī)約出一個(gè)公共類型。)
圖5:Spark對不規(guī)整JSON數(shù)據(jù)的處理
上圖展示了Spark SQL對三條不規(guī)整的個(gè)人信息JSON記錄進(jìn)行整理和schema推導(dǎo)的過程。第2條記錄跟第1條記錄類似,但多出了一個(gè)age字段,第3條與前兩條也很類似,但是身高字段的類型是double而不是int。對此,Spark SQL的JSON數(shù)據(jù)源作出的處理是,將出現(xiàn)的所有列都納入最終的schema中,對于名稱相同但類型不同的列,取所有類型的公共父類型(例如int和 double的公共父類型為double)。通過這樣的處理,我們最終就得到了右下方的DataFrame。
Hive風(fēng)格的分區(qū)表
Hive 的分區(qū)表可以認(rèn)為是一種簡易索引。分區(qū)表的每一個(gè)分區(qū)的每一個(gè)分區(qū)列都對應(yīng)于一級目錄,目錄以<列名>=<列值>的格式命名。Spark 1.3中的Parquet數(shù)據(jù)源實(shí)現(xiàn)了自動分區(qū)發(fā)現(xiàn)的功能:當(dāng)數(shù)據(jù)以Hive分區(qū)表的目錄結(jié)構(gòu)存在時(shí),無須Hive metastore中的元數(shù)據(jù),Spark SQL也可以自動將之識別為分區(qū)表。于是,在處理這張表時(shí),分區(qū)剪枝等分區(qū)特有的優(yōu)化也可以得以實(shí)施。
提升執(zhí)行效率
利用DataFrame API,不僅代碼可以更加精簡,更重要的是,執(zhí)行效率也可以得到提升。下圖對比了用Scala、Python的RDD API和DataFrame API實(shí)現(xiàn)的累加一千萬整數(shù)對的四段程序的性能對比??梢钥吹?,Python DataFrame API相對于Python RDD API的執(zhí)行效率有了近五倍的提升。這是因?yàn)樵贒ataFrame API實(shí)際上僅僅組裝了一段體積小巧的邏輯查詢計(jì)劃,Python端只需將查詢計(jì)劃發(fā)送到JVM端即可,計(jì)算任務(wù)的大頭都由JVM端負(fù)責(zé)。在使用 Python RDD API時(shí),Python VM和JVM之間需要進(jìn)行大量的跨進(jìn)程數(shù)據(jù)交換,從而拖慢了Python RDD API的速度。
值得注意的是,不僅Python API有了顯著的性能提升,即便是使用Scala,DataFrame API的版本也要比RDD API快一倍。上述示例的邏輯極為簡單,查詢優(yōu)化器的作用不明顯,那么為什么會有加速效果呢?RDD API是函數(shù)式的,強(qiáng)調(diào)不變性,在大部分場景下傾向于創(chuàng)建新對象而不是修改老對象。這一特點(diǎn)雖然帶來了干凈整潔的API,卻也使得Spark應(yīng)用程序在運(yùn)行期傾向于創(chuàng)建大量臨時(shí)對象,對GC造成壓力。在現(xiàn)有RDD API的基礎(chǔ)之上,我們固然可以利用mapPartitions方法來重載RDD單個(gè)分片內(nèi)的數(shù)據(jù)創(chuàng)建方式,用復(fù)用可變對象的方式來減小對象分配和GC的開銷,但這犧牲了代碼的可讀性,而且要求開發(fā)者對Spark運(yùn)行時(shí)機(jī)制有一定的了解,門檻較高。另一方面,Spark SQL在框架內(nèi)部已經(jīng)在各種可能的情況下盡量重用對象,這樣做雖然在內(nèi)部會打破了不變性,但在將數(shù)據(jù)返回給用戶時(shí),還會重新轉(zhuǎn)為不可變數(shù)據(jù)。利用 DataFrame API進(jìn)行開發(fā),可以免費(fèi)地享受到這些優(yōu)化效果。
減少數(shù)據(jù)讀取
分析大數(shù)據(jù),最快的方法就是 ——忽略它。這里的“忽略”并不是熟視無睹,而是根據(jù)查詢條件進(jìn)行恰當(dāng)?shù)募糁Α?/p>
上文討論分區(qū)表時(shí)提到的分區(qū)剪 枝便是其中一種——當(dāng)查詢的過濾條件中涉及到分區(qū)列時(shí),我們可以根據(jù)查詢條件剪掉肯定不包含目標(biāo)數(shù)據(jù)的分區(qū)目錄,從而減少IO。
對于一些“智能”數(shù)據(jù)格 式,Spark SQL還可以根據(jù)數(shù)據(jù)文件中附帶的統(tǒng)計(jì)信息來進(jìn)行剪枝。簡單來說,在這類數(shù)據(jù)格式中,數(shù)據(jù)是分段保存的,每段數(shù)據(jù)都帶有最大值、最小值、null值數(shù)量等 一些基本的統(tǒng)計(jì)信息。當(dāng)統(tǒng)計(jì)信息表名某一數(shù)據(jù)段肯定不包括符合查詢條件的目標(biāo)數(shù)據(jù)時(shí),該數(shù)據(jù)段就可以直接跳過(例如某整數(shù)列a某段的最大值為100,而查詢條件要求a > 200)。
此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存儲格式的優(yōu)勢,僅掃描查詢真正涉及的列,忽略其余列的數(shù)據(jù)。
查詢優(yōu)化
Spark SQL的第三個(gè)目標(biāo),就是讓查詢優(yōu)化器幫助我們優(yōu)化執(zhí)行效率,解放開發(fā)者的生產(chǎn)力,讓新手也可以寫出高效的程序。
圖6:Spark SQL查詢優(yōu)化引擎
DataFrame的背后是 Spark SQL的全套查詢優(yōu)化引擎,其整體架構(gòu)如上圖所示。通過SQL/HiveQl parser或是DataFrame API構(gòu)造的邏輯執(zhí)行計(jì)劃經(jīng)過analyzer的分析之后再經(jīng)優(yōu)化得到優(yōu)化執(zhí)行計(jì)劃,接著再轉(zhuǎn)為物理執(zhí)行計(jì)劃,并最終轉(zhuǎn)換為RDD DAG在Spark引擎上執(zhí)行。
圖7:人口數(shù)據(jù)分析示例
為了說明查詢優(yōu)化,我們來看上圖展示的人口數(shù)據(jù)分析的示例。圖中構(gòu)造了兩個(gè)DataFrame,將它們join之后又做了一次filter操作。如果原封不動地執(zhí)行這個(gè)執(zhí)行計(jì)劃,最終的執(zhí)行效率是不高的。因?yàn)閖oin是一個(gè)代價(jià)較大的操作,也可能會產(chǎn)生一個(gè)較大的數(shù)據(jù)集。如果我們能將filter下推到 join下方,先對DataFrame進(jìn)行過濾,再join過濾后的較小的結(jié)果集,便可以有效縮短執(zhí)行時(shí)間。而Spark SQL的查詢優(yōu)化器正是這樣做的。簡而言之,邏輯查詢計(jì)劃優(yōu)化就是一個(gè)利用基于關(guān)系代數(shù)的等價(jià)變換,將高成本的操作替換為低成本操作的過程。
得到的優(yōu)化執(zhí)行計(jì)劃在轉(zhuǎn)換成物 理執(zhí)行計(jì)劃的過程中,還可以根據(jù)具體的數(shù)據(jù)源的特性將過濾條件下推只數(shù)據(jù)源內(nèi)。最右側(cè)的物理執(zhí)行計(jì)劃中Filter之所以消失不見,就是因?yàn)槿苋肓擞糜趫?zhí)行最終的讀取操作的表掃描節(jié)點(diǎn)內(nèi)。
對于普通開發(fā)者而言,查詢優(yōu)化 器的意義在于,即便是經(jīng)驗(yàn)并不豐富的程序員寫出的次優(yōu)的查詢,也可以被盡量轉(zhuǎn)換為高效的形式予以執(zhí)行。
DataFrame As The New RDD
在Spark 1.3中,DataFrame已經(jīng)開始替代RDD成為新的數(shù)據(jù)共享抽象。以下的Spark ML示例搭建了一整套由切詞、詞頻計(jì)算、邏輯回歸等多個(gè)環(huán)節(jié)組成的機(jī)器學(xué)習(xí)流水線。該流水線的輸入、各環(huán)節(jié)間的數(shù)據(jù)交換,以及流水線的輸出結(jié)果,都是以 DataFrame來表示的。
圖8:機(jī)器學(xué)習(xí)流水線
相對于RDD,DataFrame有幾個(gè)特點(diǎn):
包含schema信息,能夠進(jìn)行針對性的優(yōu)化。
對用戶有更加 友好、更直觀的API。
與外部數(shù)據(jù)源 API緊密集成,可以用作多種存儲格式和存儲系統(tǒng)間的數(shù)據(jù)交換媒介。
作為一個(gè)比RDD更加高效的數(shù) 據(jù)共享抽象,DataFrame使得我們可以更加便捷地搭建一體化的大數(shù)據(jù)流水線。