Realtime Compute for Apache Flink:CREATE TABLE AS statement

Last Updated: Oct 17, 2024

You can execute the CREATE TABLE AS statement to synchronize both the data and the schema changes of one table to another table in real time. This improves the efficiency of creating a table in the destination store and of synchronizing schema changes of the source table to the result table. This topic describes how to use the CREATE TABLE AS statement and provides examples of using the statement in various scenarios.

Prerequisites

A catalog of the destination store is created in your workspace. For more information, see Manage catalogs.
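For reference, the following is a minimal sketch of creating a Hologres catalog by executing an SQL statement. The catalog name holo and all connection values are placeholders, and the exact options depend on the catalog type; see Manage catalogs for the authoritative parameter list.

CREATE CATALOG `holo` WITH (
  'type' = 'hologres',
  'endpoint' = '<yourEndpoint>',        -- Placeholder value.
  'username' = '<yourAccessKeyId>',     -- Placeholder value.
  'password' = '<yourAccessKeySecret>', -- Placeholder value.
  'dbname' = '<yourDatabaseName>'       -- Placeholder value.
);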

Limits

  • Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.11-flink-1.13 or later supports the CREATE TABLE AS statement.

    Important

    The CREATE TABLE AS statement does not support deployment debugging.

  • Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.12-flink-1.13 or later supports the addition of custom computed columns.

  • In Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) earlier than 4.0.16, you cannot use multiple CREATE TABLE AS statements in a deployment to synchronize data from the same source table to different result tables.

  • You cannot use the CREATE TABLE AS statement together with the INSERT INTO statement in a deployment for data synchronization.

  • The following table describes the upstream and downstream data stores for which you can use the CREATE TABLE AS statement. √ indicates that the connector is supported on that side, and × indicates that it is not.

    Connector                  Source table    Result table
    MySQL connector            √               ×
    Apache Kafka connector     √               ×
    MongoDB connector          √               ×
    Upsert Kafka connector     ×               √
    StarRocks connector        ×               √
    Hologres connector         ×               √
    Apache Paimon connector    ×               √

    Remarks:

    • MySQL connector:

      • By default, the database name and table names of the upstream storage are synchronized during merging and synchronization of multiple tables in a sharded database.

      • During single-table synchronization, the database name and table names are not synchronized. If you want to synchronize them, execute an SQL statement to create a catalog and add the catalog.table.metadata-columns parameter, as shown in the sketch after this table. For more information, see Configure a MySQL catalog by executing an SQL statement.

      • The MySQL connector cannot be used to synchronize views of MySQL databases.

    • Apache Kafka connector: no additional remarks.

    • MongoDB connector:

      • The MongoDB connector does not support merging and synchronization of multiple tables in a sharded database.

      • The MongoDB connector does not support synchronization of MongoDB metadata.

      • You cannot execute the CREATE TABLE AS statement to synchronize data from new tables in the source database by using the MongoDB connector.

      • You can execute the CREATE TABLE AS statement to synchronize data and table schema changes from a MongoDB source table to a destination table. For more information, see Example 9: synchronization of data from a MongoDB source table to a Hologres table by using the CREATE TABLE AS statement.

    • Upsert Kafka connector: no additional remarks.

    • StarRocks connector: The CREATE TABLE AS statement supports only StarRocks clusters of E-MapReduce (EMR).

    • Hologres connector: If the downstream storage service is Hologres, the CREATE TABLE AS statement creates a specific number of connections for each Hologres table by default. The number of connections is specified by the connectionSize parameter. You can configure the connectionPoolName parameter to allow tables for which the same connection pool is configured to share the connection pool.

      Note: If the source table contains data of types that are not supported by the fixed plan feature, we recommend that you use the INSERT INTO statement to convert the data types before the data is synchronized to Hologres, instead of using the CREATE TABLE AS statement to create the result table. If you use the CREATE TABLE AS statement in this scenario, the fixed plan feature cannot be used and write performance is poor.

    • Apache Paimon connector:

      • Only Realtime Compute for Apache Flink whose engine version is vvr-6.0.7-flink-1.15 or later supports Apache Paimon result tables.

      • Data cannot be synchronized to Apache Paimon result tables stored in Data Lake Formation (DLF) 2.0.
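As noted in the MySQL remarks above, the following is a minimal sketch of creating a MySQL catalog that exposes the database name and table names as metadata columns. The connection values are placeholders, and the metadata column names are assumptions; see Configure a MySQL catalog by executing an SQL statement for the authoritative options.

CREATE CATALOG `mysql` WITH (
  'type' = 'mysql',
  'hostname' = '<yourHostname>',         -- Placeholder value.
  'port' = '3306',                       -- Placeholder value.
  'username' = '<yourUsername>',         -- Placeholder value.
  'password' = '<yourPassword>',         -- Placeholder value.
  'default-database' = '<yourDatabase>', -- Placeholder value.
  -- Expose the database name and table name so that they can be
  -- synchronized during single-table synchronization.
  'catalog.table.metadata-columns' = 'database_name,table_name'
);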

Features

• Single-table synchronization: Synchronizes full data and incremental data from a source table to a result table in real time.

• Synchronization of table schema changes: Synchronizes schema changes of a source table, such as an added column, to a result table in real time.

• Merging and synchronization of multiple tables in a sharded database: Allows you to use regular expressions to define database shard names and table names that match multiple database shards and tables in a sharded database. The data in the matched tables is then merged and synchronized to a result table.

  Note: When you use regular expressions to define database shard names and table names, you cannot use carets (^) to match the beginning of the name of a table.

• Addition of custom computed columns: Allows you to add computed columns to the source table so that data in specific columns can be converted and computed. You can use system functions or user-defined functions (UDFs) in computed columns, specify the position of each computed column, and use the computed column as a physical column in the result table. The results of computed columns are synchronized to the result table in real time.

• Execution of multiple CREATE TABLE AS statements: Allows you to use the STATEMENT SET statement to commit multiple CREATE TABLE AS statements as one deployment. Source operators can also be merged and reused to reduce the read load on the data source.

  For a deployment that executes multiple CREATE TABLE AS statements, you can add another CREATE TABLE AS statement to synchronize an additional table. For more information, see Example 6: addition of a CREATE TABLE AS statement for a data synchronization deployment that executes multiple CREATE TABLE AS statements.

Startup process

When you execute the CREATE TABLE AS statement, Realtime Compute for Apache Flink performs the following operations:

  1. Checks whether the result table exists in the destination store.

    • If the result table does not exist, Realtime Compute for Apache Flink uses the catalog of the destination store to create the result table in the destination store. The result table has the same schema as the data source.

    • If the result table exists, Realtime Compute for Apache Flink skips the table creation. If the schema of the existing result table does not match the schema of the source table, an error is returned.

  2. Commits and runs the data synchronization job.

    Realtime Compute for Apache Flink synchronizes data and changes in the schema from the data source to the result table.

The following figure shows how to use the CREATE TABLE AS statement to synchronize data from MySQL to Hologres.

Synchronization policies for table schema changes

You can use the CREATE TABLE AS statement to synchronize data in real time and also synchronize schema changes from the source table to the result table. Schema changes include table creation and schema changes after a table is created.

  • The following schema change policies are supported:

    • Add a nullable column: The statement automatically adds the column to the end of the schema of the result table and synchronizes data to the added column.

    • Delete a nullable column: The statement automatically fills null values in the nullable column of the result table instead of deleting the column from the table.

    • Add a non-nullable column: The statement automatically adds the column to the end of the schema of the result table and synchronizes the data of the new column. The new column is set to nullable in the result table, and rows that were written before the column was added are filled with null values.

    • Rename a column: Renaming is handled as adding a column and deleting a column. After a column is renamed in the source table, the column that uses the new name is added to the end of the result table, and the column that uses the original name is filled with null values. For example, if the col_a column in the source table is renamed col_b, the col_b column is added to the end of the result table and the col_a column is filled with null values. A sketch of this scenario appears after the notes below.

    • Change the data type of a column:

      • The statement supports a column type change, such as a change from INT to BIGINT, only if the downstream sink supports that change. Whether a sink supports a given change depends on its column type change rules, which vary by result table type. For more information, see the documentation of the related result table. Among the supported sinks, only Apache Paimon supports column type changes.

      • If a downstream sink that is involved in the deployment, such as Hologres, does not support a data type change in a column, the CREATE TABLE AS statement cannot synchronize the change. In this case, you can use a result table in type normalization mode for data synchronization. When you start a deployment that executes the CREATE TABLE AS statement, you can create the result table in type normalization mode. If the data type of a column changes later, Realtime Compute for Apache Flink normalizes the types to determine whether the downstream sink can accept the change. This allows more column type changes to be supported. For more information, see Example 8: execution of the CREATE TABLE AS statement to synchronize data to a Hologres table in type normalization mode. Only Hologres supports column type changes in type normalization mode. We recommend that you enable the type normalization mode the first time you start the deployment. If you do not, the mode does not take effect, and you must delete the downstream table and restart the deployment without using the state data.

  • The following schema changes are not supported:

    • Change of constraints, such as the primary key or index

    • Deletion of a non-nullable column

    • Change of a column from NOT NULL to nullable

Important
  • If the schema of the source table undergoes one of the preceding unsupported changes, you must delete the result table and restart the deployment that executes the CREATE TABLE AS statement. This way, the result table is recreated and historical data is resynchronized to the result table.

  • The CREATE TABLE AS statement does not parse the types of DDL statements. Instead, it compares the schemas of the data records before and after a change. Therefore, if you delete a column and then add it back, and no data change occurs between the two DDL operations, the statement considers that no schema change occurred. Similarly, adding a column to the source table does not by itself trigger schema change synchronization. The statement detects a schema change only when data in the source table changes, and then synchronizes the change to the result table.
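The following is a minimal sketch of the column rename scenario described above, based on the sharded tables in Example 2. The table name user01, the column names, and the column type are illustrative.

-- In one of the source shard tables, rename col_a to col_b (MySQL syntax).
ALTER TABLE `user01` CHANGE COLUMN `col_a` `col_b` VARCHAR(255);

-- The CREATE TABLE AS statement detects the rename only after data in the
-- source table changes.
INSERT INTO `user01` (id, name, col_b) VALUES (1, 'Tony', 'some value');

-- Result: the result table gains a col_b column at the end, and the existing
-- col_a column is filled with null values.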

Syntax

CREATE TABLE IF NOT EXISTS <sink_table>
[COMMENT table_comment]
[PARTITIONED BY (partition_column [, ...])]
WITH (key1=val1, key2=val2, ...)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  computed_column_definition [FIRST | AFTER column_name]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

The CREATE TABLE AS statement uses the basic syntax of the CREATE TABLE statement. The following list describes the parameters:

• sink_table: The name of the table to which data is synchronized. You can use a catalog name and a database name to specify the name of the result table.

• COMMENT: The description of the result table. By default, the description of source_table is used.

• WITH: The parameters of the result table. You can specify the parameters in the WITH clause that are supported by the result table. For more information, see "Parameters in the WITH clause" in Upsert Kafka connector, Hologres connector, StarRocks connector, or Apache Paimon connector.

  Note: Both the key and the value must be of the STRING type, such as 'jdbcWriteBatchSize' = '1024'.

• source_table: The name of the table from which data is synchronized. You can use a catalog name and a database name to specify the name of the source table.

• OPTIONS: The parameters of the source table. You can specify the parameters in the WITH clause that are supported by the source table. For more information, see "Parameters in the WITH clause" in MySQL connector and Apache Kafka connector.

  Note: Both the key and the value must be of the STRING type, such as 'server-id' = '65500'.

• ADD COLUMN: Adds columns to the result table when data is synchronized from the source table to the result table. Only computed columns can be added.

• column_component: The description of the new column.

• computed_column_definition: The computed column expression.

• FIRST: Places the new column as the first field of the source table. If you do not specify this parameter, the new column is appended as the last field of the source table by default.

• AFTER: Places the new column after the specified field of the source table.

• PARTITIONED BY: Specifies the columns based on which the result table is partitioned. For an example, see Example 7: execution of the CREATE TABLE AS statement to synchronize data from a source table to a Hologres partitioned table.

Note

The IF NOT EXISTS keyword is required. If the result table does not exist in the destination store, the result table is created first. If the result table exists, the creation step is skipped. The created result table uses the schema of the source table, including the primary key and the names and types of the physical fields. Computed columns, metadata fields, and watermarks are not included. The field types of the source table are mapped to the field types of the result table. For more information, see the data type mappings in the documentation of the related connector.

Examples

Example 1: single-table synchronization

In most cases, the CREATE TABLE AS statement is used with the catalog of the data source and the catalog of the destination store. For example, you can execute the CREATE TABLE AS statement and use a MySQL catalog and a Hologres catalog to synchronize full data and incremental data from the MySQL database to Hologres. You can use a MySQL catalog to parse the schema and related parameters of the source table without the need to manually write DDL statements.

For example, a Hologres catalog named holo and a MySQL catalog named mysql are created in your workspace. You can use the following code to synchronize data from the web_sales table of the MySQL database to Hologres:

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales  
WITH ('jdbcWriteBatchSize' = '1024')   -- Configure the parameters of the result table. This setting is optional. 
AS TABLE mysql.tpcds.web_sales   
/*+ OPTIONS('server-id'='8001-8004') */; -- Configure additional parameters for the MySQL CDC source table.

Example 2: merging and synchronization of multiple tables in a sharded database

If you want to merge and synchronize data of multiple tables in a sharded database, you can use a MySQL catalog and a regular expression that defines database shard names and table names to match the tables whose data you want to synchronize. You can use the CREATE TABLE AS statement to merge multiple tables in a sharded database into a Hologres table. The database shard names and table names are written as the values of two additional fields to the table. To ensure that the primary key is unique, the database shard name field, table name field, and original primary key are used as the new joint primary key of the Hologres table.

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`  
/*+ OPTIONS('server-id'='8001-8004') */;

The following figure shows the effect of the merging operation.

If you add a column named age to the user02 table and insert a data record into the table, the data and schema changes on the user02 table are automatically synchronized to the result table in real time, even though the schemas of the source tables are different.

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);


Example 3: addition of custom computed columns

This example describes how to add computed columns to the source tables for data conversion and computation during the merging and synchronization of multiple tables in a sharded database. In this example, the user table is used.

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `class` AS 3 AFTER `id`
);

The following figure shows the synchronization effect.


Example 4: execution of multiple CREATE TABLE AS statements

Realtime Compute for Apache Flink allows you to use the STATEMENT SET statement to commit multiple CREATE TABLE AS statements as one deployment. Realtime Compute for Apache Flink can also merge source operators so that a single source operator reads data from multiple business tables. This reduces the number of server-id values that are used, the number of database connections, and the read load on the database. Therefore, this approach is well suited to MySQL CDC data sources.

Important

If you want one source operator to read data from multiple tables, you must make sure that the options of the source tables are identical.

For example, data is synchronized from the web_sales table in the first code segment, and data is synchronized from multiple tables whose names start with user in a sharded database in the second code segment. In this case, you can use the STATEMENT SET statement to commit these code segments as one deployment.

USE CATALOG holo;

BEGIN STATEMENT SET;

-- Synchronize data from the web_sales table. 
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- Synchronize data from multiple tables in the specified database shards whose names start with user. 
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

Example 5: execution of multiple CREATE TABLE AS statements to synchronize data from the same source table to different result tables

In Realtime Compute for Apache Flink that uses VVR 4.0.16 or later, you can execute multiple CREATE TABLE AS statements to synchronize data from the same data source table to different result tables without the need to add computed columns.

USE CATALOG `holo`;

BEGIN STATEMENT SET;

-- Execute the CREATE TABLE AS statement to synchronize data from the user table of the MySQL database to the user table in database1 of Hologres.
CREATE TABLE IF NOT EXISTS `database1`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

-- Execute the CREATE TABLE AS statement to synchronize data from the user table of the MySQL database to the user table in database2 of Hologres.
CREATE TABLE IF NOT EXISTS `database2`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

If you want to add a computed column to the result table, execute the following statements to perform data synchronization:

-- Create a temporary table named user_with_changed_id based on the source table user. You can define a computed column. The following sample code defines the computed_id column that is calculated based on the id column of the source table. 
CREATE TEMPORARY TABLE `user_with_changed_id` (
  `computed_id` AS `id` + 1000
) LIKE `mysql`.`tpcds`.`user`;

-- Create a temporary table named user_with_changed_age based on the source table user. You can define a computed column. The following sample code defines the computed_age column that is calculated based on the age column of the source table. 
CREATE TEMPORARY TABLE `user_with_changed_age` (
  `computed_age` AS `age` + 1
) LIKE `mysql`.`tpcds`.`user`;

BEGIN STATEMENT SET;

-- Execute the CREATE TABLE AS statement to synchronize data from the user table of the MySQL database to the user_with_changed_id table of Hologres. The user_with_changed_id table contains the IDs that are obtained from the calculation based on the id column of the source table. The obtained IDs are in the computed_id column.  
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
AS TABLE `user_with_changed_id`
/*+ OPTIONS('server-id'='8001-8004') */;

-- Execute the CREATE TABLE AS statement to synchronize data from the user table of the MySQL database to the user_with_changed_age table of Hologres. The user_with_changed_age table contains the age values that are obtained from the calculation based on the age column of the source table. The obtained age values are in the computed_age column.  
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
AS TABLE `user_with_changed_age`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

Example 6: addition of a CREATE TABLE AS statement for a data synchronization deployment that executes multiple CREATE TABLE AS statements

In Realtime Compute for Apache Flink that uses VVR 8.0.1 or later, if you add a CREATE TABLE AS statement to a deployment that executes multiple CREATE TABLE AS statements after the deployment is started, you can restart the deployment from a savepoint to capture the new table and synchronize its data.

  1. When you develop an SQL draft, you must add the following statement to enable the feature of reading data from a new table:

    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  2. If you want to add a CREATE TABLE AS statement for a deployment, find the deployment on the Deployments page and click Cancel in the Actions column. In the message that appears, select Stop With Savepoint and click OK.

  3. On the SQL Editor page, add the CREATE TABLE AS statement to the SQL editor of the deployment (see the sketch after these steps), and click Deploy in the upper-left corner to redeploy the SQL deployment.

  4. On the Deployments page, click the name of the deployment. On the deployment details page, click the Status tab. Then, click the History tab.

  5. In the Savepoints list, find the savepoint that was created when the deployment was canceled.

  6. Choose More > Start job from this snapshot in the Actions column.

  7. In the Start Job panel, configure the parameters. For more information, see Start a deployment.
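For reference, after step 3 the redeployed draft might look like the following sketch. The web_sales and user statements are carried over from Example 4; the orders statement is the newly added one, and its table names are illustrative.

SET 'table.cdas.scan.newly-added-table.enabled' = 'true';

USE CATALOG holo;

BEGIN STATEMENT SET;

-- Statements from the original deployment.
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

-- The newly added statement. Its options must match those of the original
-- source tables so that the source operator can be reused.
CREATE TABLE IF NOT EXISTS orders
AS TABLE mysql.tpcds.orders
/*+ OPTIONS('server-id'='8001-8004') */;

END;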

Important

When you add a CREATE TABLE AS statement to the code of a draft, take note of the following items:

  • When you use a Change Data Capture (CDC) source table for data synchronization, the feature of reading data from a new table takes effect only for a deployment whose startup mode is Initial Mode.

  • The configuration of the source table that is added by using the new CREATE TABLE AS statement must be the same as the configuration of the original source table. This way, the source operator of the new CREATE TABLE AS statement can be reused.

  • You cannot modify the parameters of a deployment, such as the startup mode, before and after you add a CREATE TABLE AS statement for the deployment.

Example 7: execution of the CREATE TABLE AS statement to synchronize data from a source table to a Hologres partitioned table

When you create a Hologres partitioned table that has a primary key, the partition field must be part of the primary key. Suppose you want to synchronize data to Hologres from a MySQL table that is created by using the following statement:

CREATE TABLE orders (
    order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city VARCHAR(100) NOT NULL,
    order_date DATE,
    purchaser INTEGER,
    PRIMARY KEY(order_id, product_id)
);

When you execute the CREATE TABLE AS statement to synchronize data from the source table to the Hologres partitioned table, the processing mechanism varies based on whether the primary key of the source table contains a partition field of the result table:

  • If the primary key of the source table contains a partition field of the result table, you can execute the following SQL statement for data synchronization. In this example, the partition field in the Hologres partitioned table is product_id.

    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
    PARTITIONED BY (product_id)
    AS TABLE `mysql`.`tpcds`.`orders`;
  • If the primary key of the source table does not contain the partition field of the result table, the primary key of the MySQL table is used when the Hologres table is created, and an error occurs because the partition field is not part of the primary key. In this example, the partition field of the Hologres partitioned table is city. You can declare the primary key in the CREATE TABLE AS statement to specify the primary key of the Hologres partitioned table so that the deployment runs as expected. Sample statement:

    -- Execute the following SQL statement to declare the order_id, product_id, and city fields as the primary key of the Hologres partitioned table: 
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
        CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
    )
    PARTITIONED BY (city)
    AS TABLE `mysql`.`tpcds`.`orders`;

Example 8: execution of the CREATE TABLE AS statement to synchronize data to a Hologres table in type normalization mode

When you execute the CREATE TABLE AS statement, you may need to change the precision of the data type of an existing field, for example, from VARCHAR(10) to VARCHAR(20).

  • For Realtime Compute for Apache Flink whose engine version is earlier than vvr-6.0.5-flink-1.15, if you change the data type of a field in the source table of a deployment in which the CREATE TABLE AS statement is executed, the deployment may fail. In this case, you must recreate the result table.

  • When you synchronize data in a deployment of Realtime Compute for Apache Flink whose engine version is vvr-6.0.5-flink-1.15 or later, the type normalization mode is supported. We recommend that you enable the type normalization mode the first time you start a deployment that executes the CREATE TABLE AS statement. If you do not enable the type normalization mode the first time you start the deployment, the type normalization mode does not take effect. In this case, you must delete the downstream table and restart the deployment without using the state data.

    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
    WITH (
      'connector' = 'hologres',
      'enableTypeNormalization' = 'true' -- Use the type normalization mode.
    ) AS TABLE `mysql`.`tpcds`.`orders`;

    In type normalization mode, a data type change in the source table does not cause a deployment failure if the data types before and after the change can be converted into the same data type based on the type normalization rules. If you use the type normalization mode, take note of the following rules:

    • The TINYINT, SMALLINT, INT, and BIGINT data types are converted into the BIGINT data type.

    • The CHAR, VARCHAR, and STRING data types are converted into the STRING data type.

    • The FLOAT and DOUBLE data types are converted into the DOUBLE data type.

    • Other data types are converted based on the data type mappings between Hologres and Flink fields. For more information, see Data type mappings.

    Example:

    • When the type normalization mode is enabled, the SMALLINT and INT data types are converted into the BIGINT data type. If you change the SMALLINT data type to the INT data type, the change is considered successful. Therefore, the deployment that executes the CREATE TABLE AS statement runs as expected.

    • When the type normalization mode is enabled, the FLOAT data type is converted into the DOUBLE data type, and the BIGINT data type remains BIGINT. Because DOUBLE and BIGINT are not compatible, changing a column from the FLOAT data type to the BIGINT data type causes a data type incompatibility error.

Example 9: synchronization of data from a MongoDB source table to a Hologres table by using the CREATE TABLE AS statement

In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, you can execute the CREATE TABLE AS statement to synchronize data from a MongoDB source table to a destination table in real time and synchronize the schema changes in the source table to the destination table. You can use MongoDB catalogs together with the CREATE TABLE AS statement to synchronize schema changes. You do not need to manually define the table schema. For more information about MongoDB catalogs, see Manage MongoDB catalogs.

The following sample code shows how to use the CREATE TABLE AS statement to synchronize data from a MongoDB source table to a Hologres table.

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;
Important

When you execute the CREATE TABLE AS or CREATE DATABASE AS statement to synchronize data from a MongoDB database to a destination table, make sure that the following conditions are met:

  • The VVR version of Realtime Compute for Apache Flink is 8.0.6 or later. The version of the MongoDB database is 6.0 or later.

  • The scan.incremental.snapshot.enabled and scan.full-changelog parameters are set to true in SQL hints.

  • The preimage and postimage features are enabled for the MongoDB database. For more information about how to enable the preimage and postimage features, see Document Preimages.

If you want to synchronize data from multiple MongoDB collections in a deployment, make sure that the configurations of the following parameters for all tables in the deployment are the same:

  • Parameters related to the MongoDB database, including hosts, scheme, username, password, and connectionOptions

  • scan.startup.mode

Example 10: synchronization of data from all tables in a MySQL database to Kafka

Multiple deployments may consume the same MySQL table. In this case, the MySQL database establishes a separate connection for each deployment, which places a heavy load on the MySQL server and the network. To reduce the load on the upstream MySQL database, Realtime Compute for Apache Flink allows you to synchronize data from all tables in a MySQL database to Kafka and use Kafka as an intermediate layer for data synchronization. You can execute the CREATE DATABASE AS statement to synchronize data from all tables in a MySQL database to Kafka, or execute the CREATE TABLE AS statement to synchronize data from a single table in a MySQL database to Kafka, as shown in the sketch below. For more information, see Synchronize data from all tables in a MySQL database to Kafka.
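The following is a minimal sketch of the two approaches. It assumes that a MySQL catalog named mysql and a Kafka catalog named kafka have been created and that the database and table names shown here exist; see Synchronize data from all tables in a MySQL database to Kafka for the authoritative steps.

-- Sketch 1: execute the CREATE DATABASE AS statement to synchronize all
-- tables in the tpcds database to Kafka.
CREATE DATABASE IF NOT EXISTS `kafka`.`tpcds`
AS DATABASE `mysql`.`tpcds` INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */;

-- Sketch 2: execute the CREATE TABLE AS statement to synchronize a single
-- table to Kafka.
CREATE TABLE IF NOT EXISTS `kafka`.`tpcds`.`web_sales`
AS TABLE `mysql`.`tpcds`.`web_sales`
/*+ OPTIONS('server-id'='8001-8004') */;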
