
ApsaraDB for SelectDB: Use Flink to import data

Last Updated: Dec 20, 2024

ApsaraDB for SelectDB is fully compatible with Apache Doris. You can use Flink Doris Connector to import historical data from data sources such as MySQL, Oracle, PostgreSQL, SQL Server, and Kafka to SelectDB. If you create a Flink job that uses the Flink change data capture (CDC) connector, incremental data is also synchronized from the data source to SelectDB.

Overview

Note

You can use Flink Doris Connector only to write data to ApsaraDB for SelectDB. To read data from ApsaraDB for SelectDB, use JDBC Connector.
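
For example, a minimal Flink SQL sketch of reading from SelectDB through the JDBC connector might look as follows. This is an illustration only: the endpoint and credentials are placeholders, the sketch assumes that the Flink JDBC connector is installed, and it uses the MySQL protocol port of the instance (9030 in the examples in this topic).

-- Assumed JDBC source table that reads from the SelectDB instance.
CREATE TABLE employees_jdbc_source (
    emp_no       INT,
    birth_date   DATE,
    first_name   STRING,
    last_name    STRING,
    gender       STRING,
    hire_date    DATE
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/test',
    'table-name' = 'employees',
    'username' = 'admin',
    'password' = '****'
);

-- Read data from the SelectDB instance.
SELECT * FROM employees_jdbc_source;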

Flink Doris Connector is used to connect Apache Flink to Apache Doris. You can use Flink Doris Connector to read data from and write data to Apache Doris for real-time data processing and analysis. Flink Doris Connector is commonly used to import data to SelectDB in streaming mode because SelectDB is fully compatible with Apache Doris.

The stream processing capability of Flink is implemented based on the source, transform, and sink components.

  • Source:

    • Reads data streams from external systems. The external systems include message queues such as Apache Kafka, databases, and file systems.

    • For example, sources can be used to read messages from Kafka in real time or read data from files.

  • Transform:

    • Processes and transforms the input data streams during the transformation stage. The transformation operations include filtering, mapping, aggregating, and defining windows.

    • For example, a mapping operation can convert the data structure of an input data stream, or an aggregation can calculate a specific metric per minute.

  • Sink:

    • Delivers processed data streams to external systems. Sinks can be used to write data to databases, files, and message queues.

    • For example, sinks can be used to write processed data to MySQL databases or Kafka topics.
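
As an illustration of these three stages only, the following self-contained Flink SQL sketch uses the built-in datagen source and print sink in place of Kafka and SelectDB. The table and column names are hypothetical.

-- Source: generate a stream of click events.
CREATE TABLE clicks_source (
    user_id INT,
    event_time AS PROCTIME()  -- processing-time attribute used by the window
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '100'
);

-- Sink: print the aggregated results.
CREATE TABLE clicks_per_minute_sink (
    window_start TIMESTAMP(3),
    window_end   TIMESTAMP(3),
    click_count  BIGINT
) WITH (
    'connector' = 'print'
);

-- Transform: aggregate the stream into one row per one-minute window.
INSERT INTO clicks_per_minute_sink
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE)   AS window_end,
    COUNT(*) AS click_count
FROM clicks_source
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);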

The following figure shows how data is imported to SelectDB by using Flink Doris Connector.

(Figure: Data import to SelectDB by using Flink Doris Connector)

Prerequisites

  • The data source and Flink are connected to SelectDB. To establish the network connections, perform the following steps:

    1. Apply for a public endpoint for the ApsaraDB for SelectDB instance that you want to use. For more information, see Apply for or release a public endpoint.

      Skip this step if you use Realtime Compute for Apache Flink with a data source provided by Alibaba Cloud, or open source Flink with a data source deployed on an Elastic Compute Service (ECS) instance, and the Alibaba Cloud services or the ECS instance reside in the same virtual private cloud (VPC) as the ApsaraDB for SelectDB instance.

    2. Add related IP addresses of Flink and the data source to the whitelist of the ApsaraDB for SelectDB instance. For more information, see Configure an IP address whitelist.

  • Flink Doris Connector is installed.

    The following table describes the version requirements.

    | Flink version | Flink Doris Connector version | Download link |
    | --- | --- | --- |
    | Realtime Compute for Apache Flink: V1.17 or later; open source Flink: V1.15 or later | V1.5.2 or later | Flink Doris Connector |

    For more information, see Install Flink Doris Connector.

Install Flink Doris Connector

You can use one of the following methods to install Flink Doris Connector based on your business requirements:

  • Upload, use, and update Flink Doris Connector by using the custom connector feature of Realtime Compute for Apache Flink. This method is applicable if you import data by using Realtime Compute for Apache Flink. For more information, see Manage custom connectors.

  • Download the JAR package of the required version of Flink Doris Connector to the lib directory of the Flink installation directory. This method is applicable if you use a self-managed open source Flink cluster. To download the JAR package, visit org/apache/doris.

  • Install the Maven dependencies of Flink Doris Connector. The following sample code provides an example. To obtain dependencies for other versions, visit org/apache/doris.

    <!-- flink-doris-connector -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  

Examples

Sample environment

The following examples show how to import data from the employees table in the test database in an ApsaraDB RDS for MySQL instance to the employees table in the test database in a SelectDB instance by using Flink SQL, Flink CDC, and DataStream. You can modify the corresponding parameters based on your business requirements. Sample environment:

  • Flink standalone cluster of version 1.16

  • Java 8

  • Destination database: test

  • Destination table: employees

  • Source database: test

  • Source table: employees

Prepare the environment

Flink environment

  1. Prepare a Java environment.

    The running of Flink depends on the Java environment. Therefore, you must install Java Development Kit (JDK) and configure the JAVA_HOME environment variable.

    The required Java version is related to the Flink version. For more information about the Java versions supported by Flink, see Java compatibility. In this example, Java 8 is installed. For more information, see the "Step 2: Install JDK" section of the Deploy a Java web environment on an instance that runs Alibaba Cloud Linux 2, Alibaba Cloud Linux 3, or CentOS 7.x topic.

  2. Download the Flink installation package flink-1.16.3-bin-scala_2.12.tgz. If this version is no longer available, download another version from the Apache Flink official website.

    wget https://www.apache.si/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
  3. Decompress the installation package.

    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  4. Go to the lib directory of the Flink installation directory to install the required connectors.

    • Install Flink Doris Connector.

      wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
    • Install Flink MySQL Connector.

      wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
  5. Start a Flink cluster.

    Run the following command in the bin directory of the Flink installation directory:

    ./start-cluster.sh 

Destination table and database in ApsaraDB for SelectDB

  1. Create an ApsaraDB for SelectDB instance. For more information, see Create an instance.

  2. Connect to the ApsaraDB for SelectDB instance. For more information, see Connect to an instance.

  3. Create a test database named test.

    CREATE DATABASE test;
  4. Create a test table named employees.

    USE test;
    
    -- Create a table.
    CREATE TABLE employees (
        emp_no       int NOT NULL,
        birth_date   date,
        first_name   varchar(20),
        last_name    varchar(20),
        gender       char(2),
        hire_date    date
    )
    UNIQUE KEY(`emp_no`)
    DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

Source table and database in ApsaraDB RDS for MySQL

  1. Create an ApsaraDB RDS for MySQL instance. For more information, see Step 1: Create an ApsaraDB RDS for MySQL instance and configure databases.

  2. Create a test database named test.

    CREATE DATABASE test;
  3. Create a test table named employees.

    USE test;
    
    CREATE TABLE employees (
        emp_no INT NOT NULL PRIMARY KEY,
        birth_date DATE,
        first_name VARCHAR(20),
        last_name VARCHAR(20),
        gender CHAR(2),
        hire_date DATE
    );
  4. Insert data into the table.

    INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) VALUES
    (1001, '1985-05-15', 'John', 'Doe', 'M', '2010-06-20'),
    (1002, '1990-08-22', 'Jane', 'Smith', 'F', '2012-03-15'),
    (1003, '1987-11-02', 'Robert', 'Johnson', 'M', '2015-07-30'),
    (1004, '1992-01-18', 'Emily', 'Davis', 'F', '2018-01-05'),
    (1005, '1980-12-09', 'Michael', 'Brown', 'M', '2008-11-21');

Import data by using Flink SQL

  1. Start the Flink SQL client.

    Run the following command in the bin directory of the Flink installation directory:

    ./sql-client.sh
  2. Submit a Flink job on the Flink SQL client by performing the following steps:

    1. Create a source table in the MySQL database.

      In this example, the configurations related to the MySQL CDC connector are specified in WITH. For more information about the configuration items, see MySQL CDC Connector.

      CREATE TABLE employees_source (
          emp_no INT,
          birth_date DATE,
          first_name STRING,
          last_name STRING,
          gender STRING,
          hire_date DATE,
          PRIMARY KEY (`emp_no`) NOT ENFORCED
      ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '127.0.0.1', 
          'port' = '3306',
          'username' = 'root',
          'password' = '****',
          'database-name' = 'test',
          'table-name' = 'employees'
      );
    2. Create a sink table in the SelectDB instance.

      In this example, the configurations related to the SelectDB instance are specified in WITH. For more information about the configuration items, see Configuration items of Doris Sink.

      CREATE TABLE employees_sink (
          emp_no       INT ,
          birth_date   DATE,
          first_name   STRING,
          last_name    STRING,
          gender       STRING,
          hire_date    DATE
      ) 
      WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'test.employees',
        'username' = 'admin',
        'password' = '****'
      );
    3. Import data from the source table in the MySQL database to the sink table in the SelectDB instance.

      INSERT INTO employees_sink SELECT * FROM employees_source;
  3. Verify the data import result.

    Connect to the SelectDB instance and run the following command to check whether the data is imported:

    SELECT * FROM test.employees;

Import data by using Flink CDC

Important

If you import data by using Realtime Compute for Apache Flink, Flink CDC does not support JAR jobs. Flink CDC 3.0 imports data by using YAML jobs.

The following example shows how to use Flink CDC to import data from a database to an ApsaraDB for SelectDB instance:

In the directory in which Flink is installed, use bin/flink to run Flink CDC jobs. Syntax:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--sink-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

Parameters

| Parameter | Description |
| --- | --- |
| execution.checkpointing.interval | The checkpoint interval, which affects the frequency of data synchronization. We recommend that you set this parameter to 10s. |
| parallelism.default | The parallelism of the Flink job. You can appropriately increase the parallelism to accelerate data synchronization. |
| job-name | The name of the Flink job. |
| database | The name of the database to which data is imported in the SelectDB instance. |
| table-prefix | The prefix added to the name of the SelectDB table. Example: --table-prefix ods_. |
| table-suffix | The suffix appended to the name of the SelectDB table. |
| including-tables | The tables from which data is to be synchronized. You can use vertical bars (\|) to separate multiple table names. Regular expressions are also supported. Example: --including-tables table1\|tbl.*, which specifies that data is synchronized from table1 and all tables whose names start with tbl. |
| excluding-tables | The tables to be excluded. You can specify this parameter in the same way as you specify the including-tables parameter. |
| mysql-conf | The configuration items of the MySQL CDC source. For more information about the configuration items, see MySQL CDC Connector. The hostname, username, password, and database-name parameters are required. |
| oracle-conf | The configuration items of the Oracle CDC source. For more information about the configuration items, see Oracle CDC Connector. The hostname, username, password, database-name, and schema-name parameters are required. |
| sink-conf | The configuration items of Doris Sink. For more information, see the Configuration items of Doris Sink section of this topic. |
| table-conf | The configuration items of the SelectDB table. The configuration items are contained in the properties when the SelectDB table is created. |

Note
  1. To synchronize data, you must install the dependencies of Flink CDC, such as flink-sql-connector-mysql-cdc-${version}.jar and flink-sql-connector-oracle-cdc-${version}.jar, in the $FLINK_HOME/lib directory.

  2. If you want to synchronize full data from a database, the Flink version must be 1.15 or later. For more information about how to download Flink Doris Connector of different versions, visit org/apache/doris.

Configuration items of Doris Sink

| Parameter | Default value | Required | Description |
| --- | --- | --- | --- |
| fenodes | No default value | Yes | The endpoint and HTTP port that are used to access the ApsaraDB for SelectDB instance. To obtain the VPC endpoint or public endpoint and HTTP port of an instance, log on to the ApsaraDB for SelectDB console, go to the Instance Details page of the instance, and then view the VPC Endpoint or Public Endpoint parameter and the HTTP Port parameter in the Network Information section of the Basic Information page. Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080. |
| table.identifier | No default value | Yes | The names of the database and table. Example: test_db.test_table. |
| username | No default value | Yes | The username that is used to connect to the ApsaraDB for SelectDB instance. |
| password | No default value | Yes | The password that is used to connect to the ApsaraDB for SelectDB instance. |
| jdbc-url | No default value | No | The JDBC connection string that is used to access the ApsaraDB for SelectDB instance. To obtain the VPC endpoint or public endpoint and MySQL port of an instance, log on to the ApsaraDB for SelectDB console, go to the Instance Details page of the instance, and then view the VPC Endpoint or Public Endpoint parameter and the MySQL Port parameter in the Network Information section of the Basic Information page. Example: jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030. |
| auto-redirect | true | No | Specifies whether to redirect Stream Load requests. If you set this parameter to true, Stream Load requests are sent to the frontend (FE), and backend (BE) information is no longer displayed. |
| doris.request.retries | 3 | No | The maximum number of retries that are allowed for sending requests to the SelectDB instance. |
| doris.request.connect.timeout | 30s | No | The connection timeout period for sending requests to the SelectDB instance. |
| doris.request.read.timeout | 30s | No | The read timeout period for sending requests to the SelectDB instance. |
| sink.label-prefix | "" | Yes | The label prefix that is used by Stream Load. In the two-phase commit scenario, the label prefix must be globally unique to ensure the exactly-once semantics of Flink. |
| sink.properties | No default value | No | The data import properties of Stream Load. CSV format: sink.properties.format='csv', sink.properties.column_separator=',', sink.properties.line_delimiter='\n'. JSON format: sink.properties.format='json'. For more information about the properties, see Import data by using Stream Load. |
| sink.buffer-size | 1048576 | No | The size of the write data buffer. Unit: bytes. We recommend that you use the default value, which is equivalent to 1 MB. |
| sink.buffer-count | 3 | No | The number of write data buffers. We recommend that you use the default value. |
| sink.max-retries | 3 | No | The maximum number of retries that are allowed after a commit request fails. |
| sink.use-cache | false | No | Specifies whether to use the memory cache for recovery if an exception occurs. If you set this parameter to true, the cache retains the data generated during checkpointing. |
| sink.enable-delete | true | No | Specifies whether to synchronize delete events. Only the Unique key model is supported. |
| sink.enable-2pc | true | No | Specifies whether to enable the two-phase commit mode. You can enable the two-phase commit mode to ensure the exactly-once semantics. |
| sink.enable.batch-mode | false | No | Specifies whether to enable the batch mode for writing data to the SelectDB instance. If you enable the batch mode, the point in time at which data is written no longer depends on checkpoints but is controlled by the sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval parameters. The exactly-once semantics is not ensured after the batch mode is enabled, and the Unique key model can be used to implement idempotence. |
| sink.flush.queue-size | 2 | No | The size of the cache queue in batch mode. |
| sink.buffer-flush.max-rows | 50000 | No | The maximum number of data rows that can be written in a single batch in batch mode. |
| sink.buffer-flush.max-bytes | 10MB | No | The maximum number of bytes that can be written in a single batch in batch mode. |
| sink.buffer-flush.interval | 10s | No | The interval at which the cache is asynchronously flushed in batch mode. The minimum value is 1. Unit: seconds. |
| sink.ignore.update-before | true | No | Specifies whether to ignore update-before events. |
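
The following Flink SQL sketch shows how some of these options are specified in the WITH clause of a sink table, using the batch mode parameters as an example. The endpoint, database, table, and credentials are placeholders, and the option values are only illustrative.

-- Sink table that writes to SelectDB in batch mode (illustrative values).
CREATE TABLE employees_batch_sink (
    emp_no       INT,
    birth_date   DATE,
    first_name   STRING,
    last_name    STRING,
    gender       STRING,
    hire_date    DATE
)
WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'test.employees',
  'username' = 'admin',
  'password' = '****',
  -- Write in batch mode: flushes are controlled by the following thresholds instead of checkpoints.
  'sink.enable.batch-mode' = 'true',
  'sink.buffer-flush.max-rows' = '50000',
  'sink.buffer-flush.max-bytes' = '10MB',
  'sink.buffer-flush.interval' = '10s'
);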

Examples

Import data from a MySQL database

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password="password" \
    --mysql-conf database-name=test_db \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Import data from an Oracle database

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Import data from a PostgreSQL database

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1 \
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Import data from an SQL Server database

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1 \
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Import data by using DataStream

  1. Install the required Maven dependencies in the Maven project.

    Maven dependencies

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
        </dependencies>
  2. Run the core Java code.

    In the following code snippet, the parameters used to configure the source table in the MySQL database and the sink table in the ApsaraDB for SelectDB instance correspond to those described in the Import data by using Flink SQL section of this topic. For more information, see MySQL CDC Connector and the Configuration items of Doris Sink section of this topic.

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
            
            // Configure the source table in the MySQL database.
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs***")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            // Configure the sink table in the ApsaraDB for SelectDB instance.
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyunc****:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
            DorisOptions dorisOptions = dorisBuilder.build();
    
            // Configure parameters related to Stream Load as properties.
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) // Serialize the change records as JSON strings.
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

Advanced usage

Use Flink SQL to update partial columns

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial_columns' = 'true' -- Allow partial columns to be updated.
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

Use Flink SQL to delete data from specific columns

In scenarios in which the data source supports CDC, Doris Sink identifies the event type based on RowKind and assigns values to the hidden column __DORIS_DELETE_SIGN__ to delete data. In scenarios in which Kafka messages serve as the data source, Doris Sink cannot identify the operation type based on RowKind. Instead, the operation type must be marked by specific fields in the messages, such as {"op_type":"delete",data:{...}}, so that the rows whose op_type field is delete can be deleted. In this case, you need to explicitly set the value of the hidden column based on the business logic. The following example shows how to use Flink SQL to delete data from an ApsaraDB for SelectDB instance based on specific fields in Kafka data:

-- In this example, the source data contains the {"op_type":"delete",data:{"id":1,"name":"zhangsan"}} fields.
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- A value of false indicates that the event type is not identified based on RowKind.
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- The columns imported by Stream Load.
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

FAQ

  • Q: How do I write data of the BITMAP type?

    A: The following sample code shows how to write data of the BITMAP type:

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'doris', 
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.bitmap_test', 
      'username' = 'admin', 
      'password' = '****', 
      'sink.label-prefix' = 'selectdb_label', 
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    );
  • Q: What do I do if the errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650] error is reported?

    A: In the exactly-once scenario, a Flink job must be started from the latest checkpoint or savepoint when it is restarted. Otherwise, the preceding error is reported. If exactly-once is not required, you can set the sink.enable-2pc parameter to false to disable the two-phase commit mode or modify the sink.label-prefix parameter.
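
    For example, a minimal sketch of a sink definition that disables the two-phase commit mode is shown below. The connection values and table are placeholders; only the last two options relate to this error.

    CREATE TABLE selectdb_sink_no_2pc (
      id INT,
      name STRING
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.table1',
      'username' = 'admin',
      'password' = '****',
      'sink.enable-2pc' = 'false',                 -- Disable the two-phase commit mode if exactly-once is not required.
      'sink.label-prefix' = 'selectdb_label_new'   -- Alternatively, keep 2PC enabled and change the label prefix to a new, globally unique value.
    );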

  • Q: What do I do if the errCode = 2, detailMessage = transaction[19650]not found error is reported?

    A: This error occurs during the commit phase. If the transaction ID recorded in the checkpoint has expired in your ApsaraDB for SelectDB instance, committing the transaction again causes this error, and the job cannot be restored from that checkpoint. You can modify the streaming_label_keep_max_second parameter of the ApsaraDB for SelectDB instance to extend the validity period. The default validity period is 12 hours.

  • Q: What do I do if the errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100 error is reported?

    A: This error is reported because the number of concurrent data import jobs of the same database exceeds 100. You can resolve this error by modifying the max_running_txn_num_per_db parameter of your ApsaraDB for SelectDB instance. For more information, see max_running_txn_num_per_db in FE Configuration.

    If you frequently modify the label for a job and restart the job, this error may occur. In two-phase commit scenarios in which the Duplicate key or Aggregate key model is used, the label of each job must be unique. When a Flink job is restarted from a checkpoint, it actively aborts transactions that were previously started but not completed. If you frequently modify the label and restart the job, a large number of precommitted transactions cannot be aborted and occupy the transaction quota. If the Unique key model is used, you can also disable the two-phase commit mode and implement idempotent writing by configuring Doris Sink.

  • Q: How do I ensure the sequence of a batch of data when Flink writes data to a table that uses the Unique key model?

    A: You can add a sequence column to ensure the sequence of a batch of data. For more information, see Sequence Column.
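
    The following sketch shows one way to define a sequence column when you create the destination table. The table schema and the choice of hire_date as the sequence column are assumptions for illustration; see Sequence Column for the exact properties supported by your instance.

    CREATE TABLE employees_seq (
        emp_no       int NOT NULL,
        hire_date    date,
        first_name   varchar(20)
    )
    UNIQUE KEY(`emp_no`)
    DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1
    PROPERTIES (
        -- For rows with the same key, the row with the larger hire_date value takes effect,
        -- regardless of the order in which the rows arrive.
        "function_column.sequence_col" = "hire_date"
    );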

  • Q: Why does a Flink job fail to import data when no errors are reported for the Flink job?

    A: If Flink Doris Connector 1.1.0 or earlier is used to import data, data is written in batch mode and data writes are triggered by the arrival of data. In this case, check whether data is written to the upstream data source. If a Flink Doris Connector version later than 1.1.0 is used, data writes depend on checkpoints. In this case, you must enable checkpointing to write data.

  • Q: What do I do if the tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235 error is reported?

    A: In most cases, this error is reported when Flink Doris Connector 1.1.0 or earlier is used. This error is caused by an excessively high write frequency. You can reduce the frequency of Stream Load by specifying the sink.buffer-flush.max-bytes and sink.buffer-flush.interval parameters.

  • Q: How do I skip dirty data when I import data by using Flink?

    A: When you use Flink to import data, if dirty data exists, such as data whose field format or length does not meet the requirements, Stream Load reports errors. In this case, Flink keeps retrying to import data. To skip dirty data, you can disable the strict mode of Stream Load by setting the strict_mode parameter to false and the max_filter_ratio parameter to 1 or filter data before the Sink operator.
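
    For example, a minimal sketch that passes these two Stream Load parameters through the sink.properties prefix is shown below. The table definition and connection values are placeholders; only the last two options relate to skipping dirty data.

    CREATE TABLE selectdb_sink_skip_dirty (
      id INT,
      name STRING
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.table1',
      'username' = 'admin',
      'password' = '****',
      'sink.properties.strict_mode' = 'false',    -- Disable the strict mode of Stream Load.
      'sink.properties.max_filter_ratio' = '1'    -- Tolerate filtered (dirty) rows up to this ratio.
    );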

  • Q: How do I map the source table to the destination ApsaraDB for SelectDB table?

    A: When you use Flink Doris Connector to import data, take note of the following two items: 1. The columns and types of the source table must match those in Flink SQL. 2. The columns and types in Flink SQL must match those of the destination ApsaraDB for SelectDB table.

  • Q: What do I do if the TApplicationException: get_next failed: out of sequence response: expected 4 but got 3 error is reported?

    A: This error is caused by concurrency bugs in the Thrift framework. We recommend that you use the latest version of Flink Doris Connector and a compatible Flink version.

  • Q: What do I do if the DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.XX.XX error is reported?

    A: You can search logs for abort transaction response in TaskManager and check whether the error is caused by the client or the server based on the returned HTTP status code.