全部產品
Search
文件中心

Data Lake Formation:Flink VVP+DLF資料入湖與分析實踐

更新時間:Aug 16, 2024

資料湖構建(DLF)可以結合阿里雲Realtime ComputeFlink版(Flink VVP),以及Flink CDC相關技術,實現靈活定製化的資料入湖。並利用DLF統一中繼資料管理、許可權管理等能力,實現資料湖多引擎分析、資料湖管理等功能。本文為您介紹Flink+DLF資料湖方案具體步驟。

背景資訊

阿里雲Realtime ComputeFlink版是一套基於Apache Flink構建的即時巨量資料分析平台,支援多種資料來源和結果表類型。Flink任務可以利用資料湖統一儲存的優勢,使用Hudi結果表或Iceberg結果表,將作業的結果輸出到資料湖中,實現資料湖分析。在寫入資料湖的過程中,Flink可以通過設定DLF Catalog,將表的中繼資料同步到資料湖構建(DLF)中。依託資料湖構建產品(DLF)提供的企業級統一中繼資料能力,Flink+DLF方案可以實現寫入的資料湖表無縫對接阿里雲上的計算引擎,如EMR、MaxCompute、Hologres等。也可以通過DLF提供的豐富的資料湖管理能力,實現資料湖生命週期管理和湖格式的最佳化。

前提條件

  • 已開通Realtime ComputeFlink版,建立Flink全託管工作空間。

  • 已開通阿里雲資料湖構建(DLF)服務。如果您沒有開通,則可以在DLF產品首頁,單擊免費開通

  • 本文以MySQL資料來源為例,需要建立RDS MySQL,詳情請參見建立RDS MySQL執行個體。如果使用其他資料來源入湖可忽略。

說明

建立的RDS MySQL需要和Realtime ComputeFlink版在同一個地區同一個VPC內,RDS MySQL須為5.7及以上版本。

操作流程

步驟一:Flink建立DLF Catalog

  1. 登入Realtime Compute管理主控台

  2. 進入建立Catalog頁面。

    1. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

    2. 在左側導覽列,單擊中繼資料管理

    3. 單擊建立Catalog

  3. 建立DLF Catalog。

    1. 建立Catalog頁面,選擇DLF

    2. 填寫參數配置資訊。詳情可參考管理DLF Catalog。如下圖。

image

步驟二:準備MySQL資料

  1. 登入準備好的MySQL執行個體,詳情請參見通過DMS登入RDS MySQL

  2. 執行如下命令,建立一張表,並插入若干測試資料。

CREATE TABLE `student` (
  `id` bigint(20) NOT NULL,
  `name` varchar(256) DEFAULT NULL,
  `age` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

INSERT INTO student VALUES (1,'name1',10);
INSERT INTO student VALUES (2,'name2',20);

步驟三:建立Flink入湖作業

  1. 登入Realtime Compute管理主控台

  2. 建立資料來源表和目標表。

    1. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

    2. 在左側導覽列,單擊SQL開發

    3. SQL開發頁面,建立一個新的流SQL作業,執行如下代碼。

  3. -- catalog名為步驟一建立的dlf catalog name,本例中填dlf
    CREATE DATABASE IF NOT EXISTS dlf.dlf_testdb;
    
    CREATE TABLE IF NOT EXISTS student_source (
      id INT,
      name VARCHAR (256),
      age INT,
      PRIMARY KEY (id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mysql-cdc',
      -- hostname替換為RDS的串連地址
      'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = '<RDS user name>',
      'password' = '<RDS password>',
      'database-name' = '<RDS database>',
      -- table-name為資料來源表,本例中填步驟二建立的student表
      'table-name' = 'student'
    );
    
    CREATE TABLE IF NOT EXISTS dlf.dlf_testdb.student_hudi (
      id    BIGINT PRIMARY KEY NOT ENFORCED,
      name  STRING,
      age    BIGINT
    ) WITH(
      'connector' = 'hudi'
    );
    
說明

關於MySQL源表的參數設定和使用條件,請參考 MySQL的CDC源表

關於Hudi結果表的參數設定,請參考 Hudi結果表

  1. 建立Flink入湖任務。

    1. SQL開發頁面,建立一個新的流SQL作業,填寫如下代碼

  2. INSERT INTO dlf.dlf_testdb.student_hudi SELECT * FROM student_source  /*+ OPTIONS('server-id'='123456') */;

b. 點擊儲存

c. 點擊部署

  1. 在左側導覽列,單擊作業營運。找到上面建立的流SQL作業,點擊啟動

  2. 啟動成功後一段時間,可以看到作業的狀態變成運行中

步驟四:使用DLF資料湖分析

  1. 登入資料湖構建控制台

  2. 點擊菜單“資料探索”,進入資料探索頁面。

  3. 在查詢輸入框中,輸入查詢語句

    select * from dlf_testdb.student_hudi
  4. 點擊運行

  5. 結果如圖所示,可以直接對Flink寫入資料湖的資料進行查詢和分析。

image

  1. 如果您購買了EMR叢集,並且開啟了資料湖DLF中繼資料,也可以直接通過EMR叢集對Flink入湖結果進行資料湖分析,參考 Hudi與Spark SQL整合

相關資料

如果您想要通過EMR DataFlow叢集的Flink讀寫DLF,請參考文章 通過資料湖中繼資料DLF讀寫Hudi