何謂數據傾斜?數據傾斜指的是并行處理的數據集中,某一部分(如Spark或Kafka的一個Partition)的數據顯著多于其它部分,從而使得該部分的處理速度成為整個數據集處理的瓶頸。
一、數據傾斜概述
1.1 什么是數據傾斜
對Hadoop、Spark、Flink這樣的大數據系統來講,數據量大并不可怕,可怕的是數據傾斜。
何謂數據傾斜?數據傾斜指的是,并行處理的數據集中,某一部分(如Spark或Kafka的一個Partition)的數據顯著多于其它部分,從而使得該部分的處理速度成為整個數據集處理的瓶頸。
對于分布式系統而言,理想情況下,隨著系統規模(節點數量)的增加,應用整體耗時線性下降。如果一臺機器處理一批大量數據需要120分鐘,當機器數量增加到三時,理想的耗時為120 / 3 = 40分鐘,如下圖所示
但是,上述情況只是理想情況,實際上將單機任務轉換成分布式任務后,會有overhead,使得總的任務量較之單機時有所增加,所以每臺機器的執行時間加起來比單臺機器時更大。這里暫不考慮這些overhead,假設單機任務轉換成分布式任務后,總任務量不變。
但即使如此,想做到分布式情況下每臺機器執行時間是單機時的1 / N,就必須保證每臺機器的任務量相等。不幸的是,很多時候,任務的分配是不均勻的,甚至不均勻到大部分任務被分配到個別機器上,其它大部分機器所分配的任務量只占總得的小部分。比如一臺機器負責處理80%的任務,另外兩臺機器各處理10%的任務,如下圖所示
1.2 數據傾斜發生時的現象
• 絕大多數 task 執行得都非常快,但個別 task 執行極慢。比如,總共有 1000 個 task,997 個 task 都在 1 分鐘之內執行完了,但是剩余兩三個 task 卻要一兩個小時。這種情況很常見。
• 原本能夠正常執行的 Spark 作業,某天突然報出 OOM(內存溢出)異常,觀察異常棧,是我們寫 的業務代碼造成的。這種情況比較少見。
• Task 類似下圖所示
總結:
1. 大部分任務都很快執行完,用時也相差無幾,但個別Task執行耗時很長,整個應用程序一直處于99%左右 的狀態。
2. 一直運行正常的Spark Application昨晚突然OOM了。
1.3 數據傾斜發生的原理
數據傾斜的原理很簡單: 在進行 shuffle 的時候,必須將各個節點上相同的 key 的數據拉取到某個節點 上的一個 task 來進行處理,比如按照 key 進行聚合或 join 等操作。此時如果某個 key 對應的數據量特 別大的話,就會發生數據傾斜。比如大部分 key 對應 10 條數據,但是個別 key 卻對應了 100 萬條數 據,那么大部分 task 可能就只會分配到 10 條數據,然后 1 秒鐘就運行完了;但是個別 task 可能分配 到了 100 萬數據,要運行一兩個小時。因此,整個 Spark 作業的運行進度是由運行時間最長的那個 task 決定的。
因此出現數據傾斜的時候,Spark 作業看起來會運行得非常緩慢,甚至可能因為某個 task 處理的數據 量過大導致內存溢出。
下圖就是一個很清晰的例子:hello 這個 key,在三個節點上對應了總共 7 條數據,這些數據都會被拉 取到同一個task中進行處理;而world 和 you 這兩個 key 分別才對應 1 條數據,所以另外兩個 task 只 要分別處理 1 條數據即可。此時第一個 task 的運行時間可能是另外兩個 task 的 7 倍,而整個 stage 的 運行速度也由運行最慢的那個 task 所決定。
總結:
* 數據傾斜發生的本質,就是在執行多階段的計算的時候,中間的shuffle策略可能導致分發到下 游Task的數據量不均勻,進而導致下游Task執行時長的不一致。不完全均勻是正常的,但是如果相差太大,那么就產生性能問題了。
1.4 數據傾斜的危害
從上圖可見,當出現數據傾斜時,小量任務耗時遠高于其它任務,從而使得整體耗時過大,未能充分發 揮分布式系統的并行計算優勢。另外,當發生數據傾斜時,
少量部分任務處理的數據量過大,可能造成 內存不足使得任務失敗,并進而引進整個應用失敗。如果應用并沒有因此失敗,但是大量正常任務都早 早完成處于等待狀態,資源得不到充分利用。
總結:
1. 整體耗時過大(整個任務的完成由執行時間最長的那個Task決定)
2. 應用程序可能異常退出(某個Task執行時處理的數據量遠遠大于正常節點,則需要的資源容易出現瓶頸, 當資源不足,則應用程序退出)
3. 資源閑置(處理等待狀態的Task資源得不到及時的釋放,處于閑置浪費狀態)
1.5 數據傾斜是如何造成的
在 Spark 中,同一個 Stage 的不同 Partition 可以并行處理,而具有依賴關系的不同 Stage 之間是串行 處理的。假設某個 Spark Job 分為Stage0 和 Stage1 兩個 Stage,且 Stage1 依賴于 Stage0,那 Stage0 完全處理結束之前不會處理 Stage1。而 Stage0 可能包含 N 個Task,這 N 個 Task 可以并行進行。如 果其中 N-1 個 Task 都在 10 秒內完成,而另外一個 Task 卻耗時 1 分鐘,那該 Stage 的總時間至少為 1 分鐘。換句話說,一個 Stage 所耗費的時間,主要由最慢的那個 Task 決定。由于同一個 Stage 內的所有 Task 執行相同的計算,在排除不同計算節點計算能力差異的前提下,不同 Task 之間耗時的差異主要由該 Task 所處理的數據量決定。Stage 的數據來源主要分為如下兩類:
1. 數據源本身分布有問題:從數據源直接讀取。如讀取HDFS,Kafka,有可能出現,大概率不會
2. 自己指定的分區規則:讀取上一個 Stage 的 Shuffle 數據
樸素的分布式計算的核心思想:
1. 大問題拆分成小問題:分而治之
2. 既然要分開算,那最后就一定要把分開計算的那么多的小 Task 的結果執行匯總
3. 所以必然分布式計算引擎的設計中,應用程序的執行一定是分階段
4. 分布計算引擎的而核心:一個復雜的分布式計算應用程序的執行肯定要分成多個階段,每個階段分布式并 行運行多個Task
5. DAG引擎:
Spark: stage1 ==> stage2 ===> stage3
mapreduce: 就只有兩個階段:mapper reducer
階段與階段之間需要進行 shuffle,只要進行了數據混洗,就存在著數據分發不均勻的情況。如果情況嚴 重,就是數據傾斜。
分布式計算引擎的設計,免不了有shuffle,既然有shuffle操作,就一定有產生數據傾斜的可能。如果 你是做大數據處理的,就一定會遇到 數據傾斜!