本文介紹如何使用SelectDB連接器。
背景資訊
雲資料庫 SelectDB 版是新一代即時資料倉庫SelectDB在阿里雲上的全託管服務,100%相容Apache Doris。您可以在阿里雲上便捷地購買SelectDB數倉服務,滿足海量資料分析需求,具體的產品優勢和應用情境請參見什麼是雲資料庫SelectDB版。
自訂SelectDB連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 源表,結果表,維表和資料攝入目標端 |
運行模式 | 流模式和批模式 |
資料格式 | JSON和CSV |
特有監控指標 | 無 |
API種類 | DataStream和SQL |
是否支援更新/刪除 | 是 |
特色功能
支援整庫資料同步。
SelectDB連接器提供Exactly-Once語義,保證資料不重複也不丟失。
相容1.0及以上Apache Doris,可以使用Flink SelectDB自訂連接器同步資料至Apache Doris。
注意事項
SQL
使用方法
Realtime ComputeVVR 11.1及以上版本已內建SelectDB連接器,可跳過以下步驟。
文法結構
作為源表,需要開通叢集直連,啟用Arrow Flight功能。
從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中單擊開通叢集直連。
CREATE TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為
doris。fenodes
ApsaraDB for SelectDB執行個體的訪問和HTTP協議址地連接埠。
String
是
無
可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠。
樣本:
selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080。jdbc-url
jdbc 串連資訊,
String
否
無
可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠。
樣本:
jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030。table.identifier
資料庫表名。
String
是
無
樣本:
db.tbl。username
使用者名稱
String
是
無
如果遺忘密碼,可以從ApsaraDB for SelectDB控制台的執行個體詳情右上方進行重設。
password
密碼
String
是
無
doris.request.retries
發送請求的重試次數。
Integer
否
3
無。
doris.request.connect.timeout
發送請求的連線逾時時間。
Duration
否
30s
無。
doris.request.read.timeout
發送請求的讀取逾時時間。
Duration
否
30s
無。
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
doris.request.query.timeout
查詢逾時時間,預設值為 6 小時
Duration
否
21600s
固定值為
doris。doris.request.tablet.size
一個 Partition 對應的 Tablet 個數。
Integer
否
1
此數值設定越小,則會產生越多的 Partition。從而提升 Flink 側的並行度,但同時會對資料庫造成更大的壓力。
doris.batch.size
一次從 BE 讀取資料的最大行數。
Integer
否
4064
增大此數值可減少 Flink 與資料庫之間建立串連的次數。從而減輕網路延遲所帶來的額外時間開銷。
doris.exec.mem.limit
單個查詢的記憶體限制。
Integer
否
8192mb
預設為 8GB,單位為位元組。
source.use-flight-sql
是否使用 Arrow Flight SQL 讀取。
Boolean
否
false
無需配置。請直接從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中單擊開通叢集直連。
source.flight-sql-port
使用 Arrow Flight SQL 讀取時,FE 的 arrow_flight_sql_port。
Integer
否
-
無。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
sink.label-prefix
Stream Load 匯入使用的 label 首碼。
String
否
--
多作業情境下要求全域唯一,用來保證 Flink 的 EOS 語義。相同的Label只能匯入一次,確保不重複寫入。
sink.properties.*
Stream Load 的匯入參數。
String
否
--
CSV 格式配置
'sink.properties.column_separator' = ',', -- 使用逗號分隔 -- 如果資料中可能包含逗號,建議使用不可見字元,如: -- 'sink.properties.column_separator' = '\x01'JSON 格式配置
'sink.properties.format' = 'json', 'sink.properties.read_json_by_line' = 'true' -- 或使用 strip_outer_arraysink.enable-delete
是否啟用刪除。此選項需要 Doris 表開啟大量刪除功能。
Boolean
否
true
只支援 Unique 模型。
sink.enable-2pc
是否開啟兩階段交易認可 (2PC)。
Boolean
否
true
保證 Exactly-Once 語義。更多兩階段交易認可請參考明確交易操作。
sink.buffer-size
寫資料緩衝 buffer 大小。
Integer
否
1MB
單位位元組。不建議修改,預設配置即可。
sink.buffer-count
寫資料緩衝 buffer 個數。
Integer
否
3
不建議修改,預設配置即可
sink.max-retries
Commit 失敗後的最大重試次數。
Integer
否
3
無。
sink.enable.batch-mode
是否使用攢批模式寫入。
Boolean
否
false
開啟後寫入時機不依賴 Checkpoint,通過
sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval參數來控制寫入時機。同時開啟後將不保證 Exactly-once 語義,但可藉助 Uniq 模型做到等冪。
sink.flush.queue-size
攢批模式下,緩衝的隊列大小。
Integer
否
2
無。
sink.buffer-flush.max-rows
攢批模式下,單個批次最多寫入的資料行數。
Integer
否
500000
無。
sink.buffer-flush.max-bytes
攢批模式下,單個批次最多寫入的位元組數。
Integer
否
100MB
單位位元組。
sink.buffer-flush.interval
攢批模式下,非同步重新整理緩衝的間隔。
String
否
10s
單位毫秒。
sink.ignore.update-before
是否忽略 update-before 事件。
Boolean
否
true
無。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
lookup.cache.max-rows
lookup 緩衝的最大行數。
Integer
否
-1
-1預設為不開啟緩衝。lookup.cache.ttl
lookup 緩衝的最大時間。
String
否
10s
單位毫秒。
lookup.max-retries
lookup 查詢失敗後的重試次數
Integer
否
1
無。
lookup.jdbc.async
是否開啟非同步 lookup。
Boolean
否
false
無。
lookup.jdbc.read.batch.size
非同步 lookup 下,每次查詢的最大批次大小。
Integer
否
128
無。
lookup.jdbc.read.batch.queue-size
非同步 lookup 時,中間緩衝隊列的大小。
Integer
否
256
無。
lookup.jdbc.read.thread-size
每個 task 中 lookup 的 jdbc 線程數。
Integer
否
3
無。
使用樣本
源表
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);結果表
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****',
-- 'sink.label-prefix' = 'flink_orders' --相同的Label只能匯入一次,確保不重複寫入。
);維表
CREATE TEMPORARY TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create TEMPORARY table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'admin',
'password' = '****'
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city資料攝入
SelectDB連接器可以用於資料攝入YAML作業開發,作為目標端寫入。
文法結構
source:
type: xxx
sink:
type: doris
name: Doris Sink
fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
username: root
password: ""
配置項
參數 | 說明 | 是否必填 | 預設值 | 資料類型 | 備忘 |
type | 目標端類型。 | 是 | (none) | String | 固定值為 |
name | 目標端名稱。 | 否 | (none) | String | 無。 |
fenodes | ApsaraDB for SelectDB執行個體的訪問和HTTP協議址地連接埠。 | 是 | (none) | String | 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠。 樣本: |
jdbc-url | ApsaraDB for SelectDB執行個體的JDBC串連資訊。 | 否 | (none) | String | 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠。 樣本: |
username | ApsaraDB for SelectDB執行個體的資料庫使用者名稱。 | 是 | (none) | String | 如果遺忘密碼,可以從ApsaraDB for SelectDB控制台的執行個體詳情右上方進行重設。 |
password | ApsaraDB for SelectDB執行個體對應資料庫使用者名稱的密碼。 | 是 | (none) | String | |
sink.enable.batch-mode | 是否使用攢批模式寫入SelectDB。 | 否 | true | Boolean | 開啟後寫入時機不依賴 Checkpoint,通過 同時開啟後將不保證 Exactly-once 語義,但可藉助 Uniq 模型做到等冪。 |
sink.flush.queue-size | 批處理模式下,緩衝的隊列大小。 | 否 | 2 | Integer | Queue size for batch writing |
sink.buffer-flush.max-rows | 批處理模式下,單個批次最多寫入的資料行數。 | 否 | 500000 | Integer | 無。 |
sink.buffer-flush.max-bytes | 批處理模式下,單個批次最多寫入的位元組數。 | 否 | 100MB | Integer | 無。 |
sink.buffer-flush.interval | 批處理模式下,非同步重新整理緩衝的間隔。最小1s。 | 否 | 10s | String | 無。 |
sink.properties.* | Stream Load 的匯入參數。 | 否 | (none) | String | CSV 格式配置 JSON 格式配置 |
類型映射
Flink CDC Type | SelectDB Type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIMESTAMP [(p)] | DATETIME [(p)] |
TIMESTAMP_LTZ [(p)] | DATETIME [(p)] |
CHAR(n) | CHAR(n*3) 說明 在Doris中,字串以UTF-8編碼儲存,因此英文字元佔用1位元組,中文字元佔用3位元組。這裡的長度乘以3。CHAR的最大長度為255。一旦超過,它將自動轉換為VARCHAR類型。 |
VARCHAR(n) | VARCHAR(n*3) 說明 同上。這裡的長度乘以3。VARCHAR的最大長度為65533。一旦超過,它將自動轉換為STRING類型。 |
BINARY(n) | STRING |
VARBINARY(N) | STRING |
STRING | STRING |