全部產品
Search
文件中心

DataHub:DataHub成本節省攻略

更新時間:Sep 20, 2024

在DataHub發布的最新版本中,DataHub序列化相關的模組進行了一次重大升級,在效能、成本、資源使用方面都有較大的最佳化,同時DataHub技術升級所帶來的成本紅利會輻射到每個使用者身上,根據我們實際的調研發現,大部分使用者的使用成本都可以達到30%以上的降幅,部分使用者使用成本降幅可以達到90%,本文將會詳細介紹這次改動的具體內容,以及相關的最佳實務。

主要升級內容

1、支援zstd壓縮

zstd是一種高效能壓縮演算法,由Facebook開發,並於2016年開源。zstd在壓縮速度和壓縮比兩方面都有不俗的表現,非常契合datahub的使用情境,因此datahub在新版本中對zstd壓縮演算法做了支援,相較於DataHub目前支援的lz4和deflate壓縮演算法,整體效果會好很多。

2、序列化改造

DataHub因為在設計上是存在TUPLE這種強schema結構的,我們最初為了防止髒資料,在服務端校正了資料的有效性,這就導致了需要在服務端解析出來完整的資料,然後根據schema做個校正,如果類型不符,那麼會返回錯誤。隨著版本的迭代發展,我們我發現這樣的做法並沒有起到很有效避免髒資料的效果(因為髒資料更多是業務層面的髒資料而不是資料類型),反倒是給服務端增加了巨大的cpu開銷,同時讀寫延遲也更大。

歷史經驗告訴我們,讀寫操作在服務端感知使用者的資料內容是一個相對冗餘的操作,所以我們讓資料使用一個大的buffer來互動,不再去感知真正的資料內容,真正需要用到資料內容的地方再解析出來(例如同步任務),寫入時的資料校正全部推到用戶端上來做,實際上用戶端本來就有校正的,所以對用戶端而言其實並沒有增加開銷,然後服務端通過crc校正來保證資料buffer的正確性,這樣既節省了服務端的資源消耗,也提供了更好的效能。

這個就是我們引入的batch序列化,batch序列化本質上就是DataHub資料轉送中資料的定義的一種組織方式,batch並不是特指某種序列化的方式,而是對序列化的資料做了一個二次封裝,比如我一次發送100條資料,那我把這100條資料序列化後得到一個buffer,給這個buffer選擇一個壓縮演算法得到壓縮後的buffer,這個時候給這個壓縮後的buffer添加一個header記錄這個buffer的大小、資料條數、壓縮演算法、crc等資訊,這個時候就是一條完整的batch的buffer。

yuque_diagram (6).jpg

這個batch的buffer發送到服務端後,因為用戶端已經做了充分的資料有效性的校正,所以服務端只檢驗一下資料中的crc確認為有效buffer後,便可以直接落盤,省去了序列化還原序列化、加解壓以及校正的操作,服務端效能提升超過80%,因為是多條資料一起壓縮的,所以壓縮率也提高了,儲存成本也降低了。

主要費用對比

為了驗證batch所帶來的實際效果,我們拿一些資料進行了對比測試

  • 測試資料為廣告投放相關的資料,大約200列,資料中null比例大約20%~30%

  • 1000條資料一個batch

  • batch內部的序列化使用的是avro

  • lz4是之前版本預設的壓縮演算法,壓縮使用zstd來替代lz4

測試結果如下表所示

資料原大小(Byte)

lz4壓縮(Byte)

zstd壓縮(Byte)

protobuf序列化

11,506,677

3,050,640

1,158,868

batch序列化

11,154,596

2,931,729

1,112,693

就以上面的測試結果為準,我們可以評估一下從費用角度來評估一下使用batch後可以節省多少,根據線上的運行情況來看,之前一般都是lz4的壓縮居多,所以我們就假設由protobuf+lz4替換到了batch+zstd。費用就從儲存流量兩個角度來考慮,因為Datahub的收費項主要是這兩個維度,其他主要是為了防止濫用而設定的懲罰性質的收費項,正常使用方式下可以直接忽略不計。

  • 從儲存成本上來看,Datahub的protobuf序列化是沒有儲存壓縮的(只是http傳輸環節壓縮),如果替換為batch+zstd,那麼儲存會由11506KB,降為1112KB,也就是說,儲存成本下降幅度達到約90%

  • 從流量成本上來看,Datahub的protobuf+lz4後的大小為3050KB,batch+zstd的大小為1112KB,也就是說,流量成本會降低約60%!

以上為樣本資料測試結果,不同資料測試效果有差異,請您根據您業務實際情況進行測試。

如何使用

說明

只有開啟了多version的才能使用batch

多version樣本:

  • 控制台開啟樣本:

    • 建立Topic時點擊啟動多Version 按鈕開啟

      image

  • SDK開啟樣本:

  public static void createTopicWithOption() {
    try {RecordSchema recordSchema = new RecordSchema() {{
        this.addField(new Field("field1", FieldType.STRING));
        this.addField(new Field("field2", FieldType.BIGINT));
      }};
      TopicOption option = new TopicOption();
       //開啟多version  
      option.setEnableSchemaRegistry(true);
     
      option.setComment(Constant.TOPIC_COMMENT);
      option.setExpandMode(ExpandMode.ONLY_EXTEND);
      option.setLifeCycle(Constant.LIFE_CYCLE);
      option.setRecordType(RecordType.TUPLE);
      option.setRecordSchema(recordSchema);
      option.setShardCount(Constant.SHARD_COUNT);
      datahubClient.createTopic(Constant.PROJECT_NAME, Constant.TOPIC_NAME, option);
      LOGGER.info("create topic successful");
    } catch (ResourceAlreadyExistException e) {
      LOGGER.info("topic already exists, please check if it is consistent");
    } catch (ResourceNotFoundException e) {
      // project not found
      e.printStackTrace();
      throw e;
    } catch (DatahubClientException e) {
      // other error
      e.printStackTrace();
      throw e;
    }
  }

本文使用Java為例,介紹batch如何使用。

使用batch要求使用client-library 1.4及以上的版本,如果服務端已經支援,DataHub會預設使用batch,服務端沒有支援(例如服務端還未升級到新版本或者專有雲較老的版本),那麼會自動回退到原來的序列化方式,用戶端自適應,使用者無需增加額外配置。下面已1.4.1版本為例給出一個簡單樣本。壓縮演算法也會自適應選擇一個較優的方案,目前1.4版本以後會預設選擇zstd演算法。

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.25.3</version>
</dependency>
<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>datahub-client-library</artifactId>
  <version>1.4.3</version>
</dependency>

ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);

RecordSchema schema = producer.getTopicSchema();
List<RecordEntry> recordList = new ArrayList<>();
// 為了達到更好的效能,建議在RecordList中添加儘可能多的Record; 
// 儘可能使整個RecordList大小為512KB~1M
for (int i = 0; i < 1000; ++i) {
    RecordEntry record = new RecordEntry();
    TupleRecordData data = new TupleRecordData(schema);
    // 假設schema為 {"fields":[{"name":"f1", "type":"STRING"},{"name":"f2", "type":"BIGINT"}]}
    data.setField("f1", "value" + i);
    data.setField("f2", i);
    record.setRecordData(data);

    // 添加使用者自訂屬性,可選
    record.addAttribute("key1", "value1");
    recordList.add(record);
}

try {
      // 迴圈寫入1000遍
      for (int i = 0; i < 1000; ++i) {
        try {
          String shardId = datahubProducer.send(recordList);
          LOGGER.info("Write shard {} success, record count:{}", shardId, recordList.size());
        } catch (DatahubClientException e) {
          if (!ExceptionChecker.isRetryableException(e)) {
            LOGGER.info("Write data fail", e);
            break;
          }
          // sleep重試
          Thread.sleep(1000);
        }
      }
    } finally {
      // 關閉producer相關資源
      datahubProducer.close();
    }

注意事項

  1. batch寫入最大的優勢需要充分攢批,如果用戶端無法攢批,或者攢批的資料較少,可能帶來的效果提升並不顯著

  2. 為了讓使用者移轉更加方便,我們在各種讀寫方式之間做了相容,保證使用者中間狀態可以更平滑的過渡,即batch寫入的依舊可以使用原方式讀取,原方式寫入依舊可以使用batch讀取。所以在寫入端更新為batch寫入之後,最好消費端也更新為batch,寫入和消費不對應反倒會降低效能

  3. 目前只有Java的SDK支援了batch寫入,其他語言的用戶端和相關雲產品(如Flink、Data Integration等)仍在努力開發中,如果您想要儘快切換到batch寫入,可以直接提需求給我們。

總結

本文介紹了DataHub新版本帶來的改動,其中介紹了batch的原理和實現,以及使用batch後所帶來的效能的提升和費用的減少。切換為batch後,對於DataHub而言,服務端的資源消耗會明顯降低,同時,效能會明顯提升,使用費用也會大幅降低,所以是一個服務側和使用者側同時受益的改動。如果您在使用中有任何問題或者疑問,歡迎提工單聯絡我們或者加入使用者群來提問,群號:33517130。