全部產品
Search
文件中心

Tablestore:使用教程(寬表模型)

更新時間:Jun 30, 2024

Tablestore支援作為Realtime ComputeFlink的源表和結果表使用,您可以將Tablestore資料表中的資料經過Flink處理後得到的結果儲存到Tablestore的另一張資料表中。

背景資訊

Realtime ComputeFlink能將Tunnel Service的資料通道作為流式資料的輸入,每條資料類似一個JSON格式。樣本如下:

{
  "OtsRecordType": "PUT", 
  "OtsRecordTimestamp": 1506416585740836, 
  "PrimaryKey": [
    {
      "ColumnName": "pk_1", 
      "Value": 1506416585881590900
    },
    {
      "ColumnName": "pk_2", 
      "Value": "string_pk_value"
    }
  ],
  "Columns": [
    {
      "OtsColumnType": "Put", 
      "ColumnName": "attr_0",
      "Value": "hello_table_store",
    },
    {
      "OtsColumnType": "DELETE_ONE_VERSION", 
      "ColumnName": "attr_1"
    }
  ]
}

欄位名

描述

OtsRecordType

資料操作類型,取值範圍如下:

  • PUT:新增資料操作。

  • UPDATE:更新資料操作。

  • DELETE:刪除資料操作。

OtsRecordTimestamp

資料操作時間,單位為微秒。全量資料時取值為0。

PrimaryKey

主鍵列資訊,以JSON格式數組表示。支援配置1~4列,請以實際主鍵列為準。包括如下選項:

  • ColumnName:列名稱。

  • Value:列值。

Columns

屬性列資訊,以JSON格式的數組表示。包括如下選項:

  • OtsColumnType:列操作類型。取值範圍為PUT、DELETE_ONE_VERSION、DELETE_ALL_VERSION。

  • ColumnName:列名。

  • Value:列值。

    當設定OtsColumnType為DELETE_ONE_VERSION或者DELETE_ALL_VERSION時,不需要配置該參數。

Tablestore資料來源表

儲存在Tablestore中資料的主鍵和屬性列值均可以在Flink中通過資料來源表DDL以列名與相應的類型映射進行讀取。更多資訊,請參見Table StoreTablestore連接器

DDL定義

資料來源表的DDL定義樣本如下:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的資料.
);

除了待消費的使用者資料外,Tunnel Service返回資料中的OtsRecordType、OtsRecordTimestamp欄位均支援通過屬性欄位的方式讀取。欄位說明請參見下表。

欄位名

Flink映射名

描述

OtsRecordType

type

資料操作類型。

OtsRecordTimestamp

timestamp

資料操作時間,單位為微秒。全量資料時取值為0。

當需要讀取OtsRecordType和OtsRecordTimestamp欄位時,Flink提供了METADATA關鍵字用於擷取源表中的屬性欄位,具體DDL樣本如下:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    record_type STRING METADATA FROM 'type',
    record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
    ...
);

WITH參數

參數

是否必填

描述

connector

源表類型。固定取值為ots。

endPoint

Tablestore執行個體的服務地址。更多資訊,請參見服務地址

instanceName

Tablestore的執行個體名稱。

tableName

Tablestore的資料表名稱。

tunnelName

Tablestore資料表的資料通道名稱。關於建立通道的具體操作,請參見建立資料通道

accessId

阿里雲帳號或者RAM使用者的AccessKey(包括AccessKey ID和AccessKey Secret)。擷取AccessKey的具體操作,請參見擷取AccessKey

accessKey

ignoreDelete

是否忽略DELETE操作類型的即時資料。預設值為false,表示不忽略DELETE操作類型的即時資料。

skipInvalidData

是否忽略髒資料。預設值為false,表示不忽略髒資料。

如果不忽略髒資料,則處理髒資料時會進行報錯。如果需要忽略髒資料,請設定此參數為true。

源表欄位類型映射

Tablestore欄位類型

Flink欄位類型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

Tablestore資料結果表

Flink支援使用Tablestore儲存輸出結果。更多資訊,請參見Table StoreTablestore連接器

DDL定義

結果表的DDL定義樣本如下:

說明

在Tablestore資料結果表定義中除了主鍵列外,需要包含至少一個屬性列。

CREATE TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
     ...
);

WITH參數

參數

是否必填

描述

connector

結果表類型。固定取值為ots。

endPoint

Tablestore執行個體的服務地址。更多資訊,請參見服務地址

instanceName

Tablestore的執行個體名稱。

tableName

Tablestore的資料表名稱。

tunnelName

Tablestore資料表的資料通道名稱。關於建立通道的具體操作,請參見建立資料通道

accessId

阿里雲帳號或者RAM使用者的AccessKey(包括AccessKey ID和AccessKey Secret)。擷取AccessKey的具體操作,請參見擷取AccessKey

accessKey

valueColumns

指定插入的欄位列名。插入多個欄位以半形逗號(,)分隔。例如ID,NAME

bufferSize

流入多少條資料後開始輸出。預設值為5000,表示輸入的資料達到5000條就開始輸出。

batchWriteTimeoutMs

寫入逾時的時間。單位為毫秒。預設值為5000,表示如果緩衝中的資料在等待5秒後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。

batchSize

一次批量寫入的條數。預設值為100。

retryIntervalMs

稍候再試時間,單位毫秒。預設值為1000。

maxRetryTimes

最大重試次數。預設值為100。

ignoreDelete

是否忽略DELETE操作類型的即時資料。預設值為false,表示不忽略DELETE操作類型的即時資料。

autoIncrementKey

當結果表中包含主鍵自增列時,通過該參數指定主鍵自增列的列名稱。

defaultTimestampInMillisecond

寫入結果表的資料的版本號碼,單位為毫秒。當不進行配置時,版本號碼取決於寫入的時間。

結果表欄位類型映射

Flink欄位類型

Tablestore欄位類型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

SQL樣本

讀取資料來源表的資料

批量從資料來源表ots source中讀取資料,SQL樣本如下:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的資料。
);
SELECT * FROM tablestore_stream LIMIT 100;

資料同步到結果表

ots sink資料會以updateRow的方式寫入結果表,SQL樣本如下:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的資料。
);

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Realtime Compute作業開發流程

前提條件

  • 已建立AccessKey。具體操作,請參見建立AccessKey

  • 已為Tablestore資料表(源表)建立資料通道。具體操作,請參見建立資料通道

步驟一:建立作業

  1. 登入Realtime Compute控制台

  2. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

  3. 在左側導覽列,單擊SQL開發

  4. 單擊建立

  5. 新增作業草稿對話方塊中,單擊空白的流作業草稿

    Flink全託管也為您提供了豐富的代碼模板和資料同步,每種代碼模板都為您提供了具體的使用情境、程式碼範例和使用指導。您可以直接單擊對應的模板快速地瞭解Flink產品功能和相關文法,實現您的商務邏輯,詳情請參見代碼模板資料同步模板

  6. 單擊下一步

  7. 填寫作業配置資訊。

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    flink-test

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

    vvr-6.0.4-flink-1.15

  8. 單擊建立

步驟二:編寫作業代碼

  1. 建立一個源表和結果表的暫存資料表。

    說明

    在生產作業中,建議您盡量減少暫存資料表的使用,直接使用中繼資料管理中已經註冊的表。

    建立一個tablestore_stream和ots_sink暫存資料表程式碼範例如下:

    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector'='ots',
        'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
        'instanceName' = 'flink-source',
        'tableName' ='flink_source_table',
        'tunnelName' = 'flinksourcestream',
        'accessId' ='xxxxxxxxxxx',
        'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'ignoreDelete' = 'false' //是否忽略delete操作的資料。
    );
    
    CREATE TEMPORARY TABLE ots_sink (
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED
    ) WITH (
        'connector'='ots',
        'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
        'instanceName'='flink-sink',
        'tableName'='flink_sink_table',
        'accessId'='xxxxxxxxxxx',
        'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'valueColumns'='customerid,customername'
    );
  2. 編寫作業邏輯。

    將源表資料插入到結果表的程式碼範例如下:

    INSERT INTO ots_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

步驟三:進行更多配置

在作業開發頁面右側,單擊更多配置後,您可以填寫以下參數資訊:

  • 引擎版本:修改您建立作業時選擇的Flink引擎版本。

    說明 從VVR 3.0.3版本(對應Flink 1.12版本)開始,VVP支援同時運行多個不同引擎版本的SQL作業。如果您的作業已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:
    • Flink 1.12版本:停止後啟動作業,系統將自動將引擎升級為vvr-3.0.3-flink-1.12版本。
    • Flink 1.11或Flink 1.10版本:手動將作業引擎版本升級到vvr-3.0.3-flink-1.12vvr-4.0.7-flink-1.13版本後重啟作業,否則會在啟動作業時逾時報錯。
  • 附加依賴檔案:作業中需要使用到的附加依賴,例如臨時函數等。

步驟四:進行深度檢查

在作業開發頁面頂部,單擊深度檢查,進行語法檢查。

步驟五:(可選)進行作業調試

在作業開發頁面頂部,單擊調試

您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險。具體操作,請參見作業調試

步驟六:作業部署

在作業開發頁面頂部,單擊部署,在部署新版本對話方塊,可根據需要填寫或選中相關內容,單擊確認部署

說明

Session叢集適用於非生產環境的開發測試環境,您可以使用Session叢集模式部署或調試作業,提高作業JM(Job Manager)資源使用率和提高作業啟動速度。但不推薦您將作業提交至Session叢集中,因為會存在業務穩定性問題。具體操作,請參見步驟一:建立Session叢集

步驟七:啟動並查看Flink計算結果

說明

如果您對作業進行了修改(例如更改SQL代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要先上線,然後停止再啟動。另外,如果作業無法複用State,希望作業全新啟動時,也需要停止後再啟動作業。關於作業停止的具體操作,請參見作業停止

  1. 在左側導覽列,單擊作業營運

  2. 單擊目標作業名稱操作列中的啟動

    作業啟動參數配置詳情請參見作業啟動。單擊啟動後,您可以看到作業狀態變為運行中,則代表作業運行正常。

  3. 在作業營運詳情頁面,查看Flink計算結果。

    1. 作業營運頁面,單擊目標作業名稱。

    2. 單擊作業探查

    3. 作業記錄頁簽,單擊運行Task Managers頁簽下的Path, ID

    4. 單擊日誌,在頁面搜尋Sink相關的日誌資訊。

      image..png