本文介紹雲訊息佇列 Kafka 版發行者的最佳實務,協助您降低發送訊息的錯誤率。本文最佳實務基於Java用戶端。對於其他語言的用戶端,其基本概念與思想是相通的,但實現細節可能存在差異。
發送訊息
發送訊息的範例程式碼如下:
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
topic, //訊息主題。
null, //分區編號。建議為null,由Producer分配。
System.currentTimeMillis(), //時間戳記。
String.valueOf(value.hashCode()), //訊息鍵。
value //訊息值。
));
完整範例程式碼,請參見SDK概述。
Key和Value
0.10.2版本的雲訊息佇列 Kafka 版的訊息有以下兩個欄位:
Key:訊息的標識。
Value:訊息內容。
為了便於追蹤,請為訊息設定一個唯一的Key。您可以通過Key追蹤某訊息,列印發送日誌和消費日誌,瞭解該訊息的發送和消費情況。
如果訊息發送量較大,建議不要設定Key,並使用黏性分區策略。黏性分區策略詳情,請參見黏性分區策略。
在0.11.0以及之後的版本,雲訊息佇列 Kafka 版開始支援headers,如果您需要使用headers,需要將服務端升級至2.2.0版本。
失敗重試
分布式環境下,由於網路等原因偶爾發送失敗是常見的。導致這種失敗的原因可能是訊息已經發送成功,但是ACK失敗,也有可能是確實沒發送成功。
雲訊息佇列 Kafka 版是VIP網路架構,長時間不進行通訊串連會被主動斷開,因此,不是一直活躍的用戶端會經常收到connection reset by peer
錯誤,建議重試訊息發送。
您可以根據業務需求,設定以下重試參數:
retries
:訊息發送失敗時的重試次數。retry.backoff.ms
,訊息發送失敗的稍候再試,建議設定為1000
,單位:毫秒。
非同步發送
發送介面是非同步,如果您想接收發送的結果,可以調用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。
安全執行緒
Producer是安全執行緒的,且可以往任何Topic發送訊息。通常情況下,一個應用對應一個Producer。
Acks
Acks的說明如下:
acks=0
:無需服務端的Response、效能較高、丟資料風險較大。acks=1
:服務端主節點寫成功即返回Response、效能中等、丟資料風險中等、主節點宕機可能導致資料丟失。acks=all
:服務端主節點寫成功且備節點同步成功才返回Response、效能較差、資料較為安全、主節點和備節點都宕機才會導致資料丟失。
為了提升發送效能, 建議設定為acks=1
。
提升發送效能(減少片段化發送請求)
一般情況下,一個雲訊息佇列 Kafka 版Topic會有多個分區。雲訊息佇列 Kafka 版Producer用戶端在向服務端發送訊息時,需要先確認往哪個Topic的哪個分區發送。我們給同一個分區發送多條訊息時,Producer用戶端將相關訊息打包成一個Batch,批量發送到服務端。Producer用戶端在處理Batch時,是有額外開銷的。一般情況下,小Batch會導致Producer用戶端產生大量請求,造成請求隊列在用戶端和服務端的排隊,並造成相關機器的CPU升高,從而整體推高了訊息發送和消費延遲。一個合適的Batch大小,可以減少發送訊息時用戶端向服務端發起的請求次數,在整體上提高訊息發送的吞吐和延遲。
Batch機制,雲訊息佇列 Kafka 版Producer端主要通過兩個參數進行控制:
batch.size
: 發往每個分區(Partition)的訊息緩衝量(訊息內容的位元組數之和,不是條數)。達到設定的數值時,就會觸發一次網路請求,然後Producer用戶端把訊息批量發往伺服器。如果batch.size
設定過小,有可能影響發送效能和穩定性。建議保持預設值16384。單位:位元組。linger.ms
: 每條訊息在緩衝中的最長時間。若超過這個時間,Producer用戶端就會忽略batch.size
的限制,立即把訊息發往伺服器。建議根據業務情境, 設定linger.ms
在100~1000之間。單位:毫秒。
因此,雲訊息佇列 Kafka 版Producer用戶端什麼時候把訊息批量發送至伺服器是由batch.size
和linger.ms
共同決定的。您可以根據具體業務需求進行調整。為了提升發送的效能,保障服務的穩定性, 建議您設定batch.size=16384
和linger.ms=1000
。
黏性分區策略
只有發送到相同分區的訊息,才會被放到同一個Batch中,因此決定一個Batch如何形成的一個因素是雲訊息佇列 Kafka 版Producer端設定的分區策略。雲訊息佇列 Kafka 版Producer允許通過設定Partitioner的實作類別來選擇適合自己業務的分區。在訊息指定Key的情況下,雲訊息佇列 Kafka 版Producer的預設策略是對訊息的Key進行雜湊,然後根據雜湊結果選擇分區,保證相同Key的訊息會發送到同一個分區。
在訊息沒有指定Key的情況下,雲訊息佇列 Kafka 版2.4版本之前的預設策略是迴圈使用主題的所有分區,將訊息以輪詢的方式發送到每一個分區上。但是,這種預設策略Batch的效果會比較差,在實際使用中,可能會產生大量的小Batch,從而使得實際的延遲增加。鑒於該預設策略對無Key訊息的分區效率低問題,雲訊息佇列 Kafka 版在2.4版本引入了黏性分區策略(Sticky Partitioning Strategy)。
黏性分區策略主要解決無Key訊息分散到不同分區,造成小Batch問題。其主要策略是如果一個分區的Batch完成後,就隨機播放另一個分區,然後後續的訊息儘可能地使用該分區。這種策略在短時間內看,會將訊息發送到同一個分區,如果拉長整個已耗用時間,訊息還是可以均勻地發布到各個分區上的。這樣可以避免訊息出現分區傾斜,同時還可以降低延遲,提升服務整體效能。
如果您使用的雲訊息佇列 Kafka 版Producer用戶端是2.4及以上版本,預設的分區策略就採用黏性分區策略。如果您使用的Producer用戶端版本小於2.4,可以根據黏性分區策略原理,自行實現分區策略,然後通過參數partitioner.class
設定指定的分區策略。
關於黏性分區策略實現,您可以參考如下Java版代碼實現。該代碼的實現邏輯主要是根據一定的時間間隔,切換一次分區。
public class MyStickyPartitioner implements Partitioner {
// 記錄上一次切換分區時間。
private long lastPartitionChangeTimeMillis = 0L;
// 記錄當前分區。
private int currentPartition = -1;
// 分區切換時間間隔,可以根據實際業務選擇切換分區的時間間隔。
private long partitionChangeTimeGap = 100L;
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 擷取所有分區資訊。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
int availablePartitionSize = availablePartitions.size();
// 判斷當前可用分區。
if (availablePartitionSize > 0) {
handlePartitionChange(availablePartitionSize);
return availablePartitions.get(currentPartition).partition();
} else {
handlePartitionChange(numPartitions);
return currentPartition;
}
} else {
// 對於有key的訊息,根據key的雜湊值選擇分區。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private void handlePartitionChange(int partitionNum) {
long currentTimeMillis = System.currentTimeMillis();
// 如果超過分區切換時間間隔,則切換下一個分區,否則還是選擇之前的分區。
if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
|| currentPartition < 0 || currentPartition >= partitionNum) {
lastPartitionChangeTimeMillis = currentTimeMillis;
currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
}
}
public void close() {}
}
OOM
結合雲訊息佇列 Kafka 版的Batch設計思路,雲訊息佇列 Kafka 版會緩衝訊息並打包發送,如果緩衝太多,則有可能造成OOM(Out of Memory)。
buffer.memory
: 發送的記憶體池大小。如果記憶體池設定過小,則有可能導致申請記憶體耗時過長,從而影響發送效能,甚至導致發送逾時。建議buffer.memory ≧ batch.size * 分區數 * 2
。單位:位元組。buffer.memory
的預設數值是32 MB,對於單個Producer而言,可以保證足夠的效能。重要如果您在同一個JVM中啟動多個Producer,那麼每個Producer都有可能佔用32 MB緩衝空間,此時便有可能觸發OOM。
在生產時,一般沒有必要啟動多個Producer;如有特殊情況需要,則需要考慮
buffer.memory
的大小,避免觸發OOM。
分區順序
單個分區(Partition)內,訊息是按照發送順序儲存的,是基本有序的。
預設情況下,雲訊息佇列 Kafka 版為了提升可用性,並不保證單個分區內絕對有序,在升級或者宕機時,會發生少量訊息亂序(某個分區掛掉後把訊息Failover到其它分區)。
如果業務要求分區保證嚴格有序,請在建立Topic時選擇使用Local儲存。