All Products
Search
Document Center

Realtime Compute for Apache Flink:AnalyticDB for MySQL V3.0 connector

Last Updated:Jul 17, 2024

This topic describes how to use the AnalyticDB for MySQL V3.0 connector.

Background information

AnalyticDB for MySQL V3.0 is a cloud-native enterprise-class data warehousing service that integrates database and big data technologies. AnalyticDB for MySQL supports high-throughput real-time data addition, removal, and modification, low-latency real-time data analysis, and complex extract, transform, and load (ETL) operations. AnalyticDB for MySQL is compatible with upstream and downstream ecosystem tools and can be used to build enterprise-class report systems, data warehouses, and data service engines.

The following table describes the capabilities supported by the AnalyticDB for MySQL V3.0 connector.

Item

Description

Table type

Dimension table and sink table

Running mode

Streaming mode and batch mode

Data format

N/A

Metric

N/A

API type

SQL API

Data update or deletion in a sink table

Supported

Prerequisites

  • An AnalyticDB for MySQL cluster and an AnalyticDB for MySQL table are created. For more information, see Create a cluster and CREATE TABLE.

  • A whitelist is configured for the AnalyticDB for MySQL cluster. For more information, see Configure a whitelist.

Limits

  • The AnalyticDB for MySQL V3.0 connector can be used only for dimension tables and sink tables and cannot be used for source tables.

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.X or later supports the AnalyticDB for MySQL V3.0 connector.

Syntax

CREATE TEMPORARY TABLE adb_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
Important

The primary key that is specified in the Flink DDL statement must be consistent with the primary key of the physical table in the AnalyticDB for MySQL database. The primary key must be specified in the Flink DDL statement and exist in the physical table in the AnalyticDB for MySQL database at the same time. The name of the primary key specified in the Flink DDL statement must be the same as the name of the primary key of the physical table. If the primary keys are not the same, data may be incorrect.

Parameters in the WITH clause

  • Common parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the sink table.

    STRING

    Yes

    No default value

    Set the value to adb3.0.

    url

    The Java Database Connectivity (JDBC) URL of the database.

    STRING

    Yes

    No default value

    The JDBC URL of the AnalyticDB for MySQL database. The URL is in the jdbc:mysql://<endpoint>:<port>/<databaseName> format.

    • endpoint and port: You can log on to the AnalyticDB for MySQL console. In the left-side navigation pane, click the name of the desired cluster in the Cluster ID/Cluster Description column. On the page that appears, obtain the information in the Network Information section.

    • databaseName: the name of the AnalyticDB for MySQL database.

    userName

    The username that is used to access the AnalyticDB for MySQL database.

    STRING

    Yes

    No default value

    N/A.

    password

    The password that is used to access the AnalyticDB for MySQL database.

    STRING

    Yes

    No default value

    N/A.

    tableName

    The name of the table in the database.

    STRING

    Yes

    No default value

    N/A.

    maxRetryTimes

    The maximum number of retries that are allowed if a data writing or reading attempt fails.

    INTEGER

    No

    The default value of this parameter varies based on the VVR version of Realtime Compute for Apache Flink:

    • If the VVR version is 3.X or earlier, the default value is 3.

    • If the VVR version is 4.0.10 or later, the default value is 10.

    N/A.

  • Parameters only for sink tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    batchSize

    The number of data records that can be written at a time.

    INTEGER

    No

    The default value of this parameter varies based on the VVR version of Realtime Compute for Apache Flink:

    • If the VVR version is 3.X or earlier, the default value is 100.

    • If the VVR version is 4.0.10 or later, the default value is 1000.

    This parameter takes effect only after you specify the primary key.

    bufferSize

    The maximum number of data records that can be cached in the memory. Write operations are triggered if the value of the batchSize or bufferSize parameter reaches the specified threshold.

    INTEGER

    No

    1000

    This parameter takes effect only after you specify the primary key.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter.

    flushIntervalMs

    The interval at which the cache is cleared. This value indicates that if the number of cached data records does not reach the upper limit in a specified period of time, all cached data is written to the sink table.

    INTEGER

    No

    The default value of this parameter varies based on the VVR version of Realtime Compute for Apache Flink:

    • If the VVR version is 3.X or earlier, the default value is 1000.

    • If the VVR version is 4.0.10 or later, the default value is 3000.

    Unit: milliseconds.

    ignoreDelete

    Specifies whether to ignore delete operations.

    BOOLEAN

    No

    false

    Valid values:

    • true: The delete operations are ignored.

    • false: The delete operations are not ignored.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter.

    replaceMode

    Specified whether to use the REPLACE INTO statement to insert data into the table if a primary key is specified in the DDL statement.

    BOOLEAN

    No

    true

    The isolation.level parameter has the following valid values:

    • true: The REPLACE INTO statement is used to insert data into the table.

    • false: The INSERT INTO ON DUPLICATE KEY UPDATE statement is used to insert data into the table.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter.

    • Only AnalyticDB for MySQL V3.1.3.5 or later supports this parameter.

    • This parameter takes effect only when a primary key is specified in the DDL statement. The statement that is used to insert data into the table varies based on whether a primary key is specified and the value of the replaceMode parameter.

      • If a primary key is specified in the DDL statement and the replaceMode parameter is set to true, the REPLACE INTO statement is used.

      • If a primary key is specified in the DDL statement and the replaceMode parameter is set to false, the INSERT INTO ON DUPLICATE KEY UPDATE statement is used.

      • If no primary key is specified in the DDL statement, the INSERT INTO statement is used.

    excludeUpdateColumns

    The fields that are not updated when data that has the same primary key is updated.

    STRING

    No

    An empty string

    Separate multiple fields with commas (,). Example: excludeUpdateColumns=column1,column2.

    Note
    • This parameter takes effect only when the replaceMode parameter is set to false. If the replaceMode parameter is set to true, the values of the fields specified by this parameter are changed to null.

    • Make sure that the columns that you want to ignore are written in one line and are not wrapped.

    connectionMaxActive

    The maximum size of the thread pool.

    INTEGER

    No

    40

    Only Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter.

  • Parameters only for dimension tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    cache

    The cache policy.

    STRING

    No

    ALL

    Valid values:

    • None: No data is cached.

    • LRU: Only specific data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the system does not find the record in the cache, the system searches for the data record in the physical dimension table.

    • ALL: All data in the dimension table is cached. This is the default value. Before a deployment runs, the system loads all data in the dimension table to the cache. This way, the cache is searched for all subsequent queries in the dimension table. If the system does not find the data record in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire.

    If the amount of data in a remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. The source table and dimension table cannot be associated based on the ON clause.

    Note
    • If you set the cache parameter to ALL, you must monitor the memory usage of the node to prevent out of memory (OOM) errors.

    • If you set the cache parameter to ALL, you must increase the memory of the node for joining tables because the system asynchronously loads data from the dimension table. The increased memory size is twice the data amount in the remote table.

    cacheSize

    The maximum number of data records that can be cached.

    INTEGER

    No

    100000

    You must configure the cacheSize parameter when the cache parameter is set to LRU.

    cacheTTLMs

    The cache timeout period. Unit: milliseconds.

    INTEGER

    No

    Long.MAX_VALUE

    You must configure the cacheTTLMs parameter when the cache parameter is set to LRU or ALL.

    • If the cache parameter is set to LRU, the cacheTTLMs parameter specifies the cache timeout period. Default value: Long.MAX_VALUE. The default value indicates that cache entries do not expire.

    • If the cache parameter is set to ALL, the cacheTTLMs parameter specifies the interval at which the system reloads the data in the physical table. Default value: Long.MAX_VALUE. The default value indicates that data in the physical table is not reloaded.

    Note

    If the cache parameter is set to None, you do not need to configure the cacheTTLMs parameter. If the cache parameter is set to None, data is not cached. Therefore, you do not need to configure the cacheTTLMs parameter.

    maxJoinRows

    The maximum number of results returned after each data record in the primary table is mapped to the data in the dimension table.

    INTEGER

    No

    1024

    If you can estimate that each data record in the primary table is mapped to a maximum of n data records in the dimension table, you can configure maxJoinRows='n' to ensure efficient matching in Realtime Compute for Apache Flink.

    Note

    When you join the primary table with a dimension table, the number of results returned after an input data record in the primary table is mapped to the data records in the dimension table is limited by this parameter.

Data type mappings

Data type of AnalyticDB for MySQL V3.0

Data type of Flink

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s) or NUMERIC(p, s)

DECIMAL(p, s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

Sample code

  • Sample code for a sink table

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO adb_sink
    SELECT * FROM datagen_source;
  • Sample code for a dimension table

    CREATE TEMPORARY TABLE datagen_source(
      `a` INT,
      `b` VARCHAR,
      `c` STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_dim (
      `a` INT,
      `b` VARCHAR,
      `c` VARCHAR
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `a` INT,
      `b` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;