Kafka - 第二章 | Apache Kafka 基礎
對於大數據,我們要考慮的問題有很多,首先海量數據如何收集(如Flume
),然後對於收集到的數據如何存儲(典型的分佈式文件系統HDFS
、分佈式資料庫HBase
、NoSQL
資料庫Redis
),其次存儲的數據不是存起來就沒事了,要通過計算從中獲取有用的信息,這就涉及到計算模型(典型的離線計算MapReduce
、流式實時計算Storm
、Spark
),或者要從數據中挖掘信息,還需要相應的機器學習算法。在這些之上,還有一些各種各樣的查詢分析數據的工具(如Hive
、Pig
等)。除此之外,要構建分佈式應用還需要一些工具,比如分佈式協調服務Zookeeper
等等。
這裡,我們講到的是消息系統,Kafka
專為分佈式高吞吐量系統而設計,其他消息傳遞系統相比,Kafka
具有更好的吞吐量,內置分區,複製和固有的容錯能力,這使得它非常適合大規模消息處理應用程序。
消息系統
消息系統負責將數據從一個應用程序傳輸到另外一個應用程序,使得應用程序可以專注於處理邏輯,而不用過多的考慮如何將消息共享出去。
分佈式消息系統基於可靠消息隊列的方式,消息在應用程序和消息系統之間異步排隊。實際上,消息系統有兩種消息傳遞模式:一種是點對點,另外一種是基於發布-訂閱(publish-subscribe
)的消息系統。
- 點對點的消息系統(
Point to Point Messaging System
)
在點對點的消息系統中,消息保留在隊列中,一個或者多個消費者可以消耗隊列中的消息,但是消息最多只能被一個消費者消費,一旦有一個消費者將其消費掉,消息就從該隊列中消失。這裡要注意:多個消費者可以同時工作,但是最終能拿到該消息的只有其中一個。最典型的例子就是訂單處理系統,多個訂單處理器可以同時工作,但是對於一個特定的訂單,只有其中一個訂單處理器可以拿到該訂單進行處理。
- 發布-訂閱消息系統(
Publish-Subscribe Messaging System
)
在發布 - 訂閱系統中,消息被保留在主題中。與點對點系統不同,消費者可以訂閱一個或多個主題並使用該主題中的所有消息。在發布 - 訂閱系統中,消息生產者稱為發布者,消息使用者稱為訂閱者。一個現實生活的例子是Dish電視,它發布不同的渠道,如運動,電影,音樂等,任何人都可以訂閱自己的頻道集,並獲得他們訂閱的頻道時可用。
Apache Kafka 簡介
Apache Kafka
是一個分佈式發布 - 訂閱消息系統和一個強大的隊列,可以處理大量的數據,並使你能夠將消息從一個端點傳遞到另一個端點。Kafka
適合離線和在線消息消費。Kafka
消息保留在磁盤上,並在群集內復制以防止數據丟失。Kafka
構建在ZooKeeper
同步服務之上。它與Apache Storm
和Spark
非常好地集成,用於實時流式數據分析。
Kafka
是一個分佈式消息隊列,具有高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列裡寫消息,消費者從隊列裡取消息進行業務邏輯。一般在架構設計中起到解耦、削峰、異步處理的作用。
基本術語:
- 生產者和消費者(
producer
和consumer
):消息的發送者叫Producer
,消息的使用者和接受者是Consumer
,生產者將數據保存到Kafka
集群中,消費者從中獲取消息進行業務的處理。 - broker:
Kafka
集群中有很多台Server
,其中每一台Server
都可以存儲消息,將每一台Server
稱為一個kafka
實例,也叫做broker
。 - 主題(topic):一個
topic
裡保存的是同一類消息,相當於對消息的分類,每個producer
將消息發送到kafka
中,都需要指明要存的topic
是哪個,也就是指明這個消息屬於哪一類。 - 分區(partition):每個
topic
都可以分成多個partition
,每個partition
在存儲層面是append log
文件。任何發佈到此partition
的消息都會被直接追加到log
文件的尾部。為什麼要進行分區呢?最根本的原因就是:kafka
基於文件進行存儲,當文件內容大到一定程度時,很容易達到單個磁盤的上限,因此,採用分區的辦法,一個分區對應一個文件,這樣就可以將數據分別存儲到不同的server
上去,另外這樣做也可以負載均衡,容納更多的消費者。 - 偏移量(Offset):一個分區對應一個磁盤上的文件,而消息在文件中的位置就稱為
offset
(偏移量),offset
為一個long
型數字,它可以唯一標記一條消息。由於kafka
並沒有提供其他額外的索引機制來存儲offset
,文件只能順序的讀寫,所以在kafka
中幾乎不允許對消息進行「隨機讀寫」。
- 生產者和消費者(
Kafka
的幾個要點:kafka
是一個基於發布-訂閱的分佈式消息系統(消息隊列)。Kafka
面向大數據,消息保存在主題中,而每個topic
有分為多個分區。kafak
的消息數據保存在磁盤,每個partition
對應磁盤上的一個文件,消息寫入就是簡單的文件追加,文件可以在集群內復製備份以防丟失。- 即使消息被消費,
kafka
也不會立即刪除該消息,可以通過配置使得過一段時間後自動刪除以釋放磁盤空間。 kafka
依賴分佈式協調服務Zookeeper
,適合離線/在線信息的消費,與storm
和saprk
等實時流式數據分析常常結合使用。
Apache Kafka基本原理
深入主题和日誌(Topic和Log)
Topic
是發布的消息的類別名,一個topic
可以有零個,一個或多個消費者訂閱該主題的消息。
對於每個topic
,Kafka
集群都會維護一個分區log
,就像下圖中所示:
每一個分區都是一個順序的、不可變的消息隊列,並且可以持續的添加。分區中的消息都被分了一個序列號,稱之為偏移量(offset
),在每個分區中此偏移量都是唯一的。
Kafka
集群保持所有的消息,直到它們過期(無論消息是否被消費)。實際上消費者所持有的僅有的元數據就是這個offset
(偏移量),也就是說offset
由消費者來控制:正常情況當消費者消費消息的時候,偏移量也線性的的增加。但是實際偏移量由消費者控制,消費者可以將偏移量重置為更早的位置,重新讀取消息。可以看到這種設計對消費者來說操作自如,一個消費者的操作不會影響其它消費者對此log
的處理。
分佈式和分區(distributed、partitioned)
Kafka
是一個分佈式消息系統,所謂的分佈式,實際上我們已經大致了解。消息保存在Topic
中,而為了能夠實現大數據的存儲,一個 topic
劃分為多個分區,每個分區對應一個文件,可以分別存儲到不同的機器上,以實現分佈式的集群存儲。另外,每個partition
可以有一定的副本,備份到多台機器上,以提高可用性。
總結起來就是:一個topic
對應的多個partition
分散存儲到集群中的多個broker
上,存儲方式是一個partition
對應一個文件,每個 broker
負責存儲在自己機器上的partition
中的消息讀寫。
副本(replicated )
kafka
還可以配置partitions
需要備份的個數(replicas
),每個partition
將會被備份到多台機器上,以提高可用性,備份的數量可以通過配置文件指定。
這種冗餘備份的方式在分佈式系統中是很常見的,那麼既然有副本,就涉及到對同一個文件的多個備份如何進行管理和調度。kafka
採取的方案是:每個partition
選舉一個server
作為「leader
」,由leader
負責所有對該分區的讀寫,其他server
作為「follower
」只需要簡單的與leader
同步,保持跟進即可。如果原來的leader
失效,會重新選舉由其他的follower
來成為新的leader
。
至於如何選取leader
,實際上如果我們了解ZooKeeper
,就會發現其實這正是Zookeeper
所擅長的,Kafka
使用ZK
在Broker
中選出一個 Controller
,用於Partition
分配和Leader
選舉。
另外,這裡我們可以看到,實際上作為leader
的server
承擔了該分區所有的讀寫請求,因此其壓力是比較大的,從整體考慮,從多少個partition
就意味著會有多少個leader
,kafka
會將leader
分散到不同的broker
上,確保整體的負載均衡。
整體流程
數據生產過程(Produce)
對於生產者要寫入的一條記錄,可以指定四個參數:分別是topic
、partition
、key
和value
,其中topic
和value
(要寫入的數據)是必須要指定的,而key
和partition
是可選的。
對於一條記錄,先對其進行序列化,然後按照Topic
和Partition
,放進對應的發送隊列中。如果Partition
沒填,那麼情況會是這樣的:
- Key有填:按照
Key
進行哈希,相同Key
去一個Partition
。 - Key沒填:
Round-Robin
來選Partition
。
producer
將會和Topic
下所有partition leader
保持socket
連接,消息由producer
直接通過socket
發送到broker
。其中partition leader
的位置(host : port
)註冊在zookeeper
中,producer
作為zookeeper client
,已經註冊了watch
用來監聽partition leader
的變更事件,因此,可以準確的知道誰是當前的leader
。
producer
端採用異步發送:將多條消息暫且在客戶端buffer
起來,並將他們批量的發送到broker
,小數據I/O
太多,會拖慢整體的網路延遲,批量延遲發送事實上提升了網路效率。
數據消費過程(Consume)
對於消費者,不是以單獨的形式存在的,每一個消費者屬於一個consumer group
,一個group
包含多個consumer
。特別需要注意的是:訂閱Topic
是以一個消費組來訂閱的,發送到Topic
的消息,只會被訂閱此Topic
的每個group
中的一個consumer
消費。
如果所有的Consumer
都具有相同的group
,那麼就像是一個點對點的消息系統;如果每個consumer
都具有不同的group
,那麼消息會廣播給所有的消費者。
具體說來,這實際上是根據partition
來分的,一個Partition
,只能被消費組裡的一個消費者消費,但是可以同時被多個消費組消費,消費組裡的每個消費者是關聯到一個partition
的,因此有這樣的說法:對於一個topic
,同一個group
中不能有多於partitions
個數的consumer
同時消費,否則將意味著某些consumer
將無法得到消息。
同一個消費組的兩個消費者不會同時消費一個partition
:
在kafka
中,採用了pull
方式,即consumer
在和broker
建立連接之後,主動去pull
(或者說fetch
)消息,首先consumer
端可以根據自己的消費能力適時的去fetch
消息並處理,且可以控制消息消費的進度(offset
)。
partition
中的消息只有一個consumer
在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker
端是相當輕量級的。當消息被consumer
接收之後,需要保存Offset
記錄消費到哪,以前保存在ZK
中,由於ZK
的寫性能不好,以前的解決方法都是Consumer
每隔一分鐘上報一次,在0.10
版本後,Kafka
把這個Offset
的保存,從ZK
中剝離,保存在一個名叫consumeroffsets topic
的Topic
中,由此可見,consumer
客戶端也很輕量級。
其他
Kafka的保證(Guarantees)
生產者發送到一個特定的Topic
的分區上,消息將會按照它們發送的順序依次加入,也就是說,如果一個消息M1
和M2
使用相同的producer
發送,M1
先發送,那麼M1
將比M2
的offset
低,並且優先的出現在日誌中。
消費者收到的消息也是此順序。
如果一個Topic
配置了複製因子(replication factor
)為N
,那麼可以允許N-1
服務器當機而不丟失任何已經提交(committed
)的消息。
kafka作為一個消息系統
Kafka
的流與傳統企業消息系統相比的概念如何?
傳統的消息有兩種模式:隊列和發布訂閱。在隊列模式中,消費者池從服務器讀取消息(每個消息只被其中一個讀取;發布訂閱模式:消息廣播給所有的消費者。這兩種模式都有優缺點,隊列的優點是允許多個消費者瓜分處理數據,這樣可以擴展處理。但是,隊列不像多個訂閱者,一旦消息者進程讀取後故障了,那麼消息就丟了。而發布和訂閱允許你廣播數據到多個消費者,由於每個訂閱者都訂閱了消息,所以沒辦法縮放處理。
kafka
中消費者組有兩個概念:隊列:消費者組(consumer group
)允許同名的消費者組成員瓜分處理。發布訂閱:允許你廣播消息給多個消費者組(不同名)。
kafka
的每個topic
都具有這兩種模式。
kafka
有比傳統的消息系統更強的順序保證。
傳統的消息系統按順序保存數據,如果多個消費者從隊列消費,則服務器按存儲的順序發送消息,但是,儘管服務器按順序發送,消息異步傳遞到消費者,因此消息可能亂序到達消費者。這意味著消息存在並行消費的情況,順序就無法保證。消息系統常常通過僅設1個消費者來解決這個問題,但是這意味著沒用到並行處理。
kafka
做的更好。通過並行topic
的parition
—— kafka
提供了順序保證和負載均衡。每個partition
僅由同一個消費者組中的一個消費者消費到。並確保消費者是該partition
的唯一消費者,並按順序消費數據。每個topic
有多個分區,則需要對多個消費者做負載均衡,但請注意,相同的消費者組中不能有比分區更多的消費者,否則多出的消費者一直處於空等待,不會收到消息。
kafka作為一個存儲系統
所有發布消息到消息隊列和消費分離的系統,實際上都充當了一個存儲系統(發布的消息先存儲起來)。Kafka
比別的系統的優勢是它是一個非常高性能的存儲系統。
寫入到kafka
的數據將寫到磁盤並複製到集群中保證容錯性。並允許生產者等待消息應答,直到消息完全寫入。
kafka
的磁盤結構 - 無論你服務器上有50KB
或50TB
,執行是相同的。
client
來控制讀取數據的位置。你還可以認為kafka是一種專用於高性能,低延遲,提交日誌存儲,複製,和傳播特殊用途的分佈式文件系統。
kafka的流處理
僅僅讀,寫和存儲是不夠的,kafka
的目標是實時的流處理。
在kafka
中,流處理持續獲取輸入topic
的數據,進行處理加工,然後寫入輸出topic
。例如,一個零售APP
,接收銷售和出貨的輸入流,統計數量或調整價格後輸出。
可以直接使用producer
和consumer API
進行簡單的處理。對於復雜的轉換,Kafka
提供了更強大的Streams API
。可構建聚合計算或連接流到一起的複雜應用程序。
助於解決此類應用面臨的硬性問題:處理無序的數據,代碼更改的再處理,執行狀態計算等。
Sterams API
在Kafka
中的核心:使用producer
和consumer API
作為輸入,利用Kafka
做狀態存儲,使用相同的組機制在stream
處理器實例之間進行容錯保障。