Routine Load is a routine import method. StarRocks allows you to use this method to continuously import data from Apache Kafka and to pause, resume, and stop import jobs by using SQL statements. This topic describes the basic principles, an import example, and FAQ of Routine Load.
Terms
- RoutineLoadJob: a routine import job that is submitted
- JobScheduler: the routine import job scheduler that is used to schedule and split a RoutineLoadJob into multiple tasks
- Task: the task that is split from a RoutineLoadJob by JobScheduler based on rules
- TaskScheduler: the task scheduler that is used to schedule the execution of a task
Basic principles
- You submit a Kafka import job to the frontend by using a client that supports the MySQL protocol.
- The frontend splits the import job into multiple tasks. Each task imports a specified part of data.
- Each task is assigned to the specified backend for execution. On the backend, a task is regarded as a regular import job and imports data based on the import mechanism of Stream Load.
- After the import process is completed on the backend, the backend reports the import result to the frontend.
- The frontend continues to generate new tasks or retry failed tasks based on the import result.
- The frontend continuously generates new tasks to achieve the uninterrupted import of data.
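All of these steps are driven by SQL statements submitted through the MySQL protocol. The following is a minimal sketch of the job lifecycle, using the statements covered in the sections below; the database, table, topic, and job names are placeholders:
-- create a continuously running import job (placeholder names throughout)
CREATE ROUTINE LOAD example_db.example_job ON example_table
PROPERTIES ("desired_concurrent_number" = "1")
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "example_topic"
);
-- inspect, pause, resume, and finally stop the job
SHOW ROUTINE LOAD FOR example_db.example_job;
PAUSE ROUTINE LOAD FOR example_job;
RESUME ROUTINE LOAD FOR example_job;
STOP ROUTINE LOAD FOR example_job;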
Example
Environment requirements
- Kafka clusters that require no authentication or that use SSL authentication are supported.
- A message can be in one of the following formats:
- CSV format. In this format, each message serves as a line, and the end of the line does not contain a line feed.
- JSON format.
- The Array type is not supported.
- Only Kafka V0.10.0.0 and later are supported.
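For reference, a CSV message for the example table used later in this topic could look like the following single line. The values are hypothetical and only illustrate that each message is one line of data, separated by the column delimiter, without a trailing line feed:
2015-09-12 00:30:21,#en.wikipedia,GELongstreet,0,0,0,0,0,36,36,0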
Create an import job
- Syntax
CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
    [COLUMNS TERMINATED BY "column_separator" ,]
    [COLUMNS (col1, col2, ...) ,]
    [WHERE where_condition ,]
    [PARTITION (part1, part2, ...)]
    [PROPERTIES ("key" = "value", ...)]
FROM [DATA_SOURCE]
    [(data_source_properties1 = 'value1', data_source_properties2 = 'value2', ...)]
The following table describes the parameters.

Parameter | Required | Description |
---|---|---|
job_name | Yes | The name of the import job. The name can be prefixed with the name of the destination database and is usually a timestamp plus the table name. The name must be unique within a database. |
table_name | Yes | The name of the destination table. |
COLUMNS TERMINATED clause | No | The column delimiter in the source data file. Default value: \t. |
COLUMNS clause | No | The mapping between columns in the source data file and columns in the destination table. Mapped columns: for example, the destination table has three columns, col1, col2, and col3, whereas the source data file has four columns, and the first, second, and fourth columns of the source data file correspond to col2, col1, and col3 in the destination table. In this case, the clause can be written as COLUMNS (col2, col1, temp, col3). The temp column does not exist in the destination table and is used to skip the third column of the source data file. Derived columns: StarRocks can not only read the data in a column of the source data file but also perform calculations on data columns. For example, a column col4 is added to the destination table, and the value of col4 equals col1 plus col2. In this case, the clause can be written as COLUMNS (col2, col1, temp, col3, col4 = col1 + col2). |
WHERE clause | No | The filter conditions used to filter out rows that you do not need. The conditions can be specified on mapped columns or derived columns. For example, to import only rows in which k1 is greater than 100 and k2 equals 1000, write the clause as WHERE k1 > 100 and k2 = 1000. |
PARTITION clause | No | The partitions of the destination table. If you do not specify the partitions, the source data is automatically imported into the corresponding partitions. |
PROPERTIES clause | No | The common parameters of the import job. |
desired_concurrent_number | No | The maximum number of tasks into which the import job can be split. The value must be greater than 0. Default value: 3. |
max_batch_interval | No | The maximum execution time of each task. Valid values: 5 to 60. Unit: seconds. Default value: 10. In V1.15 and later, this parameter specifies the scheduling interval of the task, that is, how often the task is executed. routine_load_task_consume_second in fe.conf specifies the amount of time a task spends consuming data. Default value: 3s. routine_load_task_timeout_second in fe.conf specifies the execution timeout period of a task. Default value: 15s. |
max_batch_rows | No | The maximum number of rows that each task can read. The value must be greater than or equal to 200000. Default value: 200000. In V1.15 and later, this parameter is used only to define the range of the error detection window. The range of the window is 10 × max_batch_rows. |
max_batch_size | No | The maximum number of bytes that each task can read. Unit: bytes. Valid values: 100 MB to 1 GB. Default value: 100 MB. In V1.15 and later, this parameter is deprecated. routine_load_task_consume_second in fe.conf specifies the amount of time a task spends consuming data. Default value: 3s. |
max_error_number | No | The maximum number of error rows allowed within the sampling window. The value must be greater than or equal to 0. Default value: 0, which means no error rows are allowed. Important: rows that are filtered out by the WHERE condition are not error rows. |
strict_mode | No | Specifies whether to enable the strict mode. The strict mode is enabled by default. When the strict mode is enabled, a non-null source value that becomes NULL after column type conversion is filtered out. To disable the strict mode, set this parameter to false. |
timezone | No | The time zone used by the import job. By default, the value of the timezone parameter of the session is used. This parameter affects the results of all time zone-related functions involved in the import. |
DATA_SOURCE | Yes | The type of the data source. Set the value to KAFKA. |
data_source_properties | No | The information about the data source. The value includes the following fields: kafka_broker_list: the connection information about the Kafka brokers, in the ip:port format; separate multiple brokers with commas (,). kafka_topic: the Kafka topic to which you want to subscribe. Note: the kafka_broker_list and kafka_topic fields are required. kafka_partitions and kafka_offsets: the Kafka partitions to which you want to subscribe and the start offset of each partition. property: Kafka-related properties, equivalent to the --property parameter in the Kafka shell. |

You can run the HELP ROUTINE LOAD; command to view the more detailed syntax for creating an import job.
- Example: Import data from a local Kafka cluster.
CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
PROPERTIES
(
    "desired_concurrent_number" = "1",
    "max_error_number" = "1000"
)
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "starrocks-load"
);
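If you need to control which partitions are consumed and where consumption starts, you can also specify the kafka_partitions, kafka_offsets, and property fields described in the parameter table. The following is a minimal sketch; the broker address, topic, partition IDs, offsets, and the property.group.id setting are assumptions for illustration only.
CREATE ROUTINE LOAD example_db.routine_load_wikipedia_partitions ON routine_wiki_edit
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "starrocks-load",
    -- subscribe to partitions 0 and 1 and start from hypothetical offsets 0 and 100
    "kafka_partitions" = "0,1",
    "kafka_offsets" = "0,100",
    -- a Kafka-related property passed through the property field (assumed name)
    "property.group.id" = "starrocks_routine_load_group"
);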
View the status of a job
- Run the following command to display all routine import jobs in a database, including jobs that are stopped or canceled. The result contains one row per job:
USE [database];
SHOW ALL ROUTINE LOAD;
- Run the following command to display the running routine import job named job_name in a database:
SHOW ROUTINE LOAD FOR [database].[job_name];
You can run the HELP SHOW ROUTINE LOAD; command to obtain specific commands and examples for viewing the status of a job. You can run the HELP SHOW ROUTINE LOAD TASK; command to obtain specific commands and examples for viewing the running status of a job (including its tasks).
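For example, to check the tasks of the job created in this topic, a statement along the following lines can be used; the WHERE JobName filter follows the syntax shown by HELP SHOW ROUTINE LOAD TASK; and is included here as an assumption:
SHOW ROUTINE LOAD TASK WHERE JobName = "routine_load_wikipedia";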
In this example, run the SHOW ALL ROUTINE LOAD command to view all running Routine Load jobs. The following output is returned:
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)
Parameter | Description |
---|---|
State | The status of the import job. RUNNING indicates that the import job is continuously running. |
Statistic | The progress information, which records the import statistics since the job was created. |
receivedBytes | The size of the received data. Unit: bytes. |
errorRows | The number of error rows detected during the import. |
committedTaskNum | The number of tasks submitted by the frontend. |
loadedRows | The number of imported rows. |
loadRowsRate | The rate at which data is imported. Unit: row/s. |
abortedTaskNum | The number of failed tasks on the backend. |
totalRows | The total number of received rows. |
unselectedRows | The number of rows that are filtered out by the WHERE condition. |
receivedBytesRate | The rate at which data is received. Unit: bytes/s. |
taskExecuteTimeMs | The import duration. Unit: milliseconds. |
ErrorLogUrls | The error message log. You can use the URL to view the error message during the import process. |
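As a sanity check on how these counters relate in the sample output above (an observation from this particular output, not a documented guarantee): totalRows − errorRows − unselectedRows = 2400000 − 122 − 0 = 2399878, which matches loadedRows.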
Pause the import job
Use the PAUSE statement to make the import job enter the PAUSED state. Data import is paused, but the job is not terminated. You can use the RESUME statement to resume the job.
PAUSE ROUTINE LOAD FOR [job_name];
You can run the HELP PAUSE ROUTINE LOAD; command to view the help information and examples.
After the import job is paused, the status of the job changes to PAUSED, and the import information in Statistic and Progress stops being updated. In this case, the job is not terminated. You can use the SHOW ROUTINE LOAD statement to view the paused import job.
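For the job created earlier in this topic, pausing and then checking it would look like the following, using the statements above with the job name from the CREATE ROUTINE LOAD example:
PAUSE ROUTINE LOAD FOR routine_load_wikipedia;
SHOW ROUTINE LOAD FOR routine_load_wikipedia;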
Resume the import job
Use the RESUME statement to resume the job. The job temporarily enters the NEED_SCHEDULE state while it is being rescheduled. After a period of time, the job returns to the RUNNING state, and data import continues.
RESUME ROUTINE LOAD FOR [job_name];
You can run the HELP RESUME ROUTINE LOAD; command to view the help information and examples.
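For the job in this example:
RESUME ROUTINE LOAD FOR routine_load_wikipedia;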
Run the SHOW ROUTINE LOAD command immediately after the job is resumed to view its status. The following output is returned:
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: NEED_SCHEDULE
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
Progress: {"0":"13824771"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
OtherMsg:
1 row in set (0.00 sec)
Run the SHOW ROUTINE LOAD command again to view the status of the job. The following output is returned:
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":175337712,"errorRows":142,"committedTaskNum":14,"loadedRows":2789962,"loadRowsRate":118000,"abortedTaskNum":7,"totalRows":2790104,"unselectedRows":0,"receivedBytesRate":7422000,"taskExecuteTimeMs":23623}
Progress: {"0":"14024771"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391, http://172.26.**.**:9122/api/_load_error_log?file=__shard_57/error_log_insert_stmt_31304c87bb82431a-9f2baf7d5fd7f252_31304c87bb82431a_9f2baf7d5fd7f252
OtherMsg:
1 row in set (0.00 sec)
Stop the import job
Use the STOP statement to make the import job enter the STOPPED state. Data import is stopped, and the job is terminated. Data import cannot be resumed.
STOP ROUTINE LOAD FOR [job_name];
You can run the HELP STOP ROUTINE LOAD; command to view the help information and examples.
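For the job in this example:
STOP ROUTINE LOAD FOR routine_load_wikipedia;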
Run the SHOW ALL ROUTINE LOAD command to view the status of the stopped job. The following output is returned:
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: 2020-05-16 16:08:25
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: STOPPED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":325534440,"errorRows":264,"committedTaskNum":26,"loadedRows":5179944,"loadRowsRate":109000,"abortedTaskNum":18,"totalRows":5180208,"unselectedRows":0,"receivedBytesRate":6900000,"taskExecuteTimeMs":47173}
Progress: {"0":"16414875"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_67/error_log_insert_stmt_79e9504cafee4fbd-b3981a65fb158cde_79e9504cafee4fbd_b3981a65fb158cde, http://172.26.**.**:9122/api/_load_error_log?file=__shard_68/error_log_insert_stmt_b6981319ce56421b-bf4486c2cd371353_b6981319ce56421b_bf4486c2cd371353, http://172.26.**.**:9122/api/_load_error_log?file=__shard_69/error_log_insert_stmt_1121400c1f6f4aed-866c381eb49c966e_1121400c1f6f4aed_866c381eb49c966e
OtherMsg:
After the import job is stopped, the status of the job changes to STOPPED, and the import information in Statistic and Progress is no longer updated. In this case, you cannot view the stopped import job by using the SHOW ROUTINE LOAD statement; use the SHOW ALL ROUTINE LOAD statement described above instead.