Flink 在 58 同城的應用與實踐

本文整理自 58 同城實時計算平臺負責人馮海濤在 Flink Forward Asia 2020 分享的議題《Flink 在 58 同城應用與實踐》,內容包括:

1。實時計算平臺架構

2。實時 SQL 建設

3。Storm 遷移 Flink 實踐

4。一站式實時計算平臺

5。後續規劃

一、實時計算平臺架構

實時計算平臺的定位是為 58 集團海量資料提供高效、穩定的實時計算一站式服務。一站式服務主要分為三個方向:

第一個方向是實時資料儲存,主要負責為線上業務接入提供高速度的實時儲存能力。

第二是實時資料計算,主要為海量資料的處理提供分散式計算框架。

第三是實時資料分發,主要負責將計算後的資料分發到後續的實時儲存,供上層應用。

Flink 在 58 同城的應用與實踐

平臺建設主要分為兩個部分:

第一部分是基礎能力建設,目前主要包括 Kafka 叢集、storm 叢集、 Flink 叢集、SparkStreaming 叢集。

另一部分是平臺化建設,主要是包括兩點:第一個是資料分發,我們的資料分發是基於 Kafka Connect 打造的一個平臺,目標是實現異構資料來源的整合與分發。在實際使用資料場景過程中,經常需要將不同的資料來源匯聚到一起進行計算分析。傳統方式可能需要針對不同的儲存採用不同的資料同步方案。我們的資料分發是透過提供一套完整的架構,實現不同資料來源的整合和分發。第二個是我們基於 Flink 打造的一站式實時計算平臺,後文會有詳細的介紹。

Flink 在 58 同城的應用與實踐

上圖是我們的實時計算平臺的架構。

在實時資料接入這部分,我們採用的是 Kafka,binlog 提供 canal 和 debezium 兩種方式進行接入。

在業務日誌這部分,我們主要採用 flume 進行線上業務的 log 的採集。

在實時計算引擎這部分,根據開源社群發展以及使用者的需求,從最早的 Storm 到後來引入 SparkStreaming,以及現在主流的 Flink。

在實時儲存這部分,為了滿足多元化的實時需求,我們支援 Kafka、Druid、Hbase、ES、ClickHouse。

同時在計算架構之上,我們建設了一些管理平臺,比如叢集管理,它主要負責叢集的擴容,穩定性的管理。

另一個是 Nightfury,主要負責叢集治理,包括資料接入、許可權治理、資源管理等等。

我們在業務發展過程中,引入了 Flink 計算框架。首先從業務來說,58 是一個一站式生活服務平臺,包含很多業務線。隨著業務的發展,資料量越來越大,場景越來越豐富,需要一個更加強大的計算框架來滿足使用者的需求。

第一個場景是實時 ETL,主要是針對原始日誌進行資訊轉化,結構化處理,運用於後續計算,需要高吞吐低延遲的計算能力。

第二塊是實時數倉,它作為離線數倉的一個補充,主要是提升一些實時指標的時效性。第三種場景是實時監控,它需要比較靈活的時間視窗支援。

最後一種場景是實時資料流分析,比如說,資料亂序的處理、中間狀態的管理、Exactly once 語義保障。

我們前期基於 Storm 和 SparkStreaming 構建的計算叢集在很大程度上並不能滿足這些場景需求。於是對 Flink 進行了調研,發現 Flink 不論是在計算效能,還是流資料特性支援上,都體現出了非常大的優勢。因此,我們決定採用 Flink 作為主流的計算框架。

Flink 在 58 同城的應用與實踐

上圖是我們 Flink 叢集的建設情況。Flink 作為實時計算框架,經常需要 7×24 小時的可用性。我們在建設底層叢集的時候,需要考慮高可用的架構。

首先在部署模式上,主要是採用 Flink On YARN,實現叢集的高可用。

在底層的 HDFS 上,採用 HDFS federation 機制,既可以避免離線叢集的抖動對實時這邊造成影響,同時也減少了維護的 HDFS 數量。

在叢集隔離上,主要是採用 Node Labe 機制,就可以實現把重要業務執行在一些指定節點上。同時在這個基礎之上,引入了 Cgroup,對 CPU 進行隔離,避免任務間的 CPU 搶佔。

在管理層面,不同的業務提交到不同的佇列進行管理,避免業務間的資源搶佔。

在計算場景上,根據不同的計算場景,比如說計算型、IO 型,會提交到不同的節點,從而提升整個叢集的資源利用率。

Flink 計算框架在 58 經歷了大概兩年多的發展。目前我們的叢集有 900 多臺機器,2000 多個實時任務,每天處理大概 2。5 萬億的實時資料,資料量峰值達到了 3000 萬每秒。

二、實時 SQL 建設

1。 實時 SQL 演進

SQL 程式設計具有低門檻、自動最佳化、版本統一等特點。同時 Flink SQL 作為實時數倉的主要工具,是我們在建設 Flink 平臺時考慮的一個主要方向。

我們最早上線的 Flink 是基於 1。6 版本的,當時這個版本只支援 DML,我們在當時的版本基礎上進行了一些擴充套件,主要是在 DDL 語法上的擴充套件支援。在使用者使用層面,為了簡化 DDL 的定義,也透過一個配置化的方式來實現自動生成 DDL。在開發的時候,提供視覺化開發的功能和線上除錯的功能。

隨著社群的開源,我們將 Flink SQL 切換到了社群版本,之後也升級相關的版本,以及合併比較多的社群版本特性,比如說 Blink 相關、批流合一、對 Hive 的支援。

最後針對 Flink SQL 這塊的實時數倉,也做了一些數倉化的工作,主要包括元資料管理、血緣關係、數倉分層、許可權管理等等。

Flink 在 58 同城的應用與實踐

2。 儲存擴充套件

關於儲存擴充套件這一塊,最開始我們是基於 Flink 自己實現的一套 DDL。隨著社群開源,切換到社群的 Flink SQL 版本,然後在上面做了一些擴充套件,主要有幾個方面:

第一,打通了主流儲存和內部的實時儲存。比如說,在源表上支援了內部的 wmb,它是一個分散式訊息佇列。在維表上支援這種 redis,內部的 wtable。在結果表上支援了 ClickHouse,redis,以及我們內部的 wtable;

第二,定製 format 支援。因為在實際業務中,很多資料格式並不是標準的,沒法透過 DDL 來定義一個表。我們提供了一種通用的方式,可以採用一個欄位來代表一條日誌,讓使用者可以透過 udf 去自定義,並解析一條日誌。

最後,在 source 和 sink DDL 定義基礎上,增加了併發度的設定。這樣使用者就可以更靈活地控制任務的併發。

Flink 在 58 同城的應用與實踐

3。 效能最佳化

關於效能最佳化,主要是兩方面:

第一個是對 Blink 特性的引進,Blink 提供了大量的特性,比如透過 mini batch 的處理方式,提高任務的吞吐。透過 local global 兩階段聚合,緩解資料熱點問題。還有透過 emit,增強視窗的功能。把這些功能整合到我們的計算平臺,使用者透過一些按鈕可以直接開啟。

另一個是對非同步 lO 的應用。在實時數倉化建設過程中,維表之間的關聯是比較大的應用場景,經常因為維表的效能導致整個任務的吞吐不高。因此我們增加了一個非同步 IO 的機制,主要有兩種實現:一種針對目標儲存支援非同步 client,直接基於非同步 client 來實現。比如 MySQL 和 redis。另一種不支援非同步 client 的,我們就藉助現成的機制來模擬,同時在這個基礎之上增加了一套快取的機制,避免所有的資料直接查詢到目標儲存,減少目標儲存的壓力。同時在快取基礎上,也增加 LRU 機制,更加靈活的控制整個快取。同樣,資料寫入這一塊遇到大併發量寫入的時候,儘量提高併發來解決寫入性的問題,這樣就會導致整個任務的 CPU 利用率比較低,所以就採用單併發度多執行緒的寫入機制,它的實現是在 sink 運算元裡面增加一個 buffer,資料流入到 sink 之後會首先寫入到 buffer,然後會啟動多執行緒機制去消費這個 buffer,最終寫到儲存裡面。

Flink 在 58 同城的應用與實踐

4。 數倉化建設

實時數倉作為 Flink 的一個比較典型的應用場景,相較於離線數倉它可能存在一些平臺化不完善的方面:

首先,元資料管理功能不完善。

然後,Flink SQL 這一塊,對於每個任務我們都可能需要重新定義一個數據表。並且由於資料沒有分層的概念,導致任務比較獨立,煙囪式開發,資料和資源使用率比較低下。

另外,也缺乏資料血緣資訊。

為了提升實時數倉建設的效率,我們提供了面向數倉化實時 SQL 能力,在數倉設計,任務開發,平臺化管理方面全面對齊離線數倉的建設模式。

Flink 在 58 同城的應用與實踐

4。1 數倉化

數倉化主要是參考離線數倉的模型,對我們實時數倉這一塊進行模型建設。

比如說,最原始的資料會進入ODS 層,經過一些清洗落入到行為明細層,之後會拆分到具體的主題明細層,然後再將一些相關的維表資訊進行計算,再到彙總層,最終提供給最上層的應用,包括一些實時報表,Ad-hoc 查詢等。

Flink 在 58 同城的應用與實踐

4。2 數倉平臺

實時數倉目前主要還是基於這種 Lambda 架構來進行平臺化的建設。

首先,在元資料管理這一塊,Flink 預設採用記憶體對元資料進行管理,我們就採用了 HiveCatalog 機制對庫表進行持久化。

同時我們在資料庫的許可權管理上,藉助 Hive ACL 來進行許可權管理。

有了元資料持久化之後,就可以提供全域性的元資料檢索。

同時任務模式就可以由傳統的 DDL+DML 簡化為 DML。

最後,我們也做了血緣關係,主要是在 Flink SQL 提交過程中,自動發現 SQL 任務血緣依賴關係。

Flink 在 58 同城的應用與實踐

三、Storm 遷移 Flink 實踐

1。 Flink 與 Storm 對比

Flink 相對於 Storm 來說,有比較多的優勢。

在資料保障上,Flink 支援 Exactly once 語義,在吞吐量、資源管理、狀態管理,使用者越來越多的基於 Flink 進行開發。

而 Storm 對使用者來說,程式設計模型簡單,開發成本高,流式計算特性缺乏,吞吐低無法滿足效能。在平臺側,獨立叢集多、運維困難、任務缺少平臺化管理、使用者體驗差。

因此我們決定遷移到 Flink。

Flink 在 58 同城的應用與實踐

2。 Flink-Storm 工具

在 Storm 遷移到 Flink 的時候,如果讓使用者重新基於 Flink 進行邏輯開發,可能需要比較大的工作量。因此我們對 Flink 進行了調研,發現有個 Flink-Storm 工具。它實現了將 Storm Topology 轉到 Flink Topology。比如說,把 spout 轉換到 Flink 的 source function,把 bolt 轉換到 Transform 和 sink function。

在使用的過程中我們也發現一些問題,Flink-Storm 工具無法支援 Yarn 模式, 缺少 Storm 引擎功能,最後還有一個比較大的問題,我們的 storm 在發展過程中維護了很多版本,但是 Flink-Storm 工具只支援基於一個版本進行開發。於是,我們做了一些改進。

Flink 在 58 同城的應用與實踐

3。 對 Flink-Storm 的改進

3。1 訊息保障

Storm 有三個特點:

第一,ack 機制;

第二,依賴 zookeeper;

第三,at least once 語義保障。

我們做了四點改進:

第一,Flink-Storm 去掉 ack 支援;

第二,KafkaSpout 實現 CheckpointListener;

第三,KafkaSpout 實現 CheckpointedFunction;

第四,Flink-Storm 開啟 checkpoint。

Flink 在 58 同城的應用與實踐

3。2 對 Storm 定時器的支援

在早期版本里面其實是沒有視窗機制的,我們藉助 Storm 定時機制來實現視窗計算。它的機制是這樣的,Storm 引擎會定時向 bolt 裡面傳送一個系統訊號,使用者就可以透過這個系統訊號進行一個切分,模擬視窗操作。

同樣,Flink 也沒有這樣一個定時器的機制,於是我們就考慮從 Flink-Storm 層面來實現,改造了 BoltWrapper 類,它作為 bolt 類的一個封裝,實現機制跟 bolt 是一樣的,包括 5 點:

初始化 open 方式啟動非同步執行緒。

模擬構造 tick 的 StreamRecord;

呼叫 processeElement 函式傳送 tuple;

頻率由外部引數全域性控制;

close 中關閉執行緒。

Flink 在 58 同城的應用與實踐

3。3 Storm on Yarn

Storm on yarn 並不是直接提交到 YARN 叢集,它只是提交到 local 或者 stand alone 的模式。Flink on yarn 主要是提供了 ClusterClient 這樣一個代理,實現方式有三個步驟:

初始化 YarnClusterConfiguration Flink 配置 執行 jar 包 / 資源配置 載入 classpath;

啟動 yarn client;

複用 Flink on yarn 機制 deploy 轉換後的 jobGraph。

Flink 在 58 同城的應用與實踐

4。 任務遷移 在完善上述的一些改進之後,遷移就比較容易了。首先我們會把改造後的版本打包,上傳到公司的私服上。然後使用者在他的工程裡面只需要引入 jar 包。在程式碼這一塊,只需要將原來基於 storm 的提交方式改造成基於 Flink 的提交方式,邏輯是完全不用動的。在任務部署模式這一塊,也提供了 Flink 提交的模式,這樣一個指令碼可以實現 Flink Perjob 模式。

Flink 在 58 同城的應用與實踐

總結一下,除了一些比較極端的複雜情況,基本上做到了無縫遷移所有的任務。遷移到 Flink 之後,大部分任務的延遲都降低到毫秒級別,整個吞吐提升 3~5 倍。同時,整體資源節省了大概 40%,約等於 80 臺機器。完成了 5 個 storm 叢集完全下線,實現了任務平臺化管理。

Flink 在 58 同城的應用與實踐

四、一站式實時計算平臺 1。 Wstream 平臺 我們為了提升管理效率而打造了 Wstream 平臺,它構建在底層引擎和上層應用之間,對使用者可以遮蔽底層的叢集資訊,比如跨機房多叢集的一些資訊。

在任務接入方式上,支援 Flink Jar,Flink SQL,Flink-Storm,PyFlink 這 4 種方式,來滿足多元化的使用者需求。

在產品功能上,主要支援了任務管理、任務的建立、啟動刪除等。

另外,為了更好的讓使用者管理自己的任務和對任務進行問題定位,我們也提供了一個監控告警和任務診斷的系統。

針對數倉,提供了一些數倉平臺化的功能,包括許可權管理、血緣關係等等。

針對 Flink SQL 也提供了除錯探查的功能。

使用者可以在 Wstream 平臺之上很好的去構建他們的應用。

Flink 在 58 同城的應用與實踐

2。 狀態管理 狀態作為 Flink 一個比較重要的特性,在實際場景中有大量的應用。使用者在使用平臺的時候,沒法跟底層的 Flink 工具進行互動,於是我們就將底層的一些能力進行了整合。

在任務儲存方面,支援 Checkpoint,Savepoint,Cancel With Savepoint。

在容錯方面,支援 allowNonRestoredState,跳過無法恢復的狀態。

在分析方面,支援 Queryable State 實時查詢,基於離線的 State Processor 的分析方式,我們會幫使用者把這個狀態下載進行分析。

對於整個任務狀態管理來說,我們透過 jobgraph 設定定向到指定 Hdfs 目錄,進行統一目錄管理。在狀態小檔案這塊,控制併發度,jobgraph 最佳化,checkpoint 間隔時間,保留版本數量。

Flink 在 58 同城的應用與實踐

3。 SQL 除錯 針對 Flink SQL,我們也提供了一些除錯功能。這裡主要包括兩塊:

第一,語法層面的功能包括:智慧提示;語法校驗;轉換 graph 邏輯校驗。

第二,邏輯層面的功能包括:模擬輸入,DataGen 自定義資料來源;結果輸出,Print 重定向到標準輸出。

這樣我們可以更方便的對整個業務邏輯進行除錯。

Flink 在 58 同城的應用與實踐

4。 任務監控 關於任務監控,對於 Flink 實時計算任務來說,我們主要關心的是任務的穩定性、效能方面、以及業務邏輯是否符合預期。對於如何監控這些指標,主要包括 4 個層面:

第一個是 Flink 自帶的 Flink-metrics,提供大量的資訊,比如流量資訊、狀態資訊、反壓、檢查點、CPU、網路等等;

第二個是 yarn 層面,提供執行時長、任務狀態;

第三,從 kafka 層面提供訊息堆積;

最後,透過使用者自定義的一些 metrics,我們可以瞭解業務邏輯是否符合預期。

Flink 在 58 同城的應用與實踐

5。 監控體系 為了採集這些指標,我們也基於 Prometheus 搭建了一套監控體系。對於所有的 Flink 任務,會實時將 metrics 推到 pushgateway,然後會將收集到的指標推到 Prometheus,這一塊我們主要是採用的 federation 的機制。所有子節點負責指標採集,之後匯聚到一箇中心節點,由中心節點統一對外提供服務。最終可以實現整個指標的計算和告警。

Flink 在 58 同城的應用與實踐

6。 監控告警 有了上面這些指標之後,我們在告警這一塊就可以比較方便。針對實時計算比較關注的任務穩定性方面,我們可以從 Topic 訊息消費堆積、任務計算 qps 波動、Flink task Restart、Flink Checkpoint failed、任務失敗、延遲等資訊來觀察整個任務的執行情況。

Flink 在 58 同城的應用與實踐

7。 指標視覺化 在指標視覺化這一塊,主要是兩個層面:

第一個層面是 Job 層面,這一塊主要是把一些比較核心的指標匯聚到我們的實時計算平臺。比如說,qps 資訊、輸入輸出的資訊、延遲的資訊等等;

對於更底層的 task 級別的 metrics,透過 Grafana 可以瞭解具體的一些task資訊,比如流量資訊、反壓資訊等。

Flink 在 58 同城的應用與實踐

五、後續規劃 我們的後續規劃,主要包括 4 個方面:

第一個是社群比較流行的批流合一。因為我們當前這個實時架構大部分還是基於 Lambda 架構,這種架構會帶來很大的維護工作量,所以我們也希望藉助批流合一的能力來簡化架構;

第二個是資源調優,因為作為流式計算來說,缺少一些動態資源管理的機制,因此我們也希望有手段來進行這樣一些調優;

第三個是智慧監控,我們當前的監控和告警是事後的,希望有某種方式在任務出現問題之前進行預警;

最後是擁抱社群的新能力,包括對新場景的探索。

Flink 在 58 同城的應用與實踐