This topic provides answers to some frequently asked questions about data import to StarRocks.

What do I do if the error "close index channel failed" or "too many tablet versions" occurs?

  • Cause

    Data is not merged in a timely manner because of high data import frequency. As a result, the number of versions for unmerged data exceeds the upper limit.

  • Solution
    By default, the maximum number of versions for unmerged data is 1,000. Use one of the following methods to fix the error:
    • Increase the amount of data that you import in each batch to reduce the data import frequency.
    • Set the following parameters in the be.conf configuration file of the backend (BE) nodes to adjust the data merging (compaction) policies. This helps increase the data merging speed.
      cumulative_compaction_num_threads_per_disk = 4
      base_compaction_num_threads_per_disk = 2
      cumulative_compaction_check_interval_seconds = 2
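      If your BE version supports modifying these items at runtime, you may also be able to apply them through the BE update_config API instead of editing be.conf and restarting. The following command is only a sketch that reuses the endpoint shown later in this topic; be_host and http_port are placeholders, and you should fall back to editing be.conf and restarting the BE node if an item is not mutable at runtime.
      curl -XPOST http://be_host:http_port/api/update_config?cumulative_compaction_check_interval_seconds=2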

What do I do if the error "Label Already Exists" occurs?

  • Description

    An import job that uses the same label has already been successfully run or is still running in the same database of the StarRocks cluster.

  • Cause

    Stream Load submits the requests of an import job over HTTP. HTTP clients in various languages typically use built-in request retry logic. After the StarRocks cluster receives a request, it starts to import data in Stream Load mode, but the import result is not returned to the HTTP client in time. As a result, the HTTP client retries and sends the same request again. Because the StarRocks cluster is still processing the first request, the error Label Already Exists is returned for the second request.

  • Solution
    Use one of the following methods to check whether jobs that use different data import methods are assigned the same label or whether you repeatedly submitted an import job.
    • Search for the log of the primary frontend (FE) node based on the label of the import job and check whether the label appears twice. If the label appears twice, the HTTP client sent the request twice.
      Note The labels of import jobs in a StarRocks cluster cannot be distinguished based on the data import methods. As a result, jobs that use different data import methods may be assigned the same label.
    • Execute the SHOW LOAD WHERE LABEL = "xxx" statement to check whether an import job in the FINISHED state uses the same label. In the statement, xxx is the string of the label that needs to be checked.

    We recommend that you estimate the approximate import duration based on the amount of data that you want to import and increase the request timeout of the HTTP client to a value greater than the import timeout period. This prevents the client from submitting the same request repeatedly.
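    To make retries easier to detect and to reduce client-side retries, you can explicitly set a label and a timeout in the Stream Load request. The following command is only a sketch: the user name, password, database, table, file, and label are placeholders, and the timeout value must be adjusted to your estimated import duration.
    curl --location-trusted -u <username>:<password> \
        -H "label:test_db_table1_20230101_1" \
        -H "timeout:600" \
        -T data.csv \
        http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load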

What do I do if the error "ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel" occurs?

Execute the SHOW LOAD statement. Then, find the URL in the information that is returned by the statement and view the error information in the URL. The following common errors may occur:
  • convert csv string to INT failed.

    The string of a column in the file that you want to import fails to be converted to data of a specific data type. For example, the string abc fails to be converted to a number.

  • the length of input is too long than schema.

    The length of a column in the file that you want to import exceeds the specified limit. For example, the length of a string exceeds the fixed length that is specified when a table is created, or the value of a field of the INT type exceeds 4 bytes in length.

  • actual column number is less than schema column number.

    After a row of the file that you want to import is split based on a specified delimiter, the number of the generated columns is less than the number of the specified columns. This error may be caused by an invalid delimiter.

  • actual column number is more than schema column number.

    After a row of the file that you want to import is split based on a specified delimiter, the number of the generated columns is greater than the number of the specified columns.

  • the frac part length longer than schema scale.

    The decimal part of a value in a column of the DECIMAL type in the file that you want to import exceeds the specified length.

  • the int part length longer than schema precision.

    The integer part of a value in a column of the DECIMAL type in the file that you want to import exceeds the specified length.

  • there is no corresponding partition for this key.

    The value of the partition key column of a row in the file that you want to import is not in the partitioning range.
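To locate the detailed error information, you can run a statement similar to the following and then open the URL field of the output with curl or a browser. The label is a placeholder.
SHOW LOAD WHERE LABEL = "label_20230101_batch1"\G
curl "<value of the URL field in the output>"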

What do I do if the error "ERROR 1064 (HY000): Failed to find enough host in all backends. need: 3" occurs?

Add "replication_num" = "1" to the properties of a table when you create the table.

What do I do if the error "Too many open files" occurs in the logs of the BE node when I import data?

To resolve the issue, perform the following steps:
  1. Change the number of file handles in the system.
  2. Reduce the values of the base_compaction_num_threads_per_disk and cumulative_compaction_num_threads_per_disk parameters. The default values of the parameters are 1. For information about how to modify a configuration item, see Modify configuration items.
  3. If the issue persists, we recommend that you scale out the cluster or reduce the data import frequency.
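A sketch of step 1 on a Linux system; the limit value is illustrative and should be sized to your workload.
# Temporarily raise the open file limit for processes started from the current shell
ulimit -n 65535
# To make the change persistent, add entries similar to the following to /etc/security/limits.conf:
# *  soft  nofile  65535
# *  hard  nofile  65535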

What do I do if the error "increase config load_process_max_memory_limit_percent" occurs?

If error messages such as "tablet open failed" appear when you import data, we recommend that you check and increase the values of the load_process_max_memory_limit_bytes and load_process_max_memory_limit_percent parameters. For information about how to modify a configuration item, see Modify configuration items.
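A hedged be.conf sketch; the values below are illustrative only and should be sized to the memory of your BE nodes after you confirm the defaults of your version.
# be.conf -- illustrative values only
load_process_max_memory_limit_bytes = 107374182400
load_process_max_memory_limit_percent = 50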

What do I do if a remote procedure call (RPC) timeout occurs during data import?

Check the value of the write_buffer_size parameter in the be.conf file of the BE node. This parameter specifies the maximum size of a memory block on the BE node. The default value is 100 MB. If the value is too large, an RPC timeout may occur. In this case, adjust the write_buffer_size parameter together with the tablet_writer_rpc_timeout_sec parameter in the configuration file of the BE node. For more information about other parameters in the configuration file of the BE node, see Parameter configuration.
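A hedged be.conf sketch; the values are illustrative, and the point is that a larger write_buffer_size makes each flush RPC carry more data and therefore requires a larger tablet_writer_rpc_timeout_sec. Check the unit that your version uses for write_buffer_size before you change it.
# be.conf -- illustrative values only
write_buffer_size = 104857600            # reduce this if RPC timeouts occur
tablet_writer_rpc_timeout_sec = 600      # or increase this timeout instead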

What do I do if the error "Value count does not match column count" occurs?

  • Description
    The import job fails, and the error message "Value count does not match column count" appears in the URL of the error details. The error message indicates that the number of columns that are obtained by parsing the source data is not consistent with the number of columns in the destination table.
    Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:00Z,cpu0,80.99
    Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:10Z,cpu1,75.23
    Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:20Z,cpu2,59.44
  • Cause

    The column delimiter that you specified in the import command or import statement is inconsistent with the column delimiter that is used in the source data. In the preceding example, the source data is in the CSV format and contains three columns. The source data uses commas (,) as the column delimiter, but the import command or statement uses the tab character (\t) as the column delimiter. As a result, data in the columns of source data is parsed into one column of data.

  • Solution

    Change the column delimiter in the import command or import statement to commas (,) and import data again.
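    A minimal Stream Load sketch that sets the column delimiter to a comma; the user name, password, database, table, and file names are placeholders.
    curl --location-trusted -u <username>:<password> \
        -H "column_separator:," \
        -T cpu_metrics.csv \
        http://<fe_host>:<fe_http_port>/api/test_db/cpu_metrics/_stream_load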

How do I select an import method?

For more information about how to select an import method, see Overview.

What are the factors that affect the import performance?

In most cases, the following factors affect the import performance:
  • Server memory

    A larger number of tablets consumes more memory resources. We recommend that you follow the instructions that are described in How do I determine the number of tablets? to estimate the size of a single tablet.

  • Disk I/O capacity and network bandwidth

    A network bandwidth of 50 Mbit/s to 100 Mbit/s is suitable.

  • Batch size and import frequency
    • We recommend that you set the import batch size to a value that ranges from 10 to 100 MB when you use the Stream Load import method.
    • There is no batch size requirement for the Broker Load import method. This method is suitable for scenarios in which the import batch is large.
    • Do not set the import frequency too high. Do not run more than one import job per second on a serial advanced technology attachment (SATA) HDD.

Can Stream Load identify the column names in the first row of a text file? Can Stream Load ignore data in the first row of the text file?

No, Stream Load cannot identify the column names in the first row of the text file. Data in the first row is common data for Stream Load. Stream Load cannot ignore data in the first row of a text file. If column names exist in the first row of the text file that you want to import, you can troubleshoot the issue by using one of the following methods:
  • Modify the settings of the export tool. This way, you can export the text file without column names.
  • Run the sed -i '1d' filename command to delete data in the first row of the text file.
  • Add -H "where: column name! ='Column name' " to the statement of Stream Load to filter out data in the first row.

    The system converts the strings in the first row to data of a specific data type and then filters out the data. If the conversion fails, null is returned. If you use this method, you must make sure that the NOT NULL property is not configured for the columns in a StarRocks table.

  • Add -H "max_filter_ratio:0.01" to the statement of Stream Load to set the fault tolerance rate to 1% or a smaller value. Make sure that at least one error line is allowed for import jobs. This way, the error that occurs in the first row can be ignored. You can also specify a lower fault tolerance rate based on the actual amount of data that you want to import. After the fault tolerance rate is specified, ErrorURL in the returned result still indicates that an error occurred, but the import job is successful. We recommend that you do not specify a high fault tolerance rate. Otherwise, other data errors may be ignored.

Do I need to convert the data type of the data in a partition key column that is not of the standard DATE type or INT type when I use Stream Load to import data to StarRocks?

Yes, you can use StarRocks to convert data types during the data import process.

For example, the TEST file that you want to import is in the CSV format and contains the NO, DATE, VERSION, and PRICE columns, but the data in the DATE column is in the 202106.00 format. If the column that you want to use in StarRocks is DATE, you need to create a table that contains the NO, VERSION, PRICE, and DATE columns in StarRocks. You also need to specify the data type of the data in the DATE column as DATE, DATETIME, or INT. Then, you can configure the following settings in the statement of Stream Load to perform data type conversion between columns:
-H "columns: NO,DATE_1, VERSION, PRICE, DATE=LEFT(DATE_1,6)"

In the preceding statement, DATE_1 is a temporary name (placeholder) for the column that is read from the file. The LEFT() function is then called to convert the data and assign the result to the DATE column in the StarRocks table. Note that you must first list the temporary names of all columns in the CSV file and then define the conversion. Only scalar functions can be used to perform this kind of conversion between columns; aggregate functions and window functions are not supported.
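A sketch of the full Stream Load request for this example; the database, table, and file names are placeholders, and the destination table is assumed to contain the NO, VERSION, PRICE, and DATE columns that are described above.
curl --location-trusted -u <username>:<password> \
    -H "column_separator:," \
    -H "columns: NO, DATE_1, VERSION, PRICE, DATE=LEFT(DATE_1, 6)" \
    -T TEST.csv \
    http://<fe_host>:<fe_http_port>/api/test_db/test_tbl/_stream_load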

What do I do if the error "body exceed max size: 10737418240, limit: 10737418240" occurs?

  • Cause

    The size of the source data file exceeds the maximum file size (10 GB) supported by Stream Load.

  • Solution
    • Split the file by running the seq -w 0 n command.
    • Run the curl -XPOST http://be_host:http_port/api/update_config?streaming_load_max_mb=<file_size> command to increase the value of the streaming_load_max_mb parameter, which specifies the maximum size of files that can be loaded. For more information about other parameters in the configuration file of the BE node, see Parameter configuration.

How can I improve the import performance?

Method 1: Increase the parallelism of tasks

Note This method may consume more CPU resources and may generate too many data versions.
Split an import job into multiple tasks for running. The task parallelism is calculated based on the following formula. The maximum task parallelism is determined by the number of alive BE nodes or the number of partitions that you want to consume.
min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
Parameters:
  • alive_be_number: the number of alive BE nodes.
  • partition_number: the number of partitions that you want to consume.
  • desired_concurrent_number: the expected task parallelism of a single Routine Load job. Default value: 3.
    • If the import job has not been created yet, configure this parameter when you execute the CREATE ROUTINE LOAD statement.
    • If the import job already exists, modify this parameter by executing the ALTER ROUTINE LOAD statement.
  • max_routine_load_task_concurrent_num: the maximum task parallelism of a single Routine Load job. Default value: 5. This parameter is an FE dynamic parameter. For more information, see Parameter configuration.

If a large number of partitions and BE nodes are used, and the values of the alive_be_number and partition_number parameters are greater than the values of the desired_concurrent_number and max_routine_load_task_concurrent_num parameters, you can increase the values of the desired_concurrent_number and max_routine_load_task_concurrent_num parameters to increase the task parallelism.

For example, the number of partitions that you want to consume is 7, the number of alive BE nodes is 5, and the default value of max_routine_load_task_concurrent_num is 5. In this case, if you want to increase the task parallelism to the upper limit, you need to change the value of the desired_concurrent_number parameter from 3 (default value) to 5. Then, the task parallelism is calculated based on the following formula: min(5,7,5,5). The result is 5.
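A sketch of increasing the expected parallelism of an existing Routine Load job; the database and job names are placeholders. Note that a Routine Load job typically needs to be paused (PAUSE ROUTINE LOAD) before it can be altered, and resumed afterward (RESUME ROUTINE LOAD).
ALTER ROUTINE LOAD FOR example_db.example_routine_load_job
PROPERTIES
(
    "desired_concurrent_number" = "5"
);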

Method 2: Increase the amount of data in the partitions that can be consumed by a single import task

Note This method may increase the latency of data import.

The maximum number of messages that can be consumed by a single Routine Load task is determined by the max_routine_load_batch_size or routine_load_task_consume_second parameter. When an import task consumes data, and the maximum number of consumed messages meets the requirements of the preceding parameters, the consumption is complete. The preceding parameters are FE parameters. For more information, see Parameter configuration.

You can view the be/log/be.INFO file to analyze the parameter that specifies the upper limit for the amount of data that can be consumed by a single import task. Then, you can increase the value of this parameter to increase the amount of data that can be consumed by a single import task.
I0325 20:27:50.410579 15259 data_consumer_group.cpp:131] consumer group done: 41448fb1a0ca59ad-30e34dabfa7e47a0. consume time(ms)=3261, received rows=179190, received bytes=9855450, eos: 1, left_time: -261, left_bytes: 514432550, blocking get time(us): 3065086, blocking put time(us): 24855

In normal cases, the value of the left_bytes parameter is equal to or greater than 0. This indicates that the amount of data that is read in a batch does not exceed the value that is specified by the max_routine_load_batch_size parameter in the period of time that is specified by the routine_load_task_consume_second parameter. In this case, all scheduled import tasks can consume all Kafka data without latency. You can increase the value of the routine_load_task_consume_second parameter to increase the amount of data that can be consumed by a single import task.

If the value of the left_bytes parameter is less than 0, the amount of data that can be read in a batch exceeds the value that is specified by the max_routine_load_batch_size parameter in the period of time that is specified by the routine_load_task_consume_second parameter. In this case, scheduled import tasks may not consume all Kafka data. You can increase the value of the max_routine_load_batch_size parameter.
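A sketch of adjusting the preceding FE parameters at runtime, assuming that they are dynamically configurable in your version (otherwise, set them in the FE configuration file and restart); the values are illustrative only.
ADMIN SET FRONTEND CONFIG ("routine_load_task_consume_second" = "30");
ADMIN SET FRONTEND CONFIG ("max_routine_load_batch_size" = "2147483648");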

After I execute the SHOW ROUTINE LOAD statement, the status of an import job changes to PAUSED or CANCELLED. What do I do?

You can troubleshoot the error based on the error descriptions:
  • Description: The status of the import job changes to PAUSED, and the description of ReasonOfStateChanged is Broker: Offset out of range.
    • Cause: The consumer offset of the import job does not exist in the Kafka partition.
    • Solution: Execute the SHOW ROUTINE LOAD statement to view the latest consumer offset of the import job in the Progress parameter. Then, check whether messages in the consumer offset exist in the Kafka partition. If messages in the consumer offset do not exist, the issue may occur due to the following reasons:
      • The consumer offset that is specified when you create an import job is a future point in time.
      • In the Kafka partition, messages in the consumer offset are cleared before the messages are consumed by the import job. We recommend that you configure the cleanup policies and parameters, such as the log.retention.hours and log.retention.bytes parameters, for Kafka logs based on the import speed of an import job.
  • Description: The status of the import job changes to PAUSED.
    • Cause: The number of error data rows imported by the import task may exceed the upper limit that is specified by the max_error_number parameter.
    • Solution: Troubleshoot the error based on the descriptions of ReasonOfStateChanged and ErrorLogUrls.
      • If the error occurs because of an incorrect data format of the data source, check the data format and fix the error. After you fix the error, execute the RESUME ROUTINE LOAD statement to resume the import job in the PAUSED state.
      • If the data format of the data source cannot be parsed by StarRocks, modify the max_error_number parameter to change the maximum number of error data rows allowed for the import job.
        1. Execute the SHOW ROUTINE LOAD statement to check the value of the max_error_number parameter.
        2. Execute the ALTER ROUTINE LOAD statement to increase the value of the max_error_number parameter (see the example after this list).
        3. Execute the RESUME ROUTINE LOAD statement to resume the import job in the PAUSED state.
  • Description: The status of the import job changes to CANCELLED.
    • Cause: An exception may occur when you execute the import task. For example, tables are deleted.
    • Solution: Troubleshoot the error based on the descriptions of ReasonOfStateChanged and ErrorLogUrls. You cannot resume the import job in the CANCELLED state after you fix the error.
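A sketch of the three steps for increasing max_error_number; the database and job names are placeholders, and the max_error_number value is illustrative.
SHOW ROUTINE LOAD FOR example_db.example_routine_load_job\G
ALTER ROUTINE LOAD FOR example_db.example_routine_load_job
PROPERTIES
(
    "max_error_number" = "1000"
);
RESUME ROUTINE LOAD FOR example_db.example_routine_load_job;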

Can the exactly-once semantics be ensured when I use Routine Load to import data from Kafka to StarRocks?

Yes, the exactly-once semantics can be ensured.

An import task is a separate transaction. If an error occurs during the execution of the transaction, the transaction is terminated, and the FE node does not update the consumption progress of the related partition of the import task. When the FE node schedules import tasks in a queue, the FE node initiates a consumption request from the last saved consumer offset of the partition. This helps ensure the exactly-once semantics.

What do I do if the error "Broker: Offset out of range" occurs?

Execute the SHOW ROUTINE LOAD statement to view the latest consumer offset. Then, check whether data at that offset still exists in the Kafka partition. Possible causes:
  • A future offset was specified when the import job was created.
  • Kafka has already cleaned up the data at that offset before the import job consumed it. You must specify suitable values for the Kafka log cleanup parameters, such as the log.retention.hours and log.retention.bytes parameters, based on the import speed of StarRocks.

Can I use Broker Load to rerun an import job that is successfully run and is in the FINISHED state?

No, you cannot use Broker Load to rerun an import job that has been successfully run and is in the FINISHED state. In addition, the label of each successfully run import job cannot be reused. This ensures that import jobs are not lost or duplicated. If you want to rerun such a job, perform the following steps: execute the SHOW LOAD statement to view the historical import records, find the import job that you want to rerun, and then create another import job with a new label based on the information about that job.

When I import HDFS data to StarRocks by using Broker Load, the value of the date field is abnormal, and the time is 8 hours later than the correct time. What do I do?

  • Cause

    The time zone that you specified when you created a StarRocks table and the import job is UTC+8, but the server uses the UTC+0 time zone. As a result, the value of the date field is equal to the original value of the date field plus eight hours.

  • Solution

    Remove the timezone parameter when you create a StarRocks table.

When I import data in the ORC format to StarRocks by using Broker Load, the error "ErrorMsg: type:ETL_RUN_FAIL; msg:Cannot cast '<slot 6>' from VARCHAR to ARRAY<VARCHAR(30)>" occurs. What do I do?

  • Cause

    The column names of the file that you want to import are inconsistent with those of the StarRocks table. In this case, the system performs type inference when it executes the SET clause and calls the cast function to convert the data type, but the cast fails.

  • Solution

    Make sure that the column names of the file that you want to import are consistent with those of the StarRocks table. This way, the SET statement and the cast function are not required.

No error occurs when I use Broker Load to create an import job, but no data is queried. What do I do?

Broker Load is an asynchronous import method. Even if no error occurs when you execute the related statement to create an import job, the import job may not be successful. You can execute the SHOW LOAD statement to view the status and error message of the import job, modify the parameters that are configured for the import job, and then rerun the import job.

What do I do if the error "failed to send batch" or "TabletWriter add batch with unknown id" occurs?

This error occurs because of a data write timeout. You need to change the values of the system variable query_timeout and the BE parameter streaming_load_rpc_max_alive_time_sec. For more information about other parameters in the configuration file of the BE node, see Parameter configuration.

What do I do if the error "LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found" occurs?

If data is imported from a Parquet file or an ORC file, check whether the column names in the file header are consistent with those in the destination StarRocks table.

In the following sample code, the tmp_c1 and tmp_c2 columns in the Parquet or ORC file are mapped to the name and id columns in the StarRocks table, respectively. If you do not use a SET clause, the columns listed in the parentheses are mapped by name to the columns of the StarRocks table, and the import fails if columns with those names do not exist in the table.
(tmp_c1,tmp_c2)
SET
(
   id=tmp_c2,
   name=tmp_c1
)

If you import an ORC file that is generated in a specific version of Hive, and the table header in the ORC file is (_col0, _col1, _col2, ...), the error "Invalid Column Name" may occur. In this case, you need to execute a SET statement to configure rules for column mappings.
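A sketch of such a mapping for an ORC file that is generated by Hive, assuming that the file header is (_col0, _col1, _col2) and that the StarRocks table contains the id, name, and score columns; all names are placeholders.
(_col0, _col1, _col2)
SET
(
   id=_col0,
   name=_col1,
   score=_col2
)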

What do I do if an import job does not end for a long period of time?

In the fe.log file of the FE node, search for the ID of the import job based on the label of the import job. Then, in the be.INFO file of the BE node, search for the log context based on the ID of the import job to identify the reasons that cause the error.

How do I access a high-availability Apache HDFS cluster?

After you configure high availability (HA) for NameNodes in an HDFS cluster, if the active NameNode is switched to the other one, the new active NameNode can be automatically identified. To access an HDFS cluster deployed in HA mode, configure the following parameters.
  • dfs.nameservices: the name of the HDFS service. You can set a custom name. For example, set the dfs.nameservices parameter to my_ha.
  • dfs.ha.namenodes.xxx: the custom names of the NameNodes. Separate multiple names with commas (,). Replace xxx in this parameter name with the custom name that you set for the dfs.nameservices parameter. For example, set the dfs.ha.namenodes.my_ha parameter to my_nn.
  • dfs.namenode.rpc-address.xxx.nn: the address used by the NameNode for remote procedure calls (RPCs). Replace nn in this parameter name with the name of the NameNode that you set for the dfs.ha.namenodes.xxx parameter. For example, set the dfs.namenode.rpc-address.my_ha.my_nn parameter to a value in the Hostname:Port number format.
  • dfs.client.failover.proxy.provider: the provider that the client uses to connect to the NameNode. Default value: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.
You can use simple authentication or Kerberos authentication to access HDFS clusters deployed in HA mode. The following sample code provides an example on how to access an HA HDFS cluster by using simple authentication:
(
    "username"="user",
    "password"="passwd",
    "dfs.nameservices" = "my-ha",
    "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2",
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)

The configurations of an HDFS cluster can be written into the hdfs-site.xml file. If you use a broker process to read information about the HDFS cluster, you need to only specify the file path and authentication information of the cluster.

How do I configure ViewFs in HDFS Federation?

Copy the core-site.xml and hdfs-site.xml configuration files of View File System (ViewFs) to the broker/conf directory.

If a custom file system is included, copy the .jar files related to the file system to the broker/lib directory.

What do I do if the error "Can't get Kerberos realm" occurs when I access an EMR cluster for which Kerberos authentication is enabled?

  1. Check whether the /etc/krb5.conf file is configured on all physical machines of the brokers.
  2. If the error persists after the preceding file is configured on all physical machines of the brokers, append -Djava.security.krb5.conf=/etc/krb5.conf to the JAVA_OPTS variable in the broker startup script.

Why are approximately 50 to 100 milliseconds required to insert a data record when I execute the INSERT INTO statement to import data?

The INSERT INTO statement is designed to import multiple data records in a batch. As a result, the time that is required to import a single data record is approximately the same as the time that is required to import a batch of records. We recommend that you do not use the INSERT INTO statement to import data record by record in online analytical processing (OLAP) scenarios.
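A sketch that batches multiple records into one INSERT INTO statement instead of inserting them one by one; the database, table, and values are placeholders for a hypothetical table with a DATETIME, a VARCHAR, and a DOUBLE column.
INSERT INTO example_db.cpu_metrics VALUES
    ('2023-01-01 18:29:00', 'cpu0', 80.99),
    ('2023-01-01 18:29:10', 'cpu1', 75.23),
    ('2023-01-01 18:29:20', 'cpu2', 59.44);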

What do I do if the error "index channel has intolerable failure" occurs when I execute the INSERT INTO SELECT statement to import data?

This error occurs because of the RPC timeout for Stream Load. You can fix the error by modifying the parameters that are related to RPC timeout in the corresponding configuration file.

Modify the following system parameters in the be.conf configuration file and restart the cluster for the modifications to take effect.
  • streaming_load_rpc_max_alive_time_sec: the RPC timeout period for Stream Load. Default value: 1200. Unit: seconds.
  • tablet_writer_rpc_timeout_sec: the timeout period for TabletWriter. Default value: 600. Unit: seconds.
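A hedged be.conf sketch with increased timeouts; the values are illustrative only and should be sized to your import jobs.
# be.conf -- illustrative values only
streaming_load_rpc_max_alive_time_sec = 2400
tablet_writer_rpc_timeout_sec = 1200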

What do I do if the execution fails and the error "execute timeout" occurs when I execute the INSERT INTO SELECT statement to import a large amount of data?

This error occurs because the query times out. To fix the error, modify the query_timeout parameter of the session. The default value is 600. Unit: seconds.

Sample command:
SET query_timeout = xx;

What do I do if the error "Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing" occurs when I run a Flink job?

  • Cause

    Multiple rules, such as [table-rule.1] and [table-rule.2], are configured in the config_prod.conf configuration file of StarRocks-migrate-tools (SMT), but the required information is not specified for the rules.

  • Solution

    Check whether databases, tables, and Flink connectors are configured for the rules [table-rule.1] and [table-rule.2].

How does Flink restart failed tasks?

Flink restarts failed tasks based on the checkpointing mechanism and the restart policy.

To enable the checkpointing mechanism and use the fixed delay restart policy, configure the following parameters in the flink-conf.yaml configuration file:
# unit: ms
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
Parameters:
  • execution.checkpointing.interval: the interval between two checkpoints. Unit: ms. To enable the checkpointing mechanism, you must specify a value that is greater than 0 for this parameter.
  • state.backend: After the checkpointing mechanism is enabled, state is persisted in checkpoints to prevent data loss and ensure data consistency during recovery. The state backend that you select determines how the state information is stored and where the checkpointed state is persisted. For more information, see State Backends.
  • state.checkpoints.dir: the directory in which the checkpoints are stored.
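The preceding snippet only enables checkpointing. To also use the fixed delay restart policy that is mentioned above, you can add configuration similar to the following to the flink-conf.yaml file; the attempt count and delay are illustrative.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s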

How do I stop a Flink job and restore the Flink job to the state before it is stopped?

You can trigger a savepoint to stop a Flink job and resume the job from the savepoint. A savepoint is a consistent image of the execution status of a streaming job that is created based on the checkpointing mechanism. For more information, see Savepoints.

You can trigger a savepoint automatically when you stop a Flink job with a specific job ID, and specify a destination file system directory in which to store the savepoint. Sample command:
bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
Parameters:
  • jobId: The ID of the Flink job. You can view the ID by running the flink list -running command or on the web UI of Flink.
  • targetDirectory: The directory in which savepoints are stored. To specify a default directory for storing savepoints, configure the state.savepoints.dir parameter in the flink-conf.yaml configuration file of Flink. This way, the specified directory is used to store the savepoint when a savepoint is triggered.
    state.savepoints.dir: [file:// or hdfs://]/home/user/savepoints_dir
If you want to restore a Flink job to the state before it is stopped, you must specify a savepoint when you resubmit the Flink job.
./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql

What do I do if data import fails when I use the exactly-once semantics of a transaction to import data?

  • Description: The following error information is returned:
    com.starrocks.data.load.stream.exception.StreamLoadFailException: {
        "TxnId": 3382****,
        "Label": "502c2770-cd48-423d-b6b7-9d8f9a59****",
        "Status": "Fail",
        "Message": "timeout by txn manager",-- Error message
        "NumberTotalRows": 1637,
        "NumberLoadedRows": 1637,
        "NumberFilteredRows": 0,
        "NumberUnselectedRows": 0,
        "LoadBytes": 4284214,
        "LoadTimeMs": 120294,
        "BeginTxnTimeMs": 0,
        "StreamLoadPlanTimeMs": 7,
        "ReadDataTimeMs": 9,
        "WriteDataTimeMs": 120278,
        "CommitAndPublishTimeMs": 0
    }
  • Cause: The value of the sink.properties.timeout parameter is less than the checkpoint interval of Flink.
  • Solution: Increase the value of the sink.properties.timeout parameter. The value must be greater than the checkpoint interval of Flink.

After the flink-connector-jdbc_2.11 driver is used to import data to StarRocks, the time is 8 hours earlier than the time in Flink. What do I do?

  • Description: The time generated by the localtimestamp function is correct in Flink. However, after the flink-connector-jdbc_2.11 driver is used to import data to StarRocks, the time is 8 hours earlier than the time in Flink. The time zone of the servers on which Flink and StarRocks are deployed is UTC+8 (Asia/Shanghai). The Flink version is 1.12, and the driver is flink-connector-jdbc_2.11.
  • Solution: Set the server-time-zone parameter to Asia/Shanghai for the Flink sink table and add &serverTimezone=Asia/Shanghai to the value of the url parameter. Sample statement:
    CREATE TABLE sk (
        sid int,
        local_dtm TIMESTAMP,
        curr_dtm TIMESTAMP
    )
    WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://192.168.**.**:9030/sys_device?characterEncoding=utf-8&serverTimezone=Asia/Shanghai',
        'table-name' = 'sink',
        'driver' = 'com.mysql.jdbc.Driver',
        'username' = 'sr',
        'password' = 'sr123',
        'server-time-zone' = 'Asia/Shanghai'
    );

Data from Kafka that is deployed on a StarRocks cluster can be imported to StarRocks. However, data from Kafka that is deployed on other machines cannot be imported to StarRocks. Why?

  • Description: The following error message is returned:
    failed to query watermark offset, err: Local: Bad message format
  • Cause: The StarRocks cluster cannot resolve the hostname of the Kafka brokers.
  • Solution: Add the mapping between the hostname and the IP address of the Kafka brokers to the /etc/hosts file on the nodes of the StarRocks cluster so that the hostname can be resolved.

The memory of the BE node is fully occupied and the CPU utilization of the BE node reaches 100% when no queries are performed. Why?

The BE node collects statistics on a regular basis and does not occupy CPU resources for a long period of time. If less than 10 GB of memory is used, the BE node does not release the remaining memory resources. The BE node manages the memory resources based on its configurations. You can configure the tc_use_memory_min parameter to change the memory size.

tc_use_memory_min specifies the minimum memory of TCmalloc. The default value is 10737418240. StarRocks returns idle memory resources to the operating system only if the actually used memory exceeds the value of the tc_use_memory_min parameter. To configure the parameter, perform the following steps: In the EMR console, go to the Configure tab of the StarRocks service page. On the Configure tab, click the be.conf tab. For more information about other parameters in the configuration file of the BE node, see Parameter configuration.

Why does the BE node not return the requested memory resources to the operating system?

Memory allocation is a heavy-weight operation. When the database requests a large amount of memory resources from the operating system, more memory resources are reserved for the database. To reuse the memory resources, the requested memory resources are returned to the operating system with a delay. We recommend that you monitor the memory usage in a test environment over a long period of time to verify whether the memory resources are eventually returned to the operating system.

Why does the system fail to parse the dependencies of the Flink connector?

  • Cause: The dependencies of the Flink connector need to be downloaded from the Alibaba Cloud Maven mirror repository, but the mirror is not configured in the /etc/maven/settings.xml file. As a result, not all dependencies of the Flink connector can be obtained from the mirror.
  • Solution: In the /etc/maven/settings.xml file, set the mirror address of the Alibaba Cloud public repository to https://maven.aliyun.com/repository/public.
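A sketch of the mirror configuration in /etc/maven/settings.xml, using the standard Maven mirror syntax; the mirror ID is a placeholder.
<mirrors>
    <mirror>
        <id>aliyunmaven</id>
        <mirrorOf>*</mirrorOf>
        <name>Alibaba Cloud public repository</name>
        <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
</mirrors>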

Does the sink.buffer-flush.interval-ms parameter take effect if the parameter is configured together with the checkpoint interval parameter in Flink-connector-StarRocks?

  • Problem description: The sink.buffer-flush.interval-ms parameter is set to 15s, but the checkpoint interval parameter is set to 5mins.
    +-------------------------------+----------+---------+--------+--------------------------------------------------------+
    | Option                        | Required | Default | Type   | Description                                            |
    +-------------------------------+----------+---------+--------+--------------------------------------------------------+
    | sink.buffer-flush.interval-ms | NO       | 300000  | String | the flushing time interval, range: [1000ms, 3600000ms] |
    +-------------------------------+----------+---------+--------+--------------------------------------------------------+
  • Solution: The value of the checkpoint interval parameter does not affect flush operations under the at-least-once semantics. In that case, a flush operation is triggered when the value of any of the following parameters reaches the specified limit. The checkpoint interval parameter takes effect only for the exactly-once semantics, and the sink.buffer-flush.interval-ms parameter takes effect only for the at-least-once semantics.
    sink.buffer-flush.max-rows
    sink.buffer-flush.max-bytes
    sink.buffer-flush.interval-ms

Can I update data that is imported by using DataX?

StarRocks of the latest version allows you to use DataX to update data in a StarRocks table that uses the Primary Key model. To enable this feature, you need to add the __op field in the reader section of the JSON configuration file.

When I use DataX to synchronize data, how do I handle DataX keywords to avoid errors?

Use backticks (``) to enclose the keywords.

What do I do if the error "When running with master 'yarn' either HADOOP-CONF-DIR or YARN-CONF-DIR must be set in the environment" occurs?

Configure the HADOOP_CONF_DIR environment variable in the spark-env.sh script of the Spark client.
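A sketch of the configuration in spark-env.sh; the path is a placeholder for the Hadoop configuration directory of your cluster.
# spark-env.sh
export HADOOP_CONF_DIR=/etc/hadoop/conf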

What do I do if the error "Cannot run program "xxx/bin/spark-submit": error=2, No such file or directory" occurs when I run the spark-submit command to submit a Spark job?

When you use Spark Load to import data, the spark_home_default_dir parameter is not configured, or it is set to an incorrect root directory of the Spark client. To fix the error, specify the correct root directory of the Spark client.

What do I do if the error "File xxx/jars/spark-2x.zip does not exist" occurs?

When you use Spark Load to import data, the value of the spark-resource-path parameter does not point to the packaged ZIP file. Check whether the file path and the file name are correct.

What do I do if the error "yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn" occurs?

When you use Spark Load to import data, the yarn-client-path parameter does not point to the YARN executable file.