本文為您介紹如何使用StarRocks連接器。
背景資訊
StarRocks是新一代極速全情境MPP(Massively Parallel Processing)資料倉儲,致力於構建極速和統一分析體驗。StarRocks具有以下優勢:
StarRocks相容MySQL協議,可以使用MySQL用戶端和常用BI工具對接StarRocks來分析資料。
StarRocks採用分布式架構:
對資料表進行水平劃分並以多副本儲存。
叢集規模可以靈活伸縮,支援10 PB層級的資料分析。
支援MPP架構,並行加速計算。
支援多副本,具有彈性容錯能力。
Flink連接器內部的結果表是通過緩衝並批量由Stream Load匯入實現,源表是通過批量讀取資料實現。StarRocks連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | CSV |
特有監控指標 | 暫無 |
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | 是 |
特色功能
EMR的StarRocks支援CTAS&CDAS功能,CTAS可以實現單表的結構和資料同步,CDAS可以實現整庫同步或者同一庫中的多表結構和資料同步,詳情請參見基於Realtime ComputeFlink使用CTAS&CDAS功能同步MySQL資料至StarRocks。
前提條件
已建立StarRocks叢集,包括EMR的StarRocks或基於ECS的雲上自建StarRocks。
使用限制
僅Realtime Compute引擎VVR 6.0.5及以上版本支援StarRocks連接器。
StarRocks連接器僅支援at-least-once和exactly-once語義。
文法結構
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);
WITH參數
類型 | 參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
通用 | connector | 表類型。 | String | 是 | 無 | 固定值為starrocks。 |
jdbc-url | JDBC串連的URL。 | String | 是 | 無 | 指定FE(Front End)的IP和JDBC連接埠,格式為 | |
database-name | StarRocks資料庫名稱。 | String | 是 | 無 | 無。 | |
table-name | StarRocks表名稱。 | String | 是 | 無 | 無。 | |
username | StarRocks串連使用者名稱。 | String | 是 | 無 | 無。 | |
password | StarRocks串連密碼。 | String | 是 | 無 | 無。 | |
starrocks.create.table.properties | StarRocks表屬性。 | String | 否 | 無 | 設定資料表初始屬性,如引擎、副本數等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。 | |
源表專屬 | scan-url | 資料掃描的url。 | String | 否 | 無 | 指定FE(Front End)的IP和HTTP連接埠,格式為 說明 填寫多個IP和連接埠號碼時,請使用半形分號(;)進行分隔。 |
scan.connect.timeout-ms | flink-connector-starrocks串連StarRocks的時間上限。 超過該時間上限,將報錯。 | String | 否 | 1000 | 單位為毫秒。 | |
scan.params.keep-alive-min | 查詢任務的保活時間。 | String | 否 | 10 | 無。 | |
scan.params.query-timeout-s | 查詢任務的逾時時間。 如果超過該時間,仍未返回查詢結果,則停止查詢任務。 | String | 否 | 600 | 單位為秒。 | |
scan.params.mem-limit-byte | BE節點中單個查詢的記憶體上限。 | String | 否 | 1073741824(1 GB) | 單位為位元組。 | |
scan.max-retries | 查詢失敗時的最大重試次數。 超過該數量上限,則將報錯。 | String | 否 | 1 | 無。 | |
結果表專屬 | load-url | 資料匯入的URL。 | String | 是 | 無 | 指定FE(Front End)的IP和HTTP連接埠,格式為 說明 填寫多個IP和連接埠號碼時,請使用半形分號(;)進行分隔。 |
sink.semantic | 資料寫入語義。 | String | 否 | at-least-once | 取值如下:
| |
sink.buffer-flush.max-bytes | Buffer可容納的最巨量資料量。 | String | 否 | 94371840(90 MB) | 取值範圍為64 MB~10 GB。 | |
sink.buffer-flush.max-rows | Buffer可容納的最巨量資料行數。 | String | 否 | 500000 | 取值範圍為64,000~5000,000。 | |
sink.buffer-flush.interval-ms | Buffer重新整理時間間隔。 | String | 否 | 300000 | 取值範圍為1000毫秒~3600000毫秒。 | |
sink.max-retries | 最大重試次數。 | String | 否 | 3 | 取值範圍為0~10。 | |
sink.connect.timeout-ms | 串連到starrocks的逾時時間。 | String | 否 | 1000 | 取值範圍為100~60000。單位為毫秒。 | |
sink.properties.* | 結果表屬性。 | String | 否 | 無 | Stream Load的參數控制Stream Load匯入行為。例如,參數 sink.properties.format表示Stream Load所匯入的資料格式,如CSV。更多參數和解釋,請參見Stream Load。 |
類型映射
StarRocks欄位類型 | Flink欄位類型 |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
程式碼範例
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
PRIMARY KEY(`runoob_id`)
NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;