本文以Table StoreTablestore中的寬表作為上遊資料來源為例介紹如何使用Realtime ComputeFlink寫資料到Tablestore的時序表中。
背景資訊
Tablestore的時序模型是針對時間序列資料的特點進行設計,適用於物聯網裝置監控、裝置採集資料、機器監控資料等情境。更多資訊,請參見時序模型介紹。
在Tablestore的時序模型中,採用一張二維的時序表來儲存時序資料。
每行代表一個時間軸在某個時間點的資料,該行的主鍵部分為時間軸標識和時間戳記,該行的資料列部分為該時間軸在該時間戳記下的資料點,可以有多個資料列。其中度量名稱(measurement)、資料來源(data source)和標籤(tags)組成了一個時間軸標識,時間戳記(time)則標識具體的時間點。
注意事項
Flink中的每個TaskManager建議配置2 CPU和4GB記憶體,此配置可以充分發揮每個TaskManager的計算能力。單個TaskManager能達到1萬/s的寫入速率。
在source表分區數目足夠多的情況下,建議Flink中並發配置在16以內,寫入速率隨並發線性增長。
Flink與Tablestore執行個體必須處於同一Virtual Private Cloud。Tablestore執行個體的服務地址必須使用VPC地址。
當前支援使用此功能的地區有華東1(杭州)、華東2(上海)、華北2(北京)、華北3(張家口)、華北6(烏蘭察布)、華南1(深圳)、中國香港、德國(法蘭克福)、美國(維吉尼亞)、新加坡。
Tablestore資料結果表
Flink支援使用Tablestore時序表格儲存體輸出結果。更多資訊,請參見Table StoreTablestore連接器。
時序模型主要有_m_name
、_data_source
、_tags
、_time
四個主鍵,因此時序表作為結果表時需要指定四個主鍵,其餘配置與資料表作為結果表時的配置相同。目前支援WITH參數,SINK表主鍵和Map格式主鍵三種方式指定時序表主鍵。三種方式_tags列的轉換優先順序為WITH參數方式的優先順序最高,Map格式主鍵與SINK表主鍵方式次之。
WITH參數
使用WITH參數方式定義DDL的樣本如下:
--建立源表的暫存資料表。
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_widecolume_source_table',
'tunnelName' = 'test_widecolume_source_tunnel',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'true', --是否忽略delete操作的資料。
);
--建立結果表的暫存資料表。
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_timeseries_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES',
'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
);
--將源表資料插入到結果表。
INSERT INTO timeseries_sink
select
measurement,
datasource,
tag_a,
`time`,
binary_value,
bool_value,
double_value,
long_value,
string_value,
tag_b,
tag_c,
tag_d,
tag_e,
tag_f
from
timeseries_source;
WITH參數說明請參見下表。
參數 | 適用模型 | 是否必填 | 說明 |
connector | 通用參數 | 是 | 連接器類型。固定取值為ots。 |
endPoint | 通用參數 | 是 | Tablestore執行個體的服務地址,必須使用執行個體的VPC地址。更多資訊,請參見服務地址。 |
instanceName | 通用參數 | 是 | Tablestore執行個體的名稱。 |
tableName | 通用參數 | 是 | Tablestore的資料表或者時序表名稱。 資料表作為源表時填寫資料表名稱,時序表作為結果表時填寫時序表名稱。 |
tunnelName | 寬表模型 | 是 | Tablestore資料表的資料通道名稱。關於建立通道的具體操作,請參見建立資料通道。 |
accessId | 通用參數 | 是 | 阿里雲帳號或者RAM使用者的AccessKey(包括AccessKey ID和AccessKey Secret)。關於擷取AccessKey的具體操作,請參見建立AccessKey。 |
accessKey | 通用參數 | 是 | |
ignoreDelete | 寬表模型 | 否 | 是否忽略DELETE操作類型的即時資料,可選配置。預設值為false。資料表作為源表時可以根據需要配置。 |
storageType | 通用參數 | 是 | 資料存放區類型。取值範圍如下:
|
timeseriesSchema | 時序模型 | 是 | 需要指定為時序表主鍵的列。以JSON的key-value格式來指定時序表主鍵,例如 配置的主鍵類型必須與時序表中主鍵類型一致。其中tags主鍵可以支援同時包含多列。 |
SINK表主鍵
時序結果表的DDL定義樣本如下所示。主鍵定義中的第一位measurement為_m_name列,第二位datasource為_data_source列,最後一位time為time列,中間的多列為tag列。
使用SINK表主鍵方式定義DDL的樣本如下:
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
PRIMARY KEY(measurement, datasource, tag_a,tag_b,tag_c,tag_d,tag_e,tag_f `time`) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_timeseries_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES',
);
Map格式的主鍵
對於時序Sink表主鍵,Tablestore引入了Flink的Map類型便於產生時序模型中時序表的_tags列,Map類型可以支援列的改名、簡單函數等映射操作。使用Map時必須保證其中的_tags主鍵聲明位置在第三位。
--建立源表的暫存資料表。
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_widecolume_source_table',
'tunnelName' = 'test_widecolume_source_tunnel',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'true', --是否忽略delete操作的資料。
);
--建立結果表的暫存資料表。
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tags Map<String, String>,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_timeseries_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES',
);
--將源表資料插入到結果表。
INSERT INTO timeseries_sink
select
m_name,
data_source,
MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
`time`,
cpu_sys,
cpu_user,
disk_0,
disk_1,
disk_2,
memory_used,
net_in,
net_out
from
timeseries_source;
Realtime Compute作業開發流程
前提條件
已建立AccessKey。具體操作,請參見建立AccessKey。
已為Tablestore資料表(源表)建立資料通道。具體操作,請參見建立資料通道。
步驟一:建立作業
在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊SQL開發。
單擊建立。
在新增作業草稿對話方塊中,單擊空白的流作業草稿。
Flink全託管也為您提供了豐富的代碼模板和資料同步,每種代碼模板都為您提供了具體的使用情境、程式碼範例和使用指導。您可以直接單擊對應的模板快速地瞭解Flink產品功能和相關文法,實現您的商務邏輯,詳情請參見代碼模板和資料同步模板。
單擊下一步。
填寫作業配置資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
flink-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊表徵圖,建立子檔案夾。
作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
vvr-6.0.4-flink-1.15
單擊建立。
步驟二:編寫作業代碼
建立一個源表(Tablestore資料表)和結果表(Tablestore時序表)的暫存資料表。
說明在生產作業中,建議您盡量減少暫存資料表的使用,直接使用中繼資料管理中已經註冊的表。
建立一個timeseries_source和timeseries_sink暫存資料表程式碼範例如下:
CREATE TEMPORARY TABLE timeseries_source ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING ) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_widecolume_source_table', 'tunnelName' = 'test_widecolume_source_tunnel', 'accessId' = 'xxxxxxxxxxx', 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', --是否忽略delete操作的資料。 'ignoreDelete' = 'true', ); CREATE TEMPORARY TABLE timeseries_sink ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING, PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED ) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_timeseries_sink_table', 'accessId' = 'xxxxxxxxxxx', 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'storageType' = 'TIMESERIES', 'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}' );
編寫作業邏輯。
將源表資料插入到結果表的程式碼範例如下:
INSERT INTO timeseries_sink select measurement, datasource, tag_a, `time`, binary_value, bool_value, double_value, long_value, string_value, tag_b, tag_c, tag_d, tag_e, tag_f from timeseries_source;
步驟三:進行更多配置
在作業開發頁面右側,單擊更多配置後,您可以填寫以下參數資訊:
引擎版本:修改您建立作業時選擇的Flink引擎版本。
說明 從VVR 3.0.3版本(對應Flink 1.12版本)開始,VVP支援同時運行多個不同引擎版本的SQL作業。如果您的作業已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:- Flink 1.12版本:停止後啟動作業,系統將自動將引擎升級為vvr-3.0.3-flink-1.12版本。
- Flink 1.11或Flink 1.10版本:手動將作業引擎版本升級到vvr-3.0.3-flink-1.12或vvr-4.0.7-flink-1.13版本後重啟作業,否則會在啟動作業時逾時報錯。
附加依賴檔案:作業中需要使用到的附加依賴,例如臨時函數等。
說明如果沒有VVR的許可權,您可以下載VVR依賴,並在資源上傳頁面進行上傳,然後選擇附加依賴檔案為上傳的VVR依賴即可。具體操作,請參見附錄:配置VVR依賴。
步驟四:進行深度檢查
在作業開發頁面頂部,單擊深度檢查,進行語法檢查。
(可選)步驟五:進行作業調試
在作業開發頁面頂部,單擊調試。
您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險,詳情請參見作業調試。
步驟六:作業部署
在作業開發頁面頂部,單擊部署,在部署新版本對話方塊,可根據需要填寫或選中相關內容,單擊確認。
Session叢集適用於非生產環境的開發測試環境,您可以使用Session叢集模式部署或調試作業,提高作業JM(Job Manager)資源使用率和提高作業啟動速度。但不推薦您將作業提交至Session叢集中,因為會存在業務穩定性問題。具體操作,請參見步驟一:建立Session叢集。
步驟七:啟動並查看Flink計算結果
如果您對作業進行了修改(例如更改SQL代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要先上線,然後停止再啟動。另外,如果作業無法複用State,希望作業全新啟動時,也需要停止後再啟動作業。作業停止詳情請參見作業停止。
在左側導覽列,單擊作業營運。
單擊目標作業名稱操作列中的啟動。
作業啟動參數配置詳情請參見作業啟動。單擊啟動後,您可以看到作業狀態變為運行中,則代表作業運行正常。
在作業營運詳情頁面,查看Flink計算結果。
在作業營運頁面,單擊目標作業名稱。
單擊作業探查。
在作業記錄頁簽,單擊運行Task Managers頁簽下的Path, ID。
單擊日誌,在頁面搜尋Sink相關的日誌資訊。
附錄:配置VVR依賴
下載VVR依賴。
上傳VVR依賴。
在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊資源管理。
單擊上傳資源,選擇要上傳的VVR依賴的JAR包。
在目標作業開發頁面附加依賴檔案項,選擇目標VVR依賴的JAR包。