
Realtime Compute for Apache Flink:CREATE DATABASE AS statement

Last Updated:Oct 18, 2024

The CREATE DATABASE AS statement synchronizes the table schemas and data of an entire database in real time, and also synchronizes subsequent table schema changes. This topic describes how to use the CREATE DATABASE AS statement and provides examples of how to use it in various scenarios.

Background information

The CREATE DATABASE AS statement is syntactic sugar for the CREATE TABLE AS statement that synchronizes data from all tables, or from multiple tables, in a database. Realtime Compute for Apache Flink translates the CREATE DATABASE AS statement into a CREATE TABLE AS statement for each table that needs to be synchronized. Therefore, the CREATE DATABASE AS statement provides the same data synchronization and schema change synchronization capabilities as the CREATE TABLE AS statement, which makes it suitable for fully automated data integration scenarios. Realtime Compute for Apache Flink can also merge source operators and use a single source operator to read data from multiple business tables. This is especially useful for MySQL Change Data Capture (CDC) data sources, because it reduces the number of database connections and prevents the system from repeatedly pulling binary log data, which reduces the read load on the database.

Limits

  • Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.11-flink-1.13 or later supports the CREATE DATABASE AS statement.

    Important

    The CREATE DATABASE AS statement does not support deployment debugging.

  • Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.13-flink-1.13 or later supports merging and synchronization of data of tables in a sharded database.

  • The following table describes the upstream and downstream storage services supported by the CREATE DATABASE AS statement.

| Connector | Source table | Result table | Remarks |
| --- | --- | --- | --- |
| MySQL connector | ✓ | × | The MySQL connector cannot be used to synchronize views of MySQL databases. |
| Apache Kafka connector | × | ✓ | N/A |
| MongoDB connector | ✓ | × | Merging and synchronization of multiple tables in a sharded database is not supported. Synchronization of MongoDB metadata is not supported. You can execute the CREATE DATABASE AS statement to synchronize data and table schema changes from a MongoDB database to the destination tables. For more information about the configuration requirements, see Manage MongoDB catalogs (public preview). |
| Upsert Kafka connector | × | ✓ | N/A |
| Hologres connector | × | ✓ | By default, the CREATE DATABASE AS statement creates the number of connections specified by the connectionSize parameter for each Hologres table. You can configure the connectionPoolName parameter so that tables for which the same connection pool is configured share that pool. If a source table contains data types that are not supported by the fixed plan feature, we recommend that you use the INSERT INTO statement to convert the data types before the data is written to Hologres instead of using the CREATE DATABASE AS statement; otherwise, the fixed plan feature cannot be used and write performance is poor. Only Hologres exclusive instances are supported because Realtime Compute for Apache Flink reads data from and writes data to Hologres internal tables; Hologres Shared Cluster instances are not supported. |
| StarRocks connector | × | ✓ | The CREATE DATABASE AS statement can be used to synchronize data only to StarRocks clusters in E-MapReduce (EMR). |
| Apache Paimon connector | × | ✓ | Only Realtime Compute for Apache Flink whose engine version is vvr-6.0.7-flink-1.15 or later supports Apache Paimon result tables. Data synchronization to Apache Paimon result tables stored in Data Lake Formation (DLF) 2.0 is not supported. |

Prerequisites

  • A catalog of the destination store is created in your workspace. For more information, see Manage catalogs.

  • If you want to access upstream and downstream resources under different accounts or use a RAM user or RAM role to access upstream and downstream resources, make sure that the account that you use to log on to Realtime Compute for Apache Flink has the permissions to read and write upstream and downstream resources before you execute the CREATE DATABASE AS statement. Otherwise, read and write operations may fail due to insufficient permissions.

Precautions

  • In Ververica Runtime (VVR) 8.0.6 or later, after a deployment that uses the CREATE DATABASE AS statement is started and new tables are added to the source database, you can restart the deployment from savepoints to capture the new tables. For more information, see Example 3: Synchronization of data from new tables in the source database.

  • In VVR 8.0.5 or earlier, after a deployment that uses the CREATE DATABASE AS statement is started, the tables whose data needs to be synchronized are determined in the deployment. New tables in the database are not automatically captured and cannot be captured by restarting the deployment. If you want to synchronize data from new tables, you can use one of the following methods:

    • Keep the existing deployment that uses the CREATE DATABASE AS statement running, and start a new deployment to synchronize data from the new tables. Sample code:

      -- Create a new deployment that uses the CREATE TABLE AS statement to synchronize data from the new table new_table.
      CREATE TABLE IF NOT EXISTS new_table
      AS TABLE mysql.tpcds.new_table
      /*+ OPTIONS('server-id'='8008-8010') */;
    • Cancel the existing deployment that uses the CREATE DATABASE AS statement, delete the synchronized data, and then restart the deployment without state data to resynchronize data.

Features

| Feature | Description |
| --- | --- |
| Synchronization of data from all tables in a database | Synchronizes full and incremental data from all tables, or multiple tables, in a database to the related result tables. |
| Synchronization of table schema changes | Synchronizes schema changes of each source table, such as an added column, to the related result table in real time during database synchronization. |
| Merging and synchronization of data of tables in a sharded database | Allows you to use a regular expression as the database name to match the source tables in multiple database shards of the data source. After the data of the source tables is merged, the data is synchronized to a downstream result table whose name corresponds to the source tables. |
| Synchronization of data from new tables in the source database | Allows you to synchronize data from new tables in the source database. After a deployment that uses the CREATE DATABASE AS statement is started and new tables are added to the source database, you can restart the deployment from a savepoint to capture the new tables and synchronize their data. |
| Execution of multiple CREATE TABLE AS and CREATE DATABASE AS statements | Allows you to use the STATEMENT SET statement to commit multiple CREATE TABLE AS and CREATE DATABASE AS statements as one deployment. Source operators can also be merged and reused to reduce the read load on the data source. |

Startup process

When you execute the CREATE DATABASE AS statement, Realtime Compute for Apache Flink performs the following operations:

  1. Checks whether the destination database and result tables exist in the destination store.

    • If the destination database does not exist, Realtime Compute for Apache Flink uses the catalog of the destination store to create a destination database in the destination store.

    • If the destination database exists, Realtime Compute for Apache Flink skips the database creation step and checks whether the result tables exist in the destination database.

      • If the result tables do not exist, Realtime Compute for Apache Flink creates the result tables in the destination database. The result tables have the same names and schemas as the tables in the source database.

      • If the result tables exist, Realtime Compute for Apache Flink skips the table creation step.

  2. Commits and runs the data synchronization deployment. Then, Realtime Compute for Apache Flink synchronizes data and schema changes from the source database to the tables in the destination database.

The following figure shows how to use the CREATE DATABASE AS statement to synchronize data from MySQL to Hologres.
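The check-and-create steps above can be sketched as follows. This is a minimal Python illustration of the startup logic, with the destination catalog modeled as a plain dictionary; it is not Flink's actual implementation.

```python
def ensure_destination(catalog, db_name, source_tables):
    """Mimic the CDAS startup checks: create the destination database
    and any missing result tables, and skip whatever already exists."""
    # Step 1: create the destination database only if it does not exist.
    if db_name not in catalog:
        catalog[db_name] = {}
    db = catalog[db_name]
    # Step 2: create each missing result table with the same name and
    # schema as the source table; existing tables are left untouched.
    for name, schema in source_tables.items():
        if name not in db:
            db[name] = dict(schema)
    return catalog

# A destination store where the database does not exist yet.
dest = {}
ensure_destination(dest, "holo_tpcds", {"orders": {"o_id": "BIGINT"}})
```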

Synchronization policies of table schema changes

The CREATE DATABASE AS statement is syntactic sugar for the CREATE TABLE AS statement. Therefore, it provides the same schema change synchronization capabilities as the CREATE TABLE AS statement. For more information about the synchronization policies of table schema changes, see CREATE TABLE AS statement.

Syntax

CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]

<target_database>:
  [catalog_name.]db_name

<source_database>:
  [catalog_name.]db_name

The CREATE DATABASE AS statement uses the basic syntax of the CREATE DATABASE statement. The following table describes the parameters.

| Parameter | Description |
| --- | --- |
| target_database | The name of the database to which data is synchronized. You can specify a catalog name. |
| COMMENT | The description of the destination database. By default, the description of source_database is used. |
| WITH | The parameters of the destination database. For more information, see the related catalog documentation in Manage catalogs. Note: Both the key and the value must be of the STRING type, such as 'sink.parallelism' = '4'. |
| source_database | The name of the database from which data is synchronized. You can specify a catalog name. |
| INCLUDING ALL TABLES | Specifies that all tables in the source database are synchronized. |
| INCLUDING TABLE | Specifies that only the specified tables in the source database are synchronized. You can use vertical bars (\|) to separate the names of multiple tables, or use a regular expression to match table names. For example, INCLUDING TABLE 'web.*' synchronizes all tables in the source database whose names start with web. |
| EXCLUDING TABLE | Specifies the tables that do not need to be synchronized. You can use vertical bars (\|) to separate the names of multiple tables, or use a regular expression to match table names. For example, INCLUDING ALL TABLES EXCLUDING TABLE 'web.*' synchronizes all tables in the source database whose names do not start with web. |
| OPTIONS | The parameters of the source tables. For more information, see the parameters in the WITH clause in the documentation of the related source table connector. Note: Both the key and the value must be of the STRING type, such as 'server-id' = '65500'. |

Note

The IF NOT EXISTS keyword is required. If the destination database or the result tables do not exist in the destination store, they are created; if they already exist, the related creation step is skipped. The result tables that are created use the schemas of the source tables, including the primary keys and the names and types of the physical fields. Computed columns, metadata columns, and watermarks are not included. The field types of the source tables are mapped to the field types of the result tables. For more information, see the data type mappings in the documentation of the related connector.
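To illustrate the selection semantics of INCLUDING and EXCLUDING, the following Python sketch filters table names with anchored regular expressions. This is only an illustration of the matching rules described above, not Flink internals; the helper name select_tables is made up.

```python
import re

def select_tables(all_tables, including, excluding=None):
    """Keep tables whose names fully match the INCLUDING pattern and do
    not fully match the EXCLUDING pattern. As in the SQL clauses, '|'
    inside a pattern separates alternative names."""
    keep = [t for t in all_tables if re.fullmatch(including, t)]
    if excluding:
        keep = [t for t in keep if not re.fullmatch(excluding, t)]
    return keep

tables = ["web_sales", "web_returns", "store_sales", "item"]
# INCLUDING TABLE 'web.*' -> only tables whose names start with web.
print(select_tables(tables, "web.*"))
# INCLUDING ALL TABLES EXCLUDING TABLE 'web.*' -> all other tables.
print(select_tables(tables, ".*", excluding="web.*"))
```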

Examples

Example 1: Synchronization of data from all tables in a database

In most cases, the CREATE DATABASE AS statement is used together with the catalog of the data source and the catalog of the destination store. For example, you can execute the CREATE DATABASE AS statement and use a MySQL catalog and a Hologres catalog to synchronize full data and incremental data from the MySQL database to Hologres. You can use a MySQL catalog to parse the schema and related parameters of the source tables without the need to manually write DDL statements.

For example, a Hologres catalog named holo and a MySQL catalog named mysql are created in your workspace. A MySQL database named tpcds exists, and the tpcds database has 24 tables. You can execute the following statement to synchronize all tables from the tpcds database to Hologres. The synchronized data includes future data changes and table schema changes. You do not need to create tables in Hologres in advance.

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_tpcds  -- Create a database named holo_tpcds in Hologres. 
WITH ('sink.parallelism' = '4') -- Configure the parameters of the destination database. By default, the parallelism for the Hologres sink is set to 4. This setting is optional. 
AS DATABASE mysql.tpcds INCLUDING ALL TABLES -- Synchronize all tables from the tpcds database in the MySQL database. 
/*+ OPTIONS('server-id'='8001-8004') */ ; -- Configure additional parameters for MySQL CDC source tables. This setting is optional.
Note

When you create a destination database, Hologres allows you to configure parameters in the WITH clause. These parameters take effect only for the current deployment: they control how data is written to the result tables and are not persistently stored in Hologres. For more information about the supported parameters in the WITH clause, see Hologres connector.

Example 2: Merging and synchronization of data of tables in a sharded database

To merge and synchronize data of tables in a sharded database, you must use a regular expression as the database name to match the database shards from which data is synchronized. The CREATE DATABASE AS statement merges data from the tables that have the same name across the upstream database shards and synchronizes the merged data to the table of the same name in the Hologres destination database. The database name and table name are written to each result table as two additional fields. To ensure that primary keys are unique, the database name, table name, and original primary key are combined into a new joint primary key for the Hologres table.

For example, a MySQL instance contains multiple database shards, order_db01 to order_db99. Each database shard contains multiple tables, such as order and order_detail. You can use the following statement to synchronize data from all tables in the 99 database shards to Hologres, including future data changes and table schema changes. You do not need to create tables in Hologres in advance.

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_order -- Create a database named holo_order in Hologres. The database contains all tables in the order_db database shards of the MySQL instance. 
WITH('sink.parallelism'='4')        -- Specify the parameters of the destination database. By default, the parallelism of each Hologres sink is 4. The setting is optional. 
AS DATABASE mysql.`order_db[0-9]+` INCLUDING ALL TABLES -- Synchronize data from all tables in the order_db database shards of the MySQL instance. 
/*+OPTIONS('server-id'='8001-8004')*/;  -- Configure additional parameters for MySQL CDC source tables. This setting is optional.
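The merge behavior can be sketched in Python as follows. This is an illustration only: the extra field names db_name and table_name and the primary-key column id are assumptions made for the example, not the exact column names that Flink writes.

```python
def merge_shards(shards, table):
    """Merge rows of the same-named table across database shards.
    The shard (database) name and table name become extra fields, and
    (db name, table name, original primary key) forms the joint key."""
    merged = {}
    for db_name, tables in shards.items():
        for row in tables.get(table, []):
            joint_key = (db_name, table, row["id"])  # 'id' is the assumed pk
            merged[joint_key] = {**row, "db_name": db_name, "table_name": table}
    return merged

shards = {
    "order_db01": {"order": [{"id": 1, "amount": 10}]},
    "order_db02": {"order": [{"id": 1, "amount": 20}]},  # same pk, other shard
}
# Both rows survive the merge because the shard name is part of the joint key.
merged = merge_shards(shards, "order")
```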

Example 3: Synchronization of data from new tables in the source database

In VVR 8.0.6 or later, after a deployment that uses the CREATE DATABASE AS statement is started and new tables are added to the source database, you can restart the deployment from savepoints to capture the new tables. This way, the data in the new tables can be synchronized.

  1. When you develop an SQL draft, add the following statement to enable the feature of reading data from new tables by using the CREATE DATABASE AS statement:

    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  2. If data from a new table needs to be synchronized in a deployment, cancel the deployment and select Stop With Savepoint in the Cancel this deployment dialog box.

  3. On the SQL Editor page, redeploy an SQL draft for the deployment.

  4. On the Deployments page, click the name of the deployment. On the deployment details page, click the Status tab. Then, click the History tab.

  5. In the Savepoints list, find the savepoint that was created when the deployment was canceled.

  6. Choose More > Start job from this snapshot in the Actions column.

  7. In the Start Job panel, configure the parameters. For more information, see Start a deployment.

Important

When the source database has new tables, you can start the deployment only in Initial Mode.

Example 4: Execution of multiple CREATE TABLE AS and CREATE DATABASE AS statements

Realtime Compute for Apache Flink allows you to use the STATEMENT SET statement to commit multiple CREATE TABLE AS statements as one deployment. Realtime Compute for Apache Flink can also merge source operators and use a single source operator to read data from multiple business tables. This reduces the number of server-id values that are used, the number of database connections, and the read load on the database, which makes this approach well suited to MySQL CDC data sources.

Note

If you want to optimize data of source operators and use one source operator to read data from multiple tables, you must make sure that the options of the tables of source operators are the same.

For example, if the MySQL instance has multiple databases named tpcds, tpch, and user_db01 to user_db99 (database shards), you can combine multiple CREATE TABLE AS and CREATE DATABASE AS statements to synchronize data from all databases and tables of the MySQL instance to Hologres. This way, you can run only one Flink deployment to synchronize data from all tables in the databases and use only one source operator to read data from all tables. The following code shows an example.

USE CATALOG holo;

BEGIN STATEMENT SET;

-- Synchronize data from tables in the specified database shards whose names start with user. 
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

-- Synchronize data from the tpcds database. 
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- Synchronize data from the tpch database. 
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

Example 5: Execution of multiple CREATE DATABASE AS statements to synchronize data from all tables in a database to Kafka

When you execute multiple CREATE DATABASE AS statements to synchronize data from all tables in a database to Kafka, you must configure the cdas.topic.pattern parameter to prevent topic conflicts, because tables with the same name may exist in different databases. The cdas.topic.pattern parameter specifies the format of the topic name, and the {table-name} placeholder in the pattern is replaced with the table name. For example, if you configure 'cdas.topic.pattern'='db1-{table-name}' and the upstream table is named table1, the Kafka topic that corresponds to the table1 table is named db1-table1.
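The placeholder substitution works as a simple string replacement. The following Python sketch illustrates how a topic name is derived; the helper name topic_for is made up for the illustration, and only the {table-name} placeholder described above is handled.

```python
def topic_for(pattern: str, table_name: str) -> str:
    """Derive the Kafka topic name from a cdas.topic.pattern value by
    replacing the {table-name} placeholder with the table name."""
    return pattern.replace("{table-name}", table_name)

# With 'cdas.topic.pattern' = 'db1-{table-name}', table1 maps to db1-table1,
# so same-named tables in tpcds and tpch land in distinct topics.
print(topic_for("db1-{table-name}", "table1"))
print(topic_for("tpcds-{table-name}", "order"))
print(topic_for("tpch-{table-name}", "order"))
```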

For example, a MySQL instance has multiple databases, such as tpcds and tpch. You can execute the following statements to synchronize data from all databases and tables in the MySQL instance to Kafka and prevent topic conflicts during data synchronization. Sample code:

USE CATALOG kafkaCatalog;

BEGIN STATEMENT SET;

-- Synchronize data from the tpcds database. 
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

-- Synchronize data from the tpch database. 
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

Realtime Compute for Apache Flink allows you to synchronize data from all tables in the MySQL database to Kafka. This way, Kafka is used as the intermediate layer for data synchronization. You can execute the CREATE DATABASE AS statement to synchronize data from all tables in a MySQL database to Kafka. You can also execute the CREATE TABLE AS statement to synchronize data from a single table in a MySQL database to Kafka. For more information, see Synchronize data from all tables in a MySQL database to Kafka.

References