全部產品
Search
文件中心

E-MapReduce:匯入概述

更新時間:Jul 01, 2024

為了更好地滿足各種不同的業務情境,StarRocks支援多種資料模型,StarRocks中儲存的資料需要按照特定的模型進行組織。本文為您介紹資料匯入的基本概念、原理、系統配置、不同匯入方式的適用情境,以及一些最佳實務案例和常見問題。

背景資訊

資料匯入功能是將未經處理資料按照相應的模型進行清洗轉換並載入到StarRocks中,方便查詢使用。StarRocks提供了多種匯入方式,您可以根據資料量大小或匯入頻率等要求選擇最適合自己業務需求的匯入方式。

StarRocks匯入方式與各資料來源關係圖如下。StarRocks schematic diagram
您可以根據不同的資料來源選擇不同的匯入方式:
  • 離線資料匯入:如果資料來源是Hive或HDFS,推薦使用Broker Load。如果資料表很多匯入比較麻煩可以使用Hive外表,效能會比Broker load匯入效果差,但是可以避免資料搬遷。如果單表的資料量特別大,或者需要做為全域資料字典來精確去重可以考慮使用Spark Load
  • 即時資料匯入:日誌資料和業務資料庫的Binlog同步到Kafka後,優先推薦通過Routine Load匯入StarRocks。如果匯入處理程序中有複雜的多表關聯和ETL預先處理可以使用Flink(Flink Connector)處理以後,再通過Stream Load寫入StarRocks。
  • 程式寫入StarRocks:推薦使用Stream Load,可以參見Stream Load中Java或Python的Demo。
  • 文字檔匯入:推薦使用Stream Load
  • MySQL資料匯入:推薦使用MySQL外表,通過insert into new_table select * from external_table的方式匯入。
  • StarRocks內部匯入:推薦使用Insert Into方式匯入,跟外部調度器配合實現簡單的ETL處理。
說明 本文圖片和部分內容來源於開源StarRocks的Overview of data loading

注意事項

向StarRocks匯入資料時,通常會採用程式對接的方式。以下是匯入資料時的一些注意事項:
  • 選擇合適的匯入方式:根據資料量大小、匯入頻次或資料來源所在位置選擇匯入方式。

    例如,如果未經處理資料存放在HDFS上,則使用Broker load匯入。

  • 確定匯入方式的協議:如果選擇了Broker Load匯入方式,則外部系統需要能使用MySQL協議定期提交和查看匯入作業。
  • 確定匯入方式的類型:匯入方式分為同步或非同步。如果是非同步匯入方式,外部系統在提交建立匯入後,必須調用查看匯入命令,根據查看匯入命令的結果來判斷匯入是否成功。
  • 制定Label建置原則:Label建置原則需滿足對每一批次資料唯一且固定的原則。
  • 保證Exactly-Once:外部系統需要保證資料匯入的At-Least-Once,StarRocks的Label機制可以保證資料匯入的At-Most-Once,即可整體上保證資料匯入的Exactly-Once。

基本概念

名詞描述
匯入作業讀取使用者提交的來源資料並進行清洗轉換後,將資料匯入到StarRocks系統中。匯入完成後,資料即可被使用者查詢到。
Label用於標識一個匯入作業,所有匯入作業都有一個Label。

Label可由使用者指定或系統自動產生。Label在一個資料庫內是唯一的,一個Label僅可用於一個成功的匯入作業。當一個Label對應的匯入作業成功後,不可再重複使用該Label提交匯入作業。如果某Label對應的匯入作業失敗,則該Label可以被再使用。該機制可以保證Label對應的資料最多被匯入一次,即At-Most-Once語義。

原子性StarRocks中所有匯入方式都提供原子性保證,即同一個匯入作業內的所有有效資料要麼全部生效,要麼全部不生效,不會出現僅匯入部分資料的情況。此處的有效資料不包括由於類型轉換錯誤等資料品質問題而被過濾的資料,資料品質問題可以參見資料匯入常見問題
MySQL和HTTP協議StarRocks提供MySQL協議和HTTP協議兩種訪問協議介面來提交作業。
Broker LoadBroker匯入,即通過部署的Broker程式讀取外部資料源(例如HDFS)中的資料,並匯入到StarRocks。Broker進程利用自身的計算資源對資料進行預先處理匯入。
Spark LoadSpark匯入,即通過外部資源(例如Spark)對資料進行預先處理產生中間檔案,StarRocks讀取中間檔案匯入。Spark Load是一種非同步匯入方式,您需要通過MySQL協議建立匯入,並通過查看匯入命令檢查匯入結果。
FEFrontend,StarRocks系統的中繼資料和調度節點。在匯入流程中主要負責匯入執行計畫的產生和匯入任務的調度工作。
BEBackend,StarRocks系統的計算和儲存節點。在匯入流程中主要負責資料的ETL和儲存。
TabletStarRocks表的邏輯分區,一個表按照分區、分桶規則可以劃分為多個分區,詳情請參見資料分布

基本原理

匯入執行流程如下圖所示。StarRocks flow chart
一個匯入作業主要分為以下五個階段。
階段描述
PENDING非必須。該階段是指使用者提交匯入作業後,等待FE調度執行。

Broker Load和Spark Load包括該步驟。

ETL非必須。該階段執行資料的預先處理,包括清洗、分區、排序和彙總等。

Spark Load包括該步驟,他使用外部計算資源Spark完成ETL。

LOADING該階段先對資料進行清洗和轉換,然後將資料發送給BE處理。當資料全部匯入後,進入等待生效過程,此時匯入作業依舊是LOADING狀態。
FINISHED在匯入作業涉及的所有資料均生效後,作業的狀態變成FINISHED,FINISHED後匯入的資料均可查詢。FINISHED是匯入作業的最終狀態。
CANCELLED在匯入作業狀態變為FINISHED之前,作業隨時可能被取消並進入CANCELLED狀態,例如,您手動取消或匯入出現錯誤等。CANCELLED也是匯入作業的一種最終狀態。
資料匯入格式如表。
類型描述
整型類TINYINT、SMALLINT、INT、BIGINT、LARGEINT。例如:1,1000,1234。
浮點類FLOAT、DOUBLE、DECIMAL。例如:1.1,0.23,0.356。
日期類DATE、DATETIME。例如:2017-10-03,2017-06-13 12:34:03。
字串類CHAR、VARCHAR。例如:I am a student,a。

匯入方式

為適配不同的資料匯入需求,StarRocks系統提供了5種不同的匯入方式,以支援不同的資料來源(例如HDFS、Kafka和本地檔案等),或者按不同的方式匯入資料,StarRocks目前置入資料的方式分為同步匯入和非同步匯入兩種。

所有匯入方式都支援CSV資料格式。其中Broker Load還支援Parquet和ORC資料格式。

匯入方式介紹

匯入方式描述匯入類型
Broker Load通過Broker進程訪問並讀取外部資料源,然後採用MySQL協議向StarRocks建立匯入作業。提交的作業將非同步執行,您可以通過SHOW LOAD命令查看匯入結果。

Broker Load適用於來源資料在Broker進程可訪問的儲存系統(例如HDFS)中,資料量為幾十GB到上百GB,詳細資料請參見Broker Load

非同步匯入
Spark Load通過外部的Spark資源實現對匯入資料的預先處理,提高StarRocks巨量資料量的匯入效能並且節省StarRocks叢集的計算資源。Spark Load是一種非同步匯入方式,需要通過MySQL協議建立匯入作業,並通過SHOW LOAD查看匯入結果。

Spark Load適用於初次遷移巨量資料量(可達到TB層級)到StarRocks的情境,且來源資料在Spark可訪問的儲存系統(例如HDFS)中,詳細資料請參見Spark Load

非同步匯入
Stream Load是一種同步執行的匯入方式。您可以通過HTTP協議發送請求將本地檔案或資料流匯入到StarRocks中,並等待系統返回匯入的結果狀態,從而判斷匯入是否成功。

Stream Load適用於匯入本地檔案,或通過程式匯入資料流中的資料,詳細資料請參見Stream Load

同步匯入
Routine LoadRoutine Load(例行匯入)提供了一種自動從指定資料來源進行資料匯入的功能。您可以通過MySQL協議提交例行匯入作業,產生一個常駐線程,不間斷的從資料來源(例如Kafka)中讀取資料並匯入到StarRocks中,詳細資料請參見Routine Load非同步匯入
Insert Into類似MySQL中的Insert語句,StarRocks提供INSERT INTO tbl SELECT ...;的方式從StarRocks的表中讀取資料並匯入到另一張表,或者通過INSERT INTO tbl VALUES(...);插入單條資料,詳細資料請參見Insert Into同步匯入

匯入類型

重要 如果是外部程式接入StarRocks的匯入功能,需要先判斷使用匯入方式是哪類,然後再確定接入邏輯。
  • 同步匯入

    同步匯入方式即使用者建立匯入任務,StarRocks同步執行,執行完成後返回匯入結果。使用者可以通過該結果判斷匯入是否成功。

    操作步驟:
    1. 使用者(外部系統)建立匯入任務。
    2. StarRocks返回匯入結果。
    3. 使用者(外部系統)判斷匯入結果。如果匯入結果為失敗,則可以再次建立匯入任務。
  • 非同步匯入

    非同步匯入方式即使用者建立匯入任務後,StarRocks直接返回建立成功。建立成功不代表資料已經匯入成功。匯入任務會被非同步執行,使用者在建立成功後,需要通過輪詢的方式發送查看命令查看匯入作業的狀態。如果建立失敗,則可以根據失敗資訊,判斷是否需要再次建立。

    操作步驟:
    1. 使用者(外部系統)建立匯入任務。
    2. StarRocks返回建立任務的結果。
    3. 使用者(外部系統)判斷建立任務的結果,如果成功則進入步驟4;如果失敗則可以回到步驟1,重新嘗試建立匯入任務。
    4. 使用者(外部系統)輪詢查看任務狀態,直至狀態變為FINISHED或CANCELLED。

適用情境

情境描述
HDFS匯入如果HDFS匯入來源資料儲存在HDFS中,當資料量為幾十GB到上百GB時,則可以採用Broker Load方法向StarRocks匯入資料。此時要求部署的Broker進程可以訪問HDFS資料來源。匯入資料的作業非同步執行,您可以通過SHOW LOAD命令查看匯入結果。

如果來源資料儲存在HDSF中,當資料量達到TB層級時,則可以採用Spark Load方法向StarRocks匯入資料。此時要求部署的Spark進程可以訪問HDFS資料來源。匯入資料的作業非同步執行,您可以通過SHOW LOAD命令查看匯入結果。

對於其他外部資料源,只要Broker或Spark進程能讀取對應資料來源,也可以採用Broker Load或Spark Load方法匯入資料。

本地檔案匯入資料存放區在本地檔案中,資料量小於10 GB,可以採用Stream Load方法將資料快速匯入StarRocks系統。採用HTTP協議建立匯入作業,作業同步執行,您可以通過HTTP請求的傳回值判斷匯入是否成功。
Kafka匯入資料來自於Kafka等流式資料來源,需要向StarRocks系統匯入即時資料時,可以採用Routine Load方法。您通過MySQL協議建立例行匯入作業,StarRocks持續不斷地從Kafka中讀取並匯入資料。
Insert Into匯入手工測試及臨時資料處理時可以使用Insert Into方法向StarRocks表中寫入資料。

其中,INSERT INTO tbl SELECT ...;語句是從StarRocks的表中讀取資料並匯入到另一張表,INSERT INTO tbl VALUES(...);語句是向指定表裡插入單條資料。

記憶體限制

您可以通過設定參數來限制單個匯入作業的記憶體使用量,以防止匯入佔用過多的記憶體而導致系統OOM。不同匯入方式限制記憶體的方式略有不同,詳情可以參見各個匯入方式的文檔。

一個匯入作業通常會分布在多個BE上執行,記憶體參數限制的是一個匯入作業在單個BE上的記憶體使用量,而不是在整個叢集的記憶體使用量。同時,每個BE會設定可用於匯入作業的記憶體總上限,詳情請參見通用系統配置。配置限制了所有在該BE上啟動並執行匯入任務的總體記憶體使用量上限。

較小的記憶體限制可能會影響匯入效率,因為匯入流程可能會因為記憶體達到上限而頻繁的將記憶體中的資料寫回磁碟。而過大的記憶體限制可能導致當匯入並發較高時系統OOM。所以需要根據需求合理地設定記憶體參數。

通用系統配置

FE配置

以下配置屬於FE的系統配置,可以通過FE的設定檔fe.conf來修改。

參數描述
max_load_timeout_second匯入逾時時間的最大、最小取值範圍,均以秒為單位。預設的最大逾時時間為3天,最小逾時時間為1秒。您自訂的匯入逾時時間不可超過該範圍。該參數通用於所有類型的匯入任務。
min_load_timeout_second
desired_max_waiting_jobs等待隊列可以容納的最多匯入任務數目,預設值為100。

例如,FE中處於PENDING狀態(即等待執行)的匯入任務數目達到該值,則新的匯入請求會被拒絕。此配置僅對非同步執行的匯入有效,如果處於等待狀態的非同步匯入任務數達到限額,則後續建立匯入的請求會被拒絕。

max_running_txn_num_per_db每個資料庫中正在啟動並執行匯入任務的最大個數(不區分匯入類型、統一計數),預設值為100。

當資料庫中正在啟動並執行匯入任務超過最大值時,後續的匯入任務不會被執行。如果是同步作業,則作業會被拒絕;如果是非同步作業,則作業會在隊列中等待。

label_keep_max_second匯入任務記錄的保留時間。

已經完成的(FINISHED或CANCELLED)匯入任務記錄會在StarRocks系統中保留一段時間,時間長短則由此參數決定。參數預設值為3天。該參數通用於所有類型的匯入任務。

BE配置

以下配置屬於BE的系統配置,可以通過BE的設定檔be.conf來修改。

參數描述
push_write_mbytes_per_secBE上單個Tablet的寫入速度限制。預設值是10,即10MB/s。

根據Schema以及系統的不同,通常BE對單個Tablet的最大寫入速度大約在10~30MB/s之間。您可以適當調整該參數來控制匯入速度。

write_buffer_size匯入資料在BE上會先寫入到一個記憶體塊,當該記憶體塊達到閾值後才會寫回磁碟。預設值為100 MB。

過小的閾值可能導致BE上存在大量的小檔案。您可以適當提高該閾值減少檔案數量。但過大的閾值可能導致RPC逾時,詳細請參見參數tablet_writer_rpc_timeout_sec

tablet_writer_rpc_timeout_sec匯入處理程序中,發送一個Batch(1024行)的RPC逾時時間。預設為600秒。

因為該RPC可能涉及多個分區記憶體塊的寫盤操作,所以可能會因為寫盤導致RPC逾時,可以適當調整逾時時間來減少逾時錯誤(例如send batch fail)。同時,如果調大參數write_buffer_size,則tablet_writer_rpc_timeout_sec參數也需要適當調大。

streaming_load_rpc_max_alive_time_sec在匯入處理程序中,StarRocks會為每個Tablet開啟一個Writer,用於接收資料並寫入。該參數指定了Writer的等待逾時時間。預設為600秒。

如果在參數指定時間內Writer沒有收到任何資料,則Writer會被自動銷毀。當系統處理速度較慢時,Writer可能長時間接收不到下一批資料,導致匯入報錯TabletWriter add batch with unknown id。此時可適當調大該參數。

load_process_max_memory_limit_percent分別為最大記憶體和最大記憶體百分比,限制了單個BE上可用於匯入任務的記憶體上限。系統會在兩個參數中取較小者,作為最終的BE匯入任務記憶體使用量上限。
  • load_process_max_memory_limit_percent:表示對BE總記憶體限制的百分比。預設為80。總記憶體限制mem_limit預設為80%,表示對實體記憶體的百分比。即假設實體記憶體為M,則預設匯入記憶體限制為M * 80% * 80%。
  • load_process_max_memory_limit_bytes:預設為100 GB。
load_process_max_memory_limit_bytes