全部產品
Search
文件中心

Realtime Compute for Apache Flink:SQL常見問題

更新時間:Oct 13, 2024

本文為您介紹Realtime ComputeFlink版的SQL常見問題,包括作業常見問題、開發報錯、營運報錯。

為什麼使用POJO類作為UDTF傳回型別時欄位會出現“錯位”?

  • 問題描述

    當使用POJO類作為UDTF傳回型別,並在SQL中顯式聲明了UDTF返回列的別名列表(Alias Name)時,可能會出現欄位錯位(即使類型一致,但實際使用的欄位可能與預期不符)問題。

    例如,如果使用如下POJO類作為UDTF的傳回型別,並根據自訂函數開發的要求進行打包並完成函數註冊(這裡使用作業級自訂函數註冊方式)後,SQL校正會失敗。

    package com.aliyun.example;
    
    public class TestPojoWithoutConstructor {
    	public int c;
    	public String d;
    	public boolean a;
    	public String b;
    }
    package com.aliyun.example;
    
    import org.apache.flink.table.functions.TableFunction;
    
    public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> {
    	private static final long serialVersionUID = 1L;
    
    	public void eval(String str1, Integer i2) {
    		TestPojoWithoutConstructor p = new TestPojoWithoutConstructor();
    		p.d = str1 + "_d";
    		p.c = i2 + 2;
    		p.b = str1 + "_b";
    		collect(p);
    	}
    }
    CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor';
    
    CREATE TEMPORARY TABLE src ( 
      id STRING,
      cnt INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE sink ( 
      f1 INT,
      f2 STRING,
      f3 BOOLEAN,
      f4 STRING
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO sink
    SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);

    SQL校正報錯資訊如下:

    org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match.
    Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL.
    Hint: You will need to rewrite or cast the expression.
    
    Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING]
    Sink schema:  [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING]
    	at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)

    看起來從UDTF返回的欄位和POJO類中的欄位可能錯位了,SQL中欄位c最終是BOOLEAN,而欄位a是INT類型,和POJO類的定義恰好相反。

  • 問題原因

    根據POJO類的類型規則:

    • 如果POJO類實現了有參建構函式,推導的傳回型別會按建構函式的參數列表順序。

    • 如果POJO類缺少有參建構函式,就會按欄位名的字典序重排列。

    在上述樣本中,由於UDTF傳回型別缺少有參建構函式,因此對應的傳回型別為BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d)。雖然這一步並沒有產生錯誤,但因為SQL中對返回欄位加了重新命名列表LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b),這導致對推匯出的類型顯式進行了重新命名(基於欄位位置進行映射),進而引發與POJO類中的欄位錯位問題,出現校正異常或非預期的資料錯位問題。

  • 解決方案

    • POJO類缺少有參建構函式時,去掉對UDTF返回欄位的顯式重新命名,如將上述SQL的INSERT語句改為:

      -- POJO類無有參建構函式時,推薦明確選取需要的欄位名,使用 T.* 時需要明確知曉實際返回的欄位順序。
      SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
    • POJO類實現有參建構函式,以確定傳回型別的欄位順序。這種情況下UDTF傳回型別的欄位順序就是有參建構函式的參數順序。

      package com.aliyun.example;
      
      public class TestPojoWithConstructor {
      	public int c;
      	public String d;
      	public boolean a;
      	public String b;
      
      	// Using specific fields order instead of alphabetical order
      	public TestPojoWithConstructor(int c, String d, boolean a, String b) {
      		this.c = c;
      		this.d = d;
      		this.a = a;
      		this.b = b;
      	}
      }

為什麼資料在LocalGroupAggregate節點中長時間卡住,無輸出?

  • 問題描述

    如果作業未設定table.exec.mini-batch.size或設定table.exec.mini-batch.size為負,並且作業還包含WindowAggregate和GroupAggregate,且WindowAggregate的時間列為事件時間(proctime),在這種情況下啟動作業,在作業拓撲圖中會看到包含LocalGroupAggregate節點,但缺失MiniBatchAssigner節點。

    image

    作業中同時包含WindowAggregate和GroupAggregate,且WindowAggregate的時間列為事件時間(proctime)的程式碼範例如下。

    CREATE TEMPORARY TABLE s1 (
      a INT,
      b INT,
      ts as PROCTIME(),
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector'='datagen',
      'rows-per-second'='1',
      'fields.b.kind'='random',
      'fields.b.min'='0',
      'fields.b.max'='10'
    );
    
    CREATE TEMPORARY TABLE sink (
      a BIGINT,
      b BIGINT
    ) WITH (
      'connector'='print'
    );
    
    CREATE TEMPORARY VIEW window_view AS
    SELECT window_start, window_end, a, sum(b) as b_sum FROM TABLE(TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '2' SECONDS)) GROUP BY window_start, window_end, a;
    
    INSERT INTO sink SELECT count(distinct a), b_sum FROM window_view GROUP BY b_sum;
  • 問題原因

    在table.exec.mini-batch.size未配置或者為負的情況下,MiniBatch處理模式會使用Managed Memory快取資料,但是此時會錯誤地無法產生MiniBatchAssigner節點,因此計算節點無法收到MinibatchAssigner節點發送的Watermark訊息後觸發計算和輸出,只能在三種條件(Managed Memory已滿、進行Checkpoint前和作業停止時)之一下觸發計算和輸出,詳情請參見table.exec.mini-batch.size。如果此時Checkpoint間隔設定過大,就會導致資料積攢在LocalGroupAggregate節點中,長時間無法輸出。

  • 解決方案

    • 調小Checkpoint間隔,讓LocalGroupAggregate節點在執行Checkpoint前自動觸發輸出。

    • 通過Heap Memory來快取資料,讓LocalGroupAggregate節點內快取資料達到N條時自動觸發輸出。即在營運中心 > 作業營運頁面的部署詳情運行參數配置地區其他配置中,設定table.exec.mini-batch.size參數為正值N。

運行拓撲圖中顯示的Low Watermark、Watermark以及Task InputWatermark指標顯示的時間和目前時間有時差?

  • 原因1:聲明源表Watermark時使用了TIMESTAMP_LTZ(TIMESTAMP(p) WITH LOCAL TIME ZONE)類型,導致Watermark和目前時間有時差。

    下文以具體的樣本為您展示使用TIMESTAMP_LTZ類型和TIMESTAMP類型對應的Watermark指標差異。

    • 源表中Watermark聲明使用的欄位是TIMESTAMP_LTZ類型。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts as CURRENT_TIMESTAMP,--使用CURRENT_TIMESTAMP內建函數產生TIMESTAMP_LTZ類型。
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND 
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE t1 (
        k INT,
        ts_ltz timestamp_ltz(3),
        cnt BIGINT
      ) WITH ('connector' = 'print');
      
      -- 輸出計算結果。
      INSERT INTO t1
      SELECT b, window_start, COUNT(*) FROM
      TABLE(
          TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND))
      GROUP BY b, window_start, window_end;
      說明

      Legacy Window對應的老文法和TVF Window(Table-Valued Function)產生的結果是一致的。以下為Legacy Window對應的老文法的範例程式碼。

      SELECT b, TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) FROM s1 GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), b;

      在Flink開發控制台將作業部署上線運行後,以北京時間為例,可以觀察到作業運行拓撲圖及監控警示上顯示的Watermark和目前時間存在8小時時差。

      • Watermark&Low Watermark

        image

      • Task InputWatermark

        image

    • 源表中Watermark聲明使用的欄位是TIMESTAMP(TIMESTAMP(p) WITHOUT TIME ZONE)類型。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        -- 類比資料源中的TIMESTAMP無時區資訊,從2024-01-31 01:00:00開始逐秒累加。
        ts as TIMESTAMPADD(SECOND, a, TIMESTAMP '2024-01-31 01:00:00'),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND 
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.a.kind'='sequence','fields.a.start'='0','fields.a.end'='100000',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE t1 (
        k INT,
        ts_ltz timestamp_ltz(3),
        cnt BIGINT
      ) WITH ('connector' = 'print');
      
      -- 輸出計算結果。
      INSERT INTO t1
      SELECT b, window_start, COUNT(*) FROM
      TABLE(
          TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND))
      GROUP BY b, window_start, window_end;

      在Flink開發控制台上將作業部署上線運行後,可以觀察到作業運行拓撲圖及監控警示上顯示的Watermark和目前時間是同步的(本樣本是與類比資料的時間同步的),不存在時差現象。

      • Watermark&Low Watermark

        image

      • Task InputWatermark

        image

  • 原因2:Flink開發控制台和Apache Flink UI的展示時間存在時區差異。

    Flink開發控制台UI介面是以UTC+0顯示時間,而Apache Flink UI是通過瀏覽器擷取本地時區並進行相應的時間轉換後的本地時間。以北京時間為例,為您展示二者顯示區別,您會觀察到在Flink開發控制台顯示的時間比Apache Flink UI時間慢8小時。

    • Flink開發控制台

      image

    • Apache Flink UI

      image

報錯:undefined

報錯:Object '****' not found

  • 報錯詳情

    單擊深度檢查後,出現報錯詳情如下。報錯詳情

  • 報錯原因

    在DDL和DML同在一個文本中提交運行時,DDL沒有聲明為CREATE TEMPORARY TABLE。

  • 解決方案

    在DDL和DML同在一個文本中提交運行時,DDL需要聲明為CREATE TEMPORARY TABLE,而不是聲明為CREATE TABLE。

報錯:Only a single 'INSERT INTO' is supported

  • 報錯詳情

    單擊深度檢查後,驗證報錯詳情如下。報錯詳情

  • 報錯原因

    多個DML語句沒有寫在關鍵語句BEGIN STATEMENT SET;END;之間。

  • 解決方案

    將多個DML語句寫在BEGIN STATEMENT SET;END;之間。詳情請參見INSERT INTO語句

報錯:The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'

  • 報錯詳情

    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
  • 報錯原因

    Realtime Compute引擎vvr-3.0.7-flink-1.12及以前的版本,CDC Source只能單並發運行。但在Realtime Compute引擎vvr-4.0.8-flink-1.13版本後增加了按PK分區進行多並發讀取資料的功能並預設開啟該功能(scan.incremental.snapshot.enabled預設設定為true),在該功能下必須要配置主鍵。

  • 解決方案

    如果您使用Realtime Compute引擎vvr-4.0.8-flink-1.13及以後的版本,則可以根據需求來選擇解決方案:

    • 如果您需要多並發讀取MySQL CDC的資料,則在DDL中必須配置主鍵(PK)。

    • 如果您不需要多並發讀取MySQL CDC的資料,需要將scan.incremental.snapshot.enabled設定為false,參數配置詳情請參見WITH參數

報錯:exceeded quota: resourcequota

  • 報錯詳情

    作業啟動過程中報錯。報錯

  • 報錯原因

    當前專案空間資源不足導致作業啟動失敗。

  • 解決方案

    您需要對專案資源進行資源變更配置,詳情請參見資源調整

報錯:Exceeded checkpoint tolerable failure threshold

  • 報錯詳情

    作業運行過程中報錯。

    org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold.
      at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
  • 報錯原因

    未設定任務允許Checkpoint失敗的次數,系統預設Checkpoint失敗一次就觸發一次Failover。

  • 解決方案

    1. 營運中心 > 作業營運頁面,單擊目標作業名稱。

    2. 部署詳情頁簽,單擊運行參數配置地區右側的編輯

    3. 其他配置文字框,輸入如下參數。

      execution.checkpointing.tolerable-failed-checkpoints: num

      您需要設定num值來調整任務允許Checkpoint失敗的次數。num需要為0或正整數。如果num為0時,則表示不允許存在任何Checkpoint異常或者失敗。

報錯:Flink version null is not configured for sql

  • 報錯詳情

    StatusRuntimeException: INTERNAL: Flink version null is not configured for sql.
  • 報錯原因

    系統升級至VVR 4.0.8,導致作業的Flink計算引擎版本資訊沒有了。

  • 解決方案

    在作業開發頁面右側更多配置頁簽中,配置正確的Flink計算引擎版本。引擎版本

    說明

    如果您需要使用調試功能,則還需要檢查Session叢集頁面的引擎版本是否選擇正確。

INFO:org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss

  • 報錯詳情報錯詳情

  • 報錯原因

    OSS每次建立新目錄時,會先檢查是否存在該目錄,如果不存在,就會報這個INFO資訊,但該INFO資訊不影響Flink作業運行。

  • 解決方案

    在日誌模板中添加<Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>。詳情請參見配置作業日誌輸出

報錯:DateTimeParseException: Text 'xxx' could not be parsed

  • 報錯詳情

    作業運行過程中,會出現報錯DateTimeParseException: Text 'xxx' could not be parsed

  • 報錯原因

    VVR 4.0.13以下版本,您在DDL中聲明的日期格式和實際資料的格式不一致,Flink系統會直接報錯。

  • 解決方案

    VVR 4.0.13及以上版本對JSON格式(Json、Canal Json、Debezium Json、Maxwell Json和Ogg Json)中TIMESTAMP類型的資料解析進行了增強,提升資料解析的能力。資料解析增強詳情如下:

    • 支援聲明的TIMESTAMP類型解析DATE格式資料。

    • 支援聲明的TIMESTAMP_LTZ類型解析DATE或TIMESTAMP格式資料。

      Flink系統根據您設定的table.local-time-zone的時區資訊來轉換TIMESTAMP資料至TIMESTAMP_LTZ。例如,在DDL中聲明如下資訊。

      CREATE TABLE source (
        date_field TIMESTAMP,
        timestamp_field TIMESTAMP_LTZ(3)
      ) WITH (
        'format' = 'json',
        ...
      );

      當解析資料 {"date_field": "2020-09-12", "timestamp_field": "2020-09-12T12:00:00"} ,且在當前時區為東八區的情況下,會得到如下結果:"+I(2020-09-12T00:00:00, 2020-09-12T04:00:00.000Z)"。

    • 支援自動解析TIMESTAMP或TIMESTAMP_LTZ格式。

      增強前,JSON Format在解析TIMESTAMP資料時,需要您正確設定timestamp-format.standard為SQL或ISO-8601,資料才能被正確解析。增強後,Flink系統會自動推導TIMESTAMP的格式並解析,如果無法正確解析,則會報告錯誤。您手動設定的timestamp-format.standard的值會作為提示供解析器使用。

報錯:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'

  • 報錯詳情

    Cause by:java.sql.SQLSyntaxErrorException:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        ...
  • 報錯原因

    MySQL的CDC流結合where條件過濾使用時,update類型的資料會發送update_before和update_after兩條資料到下遊,update_before資料到下遊會被識別為DELETE操作,需要使用者具有DELETE許可權。

  • 解決方案

    檢查SQL邏輯是否存在retract相關操作,如果存在相關操作,給結果表的操作使用者賦予DELETE許可權。

報錯:java.io.EOFException: SSL peer shut down incorrectly

  • 報錯詳情

    Caused by: java.io.EOFException: SSL peer shut down incorrectly
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302]
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302]
        at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?]
        at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?]
        at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?]
        at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?]
        at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?]
        ... 14 more
  • 報錯原因

    在MySQL driver版本為8.0.27時,MySQL資料庫開啟了SSL協議,但預設訪問方式不通過SSL串連資料庫,導致報錯。

  • 解決方案

    建議WITH參數中connector設定為rds,且MySQL維表URL參數中追加characterEncoding=utf-8&useSSL=false,例如:

    'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'

報錯:binlog probably contains events generated with statement or mixed based replication format

  • 報錯詳情

    Caused by: io.debezium.DebeziumException: Received DML 'insert into table_name (...) values (...)' for processing, 
    binlog probably contains events generated with statement or mixed based replication format
  • 報錯原因

    MySQL CDC Binlog不可以為mixed格式,只能為ROW格式。

  • 解決方案

    1. 在MySQL產品側,通過show variables like "binlog_format"命令查看當前模式的Binlog格式。

      說明

      您可以使用show global variables like "binlog_format"查看全域模式的Binlog格式。

    2. 在MySQL產品側,將Binlog格式設定為ROW格式。

    3. 重啟作業生效。

報錯:java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory

  • 報錯詳情

    Causedby:java.lang.ClassCastException:org.codehaus.janino.CompilerFactorycannotbecasttoorg.codehaus.commons.compiler.ICompilerFactory
        atorg.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
        atorg.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
        atorg.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)
        ...66more
  • 報錯原因

    • JAR包中引入了會發生衝突的janino依賴。

    • UDF JAR或連接器JAR中,誤打入Flink中的某些依賴(例如flink-table-planner和flink-table-runtime)。

  • 解決方案

    分析JAR包裡面是否含有org.codehaus.janino.CompilerFactory。因為在不同機器上的Class載入順序不一樣,所以有時候出現類衝突。該問題的解決步驟如下:

    1. 營運中心 > 作業營運頁面,單擊目標作業名稱。

    2. 部署詳情頁簽,單擊運行參數配置地區右側的編輯

    3. 其他配置文字框,輸入如下參數。

      classloader.parent-first-patterns.additional: org.codehaus.janino

      其中,參數的value值需要替換為衝突的類。