本文介紹了DataHub新版本帶來的改動,介紹batch的原理和實現,以及使用batch後所帶來的效能的提升和費用的減少。切換為batch後,對於DataHub而言,服務端的資源消耗會明顯降低,同時,效能會明顯提升,使用費用也會大幅降低。
升級內容
支援zstd壓縮
DataHub在新版本中對zstd壓縮演算法做了支援,相較於DataHub支援的lz4和deflate壓縮演算法,效果卓越。
zstd是一種高效能壓縮演算法,由Facebook開發,於2016年開源,zstd在壓縮速度和壓縮比兩方面都有不俗的表現,非常契合DataHub的使用情境。
序列化改造
DataHub引入了batch序列化,batch序列化本質上就是DataHub對資料轉送中資料的定義的一種組織方式,batch並不是特指某種序列化的方式,而是對序列化的資料做了二次封裝。例如:一次發送100條資料,將100條資料序列化後得到一個buffer,給這個buffer選擇一個壓縮演算法得到壓縮後的buffer,這個時候給這個壓縮後的buffer添加一個header記錄這個buffer大小、資料條數、壓縮演算法、crc等資訊,從而獲得一條完整batch buffer。
解決的問題:
可以有效避免業務層面的髒資料。
減少服務端CPU開銷,提高資料處理效能。
較少同時讀寫延遲。
batch的buffer發送到服務端後,因用戶端已經做了充分的資料有效性的校正,所以服務端只需檢驗資料中的crc確認為有效buffer後,便可以直接落盤,省去了序列化、還原序列化、加解壓以及校正的操作,服務端效能提升超過80%,因為是多條資料一起壓縮的,所以壓縮率也提高了,儲存成本也降低了。
費用對比
驗證batch所帶來的實際效果,進行以下測試,假設情境如下:
測試資料為廣告投放相關的資料,大約200列,資料中null比例大約20%~30%。
1000條資料一個batch。
batch內部的序列化使用的是avro。
lz4是之前版本預設的壓縮演算法,壓縮使用zstd來替代lz4。
測試結果如下:
資料來源大小(Byte) | lz4壓縮(Byte) | zstd壓縮(Byte) | |
protobuf序列化 | 11,506,677 | 3,050,640 | 1,158,868 |
batch序列化 | 11,154,596 | 2,931,729 | 1,112,693 |
Datahub的收費項主要是這兩個維度為儲存與流量,其他主要是為了防止濫用而設定的懲罰性質的收費項。因此,以上測試結果從儲存與流量兩個角度進行分析。
從儲存成本上來看,DataHub的protobuf序列化是沒有儲存壓縮的(只是HTTP傳輸環節壓縮),如果替換為batch+zstd,那麼儲存會由11506KB降為1112KB,也就是說,儲存成本下降幅度達到約90%。
從流量成本上來看,DataHub的protobuf+lz4後的大小為3050KB,batch+zstd的大小為1112KB,也就是說,流量成本會降低約60%。
以上為樣本資料測試結果,不同資料測試效果有差異,請您根據您業務實際情況進行測試。
使用batch
注意事項
batch寫入最大的優勢需要充分攢批,如果用戶端無法攢批,或者攢批的資料較少,可能帶來的效果提升並不顯著。
為了讓使用者移轉更加方便,DataHub在各種讀寫方式之間做了相容,保證使用者中間狀態可以更平滑地過渡,即batch寫入依舊可以使用原方式讀取,原方式寫入依舊可以使用batch讀取。因此,在寫入端更新為batch寫入之後,最好消費端也更新為batch,寫入和消費不對應反而會降低效能。
前提條件
需在Topic中已開啟多Version後才可使用。
DataHub版本須在client-library 1.4及以上的版本。
目前僅支援Java SDK的batch寫入。
開啟Version
控制台開啟多Version
在控制台上無法修改已建立的Topic資訊,所以若需使用batch,則需建立Topic啟動多Version。建立Topic詳情可參見:Topic操作。

SDK開啟多Version
public static void createTopicWithOption() {
try {RecordSchema recordSchema = new RecordSchema() {{
this.addField(new Field("field1", FieldType.STRING));
this.addField(new Field("field2", FieldType.BIGINT));
}};
TopicOption option = new TopicOption();
//開啟多version
option.setEnableSchemaRegistry(true);
option.setComment(Constant.TOPIC_COMMENT);
option.setExpandMode(ExpandMode.ONLY_EXTEND);
option.setLifeCycle(Constant.LIFE_CYCLE);
option.setRecordType(RecordType.TUPLE);
option.setRecordSchema(recordSchema);
option.setShardCount(Constant.SHARD_COUNT);
datahubClient.createTopic(Constant.PROJECT_NAME, Constant.TOPIC_NAME, option);
LOGGER.info("create topic successful");
} catch (ResourceAlreadyExistException e) {
LOGGER.info("topic already exists, please check if it is consistent");
} catch (ResourceNotFoundException e) {
// project not found
e.printStackTrace();
throw e;
} catch (DatahubClientException e) {
// other error
e.printStackTrace();
throw e;
}
}配置batch
如果服務端已經支援batch,則DataHub會預設使用batch。如果服務端沒有支援(例如服務端還未升級到新版本或者專有雲較老的版本),那麼會自動回退到原來的序列化方式,用戶端自適應,使用者無需增加額外配置。下面以1.4.1版本為例給出一個簡單樣本。壓縮演算法也會自適應選擇一個較優的方案,目前1.4版本以後會預設選擇zstd演算法。
Maven依賴
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.25.3</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.4.3</version>
</dependency>配置batch
ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
List<RecordEntry> recordList = new ArrayList<>();
// 為了達到更好的效能,建議在RecordList中添加儘可能多的Record;
// 儘可能使整個RecordList大小為512KB~1M
for (int i = 0; i < 1000; ++i) {
RecordEntry record = new RecordEntry();
TupleRecordData data = new TupleRecordData(schema);
// 假設schema為 {"fields":[{"name":"f1", "type":"STRING"},{"name":"f2", "type":"BIGINT"}]}
data.setField("f1", "value" + i);
data.setField("f2", i);
record.setRecordData(data);
// 添加使用者自訂屬性,可選
record.addAttribute("key1", "value1");
recordList.add(record);
}
try {
// 迴圈寫入1000遍
for (int i = 0; i < 1000; ++i) {
try {
String shardId = datahubProducer.send(recordList);
LOGGER.info("Write shard {} success, record count:{}", shardId, recordList.size());
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep重試
Thread.sleep(1000);
}
}
} finally {
// 關閉producer相關資源
datahubProducer.close();
}後續支援
如果您在使用中有任何問題或者疑問,歡迎提工單諮詢或加入使用者群諮詢,群號:33517130。