本文汇总了StarRocks数据导入的常见问题。
- 通用问题
- 如何选择导入方式?
- 影响导入性能的因素都有哪些?
- 报错“close index channel failed“或“too many tablet versions”,该如何处理?
- 报错“Label Already Exists”,该如何处理?
- 报错“ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel”,该如何处理?
- 数据导入过程中,发生远程过程调用(Remote Procedure Call,简称RPC)超时问题,该如何处理?
- 报错“Value count does not match column count”,该如何处理?
- 报错“ERROR 1064 (HY000): Failed to find enough host in all backends. need: 3”,该如何处理?
- 导入数据时发现BE服务日志中出现Too many open files问题,该如何处理?
- 报错“increase config load_process_max_memory_limit_percent”,该如何处理?
- Stream Load
- Routine Load
- Broker Load
- Broker Load是否支持再次执行已经执行成功、且处于FINISHED状态的导入作业?
- Broker Load导入HDFS数据时,数据的导入日期字段出现异常,比正确的日期时间多加了8小时,该如何处理?
- Broker Load导入ORC格式的数据时,报错“ErrorMsg: type:ETL_RUN_FAIL; msg:Cannot cast '<slot 6>' from VARCHAR to ARRAY<VARCHAR(30)>”,该如何处理?
- 为什么Broker Load导入作业没报错,但是却查询不到数据?
- 报错“failed to send batch或TabletWriter add batch with unknown id”,该如何处理?
- 报错“LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found”,该如何处理?
- 导入作业长时间没有结束等问题应该如何处理?
- 如何配置以访问高可用 (HA) 模式下的Apache HDFS集群?
- 如何配置Hadoop ViewFS Federation?
- 访问Kerberos认证的集群时,报错“Can't get Kerberos realm”,该如何处理?
- INSERT INTO
- MySQL实时同步至StarRocks
- Flink Connector
- DataX Writer
- Spark Load
- 报错“When running with master 'yarn' either HADOOP-CONF-DIR or YARN-CONF-DIR must be set in the environment”,该如何解决?
- 提交Spark job时用到spark-submit命令,报错“Cannot run program "xxx/bin/spark-submit": error=2, No such file or directory”,该如何解决?
- 报错“File xxx/jars/spark-2x.zip does not exist”,该如何解决?
- 报错“yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn”,该如何解决?
报错“close index channel failed“或“too many tablet versions”,该如何处理?
- 报错原因
该问题主要是数据导入频率太快,数据没能及时合并 (Compaction) ,从而导致版本数超过支持的最大未合并版本数。
- 解决方案默认支持的最大未合并版本数为1000。您可以通过以下方法解决上述报错:
- 增大单次导入的数据量,降低导入频率。
- 在BE的配置文件be.conf中修改以下配置,通过调整合并策略实现加快合并的目的。
cumulative_compaction_num_threads_per_disk = 4 base_compaction_num_threads_per_disk = 2 cumulative_compaction_check_interval_seconds = 2
报错“Label Already Exists”,该如何处理?
- 问题描述
StarRocks集群中同一个数据库内已经有一个相同Label的导入作业导入成功或者正在执行。
- 报错原因
由于Stream Load是采用HTTP协议提交导入作业的请求,通常各个语言的HTTP客户端均会自带请求重试逻辑。StarRocks集群在接收到第一个请求后,已经开始操作Stream Load,但是由于没有及时向客户端返回结果,会出现客户端再次重试发送相同请求的情况。这时候 StarRocks集群由于已经在操作第一个请求,所以第二个请求会返回
Label Already Exists
的状态提示。 - 解决方案需要检查不同导入方式之间是否有标签冲突、或是有重复提交的导入作业。排查方法如下:
- 使用标签搜索主FE的日志,查看是否存在同一个标签出现了两次的情况。如果有,则说明客户端重复提交了该请求。说明 StarRocks集群中导入作业的标签不区分导入方式。因此,可能存在不同的导入作业使用了相同标签的问题。
- 运行
SHOW LOAD WHERE LABEL = "xxx"
语句,查看是否已经存在具有标签相同、且处于FINISHED状态的导入作业。其中xxx
为待检查的标签字符串。
建议根据当前请求导入的数据量,计算出大致的导入耗时,并根据导入超时时间来适当地调大客户端的请求超时时间,从而避免客户端多次提交该请求。
- 使用标签搜索主FE的日志,查看是否存在同一个标签出现了两次的情况。如果有,则说明客户端重复提交了该请求。
报错“ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel”,该如何处理?
SHOW LOAD
语句。在语句返回的信息中找到URL,查看错误数据。常见的错误类型如下:convert csv string to INT failed.
待导入数据文件中某列的字符串在转化为对应类型的数据时出错。例如,将abc转化为数字时失败。
the length of input is too long than schema.
待导入数据文件中某列的长度不正确。例如,定长字符串超过建表时设置的长度,或INT类型的字段超过4个字节。
actual column number is less than schema column number.
待导入数据文件中某一行按照指定的分隔符切分后,列数小于指定的列数。可能原因是分隔符不正确。
actual column number is more than schema column number.
待导入数据文件中某一行按照指定的分隔符切分后,列数大于指定的列数。
the frac part length longer than schema scale.
待导入数据文件中某DECIMAL类型的列的小数部分超过指定的长度。
the int part length longer than schema precision.
待导入数据文件中某DECIMAL类型的列的整数部分超过指定的长度。
there is no corresponding partition for this key.
导入文件某行的分区列的值不在分区范围内。
报错“ERROR 1064 (HY000): Failed to find enough host in all backends. need: 3”,该如何处理?
您可以在建表属性中添加"replication_num" = "1"
信息。
导入数据时发现BE服务日志中出现Too many open files问题,该如何处理?
- 调整系统句柄数。
- 调小base_compaction_num_threads_per_disk和cumulative_compaction_num_threads_per_disk(默认都是1)的参数值。修改配置项的具体操作,请参见修改配置项。
- 如果问题还未解决,建议扩容或者降低导入频率。
报错“increase config load_process_max_memory_limit_percent”,该如何处理?
数据导入过程中,发生远程过程调用(Remote Procedure Call,简称RPC)超时问题,该如何处理?
检查BE配置文件be.conf中write_buffer_size参数的设置。该参数用于控制BE上内存块的大小阈值,默认阈值为100 MB。如果阈值过大,可能会导致远程过程调用超时,此时需要配合BE配置文件中的tablet_writer_rpc_timeout_sec参数来适当地调整write_buffer_size参数的取值。BE配置项的更多信息,请参见参数配置。
报错“Value count does not match column count”,该如何处理?
- 问题描述导入作业失败,通过查看错误详情URL发现返回“Value count does not match column count”错误,提示解析源数据得到的列数与目标表的列数不匹配。
Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:00Z,cpu0,80.99 Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:10Z,cpu1,75.23 Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:20Z,cpu2,59.44
- 报错原因
发生该错误的原因是导入命令或导入语句中指定的列分隔符与源数据中的列分隔符不一致。例如,上面示例中,源数据为CSV格式,包括三列,列分隔符为逗号(,),但是导入命令或导入语句中却指定制表符(\t)作为列分隔符,最终导致源数据的三列数据解析成了一列数据。
- 解决方案
修改导入命令或导入语句中的列分隔符为逗号(,),然后再次尝试执行导入。
如何选择导入方式?
导入方式的选择可以参见导入概述。
影响导入性能的因素都有哪些?
- 机器内存
当tablet比较多的时候,对于内存消耗比较大。建议单个tablet大小按照如何分桶?中介绍的进行评估。
- 磁盘IO能力和网络带宽
正常50 Mbit/s~100 Mbit/s是没有问题的。
- 导入批次和频率
- Stream Load批次大小建议10 MB~100 MB。
- Broker Load还好,因为Broker Load针对的场景都是批次大小比较大的情况。
- 导入频率不要太高,SATA盘1s不超过一个任务。
Stream Load是否支持识别文本文件中首行的列名?或者是否支持指定不读取第一行?
- 在导出工具中修改设置,重新导出不带列名的文本文件。
- 使用
sed -i '1d' filename
命令删除文本文件的首行。 - 在Stream Load执行语句中,使用
-H "where: 列名 != '列名称'"
过滤掉首行。当前系统会先转换,然后再过滤,因此如果首行字符串转其他数据类型失败的话,会返回
null
。所以,该方式要求StarRocks表中的列不能设置为NOT NULL
。 - 在Stream Load执行语句中加入
-H "max_filter_ratio:0.01"
,可以给导入作业设置一个1%或者更小、容错超过1行的容错率,从而将首行的错误忽视掉。您也可以根据实际数据量设置一个更小的容错率,但是要保证1行以上的容错。设置容错率后,返回结果的ErrorURL
依旧会提示有错误,但导入作业整体会成功。容错率不宜设置过大,避免漏掉其他数据问题。
当前业务的分区键对应的数据不是标准的DATE和INT类型,使用Stream Load导入数据到StarRocks时,需要转换数据吗?
StarRocks支持在导入过程中进行数据转换。
-H "columns: NO,DATE_1, VERSION, PRICE, DATE=LEFT(DATE_1,6)"
DATE_1
可以简单地看成先占位进行取数,然后通过LEFT()
函数进行转换,赋值给StarRocks表中的DATE
列。需要注意的是,必须先列出CSV文件中所有列的临时名称,然后再使用函数进行转换。支持列转换的函数为标量函数,包括非聚合函数和窗口函数。
报错“body exceed max size: 10737418240, limit: 10737418240”,该如何处理?
- 报错原因
源数据文件大小超过10 GB,超过了Stream Load所能支持的文件大小的上限。
- 解决方案
- 通过
seq -w 0 n
拆分数据文件。 - 通过
curl -XPOST http:///be_host:http_port/api/update_config?streaming_load_max_mb=<file_size>
调整BE配置项中streaming_load_max_mb的取值来扩大文件大小上限。BE配置项的更多信息,请参见参数配置。
- 通过
如何提高导入性能?
方式一:增加实际任务并行度
min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
- alive_be_number:存活BE数量。
- partition_number:消费分区数量。
- desired_concurrent_number:Routine Load导入作业时为单个导入作业设置较高的期望任务并行度,默认值为3。
- 如果还未创建导入作业,则需要在执行
CREATE ROUTINE LOAD
时,设置该参数。 - 如果已经创建导入作业,则需要在执行
ALTER ROUTINE LOAD
时,修改该参数。
- 如果还未创建导入作业,则需要在执行
- max_routine_load_task_concurrent_num:Routine Load导入作业的默认最大任务并行度 ,默认值为5。该参数为FE动态参数,更多说明和设置方式,请参见参数配置。
因此当消费分区和BE节点数量较多,并且大于desired_concurrent_number
和max_routine_load_task_concurrent_num
参数时,如果您需要增加实际任务并行度,则可以提高desired_concurrent_number
和max_routine_load_task_concurrent_num
参数。
例如,消费分区数量为7,存活BE数量为5,max_routine_load_task_concurrent_num为默认值5。此时如果需要增加实际任务并发度至上限,则需要将desired_concurrent_number设置为5(默认值为3),则计算实际任务并行度min(5,7,5,5)
为5。
方式二:增大单个导入任务消费分区的数据量
单个Routine Load导入任务消费消息的上限由参数max_routine_load_batch_size,或者参数routine_load_task_consume_second决定。当导入任务消费数据并达到上述要求后,则完成消费。上述两个参数为FE配置项,更多说明和设置方式,请参见参数配置。
I0325 20:27:50.410579 15259 data_consumer_group.cpp:131] consumer group done: 41448fb1a0ca59ad-30e34dabfa7e47a0. consume time(ms)=3261, received rows=179190, received bytes=9855450, eos: 1, left_time: -261, left_bytes: 514432550, blocking get time(us): 3065086, blocking put time(us): 24855
正常情况下,该日志的left_bytes >=0,表示在routine_load_task_consume_second时间内一次读取的数据量还未超过max_routine_load_batch_size,说明调度的一批导入任务都可以消费完Kafka的数据,不存在消费延迟,则此时您可以通过提高routine_load_task_consume_second的参数值,增大单个导入任务消费分区的数据量 。
如果left_bytes < 0,则表示未在routine_load_task_consume_second规定时间,一次读取的数据量已经达到max_routine_load_batch_size,每次Kafka的数据都会把调度的一批导入任务填满,因此极有可能Kafka还有剩余数据没有消费完,存在消费积压,则可以提高max_routine_load_batch_size的参数值。
执行SHOW ROUTINE LOAD命令后,导入作业状态变为PAUSED或CANCELLED,该如何处理?
- 报错提示:导入作业变成PAUSED状态,并且ReasonOfStateChanged报错
Broker: Offset out of range
。- 原因分析:导入作业的消费位点在Kafka分区中不存在。
- 解决方式:您可以通过执行命令
SHOW ROUTINE LOAD
,在Progress参数中查看导入作业的最新消费位点,并且在Kafka分区中查看是否存在该位点的消息。如果不存在,则可能有如下两个原因:- 创建导入作业时指定的消费位点为将来的时间点。
- Kafka分区中该位点的消息还未被导入作业消费,就已经被清理。建议您根据导入作业的导入速度设置合理的Kafka日志清理策略和参数,例如log.retention.hours,log.retention.bytes等。
- 报错提示:导入作业变成PAUSED状态。
- 原因分析:可能是导入任务错误行数超过阈值max_error_number。
- 解决方式:您可以根据
ReasonOfStateChanged
,ErrorLogUrls
报错进行排查。- 如果是数据源的数据格式问题,则需要检查数据源数据格式,并进行修复。修复后您可以使用
RESUME ROUTINE LOAD
,恢复PAUSED状态的导入作业。 - 如果是数据源的数据格式无法被StarRocks解析,则需要调整错误行数阈值max_error_number。
- 执行命令
SHOW ROUTINE LOAD
,查看错误行数阈值max_error_number。 - 执行命令
ALTER ROUTINE LOAD
,适当提高错误行数阈值max_error_number。 - 执行命令
RESUME ROUTINE LOAD
,恢复PAUSED状态的导入作业。
- 执行命令
- 如果是数据源的数据格式问题,则需要检查数据源数据格式,并进行修复。修复后您可以使用
- 报错提示:导入作业变为CANCELLED状态。
- 原因分析:可能是执行导入任务时遇到异常。例如,表被删除。
- 解决方式:您可以根据
ReasonOfStateChanged
或ErrorLogUrls
报错进行排查和修复。但是修复后,您无法恢复CANCELLED状态的导入作业。
使用Routine Load消费Kafka写入StarRocks是否能保证一致性语义?
Routine Load能够保证一致性(Exactly-once)语义。
一个导入任务是一个单独的事务,如果该事务在执行过程中出现错误,则事务会中止,FE不会更新该导入任务相关分区的消费进度。FE在下一次调度任务执行队列中的导入任务时,从上次保存的分区消费位点发起消费请求,保证Exactly once语义。
报错“Broker: Offset out of range”,该如何处理?
SHOW ROUTINE LOAD
查看最新的offset,使用Kafka客户端查看该offset有没有数据。可能原因是:- 导入时指定了未来的offset。
- 还没来得及导入,Kafka已经将该offset的数据清理。需要根据StarRocks的导入速度设置合理的log清理参数log.retention.hours、log.retention.bytes等。
Broker Load是否支持再次执行已经执行成功、且处于FINISHED状态的导入作业?
Broker Load不支持再次执行已经执行成功、且处于FINISHED状态的导入作业。而且,为了保证导入作业的不丢不重,每个执行成功的导入作业的标签(Label)均不可复用。可以使用SHOW LOAD
命令查看历史的导入记录,找到想要再次执行的导入作业,复制作业信息,并修改作业标签后,重新创建一个导入作业并执行。
Broker Load导入HDFS数据时,数据的导入日期字段出现异常,比正确的日期时间多加了8小时,该如何处理?
- 报错原因
StarRocks在建表时设置的timezone为中国时区,创建Broker Load导入作业时设置的timezone也是中国时区,而服务器设置的是UTC时区。因此,日期字段在导入时,比正确的日期时间多加了8小时。
- 解决方案
建表时去掉timezone参数。
Broker Load导入ORC格式的数据时,报错“ErrorMsg: type:ETL_RUN_FAIL; msg:Cannot cast '<slot 6>' from VARCHAR to ARRAY<VARCHAR(30)>”,该如何处理?
- 报错原因
待导入数据文件和Starrocks表两侧的列名不一致,执行
SET
子句的时候系统内部会有一个类型推断,但是在调用cast函数执行数据类型转换的时候失败了。 - 解决方案
确保两侧的列名一致,这样就不需要
SET
子句,也就不会调用cast函数执行数据类型转换,即可导入成功。
为什么Broker Load导入作业没报错,但是却查询不到数据?
Broker Load是一种异步的导入方式,创建导入作业的语句没报错,不代表导入作业成功了。您可以通过SHOW LOAD
语句来查看导入作业的结果状态和errmsg
信息,然后修改导入作业的参数配置后,再重试导入作业。
报错“failed to send batch或TabletWriter add batch with unknown id”,该如何处理?
该错误由数据写入超时而引起。需要修改系统变量query_timeout和BE配置项streaming_load_rpc_max_alive_time_sec的参数值。BE配置项的更多信息,请参见参数配置。
报错“LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found”,该如何处理?
如果导入的是Parquet或ORC格式的数据,则需要检查文件头的列名是否与StarRocks表中的列名一致。
name
和id
列。如果没有使用SET子句,则以column_list参数中指定的列作为映射。(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
)
如果导入的是Apache Hive版本直接生成的ORC文件,并且ORC文件中的表头是 (_col0, _col1, _col2, ...)
,则可能导致"Invalid Column Name"错误。此时需要使用SET子句设置列转换规则。
导入作业长时间没有结束等问题应该如何处理?
在FE上的日志文件fe.log中,根据导入作业的标签来搜索导入作业的ID。然后,在BE上的日志文件be.INFO中,根据导入作业的ID来搜索上下文日志,进而查看具体原因。
如何配置以访问高可用 (HA) 模式下的Apache HDFS集群?
参数 | 描述 |
---|---|
dfs.nameservices | 指定HDFS服务的名称,您可以自定义。 例如,设置dfs.nameservices为my_ha。 |
dfs.ha.namenodes.xxx | 自定义NameNode的名称,多个名称时以逗号(,)分隔。其中xxx为dfs.nameservices中自定义的名称。 例如,设置dfs.ha.namenodes.my_ha为my_nn。 |
dfs.namenode.rpc-address.xxx.nn | 指定NameNode的RPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名称。 例如,设置dfs.namenode.rpc-address.my_ha.my_nn参数值的格式为host:port。 |
dfs.client.failover.proxy.provider | 指定Client连接NameNode的Provider,默认值为org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。 |
(
"username"="user",
"password"="passwd",
"dfs.nameservices" = "my-ha",
"dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2",
"dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
"dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
HDFS集群的配置可以写入hdfs-site.xml文件中,您使用Broker进程读取HDFS集群的信息时,只需要填写集群的文件路径名和认证信息即可。
如何配置Hadoop ViewFS Federation?
需要将ViewFs相关的配置文件core-site.xml和hdfs-site.xml拷贝到broker/conf目录中。
如果有自定义的文件系统,还需要将文件系统相关的.jar文件拷贝到broker/lib目录中。
访问Kerberos认证的集群时,报错“Can't get Kerberos realm”,该如何处理?
- 首先检查是否所有的Broker所在的机器都配置了/etc/krb5.conf文件。
- 如果配置了仍然报错,则需要在Broker的启动脚本中
JAVA_OPTS
变量的最后,加上-Djava.security.krb5.conf:/etc/krb5.conf
。
使用INSERT INTO语句导入数据时,SQL每插入一条数据大约耗时50~100ms,能否优化执行效率?
因为INSERT INTO导入方式为批量写入,所以单条写入和批量写入的耗时相同。因此OLAP场景下不建议使用INSERT INTO语句单条写入数据。
使用INSERT INTO SELECT语句导入数据时,系统报错“index channel has intoleralbe failure”,该如何解决?
该报错是因为流式导入RPC超时导致。您可以通过在配置文件中调节RPC超时相关参数解决。
- streaming_load_rpc_max_alive_time_sec:流式导入RPC的超时时间,默认为1200,单位为秒。
- tablet_writer_rpc_timeout_sec:TabletWriter的超时时长,默认为600,单位为秒。
使用INSERT INTO SELECT语句导入大量数据时会执行失败,报错“execute timeout”,该如何解决?
该报错是因为Query超时导致。您可以通过调节Session变量query_timeout
解决。该参数默认为600,单位为秒。
set query_timeout =xx;
执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该如何解决?
- 报错原因
在StarRocks-migrate-tools(简称SMT)配置文件config_prod.conf中设置了多组规则
[table-rule.1]
、[table-rule.2]
等,但是缺失必要的配置信息。 - 解决方案
检查是否给每组规则
[table-rule.1]
、[table-rule.2]
等配置了database,table和Flink Connector信息。
Flink如何自动重启失败的Task?
Flink通过Checkpointing机制和重启策略,自动重启失败的Task。
# unit: ms
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
execution.checkpointing.interval
: Checkpoint的基本时间间隔,单位为ms。如果需要启用Checkpointing机制,则该参数值须大于0。state.backend
:启动Checkpointing机制后,状态会随着CheckPoint而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在CheckPoint时如何持久化以及持久化在哪里均取决于选择的State Backend。状态更多介绍,请参见State Backends。state.checkpoints.dir
:Checkpoint数据存储目录。
如何手动停止Flink job,并且恢复Flink job至停止前的状态?
您可以在停止Flink job时手动触发Savepoint(Savepoint是依据Checkpointing机制所创建的流作业执行状态的一致镜像),后续可以从指定Savepoint中恢复Flink job。
使用Savepoint停止作业。自动触发Flink job ID的Savepoint,并停止该job。此外,您可以指定一个目标文件系统目录来存储Savepoint 。
bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
jobId
:您可以通过Flink WebUI查看Flink job ID,或者在命令行执行flink list -running
进行查看。targetDirectory
:您也可以在Flink配置文件flink-conf.yml中state.savepoints.dir
配置Savepoint的默认目录。 触发Savepoint时,使用此目录来存储Savepoint,无需指定目录。state.savepoints.dir: [file://或hdfs://]/home/user/savepoints_dir
./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql
使用事务接口的exactly-once时,导入失败,该如何解决?
- 问题描述:报错信息如下。
com.starrocks.data.load.stream.exception.StreamLoadFailException: { "TxnId": 3382****, "Label": "502c2770-cd48-423d-b6b7-9d8f9a59****", "Status": "Fail", "Message": "timeout by txn manager",--具体报错信息 "NumberTotalRows": 1637, "NumberLoadedRows": 1637, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 4284214, "LoadTimeMs": 120294, "BeginTxnTimeMs": 0, "StreamLoadPlanTimeMs": 7, "ReadDataTimeMs": 9, "WriteDataTimeMs": 120278, "CommitAndPublishTimeMs": 0 }
- 原因分析:
sink.properties.timeout
小于Flink checkpoint interval,导致事务超时。 - 解决方式:需要调整该参数值大于Flink checkpoint interval。
flink-connector-jdbc_2.11 Sink到StarRocks时间落后8小时,该如何处理?
- 问题描述:Flink中localtimestap函数生成的时间,在Flink中时间正常,Sink到StarRocks后发现时间落后8小时。已确认Flink所在服务器与StarRocks所在服务器时区均为Asia/ShangHai东八区。Flink版本为1.12,驱动为flink-connector-jdbc_2.11。
- 解决方式:可以在Flink sink表中配置时区参数
'server-time-zone' = 'Asia/Shanghai'
,同时在url
参数里添加&serverTimezone=Asia/Shanghai
。示例如下。CREATE TABLE sk ( sid int, local_dtm TIMESTAMP, curr_dtm TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.**.**:9030/sys_device?characterEncoding=utf-8&serverTimezone=Asia/Shanghai', 'table-name' = 'sink', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'sr', 'password' = 'sr123', 'server-time-zone' = 'Asia/Shanghai' );
为什么在Starrocks集群上部署的Kafka集群可以导入数据,其他kafka集群无法导入数据?
- 问题描述:报错信息如下。
failed to query wartermark offset, err: Local: Bad message format
- 原因分析:kafka无法解析hostname。
- 解决方式:在StarRocks集群节点配置Kafka主机名,解析/etc/hosts文件。
为什么没有查询时BE内存处于打满状态,且CPU也是打满状态?
因为BE会定期收集统计信息,不是长期占用CPU,10 GB以内的内存使用完不会释放,BE会自己管理,您可以通过tc_use_memory_min参数限制大小。
tc_use_memory_min,TCmalloc最小保留内存。默认值10737418240,只有超过该值,StarRocks才将空闲内存返还给操作系统。您可以在EMR控制台StarRocks服务配置页签的be.conf中配置。BE配置项的更多信息,请参见参数配置。
为什么BE申请的内存不会释放给操作系统?
因为数据库从操作系统获得的大块的内存分配,在分配的时候会多预留,释放时候会延后,为了重复利用,大块内存的分配的代价比较大。建议测试环境验证时,对内存使用进行监控,在较长的周期内看内存是否能够完成释放。
为什么无法解析Flink Connector依赖?
- 原因分析:Flink Connector依赖需要通过阿里云镜像地址来获取。/etc/maven/settings.xml的mirror部分未配置全部通过阿里云镜像获取。
- 解决方式:修改阿里云公共仓库地址为
https://maven.aliyun.com/repository/public
。
Flink-connector-StarRocks中sink.buffer-flush.interval-ms参数是否生效?
- 问题描述:
sink.buffer-flush.interval-ms
参数设置为15s,但是checkpoint interval=5mins
,设置的sink.buffer-flush.interval-ms
参数还生效吗?+----------------------+--------------------------------------------------------------+ | Option | Required | Default | Type | Description | +-------------------------------------------------------------------------------------+ | sink.buffer-flush. | NO | 300000 | String | the flushing time interval, | | interval-ms | | | | range: [1000ms, 3600000ms] | +----------------------+--------------------------------------------------------------+
- 解决方式:以下三个参数哪个先达到阈值,即触发flush,和
checkpoint interval
设置的值没关系,checkpoint interval
对于Exactly once语义才有效,At Least Once语义用的是sink.buffer-flush.interval-ms
。sink.buffer-flush.max-rows sink.buffer-flush.max-bytes sink.buffer-flush.interval-ms
使用DataX导入支持更新数据吗?
当前版本中,主键模型已支持通过DataX更新数据。您需要在JSON配置文件的reader部分添加_op
字段配置该功能。
使用DataX同步数据时,如何处理命名中使用的关键字,防止报错?
您需要使用反引号(``)包围相应的字段。
报错“When running with master 'yarn' either HADOOP-CONF-DIR or YARN-CONF-DIR must be set in the environment”,该如何解决?
使用Spark Load时没有在Spark客户端的spark-env.sh中配置HADOOP-CONF-DIR
环境变量。
提交Spark job时用到spark-submit命令,报错“Cannot run program "xxx/bin/spark-submit": error=2, No such file or directory”,该如何解决?
使用Spark Load时spark_home_default_dir
配置项没有指定或者指定了错误的Spark客户端根目录。
报错“File xxx/jars/spark-2x.zip does not exist”,该如何解决?
使用Spark Load时spark-resource-path
配置项没有指向打包好的ZIP文件,检查指向文件路径和文件名词是否一致。
报错“yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn”,该如何解决?
使用Spark Load时yarn-client-path
配置项没有指定YARN的可执行文件。