當您需要將本地檔案或資料流匯入到ApsaraDB for SelectDB執行個體時,您可以使用Stream Load進行資料同步匯入,並通過即時的返回結果判斷本次匯入是否成功。本文介紹如何通過Stream Load匯入資料至ApsaraDB for SelectDB執行個體中。
背景資訊
Stream Load屬於同步介面的匯入方式,您可以通過發送HTTP請求將本地檔案或資料流匯入到ApsaraDB for SelectDB執行個體中。Stream Load執行並返回匯入結果,您可以通過請求的返回結果判斷本次匯入是否成功。
Stream Load適用於匯入本地檔案或通過程式匯入資料流中的資料,支援的資料格式包括:CSV
(文本)、JSON
、PARQUET
和ORC
。
建立匯入
Stream Load通過HTTP協議提交和傳輸資料。以下通過curl命令提交匯入,也可以通過其他HTTP Client進行操作。
文法
# Header中支援的屬性,請參見下面的參數說明。
# 格式為: -H "key1:value1"。
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_name> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load
參數說明
參數名稱 | 參數說明 |
| 需要認證時,會將 |
| 指定ApsaraDB for SelectDB執行個體的使用者名稱和密碼。 |
| 指定本次Stream Load匯入請求的要求標頭(Header)內容。 |
| 指定需要匯入資料的檔案。 |
| HTTP請求的Method,採用PUT要求方法,指定ApsaraDB for SelectDB的資料匯入地址,其中包括參數如下。
|
Stream Load使用HTTP協議,因此匯入任務有關的參數主要設定在要求標頭(Header)中。常用的匯入參數如下。
參數名稱 | 參數說明 |
| 匯入任務的唯一標識。 說明 推薦同一批次資料使用相同的 |
| 指定匯入資料格式,預設值為 |
| 指定匯入檔案中的分行符號,預設為 |
| 指定匯入檔案中的資料行分隔符號,預設為 |
| 指定檔案的壓縮格式。僅支援 |
| 指定匯入任務的最大容忍率,預設為 |
| 指定是否開啟嚴格過濾模式,預設為 |
| 指定匯入使用的叢集。預設為該執行個體的預設叢集。如果該執行個體沒有設定預設叢集,則自動選擇一個有許可權的叢集。 |
| 指定是否只匯入資料到對應分區的一個tablet,預設值為 |
| 指定匯入任務的過濾條件。支援對未經處理資料指定 |
| 指定待匯入資料的分區(Partition)資訊。如果待匯入資料不屬於指定的分區(Partition)則不會被匯入。這些資料將計入dpp.abnorm.ALL。 |
| 指定待匯入資料的函數變換配置。支援的函數變換方法包含列的順序變化以及運算式變換,其中運算式變換的方法與查詢語句的一致。 |
| 指定資料合併類型,預設為 |
| 僅在指定 |
| 只適用於UNIQUE_KEYS,相同Key列下,保證Value列按照source_sequence列進行REPLACE, source_sequence可以是資料來源中的列,也可以是表結構中的一列。 |
| 指定匯入記憶體限制。單位為位元組,預設為 |
| 指定匯入的逾時時間,單位:秒,預設值為 |
| 指定本次匯入所使用的時區,預設為" |
| 指定是否開啟兩階段事務提交模式,預設為 |
Stream load是一種同步的匯入方式,因此匯入的結果會通過建立匯入的傳回值直接返回給使用者。返回結果樣本如下。
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}
返回結果參數說明如下。
參數名稱 | 參數說明 |
| 匯入的事務ID。 |
| 匯入 |
| 匯入狀態,取值如下:
|
| 已存在的 這個欄位只有在當Status為" |
| 錯誤資訊提示。 |
| 匯入總處理的行數。 |
| 成功匯入的行數。 |
| 資料品質不合格的行數。 |
| 被 |
| 匯入的位元組數。 |
| 匯入完成時間,單位毫秒。 |
| 向Fe請求開始一個事務所花費的時間,單位:毫秒。 |
| 向Fe請求擷取匯入資料執行計畫所花費的時間,單位:毫秒。 |
| 讀取資料所花費的時間,單位:毫秒。 |
| 執行寫入資料操作所花費的時間,單位:毫秒。 |
| 向Fe請求提交並且發布事務所花費的時間,單位:毫秒。 |
| 如果有資料品質問題,通過訪問這個URL查看具體錯誤行。 |
使用Stream Load匯入資料,樣本如下。
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
取消匯入
無法手動取消Stream Load,Stream Load在逾時或者匯入錯誤後會被系統自動取消。
查看Stream Load
您可以通過show stream load
來查看已經完成的Stream load任務。預設BE(BackEnd)不保留Stream Load的啟用記錄,如果您要查看則需要在BE上啟用記錄,配置參數為:enable_stream_load_record=true,具體操作請參見BE配置項。
使用樣本
指令碼樣本
建立待匯入的SelectDB資料表,樣本如下。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
建立待匯入檔案
test.data
,樣本如下。1,yang,32,shanghai,http://example.com 2,wang,22,beijing,http://example.com 3,xiao,23,shenzhen,http://example.com 4,jess,45,hangzhou,http://example.com 5,jack,14,shanghai,http://example.com 6,tomy,25,hangzhou,http://example.com 7,lucy,45,shanghai,http://example.com 8,tengyin,26,shanghai,http://example.com 9,wangli,27,shenzhen,http://example.com 10,xiaohua,37,shanghai,http://example.com
使用不同參數配置匯入資料,樣本如下。
將本地檔案
test.data
中的資料匯入到資料庫test_db
中的test_table
表,使用Label
用於去重,指定逾時時間為100秒。curl --location-trusted -u root -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
將本地檔案
test.data
中的資料匯入到資料庫test_db
中的test_table
表,使用Label用於去重,指定檔案的列名,並且只匯入address等於hangzhou的資料。curl --location-trusted -u root -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
將本地檔案
test.data
中的資料匯入到資料庫test_db
中的test_table
表,允許20%的錯誤率。curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
匯入資料進行strict 模式過濾,並設定時區為
Africa/Abidjan
。curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
刪除與這批匯入Key相同的資料。
curl --location-trusted -u root -H "merge_type: DELETE" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
將這批資料中address列為hangzhou的資料的行刪除,其他行正常追加。
curl --location-trusted -u root: -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.data http://host:port/api/testDb/testTbl/_stream_load
Java程式碼範例
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
public static void main(String[] args) throws Exception {
// 1. 參數配置
String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
String userName = "admin";
String password = "****";
// 2. 構建httpclient,特別注意需要開啟重新導向(isRedirectable)
HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
// 開啟重新導向
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
httpClientBuilder.addInterceptorLast(new RequestContent(true));
HttpClient httpClient = httpClientBuilder.build();
// 3. 構建httpPut請求對象
HttpPut httpPut = new HttpPut(loadUrl);
// 設定httpHeader...
String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
httpPut.setConfig(reqConfig);
// 4. 設定要發送的資料,這裡寫入csv
// 假設有一張表,欄位如下:
// field1,field2,field3,field4
// 這裡類比了三條csv記錄,doris 中csv的行分隔字元預設是\n,資料行分隔符號預設是\t
String data =
"1\t2\t3\t4\n" +
"11\t22\t33\t44\n" +
"111\t222\t333\t444";
httpPut.setEntity(new StringEntity(data));
// 5. 發送請求,處理結果
HttpResponse httpResponse = httpClient.execute(httpPut);
int httpStatus = httpResponse.getStatusLine().getStatusCode();
String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
if (httpStatus == HttpStatus.SC_OK) {
// 選擇適合的JSON序列化組件,對返回結果進行序列化
Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
// 擷取SelectDB返回的狀態代碼...
String dorisStatus = respAsMap.get("Status");
// SelectDB返回以下狀態,都表示資料寫入成功
List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
} else {
System.out.println("successful....");
}
} else {
throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +", url: " + loadUrl + ", error: " + respMsg);
}
}
}
相關係統配置
FE配置
stream_load_default_timeout_second
:匯入任務的逾時時間,單位:秒。預設值為600。匯入任務在設定的timeout時間內未完成則會被系統取消,變成CANCELLED。如果匯入的源檔案無法在規定時間內完成匯入,您可以在Stream load請求中設定單獨的逾時時間或者調整FE的參數stream_load_default_timeout_second
來設定全域的預設逾時時間。
BE配置
streaming_load_max_mb
:Stream load的最大匯入大小,單位:MB,預設值為10240。如果您的原始檔案超過該值,則需要調整BE參數streaming_load_max_mb
。
Http Stream模式
在Stream Load中,依託Table Value Function(TVF)功能,可以通過使用SQL運算式來表達匯入的參數。這個Stream Load依託TVF功能後名為http_stream
。更多Table Value Function(TVF)的使用方式,詳情請參見TVF。
使用http_stream進行Stream Load匯入時的Rest API URL不同於Stream Load普通匯入的 URL。
普通Stream Load的URL為:
http://host:http_port/api/{db}/{table}/_stream_load
。使用TVF http_stream的URL 為:
http://host:http_port/api/_http_stream
。
文法
Stream Load的Http Stream模式。
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream
Http Stream參數說明請參見參數說明。
使用樣本
在Http Header中添加一個SQL
的參數load_sql,去替代之前參數中的column_separator
、line_delimiter
、where
、columns
等參數,SQL
參數load_sql樣本如下。
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");
完整樣本:
curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" http://host:http_port/api/_http_stream