本文匯總了StarRocks資料匯入的常見問題。
- 通用問題
- 如何選擇匯入方式?
- 影響匯入效能的因素都有哪些?
- 報錯“close index channel failed“或“too many tablet versions”,該如何處理?
- 報錯“Label Already Exists”,該如何處理?
- 報錯“ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel”,該如何處理?
- 資料匯入處理程序中,發生遠端程序呼叫(Remote Procedure Call,簡稱RPC)逾時問題,該如何處理?
- 報錯“Value count does not match column count”,該如何處理?
- 報錯“ERROR 1064 (HY000): Failed to find enough host in all backends. need: 3”,該如何處理?
- 匯入資料時發現BE服務日誌中出現Too many open files問題,該如何處理?
- 報錯“increase config load_process_max_memory_limit_percent”,該如何處理?
- Stream Load
- Routine Load
- Broker Load
- Broker Load是否支援再次執行已經執行成功、且處於FINISHED狀態的匯入作業?
- Broker Load匯入HDFS資料時,資料的匯入日期欄位出現異常,比正確的日期時間多加了8小時,該如何處理?
- Broker Load匯入ORC格式的資料時,報錯“ErrorMsg: type:ETL_RUN_FAIL; msg:Cannot cast '<slot 6>' from VARCHAR to ARRAY<VARCHAR(30)>”,該如何處理?
- 為什麼Broker Load匯入作業沒報錯,但是卻查詢不到資料?
- 報錯“failed to send batch或TabletWriter add batch with unknown id”,該如何處理?
- 報錯“LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found”,該如何處理?
- 匯入作業長時間沒有結束等問題應該如何處理?
- 如何配置以訪問高可用 (HA) 模式下的Apache HDFS叢集?
- 如何配置Hadoop ViewFS Federation?
- 訪問Kerberos認證的叢集時,報錯“Can't get Kerberos realm”,該如何處理?
- INSERT INTO
- MySQL即時同步至StarRocks
- Flink Connector
- DataX Writer
- Spark Load
- 報錯“When running with master 'yarn' either HADOOP-CONF-DIR or YARN-CONF-DIR must be set in the environment”,該如何解決?
- 提交Spark job時用到spark-submit命令,報錯“Cannot run program "xxx/bin/spark-submit": error=2, No such file or directory”,該如何解決?
- 報錯“File xxx/jars/spark-2x.zip does not exist”,該如何解決?
- 報錯“yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn”,該如何解決?
報錯“close index channel failed“或“too many tablet versions”,該如何處理?
- 報錯原因
該問題主要是資料匯入頻率太快,資料沒能及時合并 (Compaction) ,從而導致版本數超過支援的最大未合并版本數。
- 解決方案預設支援的最大未合并版本數為1000。您可以通過以下方法解決上述報錯:
- 增大單次匯入的資料量,降低匯入頻率。
- 在BE的設定檔be.conf中修改以下配置,通過調整合并策略實現加快合并的目的。
cumulative_compaction_num_threads_per_disk = 4 base_compaction_num_threads_per_disk = 2 cumulative_compaction_check_interval_seconds = 2
報錯“Label Already Exists”,該如何處理?
- 問題描述
StarRocks叢集中同一個資料庫內已經有一個相同Label的匯入作業匯入成功或者正在執行。
- 報錯原因
由於Stream Load是採用HTTP協議提交匯入作業的請求,通常各個語言的HTTP用戶端均會內建請求重試邏輯。StarRocks叢集在接收到第一個請求後,已經開始操作Stream Load,但是由於沒有及時向用戶端返回結果,會出現用戶端再次重試發送相同請求的情況。這時候 StarRocks叢集由於已經在操作第一個請求,所以第二個請求會返回
Label Already Exists
的狀態提示。 - 解決方案需要檢查不同匯入方式之間是否有標籤衝突、或是有重複提交的匯入作業。排查方法如下:
- 使用標籤搜尋主FE的日誌,查看是否存在同一個標籤出現了兩次的情況。如果有,則說明用戶端重複提交了該請求。說明 StarRocks叢集中匯入作業的標籤不區分匯入方式。因此,可能存在不同的匯入作業使用了相同標籤的問題。
- 運行
SHOW LOAD WHERE LABEL = "xxx"
語句,查看是否已經存在具有標籤相同、且處於FINISHED狀態的匯入作業。其中xxx
為待檢查的標籤字串。
建議根據當前請求匯入的資料量,計算出大致的匯入耗時,並根據匯入逾時時間來適當地調大用戶端的請求逾時時間,從而避免用戶端多次提交該請求。
- 使用標籤搜尋主FE的日誌,查看是否存在同一個標籤出現了兩次的情況。如果有,則說明用戶端重複提交了該請求。
報錯“ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel”,該如何處理?
SHOW LOAD
語句。在語句返回的資訊中找到URL,查看錯誤資料。常見的錯誤類型如下:convert csv string to INT failed.
待匯入資料檔案中某列的字串在轉化為對應類型的資料時出錯。例如,將abc轉化為數字時失敗。
the length of input is too long than schema.
待匯入資料檔案中某列的長度不正確。例如,定長字串超過建表時設定的長度,或INT類型的欄位超過4個位元組。
actual column number is less than schema column number.
待匯入資料檔案中某一行按照指定的分隔字元切分後,列數小於指定的列數。可能原因是分隔字元不正確。
actual column number is more than schema column number.
待匯入資料檔案中某一行按照指定的分隔字元切分後,列數大於指定的列數。
the frac part length longer than schema scale.
待匯入資料檔案中某DECIMAL類型的列的小數部分超過指定的長度。
the int part length longer than schema precision.
待匯入資料檔案中某DECIMAL類型的列的整數部分超過指定的長度。
there is no corresponding partition for this key.
匯入檔案某行的分區列的值不在分區範圍內。
報錯“ERROR 1064 (HY000): Failed to find enough host in all backends. need: 3”,該如何處理?
您可以在建表屬性中添加"replication_num" = "1"
資訊。
匯入資料時發現BE服務日誌中出現Too many open files問題,該如何處理?
- 調整系統控制代碼數。
- 調小base_compaction_num_threads_per_disk和cumulative_compaction_num_threads_per_disk(預設都是1)的參數值。修改配置項的具體操作,請參見修改配置項。
- 如果問題還未解決,建議擴容或者降低匯入頻率。
報錯“increase config load_process_max_memory_limit_percent”,該如何處理?
資料匯入處理程序中,發生遠端程序呼叫(Remote Procedure Call,簡稱RPC)逾時問題,該如何處理?
檢查BE設定檔be.conf中write_buffer_size參數的設定。該參數用於控制BE上記憶體塊的大小閾值,預設閾值為100 MB。如果閾值過大,可能會導致遠端程序呼叫逾時,此時需要配合BE設定檔中的tablet_writer_rpc_timeout_sec參數來適當地調整write_buffer_size參數的取值。BE配置項的更多資訊,請參見參數配置。
報錯“Value count does not match column count”,該如何處理?
- 問題描述匯入作業失敗,通過查看錯誤詳情URL發現返回“Value count does not match column count”錯誤,提示解析來源資料得到的列數與目標表的列數不匹配。
Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:00Z,cpu0,80.99 Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:10Z,cpu1,75.23 Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:20Z,cpu2,59.44
- 報錯原因
發生該錯誤的原因是匯入命令或匯入語句中指定的資料行分隔符號與來源資料中的資料行分隔符號不一致。例如,上面樣本中,來源資料為CSV格式,包括三列,資料行分隔符號為逗號(,),但是匯入命令或匯入語句中卻指定製表符(\t)作為資料行分隔符號,最終導致來源資料的三列資料解析成了一列資料。
- 解決方案
修改匯入命令或匯入語句中的資料行分隔符號為逗號(,),然後再次嘗試執行匯入。
如何選擇匯入方式?
匯入方式的選擇可以參見匯入概述。
影響匯入效能的因素都有哪些?
- 機器記憶體
當tablet比較多的時候,對於記憶體消耗比較大。建議單個tablet大小按照如何分桶?中介紹的進行評估。
- 磁碟IO能力和網路頻寬
正常50 Mbit/s~100 Mbit/s是沒有問題的。
- 匯入批次和頻率
- Stream Load批次大小建議10 MB~100 MB。
- Broker Load還好,因為Broker Load針對的情境都是批次大小比較大的情況。
- 匯入頻率不要太高,SATA盤1s不超過一個任務。
Stream Load是否支援識別文字檔中首行的列名?或者是否支援指定不讀取第一行?
- 在匯出工具中修改設定,重新匯出不帶列名的文字檔。
- 使用
sed -i '1d' filename
命令刪除文字檔的首行。 - 在Stream Load執行語句中,使用
-H "where: 列名 != '列名稱'"
過濾掉首行。當前系統會先轉換,然後再過濾,因此如果首行字串轉其他資料類型失敗的話,會返回
null
。所以,該方式要求StarRocks表中的列不能設定為NOT NULL
。 - 在Stream Load執行語句中加入
-H "max_filter_ratio:0.01"
,可以給匯入作業設定一個1%或者更小、容錯超過1行的容錯率,從而將首行的錯誤忽視掉。您也可以根據實際資料量設定一個更小的容錯率,但是要保證1行以上的容錯。設定容錯率後,返回結果的ErrorURL
依舊會提示有錯誤,但匯入作業整體會成功。容錯率不宜設定過大,避免漏掉其他資料問題。
當前業務的分區鍵對應的資料不是標準的DATE和INT類型,使用Stream Load匯入資料到StarRocks時,需要轉換資料嗎?
StarRocks支援在匯入處理程序中進行資料轉換。
-H "columns: NO,DATE_1, VERSION, PRICE, DATE=LEFT(DATE_1,6)"
DATE_1
可以簡單地看成先佔位進行取數,然後通過LEFT()
函數進行轉換,賦值給StarRocks表中的DATE
列。需要注意的是,必須先列出CSV檔案中所有列的臨時名稱,然後再使用函數進行轉換。支援列轉換的函數為純量涵式,包括非彙總函式和視窗函數。
報錯“body exceed max size: 10737418240, limit: 10737418240”,該如何處理?
- 報錯原因
來源資料檔案大小超過10 GB,超過了Stream Load所能支援的檔案大小的上限。
- 解決方案
- 通過
seq -w 0 n
拆分資料檔案。 - 通過
curl -XPOST http:///be_host:http_port/api/update_config?streaming_load_max_mb=<file_size>
調整BE配置項中streaming_load_max_mb的取值來擴大檔案大小上限。BE配置項的更多資訊,請參見參數配置。
- 通過
如何提高匯入效能?
方式一:增加實際任務並行度
min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
- alive_be_number:存活BE數量。
- partition_number:消費分區數量。
- desired_concurrent_number:Routine Load匯入作業時為單個匯入作業設定較高的期望任務並行度,預設值為3。
- 如果還未建立匯入作業,則需要在執行
CREATE ROUTINE LOAD
時,設定該參數。 - 如果已經建立匯入作業,則需要在執行
ALTER ROUTINE LOAD
時,修改該參數。
- 如果還未建立匯入作業,則需要在執行
- max_routine_load_task_concurrent_num:Routine Load匯入作業的預設最大任務並行度 ,預設值為5。該參數為FE動態參數,更多說明和設定方式,請參見參數配置。
因此當消費分區和BE節點數量較多,並且大於desired_concurrent_number
和max_routine_load_task_concurrent_num
參數時,如果您需要增加實際任務並行度,則可以提高desired_concurrent_number
和max_routine_load_task_concurrent_num
參數。
例如,消費分區數量為7,存活BE數量為5,max_routine_load_task_concurrent_num為預設值5。此時如果需要增加實際任務並發度至上限,則需要將desired_concurrent_number設定為5(預設值為3),則計算實際任務並行度min(5,7,5,5)
為5。
方式二:增大單個匯入任務消費分區的資料量
單個Routine Load匯入任務消費訊息的上限由參數max_routine_load_batch_size,或者參數routine_load_task_consume_second決定。當匯入任務消費資料並達到上述要求後,則完成消費。上述兩個參數為FE配置項,更多說明和設定方式,請參見參數配置。
I0325 20:27:50.410579 15259 data_consumer_group.cpp:131] consumer group done: 41448fb1a0ca59ad-30e34dabfa7e47a0. consume time(ms)=3261, received rows=179190, received bytes=9855450, eos: 1, left_time: -261, left_bytes: 514432550, blocking get time(us): 3065086, blocking put time(us): 24855
正常情況下,該日誌的left_bytes >=0,表示在routine_load_task_consume_second時間內一次讀取的資料量還未超過max_routine_load_batch_size,說明調度的一批匯入任務都可以消費完Kafka的資料,不存在消費延遲,則此時您可以通過提高routine_load_task_consume_second的參數值,增大單個匯入任務消費分區的資料量 。
如果left_bytes < 0,則表示未在routine_load_task_consume_second規定時間,一次讀取的資料量已經達到max_routine_load_batch_size,每次Kafka的資料都會把調度的一批匯入任務填滿,因此極有可能Kafka還有剩餘資料沒有消費完,存在消費積壓,則可以提高max_routine_load_batch_size的參數值。
執行SHOW ROUTINE LOAD命令後,匯入作業狀態變為PAUSED或CANCELLED,該如何處理?
- 報錯提示:匯入作業變成PAUSED狀態,並且ReasonOfStateChanged報錯
Broker: Offset out of range
。- 原因分析:匯入作業的消費位點在Kafka分區中不存在。
- 解決方式:您可以通過執行命令
SHOW ROUTINE LOAD
,在Progress參數中查看匯入作業的最新消費位點,並且在Kafka分區中查看是否存在該位點的訊息。如果不存在,則可能有如下兩個原因:- 建立匯入作業時指定的消費位點為將來的時間點。
- Kafka分區中該位點的訊息還未被匯入作業消費,就已經被清理。建議您根據匯入作業的匯入速度設定合理的Kafka日誌清理策略和參數,例如log.retention.hours,log.retention.bytes等。
- 報錯提示:匯入作業變成PAUSED狀態。
- 原因分析:可能是匯入任務錯誤行數超過閾值max_error_number。
- 解決方式:您可以根據
ReasonOfStateChanged
,ErrorLogUrls
報錯進行排查。- 如果是資料來源的資料格式問題,則需要檢查資料來源資料格式,並進行修複。修複後您可以使用
RESUME ROUTINE LOAD
,恢複PAUSED狀態的匯入作業。 - 如果是資料來源的資料格式無法被StarRocks解析,則需要調整錯誤行數閾值max_error_number。
- 執行命令
SHOW ROUTINE LOAD
,查看錯誤行數閾值max_error_number。 - 執行命令
ALTER ROUTINE LOAD
,適當提高錯誤行數閾值max_error_number。 - 執行命令
RESUME ROUTINE LOAD
,恢複PAUSED狀態的匯入作業。
- 執行命令
- 如果是資料來源的資料格式問題,則需要檢查資料來源資料格式,並進行修複。修複後您可以使用
- 報錯提示:匯入作業變為CANCELLED狀態。
- 原因分析:可能是執行匯入任務時遇到異常。例如,表被刪除。
- 解決方式:您可以根據
ReasonOfStateChanged
或ErrorLogUrls
報錯進行排查和修複。但是修複後,您無法恢複CANCELLED狀態的匯入作業。
使用Routine Load消費Kafka寫入StarRocks是否能保證一致性語義?
Routine Load能夠保證一致性(Exactly-once)語義。
一個匯入任務是一個單獨的事務,如果該事務在執行過程中出現錯誤,則事務會中止,FE不會更新該匯入任務相關分區的消費進度。FE在下一次調度任務執行隊列中的匯入任務時,從上次儲存的分區消費位點發起消費請求,保證Exactly once語義。
報錯“Broker: Offset out of range”,該如何處理?
SHOW ROUTINE LOAD
查看最新的offset,使用Kafka用戶端查看該offset有沒有資料。可能原因是:- 匯入時指定了未來的offset。
- 還沒來得及匯入,Kafka已經將該offset的資料清理。需要根據StarRocks的匯入速度設定合理的log清理參數log.retention.hours、log.retention.bytes等。
Broker Load是否支援再次執行已經執行成功、且處於FINISHED狀態的匯入作業?
Broker Load不支援再次執行已經執行成功、且處於FINISHED狀態的匯入作業。而且,為了保證匯入作業的不丟不重,每個執行成功的匯入作業的標籤(Label)均不可複用。可以使用SHOW LOAD
命令查看歷史的匯入記錄,找到想要再次執行的匯入作業,複製作業資訊,並修改作業標籤後,重新建立一個匯入作業並執行。
Broker Load匯入HDFS資料時,資料的匯入日期欄位出現異常,比正確的日期時間多加了8小時,該如何處理?
- 報錯原因
StarRocks在建表時設定的timezone為中國時區,建立Broker Load匯入作業時設定的timezone也是中國時區,而伺服器設定的是UTC時區。因此,日期欄位在匯入時,比正確的日期時間多加了8小時。
- 解決方案
建表時去掉timezone參數。
Broker Load匯入ORC格式的資料時,報錯“ErrorMsg: type:ETL_RUN_FAIL; msg:Cannot cast '<slot 6>' from VARCHAR to ARRAY<VARCHAR(30)>”,該如何處理?
- 報錯原因
待匯入資料檔案和Starrocks表兩側的列名不一致,執行
SET
子句的時候系統內部會有一個類型推斷,但是在調用cast函數執行資料類型轉換的時候失敗了。 - 解決方案
確保兩側的列名一致,這樣就不需要
SET
子句,也就不會調用cast函數執行資料類型轉換,即可匯入成功。
為什麼Broker Load匯入作業沒報錯,但是卻查詢不到資料?
Broker Load是一種非同步匯入方式,建立匯入作業的語句沒報錯,不代表匯入作業成功了。您可以通過SHOW LOAD
語句來查看匯入作業的結果狀態和errmsg
資訊,然後修改匯入作業的參數配置後,再重試匯入作業。
報錯“failed to send batch或TabletWriter add batch with unknown id”,該如何處理?
該錯誤由資料寫入逾時而引起。需要修改系統變數query_timeout和BE配置項streaming_load_rpc_max_alive_time_sec的參數值。BE配置項的更多資訊,請參見參數配置。
報錯“LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found”,該如何處理?
如果匯入的是Parquet或ORC格式的資料,則需要檢查檔案頭的列名是否與StarRocks表中的列名一致。
name
和id
列。如果沒有使用SET子句,則以column_list參數中指定的列作為映射。(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
)
如果匯入的是Apache Hive版本直接產生的ORC檔案,並且ORC檔案中的表頭是 (_col0, _col1, _col2, ...)
,則可能導致"Invalid Column Name"錯誤。此時需要使用SET子句設定列轉換規則。
匯入作業長時間沒有結束等問題應該如何處理?
在FE上的記錄檔fe.log中,根據匯入作業的標籤來搜尋匯入作業的ID。然後,在BE上的記錄檔be.INFO中,根據匯入作業的ID來搜尋上下文日誌,進而查看具體原因。
如何配置以訪問高可用 (HA) 模式下的Apache HDFS叢集?
通過配置NameNode HA,可以在NameNode切換時,自動識別到新的NameNode。配置以下參數用於訪問以HA模式部署的HDFS叢集。
參數 | 描述 |
dfs.nameservices | 指定HDFS服務的名稱,您可以自訂。 例如,設定dfs.nameservices為my_ha。 |
dfs.ha.namenodes.xxx | 自訂NameNode的名稱,多個名稱時以逗號(,)分隔。其中xxx為dfs.nameservices中自訂的名稱。 例如,設定dfs.ha.namenodes.my_ha為my_nn。 |
dfs.namenode.rpc-address.xxx.nn | 指定NameNode的RPC地址資訊。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名稱。 例如,設定dfs.namenode.rpc-address.my_ha.my_nn參數值的格式為host:port。 |
dfs.client.failover.proxy.provider | 指定Client串連NameNode的Provider,預設值為org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。 |
HA模式可以與簡單認證、Kerberos認證兩種認證方式組合,進行叢集訪問。例如,通過簡單認證方式訪問HA HDFS。
(
"username"="user",
"password"="passwd",
"dfs.nameservices" = "my-ha",
"dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2",
"dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
"dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
HDFS叢集的配置可以寫入hdfs-site.xml檔案中,您使用Broker進程讀取HDFS叢集的資訊時,只需要填寫叢集的檔案路徑名和認證資訊即可。
如何配置Hadoop ViewFS Federation?
需要將ViewFs相關的設定檔core-site.xml和hdfs-site.xml拷貝到broker/conf目錄中。
如果有自訂的檔案系統,還需要將檔案系統相關的.jar檔案拷貝到broker/lib目錄中。
訪問Kerberos認證的叢集時,報錯“Can't get Kerberos realm”,該如何處理?
- 首先檢查是否所有的Broker所在的機器都配置了/etc/krb5.conf檔案。
- 如果配置了仍然報錯,則需要在Broker的啟動指令碼中
JAVA_OPTS
變數的最後,加上-Djava.security.krb5.conf:/etc/krb5.conf
。
使用INSERT INTO語句匯入資料時,SQL每插入一條資料大約耗時50~100ms,能否最佳化執行效率?
因為INSERT INTO匯入方式為批量寫入,所以單條寫入和批量寫入的耗時相同。因此OLAP情境下不建議使用INSERT INTO語句單條寫入資料。
使用INSERT INTO SELECT語句匯入資料時,系統報錯“index channel has intoleralbe failure”,該如何解決?
該報錯是因為流式匯入RPC逾時導致。您可以通過在設定檔中調節RPC逾時相關參數解決。
- streaming_load_rpc_max_alive_time_sec:流式匯入RPC的逾時時間,預設為1200,單位為秒。
- tablet_writer_rpc_timeout_sec:TabletWriter的逾時時間長度,預設為600,單位為秒。
使用INSERT INTO SELECT語句匯入大量資料時會執行失敗,報錯“execute timeout”,該如何解決?
該報錯是因為Query逾時導致。您可以通過調節Session變數query_timeout
解決。該參數預設為600,單位為秒。
set query_timeout =xx;
執行Flink job,報錯“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,該如何解決?
- 報錯原因
在StarRocks-migrate-tools(簡稱SMT)設定檔config_prod.conf中設定了多組規則
[table-rule.1]
、[table-rule.2]
等,但是缺失必要的配置資訊。 - 解決方案
檢查是否給每組規則
[table-rule.1]
、[table-rule.2]
等配置了database,table和Flink Connector資訊。
Flink如何自動重啟失敗的Task?
Flink通過Checkpointing機制和重啟策略,自動重啟失敗的Task。
# unit: ms
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
execution.checkpointing.interval
: Checkpoint的基本時間間隔,單位為ms。如果需要啟用Checkpointing機制,則該參數值須大於0。state.backend
:啟動Checkpointing機制後,狀態會隨著CheckPoint而持久化,以防止資料丟失、保障恢複時的一致性。 狀態內部的儲存格式、狀態在CheckPoint時如何持久化以及持久化在哪裡均取決於選擇的State Backend。狀態更多介紹,請參見State Backends。state.checkpoints.dir
:Checkpoint資料存放區目錄。
如何手動停止Flink job,並且恢複Flink job至停止前的狀態?
您可以在停止Flink job時手動觸發Savepoint(Savepoint是依據Checkpointing機制所建立的流作業執行狀態的一致鏡像),後續可以從指定Savepoint中恢複Flink job。
使用Savepoint停止作業。自動觸發Flink job ID的Savepoint,並停止該job。此外,您可以指定一個目標檔案系統目錄來儲存Savepoint 。
bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
jobId
:您可以通過Flink WebUI查看Flink job ID,或者在命令列執行flink list -running
進行查看。targetDirectory
:您也可以在Flink設定檔flink-conf.yml中state.savepoints.dir
配置Savepoint的預設目錄。 觸發Savepoint時,使用此目錄來儲存Savepoint,無需指定目錄。state.savepoints.dir: [file://或hdfs://]/home/user/savepoints_dir
./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql
使用事務介面的exactly-once時,匯入失敗,該如何解決?
- 問題描述:報錯資訊如下。
com.starrocks.data.load.stream.exception.StreamLoadFailException: { "TxnId": 3382****, "Label": "502c2770-cd48-423d-b6b7-9d8f9a59****", "Status": "Fail", "Message": "timeout by txn manager",--具體報錯資訊 "NumberTotalRows": 1637, "NumberLoadedRows": 1637, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 4284214, "LoadTimeMs": 120294, "BeginTxnTimeMs": 0, "StreamLoadPlanTimeMs": 7, "ReadDataTimeMs": 9, "WriteDataTimeMs": 120278, "CommitAndPublishTimeMs": 0 }
- 原因分析:
sink.properties.timeout
小於Flink checkpoint interval,導致事務逾時。 - 解決方式:需要調整該參數值大於Flink checkpoint interval。
flink-connector-jdbc_2.11 Sink到StarRocks時間落後8小時,該如何處理?
- 問題描述:Flink中localtimestap函數產生的時間,在Flink中時間正常,Sink到StarRocks後發現時間落後8小時。已確認Flink所在伺服器與StarRocks所在伺服器時區均為Asia/ShangHai東八區。Flink版本為1.12,驅動為flink-connector-jdbc_2.11。
- 解決方式:可以在Flink sink表中配置時區參數
'server-time-zone' = 'Asia/Shanghai'
,同時在url
參數裡添加&serverTimezone=Asia/Shanghai
。樣本如下。CREATE TABLE sk ( sid int, local_dtm TIMESTAMP, curr_dtm TIMESTAMP ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.**.**:9030/sys_device?characterEncoding=utf-8&serverTimezone=Asia/Shanghai', 'table-name' = 'sink', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'sr', 'password' = 'sr123', 'server-time-zone' = 'Asia/Shanghai' );
為什麼在Starrocks叢集上部署的Kafka叢集可以匯入資料,其他kafka叢集無法匯入資料?
- 問題描述:報錯資訊如下。
failed to query wartermark offset, err: Local: Bad message format
- 原因分析:kafka無法解析hostname。
- 解決方式:在StarRocks叢集節點配置Kafka主機名稱,解析/etc/hosts檔案。
為什麼沒有查詢時BE記憶體處於打滿狀態,且CPU也是打滿狀態?
因為BE會定期收集統計資訊,不是長期佔用CPU,10 GB以內的記憶體使用量完不會釋放,BE會自己管理,您可以通過tc_use_memory_min參數限制大小。
tc_use_memory_min,TCmalloc最小保留記憶體。預設值10737418240,只有超過該值,StarRocks才將空閑記憶體返還給作業系統。您可以在EMR控制台StarRocks服務配置頁簽的be.conf中配置。BE配置項的更多資訊,請參見參數配置。
為什麼BE申請的記憶體不會釋放給作業系統?
因為資料庫從作業系統獲得的大塊的記憶體配置,在分配的時候會多預留,釋放時候會延後,為了重複利用,大塊記憶體的分配的代價比較大。建議測試環境驗證時,對記憶體使用量進行監控,在較長的周期內看記憶體是否能夠完成釋放。
為什麼無法解析Flink Connector依賴?
- 原因分析:Flink Connector依賴需要通過阿里雲鏡像地址來擷取。/etc/maven/settings.xml的mirror部分未配置全部通過阿里雲鏡像擷取。
- 解決方式:修改阿里雲公用倉庫地址為
https://maven.aliyun.com/repository/public
。
Flink-connector-StarRocks中sink.buffer-flush.interval-ms參數是否生效?
- 問題描述:
sink.buffer-flush.interval-ms
參數設定為15s,但是checkpoint interval=5mins
,設定的sink.buffer-flush.interval-ms
參數還生效嗎?+----------------------+--------------------------------------------------------------+ | Option | Required | Default | Type | Description | +-------------------------------------------------------------------------------------+ | sink.buffer-flush. | NO | 300000 | String | the flushing time interval, | | interval-ms | | | | range: [1000ms, 3600000ms] | +----------------------+--------------------------------------------------------------+
- 解決方式:以下三個參數哪個先達到閾值,即觸發flush,和
checkpoint interval
設定的值沒關係,checkpoint interval
對於Exactly once語義才有效,At Least Once語義用的是sink.buffer-flush.interval-ms
。sink.buffer-flush.max-rows sink.buffer-flush.max-bytes sink.buffer-flush.interval-ms
使用DataX匯入支援更新資料嗎?
目前的版本中,主鍵模型已支援通過DataX更新資料。您需要在JSON設定檔的reader部分添加_op
欄位配置該功能。
使用DataX同步資料時,如何處理命名中使用的關鍵字,防止報錯?
您需要使用反引號(``)包圍相應的欄位。
報錯“When running with master 'yarn' either HADOOP-CONF-DIR or YARN-CONF-DIR must be set in the environment”,該如何解決?
使用Spark Load時沒有在Spark用戶端的spark-env.sh中配置HADOOP-CONF-DIR
環境變數。
提交Spark job時用到spark-submit命令,報錯“Cannot run program "xxx/bin/spark-submit": error=2, No such file or directory”,該如何解決?
使用Spark Load時spark_home_default_dir
配置項沒有指定或者指定了錯誤的Spark用戶端根目錄。
報錯“File xxx/jars/spark-2x.zip does not exist”,該如何解決?
使用Spark Load時spark-resource-path
配置項沒有指向打包好的ZIP檔案,檢查指向檔案路徑和檔案名稱詞是否一致。
報錯“yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn”,該如何解決?
使用Spark Load時yarn-client-path
配置項沒有指定YARN的可執行檔。