全部產品
Search
文件中心

Realtime Compute for Apache Flink:StarRocks

更新時間:Nov 23, 2024

本文為您介紹如何使用StarRocks連接器。

背景資訊

StarRocks是新一代極速全情境MPP(Massively Parallel Processing)資料倉儲,致力於構建極速和統一分析體驗。StarRocks具有以下優勢:

  • StarRocks相容MySQL協議,可以使用MySQL用戶端和常用BI工具對接StarRocks來分析資料。

  • StarRocks採用分布式架構:

    • 對資料表進行水平劃分並以多副本儲存。

    • 叢集規模可以靈活伸縮,支援10 PB層級的資料分析。

    • 支援MPP架構,並行加速計算。

    • 支援多副本,具有彈性容錯能力。

Flink連接器內部的結果表是通過緩衝並批量由Stream Load匯入實現,源表是通過批量讀取資料實現。StarRocks連接器支援的資訊如下。

類別

詳情

支援類型

源表和結果表、資料攝入目標端

運行模式

流模式和批模式

資料格式

CSV

特有監控指標

暫無

API種類

Datastream、SQL和資料攝入YAML

是否支援更新或刪除結果表資料

前提條件

已建立StarRocks叢集,包括EMR的StarRocks或基於ECS的雲上自建StarRocks。

使用限制

  • 僅Realtime Compute引擎VVR 6.0.5及以上版本支援StarRocks連接器。

  • StarRocks連接器僅支援at-least-once和exactly-once語義。

SQL

特色功能

EMR的StarRocks支援CTAS&CDAS功能,CTAS可以實現單表的結構和資料同步,CDAS可以實現整庫同步或者同一庫中的多表結構和資料同步,詳情請參見基於Realtime ComputeFlink使用CTAS&CDAS功能同步MySQL資料至StarRocks

文法結構

CREATE TABLE USER_RESULT(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'xxx',
 'table-name' = 'xxx',
 'username' = 'xxx',
 'password' = 'xxx'
 );

WITH參數

類型

參數

說明

資料類型

是否必填

預設值

備忘

通用

connector

表類型。

String

固定值為starrocks。

jdbc-url

JDBC串連的URL。

String

指定FE(Front End)的IP和JDBC連接埠,格式為jdbc:mysql://ip:port

database-name

StarRocks資料庫名稱。

String

無。

table-name

StarRocks表名稱。

String

無。

username

StarRocks串連使用者名稱。

String

無。

password

StarRocks串連密碼。

String

無。

starrocks.create.table.properties

StarRocks表屬性。

String

設定資料表初始屬性,如引擎、副本數等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。

源表專屬

scan-url

資料掃描的url。

String

指定FE(Front End)的IP和HTTP連接埠,格式為fe_ip:http_port;fe_ip:http_port

說明

填寫多個IP和連接埠號碼時,請使用半形分號(;)進行分隔。

scan.connect.timeout-ms

flink-connector-starrocks串連StarRocks的時間上限。

超過該時間上限,將報錯。

String

1000

單位為毫秒。

scan.params.keep-alive-min

查詢任務的保活時間。

String

10

無。

scan.params.query-timeout-s

查詢任務的逾時時間。

如果超過該時間,仍未返回查詢結果,則停止查詢任務。

String

600

單位為秒。

scan.params.mem-limit-byte

BE節點中單個查詢的記憶體上限。

String

1073741824(1 GB)

單位為位元組。

scan.max-retries

查詢失敗時的最大重試次數。

超過該數量上限,則將報錯。

String

1

無。

結果表專屬

load-url

資料匯入的URL。

String

指定FE(Front End)的IP和HTTP連接埠,格式為fe_ip:http_port;fe_ip:http_port

說明

填寫多個IP和連接埠號碼時,請使用半形分號(;)進行分隔。

sink.semantic

資料寫入語義。

String

at-least-once

取值如下:

  • at-least-once(預設值):至少一次。

  • exactly-once:恰好一次。

sink.buffer-flush.max-bytes

Buffer可容納的最巨量資料量。

String

94371840(90 MB)

取值範圍為64 MB~10 GB。

sink.buffer-flush.max-rows

Buffer可容納的最巨量資料行數。

String

500000

取值範圍為64,000~5000,000。

sink.buffer-flush.interval-ms

Buffer重新整理時間間隔。

String

300000

取值範圍為1000毫秒~3600000毫秒。

sink.max-retries

最大重試次數。

String

3

取值範圍為0~10。

sink.connect.timeout-ms

串連到starrocks的逾時時間。

String

1000

取值範圍為100~60000。單位為毫秒。

sink.properties.*

結果表屬性。

String

Stream Load的參數控制Stream Load匯入行為。例如,參數 sink.properties.format表示Stream Load所匯入的資料格式,如CSV。更多參數和解釋,請參見Stream Load

類型映射

StarRocks欄位類型

Flink欄位類型

NULL

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

BIGINT UNSIGNED

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援。

DECIMAL(20,0)

LARGEINT

DECIMAL(20,0)

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

DECIMALV2

DECIMAL

DECIMAL32

DECIMAL

DECIMAL64

DECIMAL

DECIMAL128

DECIMAL

CHAR(n × 3)

說明
  • 僅Realtime Compute引擎VVR 8.0.10及以上版本,CHAR類型長度能夠自動擴充至三倍,以適配MySQL和StarRocks之間的編碼差異。

  • StarRocks的CHAR類型長度最長不可超過255,因此只有長度不超過85的Flink CHAR類型才會被映射到StarRocks CHAR類型。

CHAR(n)

(n <= 85 時)

VARCHAR(n × 3)

說明
  • 僅Realtime Compute引擎VVR 8.0.10及以上版本,VARCHAR類型長度能夠自動擴充至三倍,以適配MySQL和StarRocks之間的編碼差異。

  • StarRocks的CHAR類型長度最長不可超過255,因此長度大於85的Flink CHAR類型會被映射到StarRocks VARCHAR類型。

CHAR(n)

(n > 85 時)

VARCHAR

STRING

VARBINARY

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援。

VARBINARY

程式碼範例

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
  PRIMARY KEY(`runoob_id`)
  NOT ENFORCED
) WITH (
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'connector' = 'starrocks',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;

資料攝入

使用StarRocks Pipeline連接器,您可以輕鬆地將來自上遊資料來源的資料記錄和表結構變更寫入外部StarRocks資料庫。StarRocks連接器同時支援社區版與阿里雲E-MapReduce Serverless StarRocks全託管版本。

特色功能

  • 自動建庫建表

    如果來自上遊的資料庫及資料表不存在於下遊StarRocks執行個體中,則對應的資料庫及資料表會被自動建立。您可以通過table.create.properties.*參數設定自動建立表時的選項。

  • 表結構變更同步

    目前,StarRocks連接器支援自動將建表事件(CreateTableEvent)、增加列事件(AddColumnEvent)和刪除列(DropColumnEvent)事件自動應用到下遊資料庫中。

注意事項

  • 目前StarRocks連接器只支援At-least Once語義,並通過主鍵表來保證等冪寫入。

  • 目前,同步的表必須包含主鍵。不含主鍵的表必須通過transform 語句塊指定主鍵方可正常寫入下遊。例如:

    transform:
      - source-table: ...
        primary-keys: id, ...
  • 自動建立的表分桶鍵與主鍵相同,且不可有分區鍵。

  • 進行表結構變更同步時,新增列只能追加到已有列的尾部。在預設的表結構演化模式Lenient下,會自動將其他位置的插入轉換到尾部。

  • 如果您使用的StarRocks版本低於2.5.7,則必須顯式地通過table.create.num-buckets參數指定分桶數量。更高版本的StarRocks可以自動設定合適的分桶數

  • 如果您使用的是StarRocks 3.2或更高版本,建議開啟table.create.properties.fast_schema_evolution選項來加快表結構變更的速度。

文法結構

source:
  ...

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass

配置項

參數名稱

描述

類型

是否必填

預設值

備忘

type

連接器的名稱。

String

固定值為starrocks

name

Sink的顯示名稱。

String

無。

jdbc-url

JDBC串連的URL。

String

支援傳入多個地址,使用英文逗號 (,) 分隔。例如 jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2,fe_host3:fe_query_port3

load-url

串連到FE節點的HTTP服務URL。

String

支援傳入多個地址,使用英文分號 (;) 分隔。例如 fe_host1:fe_http_port1;fe_host2:fe_http_port2

username

串連到 StarRocks時使用的使用者名稱。

String

該使用者至少需要具備對目標表的SELECT和INSERT許可權。您可以使用StarRocks的GRANT命令賦予相應的許可權。

password

串連到 StarRocks時使用的密碼。

String

無。

sink.label-prefix

在進行Stream Load匯入時使用的標籤首碼。

String

無。

sink.connect.timeout-ms

建立HTTP串連時的逾時時間。

Integer

30000

單位為毫秒,取值需要介於100 ~ 60000。

sink.wait-for-continue.timeout-ms

從伺服器得到100 Continue請求前的逾時時間)。

Integer

30000

單位為毫秒。取值需要介於3000 ~ 600000。

sink.buffer-flush.max-bytes

在將資料寫入StarRocks前,最多可以在記憶體中緩衝多大量的資料。

Long

157286400

單位為位元組,取值需要介於64 MB ~ 10 GB。

說明
  • 該緩衝大小被所有表共用。當緩衝區已滿時,連接器將選擇若干張表進行Flush。

  • 將此參數設定為較大的值可以提高輸送量,但可能會增加匯入時的延遲。

sink.buffer-flush.interval-ms

每張表連續兩次Flush之間的間隔時間。

Long

300000

單位為毫秒。

sink.scan-frequency.ms

連續兩次檢查是否應該進行Flush之間的間隔時間。

Long

50

單位為毫秒。

sink.io.thread-count

在進行 Stream Load匯入時的線程數量。

Integer

2

無。

sink.at-least-once.use-transaction-stream-load

是否使用Stream Load事務介面進行匯入。

Boolean

true

僅在資料庫支援的情況下生效。

sink.properties.*

提供給Sink的額外參數。

String

可以在STREAM LOAD查看支援的參數。

table.create.num-buckets

自動建表時的Bucket數量。

Integer

table.create.properties.*

在自動建表時需要傳遞的額外參數。

String

例如,可以傳遞'table.create.properties.fast_schema_evolution' = 'true'來啟用快速表結構變更功能。參數詳情請參見StarRocks文檔

table.schema-change.timeout

執行表結構變更的逾時時間。

Duration

30 min

必須設定為整數秒。

說明

如果某個表結構變更操作耗時超過此限制,作業將運行失敗。

類型映射

說明

StarRocks並不支援所有的CDC YAML類型,嘗試將不支援的類型寫入下遊會導致作業失敗。您可以使用Transform CAST內建函數對不支援的資料進行轉換,或是使用Projection語句將其從結果表中移除。詳情請參考資料攝入開發參考

CDC類型

StarRocks類型

附註

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP

DATETIME

TIMESTAMP_LTZ

DATETIME

CHAR(n)

(n <= 85 時)

CHAR(n × 3)

CDC中的CHAR類型長度表示字元數,而StarRocks中的CHAR類型長度表示UTF-8編碼後的位元組數。通常情況下,一個中文字元經過UTF-8編碼後不會超過3位元組,因此映射到的StarRocks CHAR類型長度為原來的3倍。

說明

StarRocks的CHAR類型長度最長不可超過255,因此只有長度不超過85的CDC CHAR類型才會被映射到StarRocks CHAR類型。

CHAR(n)

(n > 85 時)

VARCHAR(n × 3)

CDC中的CHAR類型長度表示字元數,而StarRocks中的CHAR類型長度表示UTF-8編碼後的位元組數。通常情況下,一個中文字元經過UTF-8編碼後不會超過3位元組,因此映射到的 StarRocks VARCHAR類型長度為原來的3倍。

說明

StarRocks的CHAR類型長度最長不可超過255,因此長度大於85的CDC CHAR類型會被映射到StarRocks VARCHAR類型。

VARCHAR(n)

VARCHAR(n × 3)

CDC中的VARCHAR類型長度表示字元數,而StarRocks中的VARCHAR類型長度表示UTF-8編碼後的位元組數。通常情況下,一個中文字元經過UTF-8編碼後不會超過3位元組,因此映射到的StarRocks VARCHAR類型長度為原來的3倍。