Flink SQL supports complex and flexible join operations over dynamic tables. This topic describes how to use regular join statements.
Background information
JOIN statements in real-time computing are semantically equivalent to those in batch processing: both join two tables. The difference is that the tables in real-time computing are dynamic, so the join results are continuously updated as the tables change. This keeps the final results consistent with the results of batch processing.
Syntax
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference: specifies the table name.
tableExpression: specifies a table expression, such as a table name, a view, or a subquery.
joinCondition: specifies the join condition, written as an ON clause or a USING clause.
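The grammar above can be illustrated with a minimal sketch. It assumes the Orders and Products tables from Example 1 in this topic; the selected columns are chosen only for illustration.

```sql
-- Inner join: returns only the orders that have a matching product.
SELECT o.orderid, o.units, p.name
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;

-- Left outer join: also returns orders that have no matching product,
-- with NULL in the columns that come from Products.
SELECT o.orderid, o.units, p.name
FROM Orders AS o
LEFT JOIN Products AS p
  ON o.productid = p.productid;
```

Because the tables are dynamic, the left outer join may first emit an order padded with NULL and then update that result when a matching Products row arrives.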
Hints
If you set the engine version to Ververica Runtime (VVR) 8.0.1 or later, you can use hints to specify different time-to-live (TTL) values for the states of the left and right streams in a regular join. This helps reduce the size of the state data to be maintained.
Syntax
-- VVR 8.0.1 and later versions use the following syntax:
SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...

-- VVR 8.0.7 and later versions also support the following syntax used by Apache Flink:
SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
Precautions
Use the JOIN_STATE_TTL hint only for regular joins. The hint does not support lookup joins, interval joins, or window joins.
If you use the JOIN_STATE_TTL hint to specify the state TTL for only one stream in a regular join, the other stream uses the deployment-level state TTL specified by the table.exec.state.ttl parameter. The default value of this parameter is 1.5 days. For more information about this parameter, see Basic parameters.
You can set the tableReference parameter to a table name, a view name, or an alias. If you specify an alias for a table, you must use the alias in the hint.
Hints for regular joins are experimental features. The syntax may change in the future.
Examples
-- Use an alias in the hint.
SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;

-- VVR 8.0.7 and later versions also support the following syntax used by Apache Flink:
SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;

-- Use a table name in the hint.
SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
  ON Orders.productid = Products.productid;

-- VVR 8.0.7 and later versions also support the following syntax used by Apache Flink:
SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
  ON Orders.productid = Products.productid;

-- Use a view name in the hint.
CREATE TEMPORARY VIEW v AS
SELECT id, ...
FROM (
  SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) AS rn
  FROM src1
  WHERE ...
) tmp
WHERE rn = 1;

SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b
  ON v.id = b.id;

-- VVR 8.0.7 and later versions also support the following syntax used by Apache Flink:
SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b
  ON v.id = b.id;
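The precautions also cover the case in which only one stream gets an explicit TTL. The following is a minimal sketch of that case, reusing the Orders and Products tables; the '3d' value is an arbitrary choice for illustration.

```sql
-- Only the Orders stream (alias o) gets an explicit state TTL of 3 days.
-- The Products stream (alias p) is not listed in the hint, so it uses the
-- deployment-level state TTL specified by table.exec.state.ttl.
SELECT /*+ JOIN_STATE_TTL('o' = '3d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;
```

Because Orders is given the alias o in the FROM clause, the hint refers to 'o' rather than 'Orders'.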
Example 1: Join the Orders and Products tables
Test data
Table 1. Orders
rowtime   productid  orderid  units
10:17:00  30         5        4
10:17:05  10         6        1
10:18:05  20         7        2
10:18:07  30         8        20
11:02:00  10         9        6
11:04:00  10         10       1
11:09:30  40         11       12
11:24:11  10         12       4
Table 2. Products
productid  name    unitprice
30         Cheese  17
10         Beer    0.25
20         Wine    6
30         Cheese  17
10         Beer    0.25
10         Beer    0.25
40         Bread   100
10         Beer    0.25
Test statements
SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;
Test results
Each order is returned once for every matching row in the Products table. Products contains two rows for productid 30 and four rows for productid 10, so the orders for those products appear two and four times, respectively.
o.rowtime  o.productid  o.orderid  o.units  p.name  p.unitprice
10:17:00   30           5          4        Cheese  17.00
10:17:00   30           5          4        Cheese  17.00
10:17:05   10           6          1        Beer    0.25
10:17:05   10           6          1        Beer    0.25
10:17:05   10           6          1        Beer    0.25
10:17:05   10           6          1        Beer    0.25
10:18:05   20           7          2        Wine    6.00
10:18:07   30           8          20       Cheese  17.00
10:18:07   30           8          20       Cheese  17.00
11:02:00   10           9          6        Beer    0.25
11:02:00   10           9          6        Beer    0.25
11:02:00   10           9          6        Beer    0.25
11:02:00   10           9          6        Beer    0.25
11:04:00   10           10         1        Beer    0.25
11:04:00   10           10         1        Beer    0.25
11:04:00   10           10         1        Beer    0.25
11:04:00   10           10         1        Beer    0.25
11:09:30   40           11         12       Bread   100.00
11:24:11   10           12         4        Beer    0.25
11:24:11   10           12         4        Beer    0.25
11:24:11   10           12         4        Beer    0.25
11:24:11   10           12         4        Beer    0.25
Example 2: Join the datahub_stream1 and datahub_stream2 tables
Test data
Table 3. datahub_stream1
a (BIGINT)  b (BIGINT)  c (VARCHAR)
0           10          test11
1           10          test21
Table 4. datahub_stream2
a (BIGINT)  b (BIGINT)  c (VARCHAR)
0           10          test11
1           10          test21
0           10          test31
1           10          test41
Test statements
SELECT s1.c, s2.c
FROM datahub_stream1 AS s1
JOIN datahub_stream2 AS s2
  ON s1.a = s2.a
WHERE s1.a = 0;
Test results
s1.c (VARCHAR)  s2.c (VARCHAR)
test11          test11
test11          test31