Tablestore支援作為Realtime ComputeFlink的源表和結果表使用,您可以將Tablestore資料表中的資料經過Flink處理後得到的結果儲存到Tablestore的另一張資料表中。
背景資訊
Realtime ComputeFlink能將Tunnel Service的資料通道作為流式資料的輸入,每條資料類似一個JSON格式。樣本如下:
{
"OtsRecordType": "PUT",
"OtsRecordTimestamp": 1506416585740836,
"PrimaryKey": [
{
"ColumnName": "pk_1",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_2",
"Value": "string_pk_value"
}
],
"Columns": [
{
"OtsColumnType": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
},
{
"OtsColumnType": "DELETE_ONE_VERSION",
"ColumnName": "attr_1"
}
]
}
欄位名 | 描述 |
OtsRecordType | 資料操作類型,取值範圍如下:
|
OtsRecordTimestamp | 資料操作時間,單位為微秒。全量資料時取值為0。 |
PrimaryKey | 主鍵列資訊,以JSON格式數組表示。支援配置1~4列,請以實際主鍵列為準。包括如下選項:
|
Columns | 屬性列資訊,以JSON格式的數組表示。包括如下選項:
|
Tablestore資料來源表
儲存在Tablestore中資料的主鍵和屬性列值均可以在Flink中通過資料來源表DDL以列名與相應的類型映射進行讀取。更多資訊,請參見Table StoreTablestore連接器。
DDL定義
資料來源表的DDL定義樣本如下:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的資料.
);
除了待消費的使用者資料外,Tunnel Service返回資料中的OtsRecordType、OtsRecordTimestamp欄位均支援通過屬性欄位的方式讀取。欄位說明請參見下表。
欄位名 | Flink映射名 | 描述 |
OtsRecordType | type | 資料操作類型。 |
OtsRecordTimestamp | timestamp | 資料操作時間,單位為微秒。全量資料時取值為0。 |
當需要讀取OtsRecordType和OtsRecordTimestamp欄位時,Flink提供了METADATA關鍵字用於擷取源表中的屬性欄位,具體DDL樣本如下:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
WITH參數
參數 | 是否必填 | 描述 |
connector | 是 | 源表類型。固定取值為ots。 |
endPoint | 是 | Tablestore執行個體的服務地址。更多資訊,請參見服務地址。 |
instanceName | 是 | Tablestore的執行個體名稱。 |
tableName | 是 | Tablestore的資料表名稱。 |
tunnelName | 是 | Tablestore資料表的資料通道名稱。關於建立通道的具體操作,請參見建立資料通道。 |
accessId | 是 | 阿里雲帳號或者RAM使用者的AccessKey(包括AccessKey ID和AccessKey Secret)。擷取AccessKey的具體操作,請參見擷取AccessKey。 |
accessKey | 是 | |
ignoreDelete | 否 | 是否忽略DELETE操作類型的即時資料。預設值為false,表示不忽略DELETE操作類型的即時資料。 |
skipInvalidData | 否 | 是否忽略髒資料。預設值為false,表示不忽略髒資料。 如果不忽略髒資料,則處理髒資料時會進行報錯。如果需要忽略髒資料,請設定此參數為true。 |
源表欄位類型映射
Tablestore欄位類型 | Flink欄位類型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
Tablestore資料結果表
Flink支援使用Tablestore儲存輸出結果。更多資訊,請參見Table StoreTablestore連接器。
DDL定義
結果表的DDL定義樣本如下:
在Tablestore資料結果表定義中除了主鍵列外,需要包含至少一個屬性列。
CREATE TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
...
);
WITH參數
參數 | 是否必填 | 描述 |
connector | 是 | 結果表類型。固定取值為ots。 |
endPoint | 是 | Tablestore執行個體的服務地址。更多資訊,請參見服務地址。 |
instanceName | 是 | Tablestore的執行個體名稱。 |
tableName | 是 | Tablestore的資料表名稱。 |
tunnelName | 是 | Tablestore資料表的資料通道名稱。關於建立通道的具體操作,請參見建立資料通道。 |
accessId | 是 | 阿里雲帳號或者RAM使用者的AccessKey(包括AccessKey ID和AccessKey Secret)。擷取AccessKey的具體操作,請參見擷取AccessKey。 |
accessKey | 是 | |
valueColumns | 是 | 指定插入的欄位列名。插入多個欄位以半形逗號(,)分隔。例如 |
bufferSize | 否 | 流入多少條資料後開始輸出。預設值為5000,表示輸入的資料達到5000條就開始輸出。 |
batchWriteTimeoutMs | 否 | 寫入逾時的時間。單位為毫秒。預設值為5000,表示如果緩衝中的資料在等待5秒後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。 |
batchSize | 否 | 一次批量寫入的條數。預設值為100。 |
retryIntervalMs | 否 | 稍候再試時間,單位毫秒。預設值為1000。 |
maxRetryTimes | 否 | 最大重試次數。預設值為100。 |
ignoreDelete | 否 | 是否忽略DELETE操作類型的即時資料。預設值為false,表示不忽略DELETE操作類型的即時資料。 |
autoIncrementKey | 否 | 當結果表中包含主鍵自增列時,通過該參數指定主鍵自增列的列名稱。 |
defaultTimestampInMillisecond | 否 | 寫入結果表的資料的版本號碼,單位為毫秒。當不進行配置時,版本號碼取決於寫入的時間。 |
結果表欄位類型映射
Flink欄位類型 | Tablestore欄位類型 |
BINARY | BINARY |
VARBINARY | BINARY |
CHAR | STRING |
VARCHAR | STRING |
TINYINT | INTEGER |
SMALLINT | INTEGER |
INTEGER | INTEGER |
BIGINT | INTEGER |
FLOAT | DOUBLE |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
SQL樣本
讀取資料來源表的資料
批量從資料來源表ots source中讀取資料,SQL樣本如下:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的資料。
);
SELECT * FROM tablestore_stream LIMIT 100;
資料同步到結果表
ots sink資料會以updateRow的方式寫入結果表,SQL樣本如下:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的資料。
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='xxxxxxxxxxx',
'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
Realtime Compute作業開發流程
前提條件
已建立AccessKey。具體操作,請參見建立AccessKey。
已為Tablestore資料表(源表)建立資料通道。具體操作,請參見建立資料通道。
步驟一:建立作業
在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊SQL開發。
單擊建立。
在新增作業草稿對話方塊中,單擊空白的流作業草稿。
Flink全託管也為您提供了豐富的代碼模板和資料同步,每種代碼模板都為您提供了具體的使用情境、程式碼範例和使用指導。您可以直接單擊對應的模板快速地瞭解Flink產品功能和相關文法,實現您的商務邏輯,詳情請參見代碼模板和資料同步模板。
單擊下一步。
填寫作業配置資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
flink-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊表徵圖,建立子檔案夾。
作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
vvr-6.0.4-flink-1.15
單擊建立。
步驟二:編寫作業代碼
建立一個源表和結果表的暫存資料表。
說明在生產作業中,建議您盡量減少暫存資料表的使用,直接使用中繼資料管理中已經註冊的表。
建立一個tablestore_stream和ots_sink暫存資料表程式碼範例如下:
CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'ignoreDelete' = 'false' //是否忽略delete操作的資料。 ); CREATE TEMPORARY TABLE ots_sink ( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED ) WITH ( 'connector'='ots', 'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com', 'instanceName'='flink-sink', 'tableName'='flink_sink_table', 'accessId'='xxxxxxxxxxx', 'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'valueColumns'='customerid,customername' );
編寫作業邏輯。
將源表資料插入到結果表的程式碼範例如下:
INSERT INTO ots_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
步驟三:進行更多配置
在作業開發頁面右側,單擊更多配置後,您可以填寫以下參數資訊:
引擎版本:修改您建立作業時選擇的Flink引擎版本。
說明 從VVR 3.0.3版本(對應Flink 1.12版本)開始,VVP支援同時運行多個不同引擎版本的SQL作業。如果您的作業已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:- Flink 1.12版本:停止後啟動作業,系統將自動將引擎升級為vvr-3.0.3-flink-1.12版本。
- Flink 1.11或Flink 1.10版本:手動將作業引擎版本升級到vvr-3.0.3-flink-1.12或vvr-4.0.7-flink-1.13版本後重啟作業,否則會在啟動作業時逾時報錯。
附加依賴檔案:作業中需要使用到的附加依賴,例如臨時函數等。
步驟四:進行深度檢查
在作業開發頁面頂部,單擊深度檢查,進行語法檢查。
步驟五:(可選)進行作業調試
在作業開發頁面頂部,單擊調試。
您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險。具體操作,請參見作業調試。
步驟六:作業部署
在作業開發頁面頂部,單擊部署,在部署新版本對話方塊,可根據需要填寫或選中相關內容,單擊確認。
Session叢集適用於非生產環境的開發測試環境,您可以使用Session叢集模式部署或調試作業,提高作業JM(Job Manager)資源使用率和提高作業啟動速度。但不推薦您將作業提交至Session叢集中,因為會存在業務穩定性問題。具體操作,請參見步驟一:建立Session叢集。
步驟七:啟動並查看Flink計算結果
如果您對作業進行了修改(例如更改SQL代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要先上線,然後停止再啟動。另外,如果作業無法複用State,希望作業全新啟動時,也需要停止後再啟動作業。關於作業停止的具體操作,請參見作業停止。
在左側導覽列,單擊作業營運。
單擊目標作業名稱操作列中的啟動。
作業啟動參數配置詳情請參見作業啟動。單擊啟動後,您可以看到作業狀態變為運行中,則代表作業運行正常。
在作業營運詳情頁面,查看Flink計算結果。
在作業營運頁面,單擊目標作業名稱。
單擊作業探查。
在作業記錄頁簽,單擊運行Task Managers頁簽下的Path, ID。
單擊日誌,在頁面搜尋Sink相關的日誌資訊。