In Realtime Compute for Apache Flink, each data stream can be associated with a dimension table in an external data source, which lets you enrich the stream with lookup queries against that table. This topic describes how to use processing-time temporal joins.
Background
A processing-time temporal join uses a processing-time attribute to correlate rows in a fact table to the latest versions of a corresponding key in a dimension table. Unlike an event-time temporal join that correlates rows based on the time when events occurred, a processing-time temporal join correlates rows based on the data arrival time.
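The key property is that the lookup always sees the dimension table's state at the moment each row is processed, not at the moment the event occurred. A minimal sketch in plain Python (not Flink; the function and data are hypothetical) illustrates this:

```python
def process(fact_rows, dim_table):
    """Join each (key, value) fact row against the dimension table's
    state at processing time; unmatched rows are dropped (inner join)."""
    out = []
    for name, value in fact_rows:
        # The lookup uses the dimension table as it is *now*,
        # regardless of when the fact row was originally produced.
        if name in dim_table:
            out.append((name, value, dim_table[name]))
    return out

dim = {"Lee": 1390000444}
print(process([("Lee", 1), ("Harry", 2)], dim))
# [('Lee', 1, 1390000444)]  -- Harry has no dimension entry yet

dim["Harry"] = 1390000222   # dimension table updated later
print(process([("Harry", 2)], dim))
# [('Harry', 2, 1390000222)]  -- a later arrival sees the new version
```

The same fact row can therefore produce different join results depending on when it arrives, which is exactly the processing-time semantics described above.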
Prerequisites
Realtime Compute for Apache Flink uses Ververica Runtime (VVR) 8.0.10 or later.
MySQL dimension tables are created.
Usage notes
- Disable checkpointing during full data synchronization by setting execution.checkpointing.interval-during-backlog = 0, because Realtime Compute for Apache Flink does not support checkpointing during full synchronization. This setting does not affect checkpointing during incremental data synchronization.
- To use processing-time temporal joins, set table.optimizer.proctime-temporal-join-strategy = TEMPORAL_JOIN.
Syntax
The syntax of the processing-time temporal join is the same as that of joining dimension tables:
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
Example
Test data
Table 1. kafka_input

id (bigint)   name (varchar)   age (bigint)
1             Lee              22
2             Harry            20
3             Liban            28
Table 2. phoneNumber

name (varchar)   phoneNumber (bigint)
David            1390000111
Brooks           1390000222
Liban            1390000333
Lee              1390000444
Test code
SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN'; -- Use processing-time temporal joins.
SET 'execution.checkpointing.interval-during-backlog' = '0'; -- Disable checkpointing during full synchronization.

CREATE TEMPORARY TABLE kafka_input (
  id BIGINT,
  name VARCHAR,
  age BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

-- Define a dimension table.
CREATE TEMPORARY TABLE phoneNumber (
  name VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE result_infor (
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO result_infor
SELECT t.id, w.phoneNumber, t.name
FROM kafka_input AS t
JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time AS w
ON t.name = w.name;
Test results
id (bigint)   phoneNumber (bigint)   name (varchar)
1             1390000444             Lee
3             1390000333             Liban

The record for Harry (id 2) does not appear in the results because the name Harry has no matching row in the phoneNumber dimension table and the query uses an inner join.
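The expected rows can be cross-checked outside Flink with a quick sketch (plain Python; the variable names are hypothetical):

```python
# Test data from Table 1 and Table 2.
kafka_input = [(1, "Lee", 22), (2, "Harry", 20), (3, "Liban", 28)]
phone_number = {"David": 1390000111, "Brooks": 1390000222,
                "Liban": 1390000333, "Lee": 1390000444}

# Inner join on name, projecting (id, phoneNumber, name);
# rows without a dimension match (Harry) are dropped.
result = [(id_, phone_number[name], name)
          for id_, name, _age in kafka_input
          if name in phone_number]
print(result)
# [(1, 1390000444, 'Lee'), (3, 1390000333, 'Liban')]
```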