全部產品
Search
文件中心

Realtime Compute for Apache Flink:SelectDB

更新時間:Jan 15, 2026

本文介紹如何使用SelectDB連接器。

背景資訊

雲資料庫 SelectDB 版是新一代即時資料倉庫SelectDB在阿里雲上的全託管服務,100%相容Apache Doris。您可以在阿里雲上便捷地購買SelectDB數倉服務,滿足海量資料分析需求,具體的產品優勢和應用情境請參見什麼是雲資料庫SelectDB版

自訂SelectDB連接器支援的資訊如下:

類別

詳情

支援類型

源表,結果表,維表和資料攝入目標端

運行模式

流模式和批模式

資料格式

JSON和CSV

特有監控指標

API種類

DataStream和SQL

是否支援更新/刪除

特色功能

  • 支援整庫資料同步。

  • SelectDB連接器提供Exactly-Once語義,保證資料不重複也不丟失。

  • 相容1.0及以上Apache Doris,可以使用Flink SelectDB自訂連接器同步資料至Apache Doris。

注意事項

  • 僅Realtime ComputeFlink版的引擎VVR 8.0.10及以上版本支援使用SelectDB自訂連接器。

  • SelectDB自訂連接器使用過程如有問題,請先提交工單給雲資料庫SelectDB版。

  • 同步資料至雲資料庫SelectDB版時,需要滿足以下條件:

    • 已建立ApsaraDB for SelectDB執行個體,如何購買執行個體請參見建立執行個體

    • 已配置IP白名單,配置白名單詳情請參見設定白名單

SQL

使用方法

說明

Realtime ComputeVVR 11.1及以上版本已內建SelectDB連接器,可跳過以下步驟。

  1. 單擊JAR包擷取SelectDB自訂連接器(需要為1.15~1.17)。

  2. Realtime Compute開發控制台上,上傳SelectDB自訂連接器,詳情請參見管理自訂連接器

  3. 在SQL作業中使用SelectDB自訂連接器,connector固定值為doris

文法結構

說明

作為源表,需要開通叢集直連,啟用Arrow Flight功能。

ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中單擊開通叢集直連。

CREATE TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為doris

    fenodes

    ApsaraDB for SelectDB執行個體的訪問和HTTP協議址地連接埠。

    String

    可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

    樣本:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

    jdbc-url

    jdbc 串連資訊,

    String

    可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠

    樣本:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

    table.identifier

    資料庫表名。

    String

    樣本:db.tbl

    username

    使用者名稱

    String

    如果遺忘密碼,可以從ApsaraDB for SelectDB控制台的執行個體詳情右上方進行重設。

    password

    密碼

    String

    doris.request.retries

    發送請求的重試次數。

    Integer

    3

    無。

    doris.request.connect.timeout

    發送請求的連線逾時時間。

    Duration

    30s

    無。

    doris.request.read.timeout

    發送請求的讀取逾時時間。

    Duration

    30s

    無。

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    doris.request.query.timeout

    查詢逾時時間,預設值為 6 小時

    Duration

    21600s

    固定值為doris

    doris.request.tablet.size

    一個 Partition 對應的 Tablet 個數。

    Integer

    1

    此數值設定越小,則會產生越多的 Partition。從而提升 Flink 側的並行度,但同時會對資料庫造成更大的壓力。

    doris.batch.size

    一次從 BE 讀取資料的最大行數。

    Integer

    4064

    增大此數值可減少 Flink 與資料庫之間建立串連的次數。從而減輕網路延遲所帶來的額外時間開銷。

    doris.exec.mem.limit

    單個查詢的記憶體限制。

    Integer

    8192mb

    預設為 8GB,單位為位元組。

    source.use-flight-sql

    是否使用 Arrow Flight SQL 讀取。

    Boolean

    false

    無需配置。請直接從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中單擊開通叢集直連。

    source.flight-sql-port

    使用 Arrow Flight SQL 讀取時,FE 的 arrow_flight_sql_port。

    Integer

    -

    無。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    sink.label-prefix

    Stream Load 匯入使用的 label 首碼。

    String

    --

    多作業情境下要求全域唯一,用來保證 Flink 的 EOS 語義。相同的Label只能匯入一次,確保不重複寫入。

    sink.properties.*

    Stream Load 的匯入參數。

    String

    --

    CSV 格式配置

    'sink.properties.column_separator' = ',', -- 使用逗號分隔
    -- 如果資料中可能包含逗號,建議使用不可見字元,如:
    -- 'sink.properties.column_separator' = '\x01'

    JSON 格式配置

    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true' -- 或使用 strip_outer_array

    sink.enable-delete

    是否啟用刪除。此選項需要 Doris 表開啟大量刪除功能。

    Boolean

    true

    只支援 Unique 模型。

    sink.enable-2pc

    是否開啟兩階段交易認可 (2PC)。

    Boolean

    true

    保證 Exactly-Once 語義。更多兩階段交易認可請參考明確交易操作

    sink.buffer-size

    寫資料緩衝 buffer 大小。

    Integer

    1MB

    單位位元組。不建議修改,預設配置即可。

    sink.buffer-count

    寫資料緩衝 buffer 個數。

    Integer

    3

    不建議修改,預設配置即可

    sink.max-retries

    Commit 失敗後的最大重試次數。

    Integer

    3

    無。

    sink.enable.batch-mode

    是否使用攢批模式寫入。

    Boolean

    false

    開啟後寫入時機不依賴 Checkpoint,通過sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval參數來控制寫入時機。

    同時開啟後將不保證 Exactly-once 語義,但可藉助 Uniq 模型做到等冪。

    sink.flush.queue-size

    攢批模式下,緩衝的隊列大小。

    Integer

    2

    無。

    sink.buffer-flush.max-rows

    攢批模式下,單個批次最多寫入的資料行數。

    Integer

    500000

    無。

    sink.buffer-flush.max-bytes

    攢批模式下,單個批次最多寫入的位元組數。

    Integer

    100MB

    單位位元組。

    sink.buffer-flush.interval

    攢批模式下,非同步重新整理緩衝的間隔。

    String

    10s

    單位毫秒。

    sink.ignore.update-before

    是否忽略 update-before 事件。

    Boolean

    true

    無。

  • 維表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    lookup.cache.max-rows

    lookup 緩衝的最大行數。

    Integer

    -1

    -1預設為不開啟緩衝。

    lookup.cache.ttl

    lookup 緩衝的最大時間。

    String

    10s

    單位毫秒。

    lookup.max-retries

    lookup 查詢失敗後的重試次數

    Integer

    1

    無。

    lookup.jdbc.async

    是否開啟非同步 lookup。

    Boolean

    false

    無。

    lookup.jdbc.read.batch.size

    非同步 lookup 下,每次查詢的最大批次大小。

    Integer

    128

    無。

    lookup.jdbc.read.batch.queue-size

    非同步 lookup 時,中間緩衝隊列的大小。

    Integer

    256

    無。

    lookup.jdbc.read.thread-size

    每個 task 中 lookup 的 jdbc 線程數。

    Integer

    3

    無。

使用樣本

源表

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****'
);

結果表

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****',
--  'sink.label-prefix' = 'flink_orders' --相同的Label只能匯入一次,確保不重複寫入。
);

維表

CREATE TEMPORARY TABLE fact_table (
  `id` BIGINT,
  `name` STRING,
  `city` STRING,
  `process_time` as proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

create TEMPORARY table dim_city(
  `city` STRING,
  `level` INT ,
  `province` STRING,
  `country` STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
  'table.identifier' = 'dim.dim_city',
  'username' = 'admin',
  'password' = '****'
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city

資料攝入

SelectDB連接器可以用於資料攝入YAML作業開發,作為目標端寫入。

文法結構

source:
   type: xxx

sink:
   type: doris
   name: Doris Sink
   fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
   username: root
   password: ""

配置項

參數

說明

是否必填

預設值

資料類型

備忘

type

目標端類型。

(none)

String

固定值為 doris

name

目標端名稱。

(none)

String

無。

fenodes

ApsaraDB for SelectDB執行個體的訪問和HTTP協議址地連接埠。

(none)

String

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

樣本:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

jdbc-url

ApsaraDB for SelectDB執行個體的JDBC串連資訊。

(none)

String

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠

樣本:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

username

ApsaraDB for SelectDB執行個體的資料庫使用者名稱。

(none)

String

如果遺忘密碼,可以從ApsaraDB for SelectDB控制台的執行個體詳情右上方進行重設。

password

ApsaraDB for SelectDB執行個體對應資料庫使用者名稱的密碼。

(none)

String

sink.enable.batch-mode

是否使用攢批模式寫入SelectDB

true

Boolean

開啟後寫入時機不依賴 Checkpoint,通過sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval參數來控制寫入時機。

同時開啟後將不保證 Exactly-once 語義,但可藉助 Uniq 模型做到等冪。

sink.flush.queue-size

批處理模式下,緩衝的隊列大小。

2

Integer

Queue size for batch writing

sink.buffer-flush.max-rows

批處理模式下,單個批次最多寫入的資料行數。

500000

Integer

無。

sink.buffer-flush.max-bytes

批處理模式下,單個批次最多寫入的位元組數。

100MB

Integer

無。

sink.buffer-flush.interval

批處理模式下,非同步重新整理緩衝的間隔。最小1s。

10s

String

無。

sink.properties.*

Stream Load 的匯入參數。

(none)

String

CSV 格式配置

sink.properties.column_separator: ',', -- 使用逗號分隔
-- 如果資料中可能包含逗號,建議使用不可見字元,如:
-- sink.properties.column_separator: '\x01'

JSON 格式配置

'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true' -- 或使用 strip_outer_array

類型映射

Flink CDC Type

SelectDB Type

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

DECIMAL

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP_LTZ [(p)]

DATETIME [(p)]

CHAR(n)

CHAR(n*3)

說明

在Doris中,字串以UTF-8編碼儲存,因此英文字元佔用1位元組,中文字元佔用3位元組。這裡的長度乘以3。CHAR的最大長度為255。一旦超過,它將自動轉換為VARCHAR類型。

VARCHAR(n)

VARCHAR(n*3)

說明

同上。這裡的長度乘以3。VARCHAR的最大長度為65533。一旦超過,它將自動轉換為STRING類型。

BINARY(n)

STRING

VARBINARY(N)

STRING

STRING

STRING