Realtime Compute for Apache Flink: Processing-time temporal join statements

Last Updated: Dec 18, 2024

In Realtime Compute for Apache Flink, you can join a data stream with a dimension table stored in an external data source, which lets you query related data for each record in the stream. This topic describes how to use processing-time temporal joins.

Background

A processing-time temporal join uses a processing-time attribute to join each row of a fact table with the latest version of the matching key in a dimension table. Unlike an event-time temporal join, which correlates rows based on when events occurred, a processing-time temporal join correlates rows based on when the data arrives.
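The difference can be illustrated with a simplified, single-process Python model. It is not Flink code: the `DimensionState` class, the `process` function, and the changed phone number 1390000555 are hypothetical. The model keeps only the latest version of each key and joins every fact row against the version present at the moment the row is processed; rows that were already emitted are not revised when the dimension row changes later.

```python
# Simplified model of processing-time temporal join semantics.
# NOT Flink code: all names and the updated phone number are hypothetical.
from typing import Dict, List, Optional, Tuple


class DimensionState:
    """Keeps only the latest version of each dimension-table key."""

    def __init__(self) -> None:
        self._latest: Dict[str, int] = {}

    def upsert(self, key: str, value: int) -> None:
        # A newer version of the key replaces the previous one.
        self._latest[key] = value

    def lookup(self, key: str) -> Optional[int]:
        return self._latest.get(key)


def process(events: List[Tuple[str, tuple]]) -> List[Tuple[int, int, str]]:
    """Replays dimension updates and fact rows in arrival order.

    Each fact row is joined with whatever version of its key exists when
    the row is processed. Already emitted results are never revised.
    """
    dim = DimensionState()
    output: List[Tuple[int, int, str]] = []
    for kind, payload in events:
        if kind == "dim":
            name, phone = payload
            dim.upsert(name, phone)
        else:  # "fact"
            fact_id, name = payload
            phone = dim.lookup(name)
            if phone is not None:  # inner join: drop rows without a match
                output.append((fact_id, phone, name))
    return output


# Hypothetical arrival order: Lee's phone number changes between two fact rows.
events = [
    ("dim", ("Lee", 1390000444)),
    ("fact", (1, "Lee")),
    ("dim", ("Lee", 1390000555)),
    ("fact", (4, "Lee")),
]
print(process(events))
# [(1, 1390000444, 'Lee'), (4, 1390000555, 'Lee')]
```

Fact row 4 sees the updated number only because it is processed after the update. In a real job, the relative arrival order of the fact stream and the dimension changes is not fixed, so results can differ between runs.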

Prerequisites

  • Your deployment uses Ververica Runtime (VVR) 8.0.10 or later.

  • A MySQL table that serves as the dimension table is created.

Usage notes

  • Realtime Compute for Apache Flink does not support checkpointing during full synchronization. Disable checkpointing for this phase by setting execution.checkpointing.interval-during-backlog = 0. This setting does not affect checkpointing during incremental synchronization.

  • To enable processing-time temporal joins, set table.optimizer.proctime-temporal-join-strategy = TEMPORAL_JOIN.

Syntax

A processing-time temporal join uses the same syntax as a dimension table join:

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;

Example

  • Test data

    • Table 1: kafka_input

      id (bigint)  name (varchar)  age (bigint)
      1            Lee             22
      2            Harry           20
      3            Liban           28

    • Table 2: phoneNumber

      name (varchar)  phoneNumber (bigint)
      David           1390000111
      Brooks          1390000222
      Liban           1390000333
      Lee             1390000444

  • Test code

    SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN';  -- Use processing-time temporal joins. 
    SET 'execution.checkpointing.interval-during-backlog' = '0';              -- Disable checkpointing during full synchronization. 
    
    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT,
      proc_time AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    -- Define a dimension table.
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input AS t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time AS w
    ON t.name = w.name;

  • Test results

    id (bigint)  phoneNumber (bigint)  name (varchar)
    1            1390000444            Lee
    3            1390000333            Liban
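The expected output can be reproduced with a short Python sketch that assumes the whole phoneNumber table is already loaded before any kafka_input row arrives. Harry (id 2) has no matching name in phoneNumber, so the inner JOIN drops that row; with LEFT JOIN, the row would be kept with a NULL phoneNumber (None in the sketch). The `temporal_join` function is hypothetical, and its output order simply follows the input list, whereas a real job does not guarantee output order.

```python
# Plain-Python check of the example result; NOT Flink code.
# Data is copied from the test tables; temporal_join is a hypothetical name.
from typing import List, Optional, Tuple

kafka_input = [(1, "Lee", 22), (2, "Harry", 20), (3, "Liban", 28)]

# Assumption: the dimension table is fully loaded before fact rows arrive.
phone_number = {
    "David": 1390000111,
    "Brooks": 1390000222,
    "Liban": 1390000333,
    "Lee": 1390000444,
}


def temporal_join(left_join: bool = False) -> List[Tuple[int, Optional[int], str]]:
    rows: List[Tuple[int, Optional[int], str]] = []
    for fact_id, name, _age in kafka_input:
        phone = phone_number.get(name)  # lookup for ON t.name = w.name
        if phone is None and not left_join:
            continue  # JOIN: fact rows without a match are dropped
        rows.append((fact_id, phone, name))  # LEFT JOIN keeps them with None
    return rows


print(temporal_join())
# [(1, 1390000444, 'Lee'), (3, 1390000333, 'Liban')]
print(temporal_join(left_join=True))
# [(1, 1390000444, 'Lee'), (2, None, 'Harry'), (3, 1390000333, 'Liban')]
```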