全部產品
Search
文件中心

E-MapReduce:Flink Connector

更新時間:Jul 01, 2024

Flink Connector內部實現是通過緩衝並批量由Stream Load匯入。本文為您介紹Flink Connector的使用方式及樣本。

背景資訊

因為Flink官方只提供了flink-connector-jdbc,不足以滿足匯入效能要求,所以新增了flink-connector-starrocks,其內部實現是通過緩衝並批量由Stream Load匯入。

使用方式

您可以下載源碼進行測試:下載flink-connector-starrocks源碼

將以下內容添加pom.xml中。
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.11, flink-1.12 -->
    <version>x.x.x_flink-1.11</version>
    <!-- for flink-1.13 -->
    <version>x.x.x_flink-1.13</version>
</dependency>
說明 您可以在版本資訊頁面,查看Latest Version資訊,替換代碼中的x.x.x
程式碼範例如下:
  • 方式一
    // -------- sink with raw json string stream --------
    fromElements(new String[]{
        "{\"score\": \"99\", \"name\": \"stephen\"}",
        "{\"score\": \"100\", \"name\": \"lebron\"}"
    }).addSink(
        StarRocksSink.sink(
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build()
        )
    );
    
    
    // -------- sink with stream transformation --------
    class RowData {
        public int score;
        public String name;
        public RowData(int score, String name) {
            ......
        }
    }
    fromElements(
        new RowData[]{
            new RowData(99, "stephen"),
            new RowData(100, "lebron")
        }
    ).addSink(
        StarRocksSink.sink(
            // the table structure
            TableSchema.builder()
                .field("score", DataTypes.INT())
                .field("name", DataTypes.VARCHAR(20))
                .build(),
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.column_separator", "\\x01")
                .withProperty("sink.properties.row_delimiter", "\\x02")
                .build(),
            // set the slots with streamRowData
            (slots, streamRowData) -> {
                slots[0] = streamRowData.score;
                slots[1] = streamRowData.name;
            }
        )
    );
  • 方式二
    // create a table with `structure` and `properties`
    // Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`
    tEnv.executeSql(
        "CREATE TABLE USER_RESULT(" +
            "name VARCHAR," +
            "score BIGINT" +
        ") WITH ( " +
            "'connector' = 'starrocks'," +
            "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
            "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
            "'database-name' = 'xxx'," +
            "'table-name' = 'xxx'," +
            "'username' = 'xxx'," +
            "'password' = 'xxx'," +
            "'sink.buffer-flush.max-rows' = '1000000'," +
            "'sink.buffer-flush.max-bytes' = '300000000'," +
            "'sink.buffer-flush.interval-ms' = '5000'," +
            "'sink.properties.column_separator' = '\\x01'," +
            "'sink.properties.row_delimiter' = '\\x02'," +
            "'sink.max-retries' = '3'" +
            "'sink.properties.*' = 'xxx'" + // stream load properties like `'sink.properties.columns' = 'k1, v1'`
        ")"
    );
其中,Sink選項如下表所示。
參數是否必選預設值類型描述
connectorStringstarrocks。
jdbc-urlString用於在StarRocks中執行查詢操作。
load-urlString指定FE的IP和HTTP連接埠,格式為fe_ip:http_port;fe_ip:http_port ,多個時使用半形分號(;)分隔。
database-nameStringStarRocks資料庫名稱。
table-nameStringStarRocks表名稱。
usernameStringStarRocks串連使用者名稱。
passwordStringStarRocks串連密碼。
sink.semanticat-least-onceString取值為at-least-once或exactly-once。
sink.buffer-flush.max-bytes94371840(90M)StringBuffer可容納的最巨量資料量,取值範圍為64 MB~10 GB。
sink.buffer-flush.max-rows500000StringBuffer可容納的最巨量資料行數,取值範圍為64,000~5000,000。
sink.buffer-flush.interval-ms300000StringBuffer重新整理時間間隔,取值範圍為 1000 ms~3600000 ms。
sink.max-retries1String最大重試次數,取值範圍為0~10。
sink.connect.timeout-ms1000String串連到load-url的逾時時間,取值範圍為100~60000。
sink.properties.*StringSink屬性。
重要
  • 為了保證Sink資料的Exactly-once語義,需要外部系統的Two-phase Commit機制。由於StarRocks無此機制,所以需要依賴Flink的checkpoint-interval,在每次Checkpoint時儲存批資料以及其Label,在Checkpoint完成後的第一次invoke中阻塞flush所有緩衝在state中的資料,以此達到Exactly-once。但如果StarRocks終止了,會導致您的Flink Sink Stream運算元長時間阻塞,並引起Flink的監控警示或強制退出。
  • 預設使用CSV格式進行匯入,您可以通過指定sink.properties.row_delimiter(該參數自StarRocks 1.15.0版本開始支援)為\\x02,sink.properties.column_separator為\\x01,來自訂行分隔字元與資料行分隔符號。
  • 如果遇到匯入停止的情況,請嘗試增加Flink任務的記憶體。
  • 如果代碼運行正常且能接收到資料,但是寫入不成功時,請確認當前機器是否能訪問BE的http_port連接埠,即能ping通BE服務使用的IP地址。

    例如,如果一台機器有外網和內網IP,且FE或BE的http_port均可通過外網ip:port訪問,叢集裡綁定的IP為內網IP,任務裡loadurl寫的FE外網ip:http_port,FE會將寫入任務轉寄給BE內網ip:port,此時如果Client機器ping不通BE的內網IP就會寫入失敗。

樣本:使用Flink-connector寫入實現MySQL資料同步

基本原理

通過Flink-cdc和StarRocks-migrate-tools(簡稱smt)可以實現MySQL資料的秒級同步。Flink
說明 本文圖片和部分內容來源於開源StarRocks的Realtime synchronization from MySQL

smt可以根據MySQL和StarRocks的叢集資訊和表結構自動產生Source table和Sink table的建表語句。

操作步驟

  1. 準備工作
  2. 複製flink-sql-connector-mysql-cdc-xxx.jarflink-connector-starrocks-xxx.jarflink-xxx/lib/下。
  3. 下載smt.tar.gz,解壓縮並修改設定檔。
    相關配置項描述如下:
    • db:修改為MySQL的串連資訊。
    • be_num:需要配置成StarRocks叢集的節點數。
    • [table-rule.1]:匹配規則,可以根據Regex匹配資料庫和表名產生建表的SQL,也可以配置多個規則。
    • flink.starrocks.* :StarRocks的叢集配置資訊。
    程式碼範例如下。
    [db]
    host = 192.168.**.**
    port = 3306
    user = root
    password =
    
    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # file to save the converted DDL SQL
    output_dir = ./result
    
    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^console_19321.*$
    # pattern to match tables for setting properties
    table = ^.*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
  4. 執行starrocks-migrate-tool命令,將所有建表語句都產生在result目錄下。
    您可以通過ls result命令,查看result目錄。
    flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
    flink-create.all.sql  starrocks-create.1.sql
  5. 執行以下命令,產生StarRocks的表結構。
    Mysql -h<FE的IP地址> -P9030 -uroot -p < starrocks-create.1.sql
  6. 執行以下命令,產生Flink table,同步任務會持續執行。
    bin/sql-client.sh -f flink-create.1.sql
    重要 如果是Flink 1.13之前的版本可能無法直接執行該指令碼,需要逐行提交,並且需要開啟MySQL binlog。
  7. 執行以下命令,查看任務狀態。
    bin/flink list -running

    您可以通過Flink WebUI或者$FLINK_HOME/log目錄下的記錄檔查看任務的詳細資料和執行狀態。

使用本樣本時,需要注意以下資訊:
  • 如果有多組規則,則需要給每一組規則匹配database,table和flink-connector的配置。
    程式碼範例如下。
    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^console_19321.*$
    # pattern to match tables for setting properties
    table = ^.*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    
    [table-rule.2]
    # pattern to match databases for setting properties
    database = ^database2.*$
    # pattern to match tables for setting properties
    table = ^.*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    # 如果匯入資料不方便選出合適的分隔字元,則可以考慮使用JSON格式,但是會有一定的效能損失,使用方法:用以下參數替換flink.starrocks.sink.properties.column_separator和flink.starrocks.sink.properties.row_delimiter參數。
    flink.starrocks.sink.properties.strip_outer_array=true
    flink.starrocks.sink.properties.format=json
    說明 Flink.starrocks.sink系列的參數,可以給不同的規則配置不同的匯入頻率等。
  • 針對分庫分表的大表可以單獨配置一個規則。
    例如,有兩個資料庫edu_db_1和edu_db_2,每個資料庫下面分別有course_1、course_2兩張表,並且所有表的資料結構都是相同的,通過如以下配置把他們匯入StarRocks的一張表中進行分析。
    [table-rule.3]
    # pattern to match databases for setting properties
    database = ^edu_db_[0-9]*$
    # pattern to match tables for setting properties
    table = ^course_[0-9]*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=5000

    配置後會自動產生一個多對一的匯入關係,在StarRocks預設產生的表名是course__auto_shard,也可以自行在產生的設定檔中修改。

  • 如果在sql-client中命令列執行建表和同步任務,需要做對反斜線(\)字元進行轉義。
    程式碼範例如下。
    'sink.properties.column_separator' = '\\x01'
    'sink.properties.row_delimiter' = '\\x02'