
Realtime Compute for Apache Flink: MySQL connector

Last Updated: Dec 18, 2024

This topic describes how to use the MySQL connector.

Background information

The MySQL connector supports all databases that are compatible with the MySQL protocol. The databases include ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase database in MySQL-compatible mode, and self-managed MySQL databases.

Important
  • We recommend that you use the MySQL connector instead of the ApsaraDB RDS for MySQL connector. The "ApsaraDB RDS for MySQL connector" topic will be removed from the Connectors documentation.

  • You can use the MySQL connector to read data from ApsaraDB for OceanBase. Before you do, make sure that the binlog service is enabled and correctly configured in the ApsaraDB for OceanBase console. For more information, see Operations related to the binlog service. Reading data from ApsaraDB for OceanBase with the binlog service enabled is in public preview. We recommend that you evaluate this feature carefully and use it with caution.

The MySQL connector supports the following capabilities:

  • Supported type: Source table, dimension table, sink table, and data ingestion source.

  • Running mode: Streaming mode.

  • Data format: N/A.

  • Metrics:

    • Metrics for source tables:

      • currentFetchEventTimeLag: the interval from the time when data is generated to the time when the data is pulled to the source operator.

        The value of this metric is greater than 0 only in the binary log reading phase. In the full data (snapshot) reading phase, the value is fixed to 0.

      • currentEmitEventTimeLag: the interval from the time when data is generated to the time when the data leaves the source operator.

        The value of this metric is greater than 0 only in the binary log reading phase. In the full data (snapshot) reading phase, the value is fixed to 0.

      • sourceIdleTime: the period of time for which no new data has been generated in the source table.

    • Metrics for dimension tables and sink tables: none.

    Note: For more information about the metrics, see Metrics.

  • API type: DataStream API, SQL API, and YAML API for data ingestion.

  • Data update or deletion in a sink table: Supported.

Features

A MySQL Change Data Capture (CDC) source table is a streaming source table for MySQL databases. The MySQL CDC source connector first reads full historical data from a MySQL database and then seamlessly switches to reading binary log data, which ensures data accuracy. If an error occurs, exactly-once semantics can be used to ensure data accuracy. The MySQL CDC source connector supports reading full data in multiple subtasks at the same time, and it uses the incremental snapshot algorithm to provide lock-free reading and resumption from checkpoints. For more information, see Basic concepts about MySQL CDC source tables.

The MySQL CDC source connector supports the following features:

  • Integrates stream processing and batch processing to support full and incremental data reading. This way, you do not need to separately implement stream processing and batch processing.

  • Supports full data reading in multiple subtasks at the same time. This way, data can be read in a more efficient manner.

  • Seamlessly switches between full and incremental data reading and supports automatic scale-in operations. This reduces the computing resources that are consumed.

  • Supports resuming from checkpoints during full data reading. This way, data can be read in a more stable manner.

  • Reads full data without locks. This way, online business is not affected.

  • Supports reading the binary log backup files (archived logs) of ApsaraDB RDS for MySQL.

  • Parses binary log files in parallel to reduce the read latency.

Prerequisites

Limits

  • CDC source table and data ingestion source

    • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports lock-free reading and parallel reading.

    • You must select a VVR version based on the MySQL version. You can run the SELECT version(); statement to query the MySQL version. The following mappings exist between VVR versions and supported MySQL versions:

      • VVR 4.0.8 to VVR 4.0.10: MySQL 5.7 and 8.0.X

      • VVR 4.0.11 and later: MySQL 5.6.X, 5.7.X, and 8.0.X

      Important

      The default value of the scan.incremental.snapshot.enabled parameter is true, which means that the incremental snapshot feature is enabled by default. This is required for ApsaraDB RDS for MySQL 5.6.X to run as expected: you cannot disable the incremental snapshot feature for MySQL 5.6.X. This limit is removed in VVR 6.0.8 and VVR 8.0.1, in which you can disable the incremental snapshot feature. However, we recommend that you keep the feature enabled. If you disable it, the MySQL database is locked during full data reading, which may affect the performance of your online business.

    • MySQL CDC source tables do not support watermarks.

    • Only MySQL users who are granted specific permissions can use the MySQL CDC source connector to read full data and incremental data from a MySQL database. The required permissions are SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT. A sample GRANT statement is provided after this list.

    • If you use the CREATE TABLE AS statement or the CREATE DATABASE AS statement to perform data synchronization, specific schema changes of the MySQL CDC source table can be synchronized to the sink table. For more information about the supported change types, see Synchronization policies for table schema changes. In other scenarios, schema changes of the MySQL CDC source table cannot be synchronized to the sink table.

    • MySQL CDC source tables do not support the synchronization of truncate operations.

    • We recommend that you do not read data from a secondary instance or a read-only secondary instance of ApsaraDB RDS for MySQL. By default, the binary log retention period of the secondary instance or read-only secondary instance of ApsaraDB RDS for MySQL is short. If the binary log file is deleted due to expiration, the deployment cannot consume the binary log data and an error occurs.

    • The MySQL CDC source table cannot read data from Multi-master Cluster (Database/Table) Edition of PolarDB for MySQL 1.0.19 or earlier. For more information about Multi-master Cluster (Database/Table) Edition, see Overview. In PolarDB for MySQL 1.0.19 or earlier, the binary logs of Multi-master Cluster (Database/Table) Edition may contain duplicate table IDs. This causes incorrect mapping of the schema of the MySQL CDC source table, and an error occurs when the binary log data is parsed. This issue is resolved in PolarDB for MySQL versions later than 1.0.19.

  • Dimension table and sink table

    • Only Realtime Compute for Apache Flink that uses VVR 4.0.11 or later supports the MySQL connector.

    • The connector provides at-least-once semantics. If the ApsaraDB RDS for MySQL sink table has a primary key, idempotent writes can be used to ensure data correctness.
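
For reference, the following statements are a minimal sketch of how the permissions required by the MySQL CDC source connector can be granted. The user name flink_user and the host pattern are placeholders, not values from this topic.

  -- Grant the permissions that the MySQL CDC source connector requires.
  GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
  -- For VVR versions earlier than 4.0.8, the RELOAD permission is also required to obtain the global read lock.
  -- GRANT RELOAD ON *.* TO 'flink_user'@'%';
  FLUSH PRIVILEGES;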

Precautions

  • CDC source table and data ingestion source

    • You must specify a unique server ID for each MySQL CDC data source.

      • Use of server IDs

        Each client that synchronizes data from a database has a unique server ID. The MySQL server maintains network connections and binary log file positions based on the server ID. If a large number of clients that have different server IDs connect to the MySQL server, the CPU utilization of the MySQL server may significantly increase. This affects the stability of online business.

        If multiple MySQL CDC data sources use the same server ID and the data sources cannot be merged as one source operator, the checkpoints for binary log positions may be out of order. As a result, the amount of data that is read may be greater than or less than expected. A conflict between server IDs may also occur. For more information, see Upstream and downstream storage. To prevent the preceding issue, we recommend that you specify a unique server ID for each MySQL CDC data source.

      • Configuration methods of server IDs

        You can specify server IDs in DDL statements or by using dynamic hints.

        We recommend that you use dynamic hints to configure server IDs instead of configuring server IDs in DDL statements. For more information about dynamic hints, see SQL hints.

      • Configuration of server IDs in different scenarios

        • Scenario 1: The incremental snapshot algorithm is disabled or the degree of parallelism is 1.

          If the incremental snapshot algorithm is disabled or the degree of parallelism is 1, you can specify a single server ID.

          SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
        • Scenario 2: The incremental snapshot algorithm is enabled and the degree of parallelism is greater than 1.

          If the incremental snapshot algorithm is enabled and the degree of parallelism is greater than 1, you must specify a server ID range. Make sure that the number of available server IDs in the range is not less than the degree of parallelism. The following statement shows the configuration of server IDs when the degree of parallelism is 3.

          SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
        • Scenario 3: The CREATE TABLE AS statement is executed to synchronize data.

          If you execute the CREATE TABLE AS statement to synchronize data and the configurations of multiple MySQL CDC data sources are the same, the data sources are automatically merged as one source operator. In this case, you can specify the same server ID for multiple MySQL CDC data sources. For more information, see the "Example 4: execution of multiple CREATE TABLE AS statements" section of the CREATE TABLE AS statement topic.

        • Scenario 4: Multiple MySQL CDC source tables are used in a deployment in which the CREATE TABLE AS statement is not executed.

          If multiple MySQL CDC source tables are used in a deployment and the CREATE TABLE AS statement is not executed, the data sources cannot be merged as one source operator. You must specify a unique server ID for each MySQL CDC source table. Similarly, if the incremental savepoint algorithm is enabled and the degree of parallelism is greater than 1, you must specify the server ID range.

          SELECT * FROM
            source_table1 /*+ OPTIONS('server-id'='123456-123457') */
          LEFT JOIN
            source_table2 /*+ OPTIONS('server-id'='123458-123459') */
          ON source_table1.id = source_table2.id;
    • Only deployments that use VVR 4.0.8 or later support lock-free reading, parallel reading, and resumption from checkpoints during full data reading.

      If your deployment uses a VVR version that is earlier than 4.0.8, you must grant the RELOAD permission to the MySQL user to obtain the global read lock. This ensures data reading consistency. The global read lock may block write operations for a few seconds. This may affect online business.

      If your deployment uses a VVR version that is earlier than 4.0.8, checkpoints cannot be generated during full data reading. If your deployment fails in the process, the deployment reads full data again. This reduces the stability of data reading performance. Therefore, we recommend that you update the version of VVR that is used by your deployment to 4.0.8 or later.

  • Sink table

    • ApsaraDB RDS for MySQL supports the auto-increment primary key. Therefore, you do not need to declare the auto-increment field in the DDL statement. For example, if you use ID as an auto-increment field, you do not need to declare the ID field in the DDL statement. When a row of output data is written to the ApsaraDB RDS for MySQL database, a value is automatically filled for the auto-increment field.

    • You must declare at least one non-primary key in the DDL statement. Otherwise, an error is returned.

    • NOT ENFORCED indicates that Flink does not perform mandatory verification on the primary key. You must ensure the correctness and integrity of the primary key.

      Realtime Compute for Apache Flink does not fully support mandatory verification. Realtime Compute for Apache Flink considers that the primary key is correct based on the assumption that the nullability of columns is aligned with the columns in the primary key. For more information, see Validity Check.

  • Dimension table

    If you want to perform queries by index when the MySQL connector is used for a dimension table, you must follow the leftmost prefix matching principle of MySQL to sort the columns that are specified in the JOIN operation. However, this cannot ensure that indexes are used for queries. Specific filter conditions may be rewritten or modified due to SQL optimization. As a result, the filter conditions that are obtained by the connector may not hit indexes. To determine whether the connector queries data by index, check the specific SELECT statements that are executed on the related database.
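
    The following is a minimal sketch of a dimension table join that follows the leftmost prefix principle. The MySQL table user_dim, its assumed composite index on (user_id, site_id), and the orders table with a processing-time attribute proc_time are hypothetical names used only for illustration.

    CREATE TEMPORARY TABLE user_dim (
      user_id BIGINT,
      site_id INT,
      user_name STRING,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = 'user_dim'
    );

    -- List the join conditions in the same order as the index columns (user_id first, then site_id).
    SELECT o.order_id, u.user_name
    FROM orders AS o
    JOIN user_dim FOR SYSTEM_TIME AS OF o.proc_time AS u
    ON o.user_id = u.user_id AND o.site_id = u.site_id;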

SQL deployment

The MySQL connector can be used in SQL deployments as a source table, dimension table, or sink table.

Syntax

CREATE TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
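
The preceding DDL statement declares a MySQL CDC source table. The following DDL statement is a minimal sketch of how the same connector is declared for a sink table. The table and column names are hypothetical, and, as described in the precautions, an auto-increment column in the physical MySQL table can simply be omitted from the Flink schema.

CREATE TABLE mysqlcdc_sink (
   order_id INT,
   customer_name STRING,
   price DECIMAL(10, 5),
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourSinkTableName>'
);

-- Write the change stream from the CDC source table to the MySQL sink table.
INSERT INTO mysqlcdc_sink
SELECT order_id, customer_name, price FROM mysqlcdc_source;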

Note
  • When the connector writes data to a sink table, the connector concatenates each received data record into an SQL statement and executes the SQL statement based on the following rules:

    • If the sink table does not have a primary key, the INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); statement is executed.

    • If the sink table has a primary key, the INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; statement is executed (see the example after this note). If a unique index constraint other than the primary key constraint exists in the physical table and two records that have different primary keys but the same unique index value are inserted, downstream data is overwritten due to the index conflict, which causes data loss.

  • If an auto-increment primary key is defined in the MySQL database, the auto-increment field cannot be declared in the Flink DDL statement. During data writing, the database automatically fills in the auto-increment field. The connector can only write and delete data that has auto-increment fields; it cannot update such data.
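
  For example, for a hypothetical sink table named orders with the primary key order_id and the columns customer_name and price, each received data record is concatenated into an upsert statement similar to the following:

  INSERT INTO orders (order_id, customer_name, price)
  VALUES (1001, 'Alice', 12.5)
  ON DUPLICATE KEY UPDATE
    customer_name = VALUES(customer_name),
    price = VALUES(price);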

Parameters in the WITH clause

  • Common parameters

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    connector

    The type of the table.

    Yes

    STRING

    No default value

    When you use the MySQL connector for a source table, you can set this parameter to mysql-cdc or mysql. The two values are equivalent. When you use the MySQL connector for a dimension table or a sink table, you must set this parameter to mysql.

    hostname

    The IP address or hostname that is used to access the MySQL database.

    Yes

    STRING

    No default value

    We recommend that you enter the IP address of a virtual private cloud (VPC).

    Note

    If the MySQL database and Realtime Compute for Apache Flink are not deployed in the same VPC, you must establish a connection between the two VPCs or enable Realtime Compute for Apache Flink to access the MySQL database over the Internet. For more information, see Console operations and How does Realtime Compute for Apache Flink access the Internet?

    username

    The username that is used to access the MySQL database.

    Yes

    STRING

    No default value

    N/A.

    password

    The password that is used to access the MySQL database.

    Yes

    STRING

    No default value

    N/A.

    database-name

    The name of the MySQL database that you want to access.

    Yes

    STRING

    No default value

    • You can set this parameter to a regular expression to read data from multiple databases.

    • When you use a regular expression, we recommend that you do not use the caret symbol (^) and dollar sign ($) to match the beginning and end of the regular expression. For more information about the reason, see the Remarks column of the table-name parameter.

    table-name

    The name of the MySQL table.

    Yes

    STRING

    No default value

    • If you want to read data from multiple tables, you can set this parameter to a regular expression.

      When you use the MySQL connector to read data from multiple MySQL tables, you can commit multiple CREATE TABLE AS statements as one deployment to prevent multiple Binlog listeners from being enabled. This improves performance and efficiency. For more information, see the "Example 4: execution of multiple CREATE TABLE AS statements" section of the CREATE TABLE AS statement topic.

    • When you use a regular expression, we recommend that you do not use the caret symbol (^) and dollar sign ($) to match the beginning and end of the regular expression. For more information about the reason, see the following note.

    Note

    When the MySQL CDC source table matches table names against a regular expression, the values of the database-name and table-name parameters that you specify are concatenated with the character string (\\.) into a full-path regular expression. In Realtime Compute for Apache Flink whose VVR version is earlier than 8.0.1, a period (.) is used instead of the character string (\\.). The regular expression is then matched against the fully qualified names of the tables in the MySQL database.

    For example, if you specify 'database-name' = 'db_.*' and 'table-name' = 'tb_.+', the connector uses the regular expression db_.*\\.tb_.+ to match the fully qualified names of tables and determine which tables to read. In Realtime Compute for Apache Flink whose VVR version is earlier than 8.0.1, the regular expression is db_.*.tb_.+.

    port

    The port that is used to access the MySQL database.

    No

    INTEGER

    3306

    N/A.

  • Parameters only for source tables

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    server-id

    The numeric ID that is allocated to a database client.

    No

    STRING

    By default, a value in the range of 5400 to 6400 is randomly generated.

    The ID must be unique in a MySQL cluster. We recommend that you configure a unique server ID for each deployment in the same database.

    This parameter can also be set to an ID range, such as 5400-5408. If the incremental snapshot algorithm is enabled, parallel reading is supported. In this case, we recommend that you set this parameter to an ID range so that each parallel reader uses a unique ID.

    scan.incremental.snapshot.enabled

    Specifies whether to enable the incremental snapshot algorithm.

    No

    BOOLEAN

    true

    By default, the incremental snapshot algorithm is enabled. The incremental snapshot algorithm is a new mechanism that is used to read snapshots of full data. Compared with the original snapshot reading mechanism, the incremental snapshot algorithm has the following advantages:

    • When the MySQL CDC source connector reads full data, the connector can read data in multiple subtasks at the same time.

    • When the MySQL CDC source connector reads full data, the connector can generate checkpoints for chunks.

    • When the MySQL CDC source connector reads full data, the connector does not need to execute the FLUSH TABLES WITH READ LOCK statement to obtain a global read lock.

    If you want the MySQL CDC source connector to support parallel reading, a unique server ID must be allocated to each parallel reader. Therefore, you must set the server-id parameter to a range that contains at least as many IDs as the value of the Parallelism parameter. For example, you can set the server-id parameter to 5400-6400.

    scan.incremental.snapshot.chunk.size

    The chunk size of the table. The chunk size indicates the number of rows.

    No

    INTEGER

    8096

    If the incremental snapshot algorithm is enabled, the table is split into multiple chunks for data reading. Before the data in a chunk is read, the data is cached in the memory.

    A smaller chunk size means that the table is split into more chunks, so fewer data records need to be re-read after a failure recovery. However, an overly small chunk size may reduce data throughput and cause an out of memory (OOM) error. Specify a reasonable chunk size based on your business requirements.

    scan.snapshot.fetch.size

    The maximum number of data records that can be extracted each time full data of a table is read.

    No

    INTEGER

    1024

    N/A.

    scan.startup.mode

    The startup mode when data is consumed.

    No

    STRING

    initial

    Valid values:

    • initial: When the MySQL CDC source connector starts for the first time, the connector scans all historical data and reads the most recent binary log data. This is the default value.

    • latest-offset: When the MySQL CDC source connector starts for the first time, the connector reads binary log data from the most recent offset instead of scanning all historical data. This way, the connector reads only the most recent incremental data after the connector starts.

    • earliest-offset: The MySQL CDC source connector does not scan full historical data. The connector starts to read accessible binary log data from the earliest offset.

    • specific-offset: The MySQL CDC source connector does not scan full historical data. The connector starts to scan binary log data from the specified offset. You can specify the scan.startup.specific-offset.file and scan.startup.specific-offset.pos parameters to specify the name of the binary log file and offset at which the scan starts. You can also specify the scan.startup.specific-offset.gtid-set parameter to specify a global transaction identifier (GTID) set at which the scan starts.

    • timestamp: The MySQL CDC source connector does not scan full historical data. The connector starts to read binary log data from the specified timestamp. The timestamp is specified by the scan.startup.timestamp-millis parameter. Unit: milliseconds.

    Important
    • You can set the scan.startup.mode parameter to earliest-offset, specific-offset, or timestamp in Realtime Compute for Apache Flink that uses VVR 6.0.4 or later.

    • If the scan.startup.mode parameter is set to earliest-offset, specific-offset, or timestamp for a deployment and the table schema at the startup time is different from the table schema at the specified start offset time, an error is reported in the deployment. If you use one of the three startup modes for a deployment, make sure that the table schema does not change between the specified binary log consumption offset time and the startup time of the deployment.

    scan.startup.specific-offset.file

    The name of the binary log file that is used to specify the startup offset when the scan.startup.mode parameter is set to specific-offset.

    No

    STRING

    No default value

    This parameter is required when the scan.startup.mode parameter is set to specific-offset. Example of a file name: mysql-bin.000003.

    scan.startup.specific-offset.pos

    The startup offset that is specified in the specified binary log file when the scan.startup.mode parameter is set to specific-offset.

    No

    INTEGER

    No default value

    This parameter is required when the scan.startup.mode parameter is set to specific-offset.

    scan.startup.specific-offset.gtid-set

    A GTID set that is used to specify the startup offset when the scan.startup.mode parameter is set to specific-offset.

    No

    STRING

    No default value

    This parameter is required when the scan.startup.mode parameter is set to specific-offset. Example of a GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    The timestamp in milliseconds that is used to specify the startup offset when the scan.startup.mode parameter is set to timestamp.

    No

    LONG

    No default value

    This parameter is required when the scan.startup.mode parameter is set to timestamp. Unit: milliseconds.

    Important

    If you set the scan.startup.mode parameter to timestamp, the MySQL CDC source connector attempts to read the initial event of each binary log file to determine the timestamp of the file and finally locates the binary log file that corresponds to the specified timestamp. Make sure that the binary log file that corresponds to the specified timestamp is not deleted from the database and can be read.

    server-time-zone

    The time zone of the session that is used by the database.

    Required only in a VVR version that is earlier than 6.0.2

    STRING

    If you do not specify this parameter, the time zone of the environment in which the Realtime Compute for Apache Flink deployment runs is used as the time zone of the database server, which is the time zone of the zone that you selected.

    Example: Asia/Shanghai. This parameter determines how to convert data in the MySQL database from the TIMESTAMP type to the STRING type. For more information, see Debezium connector for MySQL.

    debezium.min.row.count.to.stream.results

    The minimum number of data records allowed in a table to trigger the batch data read mode. If the number of data records in a table is greater than the value of this parameter, the batch data read mode is used.

    No

    INTEGER

    1000

    Realtime Compute for Apache Flink reads data from a MySQL source table in one of the following modes:

    • Full data read: reads all data from the table and writes the data to the memory. Data is read at a high speed, but memory space is consumed. If the source table contains a large amount of data, an OOM error may occur in this mode.

    • Batch data read: reads data of a specific number of rows at the specified point in time until all data is read. An OOM error is prevented even if the table contains a large amount of data. However, data is read at a low speed.

    connect.timeout

    The maximum duration for which the connector waits before the connector makes a retry if the connection to the MySQL database server times out.

    No

    DURATION

    30s

    N/A.

    connect.max-retries

    The maximum number of retries after the connection to the MySQL database server fails.

    No

    INTEGER

    3

    N/A.

    connection.pool.size

    The size of the database connection pool.

    No

    INTEGER

    20

    The database connection pool is used to reuse connections. This can reduce the number of database connections.

    jdbc.properties.*

    Custom Java Database Connectivity (JDBC) URL parameters.

    No

    STRING

    No default value

    You can pass custom JDBC parameters. For example, if you do not want to use the SSL protocol, you can set the jdbc.properties.useSSL parameter to false.

    For more information about the JDBC parameters, see Configuration Properties.

    debezium.*

    The custom Debezium parameter that is used to read binary log data.

    No

    STRING

    No default value

    You can pass custom Debezium parameters. For example, you can use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the logic for handling parsing errors.

    heartbeat.interval

    The interval at which the source moves forward the binary log file position by using heartbeat events.

    No

    DURATION

    30s

    Heartbeat events are used to identify the latest position of the binary log file that is read from the source. For slowly updated tables in MySQL, the binary log file position cannot automatically move forward. The source can move forward the binary log file position by using heartbeat events. This prevents the binary log file position from expiring. If the binary log file position expires, the deployment fails to run and cannot recover from the failure. If this occurs, you can only select Start without state for Start Method and specify Start Time for Reading Data to start the deployment.

    scan.incremental.snapshot.chunk.key-column

    Specifies the columns that are used for sharding in the snapshot phase.

    For more information, see the Remarks column of this parameter.

    STRING

    No default value

    • This parameter is required for a table that does not have a primary key. The selected columns must be NOT NULL columns.

    • This parameter is optional for a table that has a primary key. You can select only one column from the primary key of the table for sharding.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    rds.region-id

    The ID of the region where an ApsaraDB RDS for MySQL instance resides.

    This parameter is required when you use the feature of reading Object Storage Service (OSS) archived logs.

    STRING

    No default value

    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • For more information about region IDs, see Regions and zones.

    rds.access-key-id

    The AccessKey ID of the account that is used to access ApsaraDB RDS for MySQL.

    This parameter is required when you use the feature of reading OSS archived logs.

    STRING

    No default value

    For more information, see Console operations.

    Important
    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage variables.

    rds.access-key-secret

    The AccessKey secret of the account that is used to access ApsaraDB RDS for MySQL.

    This parameter is required when you use the feature of reading OSS archived logs.

    STRING

    No default value

    For more information, see Console operations.

    Important
    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage variables.

    rds.db-instance-id

    The ID of the ApsaraDB RDS for MySQL instance.

    This parameter is required when you use the feature of reading OSS archived logs.

    STRING

    No default value

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    rds.main-db-id

    The ID of the primary database of the ApsaraDB RDS for MySQL instance.

    No

    STRING

    No default value

    rds.download.timeout

    The timeout period for downloading a single archived log from OSS.

    No

    DURATION

    60s

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    rds.endpoint

    The endpoint that is used to obtain the OSS binary log information.

    No

    STRING

    No default value

    • For more information about valid values of this parameter, see Endpoints.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    scan.incremental.close-idle-reader.enabled

    Specifies whether to close idle readers after the snapshot phase ends.

    No

    BOOLEAN

    false

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports this parameter.

    • To make the configuration of this parameter take effect, you must set the execution.checkpointing.checkpoints-after-tasks-finish.enabled parameter to true.

    scan.read-changelog-as-append-only.enabled

    Specifies whether to convert a changelog stream to an append-only stream.

    No

    BOOLEAN

    false

    Valid values:

    • true: All types of messages are converted into messages of the INSERT type. The message types are INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER. Set this parameter to true for special scenarios in which the data and metadata of messages that are deleted from an upstream table need to be stored.

    • false: All types of messages are sent without conversion. This is the default value.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    scan.only.deserialize.captured.tables.changelog.enabled

    Specifies whether to deserialize only change events in the table that you specify during incremental data reading.

    No

    BOOLEAN

    false

    Valid values:

    • true: Only change events in the specified table are deserialized. This accelerates binary log reading.

    • false: Change events in all tables are deserialized. This is the default value.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.

    • If you use this parameter in Realtime Compute for Apache Flink that uses VVR 8.0.8 or earlier, you must change the parameter name to debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parallel-deserialize-changelog.enabled

    Specifies whether to use multiple threads to parse change events during incremental data reading.

    No

    BOOLEAN

    false

    Valid values:

    • true: Multiple threads are used to parse change events when the change events are deserialized. The order of binary log events remains unchanged. This accelerates binary log reading.

    • false: A single thread is used to parse change events when the change events are deserialized. This is the default value.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.

  • Parameters only for dimension tables

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    url

    The JDBC URL that is used to connect to the MySQL database.

    No

    STRING

    No default value

    You must specify a URL in the jdbc:mysql://<Endpoint>:<Port number>/<Database name> format.

    lookup.max-retries

    The maximum number of retries to read data after the data fails to be read.

    No

    INTEGER

    3

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    lookup.cache.strategy

    The cache policy.

    No

    STRING

    None

    Valid values: None, LRU, and ALL. For more information about the valid values, see the "Background information" section of the JOIN statements for dimension tables topic.

    Note

    If you set this parameter to LRU, you must also specify the lookup.cache.max-rows parameter.

    lookup.cache.max-rows

    The maximum number of data records that can be cached.

    No

    INTEGER

    100000

    • You must specify this parameter if the lookup.cache.strategy parameter is set to LRU.

    • You do not need to specify this parameter if the lookup.cache.strategy parameter is set to ALL.

    lookup.cache.ttl

    The cache timeout period.

    No

    DURATION

    10s

    The configuration of the lookup.cache.ttl parameter varies based on the value of the lookup.cache.strategy parameter.

    • If you set the lookup.cache.strategy parameter to None, you do not need to specify the lookup.cache.ttl parameter. This indicates that cache entries do not expire.

    • If you set the lookup.cache.strategy parameter to LRU, the lookup.cache.ttl parameter indicates the cache timeout period. By default, cache entries do not expire.

    • If you set the lookup.cache.strategy parameter to ALL, the lookup.cache.ttl parameter indicates the cache refresh period. By default, the cache is not refreshed.

    The value of this parameter must be in the time format. For example, you can set this parameter to 1min or 10s.

    lookup.max-join-rows

    The maximum number of results that are returned each time a data record in the primary table is queried and matched with data records in the dimension table.

    No

    INTEGER

    1024

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    lookup.filter-push-down.enabled

    Specifies whether to enable filter pushdown for a dimension table.

    No

    BOOLEAN

    false

    Valid values:

    • true: Filter pushdown is enabled for a dimension table. When Realtime Compute for Apache Flink loads data from a MySQL database table, the dimension table filters data in advance based on the conditions configured in the related SQL deployment.

    • false: Filter pushdown is disabled for a dimension table. When Realtime Compute for Apache Flink loads data from a MySQL database table, the dimension table loads all data of the MySQL database table. This is the default value.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.

    Important

    You can enable filter pushdown only for a Flink table that is used as a dimension table. You cannot enable filter pushdown for a MySQL source table. If a Flink table is used as both a source table and a dimension table, and filter pushdown is enabled for the dimension table, you must use SQL hints to set the lookup.filter-push-down.enabled parameter to false when the table is used as a source table (see the example that follows the parameter tables). Otherwise, an error may occur when the related deployment runs.

  • Parameters only for sink tables

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    url

    The JDBC URL that is used to connect to the MySQL database.

    No

    STRING

    No default value

    You must specify a URL in the jdbc:mysql://<Endpoint>:<Port number>/<Database name> format.

    sink.max-retries

    The maximum number of retries to write data to the table after data fails to be written.

    No

    INTEGER

    3

    N/A.

    sink.buffer-flush.batch-size

    The number of data records that can be written at a time.

    No

    INTEGER

    4096

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    sink.buffer-flush.max-rows

    The maximum number of data records that can be cached in the memory.

    No

    INTEGER

    • If Realtime Compute for Apache Flink uses a VVR version earlier than 6.0.7, the default value of this parameter is 100.

    • If Realtime Compute for Apache Flink uses VVR 6.0.7 or later, the default value of this parameter is 10000.

    This parameter takes effect only after you specify the primary key.

    sink.buffer-flush.interval

    The interval at which the cache is flushed. If the number of cached data records does not reach the value specified by the sink.buffer-flush.batch-size parameter within this interval, all cached data is written to the sink table.

    No

    DURATION

    1s

    N/A.

    sink.ignore-delete

    Specifies whether to ignore delete operations.

    No

    BOOLEAN

    false

    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • Delete operations may occur when you use Flink SQL. If multiple output operators update different fields in the same sink table based on the primary key, the data result may be incorrect.

      For example, if a data record is deleted in one task and then only some fields of the data record are updated in another task, the values of the fields that are not updated become null or default values because the record was deleted. To avoid this issue, you can set the sink.ignore-delete parameter to true so that delete operations are ignored.

    sink.ignore-null-when-update

    Specifies whether to update the fields whose input values are null during data updates.

    No

    BOOLEAN

    false

    Valid values:

    • true: The fields are not updated. You can set this parameter to true only for a Realtime Compute for Apache Flink table in which a primary key is specified. If you set this parameter to true, take note of the following points:

      • If Realtime Compute for Apache Flink that uses VVR 8.0.6 or earlier is used, data cannot be batch written to the sink table.

      • If Realtime Compute for Apache Flink that uses VVR 8.0.7 or later is used, data can be batch written to the sink table.

        Batch write operations can significantly improve data write efficiency and increase the overall throughput. However, this may cause data latency issues and OOM errors. You need to determine whether to perform batch write operations based on your business requirements.

    • false: The fields are updated with null values.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.
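
As described in the Important note for the lookup.filter-push-down.enabled parameter, when the same table definition is read both as a source table and as a dimension table, you can disable filter pushdown for the source usage through SQL hints. The following is a minimal sketch; the names mysql_table, orders, product_name, and the processing-time attribute proc_time are hypothetical and used only for illustration.

-- Used as a source table: disable filter pushdown through a dynamic hint.
SELECT * FROM mysql_table /*+ OPTIONS('lookup.filter-push-down.enabled'='false') */;

-- Used as a dimension table in a lookup join: filter pushdown can remain enabled.
SELECT o.order_id, d.product_name
FROM orders AS o
JOIN mysql_table FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.product_id = d.product_id;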

Data type mappings

  • CDC source table

    The following list describes the mappings from MySQL CDC data types to Realtime Compute for Apache Flink data types:

    • TINYINT → TINYINT

    • SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL → SMALLINT

    • INT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL → INT

    • BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL → BIGINT

    • BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL → DECIMAL(20, 0)

    • FLOAT [UNSIGNED] [ZEROFILL] → FLOAT

    • DOUBLE [UNSIGNED] [ZEROFILL], DOUBLE PRECISION [UNSIGNED] [ZEROFILL], REAL [UNSIGNED] [ZEROFILL] → DOUBLE

    • NUMERIC(p, s) [UNSIGNED] [ZEROFILL], DECIMAL(p, s) [UNSIGNED] [ZEROFILL] → DECIMAL(p, s)

    • BOOLEAN, TINYINT(1) → BOOLEAN

    • DATE → DATE

    • TIME [(p)] → TIME [(p)] [WITHOUT TIME ZONE]

    • DATETIME [(p)] → TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    • TIMESTAMP [(p)] → TIMESTAMP [(p)] or TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    • CHAR(n), VARCHAR(n), TEXT → STRING

    • BINARY, VARBINARY, BLOB → BYTES

    Important

    We recommend that you use fields of the TINYINT(1) data type of MySQL to store only values 0 and 1. If the property-version parameter is set to 0, the MySQL CDC source table maps the TINYINT(1) data type to the BOOLEAN data type of Realtime Compute for Apache Flink by default. This causes inaccurate data. For more information about how to use fields of the TINYINT(1) data type of MySQL to store values other than 0 and 1, see the "Create a MySQL catalog" section of the Manage MySQL catalogs topic.

  • Dimension table and sink table

    The following list describes the mappings from MySQL data types to Realtime Compute for Apache Flink data types:

    • TINYINT → TINYINT

    • SMALLINT, TINYINT UNSIGNED → SMALLINT

    • INT, MEDIUMINT, SMALLINT UNSIGNED → INT

    • BIGINT, INT UNSIGNED → BIGINT

    • BIGINT UNSIGNED → DECIMAL(20, 0)

    • FLOAT → FLOAT

    • DOUBLE, DOUBLE PRECISION → DOUBLE

    • NUMERIC(p, s), DECIMAL(p, s) → DECIMAL(p, s)

      Note: The value of p must be less than or equal to 38.

    • BOOLEAN, TINYINT(1) → BOOLEAN

    • DATE → DATE

    • TIME [(p)] → TIME [(p)] [WITHOUT TIME ZONE]

    • DATETIME [(p)], TIMESTAMP [(p)] → TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    • CHAR(n) → CHAR(n)

    • VARCHAR(n) → VARCHAR(n)

    • BIT(n) → BINARY(⌈n/8⌉)

    • BINARY(n) → BINARY(n)

    • VARBINARY(N) → VARBINARY(N)

    • TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT → STRING

    • TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB → BYTES

      Important: Realtime Compute for Apache Flink supports only MySQL binary large object (BLOB) records that are less than or equal to 2,147,483,647 bytes (2^31 - 1) in size.
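
As a sketch of how the mappings above translate into a DDL statement, the following hypothetical table maps a MySQL BIGINT UNSIGNED column to DECIMAL(20, 0), a DATETIME(3) column to TIMESTAMP(3), and a TEXT column to STRING. The table and column names are placeholders, not objects from this topic.

CREATE TABLE mysqlcdc_typed_source (
   id BIGINT,                   -- MySQL BIGINT
   quantity DECIMAL(20, 0),     -- MySQL BIGINT UNSIGNED
   gmt_create TIMESTAMP(3),     -- MySQL DATETIME(3)
   remark STRING,               -- MySQL TEXT
   PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);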

YAML deployment for data ingestion

The MySQL connector can be used as a source in a YAML deployment for data ingestion.

Syntax

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

Parameters

Parameter

Description

Required

Data type

Default value

Remarks

type

The connector type of the source.

Yes

STRING

No default value

Set the value to mysql.

name

The name of the source.

No

STRING

No default value

N/A.

hostname

The IP address or hostname that is used to access the MySQL database.

Yes

STRING

No default value

We recommend that you enter the IP address of a VPC.

Note

If the MySQL database and Realtime Compute for Apache Flink are not deployed in the same VPC, you must establish a connection between the two VPCs or enable Realtime Compute for Apache Flink to access the MySQL database over the Internet. For more information, see Console operations and How does Realtime Compute for Apache Flink access the Internet?

username

The username that is used to access the MySQL database.

Yes

STRING

No default value

N/A.

password

The password that is used to access the MySQL database.

Yes

STRING

No default value

N/A.

tables

The MySQL tables whose data you want to synchronize.

Yes

STRING

No default value

  • If you want to read data from multiple tables, you can set this parameter to a regular expression.

  • Separate multiple regular expressions with commas (,).

Note

Separate a database name and a table name with a period (.). If you want to use a period (.) to match any character, you must use a backslash (\) for escaping. Example: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*

tables.exclude

The tables that you want to exclude from the tables whose data you want to synchronize.

No

STRING

No default value

  • If you want to exclude multiple tables, you can set this parameter to a regular expression.

  • Separate multiple regular expressions with commas (,).

Note

Separate a database name and a table name with a period (.). If you want to use a period (.) to match any character, you must use a backslash (\) for escaping. Example: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*

port

The port that is used to access the MySQL database.

No

INTEGER

3306

N/A.

schema-change.enabled

Specifies whether to write schema change events.

No

BOOLEAN

true

N/A.

server-id

The numeric ID or ID range that is allocated to a database client for synchronization.

No

STRING

By default, a value in the range of 5400 to 6400 is randomly generated.

The ID must be unique in a MySQL cluster. We recommend that you configure a unique server ID for each deployment in the same database.

This parameter can also be set to an ID range, such as 5400-5408. If the incremental snapshot algorithm is enabled, parallel reading is supported. In this case, we recommend that you set this parameter to an ID range so that each parallel reader uses a unique ID.

jdbc.properties.*

Custom JDBC URL parameters.

No

STRING

No default value

You can pass custom JDBC parameters. For example, if you do not want to use the SSL protocol, you can set the jdbc.properties.useSSL parameter to false.

For more information about the JDBC parameters, see Configuration Properties.

debezium.*

The custom Debezium parameter that is used to read binary log data.

No

STRING

No default value

You can pass custom Debezium parameters. For example, you can use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the logic for handling parsing errors.

scan.incremental.snapshot.chunk.size

The chunk size of the table. The chunk size indicates the number of rows.

No

INTEGER

8096

The table is split into multiple chunks for data reading. Before data in a chunk is read, the data is cached in the memory.

A smaller chunk size means that the table is split into more chunks, so fewer data records need to be re-read after a failure recovery. However, an overly small chunk size may reduce data throughput and cause an OOM error. Specify a reasonable chunk size based on your business requirements.

scan.snapshot.fetch.size

The maximum number of data records that can be extracted each time full data of a table is read.

No

INTEGER

1024

N/A.

scan.startup.mode

The startup mode when data is consumed.

No

STRING

initial

Valid values:

  • initial: When the MySQL CDC source connector starts for the first time, the connector scans all historical data and reads the most recent binary log data. This is the default value.

  • latest-offset: When the MySQL CDC source connector starts for the first time, the connector reads binary log data from the most recent offset instead of scanning all historical data. This way, the connector reads only the most recent incremental data after the connector starts.

  • earliest-offset: The MySQL CDC source connector does not scan full historical data. The connector starts to read accessible binary log data from the earliest offset.

  • specific-offset: The MySQL CDC source connector does not scan full historical data. The connector starts to scan binary log data from the specified offset. You can specify the scan.startup.specific-offset.file and scan.startup.specific-offset.pos parameters to specify the name of the binary log file and offset at which the scan starts. You can also specify the scan.startup.specific-offset.gtid-set parameter to specify a global transaction identifier (GTID) set at which the scan starts.

  • timestamp: The MySQL CDC source connector does not scan full historical data. The connector starts to read binary log data from the specified timestamp. The timestamp is specified by the scan.startup.timestamp-millis parameter. Unit: milliseconds.

Important

If the scan.startup.mode parameter is set to earliest-offset, specific-offset, or timestamp for a deployment and the table schema at the startup time is different from the table schema at the specified start offset time, an error is reported in the deployment. If you use one of the three startup modes for a deployment, make sure that the table schema does not change between the specified binary log consumption offset time and the startup time of the deployment.

scan.startup.specific-offset.file

The name of the binary log file that is used to specify the startup offset when the scan.startup.mode parameter is set to specific-offset.

No

STRING

No default value

This parameter is required when the scan.startup.mode parameter is set to specific-offset. Example of a file name: mysql-bin.000003.

scan.startup.specific-offset.pos

The startup offset that is specified in the specified binary log file when the scan.startup.mode parameter is set to specific-offset.

No

INTEGER

No default value

This parameter is required when the scan.startup.mode parameter is set to specific-offset.

scan.startup.specific-offset.gtid-set

A GTID set that is used to specify the startup offset when the scan.startup.mode parameter is set to specific-offset.

No

STRING

No default value

This parameter is required when the scan.startup.mode parameter is set to specific-offset. Example of a GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

The timestamp in milliseconds that is used to specify the startup offset when the scan.startup.mode parameter is set to timestamp.

No

LONG

No default value

This parameter is required when the scan.startup.mode parameter is set to timestamp. Unit: milliseconds.

Important

If you set the scan.startup.mode parameter to timestamp, the MySQL CDC source connector attempts to read the initial event of each binary log file to determine the timestamp of the file and finally locates the binary log file that corresponds to the specified timestamp. Make sure that the binary log file that corresponds to the specified timestamp is not deleted from the database and can be read.

server-time-zone

The time zone of the session that is used by the database.

No

STRING

If you do not specify this parameter, the time zone of the environment in which the Realtime Compute for Apache Flink deployment runs is used as the time zone of the database server, which is the time zone of the zone that you selected.

Example: Asia/Shanghai. This parameter determines how to convert data in the MySQL database from the TIMESTAMP type to the STRING type. For more information, see Debezium connector for MySQL.

scan.startup.specific-offset.skip-events

The number of binary log events that are skipped when the data is read from the specified offset.

No

INTEGER

No default value

This parameter is required when the scan.startup.mode parameter is set to specific-offset.

scan.startup.specific-offset.skip-rows

The number of rows whose change events are skipped when the data is read from the specified offset. One binary log event may correspond to multiple rows of change events.

No

INTEGER

No default value

This parameter is required when the scan.startup.mode parameter is set to specific-offset.

connect.timeout

The maximum duration for which the connector waits before the connector makes a retry if the connection to the MySQL database server times out.

No

DURATION

30s

N/A.

connect.max-retries

The maximum number of retries after the connection to the MySQL database server fails.

No

INTEGER

3

N/A.

connection.pool.size

The size of the database connection pool.

No

INTEGER

20

The database connection pool is used to reuse connections. This can reduce the number of database connections.

heartbeat.interval

The interval at which the source moves forward the binary log file position by using heartbeat events.

No

DURATION

30s

Heartbeat events are used to identify the latest position of the binary log file that is read from the source. For slowly updated tables in MySQL, the binary log file position cannot automatically move forward. The source can move forward the binary log file position by using heartbeat events. This prevents the binary log file position from expiring. If the binary log file position expires, the deployment fails to run and cannot recover from the failure. If this occurs, you can only select Start without state for Start Method and specify Start Time for Reading Data to start the deployment.

scan.incremental.snapshot.chunk.key-column

Specifies the columns that are used for sharding in the snapshot phase.

No

STRING

No default value

You can select only one column from the primary key of the table for sharding.

rds.region-id

The ID of the region where an ApsaraDB RDS for MySQL instance resides.

This parameter is required when you use the feature of reading OSS archived logs.

STRING

No default value

For more information about region IDs, see Regions and zones.

rds.access-key-id

The AccessKey ID of the account that is used to access ApsaraDB RDS for MySQL.

This parameter is required when you use the feature of reading OSS archived logs.

STRING

No default value

For more information, see Console operations.

Important

To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage variables.

rds.access-key-secret

The AccessKey secret of the account that is used to access ApsaraDB RDS for MySQL.

This parameter is required when you use the feature of reading OSS archived logs.

STRING

No default value

For more information, see Console operations.

Important

To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage variables.

rds.db-instance-id

The ID of the ApsaraDB RDS for MySQL instance.

This parameter is required when you use the feature of reading OSS archived logs.

STRING

No default value

N/A.

rds.main-db-id

The ID of the primary database of the ApsaraDB RDS for MySQL instance.

No

STRING

No default value

For more information about how to obtain the ID of the primary database of an ApsaraDB RDS for MySQL instance, see Use the log backup feature for an ApsaraDB RDS for MySQL instance.

rds.download.timeout

The timeout period for downloading a single archived log from OSS.

No

DURATION

60s

N/A.

rds.endpoint

The endpoint that is used to obtain the OSS binary log information.

No

STRING

No default value

For more information about valid values of this parameter, see Endpoints.

rds.binlog-directory-prefix

The prefix of the directory in which the binary log file is saved.

No

STRING

rds-binlog-

N/A.

rds.use-intranet-link

Specifies whether to download the binary log file over an internal network.

No

BOOLEAN

true

N/A.

rds.binlog-directories-parent-path

The absolute path of the parent directory in which the binary log file is saved.

No

STRING

No default value

N/A.

chunk-meta.group.size

The maximum number of chunk metadata entries in a group.

No

INTEGER

1000

If the number of chunk metadata entries is greater than the value of this parameter, the metadata is split into multiple groups for synchronization.

chunk-key.even-distribution.factor.lower-bound

The minimum distribution factor that can be used to evenly distribute chunks.

No

DOUBLE

0.05

If the distribution factor is less than the value of this parameter, chunks are not evenly distributed.

Distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1)/Total number of data rows.

chunk-key.even-distribution.factor.upper-bound

The maximum distribution factor that can be used to evenly distribute chunks.

No

DOUBLE

1000.0

If the distribution factor is greater than the value of this parameter, chunks are not evenly distributed.

Distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1)/Total number of data rows.

scan.incremental.close-idle-reader.enabled

Specifies whether to close idle readers after the savepoint reading phase is complete.

No

BOOLEAN

false

To make the configuration of this parameter take effect, you must set the execution.checkpointing.checkpoints-after-tasks-finish.enabled parameter to true.

scan.only.deserialize.captured.tables.changelog.enabled

Specifies whether to deserialize only change events in the table that you specify during incremental data reading.

No

BOOLEAN

false

Valid values:

  • true: Only change events in the specified table are deserialized. This accelerates binary log reading.

  • false: Change events in all tables are deserialized. This is the default value.

scan.parallel-deserialize-changelog.enabled

Specifies whether to use multiple threads to parse change events during incremental data reading.

No

BOOLEAN

false

Valid values:

  • true: Multiple threads are used to parse change events when the change events are deserialized. The order of binary log events remains unchanged. This accelerates binary log reading.

  • false: A single thread is used to parse change events when the change events are deserialized. This is the default value.

scan.parallel-deserialize-changelog.handler.size

The number of event handlers when you use multiple threads to parse change events.

No

INTEGER

2

N/A.

metadata-column.include-list

The metadata columns that you want to write to the downstream storage.

No

STRING

No default value

The following metadata columns are supported: table_name, database_name, op_ts, and row_kind. Separate multiple metadata columns with semicolons (;).

scan.newly-added-table.enabled

Specifies whether to synchronize new tables that are not matched during the last startup when the deployment is restarted from a checkpoint.

No

BOOLEAN

false

This parameter takes effect when the deployment is restarted from a checkpoint or a savepoint.

scan.binlog.newly-added-table.enabled

Specifies whether to write the data of the matched new tables to the downstream storage during incremental data reading.

No

BOOLEAN

false

You cannot set this parameter to true if the scan.newly-added-table.enabled parameter is set to true.
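
The connection-related options in the preceding table can be set directly in the WITH clause of a CDC source table. The following is a minimal sketch that uses placeholder connection values; the option values shown are illustrative defaults, not tuning recommendations:

CREATE TABLE mysql_source_tuned (
  order_id INT,
  customer_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Connection tuning options from the preceding table. The values are illustrative.
  'connect.timeout' = '30s',
  'connect.max-retries' = '3',
  'connection.pool.size' = '20',
  'heartbeat.interval' = '30s'
);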

Data type mappings

The following table describes the mappings between data types of MySQL CDC and data types of Flink CDC in a YAML deployment for data ingestion.

Data type of MySQL CDC → Data type of Flink CDC

TINYINT(n) → TINYINT

SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL → SMALLINT

YEAR, INT, MEDIUMINT, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL → INT

BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL → BIGINT

BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL → DECIMAL(20, 0)

FLOAT [UNSIGNED] [ZEROFILL] → FLOAT

DOUBLE [UNSIGNED] [ZEROFILL], DOUBLE PRECISION [UNSIGNED] [ZEROFILL], REAL [UNSIGNED] [ZEROFILL] → DOUBLE

NUMERIC(p, s) [UNSIGNED] [ZEROFILL], DECIMAL(p, s) [UNSIGNED] [ZEROFILL], FIXED(p, s) [UNSIGNED] [ZEROFILL], where p ≤ 38 → DECIMAL(p, s)

BOOLEAN, BIT(1), TINYINT(1) → BOOLEAN

DATE → DATE

TIME [(p)] → TIME [(p)]

DATETIME [(p)] → TIMESTAMP [(p)]

TIMESTAMP [(p)] → TIMESTAMP_LTZ [(p)]

CHAR(n) → CHAR(n)

VARCHAR(n) → VARCHAR(n)

BIT(n) → BINARY(⌈(n + 7)/8⌉)

BINARY(n) → BINARY(n)

VARBINARY(N) → VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL], DECIMAL(p, s) [UNSIGNED] [ZEROFILL], FIXED(p, s) [UNSIGNED] [ZEROFILL], where 38 < p ≤ 65 → STRING

Note

In MySQL, the precision of the DECIMAL data type is up to 65 digits. In Apache Flink, the precision of the DECIMAL data type is limited to 38 digits. Therefore, if you define a decimal column that has a precision greater than 38, you must map the column to a string column to prevent loss of precision.

TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM → STRING

JSON → STRING

Note

The data of the JSON data type is converted into a JSON-formatted string in Apache Flink.

GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION → STRING

Note

Data of spatial data types in MySQL is converted into a JSON-formatted string in Apache Flink. For more information, see MySQL Spatial Data Types Mapping.

TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB → BYTES

Note

Apache Flink supports only MySQL BLOB values that are less than or equal to 2,147,483,647 (2^31 - 1) in length.

Sample code

  • CDC source table

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Dimension table

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • Sink table

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • Data ingestion source

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

Basic concepts about MySQL CDC source tables

  • Principles

    When the MySQL CDC source table is started, it scans the whole table, divides the table into multiple shards (chunks) based on the primary key, and records the binary log location at this time. Then, the MySQL CDC source connector uses the incremental savepoint algorithm to read data from each chunk by using the SELECT statement. The deployment periodically generates checkpoints to record the chunks whose data is read. If a failover occurs, the MySQL CDC source connector needs to only continue reading data from the chunks whose data is not read. After the data of all chunks is read, incremental change records are read from the previously obtained binary log file position. The deployment continues periodically generating checkpoints to record the binary log file position. If a failover occurs, the MySQL CDC source connector processes data from the previous binary log file position. This way, the exactly-once semantics is implemented.

    For more information about the incremental savepoint algorithm, see MySQL CDC Connector.

  • Metadata

    In most cases, access to metadata is required when you merge and synchronize tables in a sharded database. If you expect to identify data records by the source database names and table names after tables are merged, you can configure metadata columns in the data merging statement to read the source database name and table name of each data record. This way, you can identify the source of each data record after tables are merged.

    MySQL CDC source tables that are used by Realtime Compute for Apache Flink of vvr-4.0.11-flink-1.13 or later support the metadata column syntax. The following table describes the metadata that you can access by using metadata columns.

    Metadata key

    Metadata type

    Description

    database_name

    STRING NOT NULL

    The name of the source database to which the current data record belongs.

    table_name

    STRING NOT NULL

    The name of the source table to which the current data record belongs.

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    The time when the current data record changes in the database. If the data record is obtained from the historical data of the table instead of from the binary log file, the value of the metadata key is fixed to 0.

    op_type

    STRING NOT NULL

    The change type of the current data record. Valid values:

    • +I: indicates an INSERT message.

    • -D: indicates a DELETE message.

    • -U: indicates an UPDATE_BEFORE message.

    • +U: indicates an UPDATE_AFTER message.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.

    The following sample code shows how to merge multiple orders tables in database shards of a MySQL instance into a MySQL table named mysql_orders and synchronize data from the MySQL table to a Hologres table named holo_orders.

    CREATE TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Read the database name. 
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- Read the table name. 
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Read the change time. 
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Read the change type. 
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Use a regular expression to match multiple database shards. 
      'table-name' = 'orders_.*'   -- Use a regular expression to match multiple tables in the sharded database. 
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    If you add the 'scan.read-changelog-as-append-only.enabled' = 'true' configuration to the WITH clause in the preceding sample code (see the sketch after the following list), the output of the sample code varies based on the primary key of the downstream table:

    • If the primary key of the downstream table is order_id, the output contains only the last change record for each primary key value of the upstream table. If the last change of a primary key value in the upstream table is a delete operation, the downstream table contains a record with that primary key value and an op_type of -D.

    • If the primary key of the downstream table is composed of order_id, operation_ts, and op_type, the output contains the complete change data of each primary key value of the upstream table.
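
    The following minimal sketch adds the append-only option to the WITH clause of the preceding mysql_orders table; the connection values are the same placeholders used above:

    CREATE TABLE mysql_orders (...) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*',
      'table-name' = 'orders_.*',
      -- Emit all change events as append-only records, with the change type exposed through op_type.
      'scan.read-changelog-as-append-only.enabled' = 'true'
    );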

  • Regular expressions

    MySQL CDC source tables allow you to use regular expressions in table names or database names to match multiple tables or databases. The following sample code provides an example on how to use a regular expression to specify multiple tables.

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Use the regular expression to match multiple databases. 
      'table-name' = '(t[5-8]|tt)' -- Use the regular expression to match multiple tables. 
    );

    Elements in the preceding regular expressions:

    • In this example, ^(test).* is used to match a database whose name starts with the specified prefix. This expression can be used to match databases whose names start with test, such as test1 or test2.

    • In this example, .*[p$] is used to match a database whose name ends with the specified suffix. This expression can be used to match databases whose names end with p, such as cdcp or edcp.

    • In this example, txc is used to match the specified database name. This expression can be used to match a database that has a specific name, such as txc.

    When the MySQL CDC source connector matches a full-path table name, the connector uses the database name and table name to uniquely identify a table. This way, database-name.table-name is used as the pattern to match a table. For example, the (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) pattern can match the txc.tt and test2.test5 tables in the database.

    Important

    In an SQL deployment, you cannot separate values of table-name and database-name with commas (,) to specify multiple tables or databases.

    • If you want to match multiple tables or use multiple regular expressions, you can use vertical bars (|) to connect the tables or regular expressions and enclose them in parentheses (). For example, if you want to read data from the user and product tables, you can set table-name to (user|product), as shown in the sketch after this list.

    • If you use a regular expression that contains commas (,), you must rewrite it into an equivalent regular expression that uses vertical bars (|). For example, rewrite the mytable_\d{1,2} regular expression into the equivalent regular expression (mytable_\d{1}|mytable_\d{2}) to prevent comma conflicts.
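
    The following minimal sketch shows the vertical-bar syntax from the first item above. The connection values are placeholders, and the multi_table_source table name and the user and product table names are only examples:

    CREATE TABLE multi_table_source (...) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      -- Use vertical bars (|) and parentheses to match both the user and product tables.
      'table-name' = '(user|product)'
    );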

  • Parallelism control

    The MySQL CDC source connector can read full data in multiple subtasks at the same time. This improves the efficiency of data loading. If you use the MySQL CDC source connector together with the Autopilot feature that is provided by the VVP of Realtime Compute for Apache Flink, automatic scale-in can be performed during incremental data reading after parallel reading is complete. This helps reduce the computing resources that are consumed.

    You can configure the Parallelism parameter in Basic mode or Expert mode in the Deployment Starting Configuration - Streaming dialog box in the development console of Realtime Compute for Apache Flink. The setting of the Parallelism parameter varies based on the resource configuration mode.

    • In Basic mode, the Parallelism parameter specifies the global parallelism of the deployment.

    • In Expert mode, you can configure the Parallelism parameter for a specific vertex node based on your business requirements.

    For more information about resource configurations, see Configure a deployment.

    Important

    When you configure the Parallelism parameter for a deployment in Basic mode or Expert mode, make sure that the number of server IDs in the range specified by the server-id parameter declared in the table is greater than or equal to the value of the Parallelism parameter of the deployment. For example, if the range of the server-id parameter is 5404-5412, eight unique server IDs can be used. Therefore, you can configure a maximum of eight parallel subtasks. In addition, the ranges specified by the server-id parameter for the same MySQL instance in different deployments cannot overlap. Each deployment must be explicitly configured with a unique server ID.
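
    For example, the following minimal sketch declares the server-id range used in the example above. The connection values are placeholders:

    CREATE TABLE mysql_source (...) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>',
      -- Declare a server ID range that is not smaller than the deployment parallelism
      -- and that does not overlap with ranges used by other deployments on the same instance.
      'server-id' = '5404-5412'
    );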

  • Automatic scale-in by using Autopilot

    When full data is read, a large amount of historical data is accumulated. In most cases, Realtime Compute for Apache Flink reads historical data in parallel to improve the efficiency of data reading. When incremental data is read, only a single subtask is run because the amount of binary log data is small and the global order must be ensured. The numbers of compute units (CUs) that are required during full data reading and incremental data reading are different. You can use the Autopilot feature to balance performance and resource consumption.

    Autopilot monitors the traffic for each task that is used by the MySQL CDC source table. If the binary log data is read in only one task and other tasks are idle during incremental data reading, Autopilot automatically reduces the number of CUs and the degree of parallelism. To enable Autopilot, you need to only set the Mode parameter of Autopilot to Active on the Deployments page.

    Note

    By default, the minimum interval at which the degree of parallelism is decreased is 24 hours. For more information about automatic tuning and related parameters, see Configure automatic tuning.

  • Startup mode

    You can configure the scan.startup.mode parameter to specify the mode in which the MySQL CDC source connector starts to read data from a MySQL database. Valid values:

    • initial: When the MySQL CDC source connector starts for the first time, the connector reads full data in a database table. After the full data reading is complete, the connector switches to the incremental reading mode to read binary log data. This is the default value.

    • earliest-offset: The MySQL CDC source connector skips the savepoint reading phase and starts to read accessible binary log data from the earliest offset.

    • latest-offset: The MySQL CDC source connector skips the savepoint reading phase and starts to read binary log data from the most recent offset. In this mode, the MySQL CDC source connector can read only data changes after the deployment is started.

    • specific-offset: The MySQL CDC source connector skips the savepoint reading phase and starts to read binary log data from the specified offset. You can specify the startup offset by using the name of a binary log file and the binary log file position or by using a GTID set.

    • timestamp: The MySQL CDC source connector skips the savepoint reading phase and starts to read binary log events from the specified timestamp.

    Sample code:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- Starts to read data from the earliest offset. 
        'scan.startup.mode' = 'latest-offset', -- Starts to read data from the most recent offset. 
        'scan.startup.mode' = 'specific-offset', -- Starts to read data from the specified offset. 
        'scan.startup.mode' = 'timestamp', -- Starts to read data from the specified timestamp. 
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Specifies the name of the binlog log file when the scan.startup.mode parameter is set to specific-offset. 
        'scan.startup.specific-offset.pos' = '4', -- Specifies the binary log file position when the scan.startup.mode parameter is set to specific-offset. 
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Specifies the GTID set when the scan.startup.mode parameter is set to specific-offset. 
        'scan.startup.timestamp-millis' = '1667232000000' -- Specifies the startup timestamp when the scan.startup.mode parameter is set to timestamp. 
        ...
    )
    Important
    • When a checkpoint is created, the MySQL CDC source connector records the current binary log event at the current offset as a log entry of the INFO level. The prefix of the log entry is Binlog offset on checkpoint {checkpoint-id}. The log entry can help you start the deployment from a specific checkpoint.

    • If the schema of a table is changed after the table is read, an error may occur when the scan.startup.mode parameter is set to earliest-offset, specific-offset, or timestamp. This is because the Debezium reader saves the latest table schema and earlier data that does not match the table schema cannot be correctly parsed.

  • MySQL CDC source tables that do not have a primary key

    • In Realtime Compute for Apache Flink that uses VVR 6.0.7 or later, you can use MySQL CDC source tables that do not have a primary key. To use such a table, you must configure the scan.incremental.snapshot.chunk.key-column parameter and specify only non-null columns, as shown in the sketch after this list.

    • The processing semantics of a MySQL CDC source table that does not have a primary key is determined based on the behavior of the columns that are specified by the scan.incremental.snapshot.chunk.key-column parameter.

      • If no update operation is performed on the specified columns, the exactly-once semantics is ensured.

      • If the update operation is performed on the specified columns, only the at-least-once semantics is ensured. However, you can specify the downstream primary key and perform the idempotence operation to ensure data correctness.
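
    A minimal sketch of a source table without a primary key. The connection values are placeholders, and the user_id and user_name columns are hypothetical; this assumes the source table contains a non-null user_id column that is suitable as the chunk key:

    CREATE TABLE mysql_source_without_pk (
      user_id INT NOT NULL,
      user_name STRING
      -- No PRIMARY KEY clause is declared for this table.
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>',
      -- Use the non-null user_id column to split the table into chunks.
      'scan.incremental.snapshot.chunk.key-column' = 'user_id'
    );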

  • Reading of backup binary log files from ApsaraDB RDS for MySQL

    The MySQL CDC source connector supports the reading of backup binary log files from ApsaraDB RDS for MySQL. This operation is suitable for scenarios in which full data reading takes a long period of time, local binary log files are automatically deleted, and the backup binary log files that are automatically or manually uploaded are available.

    Sample code:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • Enabling of the reuse of a MySQL CDC source table

    If multiple MySQL CDC source tables are used in the same deployment, the binlog client of each MySQL CDC source table is started. If a large number of MySQL CDC source tables are used and the MySQL tables to be read are in the same instance, the load on the database is high. For more information, see FAQ about CDC.

    Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports the reuse of a MySQL CDC source table. If the configurations, except the database name, table name, and server-id, of a MySQL CDC source table are the same as the configurations of another MySQL CDC source table, you can merge the source tables into one and reuse the merged source table. After you enable the reuse of MySQL CDC source tables, Realtime Compute for Apache Flink merges all MySQL CDC source tables in the same deployment that meet the merging requirements.

    You can run the following SET command to enable the reuse of a MySQL CDC source table in an SQL deployment:

    SET 'table.optimizer.source-merge.enabled' = 'true';

    If you enable the reuse of a MySQL CDC source table in an existing SQL deployment, the deployment topology changes. Therefore, you must start the deployment without states. Otherwise, the deployment may fail to start from the latest state, or data loss may occur.

    Important
    • In Realtime Compute for Apache Flink that uses VVR 8.0.8 and VVR 8.0.9, you must specify SET 'sql-gateway.exec-plan.enabled' = 'false' when you enable the reuse of a MySQL CDC source table in an SQL deployment.

    • If the operator chain is broken, data transfer from the MySQL CDC source table to the downstream operators incurs additional serialization and deserialization overhead. This overhead increases as more MySQL CDC source tables are merged. Therefore, we recommend that you do not set the pipeline.operator-chaining parameter to false after you enable the reuse of a MySQL CDC source table.

    • In Realtime Compute for Apache Flink that uses VVR 8.0.7, serialization issues may occur if you set the pipeline.operator-chaining parameter to false.
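
    Building on the preceding description, the following is a minimal sketch of source reuse in an SQL deployment. The orders_source and shipments_source table names and the orders and shipments source tables are only examples; the two source tables read different tables from the same MySQL instance and share all connection options, so they can be merged after the SET command is executed:

    SET 'table.optimizer.source-merge.enabled' = 'true';

    CREATE TEMPORARY TABLE orders_source (...) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = 'orders'
    );

    CREATE TEMPORARY TABLE shipments_source (...) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      -- Same connection options as orders_source, so the two sources share one binlog client after merging.
      'table-name' = 'shipments'
    );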

Binary log reading acceleration

If the MySQL CDC connector is used as a source table or a data ingestion source, the MySQL CDC connector parses binary log files to generate change messages. Binary log files record the change data of all tables. You can configure the following parameters to accelerate the parsing of binary log data:

  • Enable parallel parsing and parsing filter configuration

    • scan.only.deserialize.captured.tables.changelog.enabled: Parses only the change events in the specified tables.

    • scan.parallel-deserialize-changelog.enabled: Uses multiple threads to parse events in binary log files and send the parsed events to the consumption queue based on the event order in the binary log files.

  • Optimize Debezium-related parameters

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: The maximum number of records that the blocking queue can hold. When the Debezium reader reads change events from the database, the events are placed in the blocking queue before they are written to the downstream storage. Default value: 8192.

    • debezium.max.batch.size: The maximum number of events that the connector can process in each iteration. Default value: 2048.

    • debezium.poll.interval.ms: The duration that the connector waits for new change events to appear before it processes the next batch of events. Default value: 1000. Unit: milliseconds.

Sample code:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Configure Debezium-related parameters.
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Enable parallel parsing and parsing filter configuration.
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Parse only the change events in the specified tables. 
    'scan.parallel-deserialize-changelog.enabled' = 'true' -- Use multiple threads to parse events in binary log files. 
    ...
)

Sample code for a YAML deployment for data ingestion:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Configure Debezium-related parameters.
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Enable parallel parsing and parsing filter configuration.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  scan.parallel-deserialize-changelog.enabled: true

MySQL CDC Enterprise Edition can consume binary log data at a rate of 85 MB/s, which is about twice the rate of the open source edition. If binary log data is generated faster than 85 MB/s, the processing latency of the Realtime Compute for Apache Flink deployment continues to increase, and it gradually decreases when the generation speed drops below that rate. If a binary log file contains a large transaction, the processing latency may increase for a short period of time and decreases after the transaction is fully read.

MySQL CDC DataStream API

Important

If you want to call a DataStream API operation to read or write data, you must use a DataStream connector of the related type to access Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors.

Create a DataStream API program and use the MySqlSource class. Sample code:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

The following table describes the parameters that you must specify for the MySqlSource class.

Parameter

Description

hostname

The IP address or hostname that is used to access the MySQL database.

port

The port that is used to access the MySQL database.

databaseList

The name of the MySQL database that you want to access.

Note

If you want to read data from multiple databases, you can set this parameter to a regular expression. You can use .* to match all databases.

username

The username that is used to access the MySQL database.

password

The password that is used to access the MySQL database.

deserializer

A deserializer, which deserializes SourceRecord into a specified type. Valid values:

  • RowDataDebeziumDeserializeSchema: deserializes SourceRecords to the internal data structure RowData of the Flink Table API or Flink SQL API.

  • JsonDebeziumDeserializationSchema: deserializes SourceRecords to JSON strings.

The following table describes the version variables that you must specify in the dependencies that are added to the pom.xml file.

${vvr.version}

The VVR version that Realtime Compute for Apache Flink uses. Example: vvr-8.0.4-flink-1.17.

${flink.version}

The version of Apache Flink. Example: 1.17.2.

Important

The VVR version that Realtime Compute for Apache Flink uses must be compatible with the corresponding Apache Flink version. Otherwise, incompatibility issues may occur when your deployment is running. For more information about the version mappings, see the "Engine updates" section of the Release notes topic.

FAQ

For more information about issues that may occur when you use a MySQL CDC source table, see FAQ about CDC.