全部產品
Search
文件中心

ApsaraDB for SelectDB:通過Flink匯入資料

更新時間:Nov 27, 2024

ApsaraDB for SelectDB完全相容Apache Doris,支援通過Flink Doris Connector將MySQL、Oracle、PostgreSQL、SQL Server、Kafka等資料來源中的歷史資料匯入至SelectDB。同時,在Flink開啟CDC任務後,資料來源的增量資料也會同步至SelectDB

功能簡介

說明

Flink Doris Connector目前僅支援向SelectDB寫資料,如果您有讀取SelectDB資料的需求,請使用Flink JDBC Connector

Flink Doris Connector是一個用於Apache Flink和Apache Doris之間的連接器,它允許使用者在Flink中讀取Doris資料和寫入資料到Doris,從而為即時資料處理和分析提供了支援。由於SelectDB完全相容Apache Doris,所以Flink Doris Connector也是SelectDB流式匯入資料的常用方式。

Flink的流處理能力,基於Source、Transform和Sink三個組件。各個組件功能如下:

  • Source(資料來源):

    • 作用:Source是Flink資料流的入口,用於從外部系統讀取資料流。這些外部系統可能包括訊息佇列(如Apache Kafka)、資料庫、檔案系統等。

    • 樣本:使用Kafka作為資料來源讀取即時訊息,或者從檔案中讀取資料。

  • Transform(資料轉換):

    • 作用:Transform階段負責對輸入的資料流進行處理和轉換。這些轉換操作可以是過濾、映射、彙總、視窗操作等。

    • 樣本:對輸入資料流進行映射操作,將輸入的資料結構轉換成另一種形式,或對資料進行彙總,計算每分鐘的某個指標等。

  • Sink(資料匯):

    • 作用:Sink是Flink資料流的出口,用於將處理後的資料輸出到外部系統。Sink可以將資料寫入資料庫、檔案、訊息佇列等。

    • 樣本:將處理後的結果寫入MySQL資料庫,或者將資料發送到另一個Kafka主題。

通過Flink Doris Connector將資料匯入至SelectDB,各資料的流向如下圖所示。

前提條件

  • 資料來源、Fink與SelectDB網路互連:

    1. ApsaraDB for SelectDB執行個體申請公網地址。具體操作,請參見申請和釋放公網地址

      如果您的Flink與資料來源均為阿里雲產品,或者均部署在阿里雲伺服器上,且阿里雲產品或阿里雲伺服器與ApsaraDB for SelectDB執行個體位於同一VPC下,跳過此步驟。

    2. 將Flink以及資料來源的相關IP添加至ApsaraDB for SelectDB的白名單。具體操作,請參見設定白名單

  • Flink已引入與Flink Doris Connector。

    Flink與Flink Doris Connector版本要求如下:

    Flink版本

    Flink Doris Connector版本

    下載地址

    阿里雲Realtime ComputeFlink版:大於或等於1.17

    開源版:大於或等於1.15

    1.5.2及以上版本

    Flink Doris Connector

    Flink如何引入Flink Doris Connector,請參見如何引入Flink Doris Connector

如何引入Flink Doris Connector

根據您的實際情況,引入Flink Doris Connector

  • 如果您的是通過阿里雲Realtime ComputeFlink版匯入資料至SelectDB,您可以通過自訂連接器來上傳、使用和更新Flink Doris Connector。如何使用自訂連接器,請參見管理自訂連接器

  • 如果您的Flink叢集是自建的開源版叢集,您需要下載對應Flink Doris Connector版本的JAR包,並放置在Flink安裝目錄的lib目錄下。JAR包下載地址,請參見JAR包

  • 如果您需要以Maven的方式引入Flink Doris Connector,需在專案的依賴設定檔中添加以下代碼。更多版本,請參見Maven倉庫

    <!-- flink-doris-connector -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  

使用樣本

樣本環境

本樣本為使用Flink SQL、Flink CDC和DataStream三種方式將MySQL資料庫test的employees表的資料,遷移至SelectDB的資料庫test的employees表中。在實際使用中,請根據您的情境修改對應參數。樣本環境如下:

  • Flink 1.16單機環境

  • Jav

  • 目標庫:test

  • 目標表:employees

  • 來源資料庫:test

  • 來源資料表:employees

環境準備

Flink環境準備

  1. Java環境準備。

    Flink的運行依賴Java環境,因此需安裝Java的開發套件JDK,並配置JAVA_HOME環境變數。

    目標Java版本與Flink版本有關,Flink支援的Java版本詳情,請參見Flink支援的Java列表。本樣本中,安裝的版本為Java 8。具體操作,請參見安裝JDK

  2. 下載Flink安裝包flink-1.16.3-bin-scala_2.12.tgz。如果此版本已到期,請可以下載其他版本。更多版本,請參見Apache Flink

    wget https://www.apache.si/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
  3. 解壓安裝包。

    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  4. 進入Flink安裝目錄的lib目錄,為後續操作引入相關Connector。

    • 引入Flink Doris Connector。

      wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
    • 引入Flink MySQL Connector。

      wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
  5. 啟動Flink叢集。

    在Flink安裝目錄的bin目錄下,執行以下指令。

    ./start-cluster.sh 

目標SelectDB庫表準備

  1. 建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體

  2. 連結執行個體,詳情請參見串連執行個體

  3. 建立測試資料庫test。

    CREATE DATABASE test;
  4. 建立測試表employees。

    USE test;
    
    -- 建表
    CREATE TABLE employees (
        emp_no       int NOT NULL,
        birth_date   date,
        first_name   varchar(20),
        last_name    varchar(20),
        gender       char(2),
        hire_date    date
    )
    UNIQUE KEY(`emp_no`)
    DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

源表MySQL庫表準備

  1. 建立MySQL執行個體,詳情請參見快捷建立RDS MySQL執行個體與設定資料庫

  2. 建立測試資料庫test。

    CREATE DATABASE test;
  3. 建立測試表employees。

    USE test;
    
    CREATE TABLE employees (
        emp_no INT NOT NULL PRIMARY KEY,
        birth_date DATE,
        first_name VARCHAR(20),
        last_name VARCHAR(20),
        gender CHAR(2),
        hire_date DATE
    );
  4. 插入資料。

    INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) VALUES
    (1001, '1985-05-15', 'John', 'Doe', 'M', '2010-06-20'),
    (1002, '1990-08-22', 'Jane', 'Smith', 'F', '2012-03-15'),
    (1003, '1987-11-02', 'Robert', 'Johnson', 'M', '2015-07-30'),
    (1004, '1992-01-18', 'Emily', 'Davis', 'F', '2018-01-05'),
    (1005, '1980-12-09', 'Michael', 'Brown', 'M', '2008-11-21');

通過Flink SQL方式匯入資料

  1. 啟動Flink SQL Client服務。

    在Flink安裝目錄的bin目錄下,執行以下指令。

    ./sql-client.sh
  2. 在Flink SQL Client上提交Flink任務,具體步驟如下。

    1. 建立MySQL源表。

      下述語句中,WITH後面的配置項為MySQL CDC Source的資訊,配置項詳情,請參見MySQL | Apache Flink CDC

      CREATE TABLE employees_source (
          emp_no INT,
          birth_date DATE,
          first_name STRING,
          last_name STRING,
          gender STRING,
          hire_date DATE,
          PRIMARY KEY (`emp_no`) NOT ENFORCED
      ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '127.0.0.1', 
          'port' = '3306',
          'username' = 'root',
          'password' = '****',
          'database-name' = 'test',
          'table-name' = 'employees'
      );
    2. 建立SelectDB結果表。

      下述語句中,WITH後面的配置項為SelectDB的資訊,配置項詳情,請參見Sink配置項

      CREATE TABLE employees_sink (
          emp_no       INT ,
          birth_date   DATE,
          first_name   STRING,
          last_name    STRING,
          gender       STRING,
          hire_date    DATE
      ) 
      WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'test.employees',
        'username' = 'admin',
        'password' = '****'
      );
    3. 同步MySql源表資料至SelectDB結果表。

      INSERT INTO employees_sink SELECT * FROM employees_source;
  3. 查看資料匯入結果。

    串連SelectDB,執行以下語句,查看資料匯入結果。

    SELECT * FROM test.employees;

通過Flink CDC方式

重要

阿里雲Realtime ComputeFlink版不支援JAR作業方式,後面通過CDC 3.0的YAML作業來支援。

以下介紹如何使用Flink CDC將資料庫的資料匯入至SelectDB

在Flink安裝目錄下,通過flink程式執行Flink CDC,文法如下:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

參數說明

參數

說明

execution.checkpointing.interval

Flink checkpoint的時間間隔,影響資料同步的頻率,推薦10s。

parallelism.default

設定Flink任務的並行度,適當增加並行度可提高資料同步速度。

job-name

Flink作業名稱。

database

同步到SelectDB的資料庫名。

table-prefix

SelectDB表首碼名,例如--table-prefix ods_

table-suffix

SelectDB表的尾碼名。

including-tables

需要同步的表,可以使用"|"分隔多個表,並支援Regex。例如--including-tables table1|tbl.*,指同步table1和所有以tbl.開頭的表。

excluding-tables

不需要同步的表,配置方法與including-tables相同。

mysql-conf

MySQL CDC Source配置。詳情請參見MySQL CDC Connector,其中hostnameusernamepassworddatabase-name是必選項。

oracle-conf

Oracle CDC Source配置。詳情請參見Oracle CDC Connector,其中hostnameusernamepassworddatabase-nameschema-name是必選項。

sink-conf

Doris Sink的所有配置,詳情請參見Sink配置項

table-conf

SelectDB表的配置項,即建立SelectDB表時properties中包含的內容。

說明
  1. 同步時需要在$FLINK_HOME/lib目錄下添加對應的Flink CDC依賴,例如flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar。

  2. Flink 1.15以上的版本支援整庫同步,Flink Doris Connector各個版本的下載請參見Flink Doris Connector

Sink配置項

參數

預設值

是否必填

說明

fenodes

ApsaraDB for SelectDB執行個體的訪問地址和HTTP協議連接埠。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

樣本:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

資料庫與表名。樣本:test_db.test_table

username

ApsaraDB for SelectDB執行個體的資料庫使用者名稱。

password

ApsaraDB for SelectDB執行個體對應資料庫使用者名稱的密碼。

jdbc-url

ApsaraDB for SelectDB執行個體的JDBC串連資訊。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠

樣本:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

auto-redirect

true

是否重新導向Stream Load請求。開啟後Stream Load將通過FE寫入,不再顯示擷取BE資訊。

doris.request.retries

3

SelectDB發送請求的重試次數。

doris.request.connect.timeout

30s

SelectDB發送請求的連線逾時時間。

doris.request.read.timeout

30s

SelectDB發送請求的讀取逾時時間。

sink.label-prefix

""

Stream load匯入使用的label首碼。2pc情境下要求全域唯一,用來保證Flink的EOS語義。

sink.properties

Stream Load的匯入參數,請填寫屬性配置。

  • CSV格式時請寫入:

    sink.properties.format='csv' 
    sink.properties.column_separator=','
    sink.properties.line_delimiter='\n' 
  • JSON格式時請寫入:

    sink.properties.format='json' 

更多參數,請參見:Stream Load

sink.buffer-size

1048576

寫資料緩衝buffer大小,單位位元組。不建議修改,預設配置即可,預設1 MB。

sink.buffer-count

3

寫資料緩衝buffer個數。不建議修改,預設配置即可。

sink.max-retries

3

提交(Commit)階段失敗後的最大重試次數,預設3次。

sink.use-cache

false

異常時,是否使用記憶體緩衝進行恢複,開啟後緩衝中會保留Checkpoint期間的資料。

sink.enable-delete

true

是否同步刪除事件。只支援Unique模型。

sink.enable-2pc

true

是否開啟兩階段交易認可(2pc),預設為true,保證EOS語義。

sink.enable.batch-mode

false

是否使用攢批模式寫入SelectDB,開啟後寫入時機不依賴Checkpoint,通過sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes和sink.buffer-flush.interval參數來控制寫入時機。

同時開啟後將不保證EOS語義,可藉助Unique模型做到等冪。

sink.flush.queue-size

2

批處理模式下,緩衝的隊列大小。

sink.buffer-flush.max-rows

50000

批處理模式下,單個批次最多寫入的資料行數。

sink.buffer-flush.max-bytes

10MB

批處理模式下,單個批次最多寫入的位元組數。

sink.buffer-flush.interval

10s

批處理模式下,非同步重新整理緩衝的間隔。最小1s。

sink.ignore.update-before

true

是否忽略update-before事件,預設忽略。

同步樣本

MySQL同步樣本

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Oracle同步樣本

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

PostgreSQL同步樣本

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

SQL Server同步樣本

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1\
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

通過DataStream方式

  1. 在Maven專案中,引入相關依賴。

    相關Maven依賴

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
        </dependencies>
  2. Java核心代碼。

    下述代碼中,MySql源表配置與SelectDB結果表配置的參數,與上述通過Flink SQL方式匯入資料中的配置一一對應,詳情請參見MySQL | Apache Flink CDCSink配置項

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
            
            // MySql源表配置
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs***")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            // SelectDB 結果表配置
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyunc****:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
            DorisOptions dorisOptions = dorisBuilder.build();
    
            // 配置Stream Load相關參數 sink.properties
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

使用進階

使用Flink SQL更新部分列資料

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial.columns' = 'true' -- 開啟部分列更新
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

使用Flink SQL根據指定列刪除資料

在資料來源為CDC的情境中,Doris Sink會根據RowKind來區分事件的類型,對隱藏列__DORIS_DELETE_SIGN__進行賦值以達到刪除的目的。在資料來源為Kafka訊息的情境中,Doris Sink無法直接使用RowKind來區分操作類型,需要依賴訊息中的特定欄位來標記操作類型,比如{"op_type":"delete",data:{...}},針對這類資料,希望將op_type=delete的資料刪除掉。此時需要根據商務邏輯判斷,顯式地傳入隱藏列的值。下面以Flink SQL方式為例,介紹如何根據Kafka資料中的特定欄位刪除SelectDB中的資料。

-- 比如資料: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- false表示不從RowKind擷取事件類型
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- 顯示指定stream load的匯入列
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

常見問題

  • Q:如何寫入Bitmap類型?

    A:樣本如下所示:

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'doris', 
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.bitmap_test', 
      'username' = 'admin', 
      'password' = '****', 
      'sink.label-prefix' = 'selectdb_label', 
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    );
  • Q:如何解決報錯:errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650]

    A:Exactly-Once情境下,Flink Job重啟時必須從最新的Checkpoint或Savepoint啟動,否則會報如上錯誤。不要求Exactly-Once時,也可通過關閉兩階段交易認可(2PC)sink.enable-2pc=false 或更換不同的sink.label-prefix解決。

  • Q:如何解決報錯:errCode = 2, detailMessage = transaction[19650]not found

    A:此報錯發生在提交(Commit)階段,Checkpoint裡面記錄的事務ID,在SelectDB側已經到期,此時再次提交(Commit)就會出現上述錯誤。 此時無法從Checkpoint啟動,後續可通過修改SelectDB的參數streaming_label_keep_max_second配置來延長到期時間,預設為12小時。

  • Q:如何解決報錯:errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

    A:此報錯是因為同一個庫並發匯入超過了100,可以通過調整SelectDB的參數max_running_txn_num_per_db來解決,詳情請參見max_running_txn_num_per_db

    同時,一個任務頻繁修改label重啟,也可能會導致這個錯誤。兩階段交易認可(2PC)情境下(Duplicate/Aggregate模型),每個任務的label需要唯一,並且從Checkpoint重啟時,Flink任務才會主動中止(abort)之前啟動的但未完成(即已precommit但未Commit的)事務(txn)。如果頻繁修改label重啟,會導致大量precommit成功的事務(txn)無法被中止(abort),佔用事務。在Unique模型下也可關閉兩階段交易認可(2PC),通過設計Sink來實現等冪寫入。

  • Q:Flink寫入Unique模型時,如何保證一批資料的有序性?

    A:可以添加sequence列配置來保證,更多詳情,請參見sequence

  • Q:為什麼Flink任務沒報錯,但是無法同步資料?

    A:Connector 1.1.0版本以前,是攢批寫入的,寫入均是由資料驅動,需要判斷上遊是否有資料寫入。1.1.0之後,依賴Checkpoint,必須開啟Checkpoint才能寫入。

  • Q:如何解決報錯:tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

    A:此報錯通常發生在Connector 1.1.0版本之前,是由於寫入頻率過快,導致版本過多。可以通過設定sink.buffer-flush.max-bytessink.buffer-flush.interval參數來降低Stream Load的頻率。

  • Q:Flink匯入時有髒資料,如何跳過?

    A:Flink在資料匯入時,如果有髒資料,比如欄位格式、長度等問題,會導致Stream Load報錯,此時Flink會不斷地重試。如果需要跳過,可以通過禁用Stream Load的strict 模式(strict_mode=false,max_filter_ratio=1)或者在Sink運算元之前對資料做過濾。

  • Q:源表和SelectDB表應如何對應?

    A:使用Flink Doris Connector匯入資料時,要注意兩個方面,一是源表的列和類型要跟Flink SQL中的列和類型對應;二是Flink SQL中的列和類型要跟SelectDB表的列和類型對應。

  • Q:如何解決報錯:TApplicationException: get_next failed: out of sequence response: expected 4 but got 3

    A:此報錯是由於Thrift架構存在並發bug導致的,建議您使用儘可能新的Connector以及與之相容的Flink版本。

  • Q:如何解決報錯:DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX

    A:你可以在TaskManager中搜尋日誌abort transaction response,根據HTTP返回碼確定是用戶端(Client)的問題還是伺服器(Server)的問題。