Flink Connector內部實現是通過緩衝並批量由Stream Load匯入。本文為您介紹Flink Connector的使用方式及樣本。
背景資訊
因為Flink官方只提供了flink-connector-jdbc,不足以滿足匯入效能要求,所以新增了flink-connector-starrocks,其內部實現是通過緩衝並批量由Stream Load匯入。
使用方式
您可以下載源碼進行測試:下載flink-connector-starrocks源碼。
將以下內容添加pom.xml中。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- for flink-1.11, flink-1.12 -->
<version>x.x.x_flink-1.11</version>
<!-- for flink-1.13 -->
<version>x.x.x_flink-1.13</version>
</dependency>
說明 您可以在版本資訊頁面,查看Latest Version資訊,替換代碼中的
x.x.x
。程式碼範例如下:
- 方式一
// -------- sink with raw json string stream -------- fromElements(new String[]{ "{\"score\": \"99\", \"name\": \"stephen\"}", "{\"score\": \"100\", \"name\": \"lebron\"}" }).addSink( StarRocksSink.sink( // the sink options StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx") .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port") .withProperty("username", "xxx") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") .build() ) ); // -------- sink with stream transformation -------- class RowData { public int score; public String name; public RowData(int score, String name) { ...... } } fromElements( new RowData[]{ new RowData(99, "stephen"), new RowData(100, "lebron") } ).addSink( StarRocksSink.sink( // the table structure TableSchema.builder() .field("score", DataTypes.INT()) .field("name", DataTypes.VARCHAR(20)) .build(), // the sink options StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx") .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port") .withProperty("username", "xxx") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.column_separator", "\\x01") .withProperty("sink.properties.row_delimiter", "\\x02") .build(), // set the slots with streamRowData (slots, streamRowData) -> { slots[0] = streamRowData.score; slots[1] = streamRowData.name; } ) );
- 方式二
// create a table with `structure` and `properties` // Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory` tEnv.executeSql( "CREATE TABLE USER_RESULT(" + "name VARCHAR," + "score BIGINT" + ") WITH ( " + "'connector' = 'starrocks'," + "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," + "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," + "'database-name' = 'xxx'," + "'table-name' = 'xxx'," + "'username' = 'xxx'," + "'password' = 'xxx'," + "'sink.buffer-flush.max-rows' = '1000000'," + "'sink.buffer-flush.max-bytes' = '300000000'," + "'sink.buffer-flush.interval-ms' = '5000'," + "'sink.properties.column_separator' = '\\x01'," + "'sink.properties.row_delimiter' = '\\x02'," + "'sink.max-retries' = '3'" + "'sink.properties.*' = 'xxx'" + // stream load properties like `'sink.properties.columns' = 'k1, v1'` ")" );
其中,Sink選項如下表所示。
參數 | 是否必選 | 預設值 | 類型 | 描述 |
connector | 是 | 無 | String | starrocks。 |
jdbc-url | 是 | 無 | String | 用於在StarRocks中執行查詢操作。 |
load-url | 是 | 無 | String | 指定FE的IP和HTTP連接埠,格式為fe_ip:http_port;fe_ip:http_port ,多個時使用半形分號(;)分隔。 |
database-name | 是 | 無 | String | StarRocks資料庫名稱。 |
table-name | 是 | 無 | String | StarRocks表名稱。 |
username | 是 | 無 | String | StarRocks串連使用者名稱。 |
password | 是 | 無 | String | StarRocks串連密碼。 |
sink.semantic | 否 | at-least-once | String | 取值為at-least-once或exactly-once。 |
sink.buffer-flush.max-bytes | 否 | 94371840(90M) | String | Buffer可容納的最巨量資料量,取值範圍為64 MB~10 GB。 |
sink.buffer-flush.max-rows | 否 | 500000 | String | Buffer可容納的最巨量資料行數,取值範圍為64,000~5000,000。 |
sink.buffer-flush.interval-ms | 否 | 300000 | String | Buffer重新整理時間間隔,取值範圍為 1000 ms~3600000 ms。 |
sink.max-retries | 否 | 1 | String | 最大重試次數,取值範圍為0~10。 |
sink.connect.timeout-ms | 否 | 1000 | String | 串連到load-url的逾時時間,取值範圍為100~60000。 |
sink.properties.* | 否 | 無 | String | Sink屬性。 |
重要
- 為了保證Sink資料的Exactly-once語義,需要外部系統的Two-phase Commit機制。由於StarRocks無此機制,所以需要依賴Flink的checkpoint-interval,在每次Checkpoint時儲存批資料以及其Label,在Checkpoint完成後的第一次invoke中阻塞flush所有緩衝在state中的資料,以此達到Exactly-once。但如果StarRocks終止了,會導致您的Flink Sink Stream運算元長時間阻塞,並引起Flink的監控警示或強制退出。
- 預設使用CSV格式進行匯入,您可以通過指定sink.properties.row_delimiter(該參數自StarRocks 1.15.0版本開始支援)為\\x02,sink.properties.column_separator為\\x01,來自訂行分隔字元與資料行分隔符號。
- 如果遇到匯入停止的情況,請嘗試增加Flink任務的記憶體。
- 如果代碼運行正常且能接收到資料,但是寫入不成功時,請確認當前機器是否能訪問BE的http_port連接埠,即能ping通BE服務使用的IP地址。
例如,如果一台機器有外網和內網IP,且FE或BE的http_port均可通過外網
ip:port
訪問,叢集裡綁定的IP為內網IP,任務裡loadurl寫的FE外網ip:http_port
,FE會將寫入任務轉寄給BE內網ip:port
,此時如果Client機器ping不通BE的內網IP就會寫入失敗。
樣本:使用Flink-connector寫入實現MySQL資料同步
基本原理
通過Flink-cdc和StarRocks-migrate-tools(簡稱smt)可以實現MySQL資料的秒級同步。
說明 本文圖片和部分內容來源於開源StarRocks的Realtime synchronization from MySQL。
smt可以根據MySQL和StarRocks的叢集資訊和表結構自動產生Source table和Sink table的建表語句。
操作步驟
- 準備工作
- 下載Flink。
推薦使用1.13,最低支援1.11版本。
- 下載Flink CDC connector。
下載對應Flink版本的Flink-MySQL-CDC。
- 下載Flink StarRocks connector。重要 1.13、1.11和1.12版本使用不同的connector。
- 下載Flink。
- 複製flink-sql-connector-mysql-cdc-xxx.jar,flink-connector-starrocks-xxx.jar到flink-xxx/lib/下。
- 下載smt.tar.gz,解壓縮並修改設定檔。相關配置項描述如下:
db
:修改為MySQL的串連資訊。be_num
:需要配置成StarRocks叢集的節點數。[table-rule.1]
:匹配規則,可以根據Regex匹配資料庫和表名產生建表的SQL,也可以配置多個規則。flink.starrocks.*
:StarRocks的叢集配置資訊。
程式碼範例如下。[db] host = 192.168.**.** port = 3306 user = root password = [other] # number of backends in StarRocks be_num = 3 # `decimal_v3` is supported since StarRocks-1.18.1 use_decimal_v3 = false # file to save the converted DDL SQL output_dir = ./result [table-rule.1] # pattern to match databases for setting properties database = ^console_19321.*$ # pattern to match tables for setting properties table = ^.*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=15000
- 執行
starrocks-migrate-tool
命令,將所有建表語句都產生在result目錄下。您可以通過ls result
命令,查看result目錄。flink-create.1.sql smt.tar.gz starrocks-create.all.sql flink-create.all.sql starrocks-create.1.sql
- 執行以下命令,產生StarRocks的表結構。
Mysql -h<FE的IP地址> -P9030 -uroot -p < starrocks-create.1.sql
- 執行以下命令,產生Flink table,同步任務會持續執行。
bin/sql-client.sh -f flink-create.1.sql
重要 如果是Flink 1.13之前的版本可能無法直接執行該指令碼,需要逐行提交,並且需要開啟MySQL binlog。 - 執行以下命令,查看任務狀態。
bin/flink list -running
您可以通過Flink WebUI或者$FLINK_HOME/log目錄下的記錄檔查看任務的詳細資料和執行狀態。
使用本樣本時,需要注意以下資訊:
- 如果有多組規則,則需要給每一組規則匹配database,table和flink-connector的配置。程式碼範例如下。
[table-rule.1] # pattern to match databases for setting properties database = ^console_19321.*$ # pattern to match tables for setting properties table = ^.*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=15000 [table-rule.2] # pattern to match databases for setting properties database = ^database2.*$ # pattern to match tables for setting properties table = ^.*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= # 如果匯入資料不方便選出合適的分隔字元,則可以考慮使用JSON格式,但是會有一定的效能損失,使用方法:用以下參數替換flink.starrocks.sink.properties.column_separator和flink.starrocks.sink.properties.row_delimiter參數。 flink.starrocks.sink.properties.strip_outer_array=true flink.starrocks.sink.properties.format=json
說明 Flink.starrocks.sink系列的參數,可以給不同的規則配置不同的匯入頻率等。 - 針對分庫分表的大表可以單獨配置一個規則。例如,有兩個資料庫edu_db_1和edu_db_2,每個資料庫下面分別有course_1、course_2兩張表,並且所有表的資料結構都是相同的,通過如以下配置把他們匯入StarRocks的一張表中進行分析。
[table-rule.3] # pattern to match databases for setting properties database = ^edu_db_[0-9]*$ # pattern to match tables for setting properties table = ^course_[0-9]*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=5000
配置後會自動產生一個多對一的匯入關係,在StarRocks預設產生的表名是course__auto_shard,也可以自行在產生的設定檔中修改。
- 如果在
sql-client
中命令列執行建表和同步任務,需要做對反斜線(\)字元進行轉義。程式碼範例如下。'sink.properties.column_separator' = '\\x01' 'sink.properties.row_delimiter' = '\\x02'