In Broker Load mode, StarRocks uses broker processes to read data from data sources, such as Apache Hadoop Distributed File System (HDFS) and Alibaba Cloud Object Storage Service (OSS), and uses its computing resources to preprocess and import the data. This topic describes how to import data in Broker Load mode.
Background information
Broker Load is an asynchronous import method. You can create an import job by using the MySQL protocol and execute the SHOW LOAD statement to query the import results. StarRocks supports data import from external storage systems in various file formats, including CSV, ORC, and Parquet. We recommend that you import tens to hundreds of GB of data in each import job.
Import data in Broker Load mode
Query brokers
When you create a StarRocks cluster in E-MapReduce (EMR), brokers are automatically deployed and started on every core node. You can execute the SHOW PROC statement to query the brokers of a cluster. The following code shows the syntax:
SHOW PROC "/brokers"\G
The following output is returned:
*************************** 1. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 2. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 3. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 4. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
4 rows in set (0.00 sec)
Create an import job
Syntax
StarRocks version earlier than 2.5.8
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_name broker_properties [PROPERTIES (key1=value1, ... )]
StarRocks version 2.5.8 or later
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_properties [PROPERTIES (key1=value1, ... )]
Parameters
You can execute the HELP BROKER LOAD statement to view the syntax for creating an import job.
Label
The identifier of the import job. Each import job has a unique label. You can define a custom label. Otherwise, the system generates a label. The label can be used to query the status of the import job and to prevent the same data from being imported twice. After the state of the import job changes to FINISHED, the label cannot be reused. If the state of the import job changes to CANCELLED, you can reuse the label to submit the import job again.
data_desc
A string that describes the data to be imported. You can specify multiple data_desc strings, each with information such as the data source address, extract, transform, and load (ETL) functions, destination table, and partitions.
An import job in Broker Load mode allows you to import data to multiple tables at a time. You can specify a data_desc string for each destination table. In each data_desc string, you can specify the data source address, which can contain multiple file_path strings. The file_path strings are used to specify multiple source files. Broker Load ensures the atomicity of a job that imports data to multiple tables: the data is either all imported or all rolled back. The following code shows the parameters that are usually configured in a data_desc string (a combined example follows the parameter descriptions below):
data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY column_separator]
    [FORMAT AS file_type]
    [(col1, ...)]
    [COLUMNS FROM PATH AS (colx, ...)]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]
The following table describes the parameters.
Parameter
Description
file_path
Either a specific file path that points to a file, or a file path that contains asterisks (*) as wildcards and points to all files in a directory. The parent directories of the destination directory can also contain wildcards.
The following special characters are supported as wildcards: ? * [] {} ^. For more information about how to use wildcards, see FileSystem.
For example, if you set this parameter to hdfs://hdfs_host:hdfs_port/user/data/tablename/*/*, all files in the partitions of the tablename directory are imported. If you set this parameter to hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/*, only files in the 202104 partition of the tablename directory are imported.
negative
A flag used to indicate that the specified source data has been imported and will be deleted by this import job.
This parameter is applicable if you want to cancel a batch of data that has been imported to aggregate columns of the SUM type in the destination table. After the import job is complete, the batch of data will be deleted from the aggregate columns.
partition
The partitions in the source files to be imported to the destination table.
Only source data that belongs to the specified partitions is imported. Source data that does not belong to the specified partitions is determined as erroneous data. If you do not want such data to be determined as erroneous data, use a WHERE predicate to filter it out.
column_separator
The column delimiter that you want to use to separate data in the source files into columns. Default value: \t.
If you want to specify an invisible character as the column delimiter, add \x as the prefix and set the delimiter in hexadecimal. For example, if the delimiter used in a source Hive file is \x01, set the column delimiter to \\x01.
file_type
The format of the source files. Valid values: parquet, orc, and csv. Default value: csv.
Parquet files have the file name extension .parquet or .parq.
COLUMNS FROM PATH AS
The partition fields in the path of the source files.
For example, the path of a source file is /path/col_name=col_value/dt=20210101/file1, and col_name and dt are table columns. The values col_value and 20210101 are imported to the destination columns that correspond to col_name and dt, as shown in the following code:
(col1, col2) COLUMNS FROM PATH AS (col_name, dt)
set column mapping
The SET statement that contains functions for column type conversion.
If source columns and destination columns are of different types, you must specify a SET statement for column type conversion.
where predicate
The WHERE predicate that you want to use for data filtering after column type conversion.
Data that is filtered out is not counted for the calculation of the maximum filter ratio. If data_desc strings altogether contain multiple WHERE predicates for the same table, the predicates are combined by the AND operator.
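The following sketch shows how several of these data_desc options can be combined in one LOAD statement. The database, table, columns, and HDFS path are hypothetical, and the WITH BROKER broker (...) clause uses the pre-2.5.8 form described later in this topic:
-- Hypothetical example: load one day of CSV files, take the dt value from the file path,
-- convert columns with SET, and filter out non-positive amounts with WHERE.
LOAD LABEL example_db.label_events_20210101
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/data/events/dt=20210101/*")
    INTO TABLE events
    COLUMNS TERMINATED BY ","
    FORMAT AS "csv"
    (tmp_event_time, tmp_user_id, tmp_amount)
    COLUMNS FROM PATH AS (dt)
    SET (event_time = tmp_event_time, user_id = tmp_user_id, amount = tmp_amount * 100)
    WHERE amount > 0
)
WITH BROKER broker
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)
PROPERTIES
(
    "timeout" = "3600"
);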
broker_properties
The parameters that the broker uses to access the data source, such as the credentials for HDFS or Alibaba Cloud OSS. For more information, see the authentication parameters and samples later in this topic.
opt_properties
The optional parameters that apply to the entire import job. You specify these parameters in the PROPERTIES clause of the LOAD statement:
PROPERTIES (key1=value1, ...)
The following table describes some of the parameters that you can specify in the PROPERTIES clause.
Parameter
Description
timeout
The timeout period for the import job. Unit: seconds.
You can specify a timeout period for each import job in the opt_properties string. If the import job is not complete within the specified timeout period, the state of the import job changes to CANCELLED. The default timeout period for import jobs in Broker Load mode is 4 hours.
Important: You do not need to specify a timeout period for an import job unless you estimate that the import job will take longer than the default timeout period to complete.
We recommend that you calculate the minimum timeout period in seconds by using the following formula:
(Total file size in MB × Number of source tables and related rollup tables)/(30 × Concurrency of the import job)
The number 30 in the formula indicates 30 MB/s, which is the average import speed on backends. For example, if the total file size of the source data is 1 GB (1,024 MB), the source table has two rollup tables (so the number of source tables and related rollup tables is 3), and the concurrency of the import job is 3, the minimum timeout period is (1 × 1,024 × 3)/(30 × 3) ≈ 34 seconds.
StarRocks clusters have varied machine environments and concurrent query jobs. You must estimate the slowest import speed of your StarRocks cluster based on the speed of historical import jobs.
max_filter_ratio
The maximum filter ratio of the import job. Valid values: 0 to 1. Default value: 0. If the error rate of the import job exceeds the maximum filter ratio, the import job fails. If you want erroneous data rows to be ignored, set this parameter to a value greater than 0 to ensure that the import job can succeed.
Calculate an appropriate maximum filter ratio by using the following formula:
max_filter_ratio = dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL)
dpp.abnorm.ALL indicates the number of data rows that cannot be imported due to reasons such as type mismatch, column quantity mismatch, and length mismatch. dpp.norm.ALL indicates the number of data rows that can be imported. You can execute the SHOW LOAD statement to query the amount of data imported by the import job. For a worked example, see the calculation after this table.
Number of data rows in the source files = dpp.abnorm.ALL + dpp.norm.ALL
load_mem_limit
The limit on the memory allocated to the import job. Default value: 0. The value 0 indicates that the memory allocated to the import job is not limited.
strict_mode
Specifies whether to enable the strict mode for the import job. To enable the strict mode, set this parameter to true:
properties ("strict_mode" = "true")
By default, the strict mode is disabled.
If the strict mode is enabled, erroneous data is filtered out after column type conversion. Erroneous data refers to the values that are not null in the source files but are converted to null values after column type conversion. Take note of the following items:
The strict mode does not apply to source columns whose values are generated based on functions.
If a destination column restricts values to a range and a value in the source column can be converted but the result value is out of the range, the strict mode does not apply to the source column. For example, a value in the source column is 10 and the destination column is of the DECIMAL(1,0) type. The value 10 can be converted but the result value is out of the range. The strict mode does not apply to the source column.
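As a worked example of the max_filter_ratio formula, the sample SHOW LOAD output later in this topic reports dpp.abnorm.ALL=15 and dpp.norm.ALL=28133376. The minimum ratio that would allow that job to succeed is 15/(15 + 28133376) ≈ 0.00000053, so the configured max_filter_ratio of 5.0E-5 is more than sufficient.
The following sketch shows how several of these parameters might be combined in the PROPERTIES clause of a LOAD statement. The values are hypothetical; tune them to your data volume and quality requirements:
PROPERTIES
(
    "timeout" = "28800",          -- 8 hours, for a job expected to exceed the 4-hour default
    "max_filter_ratio" = "0.001", -- tolerate up to 0.1% erroneous rows
    "strict_mode" = "true",       -- filter out values that become NULL after column type conversion
    "load_mem_limit" = "0"        -- 0 means the memory for the job is not limited
)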
Sample code for creating an import job to import data from Alibaba Cloud OSS
Important: In StarRocks clusters, you can use broker as the broker name.
If the StarRocks version is earlier than 2.5.8, refer to the first sample below to create an import job. If the StarRocks version is 2.5.8 or later, omit the broker name broker after the WITH BROKER keyword, as shown in the second sample.
StarRocks version earlier than 2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) ) WITH BROKER broker ( "fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com" );
StarRocks version 2.5.8 or later
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) )
Query the status of an import job
Data import in Broker Load mode is asynchronous. To query the status of an import job, specify the label of the job in the SHOW LOAD statement and then execute the statement. To view the syntax of this statement, execute the HELP SHOW LOAD statement.
The SHOW LOAD statement supports only asynchronous import jobs. For synchronous import jobs, such as those in Stream Load mode, you cannot use the SHOW LOAD statement to query the status.
The following sample code provides an example on how to query the status of an import job:
show load where label = 'label1'\G
*************************** 1. row ***************************
JobId: 7****
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e272541****
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}
The following table describes the parameters.
Parameter | Description |
JobId | The unique ID of the import job. The job ID is automatically generated by the system. The ID of an import job can never be reused, whereas the label of an import job can be reused if the import job fails. |
Label | The identifier of the import job. |
State | The state of the import job. Valid values: PENDING, ETL, LOADING, FINISHED, and CANCELLED. |
Progress | The progress information of the import job. The progress information describes the two import phases: ETL and LOADING. Data import in Broker Load mode involves only the LOADING phase. The ETL progress is displayed as N/A, and the LOADING progress can be 0 to 100%. The LOADING progress is calculated by using the following formula: LOADING progress = Number of tables that have been imported/Total number of tables to be imported in the job × 100%. When all the source tables have been imported and the import job is ready to be complete, the LOADING progress reaches 99%. The LOADING progress reaches 100% only after the import job is complete. Important: The import progress is not linear. If the progress remains unchanged within a period of time, the import job may still be in progress. |
Type | The type of the import job. The type of the import job in Broker Load mode is BROKER. |
EtlInfo | The data amount metrics of the import job: unselected.rows, dpp.norm.ALL, and dpp.abnorm.ALL. unselected.rows indicates the number of data rows filtered out by the WHERE predicate. dpp.norm.ALL and dpp.abnorm.ALL help determine whether the error rate of the import job exceeds the maximum filter ratio. The sum of the values of the three metrics equals the total number of data rows in the source files. |
TaskInfo | The parameters that you configured when you created the import job, including the cluster, timeout period, and maximum filter ratio. |
ErrorMsg | The message returned by the import job. If the import job is in the CANCELLED state, the value of this parameter indicates the cause of the import failure and contains two parts: type and msg. The type part indicates the category of the failure, and the msg part provides the details. If the import job is in the FINISHED state, the value of this parameter is N/A. |
CreateTime | The time when the import job is created. |
EtlStartTime | The time when the ETL phase starts. |
EtlFinishTime | The time when the ETL phase ends. |
LoadStartTime | The time when the LOADING phase starts. |
LoadFinishTime | The time when the import job is complete. |
URL | The URL of sample erroneous data during the import job. If the import job involves no erroneous data, the value of this parameter is N/A. |
JobDetails | The details of the import job, including the number of imported files and their total size in bytes, the number of tasks, the number of processed source data rows, the ID of the backend that runs the tasks, and the ID of the backend on which the waiting tasks reside. The number of processed source data rows is updated every 5 seconds. This number indicates only the current progress and does not mean the total number of data rows that are processed after the import job is complete. The latter is shown by the EtlInfo parameter. |
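In addition to filtering by an exact label, SHOW LOAD also accepts LABEL LIKE, ORDER BY, and LIMIT clauses. The following sketch, which assumes those standard SHOW LOAD clauses, lists the ten most recent jobs in the tpch database whose labels start with lineitem:
SHOW LOAD FROM tpch
WHERE LABEL LIKE "lineitem%"
ORDER BY CreateTime DESC
LIMIT 10;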
Cancel an import job
Import jobs in Broker Load mode can be canceled when they are not in the CANCELLED or FINISHED state. To cancel an import job, specify its label in the CANCEL LOAD statement and then execute the statement. You can execute the HELP CANCEL LOAD statement to view the syntax for canceling an import job.
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];
Import data from HDFS
Syntax
LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    ),
    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    WHERE col1 > 1
)
WITH BROKER 'broker1'
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)
PROPERTIES
(
    "timeout" = "3600"
);
HDFS authentication
The community edition of HDFS supports two authentication modes: simple authentication and Kerberos authentication.
Simple authentication: User identities are determined by the operating system of the client that is connected to HDFS.
The following table describes the parameters.
Parameter
Description
hadoop.security.authentication
The authentication mode. In this example, this parameter is set to simple.
username
The username that is used to log on to HDFS.
password
The password that is used to log on to HDFS.
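A minimal sketch of the broker properties for simple authentication, with placeholder credentials, might look as follows:
(
    "hadoop.security.authentication" = "simple",
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)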
Kerberos authentication: The user identity of a client is determined by its Kerberos credentials.
The following table describes the parameters.
Parameter
Description
hadoop.security.authentication
The authentication mode. In this example, this parameter is set to kerberos.
kerberos_principal
The principal for Kerberos authentication.
kerberos_keytab
The path of the Kerberos keytab file. The file must reside on the same server as the broker processes.
kerberos_keytab_content
The Base64-encoded content of the Kerberos keytab file.
Important: You must configure either this parameter or the kerberos_keytab parameter.
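A sketch of the broker properties for Kerberos authentication follows. The principal and keytab path are placeholders for your own Kerberos setup:
(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "starrocks@EXAMPLE.COM",
    "kerberos_keytab" = "/etc/security/keytabs/starrocks.keytab"
)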
HA configurations of HDFS
After you configure high availability (HA) for the NameNodes in an HDFS cluster, the broker can automatically identify the new active NameNode when a failover occurs. To access an HDFS cluster deployed in HA mode, configure the parameters that are described in the following table.
Parameter
Description
dfs.nameservices
The name of the HDFS service. You can set a custom name.
For example, set the dfs.nameservices parameter to my_ha.
dfs.ha.namenodes.xxx
The custom names of the NameNodes. Separate multiple names with commas (,). Replace xxx in this parameter name with the custom name that you set for the dfs.nameservices parameter.
For example, set the dfs.ha.namenodes.my_ha parameter to my_nn.
dfs.namenode.rpc-address.xxx.nn
The address used by the NameNode for remote procedure calls (RPCs). Replace xxx in this parameter name with the value of the dfs.nameservices parameter, and replace nn with the name of the NameNode that you set for the dfs.ha.namenodes.xxx parameter.
For example, set the dfs.namenode.rpc-address.my_ha.my_nn parameter to a value in the Hostname:Port number format.
dfs.client.failover.proxy.provider
The provider that the client uses to connect to the NameNode. Default value: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.
The following sample code provides an example:
( "dfs.nameservices" = "my-ha", "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2", "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" )
You can use simple authentication or Kerberos authentication to access HDFS clusters deployed in HA mode. The following sample code provides an example on how to access an HA HDFS cluster by using simple authentication:
( "username"="user", "password"="passwd", "dfs.nameservices" = "my-ha", "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2", "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" )
The configurations of an HDFS cluster can be written into the hdfs-site.xml file. If you use a broker process to read information about the HDFS cluster, you only need to specify the file path and authentication information of the cluster.
Example
Create a test table. The following sample code creates a table named lineitem in the tpch database:
CREATE TABLE lineitem (
    l_orderkey bigint,
    l_partkey bigint,
    l_suppkey bigint,
    l_linenumber int,
    l_quantity double,
    l_extendedprice double,
    l_discount double,
    l_tax double,
    l_returnflag string,
    l_linestatus string,
    l_shipdate date,
    l_commitdate date,
    l_receiptdate date,
    l_shipinstruct string,
    l_shipmode string,
    l_comment string
) ENGINE=OLAP
DUPLICATE KEY(l_orderkey)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES(
    "replication_num" = "1"
);
Create an import job.
Important: If the StarRocks version is earlier than 2.5.8, refer to the first sample below to create an import job. If the StarRocks version is 2.5.8 or later, omit the broker name broker after the WITH BROKER keyword, as shown in the second sample.
StarRocks version earlier than 2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) ) WITH BROKER broker ( "fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "xxx" );
StarRocks version 2.5.8 or later
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) )
Query the status of the import job.
show load where label = 'lineitem'\G
*************************** 1. row ***************************
JobId: 1****
Label: lineitem
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2022-04-13 15:07:53
EtlStartTime: 2022-04-13 15:07:56
EtlFinishTime: 2022-04-13 15:07:56
LoadStartTime: 2022-04-13 15:07:56
LoadFinishTime: 2022-04-13 15:08:06
URL: NULL
JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020****":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020****":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072}
2 rows in set (0.00 sec)
After the import job is complete, query data based on your business requirements.
Query the number of data rows in the lineitem table.
select count(*) from lineitem;
The following output is returned:
+----------+
| count(*) |
+----------+
|  6001215 |
+----------+
1 row in set (0.03 sec)
Query data in the first two rows of the lineitem table.
select * from lineitem limit 2;
The following output is returned:
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
| 69 | 115209 | 7721 | 1 | 48 | 58761.6 | 0.01 | 0.07 | A | F | 1994-08-17 | 1994-08-11 | 1994-09-08 | NONE | TRUCK | regular epitaphs. carefully even ideas hag |
| 69 | 104180 | 9201 | 2 | 32 | 37893.76 | 0.08 | 0.06 | A | F | 1994-08-24 | 1994-08-17 | 1994-08-31 | NONE | REG AIR | s sleep carefully bold, |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
2 rows in set (0.01 sec)
Concurrency of import jobs
An import job consists of one or more tasks. Tasks are run in parallel. An import job can be split into tasks based on data_desc strings in the LOAD statement. Examples:
If an import job has multiple data_desc strings that specify source tables from different data source addresses, the import job is split into tasks, each with a data_desc string.
If an import job has multiple data_desc strings that specify different partitions of a source table, the import job is also split into tasks, each with a data_desc string.
Each task can contain one or more instances, which are evenly distributed to backends for parallel running. A task is split into instances based on the following frontend configurations:
min_bytes_per_broker_scanner: the minimum amount of data processed by each instance. By default, the minimum data amount is 64 MB.
max_broker_concurrency: the maximum number of parallel instances for each task. Default value: 100.
load_parallel_instance_num: the number of parallel instances on each backend. Default value: 1.
The total number of instances equals the minimum of the following values: the total size of imported files divided by the value of min_bytes_per_broker_scanner, the value of max_broker_concurrency, and the value of load_parallel_instance_num multiplied by the number of backends.
In most cases, an import job has only one data_desc string and therefore contains only one task. The task is split into instances. The number of instances equals that of the backends. Each instance is assigned to a different backend for parallel running.
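As a worked example of this calculation, assume a job with one data_desc string that imports 1 GB (1,024 MB) of files into a cluster with three backends, using the default configurations:
Total file size/min_bytes_per_broker_scanner = 1,024 MB/64 MB = 16
max_broker_concurrency = 100
load_parallel_instance_num × Number of backends = 1 × 3 = 3
The number of instances is the minimum of these three values, which is 3. Each backend runs one instance, which matches the typical case described above.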