W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
一個流應用程序必須全天候運行,所有必須能夠解決應用程序邏輯無關的故障(如系統(tǒng)錯誤,JVM崩潰等)。為了使這成為可能,Spark Streaming需要checkpoint足夠的信息到容錯存儲系統(tǒng)中,以使系統(tǒng)從故障中恢復。
Metadata checkpointing:保存流計算的定義信息到容錯存儲系統(tǒng)如HDFS中。這用來恢復應用程序中運行worker的節(jié)點的故障。元數(shù)據(jù)包括
Incomplete batches:操作存在隊列中的未完成的批
元數(shù)據(jù)checkpoint主要是為了從driver故障中恢復數(shù)據(jù)。如果transformation操作被用到了,數(shù)據(jù)checkpoint即使在簡單的操作中都是必須的。
應用程序在下面兩種情況下必須開啟checkpoint
updateStateByKey
或者reduceByKeyAndWindow
,checkpoint目錄必需提供用以定期checkpoint RDD。注意,沒有前述的有狀態(tài)的transformation的簡單流應用程序在運行時可以不開啟checkpoint。在這種情況下,從driver故障的恢復將是部分恢復(接收到了但是還沒有處理的數(shù)據(jù)將會丟失)。這通常是可以接受的,許多運行的Spark Streaming應用程序都是這種方式。
在容錯、可靠的文件系統(tǒng)(HDFS、s3等)中設置一個目錄用于保存checkpoint信息。著可以通過streamingContext.checkpoint(checkpointDirectory)
方法來做。這運行你用之前介紹的有狀態(tài)transformation。另外,如果你想從driver故障中恢復,你應該以下面的方式重寫你的Streaming應用程序。
start()
方法// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果checkpointDirectory
存在,上下文將會利用checkpoint數(shù)據(jù)重新創(chuàng)建。如果這個目錄不存在,將會調(diào)用functionToCreateContext
函數(shù)創(chuàng)建一個新的上下文,建立DStreams。請看RecoverableNetworkWordCount例子。
除了使用getOrCreate
,開發(fā)者必須保證在故障發(fā)生時,driver處理自動重啟。只能通過部署運行應用程序的基礎設施來達到該目的。在部署章節(jié)將有更進一步的討論。
注意,RDD的checkpointing有存儲成本。這會導致批數(shù)據(jù)(包含的RDD被checkpoint)的處理時間增加。因此,需要小心的設置批處理的時間間隔。在最小的批容量(包含1秒的數(shù)據(jù))情況下,checkpoint每批數(shù)據(jù)會顯著的減少操作的吞吐量。相反,checkpointing太少會導致譜系以及任務大小增大,這會產(chǎn)生有害的影響。因為有狀態(tài)的transformation需要RDD checkpoint。默認的間隔時間是批間隔時間的倍數(shù),最少10秒。它可以通過dstream.checkpoint
來設置。典型的情況下,設置checkpoint間隔是DStream的滑動間隔的5-10大小是一個好的嘗試。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: