全部產品
Search
文件中心

MaxCompute:資料即時入倉實踐

更新時間:Sep 19, 2024

為滿足業務對資料倉儲中高度時效性資料的需求,MaxCompute基於Delta Table實現了分鐘級近即時資料寫入和主鍵更新功能,顯著提升了資料倉儲的資料更新效率。

資料寫入情境

面對具有突發性和熱點性的客戶行為日誌,如評論、評分和點贊,傳統的關係型資料庫和離線資料分析方法在處理這類資料時可能存在資源消耗大、成本高、資料延遲以及更新複雜的問題,通常只能滿足次日分析需求。

針對上述問題,您可以採用近即時數倉資料入倉方案,可以在分鐘層級內實現資料增量同步處理到Delta Table,從而將資料寫入到查詢的延遲控制在5~10分鐘,極大地提高了資料分析的時效性。如果您的生產任務是將資料同步至MaxCompute ODS(Operational Data Store)層的普通表,為避免生產任務改造的風險,您可以使用Delta Table的Upsert功能,它能有效將資料同步至Delta Table,同時防止資料重複儲存,並提高儲存效率和降低儲存成本。

樣本

Flink資料寫入Delta Table

本文以第三方引擎Flink為例,介紹了Flink整合MaxCompute Flink Connector進行近即時寫入資料至Delta Table的主要流程。

image

介紹如下:

序號

說明

【1】

支援按照資料的Primary Key列進行分組並發寫入。

若您的並發寫入的分區較多,且每個分區資料分布均勻,同時表的Bucket數量較少(如個位元),那麼您也可以根據Partition列進行分組寫入,有助於提高寫入輸送量。

【2】

UpsertWriterTask收到資料後,會解析資料所屬分區並向UpsertOperatorCoordinator發起請求,然後建立分區即時寫入的Upsert Session。

【3】

UpsertOperatorCoordinator向UpsertWriterTask返回已建立的Upsert session。

【4】

UpsertWriterTask根據Upsert Session建立Upsert Writer,並串連MaxCompute的資料轉送通道服務Tunnel Server,將資料持續寫入。

在資料轉送過程中,若啟用了檔案快取,資料將會先進入Flink本地磁碟的緩衝區,直到資料檔案大小達到特定閾值或Checkpoint流程啟動後,才將資料轉送至Tunnel Server。

【5】

Checkpoint流程啟動後,Upsert Writer將資料全量提交至Tunnel Server,再向UpsertOperatorCoordinator發起請求,觸發Commit操作,成功後資料可見。

【6】

若開啟自動Major Compact,當分區Commit次數超過特定閾值時,由UpsertOperatorCoordinator向Storage Service發起Major compact操作。

說明

根據表資料量大小,此操作可能會對即時資料匯入造成延時,因此需要謹慎使用。

將Flink資料寫入至MaxCompute Delta Table的操作,詳情請參見使用Flink寫入資料到Delta Table

Upsert寫入參數配置建議

您可以通過調整Upsert即時寫入情境的配置參數來提高系統輸送量和效能,並確保穩定性,以滿足不同的業務需求。Upsert寫入參數詳情,請參見Upsert寫入參數

通用關鍵參數配置

  • 表Bucket數量可影響同時寫入的最大並發數,在一定程度上決定了最大寫入吞吐,推薦按照1 M/s * 表Bucket數量來計算總吞吐。

    實際能達到的輸送量與Sink節點並發等參數相關。詳情請參見表格式和資料治理

  • sink.parallelism:資料寫入的Sink節點並發數,強烈建議表Bucket數量是該配置值的整數倍,可達到較好的效能效果。當sink.parallelism參數值與表Bucket數量一致時,理論上可以實現最佳效能。

非分區表提升吞吐的參數配置

  • 如果設定了sink parallelism參數以增加寫入並發,但發現輸送量並未提升,可能的問題在於Sink節點的上遊資料處理鏈路效率低下,建議您可最佳化資料處理鏈路來提高整體效能。

  • 若表Bucket的數量是sink.parallelism的整數倍,那單個Sink節點寫入的Bucket數量 = 表Bucket數量 ÷ sink.parallelism,若Bucket值過大,也會影響效能。建議您優先調整表Bucket數量和sink.parallelism參數值。若upsert.writer.buffer-size ÷ 單節點Bucket數量低於特定閾值(如128 K)時,可能會導致網路傳輸效率降低。為改善網路效能,建議考慮增大upsert.writer.buffer-size。

  • upsert.flush.concurrent參數:預設值為2,表示可並發flush的Bucket數。為了最佳化輸送量,可以適當增加該值以觀察效能提升。

    說明

    需要注意的是,如果此值設定得過大,可能會導致過多的Bucket同時發送,從而引起網路擁堵,反而會使整體輸送量下降。因此,在調整這個參數時需要謹慎,找到一個平衡點以確保系統的穩定和高效運行。

少量分區並發寫入提升吞吐的參數配置

在此情境下,您可以參考通用關鍵參數配置和非分區表參數配置建議。同時,您還可以參考以下內容。

  • 單個Sink節點在寫入資料時涉及多個分區的操作,同時在Checkpoint階段,每個分區需要獨立進行Commit操作,這些特性可能會對整體的寫入輸送量產生影響。

  • 單個Sink節點Buffer資料的最大記憶體=upsert.writer.buffer-size * 分區數,因此如果發生記憶體溢出(OOM),建議調整upsert.writer.buffer-size參數,減小其值以防止記憶體超出限制。

  • 增加upsert.commit.thread-num參數值,可減少checkpoint階段Commit的耗時。此參數預設值為16,意味著有16個線程並發處理分區進行Commit操作。

    說明

    儘管可以適當增加這個數值以提高效能,但要注意不應超過32,以防止過度並發可能導致的問題。

海量分區並發寫入(FileCached模式)提升吞的參數配置

在此情境下,您可以參考少量分區並發寫入參數配置建議。同時,您還可以參考以下內容:

  • 每個分區的資料都會首先緩衝在本地檔案中,然後在Checkpoint階段並發寫入MaxCompute中。

  • sink.file-cached.writer.num參數預設值為16,增加該參數值(不建議超過32),可增加單個Sink節點並發寫入的分區數量。建議並發寫入的Bucket數量建議等於sink.file-cached.writer.num * upsert.flush.concurrent。但需注意此值不應設定得過大,以防止引髮網絡擁堵問題,從而導致整體輸送量下降。

說明

FileCached模式寫入參數詳情,請參見FileCached模式寫入參數

其他建議

如果參考以上參數建議都無法達到吞吐要求,或者吞吐不穩定,需考慮以下因素:

  • 每個專案空間可免費使用的公用Data Transmission Service資源群組是有限的,達到上限後,會Block資料寫入,從而導致整體吞吐下降。如果資料寫入吞吐較大,同時對延時要求比較高,建議購買獨享Data Transmission Service資源群組,確保資源供給。

  • Connector的上遊資料處理鏈路效率低下,導致整體吞吐率不高。建議您最佳化資料處理鏈路,以提高整體效能。

常見問題

Flink相關問題

  • 問題一:

    • 問題現象:提示出現報錯資訊“Checkpoint xxx expired before completing”。

    • 問題原因:Checkpoint流程逾時,通常由於Checkpoint過程中寫入的分區數過多。

    • 解決措施:

  • 問題二:

    • 問題現象:提示出現報錯資訊“org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. ”。

    • 問題原因:通常由於JobManager與TaskManager通訊異常導致,任務會自動發起重試。

    • 解決措施:建議提升任務資源來確保任務穩定性。

資料寫入問題

  • 問題一:

    • 問題現象:TIMESTAMP類型的資料在寫入MaxCompute後,時間位移了8小時。

    • 問題原因:Flink中的TIMESTAMP類型不包含時區資訊,且在MaxCompute寫入過程中也不會進行時區轉換,因此資料會被視為零時區資料。然而,MaxCompute在讀取這些資料時,會根據專案的時區設定對資料進行轉換。

    • 解決措施:使用TIMESTAMP_LTZ類型替換MaxCompute Sink Table中的TIMESTAMP類型。

Tunnel相關問題

  • 問題一:

    • 問題現象:資料寫入時出現Tengine相關報錯,報錯資訊內容如下。

      <body>
      <h1>An error occurred.</h1>
      <p>Sorry, the page you are looking for is currently unavailable.<br/>
      Please try again later.</p>
      <p>If you are the system administrator of this resource then you should check
      the <a href="http://nginx.org/r/error_log">error log</a> for details.</p>
      <p><em>Faithfully yours, tengine.</em></p>
      </body>
      </html>
    • 問題原因:遠程Tunnel服務暫時不可用。

    • 解決措施:等待Tunnel服務恢複後任務可以自動重試成功。

  • 問題二:

    • 問題現象:提示出現報錯資訊“java.io.IOException: RequestId=xxxxxx, ErrorCode=SlotExceeded, ErrorMessage=Your slot quota is exceeded.”。

    • 問題原因:寫入Quota超出限制,需要降低寫入並發,或者增加獨享Tunnel並發數。

    • 解決措施: