在DataHub發布的最新版本中,DataHub序列化相關的模組進行了一次重大升級,在效能、成本、資源使用方面都有較大的最佳化,同時DataHub技術升級所帶來的成本紅利會輻射到每個使用者身上,根據我們實際的調研發現,大部分使用者的使用成本都可以達到30%以上的降幅,部分使用者使用成本降幅可以達到90%,本文將會詳細介紹這次改動的具體內容,以及相關的最佳實務。
主要升級內容
1、支援zstd壓縮
zstd是一種高效能壓縮演算法,由Facebook開發,並於2016年開源。zstd在壓縮速度和壓縮比兩方面都有不俗的表現,非常契合datahub的使用情境,因此datahub在新版本中對zstd壓縮演算法做了支援,相較於DataHub目前支援的lz4和deflate壓縮演算法,整體效果會好很多。
2、序列化改造
DataHub因為在設計上是存在TUPLE這種強schema結構的,我們最初為了防止髒資料,在服務端校正了資料的有效性,這就導致了需要在服務端解析出來完整的資料,然後根據schema做個校正,如果類型不符,那麼會返回錯誤。隨著版本的迭代發展,我們我發現這樣的做法並沒有起到很有效避免髒資料的效果(因為髒資料更多是業務層面的髒資料而不是資料類型),反倒是給服務端增加了巨大的cpu開銷,同時讀寫延遲也更大。
歷史經驗告訴我們,讀寫操作在服務端感知使用者的資料內容是一個相對冗餘的操作,所以我們讓資料使用一個大的buffer來互動,不再去感知真正的資料內容,真正需要用到資料內容的地方再解析出來(例如同步任務),寫入時的資料校正全部推到用戶端上來做,實際上用戶端本來就有校正的,所以對用戶端而言其實並沒有增加開銷,然後服務端通過crc校正來保證資料buffer的正確性,這樣既節省了服務端的資源消耗,也提供了更好的效能。
這個就是我們引入的batch序列化,batch序列化本質上就是DataHub資料轉送中資料的定義的一種組織方式,batch並不是特指某種序列化的方式,而是對序列化的資料做了一個二次封裝,比如我一次發送100條資料,那我把這100條資料序列化後得到一個buffer,給這個buffer選擇一個壓縮演算法得到壓縮後的buffer,這個時候給這個壓縮後的buffer添加一個header記錄這個buffer的大小、資料條數、壓縮演算法、crc等資訊,這個時候就是一條完整的batch的buffer。
這個batch的buffer發送到服務端後,因為用戶端已經做了充分的資料有效性的校正,所以服務端只檢驗一下資料中的crc確認為有效buffer後,便可以直接落盤,省去了序列化還原序列化、加解壓以及校正的操作,服務端效能提升超過80%,因為是多條資料一起壓縮的,所以壓縮率也提高了,儲存成本也降低了。
主要費用對比
為了驗證batch所帶來的實際效果,我們拿一些資料進行了對比測試
測試資料為廣告投放相關的資料,大約200列,資料中null比例大約20%~30%
1000條資料一個batch
batch內部的序列化使用的是avro
lz4是之前版本預設的壓縮演算法,壓縮使用zstd來替代lz4
測試結果如下表所示
資料原大小(Byte) | lz4壓縮(Byte) | zstd壓縮(Byte) | |
protobuf序列化 | 11,506,677 | 3,050,640 | 1,158,868 |
batch序列化 | 11,154,596 | 2,931,729 | 1,112,693 |
就以上面的測試結果為準,我們可以評估一下從費用角度來評估一下使用batch後可以節省多少,根據線上的運行情況來看,之前一般都是lz4的壓縮居多,所以我們就假設由protobuf+lz4替換到了batch+zstd。費用就從儲存和流量兩個角度來考慮,因為Datahub的收費項主要是這兩個維度,其他主要是為了防止濫用而設定的懲罰性質的收費項,正常使用方式下可以直接忽略不計。
從儲存成本上來看,Datahub的protobuf序列化是沒有儲存壓縮的(只是http傳輸環節壓縮),如果替換為batch+zstd,那麼儲存會由11506KB,降為1112KB,也就是說,儲存成本下降幅度達到約90%!
從流量成本上來看,Datahub的protobuf+lz4後的大小為3050KB,batch+zstd的大小為1112KB,也就是說,流量成本會降低約60%!
以上為樣本資料測試結果,不同資料測試效果有差異,請您根據您業務實際情況進行測試。
如何使用
只有開啟了多version的才能使用batch
多version樣本:
控制台開啟樣本:
建立Topic時點擊啟動多Version 按鈕開啟
SDK開啟樣本:
public static void createTopicWithOption() {
try {RecordSchema recordSchema = new RecordSchema() {{
this.addField(new Field("field1", FieldType.STRING));
this.addField(new Field("field2", FieldType.BIGINT));
}};
TopicOption option = new TopicOption();
//開啟多version
option.setEnableSchemaRegistry(true);
option.setComment(Constant.TOPIC_COMMENT);
option.setExpandMode(ExpandMode.ONLY_EXTEND);
option.setLifeCycle(Constant.LIFE_CYCLE);
option.setRecordType(RecordType.TUPLE);
option.setRecordSchema(recordSchema);
option.setShardCount(Constant.SHARD_COUNT);
datahubClient.createTopic(Constant.PROJECT_NAME, Constant.TOPIC_NAME, option);
LOGGER.info("create topic successful");
} catch (ResourceAlreadyExistException e) {
LOGGER.info("topic already exists, please check if it is consistent");
} catch (ResourceNotFoundException e) {
// project not found
e.printStackTrace();
throw e;
} catch (DatahubClientException e) {
// other error
e.printStackTrace();
throw e;
}
}
本文使用Java為例,介紹batch如何使用。
使用batch要求使用client-library 1.4及以上的版本,如果服務端已經支援,DataHub會預設使用batch,服務端沒有支援(例如服務端還未升級到新版本或者專有雲較老的版本),那麼會自動回退到原來的序列化方式,用戶端自適應,使用者無需增加額外配置。下面已1.4.1版本為例給出一個簡單樣本。壓縮演算法也會自適應選擇一個較優的方案,目前1.4版本以後會預設選擇zstd演算法。
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.25.3</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.4.3</version>
</dependency>
ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
List<RecordEntry> recordList = new ArrayList<>();
// 為了達到更好的效能,建議在RecordList中添加儘可能多的Record;
// 儘可能使整個RecordList大小為512KB~1M
for (int i = 0; i < 1000; ++i) {
RecordEntry record = new RecordEntry();
TupleRecordData data = new TupleRecordData(schema);
// 假設schema為 {"fields":[{"name":"f1", "type":"STRING"},{"name":"f2", "type":"BIGINT"}]}
data.setField("f1", "value" + i);
data.setField("f2", i);
record.setRecordData(data);
// 添加使用者自訂屬性,可選
record.addAttribute("key1", "value1");
recordList.add(record);
}
try {
// 迴圈寫入1000遍
for (int i = 0; i < 1000; ++i) {
try {
String shardId = datahubProducer.send(recordList);
LOGGER.info("Write shard {} success, record count:{}", shardId, recordList.size());
} catch (DatahubClientException e) {
if (!ExceptionChecker.isRetryableException(e)) {
LOGGER.info("Write data fail", e);
break;
}
// sleep重試
Thread.sleep(1000);
}
}
} finally {
// 關閉producer相關資源
datahubProducer.close();
}
注意事項
batch寫入最大的優勢需要充分攢批,如果用戶端無法攢批,或者攢批的資料較少,可能帶來的效果提升並不顯著。
為了讓使用者移轉更加方便,我們在各種讀寫方式之間做了相容,保證使用者中間狀態可以更平滑的過渡,即batch寫入的依舊可以使用原方式讀取,原方式寫入依舊可以使用batch讀取。所以在寫入端更新為batch寫入之後,最好消費端也更新為batch,寫入和消費不對應反倒會降低效能。
目前只有Java的SDK支援了batch寫入,其他語言的用戶端和相關雲產品(如Flink、Data Integration等)仍在努力開發中,如果您想要儘快切換到batch寫入,可以直接提需求給我們。
總結
本文介紹了DataHub新版本帶來的改動,其中介紹了batch的原理和實現,以及使用batch後所帶來的效能的提升和費用的減少。切換為batch後,對於DataHub而言,服務端的資源消耗會明顯降低,同時,效能會明顯提升,使用費用也會大幅降低,所以是一個服務側和使用者側同時受益的改動。如果您在使用中有任何問題或者疑問,歡迎提工單聯絡我們或者加入使用者群來提問,群號:33517130。