全部產品
Search
文件中心

Realtime Compute for Apache Flink:日誌即時入倉快速入門

更新時間:Oct 25, 2024

Flink全託管產品提供豐富強大的日誌資料即時入倉能力。本文為您介紹如何在Flink全託管控制台上快速構建一個從Kafka到Hologres的資料同步作業。

背景資訊

假設訊息佇列Kafka執行個體中有一個名稱為users的Topic,其中有100條JSON資料,代表通過記錄檔採集工具或者應用寫入Kafka的日誌資料,其資料分布大致如下圖所示。資料分布

本文使用Flink全託管提供的CREATE TABLE AS(CTAS)語句,一鍵完成日誌資料的同步,以及即時的表結構變更同步。

前提條件

步驟一:配置IP白名單

為了讓Flink能訪問Kafka和Hologres執行個體,您需要將Flink全託管工作空間的網段添加到在Kafka和Hologres的白名單中。

  1. 擷取Flink全託管工作空間的VPC網段。

    1. 登入Realtime Compute控制台

    2. 在目標工作空間右側操作列,選擇更多 > 工作空間詳情

    3. 工作空間詳情對話方塊,查看Flink全託管虛擬交換器的網段資訊。

      網段資訊

  2. 在訊息佇列Kafka的IP白名單中,添加Flink全託管網段資訊。

    操作步驟詳情請參見配置白名單Kafka白名單

  3. 在Hologres的IP白名單中,添加Flink全託管網段資訊。

    操作步驟詳情請參見IP白名單Holo白名單

步驟二:準備Kafka測試資料

使用Flink全託管的類比資料產生源表作為資料產生器,將資料寫入到Kafka中。請按以下步驟使用Flink全託管開發控制台將資料寫入至訊息佇列Kafka。

  1. 在Kafka控制台建立一個名稱為users的Topic。

    操作詳情請參見步驟一:建立Topic

  2. 建立將資料寫入到Kafka的作業。

    1. 登入Realtime Compute管理主控台

    2. 單擊目標工作空間操作列下的控制台

    3. 在左側導覽列,單擊資料開發 > ETL,單擊建立

    4. 新增作業草稿對話方塊,選擇目標模板(例如:選擇空白的流作業草稿),完成後單擊下一步,填寫作業配置資訊。

      作業參數

      樣本

      說明

      檔案名稱

      kafka-data-input

      作業的名稱。

      說明

      作業名稱在當前專案中必須保持唯一。

      儲存位置

      作業草稿

      指定該作業的代碼檔案所屬的檔案夾。預設存放在作業草稿目錄。

      您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

      引擎版本

      vvr-8.0.5-flink-1.17

      在引擎版本下拉式清單中選擇目標引擎版本。

    5. 單擊建立

    6. 將以下作業代碼拷貝到作業文本編輯區。

      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',
        'rows-per-second' = '10',
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
        'fields.first_name.expression' = '#{name.firstName}',
        'fields.last_name.expression' = '#{name.lastName}',
        'fields.address.country.expression' = '#{Address.country}',
        'fields.address.state.expression' = '#{Address.state}',
        'fields.address.city.expression' = '#{Address.city}',
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json'
      );
      
      INSERT INTO sink SELECT * FROM source;
    7. 請按您的實際配置,修改以下參數配置資訊。

      參數

      樣本值

      說明

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000

      Kafka Broker地址。

      格式為host:port,host:port,host:port,以英文逗號(,)分割。

      topic

      users

      Kafka Topic名稱。

  3. 啟動作業。

    1. 資料開發 > ETL頁面,單擊部署

    2. 部署新版本對話方塊中,單擊確定

    3. 配置作業資源,資源設定填寫詳情請參見配置作業資源

    4. 營運中心 > 作業營運頁面,單擊目標作業名稱操作列中的啟動。關於作業啟動的配置說明,請參見作業啟動

    5. 您可以在作業營運頁面觀察作業的運行資訊和狀態。image

      由於faker資料來源是一個有限流,因此在作業處於運行狀態後,大約1分鐘左右後,作業就會處於完成狀態。當作業結束運行代表作業已經將相關的資料寫入到Kafka的users中。其中,寫入到訊息佇列Kafka的JSON資料格式大致如下。

      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "United Arab Emirates",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }

步驟三:建立Hologres Catalog

單表同步都需要依賴目標Catalog來建立目標表。因此,您需要通過控制台建立目標Catalog。本文將以目標Catalog為Hologres Catalog為例,為您進行介紹。

  1. 建立名稱為holo的Hologres Catalog。

    操作步驟詳情請參見建立Hologres Catalogholo catalog

    重要

    您需要在您的目標執行個體中已建立flink_test_db資料庫,否則建立Catalog會報錯。

  2. Schemas頁簽,確認已建立名為holo的Catalog。

    重新整理按鈕

步驟四:建立並啟動資料同步作業

  1. 登入Flink全託管開發控制台,建立資料同步作業。

    1. 登入Realtime Compute管理主控台

    2. 單擊目標工作空間操作列下的控制台

    3. 在左側導覽列,單擊資料開發 > ETL,單擊建立

    4. 新增作業草稿對話方塊,選擇目標模板(例如:選擇空白的流作業草稿),完成後單擊下一步,填寫作業配置資訊。

      作業參數

      樣本

      說明

      檔案名稱

      flink-quickstart-test

      作業的名稱。

      說明

      作業名稱在當前專案中必須保持唯一。

      儲存位置

      作業草稿

      指定該作業的代碼檔案所屬的檔案夾。預設存放在作業草稿目錄。

      您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

      引擎版本

      vvr-8.0.5-flink-1.17

      在引擎版本下拉式清單中選擇目標引擎版本。

    5. 單擊建立

  2. 將以下作業代碼拷貝到作業文本編輯區。

    將訊息佇列Kafka中名稱為users的Topic資料同步至Hologres的flink_test_db資料庫的sync_kafka_users表中。您可以通過以下任意一種方式進行:

    • 通過CATS語句同步

      該方式無需您手動在Hologres中建立該表,也無需指明對應的列類型為JSON或JSONB。

      CREATE TEMPORARY TABLE kafka_users (
        `id` INT NOT NULL,
        `address` STRING,
        `offset` BIGINT NOT NULL METADATA,
        `partition` BIGINT NOT NULL METADATA,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE),
        `country` AS JSON_VALUE(`address`, '$.country'),
        PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自動延伸嵌套列。
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
      WITH (
        'connector' = 'hologres'
      ) AS TABLE kafka_users;
      說明

      為了避免作業Failover後,作業重啟將重複資料寫入到Hologres中,您可以添加相關主鍵從而唯一地標識資料。當資料重發時,Hologres將會保證相同partition和offset的資料只會保留一份。

    • 通過INSERT INTO語句同步

      考慮到Hologres中對於JSON和JSONB類型的資料會進行特殊的最佳化,您也可以通過INSERT INTO語句將嵌套JSON寫入到Hologres中。

      該方式需要您手動在Hologres中建立該表並指明需要對應的列類型為JSON或JSONB,然後通過下文的SQL,會將address資料寫入到 Hologres中類型為JSON的列。

      CREATE TEMPORARY TABLE kafka_users (
        `id` INT NOT NULL,
        `address` STRING, -- 該列對應的資料為嵌套JSON。
        `offset` BIGINT NOT NULL METADATA,
        `partition` BIGINT NOT NULL METADATA,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE),
        `country` AS JSON_VALUE(`address`, '$.country')
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自動延伸嵌套列。
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE holo (
        `id` INT NOT NULL,
        `address` STRING,
        `offset` BIGINT,
        `partition` BIGINT,
        `timestamp` TIMESTAMP,
        `date` DATE,
        `country` STRING
      ) WITH (
        'connector' = 'hologres',
        'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
        'username' = 'LTAI5tE572UJ44Xwhx6i****',
        'password' = 'KtyIXK3HIDKA9VzKX4tpct9xTm****',
        'dbname' = 'flink_test_db',
        'tablename' = 'sync_kafka_users'
      );
      
      INSERT INTO holo
      SELECT * FROM kafka_users;
  3. 請按您的實際配置,修改以下參數配置資訊。

    參數

    樣本值

    說明

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000

    Kafka Broker地址。

    格式為host:port,host:port,host:port,以英文逗號(,)分割。

    topic

    users

    Kafka Topic名稱。

    endpoint

    hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

    Hologres端點。

    格式為<ip>:<port>。

    username

    LTAI5tE572UJ44Xwhx6i****

    Hologres使用者名稱,請填寫阿里雲帳號的AccessKey ID。

    password

    KtyIXK3HIDKA9VzKX4tpct9xTm****

    Hologres密碼,請填寫阿里雲帳號的AccessKey Secret。

    dbname

    flink_test_db

    Hologres資料庫名稱。

    tablename

    sync_kafka_users

    Hologres表名稱。

    說明
    • 如果您通過INSERT INTO方式同步資料,則需要提前在目標執行個體的資料庫中建立sync_kafka_users表和欄位。

    • 如果Schema不為Public時,則tablename需要填寫為schema.tableName。

  4. 單擊儲存

  5. 資料開發 > ETL頁面,單擊部署

  6. 營運中心 > 作業營運頁面,單擊目標作業名稱操作列中的啟動關於作業啟動的配置說明,請參見作業啟動

  7. 單擊啟動

    作業啟動後,您可以在作業營運介面觀察作業的運行資訊和狀態。image

步驟五:觀察全量同步結果

  1. 登入Hologres管理主控台

  2. 執行個體列表頁面,單擊目標執行個體名稱。

  3. 在頁面右上方,單擊登入執行個體

  4. 中繼資料管理頁簽,查看users資料庫中同步的sync_kafka_users表結構和資料。

    sync_kafka_users表

    同步後的表結構和資料如下圖所示。

    • 表結構

      雙擊sync_kafka_users表名稱,查看錶結構。

      表結構

      說明

      在同步過程中,建議聲明Kafka的Metadata partition和offset作為Hologres表中的主鍵。這樣可以避免由於作業Failover,資料重發導致下遊儲存多份相同資料。

    • 表資料

      在sync_kafka_users表資訊頁面右上方,單擊查詢表後,輸入如下命令,單擊運行

      SELECT * FROM public.sync_kafka_users order by partition, "offset";

      表資料結果如下圖所示。表資料

步驟六:觀察自動同步表結構變更

  1. 在Kafka控制台手動發送一條包含新增列的訊息。

    1. 登入雲訊息佇列 Kafka 版控制台

    2. 執行個體列表頁面,單擊目標執行個體名稱。

    3. Topic管理頁面,單擊目標Topic名稱users。

    4. 單擊體驗發送訊息

    5. 填寫訊息內容。

      訊息內容

      配置項

      樣本

      發送方式

      選中控制台

      訊息Key

      填寫為flinktest。

      訊息內容

      將以下JSON內容複寫粘貼到訊息內容中。

      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      說明

      該樣本中house-points是一個新增的嵌套列。

      發送到指定分區

      選中

      分區ID

      填寫為0。

    6. 單擊確定

  2. 在Hologres控制台,查看sync_kafka_users表結構和資料的變化。

    1. 登入Hologres管理主控台

    2. 執行個體列表頁面,單擊目標執行個體名稱。

    3. 在頁面右上方,單擊登入執行個體

    4. 中繼資料管理頁簽,雙擊sync_kafka_users表名稱。

    5. 單擊查詢表後,輸入如下命令,單擊運行

      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. 查看錶資料結果。

      表資料結果如下圖所示。Hologres表結果

      可以觀察到id為100001的資料已經成功地寫入到了Hologres中。同時,Hologres中多了house-points.house和house-points.points 兩列。

      說明

      雖然插入到Kafka中的資料僅只有一個嵌套列house-points,但是由於在kafka_users表的WITH參數內聲明要求json.infer-schema.flatten-nested-columns.enable,那麼Flink 就會自動展平新增的嵌套列,並用訪問該列的路徑作為展開後的列的名字。

(可選)步驟七:調整作業資源配置

根據資料量的不同,我們往往需要調節不同節點的並發和資源,以達到更優的作業效能。您可以使用資源配置的基礎模式簡單配置作業並發度和CU數,也可以使用資源配置的專家模式細粒度地調整節點的並發和資源。

  1. 登入Flink全託管開發控制台,進入作業詳情頁面。

    1. 登入Realtime Compute管理主控台

    2. 單擊目標工作空間操作列下的控制台

    3. 在左側導覽列,單擊營運中心 > 作業營運

  2. 修改資源配置。

    1. 部署詳情頁面,單擊資源配置右側的編輯資源模式選擇為專家模式

    2. 在配置計劃中單擊立即擷取

    3. 單擊展開全部

      觀察完整的拓撲圖,通過完整的拓撲圖能瞭解到整個資料的同步計劃,即具體同步哪些表。

    4. 手動設定每個節點的並發。

      由於Kafka users Topic有四個分區,因此可以設定作業為4並發。由於日誌資料只是寫入到Hologres一張表中,為了降低Hologres的串連數,可以調節Hologres的並發為2。資源配置步驟詳情請參見配置作業部署資訊。經過調節後的作業資源配置計劃如下圖所示。作業配置計劃

    5. 單擊確定

    6. 填寫基礎配置後,單擊啟動關於作業啟動的配置說明,請參見作業啟動

  3. 營運中心 > 作業營運頁面,單擊目標作業名稱。

  4. 狀態總覽頁面,查看調整效果。

    image.png

相關文檔