本文提供了一個自訂彙總函式(UDAF),實現將多行資料合併為一行並按照指定列進行排序,並以居民用電戶電網終端資料為例,介紹如何在Realtime Compute控制台使用該函數進行資料彙總和排序。
樣本資料
居民用電戶電網終端資料表electric_info,包括事件標識event_id,使用者標識user_id,事件時間event_time,使用者終端狀態status。需要將使用者的終端狀態按照事件時間升序排列。
electric_info
event_id
user_id
event_time
status
1
1222
2023-06-30 11:14:00
LD
2
1333
2023-06-30 11:12:00
LD
3
1222
2023-06-30 11:11:00
TD
4
1333
2023-06-30 11:12:00
LD
5
1222
2023-06-30 11:15:00
TD
6
1333
2023-06-30 11:18:00
LD
7
1222
2023-06-30 11:19:00
TD
8
1333
2023-06-30 11:10:00
TD
9
1555
2023-06-30 11:16:00
TD
10
1555
2023-06-30 11:17:00
LD
預期結果
user_id
status
1222
TD,LD,TD,TD
1333
TD,LD,LD,LD
1555
TD,LD
步驟一:準備資料來源
本文以雲資料庫RDS為例。
- 說明
RDS MySQL版執行個體需要與Flink工作空間處於同一VPC。不在同一VPC下時請參見網路連通性。
建立名稱為electric的資料庫,並建立高許可權帳號或具有資料庫electric讀寫權限的普通帳號。
通過DMS登入RDS MySQL,在electric資料庫中建立表electric_info和electric_info_SortListAgg,並插入資料。
CREATE TABLE `electric_info` ( event_id bigint NOT NULL PRIMARY KEY COMMENT '事件id', user_id bigint NOT NULL COMMENT '使用者標識', event_time timestamp NOT NULL COMMENT '事件時間', status varchar(10) NOT NULL COMMENT '使用者終端狀態' ); CREATE TABLE `electric_info_SortListAgg` ( user_id bigint NOT NULL PRIMARY KEY COMMENT '使用者標識', status_sort varchar(50) NULL COMMENT '使用者終端狀態按事件時間升序' ); -- 準備資料 INSERT INTO electric_info VALUES (1,1222,'2023-06-30 11:14','LD'), (2,1333,'2023-06-30 11:12','LD'), (3,1222,'2023-06-30 11:11','TD'), (4,1333,'2023-06-30 11:12','LD'), (5,1222,'2023-06-30 11:15','TD'), (6,1333,'2023-06-30 11:18','LD'), (7,1222,'2023-06-30 11:19','TD'), (8,1333,'2023-06-30 11:10','TD'), (9,1555,'2023-06-30 11:16','TD'), (10,1555,'2023-06-30 11:17','LD');
步驟二:註冊UDF
pom.xml檔案已配置了Flink 1.17.1版該自訂函數需要的最小化依賴資訊。關於使用自訂函數的更多資訊,詳情請參見自訂函數。
本樣本中ASI_UDAF實現了多行資料合併一行並按照指定列進行排序,詳情如下。後續您可以根據實際業務情況進行修改。
package ASI_UDAF; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.AggregateFunction; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; public class ASI_UDAF{ /**Accumulator class*/ public static class AcList { public List<String> list; } /**Aggregate function class*/ public static class SortListAgg extends AggregateFunction<String,AcList> { public String getValue(AcList asc) { /**Sort the data in the list according to a specific rule*/ asc.list.sort(new Comparator<String>() { @Override public int compare(String o1, String o2) { return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]); } }); /**Traverse the sorted list, extract the required fields, and join them into a string*/ List<String> ret = new ArrayList<String>(); Iterator<String> strlist = asc.list.iterator(); while (strlist.hasNext()) { ret.add(strlist.next().split("#")[0]); } String str = StringUtils.join(ret, ','); return str; } /**Method to create an accumulator*/ public AcList createAccumulator() { AcList ac = new AcList(); List<String> list = new ArrayList<String>(); ac.list = list; return ac; } /**Accumulation method: add the input data to the accumulator*/ public void accumulate(AcList acc, String tuple1) { acc.list.add(tuple1); } /**Retraction method*/ public void retract(AcList acc, String num) { } } }
進入註冊UDF頁面。
註冊UDF方式的優點是便於後續開發進行代碼複用。對於Java類型的UDF,您也可以通過依賴檔案項進行上傳,詳情請參見自訂彙總函式(UDAF)。
單擊目標工作空間操作列下的控制台。
單擊
。單擊左側的函數頁簽,單擊註冊UDF。
在選擇檔案位置上傳步驟1中的JAR檔案,單擊確定。
說明您的UDF JAR檔案會被上傳到該OSS Bucket的sql-artifacts目錄下。
此外,Flink開發控制台會解析您UDF JAR檔案中是否使用了Flink UDF、UDAF和UDTF介面的類,並自動提取類名,填充到Function Name欄位中。
在管理函數對話方塊,單擊建立函數。
在SQL編輯器頁面左側函數列表,您可以看到登入成功的UDF。
步驟三:建立Flink作業
在
頁面,單擊建立。單擊空白的流作業草稿。
單擊下一步。
在新增作業草稿對話方塊,填寫作業配置資訊。
作業參數
說明
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
儲存位置
指定該作業的儲存位置。
您還可以在現有檔案夾右側,單擊表徵圖,建立子檔案夾。
引擎版本
當前作業使用的Flink的引擎版本。需要與pom中的version一致。
引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
編寫DDL和DML代碼。
--建立暫存資料表electric_info CREATE TEMPORARY TABLE electric_info ( event_id bigint not null, `user_id` bigint not null, event_time timestamp(6) not null, status string not null, primary key(event_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info' ); CREATE TEMPORARY TABLE electric_info_sortlistagg ( `user_id` bigint not null, status_sort varchar(50) not null, primary key(user_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info_sortlistagg' ); --將electric_info表中的資料彙總並插入到electric_info_sortlistagg表中 --將status和event_time拼接成的字串作為參數傳遞給登入的自訂函數ASI_UDAF$SortListAgg INSERT INTO electric_info_sortlistagg SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING))) FROM electric_info GROUP BY user_id;
參數說明如下,您可以根據實際情況進行修改。MySQL連接器更多參數詳情請參見MySQL。
參數
說明
備忘
connector
連接器類型。
本樣本固定值為
mysql
。hostname
MySQL資料庫的IP地址或者Hostname。
本文填寫為RDS MySQL執行個體的內網地址。
username
MySQL資料庫服務的使用者名稱。
無。
password
MySQL資料庫服務的密碼。
本樣本通過使用名為mysql_pw密鑰的方式填寫密碼值,避免資訊泄露,詳情請參見變數管理。
database-name
MySQL資料庫名稱。
本樣本填寫為步驟一:準備資料來源中建立的資料庫electric。
table-name
MySQL表名。
本樣本填寫為electric或electric_info_sortlistagg。
port
MySQL資料庫服務的連接埠號碼。
無。
(可選)單擊右上方的深度檢查和調試,功能詳情請參見SQL作業開發。
單擊部署,單擊確定。
在
頁面,單擊目標作業名稱操作列下的啟動,選擇無狀態啟動。
步驟四:查詢結果
在RDS中使用如下語句查看使用者的終端狀態按照事件時間升序排列結果。
SELECT * FROM `electric_info_sortlistagg`;
結果如下: