ApsaraDB for SelectDB: Import data by using Stream Load

Last Updated: Jul 26, 2024

If you want to import local files or data streams into an ApsaraDB for SelectDB instance, you can use Stream Load to import the data synchronously and determine whether the import is successful based on the response. This topic describes how to use Stream Load to import data into an ApsaraDB for SelectDB instance.

Background information

Stream Load is a synchronous method for importing data. This method allows you to import local files or data streams into ApsaraDB for SelectDB instances by sending HTTP requests. Stream Load synchronously imports data and returns the import results. You can determine whether a Stream Load job is successful based on the response.

Stream Load is suitable for importing local files or importing data streams by using applications. You can use Stream Load to import data in the CSV, JSON, Parquet, or Optimized Row Columnar (ORC) format.

Create a Stream Load job

Stream Load submits import requests and transfers data over HTTP. In this example, the curl command is used to submit a Stream Load job. You can also use other HTTP clients to submit Stream Load jobs.

Syntax

# Specify the header parameters in the -H "key1:value1" format. The supported parameters are described in this topic.
curl --location-trusted -u <username>:<password> [-H "key1:value1"] -H "expect:100-continue" -T <file_name> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load

Parameters

The following list describes the parameters in the command.

--location-trusted: Specifies that the username and password are passed to the server to which the request is redirected if authentication is required.

-u: The username and password that are used to connect to the ApsaraDB for SelectDB instance.

-H: The request header.

-T: The data file to be imported.

-XPUT: The method of the HTTP request. In this case, the PUT method is used. You need to specify the URL of the ApsaraDB for SelectDB instance into which data is imported. The following parameters are included:

  • host: the virtual private cloud (VPC) endpoint or public endpoint of the ApsaraDB for SelectDB instance.

    Note

    For more information about how to apply for a public endpoint, see Apply for or release a public endpoint.

  • port: the port number of the ApsaraDB for SelectDB instance. Default value: 8080.

    Note

    You can view the endpoint and port number of an ApsaraDB for SelectDB instance on the instance details page of the instance.

  • db_name: the name of the database.

  • table_name: the name of the table.

Stream Load uses the HTTP protocol. Therefore, the parameters related to the Stream Load job are mainly specified in the header. The following list describes the common parameters that are used to import data.

label: The unique identifier of the Stream Load job. You can customize a label for your Stream Load job in the import statement. After the Stream Load job is submitted, you can query the status of the job based on the label. A unique label can also prevent repeated import of the same data. If the Stream Load job that is associated with a label is in the CANCELLED state, the label can be reused.

  Note

  We recommend that you use the same label for the same batch of data. This way, repeated requests for importing the same batch of data are accepted only once. This implements the at-most-once semantics.

format: The format of the data to be imported. Default value: CSV. Valid values: CSV, JSON, PARQUET, ORC, csv_with_names (skips the first row of a CSV file), and csv_with_names_and_types (skips the first two rows of a CSV file).

line_delimiter: The row delimiter of the file to be imported. Default value: \n. You can use a combination of multiple characters as a row delimiter.

column_separator: The column delimiter of the file to be imported. Default value: \t. You can use a combination of multiple characters as a column delimiter. If you want to specify an invisible character as the column delimiter, add \x as the prefix and set the delimiter in hexadecimal. For example, if the column delimiter of a Hive file is \x01, specify this parameter in the following format: -H "column_separator:\x01".

compress_type: The compression format of the file. Only CSV files can be compressed. Supported compression formats include .gz, .lzo, .bz2, .lz4, .lzop, and .deflate.

max_filter_ratio: The maximum tolerance rate of the Stream Load job. Valid values: 0 to 1. Default value: 0. If the error rate of the Stream Load job exceeds the maximum tolerance rate, the job fails. If you want to ignore error data rows, set this parameter to a value greater than 0 to ensure that the Stream Load job can succeed.

strict_mode: Specifies whether to enable the strict mode. Default value: false. In strict mode, data rows that fail column type conversion are filtered out during the import process.

cloud_cluster: The cluster that is used to import data. By default, the default cluster of the instance is used. If the instance does not have a default cluster, the system automatically selects a cluster on which you have access permissions.

load_to_single_tablet: Specifies whether to import data into only one tablet in the corresponding partition. Default value: false. This parameter is available only if you import data into a table that uses the DUPLICATE KEY model and contains random partitions.

where: The filter conditions that are specified for the Stream Load job. Stream Load allows you to specify a WHERE clause to filter the source data. The filtered data is not imported or used to calculate the error rate of the job, but is counted in the value of the num_rows_unselected parameter.

partitions: The partitions into which data is imported. If the data to be imported does not belong to the specified partitions, the data is not imported. The number of such data rows is counted in dpp.abnorm.ALL.

columns: The function transformation configuration of the data to be imported. Stream Load supports column order changes and expression transformation. The expression transformation syntax is the same as that of a query statement.

merge_type: The type of data merging. Default value: APPEND, which appends new data to the existing data. You can set this parameter to MERGE or DELETE only for tables that use the UNIQUE KEY model. If this parameter is set to MERGE, the delete parameter is required to mark the Delete Flag column. If this parameter is set to DELETE, all rows that are imported in the current Stream Load job indicate delete operations.

delete: The conditions for deleting data. This parameter is valid only if the merge_type parameter is set to MERGE.

function_column.sequence_col: This parameter is available only for tables that use the UNIQUE KEY model. It ensures that the REPLACE operation is performed on the value columns based on the specified source_sequence column for records that have the same key columns. The source_sequence column can be a column from the data source or an existing column in the table schema.

exec_mem_limit: The maximum size of memory that can be allocated to the Stream Load job. Unit: bytes. Default value: 2147483648, which specifies that up to 2 GB of memory can be allocated to a Stream Load job.

timeout: The timeout period of the Stream Load job. Unit: seconds. Default value: 600. Valid values: 1 to 259200.

timezone: The time zone that is used for the Stream Load job. Default value: Asia/Shanghai, which specifies UTC+8. This parameter affects the results of all time zone-related functions that are involved in the Stream Load job.

two_phase_commit: Specifies whether to enable the two-phase commit (2PC) mode for the Stream Load job. Default value: false. If the 2PC mode is enabled, the import result is returned immediately after the data is written. In this phase, the data is invisible and the transaction is in the PRECOMMITTED state. The data becomes visible only after you manually trigger a commit operation, as shown in the example that follows these parameter descriptions.
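
The following commands provide a minimal sketch of the 2PC flow. They assume that the instance exposes the Apache Doris-compatible _stream_load_2pc endpoint and that the transaction ID is taken from the TxnId field of the first response; verify both against your instance before you rely on them.

# Step 1: Write the data in 2PC mode. The response contains the transaction ID (TxnId), and the data is not yet visible.
curl --location-trusted -u admin:**** -H "two_phase_commit:true" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load

# Step 2: Commit the transaction to make the data visible. Replace 18036 with the TxnId value from the first response.
curl -X PUT --location-trusted -u admin:**** -H "txn_id:18036" -H "txn_operation:commit" http://host:port/api/test_db/_stream_load_2pc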

Stream Load is a synchronous method for importing data. Therefore, the results of the Stream Load job are directly returned as a response to your HTTP request. The following code shows a sample response:

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

The following list describes the parameters in the response.

TxnId: The transaction ID of the Stream Load job.

Label: The label of the Stream Load job. If you have specified a label, the label is returned. If you have not specified a label, the system automatically generates a label.

Status: The state of the Stream Load job. Valid values:

  • Success: The job is successful.

  • Publish Timeout: The job is complete, but the data may become visible after a delay. You do not need to retry the job.

  • Label Already Exists: The label already exists. You must change the label.

  • Fail: The job fails.

ExistingJobStatus: The state of the Stream Load job that is associated with the existing label. This parameter is displayed only if the value of the Status parameter is Label Already Exists. You can obtain the status of the Stream Load job that is associated with the existing label based on the value of this parameter. A value of RUNNING indicates that the job is in progress. A value of FINISHED indicates that the job is successful.

Message: The error message that is returned for the Stream Load job.

NumberTotalRows: The total number of data rows that are processed by the Stream Load job.

NumberLoadedRows: The number of data rows that are imported.

NumberFilteredRows: The number of data rows that cannot be imported.

NumberUnselectedRows: The number of data rows that are filtered out by the WHERE clause.

LoadBytes: The number of bytes that are imported by the Stream Load job.

LoadTimeMs: The duration of the Stream Load job. Unit: milliseconds.

BeginTxnTimeMs: The amount of time that is consumed to request the frontend (FE) to start a transaction. Unit: milliseconds.

StreamLoadPutTimeMs: The amount of time that is consumed to request the FE to return an execution plan for the Stream Load job. Unit: milliseconds.

ReadDataTimeMs: The amount of time that is consumed to read data. Unit: milliseconds.

WriteDataTimeMs: The amount of time that is consumed to write data. Unit: milliseconds.

CommitAndPublishTimeMs: The amount of time that is consumed to request the FE to commit and publish the transaction. Unit: milliseconds.

ErrorURL: The URL from which you can view the error data rows. For an example of how a client script checks the response, see the sketch that follows these parameter descriptions.
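
Because the results are returned synchronously, a client script can branch on the Status field of the response. The following sketch assumes that the jq command-line JSON processor is installed and uses the same placeholder connection information as the other examples in this topic:

# Submit the Stream Load job and capture the JSON response.
resp=$(curl -s --location-trusted -u admin:**** -H "label:load_demo_001" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load)

# Treat Success and Publish Timeout as successful imports.
status=$(echo "$resp" | jq -r '.Status')
if [ "$status" = "Success" ] || [ "$status" = "Publish Timeout" ]; then
    echo "Import succeeded"
else
    # If NumberFilteredRows is greater than 0, the ErrorURL field points to the error data rows.
    echo "Import failed: $resp"
fi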

The following sample code shows how to use Stream Load to import data:

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
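
The preceding command imports a CSV file. The following command is a minimal sketch of a JSON import; it assumes a file named data.json that contains a single JSON object whose keys match the column names of the destination table:

# Example content of data.json: {"id": 11, "name": "mary", "age": 30, "address": "beijing", "url": "http://example.com"}
curl --location-trusted -u admin:admin_123 -H "label:json_demo_001" -H "format:json" -H "expect:100-continue" -T data.json http://host:port/api/test_db/test_table/_stream_load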

Cancel a Stream Load job

You cannot manually cancel a Stream Load job. Stream Load jobs are automatically canceled by the system if a timeout error or an import error occurs.

Query a Stream Load job

You can execute the SHOW STREAM LOAD statement to query the information about a Stream Load job that is complete. By default, backends (BEs) do not record information about a Stream Load job. If you want to enable BEs to record such information, set the enable_stream_load_record parameter in BE configurations to true. For more information, see BE Configuration.
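
For example, the following sketch queries a completed job by its label. It assumes that you connect to the instance over the MySQL protocol (port 9030 by default) and that the job was submitted with the label 123:

# Query finished Stream Load jobs in the test_db database that use the label 123.
mysql -h <host> -P 9030 -u admin -p -e 'SHOW STREAM LOAD FROM test_db WHERE LABEL = "123";'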

Examples

Sample script

  1. In an ApsaraDB for SelectDB instance, create the table into which you want to import data.

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 16
    PROPERTIES("replication_num" = "1");
  2. Create a file to be imported named test.data. Add the following data to the test.data file:

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com
  3. Submit a Stream Load job to import data. You can modify the parameters of the job based on your business requirements. Examples:

    • Import data from the test.data local file into the test_table table in the test_db database. Use a label to deduplicate data records and set the timeout period to 100 seconds.

       curl --location-trusted -u root -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
    • Import data from the test.data local file into the test_table table in the test_db database. Use a label to deduplicate data records, specify the column names in the file, and import only data whose value of the address column is hangzhou.

      curl --location-trusted -u root -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
    • Import data from the test.data local file into the test_table table in the test_db database. Set the maximum tolerance rate to 20%.

      curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
    • Filter the imported data in strict mode and set the time zone to Africa/Abidjan.

      curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
    • From the test_table table in the test_db database, delete the data whose keys match the data to be imported from the test.data file.

      curl --location-trusted -u root -H "merge_type: DELETE" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
    • Use the rows whose value of the address column is hangzhou in the test.data file to delete the matching data from the test_table table in the test_db database, and append the remaining rows to the table.

      curl --location-trusted -u root: -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load

Sample code in Java

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. Configure the parameters.
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. Construct an HTTP client. Take note that the redirection feature must be enabled for the client.
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // Enable the redirection feature.
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. Construct an HTTP PUT request object.
        HttpPut httpPut = new HttpPut(loadUrl);

        // Configure request headers.
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. Specify the data to be imported. In this example, a CSV file is specified.
        // For example, you want to import data from a table that contains the following fields.
        // field1,field2,field3,field4
        // The CSV file contains the following three records. By default, the row delimiter is \n and the column delimiter is \t for a CSV file.
        String data =
                "1\t2\t3\t4\n" +
                "11\t22\t33\t44\n" +
                "111\t222\t333\t444";

        httpPut.setEntity(new StringEntity(data));

        // 5. Send the request and process the returned result.
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // Parse the returned result by using an appropriate JSON deserialization component.
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // Obtain the status code returned by ApsaraDB for SelectDB.
            String dorisStatus = respAsMap.get("Status");
            // If ApsaraDB for SelectDB returns the following status data, the data is imported.
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

Related system configurations

FE configurations

stream_load_default_timeout_second: the timeout period of a Stream Load job. Unit: seconds. Default value: 600. If a Stream Load job is not complete within the specified timeout period, the system cancels the job and the state of the job changes to CANCELLED. If you estimate that the source file cannot be imported within the specified timeout period, you can set a separate timeout period in the HTTP request when you submit a Stream Load job, or modify the stream_load_default_timeout_second parameter in FE configurations to specify the global default timeout period.
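
For example, the following sketch sets a four-hour timeout for a single job in the request header instead of changing the global default. The file name and connection information are placeholders:

curl --location-trusted -u admin:**** -H "label:big_file_001" -H "timeout:14400" -H "expect:100-continue" -T big_file.csv http://host:port/api/test_db/test_table/_stream_load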

BE configurations

streaming_load_max_mb: the maximum amount of data that can be imported by a Stream Load job. Default value: 10240. Unit: MB. If the size of your source file exceeds this threshold, you must modify the streaming_load_max_mb parameter in BE configurations.

HTTP Stream mode

Stream Load allows you to use a table-valued function (TVF) to specify request parameters in an SQL statement. This TVF is named http_stream. For more information about how to use TVFs, see Analyzing File.

The RESTful API URL of Stream Load in HTTP Stream mode is different from the URL of Stream Load in normal mode.

  • URL of Stream Load in normal mode: http://host:http_port/api/{db}/{table}/_stream_load.

  • URL of Stream Load in HTTP Stream mode: http://host:http_port/api/_http_stream.

Syntax

To submit a Stream Load job in HTTP Stream mode, run the following command:

curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream

For more information about the parameters of a Stream Load job in HTTP Stream mode, see the Parameters section of this topic.

Example

You can use the load_sql parameter to replace the column_separator, line_delimiter, where, and columns parameters in the HTTP header. The following sample code shows the SQL statement specified by the load_sql parameter:

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

Complete sample command:

curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\")  where age >= 30" http://host:http_port/api/_http_stream