基于Spark的公安大數(shù)據(jù)實(shí)時(shí)運(yùn)維技術(shù)實(shí)踐

責(zé)任編輯:editor005

2017-01-04 14:18:17

摘自:網(wǎng)絡(luò)大數(shù)據(jù)

數(shù)據(jù)傳輸層:采用高吞吐的分布式消息隊(duì)列Kafka集群,保證匯聚的日志、消息的可靠傳輸;數(shù)據(jù)存儲(chǔ)層:實(shí)時(shí)數(shù)據(jù)存入MySQL中便于實(shí)時(shí)的業(yè)務(wù)應(yīng)用和展示;全量數(shù)據(jù)存入ES以及HBase中便于后續(xù)的檢索分析;

公安行業(yè)存在數(shù)以萬(wàn)計(jì)的前后端設(shè)備,前端設(shè)備包括相機(jī)、檢測(cè)器及感應(yīng)器,后端設(shè)備包括各級(jí)中心機(jī)房中的服務(wù)器、應(yīng)用服務(wù)器、網(wǎng)絡(luò)設(shè)備及機(jī)房動(dòng)力系統(tǒng),數(shù)量巨大、種類繁多的設(shè)備給公安內(nèi)部運(yùn)維管理帶來(lái)了巨大挑戰(zhàn)。傳統(tǒng)通過(guò)ICMP/SNMP、Trap/Syslog等工具對(duì)設(shè)備進(jìn)行診斷分析的方式已不能滿足實(shí)際要求,由于公安內(nèi)部運(yùn)維管理的特殊性,現(xiàn)行通過(guò)ELK等架構(gòu)的方式同樣也滿足不了需要。為尋求合理的方案,我們將目光轉(zhuǎn)向開(kāi)源架構(gòu),構(gòu)建了一套適用于公安行業(yè)的實(shí)時(shí)運(yùn)維管理平臺(tái)。

實(shí)時(shí)運(yùn)維平臺(tái)整體架構(gòu)

數(shù)據(jù)采集層:Logstash+Flume,負(fù)責(zé)在不同場(chǎng)景下收集、過(guò)濾各類前后端硬件設(shè)備輸出的Snmp Trap、Syslog日志信息以及應(yīng)用服務(wù)器自身產(chǎn)生的系統(tǒng)和業(yè)務(wù)日志;

數(shù)據(jù)傳輸層:采用高吞吐的分布式消息隊(duì)列Kafka集群,保證匯聚的日志、消息的可靠傳輸;

數(shù)據(jù)處理層:由Spark實(shí)時(shí)Pull Kafka數(shù)據(jù),通過(guò)Spark Streaming以及RDD操作進(jìn)行數(shù)據(jù)流的處理以及邏輯分析;

數(shù)據(jù)存儲(chǔ)層:實(shí)時(shí)數(shù)據(jù)存入MySQL中便于實(shí)時(shí)的業(yè)務(wù)應(yīng)用和展示;全量數(shù)據(jù)存入ES以及HBase中便于后續(xù)的檢索分析;

業(yè)務(wù)服務(wù)層:基于存儲(chǔ)層,后續(xù)的整體業(yè)務(wù)應(yīng)用涵蓋了APM、網(wǎng)絡(luò)監(jiān)控、拓?fù)洹⒏婢?、工單、CMDB等。

整體系統(tǒng)涉及的主要開(kāi)源框架情況如下:

另外,整體環(huán)境基于JDK 8以及Scala 2.10.4。公安系統(tǒng)設(shè)備種類繁多,接下來(lái)將以交換機(jī)Syslog日志為例,詳細(xì)介紹日志處理分析的整體流程。

  圖1 公安實(shí)時(shí)運(yùn)維平臺(tái)整體架構(gòu)

Flume+Logstash日志收集

Flume是Cloudera貢獻(xiàn)的一個(gè)分布式、可靠及高可用的海量日志采集系統(tǒng),支持定制各類Source(數(shù)據(jù)源)用于數(shù)據(jù)收集,同時(shí)提供對(duì)數(shù)據(jù)的簡(jiǎn)單處理以及通過(guò)緩存寫入Sink(數(shù)據(jù)接收端)的能力。

Flume中,Source、Channel及Sink的配置如下:

該配置通過(guò)syslog source配置localhost tcp 5140端口來(lái)接收網(wǎng)絡(luò)設(shè)備發(fā)送的Syslog信息,event緩存在內(nèi)存中,再通過(guò)KafkaSink將日志發(fā)送到kafka集群中名為“syslog-kafka”的topic中。

Logstash來(lái)自Elastic公司,專為收集、分析和傳輸各類日志、事件以及非結(jié)構(gòu)化的數(shù)據(jù)所設(shè)計(jì)。它有三個(gè)主要功能:事件輸入(Input)、事件過(guò)濾器(Filter)以及事件輸出(Output),在后綴為.conf的配置文件中設(shè)置,本例中Syslog配置如下:

Input(輸入)插件用于指定各種數(shù)據(jù)源,本例中的Logstash通過(guò)udp 514端口接收Syslog信息;

Filter(過(guò)濾器)插件雖然在本例中不需要配置,但它的功能非常強(qiáng)大,可以進(jìn)行復(fù)雜的邏輯處理,包括正則表達(dá)式處理、編解碼、k/v切分以及各種數(shù)值、時(shí)間等數(shù)據(jù)處理,具體可根據(jù)實(shí)際場(chǎng)景設(shè)置;

Output(輸出)插件用于將處理后的事件數(shù)據(jù)發(fā)送到指定目的地,指定了Kafka的位置、topic以及壓縮類型。在最后的Codec編碼插件中,指定來(lái)源主機(jī)的IP地址(host)、Logstash處理的時(shí)間戳(@timestamp)作為前綴并整合原始的事件消息(message),方便在事件傳輸過(guò)程中判斷Syslog信息來(lái)源。單條原始Syslog信息流樣例如下:

147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

Logstash Output插件處理后的信息流變成為:

19.1.1.12 2016-10-13T10:04:54.520Z <147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

其中紅色字段就是codec編碼插件植入的host以及timestamp信息。處理后的Syslog信息會(huì)發(fā)送至Kafka集群中進(jìn)行消息的緩存。

Kafka日志緩沖

Kafka是一個(gè)高吞吐的分布式消息隊(duì)列,也是一個(gè)訂閱/發(fā)布系統(tǒng)。Kafka集群中每個(gè)節(jié)點(diǎn)都有一個(gè)被稱為broker的實(shí)例,負(fù)責(zé)緩存數(shù)據(jù)。Kafka有兩類客戶端,Producer(消息生產(chǎn)者的)和Consumer(消息消費(fèi)者)。Kafka中不同業(yè)務(wù)系統(tǒng)的消息可通過(guò)topic進(jìn)行區(qū)分,每個(gè)消息都會(huì)被分區(qū),用以分擔(dān)消息讀寫負(fù)載,每個(gè)分區(qū)又可以有多個(gè)副本來(lái)防止數(shù)據(jù)丟失。消費(fèi)者在具體消費(fèi)某個(gè)topic消息時(shí),指定起始偏移量。Kafka通過(guò)Zero-Copy、Exactly Once等技術(shù)語(yǔ)義保證了消息傳輸?shù)膶?shí)時(shí)、高效、可靠以及容錯(cuò)性。

Kafka集群中某個(gè)broker的配置文件server.properties的部分配置如下:

其中需指定集群里不同broker的id,此臺(tái)broker的id為1,默認(rèn)監(jiān)聽(tīng)9092端口,然后配置Zookeeper(后續(xù)簡(jiǎn)稱zk)集群,再啟動(dòng)broker即可。

Kafka集群名為syslog-kafka的topic:

Kafka集群的topic以及partition等信息也可以通過(guò)登錄zk來(lái)觀察。然后再通過(guò)下列命令查看Kafka接收到的所有交換機(jī)日志信息:

  部分日志樣例如下:

  Spark日志處理邏輯

Spark是一個(gè)為大規(guī)模數(shù)據(jù)處理而生的快速、通用的引擎,在速度、效率及通用性上表現(xiàn)極為優(yōu)異。

在Spark主程序中,通過(guò)Scala的正則表達(dá)式解析Kafka Source中名為“syslog-kafka” 的topic中的所有Syslog信息,再將解析后的有效字段封裝為結(jié)果對(duì)象,最后通過(guò)MyBatis近實(shí)時(shí)地寫入MySQL中,供前端應(yīng)用進(jìn)行實(shí)時(shí)地可視化展示。另外,全量數(shù)據(jù)存儲(chǔ)進(jìn)入HBase及ES中,為后續(xù)海量日志的檢索分析及其它更高級(jí)的應(yīng)用提供支持。主程序示例代碼如下:

  整體的處理分析主要分為4步:

初始化SparkContext并指定Application的參數(shù);

創(chuàng)建基于Kafka topic “syslog-kafka” 的DirectStream;

將獲取的每一行數(shù)據(jù)映射為Syslog對(duì)象,調(diào)用Service進(jìn)行對(duì)象封裝并返回;

遍歷RDD,記錄不為空時(shí)保存或者更新Syslog信息到MySQL中。

Syslog POJO的部分基本屬性如下:

SwSyslog實(shí)體中的基本屬性對(duì)應(yīng)Syslog中的接口信息,注解中的name對(duì)應(yīng)MySQL中的表sw_syslog 以及各個(gè)字段,MyBatis完成成員屬性和數(shù)據(jù)庫(kù)結(jié)構(gòu)的ORM(對(duì)象關(guān)系映射)。

程序中的SwSyslogService有兩個(gè)主要功能:

encapsulateSwSyslog()將Spark處理后的每一行Syslog通過(guò)Scala的正則表達(dá)式解析為不同的字段,然后封裝并返回Syslog對(duì)象;遍歷RDD分區(qū)生成的每一個(gè)Syslog對(duì)象中都有ip以及接口信息,saveSwSyslog()會(huì)據(jù)此判斷該插入還是更新Syslog信息至數(shù)據(jù)庫(kù)。另外,封裝好的Syslog對(duì)象通過(guò)ORM工具M(jìn)yBatis與MySQL進(jìn)行互操作。

獲取到的每一行Syslog信息如之前所述:

這段信息需解析為設(shè)備ip、服務(wù)器時(shí)間、信息序號(hào)、設(shè)備時(shí)間、Syslog類型、屬性、設(shè)備接口、接口狀態(tài)等字段。Scala正則解析邏輯如下:

通過(guò)正則過(guò)濾、Syslog封裝以及MyBatis持久層映射,Syslog接口狀態(tài)信息最終解析如下:

  最后,諸如APM、網(wǎng)絡(luò)監(jiān)控或者告警等業(yè)務(wù)應(yīng)用便可以基于MySQL做可視化展示。

總結(jié)

本文首先對(duì)公安運(yùn)維管理現(xiàn)狀做了簡(jiǎn)要介紹,然后介紹公安實(shí)時(shí)運(yùn)維平臺(tái)的整體架構(gòu),再以交換機(jī)Syslog信息為例,詳細(xì)介紹如何使用Flume+Logstash+Kafka+Spark Streaming進(jìn)行實(shí)時(shí)日志處理分析,對(duì)處理過(guò)程中大量的技術(shù)細(xì)節(jié)進(jìn)行了描述并通過(guò)代碼詳細(xì)地介紹整體處理步驟。本文中的示例實(shí)時(shí)地將數(shù)據(jù)寫入MySQL存在一定的性能瓶頸,后期會(huì)對(duì)包含本例的相關(guān)代碼重構(gòu),數(shù)據(jù)將會(huì)實(shí)時(shí)寫入HBase來(lái)提高性能。

鏈接已復(fù)制,快去分享吧

企業(yè)網(wǎng)版權(quán)所有?2010-2024 京ICP備09108050號(hào)-6京公網(wǎng)安備 11010502049343號(hào)