當您需要將本地檔案或資料流匯入到ApsaraDB for SelectDB執行個體時,可以使用Stream Load進行資料匯入。本文介紹如何通過Stream Load匯入資料至ApsaraDB for SelectDB。
背景資訊
Stream Load屬於同步介面的匯入方式,您可以通過發送HTTP請求將本地檔案或資料流匯入到ApsaraDB for SelectDB。Stream Load執行後會立即返回匯入結果,您可以通過請求的返回結果判斷此次匯入是否成功。其支援的資料格式有CSV(文本)、JSON、PARQUET和ORC。
由於Stream load具有高吞吐、低延遲以及靈活可靠的特性,強烈建議您將Stream Load作為主要的資料匯入方式。
準備工作
確保發起Stream Load請求的終端與SelectDB網路互連:
為ApsaraDB for SelectDB執行個體申請公網地址。具體操作,請參見申請和釋放公網地址。
如果您發起Stream Load請求的終端與ApsaraDB for SelectDB執行個體位於同一VPC下,跳過此步驟。
將發起Stream Load請求終端的相關IP添加至ApsaraDB for SelectDB的白名單。具體操作,請參見設定白名單。
若發起Stream Load請求終端存在白名單機制,已將SelectDB執行個體所在網段IP添加至源叢集的白名單中。
擷取SelectDB執行個體VPC地址的IP,請參見如何查看雲資料庫 SelectDB 版執行個體所屬VPC的IP網段?
擷取SelectDB執行個體公網的IP地址,通過
ping命令訪問SelectDB執行個體的公網地址,擷取其對應的 IP 位址。
(可選)修改計算叢集(BackEnd)配置,開啟Stream Load操作記錄。
預設情況下,計算叢集不保留Stream Load操作記錄。
如果您需要跟蹤Stream Load的操作情況,需在建立匯入任務之前配置enable_stream_load_record=true並重啟叢集,以啟用Stream Load操作記錄。如需開啟此功能,請提交工單以獲得支援人員。
(可選)修改計算叢集配置,調整Stream load的最大匯入限制。
預設情況下,Stream Load匯入檔案的最大限制為10240MB。
如果您的原始檔案超過此值,則需調整後端參數streaming_load_max_mb。修改參數,請參見參數配置。
(可選)修改FE配置,調整匯入逾時時間。
預設情況下,Stream Load的匯入任務逾時時間為600秒。如果匯入任務在設定的逾時時間內未完成,系統將取消該任務,並將其狀態更改為CANCELLED。
如果匯入的源檔案無法在規定時間內完成,您可以在Stream Load請求中設定單獨的逾時時間,或者調整FE參數stream_load_default_timeout_second並重啟執行個體以設定全域的預設逾時時間。如需調整,請提交工單以獲得支援人員。
注意事項
單次Stream Load可以寫入幾百MB至1GB的資料。在業務情境中,頻繁的寫入少量資料可能導致執行個體效能大幅下降,甚至資料庫表死結。強烈建議您降低寫入頻率,對資料進行攢批處理。
業務端攢批:您需自行收集業務資料,然後向SelectDB發起Stream Load請求。
服務端攢批:SelectDB接收到Stream Load請求後,服務端將進行請求資料的批處理操作。如何操作,請參見Group Commit。
建立匯入任務
Stream Load通過HTTP協議提交和傳輸資料。以下為通過curl命令提交匯入樣本,該命令可在Linux或macOS系統的終端或Windows系統的命令提示字元下執行。此外,Stream Load還支援通過其他HTTP用戶端進行操作。
文法
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load參數說明
參數名稱 | 是否必選 | 參數說明 |
| 是 | 需要認證時,會將 |
| 是 | 指定SelectDB執行個體的使用者名稱和密碼。
|
| 否 | 指定本次Stream Load匯入請求的要求標頭(Header)內容。格式如下:
常見參數如下:
更多要求標頭參數,請參見要求標頭參數說明。 |
| 是 | 指定需要匯入資料的檔案路徑。 file_path:目標檔案路徑。 |
| 是 | HTTP請求的Method,採用PUT要求方法,指定SelectDB的資料匯入地址,具體參數如下:
|
要求標頭參數說明
Stream Load使用HTTP協議,因此匯入任務有關的參數主要設定在要求標頭(Header)中。常用的匯入參數如下。
參數名稱 | 參數說明 |
| 匯入任務的唯一標識。
重要 推薦同一批次資料使用相同的 |
| 指定匯入資料格式。
各類檔案的格式要求以及一些相關參數使用,請參見檔案格式。 |
| 指定匯入檔案中的分行符號。 您也可以使用多個字元的組合作為分行符號。例如,在Windows系統中,使用\r\n作為分行符號。
|
| 指定匯入檔案中的資料行分隔符號。 您也可使用多個字元的組合作為資料行分隔符號。例如,可以使用雙豎線 如果是不可見字元,則需要加
|
| 指定檔案的壓縮格式。僅支援 支援的壓縮格式: |
| 指定匯入任務的最大容忍率。 當匯入的錯誤率超過該閾值時,匯入將失敗。如需忽略錯誤行,必須將該參數設定為大於0,以確保匯入成功。
|
| 指定是否開啟嚴格過濾模式。
|
| 指定匯入使用的叢集。 預設為該執行個體的預設叢集。如果該執行個體沒有設定預設叢集,則會自動選擇一個有許可權的叢集。 |
| 指定是否僅將資料匯入到對應分區的一個tablet。該參數僅允許在對具有random分桶的Duplicate表進行資料匯入時設定。
|
| 指定匯入任務的過濾條件。 支援對未經處理資料指定
|
| 指定待匯入資料的分區(Partition)資訊。 如果待匯入資料不屬於指定的分區(Partition)則不會被匯入。這些資料將計入dpp.abnorm.ALL。
|
| 指定待匯入資料的函數變換配置。 支援的函數變換方法包含列的順序變化以及運算式變換,其中運算式變換的方法與查詢語句的一致。 |
| 指定資料合併類型。
重要
|
| 僅在指定 |
| 僅適用於Unique模型,相同Key列下,保證Value列按照source_sequence列進行REPLACE,source_sequence可以是資料來源中的列,也可以是表結構中的一列。 |
| 指定匯入記憶體限制。
|
| 指定匯入的逾時時間。
|
| 指定本次匯入所使用的時區。該參數會影響所有匯入涉及的和時區有關的函數結果。有關時區,您可通過IANA時區資料庫查看。 預設值: |
| 指定是否開啟兩階段事務提交模式。
|
jsonpaths | 匯入JSON資料格式有兩種方式:
|
json_root |
預設值:"",代表選擇整個JSON作為匯入解析的根節點。 |
read_json_by_line | Stream Load 中處理 JSON 格式資料的一個重要參數,它控制著如何解析包含多行JSON資料的輸入檔案。
|
strip_outer_array | Stream Load 中處理JSON格式資料的一個重要參數,它控制著如何解析包含外層數組的JSON資料。
重要 當需要匯入JSON格式的資料時,非數組格式的效能大幅高於數組格式。 |
樣本
將CSV格式的檔案data.csv匯入至VPC地址為selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com執行個體的test_db庫的test_table表中。此處僅提供建立匯入的curl指令樣本,完整樣本請參見匯入資料的完整樣本。
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load返回結果說明
Stream load是一種同步匯入方式,匯入結果將通過建立匯入的傳回值直接返回。返回結果樣本如下。
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}返回結果參數說明如下。
參數名稱 | 參數說明 |
| 匯入的事務ID。 |
| 匯入標識。 您可自訂標識,也可通過系統自動產生。 |
| 匯入狀態,取值如下:
|
| 已存在的 該欄位僅在Status為 您通過此狀態,判斷已存在Label對應的匯入任務的狀態。
|
| 錯誤資訊提示。 |
| 匯入總處理的行數。 |
| 成功匯入的行數。 |
| 資料品質不合格的行數。 |
| 被 |
| 匯入的位元組數。 |
| 匯入耗時。 單位:毫秒。 |
| 向FE請求開始一個事務所花費的時間。 單位:毫秒。 |
| 向FE請求擷取匯入資料執行計畫所花費的時間。 單位:毫秒。 |
| 讀取資料所花費的時間。 單位:毫秒。 |
| 執行寫入資料操作所花費的時間。 單位:毫秒。 |
| 向FE請求提交並且發布事務所花費的時間。 單位:毫秒。 |
| 如果有資料品質問題,可通過訪問這個URL查看具體錯誤行。 |
取消匯入任務
Stream Load匯入任務建立後,無法手動取消。任務僅在出現逾時或匯入錯誤的情況下,由系統自動取消。您可根據返回結果中的errorUrl下載報錯資訊,進行錯誤排查。
查看Stream Load任務
如果您已開啟Stream Load操作記錄,您可以通過MySQL用戶端串連雲資料庫SelectDB版執行個體,執行show stream load語句查看已經完成的Stream Load任務。
匯入資料的完整樣本
準備工作
開始匯入操作前,您需完成準備工作。
匯入CSV資料
通過指令碼匯入樣本
建立待匯入資料的表。
串連SelectDB執行個體。具體操作,請參見通過DMS串連雲資料庫SelectDB版執行個體。
執行建庫語句。
CREATE DATABASE test_db;執行建表語句。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
在發起Stream Load的終端所在裝置,建立待匯入檔案
test.csv。1,yang,32,shanghai,http://example.com 2,wang,22,beijing,http://example.com 3,xiao,23,shenzhen,http://example.com 4,jess,45,hangzhou,http://example.com 5,jack,14,shanghai,http://example.com 6,tomy,25,hangzhou,http://example.com 7,lucy,45,shanghai,http://example.com 8,tengyin,26,shanghai,http://example.com 9,wangli,27,shenzhen,http://example.com 10,xiaohua,37,shanghai,http://example.com匯入資料。
開啟目標裝置的終端,通過curl命令發起Stream Load任務,匯入資料。
建立匯入任務的具體文法以及參數說明,請參見建立匯入任務,以下為常見匯入情境的樣本。
使用
Label去重,指定逾時時間。將檔案
test.csv中的資料匯入到資料庫test_db中的test_table表,使用Label避免匯入重複批次的資料,並指定逾時時間為100秒。curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load使用
Label去重,並使用列篩選檔案中要匯入的資料。將檔案
test.csv中的資料匯入到資料庫test_db中的test_table表,使用Label避免匯入重複批次的資料,指定檔案的列名,並且只匯入address等於hangzhou的資料。curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load允許20%容錯率。
將檔案
test.csv中的資料匯入到資料庫test_db中的test_table表,允許20%的錯誤率。curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load使用strict 模式並設定時區。
匯入資料進行strict 模式過濾,並設定時區為
Africa/Abidjan。curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load刪除SelectDB中的資料。
刪除SelectDB中與test.csv檔案中相同的資料。
curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load根據條件刪除檔案中不需要匯入的資料,剩餘資料正常匯入SelectDB。
將test.csv檔案中address列為hangzhou的資料的行刪除,其他行正常匯入至SelectDB。
curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load
通過Java代碼匯入樣本
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
public static void main(String[] args) throws Exception {
// 1. 參數配置
String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
String userName = "admin";
String password = "****";
// 2. 構建httpclient,特別注意需要開啟重新導向(isRedirectable)
HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
// 開啟重新導向
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
httpClientBuilder.addInterceptorLast(new RequestContent(true));
HttpClient httpClient = httpClientBuilder.build();
// 3. 構建httpPut請求對象
HttpPut httpPut = new HttpPut(loadUrl);
// 設定httpHeader...
String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
httpPut.setConfig(reqConfig);
// 4. 設定要發送的資料,這裡寫入csv
// 假設有一張表,欄位如下:
// field1,field2,field3,field4
// 這裡類比了三條csv記錄,doris 中csv的行分隔字元預設是\n,資料行分隔符號預設是\t
// String data =
// "1\t2\t3\t4\n" +
// "11\t22\t33\t44\n" +
// "111\t222\t333\t444";
// 讀取所有行
List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
// 用\n串連所有行
String data = String.join("\n", lines);
httpPut.setEntity(new StringEntity(data));
// 5. 發送請求,處理結果
HttpResponse httpResponse = httpClient.execute(httpPut);
int httpStatus = httpResponse.getStatusLine().getStatusCode();
String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
if (httpStatus == HttpStatus.SC_OK) {
// 選擇適合的JSON序列化組件,對返回結果進行序列化
Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
// 擷取SelectDB返回的狀態代碼...
String dorisStatus = respAsMap.get("Status");
// SelectDB返回以下狀態,都表示資料寫入成功
List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
} else {
System.out.println("successful....");
}
} else {
throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +", url: " + loadUrl + ", error: " + respMsg);
}
}
}匯入JSON資料
建立待匯入資料的表。
串連SelectDB執行個體。具體操作,請參見通過DMS串連雲資料庫SelectDB版執行個體。
執行建庫語句。
CREATE DATABASE test_db;執行建表語句。
CREATE TABLE test_table ( id int, name varchar(50), age int ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 16 PROPERTIES("replication_num" = "1");
匯入資料。
重要當需要匯入JSON格式的資料時,非數組格式的效能大幅高於數組格式。
匯入非數組格式資料
在發起Stream Load的終端,建立
json.data,檔案包含多行,一行一個JSON記錄。內容如下:{"id":1,"name":"Emily","age":25} {"id":2,"name":"Benjamin","age":35} {"id":3,"name":"Olivia","age":28} {"id":4,"name":"Alexander","age":60} {"id":5,"name":"Ava","age":17}匯入資料。
開啟裝置終端,通過curl命令發起Stream Load任務,將檔案
json.data中的資料匯入到資料庫test_db的test_table表中。curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
匯入數組格式資料
在發起Stream Load的終端,建立JSON數組格式的資料檔案
json_array.data。[ {"userid":1,"username":"Emily","userage":25}, {"userid":2,"username":"Benjamin","userage":35}, {"userid":3,"username":"Olivia","userage":28}, {"userid":4,"username":"Alexander","userage":60}, {"userid":5,"username":"Ava","userage":17} ]匯入資料。
開啟裝置的終端,通過curl命令發起Stream Load任務,將本地檔案
json_array.data中的資料匯入到資料庫test_db的test_table表中。curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Http Stream模式
在Stream Load中,依託Table Value Function(TVF)功能,可以通過使用SQL運算式來表達匯入的參數。這個Stream Load依託TVF功能後名為http_stream。更多Table Value Function(TVF)的使用方式,詳情請參見TVF。
使用http_stream進行Stream Load匯入時的Rest API URL不同於Stream Load普通匯入的 URL。
普通Stream Load的URL為:
http://host:http_port/api/{db}/{table}/_stream_load。使用TVF http_stream的URL 為:
http://host:http_port/api/_http_stream。
文法
Stream Load的Http Stream模式。
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_streamHttp Stream參數說明請參見參數說明。
使用樣本
在Http Header中添加一個SQL的參數load_sql,去替代之前參數中的column_separator、line_delimiter、where、columns等參數,SQL參數load_sql樣本如下。
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");完整樣本:
curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" http://host:http_port/api/_http_stream常見問題
Q1:匯入處理程序中,報get table cloud commit lock timeout怎麼辦?
由於您寫入資料頻率太快,導致表死結。強烈建議您降低寫入頻率,對資料進行攢批處理。單次Stream Load可以寫入幾百MB至1GB的資料。
Q2:匯入CSV檔案格式時,資料中存在資料行分隔符號和行分隔字元應該如何處理?
您需重新指定資料行分隔符號和行分隔字元,並修改匯入資料文本,確保資料與分割符不衝突,使得資料能被正常解析。樣本如下:
存在行分隔字元
如果匯入資料中包含已指定的分行符號,例如預設的分行符號\n,則需重新指定分行符號。
例如,您的資料檔案為:
張三\n,25,陝西
李四\n,30,北京此情境中,檔案中的\n為資料而非分行符號,但該檔案的預設分行符號也是\n,如需檔案能被正常解析,您需通過line_delimiter指定分行符號,資料文本每行資料行末也需顯示寫分行符號。樣本如下:
設定匯入行分行符號。
例如,您將預設的分行符號
\n替換為\r\n,則匯入資料時,您需設定-H "line_delimiter:\r\n"。為匯入資料行末添加指定的分行符號。上述樣本文本則需修改為:
張三\n,25,陝西\r\n 李四\n,30,北京\r\n
存在資料行分隔符號
如果匯入資料中包含已指定的資料行分隔符號,例如預設的列分割符\t,則需重新指定列分割符。
例如,您的資料檔案為:
張三\t 25 陝西
李四\t 30 北京此情境中,檔案中的\t為資料而非資料行分隔符號,但該檔案預設使用的資料行分隔符號也是\t(定位字元),如需檔案能被正常解析,您需通過column_separator重新指定資料行分隔符號,資料文本每行資料也需顯示增加資料行分隔符號。樣本如下:
設定匯入列分割符。
例如,您將預設的列分割符
\t替換為逗號,,則匯入資料時,您需設定-H "column_separator:,"。為匯入資料列添加指定的資料行分隔符號。上述樣本文本則為:
張三\t,25,陝西 李四\t,30,北京