全部產品
Search
文件中心

Realtime Compute for Apache Flink:StarRocks

更新時間:Sep 13, 2024

本文為您介紹如何使用StarRocks連接器。

背景資訊

StarRocks是新一代極速全情境MPP(Massively Parallel Processing)資料倉儲,致力於構建極速和統一分析體驗。StarRocks具有以下優勢:

  • StarRocks相容MySQL協議,可以使用MySQL用戶端和常用BI工具對接StarRocks來分析資料。

  • StarRocks採用分布式架構:

    • 對資料表進行水平劃分並以多副本儲存。

    • 叢集規模可以靈活伸縮,支援10 PB層級的資料分析。

    • 支援MPP架構,並行加速計算。

    • 支援多副本,具有彈性容錯能力。

Flink連接器內部的結果表是通過緩衝並批量由Stream Load匯入實現,源表是通過批量讀取資料實現。StarRocks連接器支援的資訊如下。

類別

詳情

支援類型

源表和結果表

運行模式

流模式和批模式

資料格式

CSV

特有監控指標

暫無

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

特色功能

EMR的StarRocks支援CTAS&CDAS功能,CTAS可以實現單表的結構和資料同步,CDAS可以實現整庫同步或者同一庫中的多表結構和資料同步,詳情請參見基於Realtime ComputeFlink使用CTAS&CDAS功能同步MySQL資料至StarRocks

前提條件

已建立StarRocks叢集,包括EMR的StarRocks或基於ECS的雲上自建StarRocks。

使用限制

  • 僅Realtime Compute引擎VVR 6.0.5及以上版本支援StarRocks連接器。

  • StarRocks連接器僅支援at-least-once和exactly-once語義。

文法結構

CREATE TABLE USER_RESULT(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'xxx',
 'table-name' = 'xxx',
 'username' = 'xxx',
 'password' = 'xxx'
 );

WITH參數

類型

參數

說明

資料類型

是否必填

預設值

備忘

通用

connector

表類型。

String

固定值為starrocks。

jdbc-url

JDBC串連的URL。

String

指定FE(Front End)的IP和JDBC連接埠,格式為jdbc:mysql://ip:port

database-name

StarRocks資料庫名稱。

String

無。

table-name

StarRocks表名稱。

String

無。

username

StarRocks串連使用者名稱。

String

無。

password

StarRocks串連密碼。

String

無。

starrocks.create.table.properties

StarRocks表屬性。

String

設定資料表初始屬性,如引擎、副本數等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。

源表專屬

scan-url

資料掃描的url。

String

指定FE(Front End)的IP和HTTP連接埠,格式為fe_ip:http_port;fe_ip:http_port

說明

填寫多個IP和連接埠號碼時,請使用半形分號(;)進行分隔。

scan.connect.timeout-ms

flink-connector-starrocks串連StarRocks的時間上限。

超過該時間上限,將報錯。

String

1000

單位為毫秒。

scan.params.keep-alive-min

查詢任務的保活時間。

String

10

無。

scan.params.query-timeout-s

查詢任務的逾時時間。

如果超過該時間,仍未返回查詢結果,則停止查詢任務。

String

600

單位為秒。

scan.params.mem-limit-byte

BE節點中單個查詢的記憶體上限。

String

1073741824(1 GB)

單位為位元組。

scan.max-retries

查詢失敗時的最大重試次數。

超過該數量上限,則將報錯。

String

1

無。

結果表專屬

load-url

資料匯入的URL。

String

指定FE(Front End)的IP和HTTP連接埠,格式為fe_ip:http_port;fe_ip:http_port

說明

填寫多個IP和連接埠號碼時,請使用半形分號(;)進行分隔。

sink.semantic

資料寫入語義。

String

at-least-once

取值如下:

  • at-least-once(預設值):至少一次。

  • exactly-once:恰好一次。

sink.buffer-flush.max-bytes

Buffer可容納的最巨量資料量。

String

94371840(90 MB)

取值範圍為64 MB~10 GB。

sink.buffer-flush.max-rows

Buffer可容納的最巨量資料行數。

String

500000

取值範圍為64,000~5000,000。

sink.buffer-flush.interval-ms

Buffer重新整理時間間隔。

String

300000

取值範圍為1000毫秒~3600000毫秒。

sink.max-retries

最大重試次數。

String

3

取值範圍為0~10。

sink.connect.timeout-ms

串連到starrocks的逾時時間。

String

1000

取值範圍為100~60000。單位為毫秒。

sink.properties.*

結果表屬性。

String

Stream Load的參數控制Stream Load匯入行為。例如,參數 sink.properties.format表示Stream Load所匯入的資料格式,如CSV。更多參數和解釋,請參見Stream Load

類型映射

StarRocks欄位類型

Flink欄位類型

NULL

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

LARGEINT

STRING

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

DECIMALV2

DECIMAL

DECIMAL32

DECIMAL

DECIMAL64

DECIMAL

DECIMAL128

DECIMAL

CHAR

CHAR

VARCHAR

STRING

程式碼範例

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
  PRIMARY KEY(`runoob_id`)
  NOT ENFORCED
) WITH (
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'connector' = 'starrocks',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;