全部產品
Search
文件中心

Realtime Compute for Apache Flink:使用UDAF實現資料排序和彙總

更新時間:Sep 13, 2024

本文提供了一個自訂彙總函式(UDAF),實現將多行資料合併為一行並按照指定列進行排序,並以居民用電戶電網終端資料為例,介紹如何在Realtime Compute控制台使用該函數進行資料彙總和排序。

樣本資料

居民用電戶電網終端資料表electric_info,包括事件標識event_id,使用者標識user_id,事件時間event_time,使用者終端狀態status。需要將使用者的終端狀態按照事件時間升序排列。

  • electric_info

    event_id

    user_id

    event_time

    status

    1

    1222

    2023-06-30 11:14:00

    LD

    2

    1333

    2023-06-30 11:12:00

    LD

    3

    1222

    2023-06-30 11:11:00

    TD

    4

    1333

    2023-06-30 11:12:00

    LD

    5

    1222

    2023-06-30 11:15:00

    TD

    6

    1333

    2023-06-30 11:18:00

    LD

    7

    1222

    2023-06-30 11:19:00

    TD

    8

    1333

    2023-06-30 11:10:00

    TD

    9

    1555

    2023-06-30 11:16:00

    TD

    10

    1555

    2023-06-30 11:17:00

    LD

  • 預期結果

    user_id

    status

    1222

    TD,LD,TD,TD

    1333

    TD,LD,LD,LD

    1555

    TD,LD

步驟一:準備資料來源

本文以雲資料庫RDS為例。

  1. 快速建立RDS MySQL執行個體

    說明

    RDS MySQL版執行個體需要與Flink工作空間處於同一VPC。不在同一VPC下時請參見網路連通性

  2. 建立資料庫和帳號

    建立名稱為electric的資料庫,並建立高許可權帳號或具有資料庫electric讀寫權限的普通帳號。

  3. 通過DMS登入RDS MySQL,在electric資料庫中建立表electric_info和electric_info_SortListAgg,並插入資料。

    CREATE TABLE `electric_info` (
      event_id bigint NOT NULL PRIMARY KEY COMMENT '事件id',
      user_id bigint NOT NULL COMMENT '使用者標識', 
      event_time timestamp NOT NULL COMMENT '事件時間',
      status varchar(10) NOT NULL COMMENT '使用者終端狀態'
    );
    
    CREATE TABLE `electric_info_SortListAgg` (
      user_id bigint NOT NULL PRIMARY KEY COMMENT '使用者標識', 
      status_sort varchar(50) NULL COMMENT '使用者終端狀態按事件時間升序'
    );
    
    -- 準備資料
    INSERT INTO electric_info VALUES 
    (1,1222,'2023-06-30 11:14','LD'),
    (2,1333,'2023-06-30 11:12','LD'),
    (3,1222,'2023-06-30 11:11','TD'),
    (4,1333,'2023-06-30 11:12','LD'),
    (5,1222,'2023-06-30 11:15','TD'),
    (6,1333,'2023-06-30 11:18','LD'),
    (7,1222,'2023-06-30 11:19','TD'),
    (8,1333,'2023-06-30 11:10','TD'),
    (9,1555,'2023-06-30 11:16','TD'),
    (10,1555,'2023-06-30 11:17','LD');

步驟二:註冊UDF

  1. 下載ASI_UDX-1.0-SNAPSHOT.jar

    pom.xml檔案已配置了Flink 1.17.1版該自訂函數需要的最小化依賴資訊。關於使用自訂函數的更多資訊,詳情請參見自訂函數

  2. 本樣本中ASI_UDAF實現了多行資料合併一行並按照指定列進行排序,詳情如下。後續您可以根據實際業務情況進行修改。

    package ASI_UDAF;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    public class ASI_UDAF{
    	/**Accumulator class*/
    	public static class AcList {
    		public  List<String> list;
    	}
    
    	/**Aggregate function class*/
    	public static class SortListAgg extends AggregateFunction<String,AcList> {
    		public String getValue(AcList asc) {
    			/**Sort the data in the list according to a specific rule*/
    			asc.list.sort(new Comparator<String>() {
    				@Override
    				public int compare(String o1, String o2) {
    					return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]);
    				}
    			});
    			/**Traverse the sorted list, extract the required fields, and join them into a string*/
    			List<String> ret = new ArrayList<String>();
    			Iterator<String> strlist = asc.list.iterator();
    			while (strlist.hasNext()) {
    				ret.add(strlist.next().split("#")[0]);
    			}
    			String str = StringUtils.join(ret, ',');
    			return str;
    		}
    
    		/**Method to create an accumulator*/
    		public AcList createAccumulator() {
    			AcList ac = new AcList();
    			List<String> list = new ArrayList<String>();
    			ac.list = list;
    			return ac;
    		}
    
    		/**Accumulation method: add the input data to the accumulator*/
    		public void accumulate(AcList acc, String tuple1) {
    			acc.list.add(tuple1);
    		}
    
    		/**Retraction method*/
    		public void retract(AcList acc, String num) {
    		}
    	}
    }
  3. 進入註冊UDF頁面。

    註冊UDF方式的優點是便於後續開發進行代碼複用。對於Java類型的UDF,您也可以通過依賴檔案項進行上傳,詳情請參見自訂彙總函式(UDAF)

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 單擊資料開發 > ETL

    4. 單擊左側的函數頁簽,單擊註冊UDF

      image.png

  4. 選擇檔案位置上傳步驟1中的JAR檔案,單擊確定

    註冊UDF

    說明
    • 您的UDF JAR檔案會被上傳到該OSS Bucket的sql-artifacts目錄下。

    • 此外,Flink開發控制台會解析您UDF JAR檔案中是否使用了Flink UDF、UDAF和UDTF介面的類,並自動提取類名,填充到Function Name欄位中。

  5. 管理函數對話方塊,單擊建立函數

    在SQL編輯器頁面左側函數列表,您可以看到登入成功的UDF。

步驟三:建立Flink作業

  1. 資料開發 > ETL頁面,單擊建立

    image.png

  2. 單擊空白的流作業草稿

  3. 單擊下一步

  4. 新增作業草稿對話方塊,填寫作業配置資訊。

    作業參數

    說明

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    儲存位置

    指定該作業的儲存位置。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    引擎版本

    當前作業使用的Flink的引擎版本。需要與pom中的version一致。

    引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

  5. 編寫DDL和DML代碼。

    --建立暫存資料表electric_info
    CREATE TEMPORARY TABLE electric_info (
      event_id bigint not null,
      `user_id` bigint not null, 
      event_time timestamp(6) not null,
      status string not null,
      primary key(event_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info'
    );
    
    CREATE TEMPORARY TABLE electric_info_sortlistagg (
      `user_id` bigint not null, 
      status_sort varchar(50) not null,
      primary key(user_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info_sortlistagg'
    );
    
    --將electric_info表中的資料彙總並插入到electric_info_sortlistagg表中
    --將status和event_time拼接成的字串作為參數傳遞給登入的自訂函數ASI_UDAF$SortListAgg
    INSERT INTO electric_info_sortlistagg 
    SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING)))
    FROM electric_info GROUP BY user_id;

    參數說明如下,您可以根據實際情況進行修改。MySQL連接器更多參數詳情請參見MySQL

    參數

    說明

    備忘

    connector

    連接器類型。

    本樣本固定值為mysql

    hostname

    MySQL資料庫的IP地址或者Hostname。

    本文填寫為RDS MySQL執行個體的內網地址。

    username

    MySQL資料庫服務的使用者名稱。

    無。

    password

    MySQL資料庫服務的密碼。

    本樣本通過使用名為mysql_pw密鑰的方式填寫密碼值,避免資訊泄露,詳情請參見變數管理

    database-name

    MySQL資料庫名稱。

    本樣本填寫為步驟一:準備資料來源中建立的資料庫electric。

    table-name

    MySQL表名。

    本樣本填寫為electric或electric_info_sortlistagg。

    port

    MySQL資料庫服務的連接埠號碼。

    無。

  6. (可選)單擊右上方的深度檢查調試,功能詳情請參見SQL作業開發

  7. 單擊部署,單擊確定

  8. 營運中心 > 作業營運頁面,單擊目標作業名稱操作列下的啟動,選擇無狀態啟動

步驟四:查詢結果

在RDS中使用如下語句查看使用者的終端狀態按照事件時間升序排列結果。

SELECT * FROM `electric_info_sortlistagg`;

結果如下:

image.png

相關文檔