1.HBase得二級索引的設計(或者Phoenix 二級索引-說說原理)
2.怎么提高Flink的執行性能(代碼方面)?
3.Spark 數據傾斜調優10策
答案區
1. HBase二級索引的設計方案一般有如下幾種:
1)協處理器coprocessor方案。 原理就是自定義協處理器,實現`雙寫`,就是寫主表的時候,同時寫索引表[這里這個索引表是根據業務對查詢的需求建立的]。 比如我們要查詢的主表是A, 里面有RowKey,還有一列ColumnA. 如果想對ColumnA這一列建立索引,就自定義一個協處理器(觀察者模式),當我們寫入A表中一條數據,比如 行鍵rowkey(123),cloumnA列值:abc,這時協處理在索引表(自己建立,比如A_INDEX)中插入一條記錄 行鍵為剛才列A的值abc,列值為主表的rowkey(123). 查詢的時候,先查索引表得到rowkey,然后根據rowkey在主表中查。
2)ES 方案,將想要構建的二級索引的字段值存儲到ES中,查詢時先去ES根據條件查到rowkey,然后根據rowkey再去hbase查數據。
3)Phoenix 方案。 Phoenix構建構建索引的方式,本質也在HBase中建立索引表。只不建表的過程,索引維護的過程,Phoenix自己內部實現,暴露給用戶的只是SQL接口。
其實在HBase構建二級索引,萬變不離其宗,最終的方向都是構建索引字段與行鍵的映射關系,先更加索引表查行鍵,在根據行鍵,查最終數據。
2.- 通用的優化方式
1)盡早fliter掉一些不需要的數據以及避免一些不必要的序列化。
2)避免使用深層嵌套數據類型。
3) 對于數據傾斜使用調整并行度或者雙層聚合的方式。
4)一些基數較少的并且本身較長維度可以采用數據字典的方式減少網絡傳輸及內存占用、gc開銷。
- 數據類型和序列化
Flink支持java、scala基本數據類型,以及java Tuples、scala Case Class、Flink Value,對于這些數據類型,flink會采用自身的序列化反序列化器去做序列化操作,對于其他數據類型,flink會采用kyro方式序列化,kyro序列化方式效率會比flink自帶的方式低很多。因此在數據序列化方面我們可以做如下工作
1) 嘗試使用transient修飾不需要序列化的變量,或者修飾你可以在下游通過其他方式獲取到變量,這個可以減少序列化流程和網絡傳輸(但可能帶來更多的內存占用用和gc消耗)
2) 對于一些特殊的數據你可以嘗試重寫writeObject() 和 readObject() 來自己控制一些序列化方式,如果更高效的話
3)如果使用了lambda或者泛型的話,顯式的指定類型信息讓flink類型提取系統識別到以提升性能。
- 多組相同keyby可使用DataStreamUtils
在多組keyby的場景可以采用DataStreamUtils.reinterpretAsKeyedStream的方式避免多次shuffle操作
- 盡量減少狀態的大小
1)設置合適的state TTL, 清洗過期狀態,避免狀態無限增大。
2)減少狀態字段數, 比如使用aggreteFunction 做窗口聚合時,可以只將要聚合的信息放入狀態,其他keyBy字段以及窗口信息,可以通過processWindowFunction的方式獲取,這樣就是 aggregateFunction + ProcessWindowFunction,agg函數獲取聚合信息,輸出的結果到processwindowFunction中取獲取窗口信息。
3)checkpoint頻率不宜過高,超時時間不要太長,可以異步化的地方盡量異步化
3.Spark 數據傾斜調優10策
何謂數據傾斜?數據傾斜指的是并行處理的數據集中,某一部分(如Spark或Kafka的一個Partition)的數據顯著多于其它部分,從而使得該部分的處理速度成為整個數據集處理的瓶頸。
- 數據傾斜概述
1.1 什么是數據傾斜
對Hadoop、Spark、Flink這樣的大數據系統來講,數據量大并不可怕,可怕的是數據傾斜。
何謂數據傾斜?數據傾斜指的是,并行處理的數據集中,某一部分(如Spark或Kafka的一個Partition)的數據顯著多于其它部分,從而使得該部分的處理速度成為整個數據集處理的瓶頸。
對于分布式系統而言,理想情況下,隨著系統規模(節點數量)的增加,應用整體耗時線性下降。如果一臺機器處理一批大量數據需要120分鐘,當機器數量增加到三時,理想的耗時為120 / 3 = 40分鐘,如下圖所示:
但是,上述情況只是理想情況,實際上將單機任務轉換成分布式任務后,會有overhead,使得總的任務量較之單機時有所增加,所以每臺機器的執行時間加起來比單臺機器時更大。這里暫不考慮這些overhead,假設單機任務轉換成分布式任務后,總任務量不變。
但即使如此,想做到分布式情況下每臺機器執行時間是單機時的`1 / N`,就必須保證每臺機器的任務量相等。不幸的是,很多時候,任務的分配是不均勻的,甚至不均勻到大部分任務被分配到個別機器上,其它大部分機器所分配的任務量只占總得的小部分。比如一臺機器負責處理80%的任務,另外兩臺機器各處理10%的任務,如下圖所示:
1.2 數據傾斜發生時的現象
- 絕大多數 task 執行得都非常快,但個別 task 執行極慢。比如,總共有 1000 個 task,997 個 task 都在 1 分鐘之內執行完了,但是剩余兩三個 task 卻要一兩個小時。這種情況很常見。
- 原本能夠正常執行的 Spark 作業,某天突然報出 OOM(內存溢出)異常,觀察異常棧,是我們寫 的業務代碼造成的。這種情況比較少見。
markdown
總結:
1. 大部分任務都很快執行完,用時也相差無幾,但個別Task執行耗時很長,整個應用程序一直處于99%左右 的狀態。
2. 一直運行正常的Spark Application昨晚突然OOM了。
1.3 數據傾斜發生的原理
數據傾斜的原理很簡單: 在進行 shuffle 的時候,必須將各個節點上相同的 key 的數據拉取到某個節點 上的一個 task 來進行處理,比如按照 key 進行聚合或 join 等操作。此時如果某個 key 對應的數據量特 別大的話,就會發生數據傾斜。比如大部分 key 對應 10 條數據,但是個別 key 卻對應了 100 萬條數 據,那么大部分 task 可能就只會分配到 10 條數據,然后 1 秒鐘就運行完了;但是個別 task 可能分配 到了 100 萬數據,要運行一兩個小時。因此,整個 Spark 作業的運行進度是由運行時間最長的那個 task 決定的。
因此出現數據傾斜的時候,Spark 作業看起來會運行得非常緩慢,甚至可能因為某個 task 處理的數據 量過大導致內存溢出。
下圖就是一個很清晰的例子:hello 這個 key,在三個節點上對應了總共 7 條數據,這些數據都會被拉 取到同一個task中進行處理;而world 和 you 這兩個 key 分別才對應 1 條數據,所以另外兩個 task 只 要分別處理 1 條數據即可。此時第一個 task 的運行時間可能是另外兩個 task 的 7 倍,而整個 stage 的 運行速度也由運行最慢的那個 task 所決定。
markdown
總結:
數據傾斜發生的本質,就是在執行多階段的計算的時候,中間的shuffle策略可能導致分發到下 游Task的數據量不均勻,進而導致下游Task執行時長的不一致。不完全均勻是正常的,但是如果相差太大,那么就產生性能問題了。
1.4 數據傾斜的危害
從上圖可見,當出現數據傾斜時,小量任務耗時遠高于其它任務,從而使得整體耗時過大,未能充分發 揮分布式系統的并行計算優勢。另外,當發生數據傾斜時,少量部分任務處理的數據量過大,可能造成 內存不足使得任務失敗,并進而引進整個應用失敗。如果應用并沒有因此失敗,但是大量正常任務都早 早完成處于等待狀態,資源得不到充分利用。
markdown
總結:
1. 整體耗時過大(整個任務的完成由執行時間最長的那個Task決定)
2. 應用程序可能異常退出(某個Task執行時處理的數據量遠遠大于正常節點,則需要的資源容易出現瓶頸, 當資源不足,則應用程序退出)
3. 資源閑置(處理等待狀態的Task資源得不到及時的釋放,處于閑置浪費狀態)
1.5 數據傾斜是如何造成的
在 Spark 中,同一個 Stage 的不同 Partition 可以并行處理,而具有依賴關系的不同 Stage 之間是串行 處理的。假設某個 Spark Job 分為Stage0 和 Stage1 兩個 Stage,且 Stage1 依賴于 Stage0,那 Stage0 完全處理結束之前不會處理 Stage1。而 Stage0 可能包含 N 個Task,這 N 個 Task 可以并行進行。如 果其中 N-1 個 Task 都在 10 秒內完成,而另外一個 Task 卻耗時 1 分鐘,那該 Stage 的總時間至少為 1 分鐘。換句話說,一個 Stage 所耗費的時間,主要由最慢的那個 Task 決定。由于同一個 Stage 內的所有 Task 執行相同的計算,在排除不同計算節點計算能力差異的前提下,不同 Task 之間耗時的差異主要由該 Task 所處理的數據量決定。Stage 的數據來源主要分為如下兩類:
markdown
1)數據源本身分布有問題:從數據源直接讀取。如讀取HDFS,Kafka,有可能出現,大概率不會
2)自己指定的分區規則:讀取上一個 Stage 的 Shuffle 數據
樸素的分布式計算的核心思想:
1) 大問題拆分成小問題:分而治之
2)既然要分開算,那最后就一定要把分開計算的那么多的小 Task 的結果執行匯總
3)所以必然分布式計算引擎的設計中,應用程序的執行一定是分階段
4)分布計算引擎的而核心:一個復雜的分布式計算應用程序的執行肯定要分成多個階段,每個階段分布式并 行運行多個Task
5)DAG引擎:
Spark: stage1 ==> stage2 ===> stage3
mapreduce: 就只有兩個階段:mapper reducer
階段與階段之間需要進行 shuffle,只要進行了數據混洗,就存在著數據分發不均勻的情況。如果情況嚴 重,就是數據傾斜。
分布式計算引擎的設計,免不了有shuffle,既然有shuffle操作,就一定有產生數據傾斜的可能。如果 你是做大數據處理的,就一定會遇到 數據傾斜!
- 如何避免數據傾斜
2.1 避免數據源傾斜-HDFS
Spark通過 textFile(path, minPartitions) 方法讀取文件時,使用 TextInputFormat。對于不可切分的文件,每個文件對應一個 Split 從而對應一個 Partition。此時各文件大小是否一致,很大程度上決定了是否存在數據源側的數據傾斜。另外,對于不可切分的壓縮文件,即使壓縮后的文件大 小一致,它所包含的實際數據量也可能差別很多,因為源文件數據重復度越高,壓縮比越高。反過來, 即使壓縮文件大小接近,但由于壓縮比可能差距很大,所需處理的數據量差距也可能很大。此時可通過在數據生成端將不可切分文件存儲為可切分文件,或者保證各文件包含數據量相同的方式避免數據傾斜。
markdown
對于不可切分文件可能出現數據傾斜,對于可切分文件,一般來說,不存在數據傾斜問題。
1)可切分: 基本上不會! 默認數據塊大小:128M
2)不可切分: 源文件不均勻,最終導致 分布式引用程序計算產生數據傾斜 日志:每一個小時生成一個日志文件
2.2 避免數據源傾斜-Kaka
Topic 主題: 分布式的組織形式:分區, 既然要進行數據分區,那就有可能產生數據分布不均勻
以 Spark Stream 通過 DirectStream 方式讀取 Kafka 數據為例。由于 Kafka 的每一個 Partition 對應 Spark 的一個 Task(Partition),所以 Kafka 內相關 Topic 的各 Partition 之間數據是否平衡,直接決 定 Spark 處理該數據時是否會產生數據傾斜。
Kafka 某一 Topic 內消息在不同 Partition 之間的分布,主要由 Producer 端所使用的 Partitioner 實現 類決定。如果使用隨機 Partitioner,則每條消息會隨機發送到一個 Partition 中,從而從概率上來講, 各 Partition 間的數據會達到平衡。此時源 Stage(直接讀取 Kafka 數據的 Stage)不會產生數據傾斜。
但很多時候,業務場景可能會要求將具備同一特征的數據順序消費,此時就需要將具有相同特征的數據 放于同一個 Partition 中。一個典型的場景是,需要將同一個用戶相關的PV信息置于同一個 Partition 中。此時,如果產生了數據傾斜,則需要通過其它方式處理。
markdown
* 以 Spark Stream 通過 DirectStream 方式讀取 Kafka 數據為例。由于 Kafka 的每一個 Partition 對應 Spark 的一個 Task(Partition),所以 Kafka 內相關 Topic 的各 Partition 之間數據是否平衡,直接決 定 Spark 處理該數據時是否會產生數據傾斜。
* Kafka 某一 Topic 內消息在不同 Partition 之間的分布,主要由 Producer 端所使用的 Partitioner 實現 類決定。如果使用隨機 Partitioner,則每條消息會隨機發送到一個 Partition 中,從而從概率上來講, 各 Partition 間的數據會達到平衡。此時源 Stage(直接讀取 Kafka 數據的 Stage)不會產生數據傾斜。
* 但很多時候,業務場景可能會要求將具備同一特征的數據順序消費,此時就需要將具有相同特征的數據 放于同一個 Partition 中。一個典型的場景是,需要將同一個用戶相關的PV信息置于同一個 Partition 中。此時,如果產生了數據傾斜,則需要通過其它方式處理。
2.3 定位處理邏輯 **- Stage** **和** Task
歸根結底,數據傾斜產生的原因,就是兩個 stage 中的 shuffle 過程導致的。所以我們只需要研究Shuffle 算子即可。我們知道了導致數據傾斜的問題就是 shuffle 算子,所以我們先去找到代碼中的 shuffle 的算子,比如 distinct、groupByKey、reduceByKey、aggergateByKey、join、cogroup、repartition 等,那么問 題一定就出現在這里。spark的執行,按照hsuffle算子分成多個stage來執行。
markdown
* 如果 Spark Application 運行過程中,出現數據傾斜,可以通過 web 管理監控界面,查看 各stage 的運行情況,如果某一個 stage 的運行很長,并且這個 stage 的大部分Task都運行很快,則
2.4 查看導致傾斜的key的數據分布情況
知道了數據傾斜發生在哪里之后,通常需要分析一下那個執行了shuffle操作并且導致了數據傾斜的 RDD/Hive表,查看一下其中key的分布情況。這主要是為之后選擇哪一種技術方案提供依據。針對不同 的key分布與不同的shuffle算子組合起來的各種情況,可能需要選擇不同的技術方案來解決。此時根據你執行操作的情況不同,可以有很多種查看key分布的方式:
markdown
1)如果是Spark SQL中的group by、join語句導致的數據傾斜,那么就查詢一下 SQL 中使用的表的key 分布情況。
2)如果是對 Spark RDD執行shuffle算子導致的數據傾斜,那么可以在Spark作業中加入查看 key 分布 的代碼,比如 RDD.countByKey()。然后對統計出來的各個key出現的次數,collect/take到客戶端打印 一下,就可以看到key的分布情況。
舉例來說,對于上面所說的單詞計數程序,如果確定了是 stage1 的 reduceByKey 算子導致了數據傾 斜,那么就應該看看進行 reduceByKey 操作的 RDD 中的 key 分布情況,在這個例子中指的就是 pairs RDD。如下示例,我們可以先對 pairs 采樣 10% 的樣本數據,然后使用 countByKey 算子統計出每個 key 出現的次數,最后在客戶端遍歷和打印樣本數據中各個 key 的出現次數。
scala
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
采樣!(離線處理:無放回采樣, 流式處理:魚塘采樣)
- 數據傾斜解決方案
3.1 Hive ETL處理
3.1.1 適用場景
導致數據傾斜的是 Hive 表。如果該 Hive 表中的數據本身很不均勻(比如某個 key 對應了 100 萬數 據,其他 key 才對應了 10 條數據),而且業務場景需要頻繁使用 Spark 對 Hive 表執行某個分析操作,那么比較適合使用這種技術方案
3.1.2 實現思路
此時可以評估一下,是否可以通過Hive來進行數據預處理(即通過 Hive ETL 預先對數據按照 key 進行 聚合,或者是預先和其他表進行join),然后在 Spark 作業中針對的數據源就不是原來的 Hive 表了, 而是預處理后的Hive表。此時由于數據已經預先進行過聚合或join操作了,那么在 Spark 作業中也就不 需要使用原先的 shuffle 類算子執行這類操作了。
3.1.3 實現原理
這種方案從根源上解決了數據傾斜,因為徹底避免了在Spark中執行shuffle類算子,那么肯定就不會有 數據傾斜的問題了。但是這里也要提醒一下大家,這種方式屬于治標不治本。因為畢竟數據本身就存在 分布不均勻的問題,所以Hive ETL中進行group by或者join等shuffle操作時,還是會出現數據傾斜,導 致Hive ETL的速度很慢。我們只是把數據傾斜的發生提前到了Hive ETL中,避免Spark程序發生數據傾斜而已。
3.1.4 方案優缺點
markdown
* 優點: 實現起來簡單便捷,效果還非常好,完全規避掉了數據傾斜,Spark作業的性能會大幅度提升。
* 缺點:治標不治本,Hive ETL中還是會發生數據傾斜。
3.1.5 企業最佳實踐
markdown
* 在一些 Java 系統與 Spark 結合使用的項目中,會出現 Java 代碼頻繁調用 Spark 作業的場景,而且對 Spark 作業的執行性能要求很高,就比較適合使用這種方案。將數據傾斜提前到上游的 Hive ETL,每天 僅執行一次,只有那一次是比較慢的,而之后每次 Java 調用 Spark作業時,執行速度都會很快,能夠 提供更好的用戶體驗。
* 在美團·點評的交互式用戶行為分析系統中使用了這種方案,該系統主要是允許用戶通過 Java Web 系統 提交數據分析統計任務,后端通過Java 提交 Spark作業進行數據分析統計。要求 Spark 作業速度必須要 快,盡量在10 分鐘以內,否則速度太慢,用戶體驗會很差。所以我們將有些 Spark 作業的shuffle操作 提前到了Hive ETL中,從而讓 Spark 直接使用預處理的 Hive 中間表,盡可能地減少 Spark 的 shuffle操 作,大幅度提升了性能,將部分作業的性能提升了6倍以上。
3.2 調整shuffle操作的并行度
3.2.1 適用場景
大量不同的Key被分配到了相同的Task造成該Task數據量過大。
如果我們必須要對數據傾斜迎難而上,那么建議優先使用這種方案,因為這是處理數據傾斜最簡單的一 種方案。但是也是一種屬于碰運氣的方案。因為這種方案,并不能讓你一定解決數據傾斜,甚至有可能 加重。那當然,總歸,你會調整到一個合適的并行度是能解決的。前提是這種方案適用于 Hash散列的 分區方式。湊巧的是,各種分布式計算引擎,比如MapReduce,Spark 等默認都是使用 Hash散列的方 式來進行數據分區。
Spark 在做 Shuffle 時,默認使用 HashPartitioner(非Hash Shuffle)對數據進行分區。如果并行度設 置的不合適,可能造成大量不相同的 Key 對應的數據被分配到了同一個 Task 上,造成該 Task 所處理 的數據遠大于其它 Task,從而造成數據傾斜。
如果調整 Shuffle 時的并行度,使得原本被分配到同一 Task 的不同 Key 發配到不同 Task 上處理,則可 降低原 Task 所需處理的數據量,從而緩解數據傾斜問題造成的短板效應。
3.2.2 實現思路
在對 RDD 執行 Shuffle 算子時,給 Shuffle 算子傳入一個參數,比如 reduceByKey(1000),該參數就 設置了這個 shuffle 算子執行時shuffle read task 的數量。對于 Spark SQL 中的 Shuffle 類語句,比如 group by、join 等,需要設置一個參數,即 spark.sql.shuffle.partitions,該參數代表了 shuffle readTask 的并行度,該值默認是 200,對于很多場景來說都有點過小。
3.2.3 實現原理
增加 shuffle read task 的數量,可以讓原本分配給一個 task 的多個 key 分配給多個 task,從而讓每個 task 處理比原來更少的數據。舉例來說,如果原本有 5 個key,每個 key 對應 10 條數據,這 5 個 key 都是分配給一個 task 的,那么這個 task 就要處理 50 條數據。而增加了 shuffle read task 以后,每個 task 就分配到一個 key,即每個 task 就處理 10 條數據,那么自然每個 task 的執行時間都會變短了。 具體原理如下圖所示。
一句話總結:調整并行度分散同一個 Task的不同 Key,之前由于運氣比較差,多個數據比較多的 key 都分布式在同一個 Task 上,如果調整了并行度,極大可能會讓這些 key 分布式到不同的 Task,有效緩 解數據傾斜。
3.2.4 方案優缺點
markdown
* 優點: 實現起來比較簡單,可以有效緩解和減輕數據傾斜的影響。實現簡單,可在需要Shuffle的操作算子上直接設 置并行度或者使用spark.default.parallelism設置。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=[num_tasks]設置并行度。可用最小的代價解決問題。一般如果出現 數據傾斜,都可以通過這種方法先試驗幾次,如果問題未解決,再嘗試其它方法。
* 缺點:只是緩解了數據傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。適用場景少,只能將分配到 同一Task的不同Key分散開,但對于同一Key傾斜嚴重的情況該方法并不適用。并且該方法一般只能緩解數據 傾斜,沒有徹底消除問題。從實踐經驗來看,其效果一般。
3.2.5 企業最佳實踐
markdown
* 該方案通常無法徹底解決數據傾斜,因為如果出現一些極端情況,比如某個key對應的數據量有100萬, 那么無論你的task數量增加到多少,這個對應著100萬數據的key肯定還是會分配到一個task中去處理, 因此注定還是會發生數據傾斜的。所以這種方案只能說是在發現數據傾斜時嘗試使用的第一種手段,嘗 試去用嘴簡單的方法緩解數據傾斜而已,或者是和其他方案結合起來使用。
3.3 過濾少數導致傾斜的key
3.3.1 適用場景
如果發現導致傾斜的 key 就少數幾個,而且對計算本身的影響并不大的話,那么很適合使用這種方案。 比如 99% 的 key 就對應 10 條數據,但是只有一個 key 對應了 100 萬數據,從而導致了數據傾斜。
3.3.2 實現思路
如果我們判斷那少數幾個數據量特別多的 key,對作業的執行和計算結果不是特別重要的話,那么干脆 就直接過濾掉那少數幾個 key。比如,在 Spark SQL 中可以使用 where 子句過濾掉這些 key 或者在 SparkCore 中對 RDD 執行 filter 算子過濾掉這些 key。如果需要每次作業執行時,動態判定哪些 key 的數據量最多然后再進行過濾,那么可以使用 sample 算子對 RDD 進行采樣,然后計算出每個 key 的 數量,取數據量最多的 key 過濾掉即可。
3.2.3 實現原理
將導致數據傾斜的 key 給過濾掉之后,這些 key 就不會參與計算了,自然不可能產生數據傾斜。
3.3.4 方案優缺點
markdown
* 優點:實現簡單,而且效果也很好,可以完全規避掉數據傾斜。
* 缺點:適用場景不多,大多數情況下,導致傾斜的key還是很多的,并不是只有少數幾個
3.3.5 企業最佳實踐
markdown
* 在項目中我們也采用過這種方案解決數據傾斜。有一次發現某一天 Spark 作業在運行的時候突然 OOM 了,追查之后發現,是 Hive 表中的某一個 key 在那天數據異常,導致數據量暴增。因此就采取每次執 行前先進行采樣,計算出樣本中數據量最大的幾個 key 之后,直接在程序中將那些key給過濾掉。
3.4 將reduce join轉為map join
3.4.1 適用場景
在對 RDD 使用 join 類操作,或者是在 Spark SQL 中使用 join 語句時,而且 join 操作中的一個 RDD 或 表的數據量比較小(比如幾百M或者一兩G),比較適用此方案。
在分布式計算引擎中,實現Join的思路有兩種:
1)MapJoin,顧名思義,Join邏輯的完成是在 Mapper 階段就完成了,這是假定執行的是 MapReduce任務,如果是 Spark任務,表示只用一個 Stage 就執行完了 Join 操作。
markdown
* 優點:避免了兩階段之間的shuffle,效率高,沒有shuffle也就沒有了傾斜。
* 缺點:多使用內存資源,只適合大小表做join的場景
2)ReduceJoin,顧名思義,Join邏輯的完成是在 Reducer 階段完成的。那么如果是MapReduce任 務,則表示 Maper階段執行完之后把數據 Shuffle到 Reducer階段來執行 Join 邏輯,那么就可能導致數 據傾斜。如果是 Spark任務,意味著,上一個stage的執行結果數據shuffle到 下一個stage中來完成 Join 操作,同樣也可能產生數據傾斜。
markdown
* 優點:這是一種通用的join,在不產生數據傾斜的情況下,能完成各種類型的join
* 缺點:會發生數據傾斜的情況
3.4.2 實現思路
不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉 shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到 Driver端的內存中來,然后對其創建一個Broadcast變量;接著對另外一個RDD執行map類算子,在算 子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行 比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。
3.4.3 實現原理
普通的 join 是會走 shuffle 過程的,而一旦 shuffle,就相當于會將相同 key 的數據拉取到一個 shuffle read task 中再進行 join,此時就是 reduce join。但是如果一個 RDD 是比較小的,則可以采用廣播小 RDD 全量數據 +map 算子來實現與 join 同樣的效果,也就是 map join,此時就不會發生 shuffle 操 作,也就不會發生數據傾斜。具體原理如下圖所示。
3.4.4 方案優缺點
markdown
* 優點: 對join操作導致的數據傾斜,效果非常好,因為根本就不會發生shuffle,也就根本不會發生數據傾斜。
* 缺點: 適用場景較少,因為這個方案只適用于一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比 較消耗內存資源,driver 和每個Executor 內存中都會駐留一份小 RDD 的全量數據。如果我們廣播出去 的 RDD 數據比較大,比如 10G 以上,那么就可能發生內存溢出了。因此并不適合兩個都是大表的情況。
3.5 采樣傾斜 key并分拆 join操作
3.5.1 適用場景
兩個 RDD/Hive 表進行 join 的時候,如果數據量都比較大,無法采用3.5方案,那么此時可以看一 下兩個 RDD/Hive 表中的 key 分布情況。如果出現數據傾斜,是因為其中某一個 RDD/Hive 表中的少數 幾個 key 的數據量過大,而另一個 RDD/Hive 表中的所有 key 都分布比較均勻,那么采用這個解決方案 是比較合適的。
3.5.2 實現思路
markdown
1)對包含少數幾個數據量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統計一下每個 key的數量,計算出來數據量最大的是哪幾個key。
2)然后將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,并給每個key都打上n以內的 隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。
3)接著將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據并形成一個單獨的RDD,將每條數據 膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。
4)再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散 成n份,分散到多個task中去進行join了。
5)而另外兩個普通的RDD就照常join即可。
6)最后將兩次join的結果使用union算子合并起來即可,就是最終的join結果。
3.5.3 實現原理
對于 join 導致的數據傾斜,如果只是某幾個 key 導致了傾斜,可以將少數幾個 key 分拆成獨立 RDD, 并附加隨機前綴打散成 n 份去進行join,此時這幾個 key 對應的數據就不會集中在少數幾個 task 上, 而是分散到多個 task 進行 join 了。
3.5.4 方案優缺點
markdown
* 優點: 對于join導致的數據傾斜,如果只是某幾個key導致了傾斜,采用該方式可以用最有效的方式打散key進行 join。而且只需要針對少數傾斜key對應的數據進行擴容n倍,不需要對全量數據進行擴容。避免了占用過多 內存。
* 缺點: 如果導致傾斜的key特別多的話,比如成千上萬個key都導致數據傾斜,那么這種方式也不適合。
3.6 兩階段聚合(局部聚合+全局聚合)
3.6.1 適用場景
對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時, 比較適用這種方案。
3.6.2 實現思路
這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數, 比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數后的數據,執行 reduceByKey等聚合操作,進行局部聚合,那么局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然 后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果 了,比如(hello, 4)。
3.6.3 實現原理
將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據 分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接著去除掉隨機前綴,再 次進行全局聚合,就可以得到最終的結果。具體原理見下圖:
3.6.4 方案優缺點
markdown
* 優點: 對于聚合類的shuffle操作導致的數據傾斜,效果是非常不錯的。通常都可以解決掉數據傾斜,或者至少是大 幅度緩解數據傾斜,將Spark作業的性能提升數倍以上。
* 缺點: 僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決 方案。
3.7 使用隨機前綴和擴容 RDD 進行 join
3.7.1 適用場景
如果在進行 join 操作時,RDD 中有大量的 key 導致數據傾斜,那么進行分拆 key 也沒什么意義。
3.7.2 實現思路
markdown
1)該方案的實現思路基本和3.5方案類似,首先查看 RDD/Hive 表中的數據分布情況,找到那個造成 數據傾斜的 RDD/Hive 表,比如有多個key 都對應了超過1萬條數據。
2)然后將該RDD的每條數據都打上一個n以內的隨機前綴。
3)同時對另外一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個 0~n的前綴。
4)最后將兩個處理后的RDD進行join即可。
3.7.3 實現原理
將原先一樣的 key 通過附加隨機前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多 個task中去處理,而不是讓一個task處理大量的相同key。該方案與3.6方案的不同之處就在于,上 一種方案是盡量只對少數傾斜key對應的數據進行特殊處理,由于處理過程需要擴容RDD,因此上一種 方案擴容RDD后對內存的占用并不大;而這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分 出來進行單獨處理,因此只能對整個RDD進行數據擴容,對內存資源要求很高。
3.7.4 方案優缺點
markdown
* 優點: 對join類型的數據傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯。
* 缺點: 該方案更多的是緩解數據傾斜,而不是徹底避免數據傾斜。而且需要對整個RDD進行擴容,對內存資源要求很高。
3.7.5 企業最佳實踐
markdown
* 開發一個數據需求的時候,發現一個join導致了數據傾斜。優化之前,作業的執行時間大約是60分鐘左右;使用該方案優化之后,執行時間縮短到10分鐘左右,性能提升了6倍。
3.8 任務橫切,一分為二,單獨處理
3.8.1 適用場景
有時候,一個Spark應用程序中,導致傾斜的因素不是一個單一的,比如有一部分傾斜的因素是null, 有一部分傾斜的因素是某些個key分布特別多。那么拆分出來也得使用不同的手段來處理
3.8.2 實現思路
在了解清楚數據的分布規律,以及確定了數據傾斜是由何種原因導致的,那么按照這些原因,進行數據的拆分,然后單獨處理每個部分的數據,最后把結果合起來。
3.8.3 實現原理
3.6方案其實是一種縱切,3.8方案就是一種橫切。原理同思路。
3.8.4 方案優缺點
markdown
* 優點: 將多種簡單的方案綜合起來,解決一個復雜的問題。可以算上一種萬能的方案。
* 缺點: 確定數據傾斜的因素比較復雜,導致解決該數據傾斜的方案比較難實現落地。代碼復雜度也較高。
3.9 多方案組合使用
markdown
* 在實踐中發現,很多情況下,如果只是處理較為簡單的數據傾斜場景,那么使用上述方案中的某一種基 本就可以解決。但是如果要處理一個較為復雜的數據傾斜場景,那么可能需要將多種方案組合起來使 用。比如說,我們針對出現了多個數據傾斜環節的Spark作業,可以先運用3.1和3.2方案,預處理一 部分數據,并過濾一部分數據來緩解;其次可以對某些shuffle操作提升并行度,優化其性能;最后還可 以針對不同的聚合或join操作,選擇一種方案來優化其性能。大家需要對這些方案的思路和原理都透徹 理解之后,在實踐中根據各種不同的情況,靈活運用多種方案,來解決自己的數據傾斜問題。
* 如果這多種方案,組合使用也不行,最后一招:自定義分區規則
3.10 自定義Partitioner
3.10.1 適用場景
大量不同的Key被分配到了相同的Task造成該Task數據量過大。
3.10.2 實現思路
先通過抽樣,了解數據的 key 的分布規律,然后根據規律,去定制自己的數據分區規則,盡量保證所有 的 Task 的數據量相差無幾。
3.10.3 實現原理
使用自定義的 Partitioner(默認為HashPartitioner),將原本被分配到同一個 Task 的不同 Key 分配 到不同 Task。
3.10.4 分區方案
- 隨機分區
markdown
* 優點: 數據分布均勻
* 缺點: 具有相同特點的數據不會保證被分配到相同的分區
- 輪詢分區
markdown
* 優點: 確保一定不會出現數據傾斜
* 缺點: 無法根據存儲/計算能力分配存儲/計算壓力
- Hash散列
markdown
* 優點: 具有相同特點的數據保證被分配到相同的分區
* 缺點: 極容易產生數據傾斜
- 范圍分區
markdown
* 優點: 相鄰的數據都在相同的分區
* 缺點: 部分分區的數據量會超出其他的分區,需要進行裂變以保持所有分區的數據量是均勻的,如果每個分區不排序,那么裂變就會非常困難。
3.10.5 方案優缺點
markdown
* 優點: 靈活,通用。
* 缺點: 必須根據對應的場景設計合理的分區方案。沒有現成的方案可用,需臨時實現。
四、案例
4.1 問題
如果是兩張特大寬表做 Join 怎么辦?
markdown
# 解決方案: 位圖法
4.2 例子
最近7天`連續登錄`的用戶有哪些?假如每天登陸的用戶存在多張表或者一張表的多個分區中。如果用戶基數很高比如`10億`。 那Join的方案將會比較低效。位圖解決是一個不錯的方案
4.3 實現思路
對每一天的用戶登陸數據維護一個Bitmap, 如果用戶登錄對應的Bitmap位就置為1。將7個Bitmap按位求與,就可以得到7天連續登錄的用戶了。
更多關于大數據培訓的問題,歡迎咨詢千鋒教育在線名師。千鋒教育多年辦學,課程大綱緊跟企業需求,更科學更嚴謹,每年培養泛IT人才近2萬人。不論你是零基礎還是想提升,都可以找到適合的班型,千鋒教育隨時歡迎你來試聽。