Spark Streaming消費(fèi)Kafka,對(duì)于offset的管理方式一般有如下方式:
1. checkpoint 方式管理,通過(guò)checkpoint可以將消費(fèi)的offset持久化存儲(chǔ)到hdfs,失敗后作業(yè)可以從checkpoint恢復(fù)。 但是這里的主要問(wèn)題是,如果你的程序作了升級(jí),比如業(yè)務(wù)邏輯變更了,你修改了代碼,這時(shí)是無(wú)法從之前的checkpoint恢復(fù)的。因?yàn)閏heckpoint第一次持久化的時(shí)候會(huì)把整個(gè)相關(guān)的jar給序列化成一個(gè)二進(jìn)制文件,每次重啟都會(huì)從里面恢復(fù),換句話說(shuō)不支持應(yīng)用升級(jí)。
2. mysql,可以將offset存儲(chǔ)到mysql中,自己管理,作業(yè)從mysql中讀取每個(gè)分區(qū)的offset,這樣可以解決應(yīng)用程序升級(jí)問(wèn)題,同時(shí)如果你想從之前的某個(gè)時(shí)刻消費(fèi)數(shù)據(jù),也可以選擇在mysql中保留條offset信息。比如我想從一個(gè)小時(shí)之前重新消費(fèi)數(shù)據(jù),因?yàn)檫@段時(shí)間數(shù)據(jù)出錯(cuò)了,我要重新計(jì)算,只需要指定讀取記錄的一個(gè)小時(shí)前的offset即可。 這里還有一點(diǎn)需要說(shuō)明的是,如果你spark輸出的數(shù)據(jù)存儲(chǔ)也在mysql中,通過(guò)mysql事物,就可以做到端到端的exactly once 語(yǔ)義。
3. zookeeper,可以將offset存儲(chǔ)到zk中
4. kafka 0.10 之后,offset存儲(chǔ)到kafka的一個(gè)topic,`__consumer_offsets`,同時(shí)提供了commitAsync 的方式提交offset。
5. 其他第三方存儲(chǔ),redis,hbase都是可以的
# 需要說(shuō)明的是,上面的這些方式,如果spark消費(fèi)的數(shù)據(jù)寫(xiě)入到其他存儲(chǔ)中,你只有保證offset的更新和你的數(shù)據(jù)寫(xiě)入在同一個(gè)事物中才能保證端到端的exactly once語(yǔ)義。 比如你如果Spark Streaming輸出數(shù)據(jù)寫(xiě)入hbase中,offset存儲(chǔ)在mysql中,你是無(wú)法維護(hù)offset的更新和數(shù)據(jù)寫(xiě)入hbase在一個(gè)事物的,如果先再寫(xiě)入hbase,再更新offset,保證的語(yǔ)義是at-least-once, 這種情況下數(shù)據(jù)不會(huì)丟失,但是會(huì)重復(fù)。 如果你先更新offset,再寫(xiě)hbase,這種情況可能造成數(shù)據(jù)丟失,語(yǔ)義是at-most-once.
更多關(guān)于“大數(shù)據(jù)培訓(xùn)”的問(wèn)題,歡迎咨詢(xún)千鋒教育在線名師。千鋒教育多年辦學(xué),課程大綱緊跟企業(yè)需求,更科學(xué)更嚴(yán)謹(jǐn),每年培養(yǎng)泛IT人才近2萬(wàn)人。不論你是零基礎(chǔ)還是想提升,都可以找到適合的班型,千鋒教育隨時(shí)歡迎你來(lái)試聽(tīng)。