全部產品
Search
文件中心

ApsaraDB for SelectDB:通過Flink匯入資料

更新時間:Oct 26, 2024

ApsaraDB for SelectDB相容Apache Doris,支援通過Flink Doris Connector,將Kafka中的非結構化資料以及MySQL等上遊業務資料庫中的變更資料,即時同步到ApsaraDB for SelectDB中,有效地滿足海量資料的分析需求。

功能簡介

Flink Doris Connector是雲資料庫 SelectDB 版流式匯入資料的常用方式。基於Flink的流處理能力,您可以將上遊資料來源(例如:MySQL、Oracle、PostgreSQL、SQL Server、Kafka等)中的大量資料,通過Flink Doris Connector匯入到SelectDB表中。同時,您也可以使用Flink的JDBC方式來讀取SelectDB表中的資料。

重要

Flink Doris Connector目前僅支援向SelectDB寫資料,如果您有讀取SelectDB資料的需求,請使用Flink JDBC Connector。

flink1_bd9b152917_副本

工作原理

Flink Doris Connector在接收到資料後,通過HTTP的分塊傳輸編碼(Chunked Transfer Encoding)機制,持續向SelectDB寫入資料。進一步結合Flink的Checkpoint機制和Stream Load的兩階段交易認可,Flink Doris Connector 實現了精確一次語義(EOS,Exactly-Once Semantics),確保了端到端的資料一致性。具體原理如下:

  1. Flink任務在啟動的時候,會發起一個Stream Load的PreCommit請求,此時會先開啟一個事務,同時會通過HTTP的分塊傳輸編碼(Chunked)機制將資料持續發送到SelectDB,其原理如下圖:

    StreamLoad-1

  2. 在Checkpoint時,結束資料寫入,同時完成HTTP請求,並且將事務狀態設定為預提交(PreCommitted),此時資料已經寫入SelectDB,對使用者不可見,其原理如下圖:

    StreamLoad-2

  3. Checkpoint完成後,發起Commit Stream Load請求,並且將事務狀態設定為提交(Committed),完成後資料對使用者可見,其原理如下圖:

    StreamLoad-3

  4. Flink任務意外故障後,從Checkpoint重啟時,若前次交易為預提交(PreCommitted)狀態,則會發起復原請求,並且將事務狀態設定為Aborted。基於此,可以藉助Flink Doris Connector實現資料即時入庫時資料不丟不重。

前提條件

Flink版本必須大於或等於1.15,並且需與Connector版本相對應。版本對應規則如下:

Flink版本

Connector類型

Connector版本

下載地址

大於或等於1.15

Flink Doris Connector

1.5.2及以上版本

Flink Doris Connector

如何引入Flink Doris Connector

您可以通過以下方式,引入Flink Doris Connector

  • 如果您需要以Maven的方式引入Flink Doris Connector,需在專案的依賴設定檔中添加以下代碼。更多版本,請參見Maven倉庫。下載對應版本的JAR包,並放置在FLINK_HOME/lib目錄下。JAR包下載地址,請參見JAR包

    <!-- flink-doris-connector -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  
  • 如果您的情境是通過阿里雲Realtime ComputeFlink版匯入資料到SelectDB,您可以使用自訂連接器管理功能,上傳、使用和更新Flink Doris Connector。如何使用自訂連接器,請參見管理自訂連接器

使用樣本

如下以Flink SQL、Flink CDC和DataStream三種方式,示範如何將上遊的資料同步到ApsaraDB for SelectDB

環境準備

搭建Flink環境,本文以Flink 1.16單機環境為例。

  1. 下載flink-1.16.3-bin-scala_2.12.tgz,進行解壓,樣本如下。

    wget https://dlcdn.apache.org/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  2. 進入FLINK_HOME/lib目錄中下載flink-sql-connector-mysql-cdc-2.4.2flink-doris-connector-1.16-1.5.2,樣本如下。

    cd flink-1.16.3
    cd lib/
    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
    wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
  3. 啟動Flink Standalone叢集,樣本如下。

    bin/start-cluster.sh
  4. 建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體

  5. 通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體

  6. 建立測試資料庫和測試表。

    1. 建立測試資料庫。

      CREATE DATABASE test_db;
    2. 建立測試表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

通過Flink SQL方式

如下以MySQL為例,介紹如何使用Flink SQL將上遊的MySQL資料匯入至ApsaraDB for SelectDB

  1. 啟動Flink SQL Client服務,樣本如下。

    bin/sql-client.sh
  2. 在Flink SQL Client上提交Flink任務,樣本如下。

    SET 'execution.checkpointing.interval' = '10s';
    
    CREATE TABLE employees_source (
        emp_no INT,
        birth_date DATE,
        first_name STRING,
        last_name STRING,
        gender STRING,
        hire_date DATE,
        PRIMARY KEY (`emp_no`) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '127.0.0.1', 
        'port' = '3306',
        'username' = 'root',
        'password' = '****',
        'database-name' = 'test',
        'table-name' = 'employees'
    );
    
    CREATE TABLE employees_sink (
        emp_no       INT ,
        birth_date   DATE,
        first_name   STRING,
        last_name    STRING,
        gender       STRING,
        hire_date    DATE
    ) 
    WITH (
      'connector' = 'selectdb-preview',
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.employees',
      'username' = 'admin',
      'password' = '****',
      'sink.enable-delete' = 'true'
    );
    
    INSERT INTO employees_sink SELECT * FROM employees_source;

通過Flink CDC方式

重要

阿里雲Realtime ComputeFlink版不支援JAR作業方式,後面通過CDC 3.0的YAML作業來支援。

以下介紹如何使用Flink CDC將上遊資料庫的資料匯入至ApsaraDB for SelectDB

Flink CDC的文法如下:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

參數

說明

execution.checkpointing.interval

Flink checkpoint的時間間隔,影響資料同步的頻率,推薦10s。

parallelism.default

設定Flink任務的並行度,適當增加並行度可提高資料同步速度。

job-name

Flink作業名稱。

database

同步到SelectDB的資料庫名。

table-prefix

SelectDB表首碼名,例如 --table-prefix ods_

table-suffix

SelectDB表的尾碼名。

including-tables

需要同步的表,可以使用"|"分隔多個表,並支援Regex。 例如--including-tables table1|tbl.*,指同步table1和所有以tbl.開頭的表。

excluding-tables

不需要同步的表,配置方法與including-tables相同。

mysql-conf

MySQL CDC Source配置。詳情請參見MySQL CDC Connector,其中hostnameusernamepassworddatabase-name是必選項。

oracle-conf

Oracle CDC Source配置。詳情請參見Oracle CDC Connector,其中hostnameusernamepassworddatabase-nameschema-name是必選項。

sink-conf

Doris Sink的所有配置,詳情請參見Sink配置項

table-conf

SelectDB表的配置項,即建立SelectDB表時properties中包含的內容。

說明
  1. 同步時需要在$FLINK_HOME/lib目錄下添加對應的Flink CDC依賴,例如flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar。

  2. Flink 1.15以上的版本支援整庫同步,Flink Doris Connector各個版本的下載請參見Flink Doris Connector

Sink配置項

參數

預設值

是否必填

說明

fenodes

ApsaraDB for SelectDB執行個體的訪問地址和HTTP協議連接埠。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

樣本:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

資料庫與表名。樣本:test_db.test_table

username

ApsaraDB for SelectDB執行個體的資料庫使用者名稱。

password

請填寫ApsaraDB for SelectDB執行個體對應資料庫使用者名稱的密碼。

jdbc-url

ApsaraDB for SelectDB執行個體的JDBC串連資訊。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠

樣本:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

auto-redirect

true

是否重新導向Stream Load請求。開啟後Stream Load將通過FE寫入,不再顯示擷取BE資訊。

doris.request.retries

3

向SelectDB發送請求的重試次數。

doris.request.connect.timeout

30s

向SelectDB發送請求的連線逾時時間。

doris.request.read.timeout

30s

向SelectDB發送請求的讀取逾時時間。

sink.label-prefix

""

Stream load匯入使用的label首碼。2pc情境下要求全域唯一 ,用來保證Flink的EOS語義。

sink.properties

Stream Load 的匯入參數,請填寫屬性配置。

  • CSV格式時請寫入:

    sink.properties.format='csv' 
    sink.properties.column_separator=','
    sink.properties.line_delimiter='\n' 
  • JSON格式時請寫入:

    sink.properties.format='json' 

更多參數,請參見:Stream Load

sink.buffer-size

1048576

寫資料緩衝buffer大小,單位位元組。不建議修改,預設配置即可,預設1 MB。

sink.buffer-count

3

寫資料緩衝buffer個數。不建議修改,預設配置即可。

sink.max-retries

3

提交(Commit)階段失敗後的最大重試次數,預設3次。

sink.use-cache

false

異常時,是否使用記憶體緩衝進行恢複,開啟後緩衝中會保留Checkpoint期間的資料。

sink.enable-delete

true

是否同步刪除事件。只支援Unique模型。

sink.enable-2pc

true

是否開啟兩階段交易認可(2pc),預設為true,保證EOS語義。

sink.enable.batch-mode

false

是否使用攢批模式寫入SelectDB,開啟後寫入時機不依賴Checkpoint,通過sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes和sink.buffer-flush.interval參數來控制寫入時機。

同時開啟後將不保證EOS語義,可藉助Unique模型做到等冪。

sink.flush.queue-size

2

攢批模式下,緩衝的隊列大小。

sink.buffer-flush.max-rows

50000

攢批模式下,單個批次最多寫入的資料行數。

sink.buffer-flush.max-bytes

10MB

攢批模式下,單個批次最多寫入的位元組數。

sink.buffer-flush.interval

10s

攢批模式下,非同步重新整理緩衝的間隔。最小1s。

sink.ignore.update-before

true

是否忽略update-before事件,預設忽略。

MySQL同步樣本

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Oracle同步樣本

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

PostgreSQL同步樣本

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

SQL Server同步樣本

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1\
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

通過DataStream方式

如下以MySQL為例,介紹如何使用DataStream將上遊的MySQL資料匯入至ApsaraDB for SelectDB

  • maven依賴包如下,樣本如下。

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter</artifactId>
                <version>RELEASE</version>
                <scope>test</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>2.0.31</version>
            </dependency>
    
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>
    
        </dependencies>
  • Java核心代碼,樣本如下。

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
    
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs.com")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
    
            DorisOptions dorisOptions = dorisBuilder.build();
    
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
    
            DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

使用進階

使用Flink SQL更新部分列資料

樣本

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'selectdb-preview',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial.columns' = 'true' -- 開啟部分列更新
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

使用Flink SQL根據指定列刪除資料

在上遊資料來源為CDC的情境中,Doris Sink會根據RowKind來區分事件的類型,對隱藏列__DORIS_DELETE_SIGN__進行賦值以達到刪除的目的。在上遊資料來源為Kafka訊息的情境中,Doris Sink無法直接使用RowKind來區分操作類型,需要依賴訊息中的特定欄位來標記操作類型,比如{"op_type":"delete",data:{...}},針對這類資料,希望將op_type=delete的資料刪除掉。此時需要根據商務邏輯判斷,顯式地傳入隱藏列的值。下面以Flink SQL方式為例,介紹如何根據Kafka資料中的特定欄位刪除SelectDB中的資料。

樣本

-- 比如上遊資料: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'selectdb-preview',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- false表示不從RowKind擷取事件類型
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- 顯示指定stream load的匯入列
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

常見問題

  • Q:如何寫入Bitmap類型?

    A:樣本如下所示:

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'selectdb-preview', 
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.bitmap_test', 
      'username' = 'admin', 
      'password' = '****', 
      'sink.label-prefix' = 'selectdb_label', 
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    );
  • Q:如何解決報錯:errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650]。

    A:Exactly-Once情境下,Flink Job重啟時必須從最新的Checkpoint或Savepoint啟動,否則會報如上錯誤。不要求Exactly-Once時,也可通過關閉兩階段交易認可(2PC)sink.enable-2pc=false 或更換不同的sink.label-prefix解決。

  • Q:如何解決報錯:errCode = 2, detailMessage = transaction[19650]not found。

    A:此報錯發生在提交(Commit)階段,Checkpoint裡面記錄的事務ID,在SelectDB側已經到期,此時再次提交(Commit)就會出現上述錯誤。 此時無法從Checkpoint啟動,後續可通過修改SelectDB的參數streaming_label_keep_max_second配置來延長到期時間,預設為12小時。

  • Q:如何解決報錯:errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100。

    A:此報錯是因為同一個庫並發匯入超過了100,可以通過調整SelectDB的參數max_running_txn_num_per_db來解決,詳情請參見max_running_txn_num_per_db

    同時,一個任務頻繁修改label重啟,也可能會導致這個錯誤。兩階段交易認可(2PC)情境下(Duplicate/Aggregate模型),每個任務的label需要唯一,並且從Checkpoint重啟時,Flink任務才會主動中止(abort)之前啟動的但未完成(即已precommit但未Commit的)事務(txn)。如果頻繁修改label重啟,會導致大量precommit成功的事務(txn)無法被中止(abort),佔用事務。在Unique模型下也可關閉兩階段交易認可(2PC),通過設計Sink來實現等冪寫入。

  • Q:Flink寫入Unique模型時,如何保證一批資料的有序性?

    A:可以添加sequence列配置來保證,更多詳情,請參見sequence

  • Q:為什麼Flink任務沒報錯,但是無法同步資料?

    A:Connector 1.1.0版本以前,是攢批寫入的,寫入均是由資料驅動,需要判斷上遊是否有資料寫入。1.1.0之後,依賴Checkpoint,必須開啟Checkpoint才能寫入。

  • Q:如何解決報錯:tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235。

    A:此報錯通常發生在Connector 1.1.0版本之前,是由於寫入頻率過快,導致版本過多。可以通過設定sink.buffer-flush.max-bytessink.buffer-flush.interval參數來降低Stream Load的頻率。

  • Q:Flink匯入時有髒資料,如何跳過?

    A:Flink在資料匯入時,如果有髒資料,比如欄位格式、長度等問題,會導致Stream Load報錯,此時Flink會不斷地重試。如果需要跳過,可以通過禁用Stream Load的strict 模式(strict_mode=false,max_filter_ratio=1)或者在Sink運算元之前對資料做過濾。

  • Q:源表和SelectDB表應如何對應?

    A:使用Flink Doris Connector匯入資料時,要注意兩個方面,一是源表的列和類型要跟Flink SQL中的列和類型對應;二是Flink SQL中的列和類型要跟SelectDB表的列和類型對應。

  • Q:如何解決報錯:TApplicationException: get_next failed: out of sequence response: expected 4 but got 3。

    A:此報錯是由於Thrift架構存在並發bug導致的,建議您使用儘可能新的Connector以及與之相容的Flink版本。

  • Q:如何解決報錯:DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX。

    A:你可以在TaskManager中搜尋日誌abort transaction response,根據HTTP返回碼確定是用戶端(Client)的問題還是伺服器(Server)的問題。