If you want to import local files or data streams into an ApsaraDB for SelectDB instance, you can use Stream Load to synchronously import data and determine whether the data is imported based on the response. This topic describes how to use Stream Load to import data into an ApsaraDB for SelectDB instance.
Background information
Stream Load is a synchronous method for importing data. This method allows you to import local files or data streams into ApsaraDB for SelectDB instances by sending HTTP requests. Stream Load synchronously imports data and returns the import results. You can determine whether a Stream Load job is successful based on the response.
Stream Load is suitable for importing local files or importing data streams by using applications. You can use Stream Load to import data in the CSV, JSON, Parquet, or Optimized Row Columnar (ORC) format.
Create a Stream Load job
Stream Load submits and transfers data over the HTTP protocol. In this example, the curl command is used to submit a Stream Load job. You can also use other HTTP clients to submit a Stream Load job.
Syntax
# The parameters that you can specify in the header are described in the following table.
# Format: -H "key1:value1".
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_name> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load
Parameters
Parameter | Description |
--location-trusted | Specifies that the username and password are passed to the servers to which the request is redirected. |
-u | The username and password that are used to connect to the ApsaraDB for SelectDB instance. |
-H | The request header. |
-T | The data file to be imported. |
-XPUT | The method of the HTTP request. In this case, the PUT method is used. You must specify the URL of the ApsaraDB for SelectDB instance into which data is imported. The URL includes the following parameters: host: the endpoint of the instance; port: the HTTP port of the instance; db_name: the name of the database; table_name: the name of the table. |
Stream Load uses the HTTP protocol. Therefore, the parameters related to the Stream Load job are mainly specified in the header. The following table describes the common parameters that are used to import data.
Parameter | Description |
label | The unique identifier of the Stream Load job. You can customize a label in the import request. Note: We recommend that you use the same label for the same batch of data. This way, repeated requests for the same batch of data are accepted only once, which ensures at-most-once semantics. |
format | The format of the data to be imported. Default value: csv. The CSV, JSON, Parquet, and ORC formats are supported. |
line_delimiter | The row delimiter of the file to be imported. Default value: \n. |
column_separator | The column delimiter of the file to be imported. Default value: \t. |
compress_type | The compression format of the file to be imported. Only compressed CSV files are supported. |
max_filter_ratio | The maximum tolerance rate of the Stream Load job. Valid values: 0 to 1. Default value: 0. If the ratio of filtered data rows exceeds this value, the import fails. |
strict_mode | Specifies whether to enable the strict mode. Default value: false. |
cloud_cluster | The cluster that is used to import data. By default, the default cluster of the instance is used. If the instance does not have a default cluster, the system automatically selects a cluster on which you have the access permissions. |
load_to_single_tablet | Specifies whether to import data into only one tablet in the corresponding partition. Default value: false. |
where | The filter conditions that are specified for the Stream Load job. Stream Load allows you to specify a WHERE clause to filter the data to be imported. The filtered data is not imported and is counted in the NumberUnselectedRows field of the response. |
partitions | The partitions into which data is to be imported in the table. If the data to be imported does not belong to the specified partitions, the data is not imported. The number of data rows that are not imported is counted in dpp.abnorm.ALL. |
columns | The function transformation configuration of the data to be imported. Stream Load supports column order change and expression transformation. The expression transformation syntax is the same as that you use in a query statement. |
merge_type | The type of data merging. Valid values: APPEND, DELETE, and MERGE. Default value: APPEND. |
delete | The conditions for deleting data. This parameter is valid only if the merge_type parameter is set to MERGE. |
function_column.sequence_col | This parameter is available only for tables that use the UNIQUE KEY model and ensures that the REPLACE operation is performed on the value columns based on the specified source_sequence column for records with the same key columns. The source_sequence column can be a column from the data source or an existing column in the schema. |
exec_mem_limit | The maximum size of memory that can be allocated to the Stream Load job. Default value: 2 GB. |
timeout | The timeout period of the Stream Load job. Unit: seconds. Default value: 600. |
timezone | The time zone that is used for the Stream Load job. Default value: Asia/Shanghai. |
two_phase_commit | Specifies whether to enable the two-phase commit (2PC) mode for the Stream Load job. Default value: false. |
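For example, the following command combines several of the preceding header parameters to import a comma-separated CSV file. This is a minimal sketch: the endpoint, database, table, credentials, and label are placeholders that you must replace with your own values.
curl --location-trusted -u admin:**** -H "expect:100-continue" -H "label:load_csv_001" -H "format:csv" -H "column_separator:," -H "max_filter_ratio:0.1" -H "timeout:300" -T data.csv -XPUT http://<host>:<port>/api/test_db/test_table/_stream_load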
Stream Load is a synchronous method for importing data. Therefore, the results of the Stream Load job are directly returned in the response to your HTTP request. Sample response:
{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}
The following table describes the parameters in the response.
Parameter | Description |
TxnId | The transaction ID of the Stream Load job. |
Label | The label of the Stream Load job, which is customized by you or automatically generated by the system. |
Status | The state of the Stream Load job. Valid values: Success: the data is imported. Publish Timeout: the import transaction is committed, but the data may not be visible yet. In most cases, you can wait without retrying the job. Label Already Exists: the label is already used by another job. Fail: the import failed. |
ExistingJobStatus | The state of the Stream Load job that is associated with the existing label. This parameter is displayed only if the value of the Status parameter is Label Already Exists. |
Message | The error message that is returned for the Stream Load job. |
NumberTotalRows | The total number of data rows that are processed by the Stream Load job. |
NumberLoadedRows | The number of data rows that are imported. |
NumberFilteredRows | The number of data rows that cannot be imported. |
NumberUnselectedRows | The number of data rows that are filtered out by the WHERE clause. |
LoadBytes | The number of bytes that are imported by the Stream Load job. |
LoadTimeMs | The duration of the Stream Load job. Unit: milliseconds. |
BeginTxnTimeMs | The amount of time that is consumed to request the frontend (FE) to start a transaction. Unit: milliseconds. |
StreamLoadPutTimeMs | The amount of time that is consumed to request the FE to return an execution plan for the Stream Load job. Unit: milliseconds. |
ReadDataTimeMs | The amount of time that is consumed to read data. Unit: milliseconds. |
WriteDataTimeMs | The amount of time that is consumed to write data. Unit: milliseconds. |
CommitAndPublishTimeMs | The amount of time that is consumed to request the FE to commit and publish the transaction. Unit: milliseconds. |
ErrorURL | The URL from which you can view the error data rows. |
The following sample code shows how to use Stream Load to import data:
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Cancel a Stream Load job
You cannot manually cancel a Stream Load job. Stream Load jobs are automatically canceled by the system if a timeout error or an import error occurs.
Query a Stream Load job
You can execute the SHOW STREAM LOAD statement to query the information about a Stream Load job that is complete. By default, backends (BEs) do not record information about Stream Load jobs. If you want to enable BEs to record such information, set the enable_stream_load_record parameter to true in the BE configurations. For more information, see BE Configuration.
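For example, after the enable_stream_load_record parameter is enabled, you can run a statement similar to the following one to query completed jobs by label. The database name and label below are placeholders for illustration.
SHOW STREAM LOAD FROM test_db WHERE LABEL = "123";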
Examples
Sample script
Create a table into which you want to import data in an ApsaraDB for SelectDB instance.
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
Create a file to be imported, which is named test.data. Add the following data to the test.data file:
1,yang,32,shanghai,http://example.com
2,wang,22,beijing,http://example.com
3,xiao,23,shenzhen,http://example.com
4,jess,45,hangzhou,http://example.com
5,jack,14,shanghai,http://example.com
6,tomy,25,hangzhou,http://example.com
7,lucy,45,shanghai,http://example.com
8,tengyin,26,shanghai,http://example.com
9,wangli,27,shenzhen,http://example.com
10,xiaohua,37,shanghai,http://example.com
Submit a Stream Load job to import data. You can modify the parameters of the job based on your business requirements. Examples:
Import data from the test.data local file into the test_table table in the test_db database. Use a label to deduplicate data records and set the timeout period to 100 seconds.
curl --location-trusted -u root -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
Import data from the test.data local file into the test_table table in the test_db database. Use a label to deduplicate data records, specify the column names in the file, and import only the data whose value of the address column is hangzhou.
curl --location-trusted -u root -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
Import data from the test.data local file into the test_table table in the test_db database. Set the maximum tolerance rate to 20%.
curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
Filter the imported data in strict mode and set the time zone to Africa/Abidjan.
curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
In the test_table table in the test_db database, delete the data that has the same keys as the data to be imported from the test.data file.
curl --location-trusted -u root -H "merge_type: DELETE" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
Delete the rows whose value of the address column is hangzhou from the data to be imported, and append the remaining rows to the test_table table in the test_db database.
curl --location-trusted -u root -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
Sample code in Java
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. Configure the parameters.
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. Construct an HTTP client. Take note that the redirection feature must be enabled for the client.
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // Enable the redirection feature.
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. Construct an HTTP PUT request object.
        HttpPut httpPut = new HttpPut(loadUrl);

        // Configure request headers.
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. Specify the data to be imported. In this example, a CSV file is specified.
        // For example, you want to import data from a table that contains the following fields.
        // field1,field2,field3,field4
        // The CSV file contains the following three records. By default, the row delimiter is \n and the column delimiter is \t for a CSV file.
        String data =
                "1\t2\t3\t4\n" +
                "11\t22\t33\t44\n" +
                "111\t222\t333\t444";
        httpPut.setEntity(new StringEntity(data));

        // 5. Send the request and process the returned result.
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // Deserialize the returned result by using an appropriate JSON serialization component.
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // Obtain the status code returned by ApsaraDB for SelectDB.
            String dorisStatus = respAsMap.get("Status");
            // If ApsaraDB for SelectDB returns one of the following states, the data is imported.
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: " + httpStatus + ", url: " + loadUrl + ", error: " + respMsg);
        }
    }
}
Related system configurations
FE configurations
stream_load_default_timeout_second: the timeout period of a Stream Load job. Unit: seconds. Default value: 600. If a Stream Load job is not complete within the specified timeout period, the system cancels the job and the state of the job changes to CANCELLED. If you estimate that the source file cannot be imported within the specified timeout period, you can set a separate timeout period in the HTTP request when you submit the Stream Load job, or modify the stream_load_default_timeout_second parameter in the FE configurations to specify the global default timeout period.
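For example, the following statement changes the global default timeout period to 1200 seconds. This is a sketch that assumes you have the permissions required to modify FE configurations on your instance; the value 1200 is only an illustration.
ADMIN SET FRONTEND CONFIG ("stream_load_default_timeout_second" = "1200");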
BE configurations
streaming_load_max_mb: the maximum amount of data that can be imported by a Stream Load job. Unit: MB. Default value: 10240. If the size of your source file exceeds this threshold, you must modify the streaming_load_max_mb parameter in the BE configurations.
HTTP Stream mode
Stream Load allows you to use a table-valued function (TVF) to specify request parameters in an SQL statement. This TVF is named http_stream. For more information about how to use TVFs, see Analyzing File.
The RESTful API URL of Stream Load in HTTP Stream mode is different from the URL of Stream Load in normal mode.
URL of Stream Load in normal mode: http://host:http_port/api/{db}/{table}/_stream_load
URL of Stream Load in HTTP Stream mode: http://host:http_port/api/_http_stream
Syntax
To submit a Stream Load job in HTTP Stream mode, run the following command:
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream
For more information about the parameters of a Stream Load job in HTTP Stream mode, see the Parameters section of this topic.
Example
You can use the load_sql parameter to replace the column_separator, line_delimiter, where, and columns parameters in the HTTP header. The following sample code shows the SQL statement that is specified by the load_sql parameter:
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");
Complete sample command:
curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" http://host:http_port/api/_http_stream