StarRocks allows you to import less than 10 GB of data from local CSV files by using Stream Load. This topic describes the basic principles and best practices of Stream Load. This topic also provides examples on how to import data in Stream Load mode.
Background information
Stream Load is a synchronous import method used to import local files or data streams to StarRocks by using HTTP requests. Stream Load synchronously imports data and returns the import result. You can determine whether the import is successful based on the return value of the request.
Term
coordinator: the node that receives data, distributes data to other data nodes, and returns a result after the data is imported.
How it works
Stream Load submits an import command to StarRocks by using the HTTP protocol. If the command is submitted to the frontend (FE) node, the FE node forwards the request to a backend (BE) node by using an HTTP redirect. You can also submit the command directly to a BE node. The BE node functions as a coordinator node to divide data by table schemas and distribute the data to related BE nodes. The data import result is returned to you by the coordinator node.
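The following sketch illustrates the two submission paths. The host names, database name, table name, and file name are placeholders, and 8030 and 8040 are only the common default HTTP ports of FE and BE nodes; replace them with the values of your cluster.

# Submit the import command to an FE node. The FE node redirects the request
# to a BE node, which acts as the coordinator.
curl --location-trusted -u user:passwd -T data.file -XPUT \
    http://fe_host:8030/api/example_db/example_tbl/_stream_load

# Alternatively, submit the command directly to a BE node. In this case,
# the BE node that receives the request acts as the coordinator.
curl --location-trusted -u user:passwd -T data.file -XPUT \
    http://be_host:8040/api/example_db/example_tbl/_stream_load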
Examples
Create an import job
Stream Load submits and transfers data by using the HTTP protocol. In this example, the curl command is used to submit an import job. You can also use other HTTP clients to perform operations.
- Syntax:
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT \
    http://fe_host:http_port/api/{db}/{table}/_stream_load
Note:
- Both HTTP chunked transfer encoding and non-chunked uploads are supported. For non-chunked uploads, you must use the Content-Length header field to specify the length of the content to be uploaded. This ensures data integrity.
- We recommend that you set the Expect header field to 100-continue to prevent unnecessary data transmission when errors occur. See the sketch that follows this note.
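The following command is a sketch of a non-chunked upload that sets the Expect header explicitly. The host, port, credentials, label, database, table, and file name are placeholders; note that curl typically calculates the Content-Length header on its own for a file passed with -T.

# Setting Expect: 100-continue lets the server reject the request (for example,
# because of an authentication error) before the file body is transmitted.
curl --location-trusted -u user:passwd \
    -H "Expect:100-continue" \
    -H "label:example_label" \
    -T data.file -XPUT \
    http://fe_host:http_port/api/{db}/{table}/_stream_load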
You can view the supported header properties in the description of the import job parameters in the following list. The parameters are in the -H "key1:value1" format. If multiple job parameters are involved, you must use multiple -H flags to specify the parameters. Example: -H "key1:value1" -H "key2:value2". You can run the HELP STREAM LOAD command to view the detailed syntax for creating an import job. In Stream Load mode, all parameters related to the import job are set in the header. The following list describes the parameters.

Signature parameter:
- user:passwd: Stream Load uses the HTTP protocol to create an import job. A signature is generated for the import job by using basic access authentication. StarRocks authenticates user identity and import permissions based on the signature.

Import job parameters:
- label: The label of the import job. Data with the same label cannot be repeatedly imported. You can specify a label for data to prevent the data from being repeatedly imported. StarRocks retains labels for jobs that have been completed in the last 30 minutes.
- column_separator: The column delimiter of the imported file. Default value: \t. If a non-printable character is used as a column delimiter, the delimiter must be in hexadecimal format and start with the \x prefix. For example, if the column delimiter of a Hive file is \x01, set the parameter to -H "column_separator:\x01".
- row_delimiter: The row delimiter of the imported file. Default value: \n. Important: \n cannot be passed directly by using the curl command. If you type \n as the row delimiter, the shell passes a backslash (\) followed by the character n instead of the \n character. You can escape the string in Bash if you want to pass \n or \t: start the string with a dollar sign ($) followed by a single quotation mark (') and end it with a single quotation mark ('). Example: -H $'row_delimiter:\n'.
- columns: Specifies the mapping between the columns in the imported file and the columns in the StarRocks table. If the columns in the source file are the same as those in the StarRocks table, you do not need to set this parameter. Otherwise, you must set this parameter to configure a data conversion rule. You can set this parameter in two ways: set it to the column names in the imported file if those columns correspond directly to columns in the StarRocks table, or set it based on calculations.
  - Example 1: The StarRocks table contains the following three columns: c1, c2, and c3. The source file contains the same three columns in the sequence c3, c2, c1. In this case, set this parameter to -H "columns: c3, c2, c1".
  - Example 2: The StarRocks table contains the following three columns: c1, c2, and c3. The source file contains four columns, and the first three columns are the same as those in the StarRocks table. In this case, set this parameter to -H "columns: c1, c2, c3, temp". You can specify a custom name for the fourth column as a placeholder.
  - Example 3: The StarRocks table contains the following three columns: year, month, and day. The source file contains only one time column in the 2018-06-01 01:02:03 format. In this case, set this parameter to -H "columns: col, year=year(col), month=month(col), day=day(col)".
- where: Specifies the data filter condition. You can set this parameter to filter out unnecessary data. For example, if you want to import only the rows in which the value of the k1 column is 20180601, set this parameter to -H "where: k1 = 20180601" during data import.
- max_filter_ratio: Specifies the maximum ratio of data that can be filtered out, for example, because it does not conform to standards. Default value: 0. Note: data that does not conform to standards does not include the rows that are filtered out by the WHERE condition.
- partitions: Specifies the partitions to which data is imported. We recommend that you set this parameter if you know the partitions to which data is imported. Data that does not belong to the specified partitions is filtered out. For example, to import data into the p1 and p2 partitions, set this parameter to -H "partitions: p1, p2".
- timeout: Specifies the timeout period of the import job. The default timeout period is 600 seconds. Valid values: 1 to 259200. Unit: seconds.
- strict_mode: Specifies whether the strict mode is enabled for this import job. By default, the strict mode is enabled. You can disable the strict mode by setting the parameter to -H "strict_mode: false".
- timezone: Specifies the time zone of the import job. The default time zone is UTC+8. This parameter affects the results of all time zone-related functions involved in the import job.
- exec_mem_limit: Specifies the maximum amount of memory that is available for the import job. Default value: 2. Unit: GB.

- Example
curl --location-trusted -u root -T date -H "label:123" \
    http://abc.com:8030/api/test/date/_stream_load
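The following sketch combines several of the header parameters described above in one request. The file, column names, filter values, and limits are hypothetical and only illustrate the -H syntax.

# Hypothetical example: load a comma-separated file whose columns are ordered
# c3, c2, c1, keep only rows where c1 = 20180601, tolerate up to 10% filtered
# rows, and allow the job to run for up to 1,200 seconds.
curl --location-trusted -u root -T data.csv \
    -H "label:example_20180601" \
    -H "column_separator:," \
    -H "columns: c3, c2, c1" \
    -H "where: c1 = 20180601" \
    -H "max_filter_ratio:0.1" \
    -H "timeout:1200" \
    http://abc.com:8030/api/test/date/_stream_load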
- Return results
After the import job is complete, the information about the import job is returned in JSON format. Sample return results:
{ "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee", "Status": "Success", "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "ErrorURL": "[http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005](http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005)" }
The following list describes the parameters in the return results.
- TxnId: The transaction ID of the import job. The transaction ID is fully managed by Alibaba Cloud; you do not need to manage it.
- Label: The label of the import job. You can specify the label, or it can be automatically generated by the system.
- Status: The state of the import job. Valid values:
  - Success: The import job is successful.
  - Publish Timeout: The import job is complete, but the data may become visible after a delay. You do not need to retry the job.
  - Label Already Exists: The label already exists. You must change the label.
  - Fail: The import job fails.
- ExistingJobStatus: The state of the import job that corresponds to the existing label. This parameter is returned only if the value of the Status parameter is Label Already Exists. You can determine the state of the import job that corresponds to the existing label based on the returned value. Valid values:
  - RUNNING: The import job is in progress.
  - FINISHED: The import job is successful.
- Message: The detailed description of the state of the import job. If the import job fails, the detailed failure cause is returned.
- NumberTotalRows: The total number of data rows read from the data stream.
- NumberLoadedRows: The number of imported data rows. This parameter is valid only if the import job is in the Success state.
- NumberFilteredRows: The number of data rows that are filtered out because their quality does not conform to standards.
- NumberUnselectedRows: The number of data rows that are filtered out by the WHERE condition.
- LoadBytes: The size of the imported source data.
- LoadTimeMs: The duration of the import job. Unit: milliseconds.
- ErrorURL: The URL of the data entries that are filtered out. Only the first 1,000 filtered data entries are retained. If the import job fails, you can run the following command to obtain the filtered data, analyze it, and make adjustments.
  wget http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005
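Because Stream Load is synchronous, a client can inspect the returned JSON immediately after the request completes. The following shell sketch is only an illustration: the host, port, credentials, label, database, table, and file name are placeholders, and it uses a simple grep check on the Status field.

# Capture the JSON response of the import job and check the Status field.
response=$(curl --location-trusted -u user:passwd -T data.file \
    -H "label:example_label" \
    http://fe_host:http_port/api/example_db/example_tbl/_stream_load)
echo "$response"
if echo "$response" | grep -q '"Status": "Success"'; then
    echo "Import job succeeded."
else
    echo "Import job did not succeed. Check the Message and ErrorURL fields."
fi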
Cancel an import job
In Stream Load mode, an import job cannot be manually canceled. If an import job times out or an error occurs, StarRocks automatically cancels the job.
Best practices
Scenarios
Stream Load is optimal for scenarios where the source files are stored in memory or local disks. Stream Load is a synchronous import method. You can synchronously obtain the result of an import job by using Stream Load.
Data size
In Stream Load mode, a single BE node imports and distributes data. We recommend that you use Stream Load to import data of a size from 1 GB to 10 GB. By default, the maximum size of data that can be imported in Stream Load mode is 10 GB. Therefore, you must modify the streaming_load_max_mb parameter of the BE nodes to import a file whose size exceeds 10 GB. For example, if the size of the file to be imported is 15 GB, set the streaming_load_max_mb parameter to a value greater than 15 GB, such as 16000 (the unit of this parameter is MB).
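For example, to allow a 15-GB file, you could add a line similar to the following to the be.conf file of each BE node; the value 16000 is only an illustrative choice, and a restart of the BE nodes is typically required for be.conf changes to take effect.

# be.conf on each BE node: raise the Stream Load size limit to 16000 MB
# (about 15.6 GB) so that a 15-GB file can be imported.
streaming_load_max_mb = 16000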
The default timeout period of an import job in Stream Load mode is 300 seconds. If the size of the file to be imported exceeds 3 GB, you need to increase the timeout period of the import job because the import speed of StarRocks is limited. For example, to import a 10-GB file, you need to set the timeout period to 1,000 seconds. You can estimate the timeout period by using the following formula: Timeout period of an import job = Size of data to be imported / 10 MB/s. The 10 MB/s rate is only an approximation; calculate the actual average import speed based on your cluster.
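As a rough sketch, you can script this calculation. The 15,000 MB data size and the 10 MB/s rate below are only example values; replace the rate with the average import speed measured in your cluster.

# Estimate a Stream Load timeout in seconds from the data size (MB) and an
# assumed average import speed (MB/s).
data_size_mb=15000
import_speed_mb_per_s=10
echo $(( data_size_mb / import_speed_mb_per_s ))  # prints 1500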
Complete example
Data information: You want to import about 15 GB of data from the /home/store_sales file on your local disk to the store_sales table in the bj_sales database.
Cluster information: The number of import jobs that can be concurrently processed is not affected by the cluster size.
- Modify the be.conf configuration file of each BE node because the size of the file to be imported exceeds 10 GB. For example, set the streaming_load_max_mb parameter to 16000.
- Determine whether the import duration exceeds the default timeout period. In this example, the estimated import duration is 1,500 seconds, which is calculated based on the following formula: 15,000 MB / 10 MB/s = 1,500 s. Because this exceeds the default timeout period, you must modify the fe.conf configuration file of the FE node and set the stream_load_default_timeout_second parameter to 1500.
- Run the following command to create an import job:
curl --location-trusted -u user:password -T /home/store_sales \
    -H "label:abc" http://abc.com:8000/api/bj_sales/store_sales/_stream_load
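As an alternative to modifying the FE configuration, you can extend the timeout for only this job through the timeout header described earlier. The following command is a sketch that reuses the placeholders above and assumes the per-job timeout header is allowed to exceed the default timeout period.

curl --location-trusted -u user:password -T /home/store_sales \
    -H "label:abc" -H "timeout:1500" \
    http://abc.com:8000/api/bj_sales/store_sales/_stream_load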
Sample code for integration
- Develop Stream Load by using Java. For more information, see stream_load.
- Integrate Spark with Stream Load. For more information, see 01_sparkStreaming2StarRocks.