ApsaraDB for SelectDB相容Apache Doris,支援通過Flink Doris Connector,將Kafka中的非結構化資料以及MySQL等上遊業務資料庫中的變更資料,即時同步到ApsaraDB for SelectDB中,有效地滿足海量資料的分析需求。
功能簡介
Flink Doris Connector是雲資料庫 SelectDB 版流式匯入資料的常用方式。基於Flink的流處理能力,您可以將上遊資料來源(例如:MySQL、Oracle、PostgreSQL、SQL Server、Kafka等)中的大量資料,通過Flink Doris Connector匯入到SelectDB表中。同時,您也可以使用Flink的JDBC方式來讀取SelectDB表中的資料。
Flink Doris Connector目前僅支援向SelectDB寫資料,如果您有讀取SelectDB資料的需求,請使用Flink JDBC Connector。
工作原理
Flink Doris Connector在接收到資料後,通過HTTP的分塊傳輸編碼(Chunked Transfer Encoding)機制,持續向SelectDB寫入資料。進一步結合Flink的Checkpoint機制和Stream Load的兩階段交易認可,Flink Doris Connector 實現了精確一次語義(EOS,Exactly-Once Semantics),確保了端到端的資料一致性。具體原理如下:
Flink任務在啟動的時候,會發起一個Stream Load的PreCommit請求,此時會先開啟一個事務,同時會通過HTTP的分塊傳輸編碼(Chunked)機制將資料持續發送到SelectDB,其原理如下圖:
在Checkpoint時,結束資料寫入,同時完成HTTP請求,並且將事務狀態設定為預提交(PreCommitted),此時資料已經寫入SelectDB,對使用者不可見,其原理如下圖:
Checkpoint完成後,發起Commit Stream Load請求,並且將事務狀態設定為提交(Committed),完成後資料對使用者可見,其原理如下圖:
Flink任務意外故障後,從Checkpoint重啟時,若前次交易為預提交(PreCommitted)狀態,則會發起復原請求,並且將事務狀態設定為Aborted。基於此,可以藉助Flink Doris Connector實現資料即時入庫時資料不丟不重。
前提條件
Flink版本必須大於或等於1.15,並且需與Connector版本相對應。版本對應規則如下:
Flink版本 | Connector類型 | Connector版本 | 下載地址 |
大於或等於1.15 |
| 1.5.2及以上版本 |
如何引入Flink Doris Connector
您可以通過以下方式,引入Flink Doris Connector
。
如果您需要以Maven的方式引入
Flink Doris Connector
,需在專案的依賴設定檔中添加以下代碼。更多版本,請參見Maven倉庫。下載對應版本的JAR包,並放置在FLINK_HOME/lib
目錄下。JAR包下載地址,請參見JAR包。<!-- flink-doris-connector --> <dependency> <groupId>org.apache.doris</groupId> <artifactId>flink-doris-connector-1.16</artifactId> <version>1.5.2</version> </dependency>
如果您的情境是通過阿里雲Realtime ComputeFlink版匯入資料到SelectDB,您可以使用自訂連接器管理功能,上傳、使用和更新
Flink Doris Connector
。如何使用自訂連接器,請參見管理自訂連接器。
使用樣本
如下以Flink SQL、Flink CDC和DataStream三種方式,示範如何將上遊的資料同步到ApsaraDB for SelectDB。
環境準備
搭建Flink環境,本文以Flink 1.16單機環境為例。
下載flink-1.16.3-bin-scala_2.12.tgz,進行解壓,樣本如下。
wget https://dlcdn.apache.org/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
進入FLINK_HOME/lib目錄中下載flink-sql-connector-mysql-cdc-2.4.2和flink-doris-connector-1.16-1.5.2,樣本如下。
cd flink-1.16.3 cd lib/ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
啟動Flink Standalone叢集,樣本如下。
bin/start-cluster.sh
建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體。
通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體。
建立測試資料庫和測試表。
建立測試資料庫。
CREATE DATABASE test_db;
建立測試表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
通過Flink SQL方式
如下以MySQL為例,介紹如何使用Flink SQL將上遊的MySQL資料匯入至ApsaraDB for SelectDB。
啟動Flink SQL Client服務,樣本如下。
bin/sql-client.sh
在Flink SQL Client上提交Flink任務,樣本如下。
SET 'execution.checkpointing.interval' = '10s'; CREATE TABLE employees_source ( emp_no INT, birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE, PRIMARY KEY (`emp_no`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3306', 'username' = 'root', 'password' = '****', 'database-name' = 'test', 'table-name' = 'employees' ); CREATE TABLE employees_sink ( emp_no INT , birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE ) WITH ( 'connector' = 'selectdb-preview', 'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.employees', 'username' = 'admin', 'password' = '****', 'sink.enable-delete' = 'true' ); INSERT INTO employees_sink SELECT * FROM employees_source;
通過Flink CDC方式
阿里雲Realtime ComputeFlink版不支援JAR作業方式,後面通過CDC 3.0的YAML作業來支援。
以下介紹如何使用Flink CDC將上遊資料庫的資料匯入至ApsaraDB for SelectDB。
Flink CDC的文法如下:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
--database <selectdb-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <selectdb-table-prefix>] \
[--table-suffix <selectdb-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]
參數 | 說明 |
execution.checkpointing.interval | Flink checkpoint的時間間隔,影響資料同步的頻率,推薦10s。 |
parallelism.default | 設定Flink任務的並行度,適當增加並行度可提高資料同步速度。 |
job-name | Flink作業名稱。 |
database | 同步到SelectDB的資料庫名。 |
table-prefix | SelectDB表首碼名,例如 |
table-suffix | SelectDB表的尾碼名。 |
including-tables | 需要同步的表,可以使用"|"分隔多個表,並支援Regex。 例如 |
excluding-tables | 不需要同步的表,配置方法與including-tables相同。 |
mysql-conf | MySQL CDC Source配置。詳情請參見MySQL CDC Connector,其中 |
oracle-conf | Oracle CDC Source配置。詳情請參見Oracle CDC Connector,其中 |
sink-conf | Doris Sink的所有配置,詳情請參見Sink配置項。 |
table-conf | SelectDB表的配置項,即建立SelectDB表時properties中包含的內容。 |
同步時需要在$FLINK_HOME/lib目錄下添加對應的Flink CDC依賴,例如flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar。
Flink 1.15以上的版本支援整庫同步,Flink Doris Connector各個版本的下載請參見Flink Doris Connector。
Sink配置項
參數 | 預設值 | 是否必填 | 說明 |
fenodes | 無 | 是 | ApsaraDB for SelectDB執行個體的訪問地址和HTTP協議連接埠。 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠。 樣本: |
table.identifier | 無 | 是 | 資料庫與表名。樣本: |
username | 無 | 是 | ApsaraDB for SelectDB執行個體的資料庫使用者名稱。 |
password | 無 | 是 | 請填寫ApsaraDB for SelectDB執行個體對應資料庫使用者名稱的密碼。 |
jdbc-url | 無 | 否 | ApsaraDB for SelectDB執行個體的JDBC串連資訊。 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠。 樣本: |
auto-redirect | true | 否 | 是否重新導向Stream Load請求。開啟後Stream Load將通過FE寫入,不再顯示擷取BE資訊。 |
doris.request.retries | 3 | 否 | 向SelectDB發送請求的重試次數。 |
doris.request.connect.timeout | 30s | 否 | 向SelectDB發送請求的連線逾時時間。 |
doris.request.read.timeout | 30s | 否 | 向SelectDB發送請求的讀取逾時時間。 |
sink.label-prefix | "" | 是 | Stream load匯入使用的label首碼。2pc情境下要求全域唯一 ,用來保證Flink的EOS語義。 |
sink.properties | 無 | 否 | Stream Load 的匯入參數,請填寫屬性配置。
更多參數,請參見:Stream Load。 |
sink.buffer-size | 1048576 | 否 | 寫資料緩衝buffer大小,單位位元組。不建議修改,預設配置即可,預設1 MB。 |
sink.buffer-count | 3 | 否 | 寫資料緩衝buffer個數。不建議修改,預設配置即可。 |
sink.max-retries | 3 | 否 | 提交(Commit)階段失敗後的最大重試次數,預設3次。 |
sink.use-cache | false | 否 | 異常時,是否使用記憶體緩衝進行恢複,開啟後緩衝中會保留Checkpoint期間的資料。 |
sink.enable-delete | true | 否 | 是否同步刪除事件。只支援Unique模型。 |
sink.enable-2pc | true | 否 | 是否開啟兩階段交易認可(2pc),預設為true,保證EOS語義。 |
sink.enable.batch-mode | false | 否 | 是否使用攢批模式寫入SelectDB,開啟後寫入時機不依賴Checkpoint,通過sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes和sink.buffer-flush.interval參數來控制寫入時機。 同時開啟後將不保證EOS語義,可藉助Unique模型做到等冪。 |
sink.flush.queue-size | 2 | 否 | 攢批模式下,緩衝的隊列大小。 |
sink.buffer-flush.max-rows | 50000 | 否 | 攢批模式下,單個批次最多寫入的資料行數。 |
sink.buffer-flush.max-bytes | 10MB | 否 | 攢批模式下,單個批次最多寫入的位元組數。 |
sink.buffer-flush.interval | 10s | 否 | 攢批模式下,非同步重新整理緩衝的間隔。最小1s。 |
sink.ignore.update-before | true | 否 | 是否忽略update-before事件,預設忽略。 |
MySQL同步樣本
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****
Oracle同步樣本
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
oracle-sync-database \
--database test_db \
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="password" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****
PostgreSQL同步樣本
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****
SQL Server同步樣本
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
sqlserver-sync-database \
--database db1\
--sqlserver-conf hostname=127.0.0.1 \
--sqlserver-conf port=1433 \
--sqlserver-conf username=sa \
--sqlserver-conf password="123456" \
--sqlserver-conf database-name=CDC_DB \
--sqlserver-conf schema-name=dbo \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****
通過DataStream方式
如下以MySQL為例,介紹如何使用DataStream將上遊的MySQL資料匯入至ApsaraDB for SelectDB。
maven依賴包如下,樣本如下。
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scala.version>2.12</scala.version> <java.version>1.8</java.version> <flink.version>1.16.3</flink.version> <fastjson.version>1.2.62</fastjson.version> <scope.mode>compile</scope.mode> </properties> <dependencies> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>RELEASE</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.1-jre</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.14.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.31</version> </dependency> <dependency> <groupId>org.apache.doris</groupId> <artifactId>flink-doris-connector-1.16</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.doris</groupId> <artifactId>flink-doris-connector-1.16</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>2.4.2</version> <exclusions> <exclusion> <artifactId>flink-shaded-guava</artifactId> <groupId>org.apache.flink</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> </dependencies>
Java核心代碼,樣本如下。
package org.example; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer; import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class Main { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(10000); Map<String, Object> customConverterConfigs = new HashMap<>(); customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs); MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("rm-xxx.mysql.rds.aliyuncs.com") .port(3306) .startupOptions(StartupOptions.initial()) .databaseList("db_test") .tableList("db_test.employees") .username("root") .password("test_123") .debeziumProperties(DateToStringConverter.DEFAULT_PROPS) .deserializer(schema) .serverTimeZone("Asia/Shanghai") .build(); DorisSink.Builder<String> sinkBuilder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080") .setTableIdentifier("db_test.employees") .setUsername("admin") .setPassword("test_123"); DorisOptions dorisOptions = dorisBuilder.build(); Properties properties = new Properties(); properties.setProperty("format", "json"); properties.setProperty("read_json_by_line", "true"); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder.setStreamLoadProp(properties); sinkBuilder.setDorisExecutionOptions(executionBuilder.build()) .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string .setDorisOptions(dorisOptions); DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source"); dataStreamSource.sinkTo(sinkBuilder.build()); env.execute("MySQL to SelectDB"); } }
使用進階
使用Flink SQL更新部分列資料
樣本
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (
id INT
,name STRING
,bank STRING
,age INT
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);
CREATE TABLE selectdb_sink (
id INT,
name STRING,
bank STRING,
age INT
)
WITH (
'connector' = 'selectdb-preview',
'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'database.table',
'username' = 'admin',
'password' = '****',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.columns' = 'id,name,bank,age',
'sink.properties.partial.columns' = 'true' -- 開啟部分列更新
);
INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;
使用Flink SQL根據指定列刪除資料
在上遊資料來源為CDC的情境中,Doris Sink會根據RowKind來區分事件的類型,對隱藏列__DORIS_DELETE_SIGN__
進行賦值以達到刪除的目的。在上遊資料來源為Kafka訊息的情境中,Doris Sink無法直接使用RowKind來區分操作類型,需要依賴訊息中的特定欄位來標記操作類型,比如{"op_type":"delete",data:{...}}
,針對這類資料,希望將op_type=delete的資料刪除掉。此時需要根據商務邏輯判斷,顯式地傳入隱藏列的值。下面以Flink SQL方式為例,介紹如何根據Kafka資料中的特定欄位刪除SelectDB中的資料。
樣本
-- 比如上遊資料: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
data STRING,
op_type STRING
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE SELECTDB_SINK(
id INT,
name STRING,
__DORIS_DELETE_SIGN__ INT
) WITH (
'connector' = 'selectdb-preview',
'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'db.table',
'username' = 'admin',
'password' = '****',
'sink.enable-delete' = 'false', -- false表示不從RowKind擷取事件類型
'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- 顯示指定stream load的匯入列
);
INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
FROM KAFKA_SOURCE;
常見問題
Q:如何寫入Bitmap類型?
A:樣本如下所示:
CREATE TABLE bitmap_sink ( dt INT, page STRING, user_id INT ) WITH ( 'connector' = 'selectdb-preview', 'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.bitmap_test', 'username' = 'admin', 'password' = '****', 'sink.label-prefix' = 'selectdb_label', 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)' );
Q:如何解決報錯:errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650]。
A:Exactly-Once情境下,Flink Job重啟時必須從最新的Checkpoint或Savepoint啟動,否則會報如上錯誤。不要求Exactly-Once時,也可通過關閉兩階段交易認可(2PC)
sink.enable-2pc=false
或更換不同的sink.label-prefix解決。Q:如何解決報錯:errCode = 2, detailMessage = transaction[19650]not found。
A:此報錯發生在提交(Commit)階段,Checkpoint裡面記錄的事務ID,在SelectDB側已經到期,此時再次提交(Commit)就會出現上述錯誤。 此時無法從Checkpoint啟動,後續可通過修改SelectDB的參數
streaming_label_keep_max_second
配置來延長到期時間,預設為12小時。Q:如何解決報錯:errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100。
A:此報錯是因為同一個庫並發匯入超過了100,可以通過調整SelectDB的參數
max_running_txn_num_per_db
來解決,詳情請參見max_running_txn_num_per_db。同時,一個任務頻繁修改label重啟,也可能會導致這個錯誤。兩階段交易認可(2PC)情境下(Duplicate/Aggregate模型),每個任務的label需要唯一,並且從Checkpoint重啟時,Flink任務才會主動中止(abort)之前啟動的但未完成(即已precommit但未Commit的)事務(txn)。如果頻繁修改label重啟,會導致大量precommit成功的事務(txn)無法被中止(abort),佔用事務。在Unique模型下也可關閉兩階段交易認可(2PC),通過設計Sink來實現等冪寫入。
Q:Flink寫入Unique模型時,如何保證一批資料的有序性?
A:可以添加sequence列配置來保證,更多詳情,請參見sequence。
Q:為什麼Flink任務沒報錯,但是無法同步資料?
A:Connector 1.1.0版本以前,是攢批寫入的,寫入均是由資料驅動,需要判斷上遊是否有資料寫入。1.1.0之後,依賴Checkpoint,必須開啟Checkpoint才能寫入。
Q:如何解決報錯:tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235。
A:此報錯通常發生在Connector 1.1.0版本之前,是由於寫入頻率過快,導致版本過多。可以通過設定
sink.buffer-flush.max-bytes
和sink.buffer-flush.interval
參數來降低Stream Load的頻率。Q:Flink匯入時有髒資料,如何跳過?
A:Flink在資料匯入時,如果有髒資料,比如欄位格式、長度等問題,會導致Stream Load報錯,此時Flink會不斷地重試。如果需要跳過,可以通過禁用Stream Load的strict 模式(strict_mode=false,max_filter_ratio=1)或者在Sink運算元之前對資料做過濾。
Q:源表和SelectDB表應如何對應?
A:使用Flink Doris Connector匯入資料時,要注意兩個方面,一是源表的列和類型要跟Flink SQL中的列和類型對應;二是Flink SQL中的列和類型要跟SelectDB表的列和類型對應。
Q:如何解決報錯:TApplicationException: get_next failed: out of sequence response: expected 4 but got 3。
A:此報錯是由於Thrift架構存在並發bug導致的,建議您使用儘可能新的Connector以及與之相容的Flink版本。
Q:如何解決報錯:DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX。
A:你可以在TaskManager中搜尋日誌
abort transaction response
,根據HTTP返回碼確定是用戶端(Client)的問題還是伺服器(Server)的問題。