Apache Kafka 基礎(chǔ)

2021-07-27 16:23 更新

對于大數(shù)據(jù),我們要考慮的問題有很多,首先海量數(shù)據(jù)如何收集(如 Flume),然后對于收集到的數(shù)據(jù)如何存儲(典型的分布式文件系統(tǒng) HDFS、分布式數(shù)據(jù)庫 HBase、NoSQL 數(shù)據(jù)庫 Redis),其次存儲的數(shù)據(jù)不是存起來就沒事了,要通過計(jì)算從中獲取有用的信息,這就涉及到計(jì)算模型(典型的離線計(jì)算 MapReduce、流式實(shí)時(shí)計(jì)算Storm、Spark),或者要從數(shù)據(jù)中挖掘信息,還需要相應(yīng)的機(jī)器學(xué)習(xí)算法。在這些之上,還有一些各種各樣的查詢分析數(shù)據(jù)的工具(如 Hive、Pig 等)。除此之外,要構(gòu)建分布式應(yīng)用還需要一些工具,比如分布式協(xié)調(diào)服務(wù) Zookeeper 等等。

??這里,我們講到的是消息系統(tǒng),Kafka 專為分布式高吞吐量系統(tǒng)而設(shè)計(jì),其他消息傳遞系統(tǒng)相比,Kafka 具有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和固有的容錯(cuò)能力,這使得它非常適合大規(guī)模消息處理應(yīng)用程序。

(一)消息系統(tǒng)

??首先,我們理解一下什么是消息系統(tǒng):消息系統(tǒng)負(fù)責(zé)將數(shù)據(jù)從一個(gè)應(yīng)用程序傳輸?shù)搅硗庖粋€(gè)應(yīng)用程序,使得應(yīng)用程序可以專注于處理邏輯,而不用過多的考慮如何將消息共享出去。

??分布式消息系統(tǒng)基于可靠消息隊(duì)列的方式,消息在應(yīng)用程序和消息系統(tǒng)之間異步排隊(duì)。實(shí)際上,消息系統(tǒng)有兩種消息傳遞模式:一種是點(diǎn)對點(diǎn),另外一種是基于發(fā)布-訂閱(publish-subscribe)的消息系統(tǒng)。

1、點(diǎn)對點(diǎn)的消息系統(tǒng)

??在點(diǎn)對點(diǎn)的消息系統(tǒng)中,消息保留在隊(duì)列中,一個(gè)或者多個(gè)消費(fèi)者可以消耗隊(duì)列中的消息,但是消息最多只能被一個(gè)消費(fèi)者消費(fèi),一旦有一個(gè)消費(fèi)者將其消費(fèi)掉,消息就從該隊(duì)列中消失。這里要注意:多個(gè)消費(fèi)者可以同時(shí)工作,但是最終能拿到該消息的只有其中一個(gè)。最典型的例子就是訂單處理系統(tǒng),多個(gè)訂單處理器可以同時(shí)工作,但是對于一個(gè)特定的訂單,只有其中一個(gè)訂單處理器可以拿到該訂單進(jìn)行處理。

2、發(fā)布-訂閱消息系統(tǒng)

??在發(fā)布 - 訂閱系統(tǒng)中,消息被保留在主題中。 與點(diǎn)對點(diǎn)系統(tǒng)不同,消費(fèi)者可以訂閱一個(gè)或多個(gè)主題并使用該主題中的所有消息。在發(fā)布 - 訂閱系統(tǒng)中,消息生產(chǎn)者稱為發(fā)布者,消息使用者稱為訂閱者。 一個(gè)現(xiàn)實(shí)生活的例子是Dish電視,它發(fā)布不同的渠道,如運(yùn)動,電影,音樂等,任何人都可以訂閱自己的頻道集,并獲得他們訂閱的頻道時(shí)可用。

(二)Apache Kafka 簡介

??Kafka is a distributed,partitioned,replicated commit logservice。

??Apache Kafka 是一個(gè)分布式發(fā)布 - 訂閱消息系統(tǒng)和一個(gè)強(qiáng)大的隊(duì)列,可以處理大量的數(shù)據(jù),并使你能夠?qū)⑾囊粋€(gè)端點(diǎn)傳遞到另一個(gè)端點(diǎn)。 Kafka 適合離線和在線消息消費(fèi)。 Kafka 消息保留在磁盤上,并在群集內(nèi)復(fù)制以防止數(shù)據(jù)丟失。 Kafka 構(gòu)建在 ZooKeeper 同步服務(wù)之上。 它與 Apache Storm 和 Spark 非常好地集成,用于實(shí)時(shí)流式數(shù)據(jù)分析。

??Kafka 是一個(gè)分布式消息隊(duì)列,具有高性能、持久化、多副本備份、橫向擴(kuò)展能力。生產(chǎn)者往隊(duì)列里寫消息,消費(fèi)者從隊(duì)列里取消息進(jìn)行業(yè)務(wù)邏輯。一般在架構(gòu)設(shè)計(jì)中起到解耦、削峰、異步處理的作用。

??關(guān)鍵術(shù)語:

??(1)生產(chǎn)者和消費(fèi)者(producer和consumer):消息的發(fā)送者叫 Producer,消息的使用者和接受者是 Consumer,生產(chǎn)者將數(shù)據(jù)保存到 Kafka 集群中,消費(fèi)者從中獲取消息進(jìn)行業(yè)務(wù)的處理。

??(2)broker:Kafka 集群中有很多臺 Server,其中每一臺 Server 都可以存儲消息,將每一臺 Server 稱為一個(gè) kafka 實(shí)例,也叫做 broker。

??(3)主題(topic):一個(gè) topic 里保存的是同一類消息,相當(dāng)于對消息的分類,每個(gè) producer 將消息發(fā)送到 kafka 中,都需要指明要存的 topic 是哪個(gè),也就是指明這個(gè)消息屬于哪一類。

??(4)分區(qū)(partition):每個(gè) topic 都可以分成多個(gè) partition,每個(gè) partition 在存儲層面是 append log 文件。任何發(fā)布到此 partition 的消息都會被直接追加到 log 文件的尾部。為什么要進(jìn)行分區(qū)呢?最根本的原因就是:kafka基于文件進(jìn)行存儲,當(dāng)文件內(nèi)容大到一定程度時(shí),很容易達(dá)到單個(gè)磁盤的上限,因此,采用分區(qū)的辦法,一個(gè)分區(qū)對應(yīng)一個(gè)文件,這樣就可以將數(shù)據(jù)分別存儲到不同的server上去,另外這樣做也可以負(fù)載均衡,容納更多的消費(fèi)者。

??(5)偏移量(Offset):一個(gè)分區(qū)對應(yīng)一個(gè)磁盤上的文件,而消息在文件中的位置就稱為 offset(偏移量),offset 為一個(gè) long 型數(shù)字,它可以唯一標(biāo)記一條消息。由于kafka 并沒有提供其他額外的索引機(jī)制來存儲 offset,文件只能順序的讀寫,所以在kafka中幾乎不允許對消息進(jìn)行“隨機(jī)讀寫”。

??綜上,我們總結(jié)一下 Kafka 的幾個(gè)要點(diǎn):

  • kafka 是一個(gè)基于發(fā)布-訂閱的分布式消息系統(tǒng)(消息隊(duì)列)
  • Kafka 面向大數(shù)據(jù),消息保存在主題中,而每個(gè) topic 有分為多個(gè)分區(qū)
  • kafka 的消息數(shù)據(jù)保存在磁盤,每個(gè) partition 對應(yīng)磁盤上的一個(gè)文件,消息寫入就是簡單的文件追加,文件可以在集群內(nèi)復(fù)制備份以防丟失
  • 即使消息被消費(fèi),kafka 也不會立即刪除該消息,可以通過配置使得過一段時(shí)間后自動刪除以釋放磁盤空間
  • kafka依賴分布式協(xié)調(diào)服務(wù)Zookeeper,適合離線/在線信息的消費(fèi),與 storm 和 spark 等實(shí)時(shí)流式數(shù)據(jù)分析常常結(jié)合使用

(三)Apache Kafka基本原理

??通過之前的介紹,我們對 kafka 有了一個(gè)簡單的理解,它的設(shè)計(jì)初衷是建立一個(gè)統(tǒng)一的信息收集平臺,使其可以做到對信息的實(shí)時(shí)反饋。Kafka is a distributed,partitioned,replicated commit logservice。接下來我們著重從幾個(gè)方面分析其基本原理。

1、分布式和分區(qū)(distributed、partitioned)

??我們說 kafka 是一個(gè)分布式消息系統(tǒng),所謂的分布式,實(shí)際上我們已經(jīng)大致了解。消息保存在 Topic 中,而為了能夠?qū)崿F(xiàn)大數(shù)據(jù)的存儲,一個(gè) topic 劃分為多個(gè)分區(qū),每個(gè)分區(qū)對應(yīng)一個(gè)文件,可以分別存儲到不同的機(jī)器上,以實(shí)現(xiàn)分布式的集群存儲。另外,每個(gè) partition 可以有一定的副本,備份到多臺機(jī)器上,以提高可用性。

??總結(jié)起來就是:一個(gè) topic 對應(yīng)的多個(gè) partition 分散存儲到集群中的多個(gè) broker 上,存儲方式是一個(gè) partition 對應(yīng)一個(gè)文件,每個(gè) broker 負(fù)責(zé)存儲在自己機(jī)器上的 partition 中的消息讀寫。

2、副本(replicated )

??kafka 還可以配置 partitions 需要備份的個(gè)數(shù)(replicas),每個(gè) partition 將會被備份到多臺機(jī)器上,以提高可用性,備份的數(shù)量可以通過配置文件指定。

??這種冗余備份的方式在分布式系統(tǒng)中是很常見的,那么既然有副本,就涉及到對同一個(gè)文件的多個(gè)備份如何進(jìn)行管理和調(diào)度。kafka 采取的方案是:每個(gè) partition 選舉一個(gè) server 作為“l(fā)eader”,由 leader 負(fù)責(zé)所有對該分區(qū)的讀寫,其他 server 作為 follower 只需要簡單的與 leader 同步,保持跟進(jìn)即可。如果原來的 leader 失效,會重新選舉由其他的 follower 來成為新的 leader。

??至于如何選取 leader,實(shí)際上如果我們了解 ZooKeeper,就會發(fā)現(xiàn)其實(shí)這正是 Zookeeper 所擅長的,Kafka 使用 ZK 在 Broker 中選出一個(gè) Controller,用于 Partition 分配和 Leader 選舉。

??另外,這里我們可以看到,實(shí)際上作為 leader 的 server 承擔(dān)了該分區(qū)所有的讀寫請求,因此其壓力是比較大的,從整體考慮,有多少個(gè) partition 就意味著會有多少個(gè)leader,kafka 會將 leader 分散到不同的 broker 上,確保整體的負(fù)載均衡。

3、整體數(shù)據(jù)流程

??Kafka 的總體數(shù)據(jù)流滿足下圖,該圖可以說是概括了整個(gè) kafka 的基本原理。


(1)數(shù)據(jù)生產(chǎn)過程(Produce)

??對于生產(chǎn)者要寫入的一條記錄,可以指定四個(gè)參數(shù):分別是 topic、partition、key 和 value,其中 topic 和 value(要寫入的數(shù)據(jù))是必須要指定的,而 key 和 partition 是可選的。

??對于一條記錄,先對其進(jìn)行序列化,然后按照 Topic 和 Partition,放進(jìn)對應(yīng)的發(fā)送隊(duì)列中。如果 Partition 沒填,那么情況會是這樣的:a、Key 有填。按照 Key 進(jìn)行哈希,相同 Key 去一個(gè) Partition。b、Key 沒填。Round-Robin 來選 Partition。

??producer 將會和Topic下所有 partition leader 保持 socket 連接,消息由 producer 直接通過 socket 發(fā)送到 broker。其中 partition leader 的位置( host : port )注冊在 zookeeper 中,producer 作為 zookeeper client,已經(jīng)注冊了 watch 用來監(jiān)聽 partition leader 的變更事件,因此,可以準(zhǔn)確的知道誰是當(dāng)前的 leader。

??producer 端采用異步發(fā)送:將多條消息暫且在客戶端 buffer 起來,并將他們批量的發(fā)送到 broker,小數(shù)據(jù) IO 太多,會拖慢整體的網(wǎng)絡(luò)延遲,批量延遲發(fā)送事實(shí)上提升了網(wǎng)絡(luò)效率。

(2)數(shù)據(jù)消費(fèi)過程(Consume)

??對于消費(fèi)者,不是以單獨(dú)的形式存在的,每一個(gè)消費(fèi)者屬于一個(gè) consumer group,一個(gè) group 包含多個(gè) consumer。特別需要注意的是:訂閱 Topic 是以一個(gè)消費(fèi)組來訂閱的,發(fā)送到 Topic 的消息,只會被訂閱此 Topic 的每個(gè) group 中的一個(gè) consumer 消費(fèi)。

??如果所有的 Consumer 都具有相同的 group,那么就像是一個(gè)點(diǎn)對點(diǎn)的消息系統(tǒng);如果每個(gè) consumer 都具有不同的 group,那么消息會廣播給所有的消費(fèi)者。

??具體說來,這實(shí)際上是根據(jù) partition 來分的,一個(gè) Partition,只能被消費(fèi)組里的一個(gè)消費(fèi)者消費(fèi),但是可以同時(shí)被多個(gè)消費(fèi)組消費(fèi),消費(fèi)組里的每個(gè)消費(fèi)者是關(guān)聯(lián)到一個(gè) partition 的,因此有這樣的說法:對于一個(gè) topic,同一個(gè) group 中不能有多于 partitions 個(gè)數(shù)的 consumer 同時(shí)消費(fèi),否則將意味著某些 consumer 將無法得到消息。

??同一個(gè)消費(fèi)組的兩個(gè)消費(fèi)者不會同時(shí)消費(fèi)一個(gè) partition。

??在 kafka 中,采用了 pull 方式,即 consumer 在和 broker 建立連接之后,主動去 pull(或者說 fetch )消息,首先 consumer 端可以根據(jù)自己的消費(fèi)能力適時(shí)的去 fetch 消息并處理,且可以控制消息消費(fèi)的進(jìn)度(offset)。

??partition 中的消息只有一個(gè) consumer 在消費(fèi),且不存在消息狀態(tài)的控制,也沒有復(fù)雜的消息確認(rèn)機(jī)制,可見 kafka broker 端是相當(dāng)輕量級的。當(dāng)消息被 consumer 接收之后,需要保存 Offset 記錄消費(fèi)到哪,以前保存在 ZK 中,由于 ZK 的寫性能不好,以前的解決方法都是 Consumer 每隔一分鐘上報(bào)一次,在 0.10 版本后,Kafka 把這個(gè) Offset 的保存,從 ZK 中剝離,保存在一個(gè)名叫 consumeroffsets topic 的 Topic 中,由此可見,consumer 客戶端也很輕量級。

4、消息傳送機(jī)制

??Kafka 支持 3 種消息投遞語義,在業(yè)務(wù)中,常常都是使用 At least once 的模型。

  • At most once:最多一次,消息可能會丟失,但不會重復(fù)。
  • At least once:最少一次,消息不會丟失,可能會重復(fù)。
  • Exactly once:只且一次,消息不丟失不重復(fù),只且消費(fèi)一次。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號