本文為您介紹如何使用StarRocks連接器。
背景資訊
StarRocks是新一代極速全情境MPP(Massively Parallel Processing)資料倉儲,致力於構建極速和統一分析體驗。StarRocks具有以下優勢:
StarRocks相容MySQL協議,可以使用MySQL用戶端和常用BI工具對接StarRocks來分析資料。
StarRocks採用分布式架構:
對資料表進行水平劃分並以多副本儲存。
叢集規模可以靈活伸縮,支援10 PB層級的資料分析。
支援MPP架構,並行加速計算。
支援多副本,具有彈性容錯能力。
Flink連接器內部的結果表是通過緩衝並批量由Stream Load匯入實現,源表是通過批量讀取資料實現。StarRocks連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表和結果表、資料攝入目標端 |
運行模式 | 流模式和批模式 |
資料格式 | CSV |
特有監控指標 | 暫無 |
API種類 | Datastream、SQL和資料攝入YAML |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立StarRocks叢集,包括EMR的StarRocks或基於ECS的雲上自建StarRocks。
使用限制
僅Realtime Compute引擎VVR 6.0.5及以上版本支援StarRocks連接器。
StarRocks連接器僅支援at-least-once和exactly-once語義。
SQL
特色功能
EMR的StarRocks支援CTAS&CDAS功能,CTAS可以實現單表的結構和資料同步,CDAS可以實現整庫同步或者同一庫中的多表結構和資料同步,詳情請參見基於Realtime ComputeFlink使用CTAS&CDAS功能同步MySQL資料至StarRocks。
文法結構
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);
WITH參數
類型 | 參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
通用 | connector | 表類型。 | String | 是 | 無 | 固定值為starrocks。 |
jdbc-url | JDBC串連的URL。 | String | 是 | 無 | 指定FE(Front End)的IP和JDBC連接埠,格式為 | |
database-name | StarRocks資料庫名稱。 | String | 是 | 無 | 無。 | |
table-name | StarRocks表名稱。 | String | 是 | 無 | 無。 | |
username | StarRocks串連使用者名稱。 | String | 是 | 無 | 無。 | |
password | StarRocks串連密碼。 | String | 是 | 無 | 無。 | |
starrocks.create.table.properties | StarRocks表屬性。 | String | 否 | 無 | 設定資料表初始屬性,如引擎、副本數等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。 | |
源表專屬 | scan-url | 資料掃描的url。 | String | 否 | 無 | 指定FE(Front End)的IP和HTTP連接埠,格式為 說明 填寫多個IP和連接埠號碼時,請使用半形分號(;)進行分隔。 |
scan.connect.timeout-ms | flink-connector-starrocks串連StarRocks的時間上限。 超過該時間上限,將報錯。 | String | 否 | 1000 | 單位為毫秒。 | |
scan.params.keep-alive-min | 查詢任務的保活時間。 | String | 否 | 10 | 無。 | |
scan.params.query-timeout-s | 查詢任務的逾時時間。 如果超過該時間,仍未返回查詢結果,則停止查詢任務。 | String | 否 | 600 | 單位為秒。 | |
scan.params.mem-limit-byte | BE節點中單個查詢的記憶體上限。 | String | 否 | 1073741824(1 GB) | 單位為位元組。 | |
scan.max-retries | 查詢失敗時的最大重試次數。 超過該數量上限,則將報錯。 | String | 否 | 1 | 無。 | |
結果表專屬 | load-url | 資料匯入的URL。 | String | 是 | 無 | 指定FE(Front End)的IP和HTTP連接埠,格式為 說明 填寫多個IP和連接埠號碼時,請使用半形分號(;)進行分隔。 |
sink.semantic | 資料寫入語義。 | String | 否 | at-least-once | 取值如下:
| |
sink.buffer-flush.max-bytes | Buffer可容納的最巨量資料量。 | String | 否 | 94371840(90 MB) | 取值範圍為64 MB~10 GB。 | |
sink.buffer-flush.max-rows | Buffer可容納的最巨量資料行數。 | String | 否 | 500000 | 取值範圍為64,000~5000,000。 | |
sink.buffer-flush.interval-ms | Buffer重新整理時間間隔。 | String | 否 | 300000 | 取值範圍為1000毫秒~3600000毫秒。 | |
sink.max-retries | 最大重試次數。 | String | 否 | 3 | 取值範圍為0~10。 | |
sink.connect.timeout-ms | 串連到starrocks的逾時時間。 | String | 否 | 1000 | 取值範圍為100~60000。單位為毫秒。 | |
sink.properties.* | 結果表屬性。 | String | 否 | 無 | Stream Load的參數控制Stream Load匯入行為。例如,參數 sink.properties.format表示Stream Load所匯入的資料格式,如CSV。更多參數和解釋,請參見Stream Load。 |
類型映射
StarRocks欄位類型 | Flink欄位類型 |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BIGINT UNSIGNED 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援。 | DECIMAL(20,0) |
LARGEINT | DECIMAL(20,0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR(n × 3) 說明
| CHAR(n) (n <= 85 時) |
VARCHAR(n × 3) 說明
| CHAR(n) (n > 85 時) |
VARCHAR | STRING |
VARBINARY 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援。 | VARBINARY |
程式碼範例
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
PRIMARY KEY(`runoob_id`)
NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
資料攝入
使用StarRocks Pipeline連接器,您可以輕鬆地將來自上遊資料來源的資料記錄和表結構變更寫入外部StarRocks資料庫。StarRocks連接器同時支援社區版與阿里雲E-MapReduce Serverless StarRocks全託管版本。
特色功能
自動建庫建表
如果來自上遊的資料庫及資料表不存在於下遊StarRocks執行個體中,則對應的資料庫及資料表會被自動建立。您可以通過
table.create.properties.*
參數設定自動建立表時的選項。表結構變更同步
目前,StarRocks連接器支援自動將建表事件(CreateTableEvent)、增加列事件(AddColumnEvent)和刪除列(DropColumnEvent)事件自動應用到下遊資料庫中。
注意事項
目前StarRocks連接器只支援At-least Once語義,並通過主鍵表來保證等冪寫入。
目前,同步的表必須包含主鍵。不含主鍵的表必須通過
transform
語句塊指定主鍵方可正常寫入下遊。例如:transform: - source-table: ... primary-keys: id, ...
自動建立的表分桶鍵與主鍵相同,且不可有分區鍵。
進行表結構變更同步時,新增列只能追加到已有列的尾部。在預設的表結構演化模式Lenient下,會自動將其他位置的插入轉換到尾部。
如果您使用的StarRocks版本低於2.5.7,則必須顯式地通過
table.create.num-buckets
參數指定分桶數量。更高版本的StarRocks可以自動設定合適的分桶數。如果您使用的是StarRocks 3.2或更高版本,建議開啟
table.create.properties.fast_schema_evolution
選項來加快表結構變更的速度。
文法結構
source:
...
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030
username: root
password: pass
配置項
參數名稱 | 描述 | 類型 | 是否必填 | 預設值 | 備忘 |
| 連接器的名稱。 | String | 是 | 無 | 固定值為 |
| Sink的顯示名稱。 | String | 否 | 無 | 無。 |
| JDBC串連的URL。 | String | 是 | 無 | 支援傳入多個地址,使用英文逗號 ( |
| 串連到FE節點的HTTP服務URL。 | String | 是 | 無 | 支援傳入多個地址,使用英文分號 ( |
| 串連到 StarRocks時使用的使用者名稱。 | String | 是 | 無 | 該使用者至少需要具備對目標表的SELECT和INSERT許可權。您可以使用StarRocks的GRANT命令賦予相應的許可權。 |
| 串連到 StarRocks時使用的密碼。 | String | 是 | 無 | 無。 |
| 在進行Stream Load匯入時使用的標籤首碼。 | String | 否 | 無 | 無。 |
| 建立HTTP串連時的逾時時間。 | Integer | 否 | 30000 | 單位為毫秒,取值需要介於100 ~ 60000。 |
| 從伺服器得到100 Continue請求前的逾時時間)。 | Integer | 否 | 30000 | 單位為毫秒。取值需要介於3000 ~ 600000。 |
| 在將資料寫入StarRocks前,最多可以在記憶體中緩衝多大量的資料。 | Long | 否 | 157286400 | 單位為位元組,取值需要介於64 MB ~ 10 GB。 說明
|
| 每張表連續兩次Flush之間的間隔時間。 | Long | 否 | 300000 | 單位為毫秒。 |
| 連續兩次檢查是否應該進行Flush之間的間隔時間。 | Long | 否 | 50 | 單位為毫秒。 |
| 在進行 Stream Load匯入時的線程數量。 | Integer | 否 | 2 | 無。 |
| 是否使用Stream Load事務介面進行匯入。 | Boolean | 否 | true | 僅在資料庫支援的情況下生效。 |
| 提供給Sink的額外參數。 | String | 否 | 無 | 可以在STREAM LOAD查看支援的參數。 |
| 自動建表時的Bucket數量。 | Integer | 否 | 無 |
|
| 在自動建表時需要傳遞的額外參數。 | String | 否 | 無 | 例如,可以傳遞 |
| 執行表結構變更的逾時時間。 | Duration | 否 | 30 min | 必須設定為整數秒。 說明 如果某個表結構變更操作耗時超過此限制,作業將運行失敗。 |
類型映射
StarRocks並不支援所有的CDC YAML類型,嘗試將不支援的類型寫入下遊會導致作業失敗。您可以使用Transform CAST內建函數對不支援的資料進行轉換,或是使用Projection語句將其從結果表中移除。詳情請參考資料攝入開發參考。
CDC類型 | StarRocks類型 | 附註 |
TINYINT | TINYINT | 無 |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
CHAR(n) (n <= 85 時) | CHAR(n × 3) | CDC中的CHAR類型長度表示字元數,而StarRocks中的CHAR類型長度表示UTF-8編碼後的位元組數。通常情況下,一個中文字元經過UTF-8編碼後不會超過3位元組,因此映射到的StarRocks CHAR類型長度為原來的3倍。 說明 StarRocks的CHAR類型長度最長不可超過255,因此只有長度不超過85的CDC CHAR類型才會被映射到StarRocks CHAR類型。 |
CHAR(n) (n > 85 時) | VARCHAR(n × 3) | CDC中的CHAR類型長度表示字元數,而StarRocks中的CHAR類型長度表示UTF-8編碼後的位元組數。通常情況下,一個中文字元經過UTF-8編碼後不會超過3位元組,因此映射到的 StarRocks VARCHAR類型長度為原來的3倍。 說明 StarRocks的CHAR類型長度最長不可超過255,因此長度大於85的CDC CHAR類型會被映射到StarRocks VARCHAR類型。 |
VARCHAR(n) | VARCHAR(n × 3) | CDC中的VARCHAR類型長度表示字元數,而StarRocks中的VARCHAR類型長度表示UTF-8編碼後的位元組數。通常情況下,一個中文字元經過UTF-8編碼後不會超過3位元組,因此映射到的StarRocks VARCHAR類型長度為原來的3倍。 |