全部產品
Search
文件中心

ApsaraDB for SelectDB:Stream Load

更新時間:Aug 30, 2025

當您需要將本地檔案或資料流匯入到ApsaraDB for SelectDB執行個體時,可以使用Stream Load進行資料匯入。本文介紹如何通過Stream Load匯入資料至ApsaraDB for SelectDB

背景資訊

Stream Load屬於同步介面的匯入方式,您可以通過發送HTTP請求將本地檔案或資料流匯入到ApsaraDB for SelectDBStream Load執行後會立即返回匯入結果,您可以通過請求的返回結果判斷此次匯入是否成功。其支援的資料格式有CSV(文本)、JSONPARQUETORC

重要

由於Stream load具有高吞吐、低延遲以及靈活可靠的特性,強烈建議您將Stream Load作為主要的資料匯入方式。

準備工作

  1. 確保發起Stream Load請求的終端與SelectDB網路互連:

    1. ApsaraDB for SelectDB執行個體申請公網地址。具體操作,請參見申請和釋放公網地址

      如果您發起Stream Load請求的終端與ApsaraDB for SelectDB執行個體位於同一VPC下,跳過此步驟。

    2. 將發起Stream Load請求終端的相關IP添加至ApsaraDB for SelectDB的白名單。具體操作,請參見設定白名單

    3. 若發起Stream Load請求終端存在白名單機制,已將SelectDB執行個體所在網段IP添加至源叢集的白名單中。

  2. (可選)修改計算叢集(BackEnd)配置,開啟Stream Load操作記錄。

    預設情況下,計算叢集不保留Stream Load操作記錄。

    如果您需要跟蹤Stream Load的操作情況,需在建立匯入任務之前配置enable_stream_load_record=true並重啟叢集,以啟用Stream Load操作記錄。如需開啟此功能,請提交工單以獲得支援人員。

  3. (可選)修改計算叢集配置,調整Stream load的最大匯入限制。

    預設情況下,Stream Load匯入檔案的最大限制為10240MB。

    如果您的原始檔案超過此值,則需調整後端參數streaming_load_max_mb。修改參數,請參見參數配置

  4. (可選)修改FE配置,調整匯入逾時時間。

    預設情況下,Stream Load的匯入任務逾時時間為600秒。如果匯入任務在設定的逾時時間內未完成,系統將取消該任務,並將其狀態更改為CANCELLED。

    如果匯入的源檔案無法在規定時間內完成,您可以在Stream Load請求中設定單獨的逾時時間,或者調整FE參數stream_load_default_timeout_second並重啟執行個體以設定全域的預設逾時時間。如需調整,請提交工單以獲得支援人員。

注意事項

單次Stream Load可以寫入幾百MB至1GB的資料。在業務情境中,頻繁的寫入少量資料可能導致執行個體效能大幅下降,甚至資料庫表死結。強烈建議您降低寫入頻率,對資料進行攢批處理。

  • 業務端攢批:您需自行收集業務資料,然後向SelectDB發起Stream Load請求。

  • 服務端攢批SelectDB接收到Stream Load請求後,服務端將進行請求資料的批處理操作。如何操作,請參見Group Commit

建立匯入任務

Stream Load通過HTTP協議提交和傳輸資料。以下為通過curl命令提交匯入樣本,該命令可在Linux或macOS系統的終端或Windows系統的命令提示字元下執行。此外,Stream Load還支援通過其他HTTP用戶端進行操作。

文法

curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load

參數說明

參數名稱

是否必選

參數說明

--location-trusted

需要認證時,會將usernamepassword傳遞給被重新導向到的伺服器。

-u

指定SelectDB執行個體的使用者名稱和密碼。

  • username:使用者名稱。

  • password:密碼。

-H

指定本次Stream Load匯入請求的要求標頭(Header)內容。格式如下:

-H "key1:value1"

常見參數如下:

  • label:匯入任務的唯一標識。

  • column_separator:指定匯入檔案中的資料行分隔符號,預設為\t

    您也可以使用多個字元的組合作為資料行分隔符號。

    如果是不可見字元,則需要加\x作為首碼,使用十六進位來表示分隔字元。

更多要求標頭參數,請參見要求標頭參數說明

-T

指定需要匯入資料的檔案路徑。

file_path:目標檔案路徑。

-XPUT

HTTP請求的Method,採用PUT要求方法,指定SelectDB的資料匯入地址,具體參數如下:

  • hostSelectDB執行個體的VPC地址或公網地址。

    • 非同一VPC使用公網地址:如果您執行命令的終端所在裝置與目標SelectDB執行個體不在同一VPC下,您需使用公網地址。如何申請公網,請參見申請和釋放公網地址

    • 同一VPC使用VPC地址:如果您執行命令的終端所在裝置為阿里雲產品,且與目標SelectDB執行個體處於同一VPC下,建議您使用VPC地址。

  • portSelectDB執行個體的HTTP連接埠號碼,預設為8080。

    您可以在SelectDB的執行個體詳情頁面查看執行個體的串連地址和連接埠號碼。

  • db_name:資料庫名。

  • table_name:資料表名。

要求標頭參數說明

Stream Load使用HTTP協議,因此匯入任務有關的參數主要設定在要求標頭(Header)中。常用的匯入參數如下。

參數名稱

參數說明

label

匯入任務的唯一標識。

Label作用:

  • 匯入命令中自訂的名稱。

  • 可用於查看對應匯入任務的執行情況。

  • 可用於防止重複匯入相同的資料。

  • 對應的匯入作業狀態為CANCELLED時,可以再次被使用。

重要

推薦同一批次資料使用相同的Label。這樣同一批次資料的重複請求只會被接受一次,保證了At-Most-Once

format

指定匯入資料格式。

  • 支援的格式:CSVJSONPARQUETORCcsv_with_names(CSV檔案行首過濾)和csv_with_names_and_typesCSV檔案前兩行過濾)。

  • 預設格式:CSV

各類檔案的格式要求以及一些相關參數使用,請參見檔案格式

line_delimiter

指定匯入檔案中的分行符號。

您也可以使用多個字元的組合作為分行符號。例如,在Windows系統中,使用\r\n作為分行符號。

column_separator

指定匯入檔案中的資料行分隔符號。

您也可使用多個字元的組合作為資料行分隔符號。例如,可以使用雙豎線||作為資料行分隔符號。

如果是不可見字元,則需要加\x作為首碼,使用十六進位來表示分隔字元。例如:Hive檔案的分隔字元\x01,需要指定為-H"column_separator:\x01"

compress_type

指定檔案的壓縮格式。僅支援CSVJSON檔案的壓縮。

支援的壓縮格式:gzlzobz2lz4lzopdeflate

max_filter_ratio

指定匯入任務的最大容忍率。

當匯入的錯誤率超過該閾值時,匯入將失敗。如需忽略錯誤行,必須將該參數設定為大於0,以確保匯入成功。

  • 預設值:0,即零容忍。

  • 取值範圍:[0,1]。

strict_mode

指定是否開啟嚴格過濾模式。

  • false(預設值):不開啟。

  • true:開啟。開啟後,會對匯入處理程序中的列類型轉換進行嚴格過濾。

    • 錯誤的資料將被過濾。

    • 非空未經處理資料的列類型變換如果結果為NULL,也會被過濾。

cloud_cluster

指定匯入使用的叢集。

預設為該執行個體的預設叢集。如果該執行個體沒有設定預設叢集,則會自動選擇一個有許可權的叢集。

load_to_single_tablet

指定是否僅將資料匯入到對應分區的一個tablet。該參數僅允許在對具有random分桶的Duplicate表進行資料匯入時設定。

  • false(預設值):代表向random分桶的Duplicate模型表匯入資料時,不會只向對應分區的一個分桶寫入資料。

  • true:代表向random分桶的Duplicate模型表匯入資料時,會只向對應分區的一個分桶寫入資料,從而可以提高資料匯入的並發度和輸送量。

where

指定匯入任務的過濾條件。

支援對未經處理資料指定where語句進行過濾,被過濾的資料將不會被匯入,也不會參與filter ratio的計算,但會被計入到被where條件過濾的行數

num_rows_unselected中。

partitions

指定待匯入資料的分區(Partition)資訊。

如果待匯入資料不屬於指定的分區(Partition)則不會被匯入。這些資料將計入dpp.abnorm.ALL。

dpp.abnorm.ALL 是SelectDB中的一個計數器指標。表示在資料預先處理階段被過濾掉的總行數。匯入結果中的 NumberFilteredRows 就包含了 dpp.abnorm.ALL 統計的異常行數

columns

指定待匯入資料的函數變換配置。

支援的函數變換方法包含列的順序變化以及運算式變換,其中運算式變換的方法與查詢語句的一致。

merge_type

指定資料合併類型。

  • APPEND(預設值):表示本次匯入是普通的追加寫操作。

  • MERGE:需要配合delete參數使用,以標註Delete Flag列。

  • DELETE:表示本次匯入的所有資料皆為刪除資料。

重要

MERGEDELETE類型僅適用於Unique模型。

delete

僅在指定merge_type類型為MERGE時才具有意義,表示資料的刪除條件。

function_column.sequence_col

僅適用於Unique模型,相同Key列下,保證Value列按照source_sequence列進行REPLACE,source_sequence可以是資料來源中的列,也可以是表結構中的一列。

exec_mem_limit

指定匯入記憶體限制。

  • 單位:位元組。

  • 預設值:2147483648,即2 GiB

timeout

指定匯入的逾時時間。

  • 單位:秒。

  • 預設值為600

  • 範圍:[1,259200]。

timezone

指定本次匯入所使用的時區。該參數會影響所有匯入涉及的和時區有關的函數結果。有關時區,您可通過IANA時區資料庫查看

預設值:Asia/Shanghai,即東八區。

two_phase_commit

指定是否開啟兩階段事務提交模式。

  • false(預設值):不開啟。

  • true:開啟。開啟兩階段事務提交模式後,資料寫入完成即會返回資訊給使用者,此時資料不可見,事務狀態為PRECOMMITTED,使用者手動觸發commit操作之後,資料才可見。

  • 是否開啟建議

    開啟後資料匯入具有原子性。要麼全部成功,要麼全部失敗,同時避免了匯入處理程序中出現部分資料可見的情況。

    以下三種情境下適合開啟:

    • 金融交易資料:需要嚴格保證資料完整性和一致性。

    • 計費系統資料:不允許出現部分資料匯入的情況。

    • 關鍵業務資料:對資料準確性要求極高的情境。

    以下三種情境適合關閉:

    • 日誌分析:對一致性要求不高,追求匯入速度。

    • 巨量資料量批處理:資源有限,需要快速完成匯入。

    • 可重複匯入的資料:如果匯入失敗可以重新匯入的資料。

jsonpaths

匯入JSON資料格式有兩種方式:

  • 簡單模式:無需指定jsonpaths,要求JSON中的key列名與表中的列名是一一對應的,順序可以不一樣,如JSON資料{"k1":1, "k2":2, "k3":"hello"},其中k1、k2、k3分別對應表中的列名。

  • 匹配模式:JSON資料相對複雜時,通過參數jsonpaths匹配其中的key列到表對應的列,如jsonpaths:["$.status", "$.res.id", "$.res.count"]可抽取JSON資料中的嵌套欄位寫入對應表中,預設情況下jsonpaths抽取的欄位會按順序映射到表的相應欄位。

json_root

json_root可用於指定JSON中的子物件,作為匯入解析的根節點。

預設值:"",代表選擇整個JSON作為匯入解析的根節點。

read_json_by_line

Stream Load 中處理 JSON 格式資料的一個重要參數,它控制著如何解析包含多行JSON資料的輸入檔案。

  • false(預設值):將整個輸入檔案視為單個 JSON 值或數組,核心會嘗試解析整個檔案內容作為一個 JSON 對象或數組。

    例如,如果檔案中是以下內容:

    [
     {"id":1, "name":"Alice", "age":25},
     {"id":2, "name":"Bob", "age":30},
     {"id":3, "name":"Charlie", "age":35}
    ]

    則整個檔案內容會被解析為一個JSON數組。

  • true:表示匯入資料的每一行是一個JSON對象。

    例如,如果檔案中是以下內容:

    {"id":1, "name":"Alice", "age":25}
    {"id":2, "name":"Bob", "age":30}
    {"id":3, "name":"Charlie", "age":35}

    檔案中每行被解析為一個JSON對象。

strip_outer_array

Stream Load 中處理JSON格式資料的一個重要參數,它控制著如何解析包含外層數組的JSON資料。

  • false(預設值):表示會保留JSON資料的原始結構,不剝離外層數組,效果是將整個JSON數組作為一條記錄匯入。

    例如樣本資料[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}],在設定strip_outer_array為false後,會解析為一個數組資料匯入表中。

  • true:當匯入資料格式為JSON數組時,需要設定strip_outer_array為 true。

    例如樣本資料[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}],在設定strip_outer_array為true後,會解析為兩條資料匯入表中。

重要

當需要匯入JSON格式的資料時,非數組格式的效能大幅高於數組格式。

樣本

將CSV格式的檔案data.csv匯入至VPC地址為selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com執行個體的test_db庫的test_table表中。此處僅提供建立匯入的curl指令樣本,完整樣本請參見匯入資料的完整樣本

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

返回結果說明

Stream load是一種同步匯入方式,匯入結果將通過建立匯入的傳回值直接返回。返回結果樣本如下。

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

返回結果參數說明如下。

參數名稱

參數說明

TxnId

匯入的事務ID。

Label

匯入標識

您可自訂標識,也可通過系統自動產生。

Status

匯入狀態,取值如下:

  • Success:匯入成功。

  • Publish Timeout匯入任務已完成,但資料可能會有所延遲,無需重複嘗試。

  • Label Already ExistsLabel重複,需更換Label

  • Fail:匯入失敗。

ExistingJobStatus

已存在的Label對應的匯入作業的狀態。

該欄位僅在Status為Label Already Exists時才會顯示。

您通過此狀態,判斷已存在Label對應的匯入任務的狀態。

  • RUNNING:任務還在執行。

  • FINISHED:任務成功。

Message

錯誤資訊提示。

NumberTotalRows

匯入總處理的行數。

NumberLoadedRows

成功匯入的行數。

NumberFilteredRows

資料品質不合格的行數。

NumberUnselectedRows

where條件過濾的行數。

LoadBytes

匯入的位元組數。

LoadTimeMs

匯入耗時。

單位:毫秒。

BeginTxnTimeMs

向FE請求開始一個事務所花費的時間。

單位:毫秒。

StreamLoadPutTimeMs

向FE請求擷取匯入資料執行計畫所花費的時間。

單位:毫秒。

ReadDataTimeMs

讀取資料所花費的時間。

單位:毫秒。

WriteDataTimeMs

執行寫入資料操作所花費的時間。

單位:毫秒。

CommitAndPublishTimeMs

向FE請求提交並且發布事務所花費的時間。

單位:毫秒。

ErrorURL

如果有資料品質問題,可通過訪問這個URL查看具體錯誤行。

取消匯入任務

Stream Load匯入任務建立後,無法手動取消。任務僅在出現逾時或匯入錯誤的情況下,由系統自動取消。您可根據返回結果中的errorUrl下載報錯資訊,進行錯誤排查。

查看Stream Load任務

如果您已開啟Stream Load操作記錄,您可以通過MySQL用戶端串連雲資料庫SelectDB版執行個體,執行show stream load語句查看已經完成的Stream Load任務。

匯入資料的完整樣本

準備工作

開始匯入操作前,您需完成準備工作

匯入CSV資料

通過指令碼匯入樣本

  1. 建立待匯入資料的表。

    1. 串連SelectDB執行個體。具體操作,請參見通過DMS串連雲資料庫SelectDB版執行個體

    2. 執行建庫語句。

      CREATE DATABASE test_db;
    3. 執行建表語句。

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int,
          address varchar(50),
          url varchar(500)
      )
      UNIQUE KEY(`id`, `name`)
      DISTRIBUTED BY HASH(id) BUCKETS 16
      PROPERTIES("replication_num" = "1");
  2. 在發起Stream Load的終端所在裝置,建立待匯入檔案test.csv

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com
  3. 匯入資料。

    開啟目標裝置的終端,通過curl命令發起Stream Load任務,匯入資料。

    建立匯入任務的具體文法以及參數說明,請參見建立匯入任務,以下為常見匯入情境的樣本。

    • 使用Label去重,指定逾時時間。

      將檔案test.csv中的資料匯入到資料庫test_db中的test_table表,使用Label避免匯入重複批次的資料,並指定逾時時間為100秒。

       curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 使用Label去重,並使用列篩選檔案中要匯入的資料。

      將檔案test.csv中的資料匯入到資料庫test_db中的test_table表,使用Label避免匯入重複批次的資料,指定檔案的列名,並且只匯入address等於hangzhou的資料。

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 允許20%容錯率。

      將檔案test.csv中的資料匯入到資料庫test_db中的test_table表,允許20%的錯誤率。

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 使用strict 模式並設定時區。

      匯入資料進行strict 模式過濾,並設定時區為Africa/Abidjan

      curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 刪除SelectDB中的資料

      刪除SelectDBtest.csv檔案中相同的資料。

      curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 根據條件刪除檔案中不需要匯入的資料,剩餘資料正常匯入SelectDB

      將test.csv檔案中address列為hangzhou的資料的行刪除,其他行正常匯入至SelectDB

      curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load

通過Java代碼匯入樣本

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. 參數配置
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. 構建httpclient,特別注意需要開啟重新導向(isRedirectable)
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // 開啟重新導向
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. 構建httpPut請求對象
        HttpPut httpPut = new HttpPut(loadUrl);

        // 設定httpHeader...
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. 設定要發送的資料,這裡寫入csv
        // 假設有一張表,欄位如下:
        // field1,field2,field3,field4
        // 這裡類比了三條csv記錄,doris 中csv的行分隔字元預設是\n,資料行分隔符號預設是\t
        // String data =
        //        "1\t2\t3\t4\n" +
        //        "11\t22\t33\t44\n" +
        //        "111\t222\t333\t444";
        // 讀取所有行
         List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
        // 用\n串連所有行
        String data = String.join("\n", lines);
        
        httpPut.setEntity(new StringEntity(data));

        // 5. 發送請求,處理結果
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // 選擇適合的JSON序列化組件,對返回結果進行序列化
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // 擷取SelectDB返回的狀態代碼...
            String dorisStatus = respAsMap.get("Status");
            // SelectDB返回以下狀態,都表示資料寫入成功
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

匯入JSON資料

  1. 建立待匯入資料的表。

    1. 串連SelectDB執行個體。具體操作,請參見通過DMS串連雲資料庫SelectDB版執行個體

    2. 執行建庫語句。

      CREATE DATABASE test_db;
    3. 執行建表語句。

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int
      )
      UNIQUE KEY(`id`)
      DISTRIBUTED BY HASH(`id`) BUCKETS 16
      PROPERTIES("replication_num" = "1");

  2. 匯入資料。

    重要

    當需要匯入JSON格式的資料時,非數組格式的效能大幅高於數組格式。

    匯入非數組格式資料

    1. 在發起Stream Load的終端,建立json.data,檔案包含多行,一行一個JSON記錄。內容如下:

      {"id":1,"name":"Emily","age":25}
      {"id":2,"name":"Benjamin","age":35}
      {"id":3,"name":"Olivia","age":28}
      {"id":4,"name":"Alexander","age":60}
      {"id":5,"name":"Ava","age":17}
    2. 匯入資料。

      開啟裝置終端,通過curl命令發起Stream Load任務,將檔案json.data中的資料匯入到資料庫test_dbtest_table表中。

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

    匯入數組格式資料

    1. 在發起Stream Load的終端,建立JSON數組格式的資料檔案json_array.data

      [
      {"userid":1,"username":"Emily","userage":25},
      {"userid":2,"username":"Benjamin","userage":35},
      {"userid":3,"username":"Olivia","userage":28},
      {"userid":4,"username":"Alexander","userage":60},
      {"userid":5,"username":"Ava","userage":17}
      ]
    2. 匯入資料。

      開啟裝置的終端,通過curl命令發起Stream Load任務,將本地檔案json_array.data中的資料匯入到資料庫test_dbtest_table表中。

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Http Stream模式

在Stream Load中,依託Table Value Function(TVF)功能,可以通過使用SQL運算式來表達匯入的參數。這個Stream Load依託TVF功能後名為http_stream。更多Table Value Function(TVF)的使用方式,詳情請參見TVF

使用http_stream進行Stream Load匯入時的Rest API URL不同於Stream Load普通匯入的 URL。

  • 普通Stream Load的URL為:http://host:http_port/api/{db}/{table}/_stream_load

  • 使用TVF http_stream的URL 為:http://host:http_port/api/_http_stream

文法

Stream Load的Http Stream模式。

curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream

Http Stream參數說明請參見參數說明

使用樣本

在Http Header中添加一個SQL的參數load_sql,去替代之前參數中的column_separatorline_delimiterwherecolumns等參數,SQL參數load_sql樣本如下。

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

完整樣本:

curl  --location-trusted -u admin:admin_123 -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30"  http://host:http_port/api/_http_stream

常見問題

Q1:匯入處理程序中,報get table cloud commit lock timeout怎麼辦?

由於您寫入資料頻率太快,導致表死結。強烈建議您降低寫入頻率,對資料進行攢批處理。單次Stream Load可以寫入幾百MB至1GB的資料。

Q2:匯入CSV檔案格式時,資料中存在資料行分隔符號和行分隔字元應該如何處理?

您需重新指定資料行分隔符號和行分隔字元,並修改匯入資料文本,確保資料與分割符不衝突,使得資料能被正常解析。樣本如下:

存在行分隔字元

如果匯入資料中包含已指定的分行符號,例如預設的分行符號\n,則需重新指定分行符號。

例如,您的資料檔案為:

張三\n,25,陝西
李四\n,30,北京

此情境中,檔案中的\n為資料而非分行符號,但該檔案的預設分行符號也是\n,如需檔案能被正常解析,您需通過line_delimiter指定分行符號,資料文本每行資料行末也需顯示寫分行符號。樣本如下:

  1. 設定匯入行分行符號。

    例如,您將預設的分行符號\n替換為\r\n,則匯入資料時,您需設定-H "line_delimiter:\r\n"

  2. 為匯入資料行末添加指定的分行符號。上述樣本文本則需修改為:

    張三\n,25,陝西\r\n
    李四\n,30,北京\r\n

存在資料行分隔符號

如果匯入資料中包含已指定的資料行分隔符號,例如預設的列分割符\t,則需重新指定列分割符。

例如,您的資料檔案為:

張三\t  25  陝西
李四\t  30  北京

此情境中,檔案中的\t為資料而非資料行分隔符號,但該檔案預設使用的資料行分隔符號也是\t(定位字元),如需檔案能被正常解析,您需通過column_separator重新指定資料行分隔符號,資料文本每行資料也需顯示增加資料行分隔符號。樣本如下:

  1. 設定匯入列分割符。

    例如,您將預設的列分割符\t替換為逗號,,則匯入資料時,您需設定-H "column_separator:,"

  2. 為匯入資料列添加指定的資料行分隔符號。上述樣本文本則為:

    張三\t,25,陝西
    李四\t,30,北京