本文主要介紹雲訊息佇列 Kafka 版訂閱者的最佳實務,協助您減少消費訊息出錯的可能性。
消費訊息基本流程
雲訊息佇列 Kafka 版訂閱者在訂閱訊息時的基本流程為:Poll資料→執行消費邏輯→再次Poll資料,詳情參見下圖。
負載平衡
每個Group可以包含多個消費執行個體,即可以啟動多個雲訊息佇列 Kafka 版Consumer,並把參數group.id
設定成相同的值。屬於同一個Group的消費執行個體會負載消費訂閱的Topic。
例如Group A訂閱了Topic A,並開啟三個消費執行個體C1、C2、C3,則發送到Topic A的每條訊息最終只會傳給C1、C2、C3的某一個。雲訊息佇列 Kafka 版預設會均勻地把訊息傳給各個訊息執行個體,以做到消費負載平衡。
雲訊息佇列 Kafka 版負載平衡消費的內部原理是,把訂閱的Topic的分區,平均分配給各個消費執行個體。因此,消費執行個體的個數不要大於分區的數量,否則會有消費執行個體分配不到任何分區而處於空跑狀態。這個負載平衡發生的時間,除了第一次啟動上線之外,後續消費執行個體發生重啟、增加、減少等變更時,都會觸發一次負載平衡。
消費用戶端(Consumer)頻繁出現Rebalance
心跳逾時會引發Rebalance,可以通過參數調整、提高消費速度等方法解決。更多資訊,請參見為什麼消費用戶端頻繁出現Rebalance?。
分區個數
分區個數主要影響的是消費者的並發數量。
對於同一個Group內的消費者來說,一個分區最多隻能被一個消費者消費。因此,消費執行個體的個數不要大於分區的數量,否則會有消費執行個體分配不到任何分區而處於空跑狀態。
控制台的預設分區個數是12,可以滿足絕大部分情境的需求。您可以根據業務使用量進行增加。不建議分區數小於12,否則可能影響消費發送效能;也不建議超過100個,否則易引發消費端Rebalance。
分區增加後,將不能減少,請小幅度調整。
多個訂閱
雲訊息佇列 Kafka 版支援以下多個訂閱者式:
Group訂閱多個Topic。
一個Group可以訂閱多個Topic,多個Topic的訊息被Group中的Consumer均勻消費。例如Group A訂閱了Topic A、Topic B、Topic C,則這三個Topic中的訊息,被Group中的Consumer均勻消費。
Group訂閱多個Topic的範例程式碼如下:
String topicStr = kafkaProperties.getProperty("topic"); String[] topics = topicStr.split(","); for (String topic: topics) { subscribedTopics.add(topic.trim()); } consumer.subscribe(subscribedTopics);
Topic被多個Group訂閱。
一個Topic可以被多個Group訂閱,且各個Group獨立消費Topic下的所有訊息。例如Group A訂閱了Topic A,Group B也訂閱了Topic A,則發送到Topic A的每條訊息,不僅會傳一份給Group A的消費執行個體,也會傳一份給Group B的消費執行個體,且這兩個過程相互獨立,相互沒有任何影響。
一個Group對應一個應用
建議一個Group對應一個應用,即不同的應用對應不同的代碼。如果您需要將不同的代碼寫在同一個應用中,請準備多份不同的kafka.properties。例如kafka1.properties、kafka2.properties。
消費位點
每個Topic會有多個分區,每個分區會統計當前訊息的總條數,這個稱為最大位點MaxOffset。
雲訊息佇列 Kafka 版Consumer會按順序依次消費分區內的每條訊息,記錄已經消費了的訊息條數,稱為消費位點ConsumerOffset。
剩餘的未消費的條數(也稱為訊息堆積量)=MaxOffset-ConsumerOffset。
消費位點提交
雲訊息佇列 Kafka 版消費者有兩個相關參數:
enable.auto.commit:是否採用自動認可位點機制。預設值為true,表示預設採用自動認可機制。
auto.commit.interval.ms: 自動認可位點時間間隔。預設值為1000,即1s。
這兩個參數組合的結果就是,每次poll資料前會先檢查上次提交位點的時間,如果距離目前時間已經超過參數auto.commit.interval.ms規定的時間長度,則用戶端會啟動位點提交動作。
因此,如果將enable.auto.commit設定為true,則需要在每次poll資料時,確保前一次poll出來的資料已經消費完畢,否則可能導致位點跳躍。
如果想自己控制位點提交,請把enable.auto.commit設為false,並調用commit(offsets)
函數自行控制位點提交。
消費位點重設
以下兩種情況,會發生消費位點重設:
當服務端不存在曾經提交過的位點時(例如用戶端第一次上線)。
當從非法位點拉取訊息時(例如某個分區最大位點是10,但用戶端卻從11開始拉取訊息)。
Java用戶端可以通過auto.offset.reset來配置重設策略,主要有三種策略:
latest:從最大位點開始消費。
earliest:從最小位點開始消費。
none:不做任何操作,即不重設。
建議設定成latest,而不要設定成earliest,避免因位點非法時從頭開始消費,從而造成大量重複。
如果是您自己管理位點,可以設定成none。
拉取大訊息
消費過程是由用戶端主動去服務端拉取訊息的,在拉取大訊息時,需要注意控制拉取速度,注意修改配置:
max.poll.records:每次Poll擷取的最大訊息數量。如果單條訊息超過1 MB,建議設定為1。
fetch.max.bytes:設定比單條訊息的大小略大一點。
max.partition.fetch.bytes:設定比單條訊息的大小略大一點。
拉取大訊息的核心是逐條拉取的。
訊息重複和消費等冪
雲訊息佇列 Kafka 版消費的語義是at least once, 也就是至少投遞一次,保證訊息不丟失,但是無法保證訊息不重複。在出現網路問題、用戶端重啟時均有可能造成少量重複訊息,此時應用消費端如果對訊息重複比較敏感(例如訂單交易類),則應該做訊息等冪。
以資料庫類應用為例,常用做法是:
發送訊息時,傳入key作為唯一流水號ID。
消費訊息時,判斷key是否已經消費過,如果已經被消費,則忽略,如果沒消費過,則消費一次。
如果應用本身對少量訊息重複不敏感,則不需要做此類等冪檢查。
消費失敗
雲訊息佇列 Kafka 版是按分區逐條訊息順序向前推進消費的,如果消費端拿到某條訊息後執行消費邏輯失敗,例如應用伺服器出現了髒資料,導致某條訊息處理失敗,等待人工幹預,那麼有以下兩種處理方式:
失敗後一直嘗試再次執行消費邏輯。這種方式有可能造成消費線程阻塞在當前訊息,無法向前推進,造成訊息堆積。
雲訊息佇列 Kafka 版沒有處理失敗訊息的設計,實踐中通常會列印失敗的訊息或者儲存到某個服務(例如建立一個Topic專門用來放失敗的訊息),然後定時檢查失敗訊息的情況,分析失敗原因,根據情況處理。
消費延遲
雲訊息佇列 Kafka 版的消費機制是由用戶端主動去服務端拉取訊息進行消費的。因此,如果用戶端能夠及時消費,則不會產生較大延遲。如果產生了較大延遲,請先關注是否有堆積,並注意提高消費速度。
消費阻塞以及堆積
消費端最常見的問題就是消費堆積,最常造成堆積的原因是:
消費速度跟不上生產速度,此時應該提高消費速度,詳情請參見提高消費速度。
消費端產生了阻塞。
消費端拿到訊息後,執行消費邏輯,通常會執行一些遠程調用,如果這個時候同步等待結果,則有可能造成一直等待,消費進程無法向前推進。
消費端應該竭力避免堵塞消費線程,如果存在等待調用結果的情況,建議設定等待的逾時時間,逾時後作為消費失敗進行處理。
提高消費速度
提高消費速度有以下兩個辦法:
增加Consumer執行個體個數。
可以在進程內直接增加(需要保證每個執行個體對應一個線程,否則沒有太大意義),也可以部署多個消費執行個體進程;需要注意的是,執行個體個數超過分區數量後就不再能提高速度,將會有消費執行個體不工作。
增加消費線程。
增加Consumer執行個體本質上也是增加線程的方式來提升速度,因此更加重要的效能提升方式是增加消費線程,最基本的步驟如下:
定義一個線程池。
Poll資料。
把資料提交到線程池進行並發處理。
等並髮結果返回成功後,再次poll資料執行。
訊息過濾
雲訊息佇列 Kafka 版自身沒有訊息過濾的語義。實踐中可以採取以下兩個辦法:
如果過濾的種類不多,可以採取多個Topic的方式達到過濾的目的。
如果過濾的種類多,則最好在用戶端業務層面自行過濾。
實踐中請根據業務具體情況進行選擇,也可以綜合運用上面兩種辦法。
訊息廣播
雲訊息佇列 Kafka 版沒有訊息廣播的語義,可以通過建立不同的Group來類比實現。
訂閱關係
同一個Group內,各個消費執行個體訂閱的Topic最好保持一致,避免給排查問題帶來幹擾。