全部產品
Search
文件中心

E-MapReduce:Stream Load

更新時間:Jul 01, 2024

StarRocks支援從本地直接匯入資料,支援CSV檔案格式,資料量在10 GB以下。本文為您介紹Stream Load匯入的基本原理、使用樣本和最佳實務。

背景資訊

Stream Load是一種同步的匯入方式,通過發送HTTP請求將本地檔案或資料流匯入到StarRocks中。Stream Load同步執行匯入並返回匯入結果。您可以直接通過請求的傳回值判斷匯入是否成功。

基本概念

Coordinator:協調節點。負責接收資料並分發資料到其他資料節點,匯入完成後返回結果。

基本原理

Stream Load通過HTTP協議提交匯入命令。如果提交到FE節點,則FE節點會通過HTTP Redirect指令將請求轉寄給某一個BE節點,您也可以直接提交匯入命令給某一指定BE節點。該BE節點作為Coordinator節點,將資料按表Schema劃分並分發資料到相關的BE節點。匯入的最終結果由Coordinator節點返回給使用者。

Stream Load的主要流程如下圖所示。Stream Load

匯入樣本

建立匯入任務

Stream Load通過HTTP協議提交和傳輸資料。本樣本通過curl命令展示如何提交匯入任務。您也可以通過其他HTTP Client進行操作。

  • 文法
    curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT \
        http://fe_host:http_port/api/{db}/{table}/_stream_load
    說明
    • 當前支援HTTP chunked與非chunked兩種上傳方式,對於非chunked方式,必須要有Content-Length來標示上傳的內容長度,保證資料的完整性。
    • 建議設定Expect Header欄位內容為100-continue,可以在某些出錯情境下避免不必要的資料轉送。

    Header中支援的屬性見下表的匯入任務參數描述,格式為-H "key1:value1"。如果同時有多個任務參數,需要用多個-H來指示,類似於-H "key1:value1" -H "key2:value2"……

    建立匯入任務的詳細文法可以通過HELP STREAM LOAD命令查看。Stream Load中所有與匯入任務相關的參數均設定在Header中。相關參數描述如下表所示。

    參數描述
    簽名參數user:passwdStream Load建立匯入任務使用的是HTTP協議,已通過Basic access authentication進行簽名。StarRocks系統會根據簽名來驗證使用者身份和匯入許可權。
    匯入任務參數label 匯入任務的標籤,相同標籤的資料無法多次匯入。

    您可以通過指定Label的方式來避免一份資料重複匯入的問題。當前StarRocks系統會保留最近30分鐘內成功完成的任務的Label。

    column_separator用於指定匯入檔案中的資料行分隔符號,預設為\t。

    如果是不可見字元,則需要加\x作為首碼,使用十六進位來表示分隔字元。例如,Hive檔案的分隔字元\x01,需要指定為-H "column_separator:\x01"

    row_delimiter指定匯入檔案中的行分隔字元,預設為\n。
    重要 curl命令無法傳遞\n,分行符號手動指定為\n時,shell會先傳遞反斜線(\),然後傳遞n而不是直接傳遞分行符號\n。

    Bash支援另一種逸出字元串文法,傳遞\n和\t時,使用貨幣符號和全形單引號($')啟動字串並以半形單引號(')結束字串。例如,-H $'row_delimiter:\n'

    columns用於指定匯入檔案中的列和Table中列的對應關係。
    如果源檔案中的列正好對應表中的內容,則無需指定該參數。如果源檔案與表Schema不對應,則需要該參數來配置資料轉換規則。列有兩種形式,一種是直接對應於匯入檔案中的欄位,可以直接使用欄位名表示,一種需要通過計算得出。
    • 樣本1:表中有3列c1, c2, c3,源檔案中的3列依次對應的是c3,c2,c1,則需要指定-H "columns: c3, c2, c1"
    • 樣本2:表中有3列c1, c2, c3,源檔案中前3列與表中的列一一對應,但是還有多餘1列,則需要指定-H "columns: c1, c2, c3, temp",最後1列隨意指定名稱用於佔位即可。
    • 樣本3:表中有3列year, month, day,源檔案中只有一個時間列,為2018-06-01 01:02:03格式,則可以指定 -H "columns: col, year = year(col), month=month(col), day=day(col)"完成匯入。
    where用於抽取部分資料。使用者如需將不需要的資料過濾掉,那麼可以通過設定這個選項來達到。

    例如,只匯入k1列等於20180601的資料,則可以在匯入時指定-H "where: k1 = 20180601"

    max_filter_ratio最大容忍可過濾(例如,因為資料不規範等原因而過濾)的資料比例。預設零容忍。
    說明 此處資料不規範的資料不包括通過WHERE條件過濾的資料。
    partitions用於指定該匯入所涉及的Partition。

    如果您能夠確定資料對應的Partition,則推薦指定該項。不滿足指定分區的資料將被過濾掉。例如,指定匯入到p1和p2分區,可以指定-H "partitions: p1, p2"

    timeout指定匯入的逾時時間。預設是600秒。

    設定範圍為1~259200,單位為秒。

    strict_mode指定此次匯入是否開啟strict 模式,預設為開啟。

    關閉方式為-H "strict_mode: false"

    timezone指定本次匯入所使用的時區。預設為東八區。

    該參數會影響所有匯入涉及和時區有關的函數結果。

    exec_mem_limit匯入記憶體限制。預設值為2 GB。
  • 樣本
    curl --location-trusted -u root -T date -H "label:123" \
        http://abc.com:8030/api/test/date/_stream_load
  • 返回結果

    匯入任務完成後,Stream Load會以JSON格式返回匯入任務的相關內容,返回結果樣本如下。

    {
        "TxnId": 1003,
        "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
        "Status": "Success",
        "ExistingJobStatus": "FINISHED", // optional
        "Message": "OK",
        "NumberTotalRows": 1000000,
        "NumberLoadedRows": 1000000,
        "NumberFilteredRows": 1,
        "NumberUnselectedRows": 0,
        "LoadBytes": 40888898,
        "LoadTimeMs": 2144,
        "ErrorURL": "[http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005](http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005)"
    }
    參數描述
    TxnId匯入的事務ID。使用者可不感知。
    Label匯入的Label。由使用者指定或系統自動產生。
    Status匯入完成狀態。
    • Success:表示匯入成功。
    • Publish Timeout:表示匯入已經完成,只是資料可能會延遲可見,無需重試。
    • Label Already Exists:Label重複,需更換Label。
    • Fail:匯入失敗。
    ExistingJobStatus已存在Label對應的匯入作業的狀態。該欄位只有當Status為Label Already Exists時才會顯示。您可以通過該狀態,知曉已存在Label對應的匯入作業的狀態。
    • RUNNING:表示作業在執行中。
    • FINISHED:表示作業成功。
    Message匯入狀態的詳細說明。匯入失敗時會返回具體的失敗原因。
    NumberTotalRows從資料流中讀取到的總行數。
    NumberLoadedRows匯入任務的資料行數,僅在匯入狀態為Success時有效。
    NumberFilteredRows匯入任務過濾掉的行數,即資料品質不合格的行。
    NumberUnselectedRows通過Where條件被過濾掉的行數。
    LoadBytes匯入任務的源檔案資料量大小。
    LoadTimeMs匯入任務所用的時間,單位為ms。
    ErrorURL被過濾資料的具體內容,僅保留前1000條資料。如果匯入任務失敗,可以直接用以下方式擷取被過濾的資料並進行分析,以調整匯入任務。
    wget http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005

取消匯入任務

Stream Load無法手動取消,Stream Load在逾時或者匯入錯誤後會被系統自動取消。

最佳實務

應用情境

Stream Load的最佳使用情境是原始檔案在記憶體中或者儲存在本地磁碟中。由於Stream Load是一種同步的匯入方式,所以當您希望用同步方式擷取匯入結果時,也可以使用該匯入方式。

資料量

由於Stream Load是由BE發起的匯入並分發資料,建議的匯入資料量在1 GB到10 GB之間。系統預設的最大Stream Load匯入資料量為10 GB,所以匯入超過10 GB的檔案需要修改BE的配置項streaming_load_max_mb。例如,待匯入檔案大小為15 GB,則可以修改BE的配置項streaming_load_max_mb大於15 GB即可。

Stream Load的預設逾時為300秒,按照StarRocks目前最大的匯入限速來看,匯入超過3 GB大小的檔案就需要修改匯入任務預設的逾時時間了。例如,匯入一個10 GB的檔案,timeout應該設定為1000s。

匯入任務逾時時間 = 匯入資料量 / 10M/s ,具體的平均匯入速度需要您根據自己的叢集情況計算。

完整樣本

資料情況:資料在用戶端本地磁碟路徑/home/store-sales中,匯入的資料量約為15 GB,希望匯入到資料庫bj-sales的表store-sales中。

叢集情況:Stream Load的並發數不受叢集大小影響。

樣本如下:
  1. 因為匯入檔案大小超過預設的最大匯入大小10 GB,所以需要修改BE的設定檔BE.conf

    例如,修改參數streaming_load_max_mb,將最大匯入大小調整為16000。

  2. 計算大概的匯入時間是否超過預設timeout值,匯入時間為15000 / 10 = 1500s,如果超過了預設的timeout時間,則需要修改FE的配置FE.conf,修改參數stream_load_default_timeout_second,將匯入時間調整為1500。
  3. 建立匯入任務。
    curl --location-trusted -u user:password -T /home/store_sales \
        -H "label:abc" [http://abc.com:8000/api/bj_sales/store_sales/_stream_load](http://abc.com:8000/api/bj_sales/store_sales/_stream_load)

代碼整合樣本