
ApsaraDB for SelectDB: Import data by using Flink

Last Updated: Nov 18, 2024

ApsaraDB for SelectDB is compatible with Apache Doris. You can use Flink Doris Connector to synchronize unstructured data in Kafka and updated data in upstream databases such as MySQL databases to ApsaraDB for SelectDB instances in real time. This way, you can analyze large amounts of data.

Overview

Flink Doris Connector is commonly used to import data to ApsaraDB for SelectDB in streaming mode. Based on the stream processing capabilities of Flink, you can import a large amount of data from upstream data sources such as MySQL, Oracle, PostgreSQL, SQL Server, and Kafka to tables in ApsaraDB for SelectDB by using Flink Doris Connector. You can also use Flink Java Database Connectivity (JDBC) Connector to read data from tables in ApsaraDB for SelectDB.

Important

You can use Flink Doris Connector only to write data to ApsaraDB for SelectDB. To read data from ApsaraDB for SelectDB, use Flink JDBC Connector.
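For reference, the following statements are a minimal Flink SQL sketch of reading an ApsaraDB for SelectDB table over the MySQL protocol by using Flink JDBC Connector. The endpoint, port, database, table, and credentials are placeholders, and the flink-connector-jdbc package and a MySQL JDBC driver are assumed to be available on the Flink classpath.

-- Declare an ApsaraDB for SelectDB table as a JDBC source. Replace the placeholders with your own values.
CREATE TABLE employees_jdbc_source (
    emp_no INT,
    first_name STRING,
    last_name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/test_db',
    'table-name' = 'employees',
    'username' = 'admin',
    'password' = '****'
);

-- Read data from the ApsaraDB for SelectDB table.
SELECT * FROM employees_jdbc_source;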


How it works

After Flink Doris Connector receives data, it continuously writes the data to ApsaraDB for SelectDB instances by using the chunked transfer encoding mechanism of HTTP. Flink Doris Connector implements exactly-once semantics based on the checkpointing mechanism of Flink and the two-phase commit mode of Stream Load, which ensures end-to-end data consistency. The following section describes how Flink Doris Connector works:

  1. When a Flink job starts, a Stream Load pre-commit request is initiated. In this case, a transaction is started, and data is continuously written to the destination ApsaraDB for SelectDB instance by using the chunked transfer encoding mechanism of HTTP. The following figure shows the principle.

    (Figure: StreamLoad-1)

  2. During the checkpointing period, Flink Doris Connector stops writing data to the ApsaraDB for SelectDB instance, and completes the HTTP request. The state of the transaction changes to PreCommitted. In this case, the data written to the ApsaraDB for SelectDB instance is invisible to users. The following figure shows the principle.

    (Figure: StreamLoad-2)

  3. After checkpointing is complete, a Stream Load commit request is initiated and the state of the transaction changes to Committed. After the transaction is complete, the data is visible to users. The following figure shows the principle.

    (Figure: StreamLoad-3)

  4. If the Flink job unexpectedly exits and is restarted from a checkpoint, and the last transaction is in the PreCommitted state, a rollback request is initiated and the state of the transaction changes to Aborted. This prevents data loss and duplicate data when you use Flink Doris Connector to import data to an ApsaraDB for SelectDB instance.
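During troubleshooting, you can check which of these states a transaction is in. The following statement is a sketch that assumes the SHOW TRANSACTION syntax of Apache Doris is available on your instance, with a hypothetical transaction ID taken from the Flink TaskManager logs:

-- Query the state (for example, PRECOMMITTED, COMMITTED, or ABORTED) of transaction 19650 in the test_db database.
SHOW TRANSACTION FROM test_db WHERE id = 19650;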

Prerequisites

The Flink version must be 1.15 or later. Select the connector version based on your Flink version. The following version mapping applies:

  • Flink version: V1.15 or later

  • Connector type: Flink Doris Connector

  • Connector version: V1.5.2 or later

  • Download URL: Flink Doris Connector

Install Flink Doris Connector

You can use one of the following methods to install Flink Doris Connector:

  • Install the Maven dependency of Flink Doris Connector, as shown in the following sample code. To obtain dependencies for other versions, visit org/apache/doris. Alternatively, download the JAR package of the required version of Flink Doris Connector to the FLINK_HOME/lib directory. To download the JAR package, visit org/apache/doris.

    <!-- flink-doris-connector -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  
  • Upload, use, and update Flink Doris Connector by using the custom connector management feature of Realtime Compute for Apache Flink. This method is applicable if you import data by using Realtime Compute for Apache Flink. For more information, see Manage custom connectors.

Examples

The following examples show how to import data from an upstream data source to an ApsaraDB for SelectDB instance by using Flink SQL, Flink change data capture (CDC), and DataStream.

Prepare the environment

Build a Flink environment. In this example, a Flink standalone cluster of version 1.16 is deployed.

  1. Download the flink-1.16.3-bin-scala_2.12.tgz package and decompress the package. Sample code:

    wget https://dlcdn.apache.org/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  2. Download the flink-sql-connector-mysql-cdc-2.4.2 and flink-doris-connector-1.16-1.5.2 packages to the FLINK_HOME/lib directory. Sample code:

    cd flink-1.16.3
    cd lib/
    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
    wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
  3. Start a Flink standalone cluster. Sample code:

    bin/start-cluster.sh
  4. Create an ApsaraDB for SelectDB instance. For more information, see Create an instance.

  5. Connect to the ApsaraDB for SelectDB instance over the MySQL protocol. For more information, see Connect to an instance.

  6. Create a test database and a test table.

    1. Execute the following statement to create a test database:

      CREATE DATABASE test_db;
    2. Execute the following statements to create a test table:

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
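The import examples in this topic read from an upstream MySQL database. The following statements are a sketch of the source-side preparation. They assume a MySQL database named test that contains an employees table with the same schema as the destination table and a few sample rows; adjust the names and data to your environment.

-- Execute the following statements in the upstream MySQL database, not in the ApsaraDB for SelectDB instance.
CREATE DATABASE IF NOT EXISTS test;
USE test;
CREATE TABLE employees (
    emp_no       INT NOT NULL,
    birth_date   DATE,
    first_name   VARCHAR(20),
    last_name    VARCHAR(20),
    gender       CHAR(2),
    hire_date    DATE,
    PRIMARY KEY (emp_no)
);
INSERT INTO employees VALUES
    (10001, '1990-01-01', 'San', 'Zhang', 'M', '2018-07-01'),
    (10002, '1992-05-20', 'Si', 'Li', 'F', '2019-03-15');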

Import data by using Flink SQL

The following example shows how to use Flink SQL to import data from an upstream MySQL database to an ApsaraDB for SelectDB instance:

  1. Start the Flink SQL client. Sample code:

    bin/sql-client.sh
  2. Submit a Flink job on the Flink SQL client. Sample code:

    SET 'execution.checkpointing.interval' = '10s';
    
    CREATE TABLE employees_source (
        emp_no INT,
        birth_date DATE,
        first_name STRING,
        last_name STRING,
        gender STRING,
        hire_date DATE,
        PRIMARY KEY (`emp_no`) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '127.0.0.1', 
        'port' = '3306',
        'username' = 'root',
        'password' = '****',
        'database-name' = 'test',
        'table-name' = 'employees'
    );
    
    CREATE TABLE employees_sink (
        emp_no       INT ,
        birth_date   DATE,
        first_name   STRING,
        last_name    STRING,
        gender       STRING,
        hire_date    DATE
    ) 
    WITH (
      'connector' = 'selectdb-preview',
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.employees',
      'username' = 'admin',
      'password' = '****',
      'sink.enable-delete' = 'true'
    );
    
    INSERT INTO employees_sink SELECT * FROM employees_source;
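After the job is submitted, you can verify the result in the ApsaraDB for SelectDB instance. A minimal check, assuming the sample rows described in the environment preparation were inserted into the MySQL source table:

-- Execute in the ApsaraDB for SelectDB instance over the MySQL protocol.
SELECT * FROM test_db.employees ORDER BY emp_no;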

Import data by using Flink CDC

Important

If you import data by using Realtime Compute for Apache Flink, Flink CDC does not support JAR jobs. Flink CDC 3.0 imports data by using YAML jobs.

The following example shows how to use Flink CDC to import data from an upstream database to an ApsaraDB for SelectDB instance:

The following sample code shows the syntax of Flink CDC:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--sink-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

The following list describes the parameters:

  • execution.checkpointing.interval: The checkpoint interval, which affects the frequency of data synchronization. We recommend that you set this parameter to 10s.

  • parallelism.default: The parallelism of the Flink job. You can appropriately increase the parallelism to accelerate data synchronization.

  • job-name: The name of the Flink job.

  • database: The name of the database to which data is imported in the ApsaraDB for SelectDB instance.

  • table-prefix: The prefix added to the name of the ApsaraDB for SelectDB table. Example: --table-prefix ods_.

  • table-suffix: The suffix appended to the name of the ApsaraDB for SelectDB table.

  • including-tables: The tables from which data is to be synchronized. You can use vertical bars (|) to separate multiple table names. Regular expressions are also supported. Example: --including-tables table1|tbl.*, which specifies that data is synchronized from table1 and all tables whose names start with tbl.

  • excluding-tables: The tables to be excluded. You can specify this parameter in the same way as you specify the including-tables parameter.

  • mysql-conf: The configuration items of the MySQL CDC source. For more information about the configuration items, see MySQL CDC Connector. The hostname, username, password, and database-name parameters are required.

  • oracle-conf: The configuration items of the Oracle CDC source. For more information about the configuration items, see Oracle CDC Connector. The hostname, username, password, database-name, and schema-name parameters are required.

  • sink-conf: The configuration items of Doris Sink. For more information, see the Configuration items of Doris Sink section of this topic.

  • table-conf: The configuration items of the ApsaraDB for SelectDB table. These configuration items are contained in the properties that are used when the ApsaraDB for SelectDB table is created.

Note
  1. To synchronize data, you must install the dependencies of Flink CDC, such as flink-sql-connector-mysql-cdc-${version}.jar and flink-sql-connector-oracle-cdc-${version}.jar, in the $FLINK_HOME/lib directory.

  2. If you want to synchronize full data from a database, the Flink version must be 1.15 or later. For more information about how to download Flink Doris Connector of different versions, visit org/apache/doris.

Configuration items of Doris Sink

The following list describes the configuration items of Doris Sink:

  • fenodes (required; no default value): The endpoint and HTTP port that are used to access the ApsaraDB for SelectDB instance. To obtain the virtual private cloud (VPC) endpoint or public endpoint and the HTTP port of an instance, log on to the ApsaraDB for SelectDB console, go to the Instance Details page of the instance, and view the values of the VPC Endpoint or Public Endpoint parameter and the HTTP Port parameter in the Network Information section of the Basic Information page. Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080.

  • table.identifier (required; no default value): The names of the database and table. Example: test_db.test_table.

  • username (required; no default value): The username that is used to connect to the ApsaraDB for SelectDB instance.

  • password (required; no default value): The password that is used to connect to the ApsaraDB for SelectDB instance.

  • jdbc-url (optional; no default value): The JDBC connection string that is used to access the ApsaraDB for SelectDB instance. To obtain the VPC endpoint or public endpoint and the MySQL port of an instance, log on to the ApsaraDB for SelectDB console, go to the Instance Details page of the instance, and view the values of the VPC Endpoint or Public Endpoint parameter and the MySQL Port parameter in the Network Information section of the Basic Information page. Example: jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030.

  • auto-redirect (optional; default value: true): Specifies whether to redirect Stream Load requests. If you set this parameter to true, Stream Load requests are sent to the frontend (FE), and backend (BE) information is no longer exposed.

  • doris.request.retries (optional; default value: 3): The maximum number of retries that are allowed when requests are sent to the ApsaraDB for SelectDB instance.

  • doris.request.connect.timeout (optional; default value: 30s): The connection timeout period for requests that are sent to the ApsaraDB for SelectDB instance.

  • doris.request.read.timeout (optional; default value: 30s): The read timeout period for requests that are sent to the ApsaraDB for SelectDB instance.

  • sink.label-prefix (required; default value: ""): The label prefix that is used by Stream Load. In the two-phase commit scenario, the label prefix must be globally unique to ensure the exactly-once semantics of Flink.

  • sink.properties (optional; no default value): The data import properties of Stream Load. Configure the properties in one of the following formats:

    • CSV format:

      sink.properties.format='csv'
      sink.properties.column_separator=','
      sink.properties.line_delimiter='\n'
    • JSON format:

      sink.properties.format='json'

    For more information about the properties, see Import data by using Stream Load.

  • sink.buffer-size (optional; default value: 1048576): The size of the write data buffer. Unit: bytes. The default value is 1 MB. We recommend that you use the default value.

  • sink.buffer-count (optional; default value: 3): The number of write data buffers. We recommend that you use the default value.

  • sink.max-retries (optional; default value: 3): The maximum number of retries that are allowed after a commit request fails.

  • sink.use-cache (optional; default value: false): Specifies whether to use the memory cache for recovery if an exception occurs. If you set this parameter to true, the cache retains the data generated during checkpointing.

  • sink.enable-delete (optional; default value: true): Specifies whether to synchronize delete events. Only tables that use the Unique key model are supported.

  • sink.enable-2pc (optional; default value: true): Specifies whether to enable the two-phase commit mode. You can enable the two-phase commit mode to ensure the exactly-once semantics.

  • sink.enable.batch-mode (optional; default value: false): Specifies whether to enable the batch mode for writing data to the ApsaraDB for SelectDB instance. If you enable the batch mode, the point in time at which data is written no longer depends on checkpoints. Instead, it is controlled by the sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval parameters. The exactly-once semantics is not ensured after the batch mode is enabled, but the Unique key model can be used to implement idempotence. A configuration sketch follows this list.

  • sink.flush.queue-size (optional; default value: 2): The size of the cache queue in batch mode.

  • sink.buffer-flush.max-rows (optional; default value: 50000): The maximum number of data rows that can be written in a single batch in batch mode.

  • sink.buffer-flush.max-bytes (optional; default value: 10MB): The maximum number of bytes that can be written in a single batch in batch mode.

  • sink.buffer-flush.interval (optional; default value: 10s): The interval at which the cache is asynchronously flushed in batch mode. The minimum value is 1 second.

  • sink.ignore.update-before (optional; default value: true): Specifies whether to ignore update-before events.
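For example, the following statement is a sketch of a sink definition that enables the batch mode and tunes the flush thresholds described above. The table name, endpoint, and credentials are placeholders; as noted above, combine the batch mode with the Unique key model if you need idempotent writes.

CREATE TABLE employees_sink_batch (
    emp_no INT,
    first_name STRING,
    last_name STRING
)
WITH (
  'connector' = 'selectdb-preview',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'test_db.employees',
  'username' = 'admin',
  'password' = '****',
  -- Flush data based on the following thresholds instead of checkpoints.
  'sink.enable.batch-mode' = 'true',
  'sink.buffer-flush.max-rows' = '100000',
  'sink.buffer-flush.max-bytes' = '10485760',  -- 10 MB
  'sink.buffer-flush.interval' = '10s'
);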

Import data from a MySQL database

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****
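After the synchronization job starts, the destination tables are created in the specified database if they do not exist and are kept in sync. The following statements are a quick check in the ApsaraDB for SelectDB instance, assuming the upstream database contains a table named tbl1:

SHOW TABLES FROM test_db;
SELECT COUNT(*) FROM test_db.tbl1;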

Import data from an Oracle database

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Import data from a PostgreSQL database

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1 \
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Import data from an SQL Server database

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1 \
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Import data by using DataStream

The following example shows how to use DataStream to import data from an upstream MySQL database to an ApsaraDB for SelectDB instance:

  • Add the required Maven dependencies, as shown in the following sample code:

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter</artifactId>
                <version>RELEASE</version>
                <scope>test</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>2.0.31</version>
            </dependency>
    
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>
    
        </dependencies>
  • The following sample code shows the core Java code:

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
    
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs.com")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
    
            DorisOptions dorisOptions = dorisBuilder.build();
    
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
    
            DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) // Serialize the Debezium JSON records for the sink.
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

Advanced usage

Use Flink SQL to update partial columns

Sample code

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'selectdb-preview',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial.columns' = 'true' -- Allow partial columns to be updated.
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

Use Flink SQL to delete data from specific columns

In scenarios in which the upstream data source supports CDC, Doris Sink identifies the event type based on RowKind and assigns a value to the hidden column __DORIS_DELETE_SIGN__ to delete data. In scenarios in which Kafka messages serve as the upstream data source, Doris Sink cannot identify the operation type based on RowKind. In this case, the operation type must be marked by a specific field in the messages, such as {"op_type":"delete",data:{...}}, and Doris Sink deletes the rows whose op_type field is set to delete. To do this, you must explicitly set the value of the hidden column based on the business logic. The following example shows how to use Flink SQL to delete data from an ApsaraDB for SelectDB instance based on specific fields in Kafka data:

Sample code

-- In this example, the upstream data contains the {"op_type":"delete",data:{"id":1,"name":"zhangsan"}} fields.
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'selectdb-preview',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- A value of false specifies that the event type is not identified based on RowKind.
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- The columns imported by Stream Load.
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

FAQ

  • Q: How do I write data of the BITMAP type?

    A: The following sample code shows how to write data of the BITMAP type:

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'selectdb-preview', 
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.bitmap_test', 
      'username' = 'admin', 
      'password' = '****', 
      'sink.label-prefix' = 'selectdb_label', 
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    );
  • Q: What do I do if the "errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650]" error is reported?

    A: In the exactly-once scenario, a Flink job must be started from the latest checkpoint or savepoint when it is restarted. Otherwise, the preceding error is reported. If exactly-once is not required, you can set the sink.enable-2pc parameter to false to disable the two-phase commit mode or modify the sink.label-prefix parameter.

  • Q: What do I do if the "errCode = 2, detailMessage = transaction[19650]not found" error is reported?

    A: This error occurs during the commit phase. If the transaction ID recorded in the checkpoint has expired in the ApsaraDB for SelectDB instance, committing the transaction again causes this error. In this case, the job cannot be restored from that checkpoint. You can modify the streaming_label_keep_max_second parameter of the ApsaraDB for SelectDB instance to extend the validity period. The default validity period is 12 hours.

  • Q: What do I do if the "errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100" error is reported?

    A: This error is reported because the number of concurrent data import jobs of the same database exceeds 100. You can resolve this error by modifying the max_running_txn_num_per_db parameter of your ApsaraDB for SelectDB instance. For more information, see max_running_txn_num_per_db in FE Configuration.

    If you frequently modify the label for a job and restart the job, this error may occur. In two-phase commit scenarios in which the Duplicate key or Aggregate key model is used, the label of each job must be unique. When a Flink job is restarted from a checkpoint, it actively aborts the transactions that it previously started but did not complete. If you frequently modify the label and restart the job, a large number of precommitted transactions cannot be aborted and occupy the transaction quota. If the Unique key model is used, you can also disable the two-phase commit mode and implement idempotent writing by configuring Doris Sink.

  • Q: How do I ensure the sequence of a batch of data when Flink writes data to a table that uses the Unique key model?

    A: You can add a sequence column to ensure the sequence of a batch of data. For more information, see Sequence Column.

  • Q: Why does a Flink job fail to import data when no errors are reported for the Flink job?

    A: If Flink Doris Connector 1.1.0 or earlier is used, data is written in batch mode, and writes are triggered by the arrival of data. In this case, check whether the upstream data source generates data. If a version later than 1.1.0 is used, data writes depend on checkpoints, and you must enable checkpointing for data to be written.

  • Q: What do I do if the "tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235" error is reported?

    A: In most cases, this error is reported when Flink Doris Connector 1.1.0 or earlier is used. This error is caused by an excessively high write frequency. You can reduce the frequency of Stream Load by specifying the sink.buffer-flush.max-bytes and sink.buffer-flush.interval parameters.

  • Q: How do I skip dirty data when I import data by using Flink?

    A: When you use Flink to import data, if dirty data exists, such as data whose field format or length does not meet the requirements, Stream Load reports an error, and Flink keeps retrying the import. To skip dirty data, you can disable the strict mode of Stream Load by setting the strict_mode parameter to false and the max_filter_ratio parameter to 1, or filter the data before it reaches the sink operator.

  • Q: How do I map the source table to the destination ApsaraDB for SelectDB table?

    A: When you use Flink Doris Connector to import data, take note of the following two items: 1. The columns and types of the source table must match those in Flink SQL. 2. The columns and types in Flink SQL must match those of the destination ApsaraDB for SelectDB table.

  • Q: What do I do if the "TApplicationException: get_next failed: out of sequence response: expected 4 but got 3" error is reported?

    A: This error is caused by concurrency bugs in the Thrift framework. We recommend that you use the latest version of Flink Doris Connector and a compatible Flink version.

  • Q: What do I do if the "DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX" error is reported?

    A: You can search logs for abort transaction response in TaskManager and check whether the error is caused by the client or the server based on the returned HTTP status code.