AnalyticDB: Use Realtime Compute for Apache Flink to subscribe to AnalyticDB for MySQL binary logs

Last Updated: Dec 02, 2024

AnalyticDB for MySQL allows you to subscribe to binary logs by using Realtime Compute for Apache Flink. This helps you obtain and process data changes in databases to implement efficient data synchronization and stream computing. This topic describes how to use Realtime Compute for Apache Flink to subscribe to AnalyticDB for MySQL binary logs.

Prerequisites

  • An AnalyticDB for MySQL cluster of Data Lakehouse Edition or Data Warehouse Edition in elastic mode is created.

  • The minor version of the AnalyticDB for MySQL cluster is 3.2.1.0 or later.

    Note
    • To query the minor version of an AnalyticDB for MySQL Data Lakehouse Edition cluster, execute the SELECT adb_version(); statement. To update the minor version of a cluster, contact technical support.

    • For information about how to view and update the minor version of an AnalyticDB for MySQL Data Warehouse Edition cluster, see Update the minor version of a cluster.

  • Realtime Compute for Apache Flink uses Ververica Runtime (VVR) 8.0.4 or later.

  • A fully managed Flink workspace is created in the same virtual private cloud (VPC) as the AnalyticDB for MySQL cluster. For more information, see Create a cluster and Activate Realtime Compute for Apache Flink.

  • The CIDR block of the fully managed Flink workspace is added to an IP address whitelist of the AnalyticDB for MySQL cluster. For more information, see the "How do I configure a whitelist?" section of the FAQ about network connectivity topic and Configure an IP address whitelist.

Limits

  • You can enable the binary logging feature only at the table level in AnalyticDB for MySQL.

  • Realtime Compute for Apache Flink can process AnalyticDB for MySQL binary logs of all basic data types and the JSON complex data type. For information about the data types supported by AnalyticDB for MySQL, see Basic data types.

  • Realtime Compute for Apache Flink does not process binary log records that are generated by automatic partition deletion on partitioned tables or by DDL operations in AnalyticDB for MySQL.

Step 1: Enable the binary logging feature

  1. Enable the binary logging feature for a table. In this example, a table named source_table is used.

    Enable the binary logging feature when you create a table

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    ) DISTRIBUTED BY HASH(id) BINLOG=true;

    Enable the binary logging feature for an existing table

    ALTER TABLE source_table BINLOG=true;
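
    To confirm that the property took effect, you can inspect the table DDL. This is a hedged check: it assumes that the BINLOG property is surfaced in the DDL returned by SHOW CREATE TABLE.

    -- Hedged check: BINLOG=true is assumed to appear in the returned DDL.
    SHOW CREATE TABLE source_table;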
  2. (Optional) View binary log information.

    Note

    If the binary logging feature is enabled but Realtime Compute for Apache Flink has not yet subscribed to binary logs, the following SQL statements return no log information. Log information is returned only after Realtime Compute for Apache Flink subscribes to the binary logs.

    • SQL statement for querying the most recent binary log offset:

      SHOW MASTER STATUS FOR source_table;
    • SQL statement for querying information about all binary log files of tables in an AnalyticDB for MySQL cluster:

      SHOW BINARY LOGS FOR source_table;
  3. (Optional) Change the retention period of binary logs.

    You can modify the binlog_ttl parameter to change the retention period of binary logs. Execute the following statement to change the retention period of binary logs to 1 day for the source_table table:

    ALTER TABLE source_table binlog_ttl='1d';

    The binlog_ttl parameter supports values in the following formats:

    • Pure number: the period of time in milliseconds. For example, 60 specifies 60 milliseconds.

    • Number + s: the period of time in seconds. For example, 30s specifies 30 seconds.

    • Number + h: the period of time in hours. For example, 2h specifies 2 hours.

    • Number + d: the period of time in days. For example, 1d specifies 1 day.
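
    For example, the following statements, which assume the same source_table, use the other supported formats. The values are illustrative; choose a retention period that fits your workload.

    ALTER TABLE source_table binlog_ttl='2h';   -- retain binary logs for 2 hours
    ALTER TABLE source_table binlog_ttl='30s';  -- retain binary logs for 30 seconds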

Step 2: Configure a connector of Realtime Compute for Apache Flink

  1. Log on to the Realtime Compute for Apache Flink console.

  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  3. In the left-side navigation pane, click Connectors.

  4. On the Connectors page, click Create Custom Connector.

  5. Upload a custom connector JAR package. Download link: AnalyticDB for MySQL connector.

  6. After the upload is complete, click Next.

  7. Click Finish. The created custom connector is displayed in the connector list.

Step 3: Subscribe to binary logs

  1. Log on to the Realtime Compute for Apache Flink console and create an SQL draft. For more information, see the "Step 1: Create an SQL draft" section of the Getting started with a Flink SQL deployment topic.

  2. Create a source table that is used to connect to the AnalyticDB for MySQL cluster and read binary log data from the source_table table.

    Note
    • The primary key specified in the DDL statement in Realtime Compute for Apache Flink must be the same as the primary key specified in the table of the AnalyticDB for MySQL cluster, including the name of the primary key. Otherwise, data accuracy is affected.

    • The data types of fields must be compatible between Realtime Compute for Apache Flink and AnalyticDB for MySQL. For information about data type mappings, see the "Data type mappings" section of this topic.

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    The following list describes the parameters in the WITH clause.

    • connector (STRING, required): The name of the custom connector. Set this parameter to adb-mysql-cdc.

    • hostname (STRING, required): The VPC endpoint that is used to connect to the AnalyticDB for MySQL cluster.

    • username (STRING, required): The name of the database account of the AnalyticDB for MySQL cluster.

    • password (STRING, required): The password of the database account of the AnalyticDB for MySQL cluster.

    • database-name (STRING, required): The name of the database in the AnalyticDB for MySQL cluster. Specify only one database name because the binary logging feature is implemented only on tables in AnalyticDB for MySQL.

    • table-name (STRING, required): The name of the table in the database of the AnalyticDB for MySQL cluster. Specify only one table name because the binary logging feature is implemented only on tables in AnalyticDB for MySQL.

    • port (INTEGER, optional; default: 3306): The port number that is used to connect to the AnalyticDB for MySQL cluster.

    • scan.incremental.snapshot.enabled (BOOLEAN, optional; default: true): Specifies whether to enable the incremental snapshot feature. The incremental snapshot feature provides a new method to read table snapshots. Compared with the original snapshot method, it supports concurrent snapshot reads, supports checkpoints at the granularity of chunks when snapshots are read, and requires no database lock permissions before snapshots are read.

    • scan.incremental.snapshot.chunk.size (INTEGER, optional; default: 8096): The chunk size for a table snapshot, which is the number of rows in a chunk. If you enable the incremental snapshot feature for a table, the table is split into multiple chunks for reading.

    • scan.snapshot.fetch.size (INTEGER, optional; default: 1024): The maximum number of rows that can be read each time a table snapshot is read.

    • scan.startup.mode (STRING, optional; default: initial): The mode in which data is consumed. Valid values:

      • initial: scans full historical data and reads the most recent binary log data the first time the connector is started.

      • earliest-offset: does not scan full historical data and starts to read accessible binary log data from the earliest offset.

      • specific-offset: does not scan full historical data and starts to read binary log data from the specified offset. You can configure the scan.startup.specific-offset.file parameter to specify the name of the binary log file for data consumption and the scan.startup.specific-offset.pos parameter to specify the offset from which data consumption starts. For a sample configuration, see the sketch after this list.

    • scan.startup.specific-offset.file (STRING, optional): The name of the binary log file from which data consumption starts if you set the scan.startup.mode parameter to specific-offset. To obtain the most recent binary log file name, execute the SHOW MASTER STATUS FOR table_name statement.

    • scan.startup.specific-offset.pos (LONG, optional): The offset from which data consumption starts if you set the scan.startup.mode parameter to specific-offset. To obtain the most recent binary log offset, execute the SHOW MASTER STATUS FOR table_name statement.

    • scan.startup.specific-offset.skip-events (LONG, optional): The number of events to skip after the specified offset from which data consumption starts.

    • scan.startup.specific-offset.skip-rows (LONG, optional): The number of rows to skip after the specified offset from which data consumption starts.

    • server-time-zone (STRING, optional): The time zone of sessions used by the AnalyticDB for MySQL cluster, such as "Asia/Shanghai". This parameter determines how data of the TIMESTAMP type in AnalyticDB for MySQL is converted into the STRING type when data is read. If you do not specify this parameter, the ZoneId.systemDefault() method is used to determine the time zone of the AnalyticDB for MySQL cluster.

    • debezium.min.row.count.to.stream.result (INTEGER, optional; default: 1000): The row count threshold for stream processing. If the number of rows in the table is greater than the specified value, the connector streams the results. If you set this parameter to 0, table size checks are skipped and all results are streamed during snapshot reading.

    • connect.timeout (DURATION, optional; default: 30s): The maximum timeout period for a connection to the AnalyticDB for MySQL cluster. Default unit: seconds.

    • connect.max-retries (INTEGER, optional; default: 3): The maximum number of retries after a connection to the AnalyticDB for MySQL cluster fails.
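
    The following sketch shows how the startup parameters described above can be combined to resume consumption from a known offset instead of scanning full historical data. The binary log file name and position below are placeholders; obtain real values by executing SHOW MASTER STATUS FOR source_table.

    CREATE TEMPORARY TABLE adb_source_from_offset (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table',
      -- Start from a specific offset; the file and pos values are placeholders.
      'scan.startup.mode' = 'specific-offset',
      'scan.startup.specific-offset.file' = 'mysql-bin.000003',
      'scan.startup.specific-offset.pos' = '4'
    );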

  3. Create a destination table to store the processed data. In this example, an AnalyticDB for MySQL table is created. For information about the connectors supported by Realtime Compute for Apache Flink, see the "Supported connectors" section of the Supported connectors topic.

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    );
  4. Create a result table that connects to the target_table table created in the previous substep, so that the processed data is written to AnalyticDB for MySQL.

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    For information about the parameters in the WITH clause of the result table and related data type mappings, see AnalyticDB for MySQL V3.0 connector.

  5. Synchronize the data changes that are read from the source table to the result table, which writes them to the target_table table in AnalyticDB for MySQL.

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
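
    Because the draft is standard Flink SQL, you can also transform the change stream before writing it. The following is a minimal sketch that assumes the same schema; the filter condition is illustrative only.

    -- Hedged variation: synchronize only rows whose num value is positive.
    INSERT INTO adb_sink
    SELECT `id`, `num`
    FROM adb_source
    WHERE `num` > 0;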
  6. Click Save.

  7. Click Validate.

    Check the SQL semantics of the draft, network connectivity, and the metadata of the tables that are used by the draft. You can also click SQL Advice in the validation results to view SQL risk information and optimization suggestions.

  8. (Optional) Click Debug.

    You can enable the debugging feature to simulate deployment running, check outputs, and verify the business logic of SELECT and INSERT statements. This feature improves development efficiency and reduces the risks of poor data quality. For more information, see Debug a deployment.

  9. Click Deploy. For more information, see Deploy an SQL job.

    After the draft development and syntax check are complete, you can deploy the draft to publish it to the production environment. After the draft is deployed, you can start the deployment on the Deployments page to run it. For more information, see Start a deployment.

Data type mappings

The following table describes the data type mappings between AnalyticDB for MySQL and Realtime Compute for Apache Flink.

AnalyticDB for MySQL data type    Realtime Compute for Apache Flink data type
BOOLEAN                           BOOLEAN
TINYINT                           TINYINT
SMALLINT                          SMALLINT
INT                               INT
BIGINT                            BIGINT
FLOAT                             FLOAT
DOUBLE                            DOUBLE
DECIMAL(p,s) or NUMERIC(p,s)      DECIMAL(p,s)
VARCHAR                           STRING
BINARY                            BYTES
DATE                              DATE
TIME                              TIME
DATETIME                          TIMESTAMP
TIMESTAMP                         TIMESTAMP
POINT                             STRING
JSON                              STRING
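
As a hedged illustration of these mappings, a source table whose AnalyticDB for MySQL columns include DATETIME and JSON would be declared in Flink SQL with TIMESTAMP and STRING columns. The table and column names below are hypothetical.

CREATE TEMPORARY TABLE typed_source (
  `id` INT,                -- INT maps to INT
  `created_at` TIMESTAMP,  -- DATETIME maps to TIMESTAMP
  `payload` STRING,        -- JSON maps to STRING
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'typed_source'
);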