本文為您介紹Realtime ComputeFlink版的SQL常見問題,包括作業常見問題、開發報錯、營運報錯。
運行拓撲圖中顯示的Low Watermark、Watermark以及Task InputWatermark指標顯示的時間和目前時間有時差?
開發報錯
營運報錯
為什麼使用POJO類作為UDTF傳回型別時欄位會出現“錯位”?
問題描述
當使用POJO類作為UDTF傳回型別,並在SQL中顯式聲明了UDTF返回列的別名列表(Alias Name)時,可能會出現欄位錯位(即使類型一致,但實際使用的欄位可能與預期不符)問題。
例如,如果使用如下POJO類作為UDTF的傳回型別,並根據自訂函數開發的要求進行打包並完成函數註冊(這裡使用作業級自訂函數註冊方式)後,SQL校正會失敗。
package com.aliyun.example; public class TestPojoWithoutConstructor { public int c; public String d; public boolean a; public String b; }
package com.aliyun.example; import org.apache.flink.table.functions.TableFunction; public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> { private static final long serialVersionUID = 1L; public void eval(String str1, Integer i2) { TestPojoWithoutConstructor p = new TestPojoWithoutConstructor(); p.d = str1 + "_d"; p.c = i2 + 2; p.b = str1 + "_b"; collect(p); } }
CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor'; CREATE TEMPORARY TABLE src ( id STRING, cnt INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE sink ( f1 INT, f2 STRING, f3 BOOLEAN, f4 STRING ) WITH ( 'connector' = 'print' ); INSERT INTO sink SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);
SQL校正報錯資訊如下:
org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match. Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL. Hint: You will need to rewrite or cast the expression. Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING] Sink schema: [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING] at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
看起來從UDTF返回的欄位和POJO類中的欄位可能錯位了,SQL中欄位c最終是BOOLEAN,而欄位a是INT類型,和POJO類的定義恰好相反。
問題原因
根據POJO類的類型規則:
如果POJO類實現了有參建構函式,推導的傳回型別會按建構函式的參數列表順序。
如果POJO類缺少有參建構函式,就會按欄位名的字典序重排列。
在上述樣本中,由於UDTF傳回型別缺少有參建構函式,因此對應的傳回型別為
BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d)
。雖然這一步並沒有產生錯誤,但因為SQL中對返回欄位加了重新命名列表LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b)
,這導致對推匯出的類型顯式進行了重新命名(基於欄位位置進行映射),進而引發與POJO類中的欄位錯位問題,出現校正異常或非預期的資料錯位問題。解決方案
POJO類缺少有參建構函式時,去掉對UDTF返回欄位的顯式重新命名,如將上述SQL的INSERT語句改為:
-- POJO類無有參建構函式時,推薦明確選取需要的欄位名,使用 T.* 時需要明確知曉實際返回的欄位順序。 SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
POJO類實現有參建構函式,以確定傳回型別的欄位順序。這種情況下UDTF傳回型別的欄位順序就是有參建構函式的參數順序。
package com.aliyun.example; public class TestPojoWithConstructor { public int c; public String d; public boolean a; public String b; // Using specific fields order instead of alphabetical order public TestPojoWithConstructor(int c, String d, boolean a, String b) { this.c = c; this.d = d; this.a = a; this.b = b; } }
為什麼資料在LocalGroupAggregate節點中長時間卡住,無輸出?
問題描述
如果作業未設定
table.exec.mini-batch.size
或設定table.exec.mini-batch.size
為負,並且作業還包含WindowAggregate和GroupAggregate,且WindowAggregate的時間列為事件時間(proctime),在這種情況下啟動作業,在作業拓撲圖中會看到包含LocalGroupAggregate節點,但缺失MiniBatchAssigner節點。作業中同時包含WindowAggregate和GroupAggregate,且WindowAggregate的時間列為事件時間(proctime)的程式碼範例如下。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts as PROCTIME(), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.b.kind'='random', 'fields.b.min'='0', 'fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a BIGINT, b BIGINT ) WITH ( 'connector'='print' ); CREATE TEMPORARY VIEW window_view AS SELECT window_start, window_end, a, sum(b) as b_sum FROM TABLE(TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '2' SECONDS)) GROUP BY window_start, window_end, a; INSERT INTO sink SELECT count(distinct a), b_sum FROM window_view GROUP BY b_sum;
問題原因
在table.exec.mini-batch.size未配置或者為負的情況下,MiniBatch處理模式會使用Managed Memory快取資料,但是此時會錯誤地無法產生MiniBatchAssigner節點,因此計算節點無法收到MinibatchAssigner節點發送的Watermark訊息後觸發計算和輸出,只能在三種條件(Managed Memory已滿、進行Checkpoint前和作業停止時)之一下觸發計算和輸出,詳情請參見table.exec.mini-batch.size。如果此時Checkpoint間隔設定過大,就會導致資料積攢在LocalGroupAggregate節點中,長時間無法輸出。
解決方案
調小Checkpoint間隔,讓LocalGroupAggregate節點在執行Checkpoint前自動觸發輸出。
通過Heap Memory來快取資料,讓LocalGroupAggregate節點內快取資料達到N條時自動觸發輸出。即在
頁面的部署詳情頁運行參數配置地區其他配置中,設定table.exec.mini-batch.size參數為正值N。
運行拓撲圖中顯示的Low Watermark、Watermark以及Task InputWatermark指標顯示的時間和目前時間有時差?
原因1:聲明源表Watermark時使用了TIMESTAMP_LTZ(TIMESTAMP(p) WITH LOCAL TIME ZONE)類型,導致Watermark和目前時間有時差。
下文以具體的樣本為您展示使用TIMESTAMP_LTZ類型和TIMESTAMP類型對應的Watermark指標差異。
源表中Watermark聲明使用的欄位是TIMESTAMP_LTZ類型。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts as CURRENT_TIMESTAMP,--使用CURRENT_TIMESTAMP內建函數產生TIMESTAMP_LTZ類型。 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE t1 ( k INT, ts_ltz timestamp_ltz(3), cnt BIGINT ) WITH ('connector' = 'print'); -- 輸出計算結果。 INSERT INTO t1 SELECT b, window_start, COUNT(*) FROM TABLE( TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND)) GROUP BY b, window_start, window_end;
說明Legacy Window對應的老文法和TVF Window(Table-Valued Function)產生的結果是一致的。以下為Legacy Window對應的老文法的範例程式碼。
SELECT b, TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) FROM s1 GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), b;
在Flink開發控制台將作業部署上線運行後,以北京時間為例,可以觀察到作業運行拓撲圖及監控警示上顯示的Watermark和目前時間存在8小時時差。
Watermark&Low Watermark
Task InputWatermark
源表中Watermark聲明使用的欄位是TIMESTAMP(TIMESTAMP(p) WITHOUT TIME ZONE)類型。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, -- 類比資料源中的TIMESTAMP無時區資訊,從2024-01-31 01:00:00開始逐秒累加。 ts as TIMESTAMPADD(SECOND, a, TIMESTAMP '2024-01-31 01:00:00'), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.a.kind'='sequence','fields.a.start'='0','fields.a.end'='100000', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE t1 ( k INT, ts_ltz timestamp_ltz(3), cnt BIGINT ) WITH ('connector' = 'print'); -- 輸出計算結果。 INSERT INTO t1 SELECT b, window_start, COUNT(*) FROM TABLE( TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND)) GROUP BY b, window_start, window_end;
在Flink開發控制台上將作業部署上線運行後,可以觀察到作業運行拓撲圖及監控警示上顯示的Watermark和目前時間是同步的(本樣本是與類比資料的時間同步的),不存在時差現象。
Watermark&Low Watermark
Task InputWatermark
原因2:Flink開發控制台和Apache Flink UI的展示時間存在時區差異。
Flink開發控制台UI介面是以UTC+0顯示時間,而Apache Flink UI是通過瀏覽器擷取本地時區並進行相應的時間轉換後的本地時間。以北京時間為例,為您展示二者顯示區別,您會觀察到在Flink開發控制台顯示的時間比Apache Flink UI時間慢8小時。
Flink開發控制台
Apache Flink UI
報錯:undefined
報錯詳情
報錯原因
您的JAR包較大。
解決方案
您可以在OSS管理主控台上傳JAR包,詳情請參見如何在OSS控制台上傳JAR包?
報錯:Object '****' not found
報錯詳情
單擊深度檢查後,出現報錯詳情如下。
報錯原因
在DDL和DML同在一個文本中提交運行時,DDL沒有聲明為CREATE TEMPORARY TABLE。
解決方案
在DDL和DML同在一個文本中提交運行時,DDL需要聲明為CREATE TEMPORARY TABLE,而不是聲明為CREATE TABLE。
報錯:Only a single 'INSERT INTO' is supported
報錯詳情
單擊深度檢查後,驗證報錯詳情如下。
報錯原因
多個DML語句沒有寫在關鍵語句
BEGIN STATEMENT SET;
和END;
之間。解決方案
將多個DML語句寫在
BEGIN STATEMENT SET;
和END;
之間。詳情請參見INSERT INTO語句。
報錯:The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
報錯詳情
Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true' at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186) at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134) ... 30 more
報錯原因
Realtime Compute引擎vvr-3.0.7-flink-1.12及以前的版本,CDC Source只能單並發運行。但在Realtime Compute引擎vvr-4.0.8-flink-1.13版本後增加了按PK分區進行多並發讀取資料的功能並預設開啟該功能(scan.incremental.snapshot.enabled預設設定為true),在該功能下必須要配置主鍵。
解決方案
如果您使用Realtime Compute引擎vvr-4.0.8-flink-1.13及以後的版本,則可以根據需求來選擇解決方案:
如果您需要多並發讀取MySQL CDC的資料,則在DDL中必須配置主鍵(PK)。
如果您不需要多並發讀取MySQL CDC的資料,需要將scan.incremental.snapshot.enabled設定為false,參數配置詳情請參見WITH參數。
報錯:exceeded quota: resourcequota
報錯詳情
作業啟動過程中報錯。
報錯原因
當前專案空間資源不足導致作業啟動失敗。
解決方案
您需要對專案資源進行資源變更配置,詳情請參見資源調整。
報錯:Exceeded checkpoint tolerable failure threshold
報錯詳情
作業運行過程中報錯。
org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
報錯原因
未設定任務允許Checkpoint失敗的次數,系統預設Checkpoint失敗一次就觸發一次Failover。
解決方案
在
頁面,單擊目標作業名稱。在部署詳情頁簽,單擊運行參數配置地區右側的編輯。
在其他配置文字框,輸入如下參數。
execution.checkpointing.tolerable-failed-checkpoints: num
您需要設定num值來調整任務允許Checkpoint失敗的次數。num需要為0或正整數。如果num為0時,則表示不允許存在任何Checkpoint異常或者失敗。
報錯:Flink version null is not configured for sql
報錯詳情
StatusRuntimeException: INTERNAL: Flink version null is not configured for sql.
報錯原因
系統升級至VVR 4.0.8,導致作業的Flink計算引擎版本資訊沒有了。
解決方案
在作業開發頁面右側更多配置頁簽中,配置正確的Flink計算引擎版本。
說明如果您需要使用調試功能,則還需要檢查Session叢集頁面的引擎版本是否選擇正確。
INFO:org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss
報錯詳情
報錯原因
OSS每次建立新目錄時,會先檢查是否存在該目錄,如果不存在,就會報這個INFO資訊,但該INFO資訊不影響Flink作業運行。
解決方案
在日誌模板中添加
<Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
。詳情請參見配置作業日誌輸出。
報錯:DateTimeParseException: Text 'xxx' could not be parsed
報錯詳情
作業運行過程中,會出現報錯
DateTimeParseException: Text 'xxx' could not be parsed
。報錯原因
VVR 4.0.13以下版本,您在DDL中聲明的日期格式和實際資料的格式不一致,Flink系統會直接報錯。
解決方案
VVR 4.0.13及以上版本對JSON格式(Json、Canal Json、Debezium Json、Maxwell Json和Ogg Json)中TIMESTAMP類型的資料解析進行了增強,提升資料解析的能力。資料解析增強詳情如下:
支援聲明的TIMESTAMP類型解析DATE格式資料。
支援聲明的TIMESTAMP_LTZ類型解析DATE或TIMESTAMP格式資料。
Flink系統根據您設定的table.local-time-zone的時區資訊來轉換TIMESTAMP資料至TIMESTAMP_LTZ。例如,在DDL中聲明如下資訊。
CREATE TABLE source ( date_field TIMESTAMP, timestamp_field TIMESTAMP_LTZ(3) ) WITH ( 'format' = 'json', ... );
當解析資料 {"date_field": "2020-09-12", "timestamp_field": "2020-09-12T12:00:00"} ,且在當前時區為東八區的情況下,會得到如下結果:"+I(2020-09-12T00:00:00, 2020-09-12T04:00:00.000Z)"。
支援自動解析TIMESTAMP或TIMESTAMP_LTZ格式。
增強前,JSON Format在解析TIMESTAMP資料時,需要您正確設定timestamp-format.standard為SQL或ISO-8601,資料才能被正確解析。增強後,Flink系統會自動推導TIMESTAMP的格式並解析,如果無法正確解析,則會報告錯誤。您手動設定的timestamp-format.standard的值會作為提示供解析器使用。
報錯:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'
報錯詳情
Cause by:java.sql.SQLSyntaxErrorException:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name' at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) ...
報錯原因
MySQL的CDC流結合where條件過濾使用時,update類型的資料會發送update_before和update_after兩條資料到下遊,update_before資料到下遊會被識別為DELETE操作,需要使用者具有DELETE許可權。
解決方案
檢查SQL邏輯是否存在retract相關操作,如果存在相關操作,給結果表的操作使用者賦予DELETE許可權。
報錯:java.io.EOFException: SSL peer shut down incorrectly
報錯詳情
Caused by: java.io.EOFException: SSL peer shut down incorrectly at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302] at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302] at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?] at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?] at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?] at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?] at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?] at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?] at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?] at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?] at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?] at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?] ... 14 more
報錯原因
在MySQL driver版本為8.0.27時,MySQL資料庫開啟了SSL協議,但預設訪問方式不通過SSL串連資料庫,導致報錯。
解決方案
建議WITH參數中connector設定為rds,且MySQL維表URL參數中追加
characterEncoding=utf-8&useSSL=false
,例如:'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'
報錯:binlog probably contains events generated with statement or mixed based replication format
報錯詳情
Caused by: io.debezium.DebeziumException: Received DML 'insert into table_name (...) values (...)' for processing, binlog probably contains events generated with statement or mixed based replication format
報錯原因
MySQL CDC Binlog不可以為mixed格式,只能為ROW格式。
解決方案
在MySQL產品側,通過
show variables like "binlog_format"
命令查看當前模式的Binlog格式。說明您可以使用
show global variables like "binlog_format"
查看全域模式的Binlog格式。在MySQL產品側,將Binlog格式設定為ROW格式。
重啟作業生效。
報錯:java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
報錯詳情
Causedby:java.lang.ClassCastException:org.codehaus.janino.CompilerFactorycannotbecasttoorg.codehaus.commons.compiler.ICompilerFactory atorg.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129) atorg.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79) atorg.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426) ...66more
報錯原因
JAR包中引入了會發生衝突的janino依賴。
UDF JAR或連接器JAR中,誤打入Flink中的某些依賴(例如flink-table-planner和flink-table-runtime)。
解決方案
分析JAR包裡面是否含有org.codehaus.janino.CompilerFactory。因為在不同機器上的Class載入順序不一樣,所以有時候出現類衝突。該問題的解決步驟如下:
在
頁面,單擊目標作業名稱。在部署詳情頁簽,單擊運行參數配置地區右側的編輯。
在其他配置文字框,輸入如下參數。
classloader.parent-first-patterns.additional: org.codehaus.janino
其中,參數的value值需要替換為衝突的類。