今天主要是以一個(gè)數(shù)據(jù)分析者的角度來(lái)與大家分享如何使用spark進(jìn)行大數(shù)據(jù)分析。
我將分以下4部分為大家進(jìn)行介紹。首先介紹spark的相關(guān)背景,包括基本概念以及spark與hadoop的關(guān)系。接下來(lái)介紹如何使用spark RDD進(jìn)行數(shù)據(jù)分析。之后分享spark與大數(shù)據(jù)分析的關(guān)系,以及spark在大數(shù)據(jù)分析中所起到的作用。最后,為大家分享一下我與四位小伙伴基于去年的SODA開(kāi)放的交通數(shù)據(jù)做的案例:大型活動(dòng)大規(guī)模人群的檢測(cè)與疏散。
spark是一個(gè)快速易用的大規(guī)模數(shù)據(jù)計(jì)算框架,具有速度快、易使用、功能全的特點(diǎn),并且可以與Hadoop很好地集成。
那么我們什么時(shí)候需要使用spark呢?首先,當(dāng)我們需要處理的數(shù)據(jù)量超過(guò)了單機(jī)尺度(比如我們的計(jì)算機(jī)有4GB的內(nèi)存,而我們需要處理100GB以上的數(shù)據(jù))這時(shí)我們可以選擇spark集群進(jìn)行計(jì)算。有時(shí)我們可能需要處理的數(shù)據(jù)量并不大,但是計(jì)算很復(fù)雜,需要大量的時(shí)間,這時(shí)我們也可以選擇利用spark集群強(qiáng)大的計(jì)算資源,并行化地計(jì)算。
spark可以提供了豐富的數(shù)據(jù)處理操作,包括在線的流式數(shù)據(jù)處理、離線的批量數(shù)據(jù)處理、即席查詢、機(jī)器學(xué)習(xí)。
spark也提供了多種編程API接口,供具有不同開(kāi)發(fā)經(jīng)驗(yàn)的數(shù)據(jù)分析者使用。
spark與Hadoop是什么關(guān)系呢? Hadoop有兩個(gè)核心模塊,分布式存儲(chǔ)模塊HDFS和分布式計(jì)算模塊Mapreduce。spark本身并沒(méi)有提供分布式文件系統(tǒng),因此spark的分析大多依賴于Hadoop的分布式文件系統(tǒng)HDFS。另一方面,Hadoop的Mapreduce與spark都可以進(jìn)行數(shù)據(jù)計(jì)算,而相比于Mapreduce,spark的速度更快并且提供的功能更加豐富。
下面來(lái)介紹如何使用spark RDD進(jìn)行編程。
首先介紹一下spark RDD,spark建立在統(tǒng)一抽象的RDD之上,RDD指的是一個(gè)只讀的可分區(qū)的分布式數(shù)據(jù)集??梢詫⑺娜炕虿糠志彺嬖趦?nèi)存中,供多次計(jì)算重用。而且RDD提供了多種友好的操作函數(shù)供數(shù)據(jù)分析者做數(shù)據(jù)處理。
spark為什么會(huì)在迭代計(jì)算中比hadoop快很多呢?Hadoop進(jìn)行迭代數(shù)據(jù)處理時(shí),需要把數(shù)據(jù)從HDFS中讀出,分析,寫(xiě)回到HDFS中,再讀出、分析、寫(xiě)回。在此過(guò)程中進(jìn)行了大量的磁盤(pán)I/O操作,消耗了大量的時(shí)間。而spark可以將數(shù)據(jù)一次性地從HDFS讀到內(nèi)存中,并進(jìn)行多次計(jì)算,因而減少了大量的開(kāi)銷。
通過(guò)spark RDD進(jìn)行編程可以理解為利用RDD提供的算子、結(jié)合實(shí)際需求,設(shè)計(jì)一個(gè)數(shù)據(jù)處理的pipeline,將原始數(shù)據(jù)轉(zhuǎn)換成我們需要得到的數(shù)據(jù)。RDD算子分為transformation和action,transformation是得到一個(gè)新的RDD,并且不會(huì)執(zhí)行計(jì)算,直到遇到action算子的時(shí)候計(jì)算才會(huì)被觸發(fā)。
這是一些常用的spark RDD算子。
下面來(lái)介紹如何使用spark RDD進(jìn)行數(shù)據(jù)處理??偨Y(jié)起來(lái)可以分為以下三步:1.根據(jù)我們的目標(biāo)定義好輸入和輸出數(shù)據(jù)的格式,并比較兩者之間的差異;2.明確輸入輸出后我們根據(jù)RDD本身提供的算子以及自己定義的函數(shù)來(lái)設(shè)計(jì)pipeline;3.選擇一種API編程實(shí)現(xiàn)。
我們以詞頻統(tǒng)計(jì)為例進(jìn)行說(shuō)明。我們希望對(duì)一段非結(jié)構(gòu)化文本做詞頻統(tǒng)計(jì),即統(tǒng)計(jì)一段文本中每個(gè)單詞出現(xiàn)的次數(shù),并將單詞按照字母ASCII順序升序排列。首先定義好我們的輸入與輸出數(shù)據(jù)格式,輸入數(shù)據(jù)是一段介紹spark的文本,輸出是逗號(hào)分隔的詞頻統(tǒng)計(jì)。
第二步設(shè)計(jì)算子pipeline,首先將數(shù)據(jù)從HDFS中讀取,通過(guò)flatMap算子、map算子和reduceByKey算子統(tǒng)計(jì)出每個(gè)單詞出現(xiàn)的頻次,通過(guò)sortByKey算子將單詞升序排列,再通過(guò)一個(gè)map算子轉(zhuǎn)化成我們需要的目標(biāo)格式,最后通過(guò)save算子將處理好的結(jié)果寫(xiě)回到HDFS中。
這是我們進(jìn)行詞頻統(tǒng)計(jì)任務(wù)中使用的算子,包括4個(gè)transformation算子和一個(gè)action算子。
第三步我們來(lái)進(jìn)行編程實(shí)現(xiàn),在這里我們選擇python進(jìn)行編程。我們看到原本很復(fù)雜的pipeline,spark只需要短短的幾行代碼就可以實(shí)現(xiàn),可見(jiàn)spark的強(qiáng)大功能以及對(duì)數(shù)據(jù)分析者提供的友好接口。
下面和大家介紹spark與大數(shù)據(jù)分析的關(guān)系。
數(shù)據(jù)分析一般需要進(jìn)行兩次創(chuàng)造。首先是第一次創(chuàng)造,即從整體上進(jìn)行產(chǎn)品設(shè)計(jì),找到一個(gè)好的應(yīng)用問(wèn)題,并思考問(wèn)題是否有意義,數(shù)據(jù)源是否可靠,現(xiàn)有數(shù)據(jù)源可以解決該問(wèn)題嗎,是否需要其他數(shù)據(jù)源。在整體設(shè)計(jì)完成之后我們進(jìn)行第二次創(chuàng)造,即在細(xì)節(jié)上通過(guò)技術(shù)實(shí)現(xiàn),這個(gè)過(guò)程是一個(gè)不斷迭代往復(fù)的過(guò)程??偨Y(jié)起來(lái),數(shù)據(jù)分析,首先要找到正確的問(wèn)題,然后再正確地分析數(shù)據(jù)。當(dāng)然兩者并非完全獨(dú)立,比如對(duì)數(shù)據(jù)的基本統(tǒng)計(jì)往往會(huì)幫助我們不斷深入地理解數(shù)據(jù),進(jìn)而發(fā)現(xiàn)問(wèn)題。
下面介紹數(shù)據(jù)流與應(yīng)用問(wèn)題之間的關(guān)系,以及不同的數(shù)據(jù)分析工具在其中所起到的作用。在明確了應(yīng)用問(wèn)題,選擇好了數(shù)據(jù)源之后,我們首先將原始數(shù)據(jù)轉(zhuǎn)化為中間數(shù)據(jù)。原始數(shù)據(jù)往往量巨大(幾百GB、TB級(jí)別),并且多是未經(jīng)清洗的非結(jié)構(gòu)化數(shù)據(jù),因此我們需要用HDFS進(jìn)行存儲(chǔ),使用大數(shù)據(jù)分析工具spark進(jìn)行清洗壓縮編碼,得到結(jié)構(gòu)化的中間數(shù)據(jù),我們以后大部分的分析都可以基于中間數(shù)據(jù)進(jìn)行。中間數(shù)據(jù)往往會(huì)比原始數(shù)據(jù)量小(幾十GB),但單機(jī)仍然難以處理,因此也需要存儲(chǔ)到HDFS中,使用spark/Hive進(jìn)行進(jìn)一步的處理,得到小數(shù)據(jù)。小數(shù)據(jù)大多是一些統(tǒng)計(jì)結(jié)果、提取的特征等等,數(shù)據(jù)量也相對(duì)較小(幾MB至幾GB),我們可以通過(guò)python、R語(yǔ)言等工具在單機(jī)上進(jìn)行建模、分析,并將分析結(jié)果進(jìn)行可視化,可以選擇R語(yǔ)言、python繪制靜態(tài)的統(tǒng)計(jì)圖,也可以選擇echarts、D3等工具進(jìn)行交互展示。通過(guò)這些可視化的結(jié)果發(fā)現(xiàn)insight進(jìn)而解決實(shí)際問(wèn)題。
在大數(shù)據(jù)快速發(fā)展的今天,有多種多樣的大數(shù)據(jù)分析工具應(yīng)運(yùn)而生,我們?yōu)槭裁匆x擇spark作為我們的大數(shù)據(jù)分析工具?相比于其他分析工具,spark具有哪些優(yōu)勢(shì)?ETL、機(jī)器學(xué)習(xí)、即席查詢是大數(shù)據(jù)分析中非常重要的操作。已經(jīng)有了一些大數(shù)據(jù)工具為此提供了解決方案,例如hadoop mapreduce解決大數(shù)據(jù)ETL、mahout解決大數(shù)據(jù)機(jī)器學(xué)習(xí)、hive解決大數(shù)據(jù)即席查詢。然而這給數(shù)據(jù)分析者帶來(lái)了不便,對(duì)于每一種大數(shù)據(jù)操作,都要學(xué)習(xí)一種新的技術(shù),這帶來(lái)了很大的學(xué)習(xí)成本。
那么我們會(huì)設(shè)想,會(huì)不會(huì)有一種工具,將常用的大數(shù)據(jù)分析功能統(tǒng)一起來(lái)呢?
spark經(jīng)過(guò)近年來(lái)的飛速發(fā)展,已經(jīng)做到“one stack to rule them all”,通過(guò)RDD將三者統(tǒng)一在了一起。數(shù)據(jù)分析者可以通過(guò)spark core大數(shù)據(jù)ETL,通過(guò)spark Mllib進(jìn)行大數(shù)據(jù)機(jī)器學(xué)習(xí),通過(guò)spark SQL進(jìn)行大數(shù)據(jù)即席查詢。
因此,數(shù)據(jù)分析者只需掌握spark一種工具,即可實(shí)現(xiàn)絕大多數(shù)的大數(shù)據(jù)分析功能。
最后,我來(lái)與大家分享一下我與其他4位小伙伴(上海交通大學(xué)的張宏倫、李鐸、楊皓天,同濟(jì)大學(xué)的金建棟)使用去年SODA的開(kāi)放交通數(shù)據(jù)進(jìn)行案例分析的一些結(jié)果:大型活動(dòng)大規(guī)模人群的檢測(cè)與疏散。
上海經(jīng)常會(huì)舉辦大型活動(dòng),例如大型演唱會(huì)、足球賽等。這些大型活動(dòng)會(huì)聚集大量的人群,有時(shí)會(huì)因?yàn)槿藬?shù)過(guò)多產(chǎn)生安全隱患,例如2015年新年上海外灘的踩踏事件。這些活動(dòng)舉辦的時(shí)間地點(diǎn)不固定,也難以得知全部活動(dòng)的信息,如果活動(dòng)臨時(shí)更改時(shí)間地點(diǎn),也難以實(shí)時(shí)得到新的信息。
這給政府帶來(lái)了公共安全的隱患。對(duì)于參加活動(dòng)的人,在活動(dòng)結(jié)束時(shí),往往地鐵已經(jīng)停運(yùn),面對(duì)黑車的漫天要價(jià),會(huì)面臨回家難的問(wèn)題。而現(xiàn)在市場(chǎng)上已經(jīng)出現(xiàn)了一些專用巴士公司,他們希望尋找更多的客源創(chuàng)造更多的利潤(rùn)。然而三者之間聯(lián)系脆弱、信息孤立。
我們希望以開(kāi)放數(shù)據(jù)為基礎(chǔ),利用spark大數(shù)據(jù)分析技術(shù),使用算法模型,通過(guò)交通數(shù)據(jù)識(shí)別出大型活動(dòng)并提供疏散建議。為政府解決社會(huì)問(wèn)題,為活動(dòng)參加者解決回家難的問(wèn)題,同時(shí)為專用巴士公司提供更多客源,創(chuàng)造更多利潤(rùn)。
我們選取了公交卡刷卡數(shù)據(jù)、出租車運(yùn)行數(shù)據(jù)、地鐵運(yùn)行數(shù)據(jù)以及浦東公交車實(shí)時(shí)數(shù)據(jù)、氣象數(shù)據(jù)。其中使用最多的是一卡通乘客刷卡數(shù)據(jù),包含了2015年4月上海市的所有公交卡刷卡記錄,涵蓋用戶1000萬(wàn)以上,交易記錄2億4千萬(wàn)條以上。
如此多的數(shù)據(jù)量單機(jī)難以處理,因此我們選擇spark作為數(shù)據(jù)處理工具。這是我們的整體架構(gòu),首先根據(jù)我們的目標(biāo)進(jìn)行數(shù)據(jù)集擴(kuò)充,包括從非常票務(wù)網(wǎng)、大麥網(wǎng)等票務(wù)網(wǎng)站爬取的各大活動(dòng)的信息。之后進(jìn)行數(shù)據(jù)預(yù)處理工作,包括數(shù)據(jù)去噪、數(shù)據(jù)融合等。之后進(jìn)行數(shù)據(jù)分析挖掘,包括時(shí)序分析、空間挖掘、個(gè)體行為建模等,并將分析結(jié)果可視化。
我們對(duì)多種交通工具的每天出行時(shí)間分布進(jìn)行了統(tǒng)計(jì),可以看到地鐵和公交車有著明顯的早晚高峰,而出租車除了午夜時(shí)間一天的乘客數(shù)量較為平均。
我們對(duì)每天的交通總流量進(jìn)行了分析,發(fā)現(xiàn)交通流量穩(wěn)定,并以周為單位呈周期規(guī)律,而工作日的總流量要高于休息日。觀察一周的總流量,周一到周四的交通流量基本相同,周五流量要略高于周一至周四,而周六流量要低于工作日,周日的流量為一周最低。
在分析完每天的交通總流量之后,我們分析了一天中各個(gè)時(shí)段的流量。選取了周一到周四工作日中的2天(一個(gè)晴天、一個(gè)雨天),工作日周五和工作日周六。我們發(fā)現(xiàn)兩個(gè)周一到周四工作日的流量曲線幾乎重合,因此我們可以推測(cè),周一到周四的工作日不僅總流量穩(wěn)定,而且各個(gè)時(shí)段的交通總流量穩(wěn)定,且早晚高峰顯著。而觀察周五的流量,我們發(fā)現(xiàn)在大約10:00之前,流量曲線幾乎與周一到周四的流量重合,而10:00以后幾乎每個(gè)時(shí)段流量都會(huì)比平時(shí)高出一些,這解釋了為什么周五的總流量會(huì)高于周一到周四。而周六的流量沒(méi)有早晚高峰,但在空閑時(shí)段(如中午)流量要高于工作日。
我們分析了一個(gè)月以來(lái)地鐵乘客的公交卡刷卡次數(shù)分布。上海乘坐一次地鐵,進(jìn)出需要刷卡2次,因此正常情況下,乘客的刷卡次數(shù)一定是偶數(shù)。從分布圖中我們也可以觀察到這一點(diǎn),然而我們也發(fā)現(xiàn)也有一些乘客的刷卡次數(shù)呈奇數(shù),這可能是設(shè)備故障或乘客逃票行為導(dǎo)致。另外,一個(gè)月來(lái)乘客的交易次數(shù)呈重尾分布,而且一個(gè)月中出行2次的乘客最多。
在分析了宏觀上的流量之后,我們來(lái)分析個(gè)體的行為。我們用模序(motif)來(lái)對(duì)個(gè)體的行為進(jìn)行抽象,即用有向圖表示用戶一天的軌跡。比如第二幅圖中,乘客一天中先從站點(diǎn)1出發(fā)去2,再?gòu)恼军c(diǎn)2出發(fā)返回1,這是典型的通勤行為。我們發(fā)現(xiàn)乘客絕大多數(shù)的行為可以使用以上10種模序描述,因此絕大多數(shù)的乘客行為是規(guī)律的。我們也關(guān)注模序的變化,因?yàn)槟P虻淖兓凳局袨榈漠惓?。比如某天大量用戶的模序發(fā)生變化且都去一個(gè)共同的地點(diǎn),那么他們很可能去參加同一場(chǎng)大型活動(dòng)。
下面我們研究大型活動(dòng)與交通流量的關(guān)系。這是中華藝術(shù)宮地鐵站幾天的客流量。平時(shí)情況下,客流量較少。4.18號(hào)晚中華藝術(shù)宮附近場(chǎng)館舉辦了一場(chǎng)演唱會(huì),可以看到這一天在活動(dòng)開(kāi)始前與結(jié)束后客流量大大增加,遠(yuǎn)高于平時(shí),且出現(xiàn)了兩個(gè)尖峰。因此大型活動(dòng)確實(shí)對(duì)交通流量造成了較為顯著的影響,我們通過(guò)交通數(shù)據(jù)來(lái)識(shí)別大型活動(dòng)是可行的。
下面是我們使用spark技術(shù),通過(guò)模型做出的大型活動(dòng)識(shí)別結(jié)果,做圖顏色表示地鐵,例如藍(lán)色代表8號(hào)線,小長(zhǎng)方形表示地鐵站點(diǎn)。右圖表示一個(gè)月中哪一天算法檢測(cè)出了大型活動(dòng),白色表示沒(méi)有檢測(cè)到,紅色表示檢測(cè)到。右側(cè)兩條曲線分別表示當(dāng)日的客流量與歷史平均的客流量。
最后,我們基于虹口足球場(chǎng)4月11日晚(一場(chǎng)足球賽)的交通數(shù)據(jù)進(jìn)行了控制性模擬實(shí)驗(yàn)。我們發(fā)現(xiàn),在未采取控制前,需要?dú)v史很長(zhǎng)時(shí)間才可以完成疏散,而當(dāng)使用巴士協(xié)助疏散之后,所花時(shí)間大大減少,這也降低了風(fēng)險(xiǎn)。同時(shí),我們發(fā)現(xiàn)調(diào)配巴士數(shù)量越多、載客量越大疏散越快,但也有可能造成巴士資源浪費(fèi)、造成損失,因此存在使得盈利最大和疏散最快的最優(yōu)點(diǎn),可以通過(guò)最優(yōu)化模型得到。至此,我們以開(kāi)放數(shù)據(jù)為基礎(chǔ),利用spark大數(shù)據(jù)技術(shù)和算法模型,對(duì)乘客解決了活動(dòng)結(jié)束回家難問(wèn)題,對(duì)專用巴士提供了更多客源增加其收益,同時(shí)幫助政府減少了公共安全風(fēng)險(xiǎn)。
最后從一個(gè)數(shù)據(jù)分析者的角度,總結(jié)一下個(gè)人對(duì)數(shù)據(jù)分析的理解。我們首先要根據(jù)實(shí)際需求找到應(yīng)用問(wèn)題,數(shù)據(jù)是我們的研究基礎(chǔ),spark/hadoop等技術(shù)是我們的分析工具,算法模型是我們的理論方法,而數(shù)據(jù)可視化是一種呈現(xiàn)信息的手段。
作者: 科賽網(wǎng) 汪夢(mèng)夢(mèng) 鄧以勒