Realtime Compute for Apache Flink: Regular join statements

Last Updated: Jul 01, 2024

Flink SQL supports complex and flexible join operations over dynamic tables. This topic describes how to use regular join statements.

Background information

JOIN statements in real-time computing have the same semantics as JOIN statements in batch processing: both join two tables. The difference is that the tables in real-time computing are dynamic, so the join results are continuously updated as new rows arrive. After all input data is processed, the final results are consistent with the results of batch processing.
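To make the continuously updated results concrete, the following sketch traces a left outer join over the Orders and Products tables that are used later in this topic. The arrival order and the changelog lines in the comments are assumptions for illustration, based on the retract semantics of open source Apache Flink. They are not output captured from a deployment.

```sql
-- Left outer join: every order is emitted, padded with NULL until a
-- matching product arrives.
SELECT o.orderid, o.productid, p.name
FROM Orders AS o
LEFT JOIN Products AS p
ON o.productid = p.productid;

-- Illustrative changelog (assumed arrival order):
-- 1. The order (orderid = 7, productid = 20) arrives. No product with
--    productid = 20 is stored in the state yet.
--      +I (7, 20, NULL)
-- 2. The product (productid = 20, name = 'Wine') arrives. The padded row
--    is retracted and replaced with the joined row.
--      -D (7, 20, NULL)
--      +I (7, 20, 'Wine')
-- After both rows are processed, the accumulated result is (7, 20, 'Wine'),
-- which is what a batch LEFT JOIN over the same data returns.
```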

Syntax

tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression

joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
  • tableReference: the name of a table.

  • tableExpression: a table expression.

  • joinCondition: the join condition, written as an ON clause or a USING clause.
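The following statements sketch a few of the forms that the grammar above allows. They assume that the Orders and Products tables from Example 1 are registered. The column choices are illustrative only.

```sql
-- Inner join with an ON condition. The INNER keyword is optional.
SELECT o.orderid, p.name
FROM Orders AS o
INNER JOIN Products AS p
ON o.productid = p.productid;

-- The same equi-join written with USING. The productid column must
-- exist in both tables.
SELECT Orders.orderid, Products.name
FROM Orders
JOIN Products USING (productid);

-- Full outer join: unmatched rows from either side are padded with NULL.
SELECT o.orderid, p.name
FROM Orders AS o
FULL OUTER JOIN Products AS p
ON o.productid = p.productid;
```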

Hints

If you set the engine version to Ververica Runtime (VVR) 8.0.1 or later, you can use hints to specify different time-to-live (TTL) values for the states of the left and right streams in a regular join. This helps reduce the size of the state data to be maintained.

  • Syntax

    -- VVR 8.0.1 and later versions use the following syntax:
    SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
    
    -- VVR 8.0.7 and later versions also support the following syntax used by Apache Flink:
    SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
  • Precautions

    • Use the JOIN_STATE_TTL hint only for regular joins. The hint does not support lookup, interval, or window joins.

    • If you use the JOIN_STATE_TTL hint to specify the state TTL for only one stream in a regular join, the other stream uses the deployment-level state TTL specified by the table.exec.state.ttl parameter. The default value of this parameter is 1.5 days. For more information about this parameter, see Basic parameters.

    • You can set the tableReference parameter to a table name, a view name, or an alias. If you specify an alias for a table, you must use the alias in the hint.

    • Hints for regular joins are an experimental feature. The syntax may change in the future.

  • Examples

    -- Use an alias in the hint.
    SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
      o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
    -- VVR 8.0.7 and later versions also support the following syntax used by Apache Flink:
    SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */
      o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
    
    -- Use a table name in the hint.
    SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
      FROM Orders
      JOIN Products
    ON Orders.productid = Products.productid;
    -- VVR 8.0.7 and later versions also support the following syntax used by Apache Flink:
    SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
      FROM Orders
      JOIN Products
    ON Orders.productid = Products.productid;
    
    -- Use a view name in the hint.
    CREATE TEMPORARY VIEW v AS
    SELECT id, ...
    FROM (
      SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) AS rn
      FROM src1
      WHERE ...
    ) tmp
    WHERE rn = 1;
    	
    SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.*
    FROM v
    LEFT JOIN src2 AS b ON v.id = b.id;
    -- VVR 8.0.7 and later versions also support the following syntax used by Apache Flink:
    SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.*
    FROM v
    LEFT JOIN src2 AS b ON v.id = b.id;
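The precaution about specifying a TTL for only one stream can be sketched as follows. The statement reuses the Orders and Products tables from the preceding examples. The value 3d is an arbitrary choice for illustration.

```sql
-- Only the left stream (alias o) gets an explicit state TTL of 3 days.
-- The right stream (alias p) is not listed in the hint, so it uses the
-- deployment-level TTL specified by table.exec.state.ttl.
SELECT /*+ JOIN_STATE_TTL('o' = '3d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
ON o.productid = p.productid;
```

Because both tables are aliased in the FROM clause, the hint must reference the alias o rather than the table name Orders.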

Example 1: Join the Orders and Products tables

  • Test data

    Table 1. Orders

    rowtime   productid  orderid  units
    --------  ---------  -------  -----
    10:17:00  30         5        4
    10:17:05  10         6        1
    10:18:05  20         7        2
    10:18:07  30         8        20
    11:02:00  10         9        6
    11:04:00  10         10       1
    11:09:30  40         11       12
    11:24:11  10         12       4

    Table 2. Products

    productid  name    unitprice
    ---------  ------  ---------
    30         Cheese  17
    10         Beer    0.25
    20         Wine    6
    30         Cheese  17
    10         Beer    0.25
    10         Beer    0.25
    40         Bread   100
    10         Beer    0.25

  • Test statements

    SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
  • Test results

    o.rowtime  o.productid  o.orderid  o.units  p.name  p.unitprice
    ---------  -----------  ---------  -------  ------  -----------
    10:17:00   30           5          4        Cheese  17.00
    10:17:00   30           5          4        Cheese  17.00
    10:17:05   10           6          1        Beer    0.25
    10:17:05   10           6          1        Beer    0.25
    10:17:05   10           6          1        Beer    0.25
    10:17:05   10           6          1        Beer    0.25
    10:18:05   20           7          2        Wine    6.00
    10:18:07   30           8          20       Cheese  17.00
    10:18:07   30           8          20       Cheese  17.00
    11:02:00   10           9          6        Beer    0.25
    11:02:00   10           9          6        Beer    0.25
    11:02:00   10           9          6        Beer    0.25
    11:02:00   10           9          6        Beer    0.25
    11:04:00   10           10         1        Beer    0.25
    11:04:00   10           10         1        Beer    0.25
    11:04:00   10           10         1        Beer    0.25
    11:04:00   10           10         1        Beer    0.25
    11:09:30   40           11         12       Bread   100.00
    11:24:11   10           12         4        Beer    0.25
    11:24:11   10           12         4        Beer    0.25
    11:24:11   10           12         4        Beer    0.25
    11:24:11   10           12         4        Beer    0.25
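    To try the test statement above without a connector, the test data can be registered as temporary views built from VALUES clauses. This is a minimal sketch: the views are bounded stand-ins for the real streaming tables, and the column types (TIME, INT, and DECIMAL) are assumptions because this topic does not state the table schemas.

```sql
-- Hypothetical stand-in for Table 1. Orders.
CREATE TEMPORARY VIEW Orders AS
SELECT * FROM (VALUES
  (TIME '10:17:00', 30, 5, 4),
  (TIME '10:17:05', 10, 6, 1),
  (TIME '10:18:05', 20, 7, 2),
  (TIME '10:18:07', 30, 8, 20),
  (TIME '11:02:00', 10, 9, 6),
  (TIME '11:04:00', 10, 10, 1),
  (TIME '11:09:30', 40, 11, 12),
  (TIME '11:24:11', 10, 12, 4)
) AS t (rowtime, productid, orderid, units);

-- Hypothetical stand-in for Table 2. Products.
-- The duplicate rows are kept on purpose: each duplicate produces an
-- additional row in the join result.
CREATE TEMPORARY VIEW Products AS
SELECT * FROM (VALUES
  (30, 'Cheese', 17.00),
  (10, 'Beer', 0.25),
  (20, 'Wine', 6.00),
  (30, 'Cheese', 17.00),
  (10, 'Beer', 0.25),
  (10, 'Beer', 0.25),
  (40, 'Bread', 100.00),
  (10, 'Beer', 0.25)
) AS t (productid, name, unitprice);
```

    With these views, each order for product 10 matches four Beer rows and each order for product 30 matches two Cheese rows, which accounts for the 22 rows in the test results.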

Example 2: Join the datahub_stream1 and datahub_stream2 tables

  • Test data

    Table 3. datahub_stream1

    a (BIGINT)  b (BIGINT)  c (VARCHAR)
    ----------  ----------  -----------
    0           10          test11
    1           10          test21

    Table 4. datahub_stream2

    a (BIGINT)  b (BIGINT)  c (VARCHAR)
    ----------  ----------  -----------
    0           10          test11
    1           10          test21
    0           10          test31
    1           10          test41

  • Test statements

    SELECT s1.c, s2.c
    FROM datahub_stream1 AS s1
    JOIN datahub_stream2 AS s2
    ON s1.a = s2.a
    WHERE s1.a = 0;
  • Test results

    s1.c (VARCHAR)  s2.c (VARCHAR)
    --------------  --------------
    test11          test11
    test11          test31