Spark Streaming Checkpointing

2018-11-26 16:38 更新

Spark Streaming Checkpointing

一個流應用程序必須全天候運行,所有必須能夠解決應用程序邏輯無關的故障(如系統(tǒng)錯誤,JVM崩潰等)。為了使這成為可能,Spark Streaming需要checkpoint足夠的信息到容錯存儲系統(tǒng)中,以使系統(tǒng)從故障中恢復。

  • Metadata checkpointing:保存流計算的定義信息到容錯存儲系統(tǒng)如HDFS中。這用來恢復應用程序中運行worker的節(jié)點的故障。元數(shù)據(jù)包括

  • Configuration :創(chuàng)建Spark Streaming應用程序的配置信息
  • DStream operations :定義Streaming應用程序的操作集合
  • Incomplete batches:操作存在隊列中的未完成的批

  • Data checkpointing :保存生成的RDD到可靠的存儲系統(tǒng)中,這在有狀態(tài)transformation(如結(jié)合跨多個批次的數(shù)據(jù))中是必須的。在這樣一個transformation中,生成的RDD依賴于之前批的RDD,隨著時間的推移,這個依賴鏈的長度會持續(xù)增長。在恢復的過程中,為了避免這種無限增長。有狀態(tài)的transformation的中間RDD將會定時地存儲到可靠存儲系統(tǒng)中,以截斷這個依賴鏈。

元數(shù)據(jù)checkpoint主要是為了從driver故障中恢復數(shù)據(jù)。如果transformation操作被用到了,數(shù)據(jù)checkpoint即使在簡單的操作中都是必須的。

何時checkpoint

應用程序在下面兩種情況下必須開啟checkpoint

  • 使用有狀態(tài)的transformation。如果在應用程序中用到了updateStateByKey或者reduceByKeyAndWindow,checkpoint目錄必需提供用以定期checkpoint RDD。
  • 從運行應用程序的driver的故障中恢復過來。使用元數(shù)據(jù)checkpoint恢復處理信息。

注意,沒有前述的有狀態(tài)的transformation的簡單流應用程序在運行時可以不開啟checkpoint。在這種情況下,從driver故障的恢復將是部分恢復(接收到了但是還沒有處理的數(shù)據(jù)將會丟失)。這通常是可以接受的,許多運行的Spark Streaming應用程序都是這種方式。

怎樣配置Checkpointing

在容錯、可靠的文件系統(tǒng)(HDFS、s3等)中設置一個目錄用于保存checkpoint信息。著可以通過streamingContext.checkpoint(checkpointDirectory)方法來做。這運行你用之前介紹的有狀態(tài)transformation。另外,如果你想從driver故障中恢復,你應該以下面的方式重寫你的Streaming應用程序。

  • 當應用程序是第一次啟動,新建一個StreamingContext,啟動所有Stream,然后調(diào)用start()方法
  • 當應用程序因為故障重新啟動,它將會從checkpoint目錄checkpoint數(shù)據(jù)重新創(chuàng)建StreamingContext
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory存在,上下文將會利用checkpoint數(shù)據(jù)重新創(chuàng)建。如果這個目錄不存在,將會調(diào)用functionToCreateContext函數(shù)創(chuàng)建一個新的上下文,建立DStreams。請看RecoverableNetworkWordCount例子。

除了使用getOrCreate,開發(fā)者必須保證在故障發(fā)生時,driver處理自動重啟。只能通過部署運行應用程序的基礎設施來達到該目的。在部署章節(jié)將有更進一步的討論。

注意,RDD的checkpointing有存儲成本。這會導致批數(shù)據(jù)(包含的RDD被checkpoint)的處理時間增加。因此,需要小心的設置批處理的時間間隔。在最小的批容量(包含1秒的數(shù)據(jù))情況下,checkpoint每批數(shù)據(jù)會顯著的減少操作的吞吐量。相反,checkpointing太少會導致譜系以及任務大小增大,這會產(chǎn)生有害的影響。因為有狀態(tài)的transformation需要RDD checkpoint。默認的間隔時間是批間隔時間的倍數(shù),最少10秒。它可以通過dstream.checkpoint來設置。典型的情況下,設置checkpoint間隔是DStream的滑動間隔的5-10大小是一個好的嘗試。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號