全部產品
Search
文件中心

Realtime Compute for Apache Flink:Processing Time Temporal Join語句

更新時間:Nov 23, 2024

對於每條流式資料,可以關聯一個外部維表資料來源,為Realtime ComputeFlink版提供資料關聯查詢。

背景資訊

Processing Time Temporal Join使用處理時間(Processing Time)屬性,將事實表中的每條資料與維表的的最新資料進行關聯處理。與事件時間(Event Time)不同,處理時間並不關注事件實際發生的時刻,而是依據資料到達處理系統的時間點。

使用限制

  • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援。

  • 僅MySQL維表支援使用Processing Time Temporal Join。

注意事項

  • 不支援全量同步階段下的Checkpoint,需要配置execution.checkpointing.interval-during-backlog: 0參數關閉Checkpoint,而增量同步處理階段不受影響。

  • 使用Processing Time Temporal Join時,需配置table.optimizer.proctime-temporal-join-strategy: TEMPORAL_JOIN參數。

文法格式

Processing Time Temporal Join文法與維表Join文法一樣:

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;

使用樣本

  • 測試資料

    • 表1 kafka_input

      id(bigint)

      name(varchar)

      age(bigint)

      1

      lilei

      22

      2

      hanmeimei

      20

      3

      libai

      28

    • 表2 phoneNumber

      name(varchar)

      phoneNumber(bigint)

      dufu

      1390000111

      baijuyi

      1390000222

      libai

      1390000333

      lilei

      1390000444

  • 測試語句

    SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN';  -- 使用Processing Time Temporal Join。
    SET 'execution.checkpointing.interval-during-backlog' = '0';              -- 關閉全量階段下的Checkpoint。
    
    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT,
      proc_time AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    -- 維表
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w
    ON t.name = w.name;
  • 測試結果

    id(bigint)

    phoneNumber(bigint)

    name(varchar)

    1

    1390000444

    lilei

    3

    1390000333

    libai