全部產品
Search
文件中心

Tablestore:使用教程(時序模型)

更新時間:Oct 25, 2024

本文以Table StoreTablestore中的寬表作為上遊資料來源為例介紹如何使用Realtime ComputeFlink寫資料到Tablestore的時序表中。

背景資訊

Tablestore的時序模型是針對時間序列資料的特點進行設計,適用於物聯網裝置監控、裝置採集資料、機器監控資料等情境。更多資訊,請參見時序模型介紹

在Tablestore的時序模型中,採用一張二維的時序表來儲存時序資料。

每行代表一個時間軸在某個時間點的資料,該行的主鍵部分為時間軸標識和時間戳記,該行的資料列部分為該時間軸在該時間戳記下的資料點,可以有多個資料列。其中度量名稱(measurement)、資料來源(data source)和標籤(tags)組成了一個時間軸標識,時間戳記(time)則標識具體的時間點。

注意事項

  • Flink中的每個TaskManager建議配置2 CPU和4GB記憶體,此配置可以充分發揮每個TaskManager的計算能力。單個TaskManager能達到1萬/s的寫入速率。

  • 在source表分區數目足夠多的情況下,建議Flink中並發配置在16以內,寫入速率隨並發線性增長。

  • Flink與Tablestore執行個體必須處於同一Virtual Private Cloud。Tablestore執行個體的服務地址必須使用VPC地址。

  • 當前支援使用此功能的地區有華東1(杭州)、華東2(上海)、華北2(北京)、華北3(張家口)、華北6(烏蘭察布)、華南1(深圳)、中國香港、德國(法蘭克福)、美國(維吉尼亞)、新加坡。

Tablestore資料結果表

Flink支援使用Tablestore時序表格儲存體輸出結果。更多資訊,請參見Table StoreTablestore連接器

時序模型主要有_m_name_data_source_tags_time四個主鍵,因此時序表作為結果表時需要指定四個主鍵,其餘配置與資料表作為結果表時的配置相同。目前支援WITH參數,SINK表主鍵和Map格式主鍵三種方式指定時序表主鍵。三種方式_tags列的轉換優先順序為WITH參數方式的優先順序最高,Map格式主鍵與SINK表主鍵方式次之。

WITH參數

使用WITH參數方式定義DDL的樣本如下:

--建立源表的暫存資料表。
CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_widecolume_source_table',
    'tunnelName' = 'test_widecolume_source_tunnel',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'true', --是否忽略delete操作的資料。
);

--建立結果表的暫存資料表。
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
);

--將源表資料插入到結果表。
INSERT INTO timeseries_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from
        timeseries_source;

WITH參數說明請參見下表。

參數

適用模型

是否必填

說明

connector

通用參數

連接器類型。固定取值為ots。

endPoint

通用參數

Tablestore執行個體的服務地址,必須使用執行個體的VPC地址。更多資訊,請參見服務地址

instanceName

通用參數

Tablestore執行個體的名稱。

tableName

通用參數

Tablestore的資料表或者時序表名稱。

資料表作為源表時填寫資料表名稱,時序表作為結果表時填寫時序表名稱。

tunnelName

寬表模型

Tablestore資料表的資料通道名稱。關於建立通道的具體操作,請參見建立資料通道

accessId

通用參數

阿里雲帳號或者RAM使用者的AccessKey(包括AccessKey ID和AccessKey Secret)。關於擷取AccessKey的具體操作,請參見建立AccessKey

accessKey

通用參數

ignoreDelete

寬表模型

是否忽略DELETE操作類型的即時資料,可選配置。預設值為false。資料表作為源表時可以根據需要配置。

storageType

通用參數

資料存放區類型。取值範圍如下:

  • WIDE_COLUMN(預設):資料表

    當資料表作為源表時,不配置此參數或者配置此參數為WIDE_COLUMN。

  • TIMESERIES:時序表

    當時序表作為結果表時,必須配置為TIMESERIES。

timeseriesSchema

時序模型

需要指定為時序表主鍵的列。以JSON的key-value格式來指定時序表主鍵,例如{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}

配置的主鍵類型必須與時序表中主鍵類型一致。其中tags主鍵可以支援同時包含多列。

SINK表主鍵

時序結果表的DDL定義樣本如下所示。主鍵定義中的第一位measurement為_m_name列,第二位datasource為_data_source列,最後一位time為time列,中間的多列為tag列。

使用SINK表主鍵方式定義DDL的樣本如下:

CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a,tag_b,tag_c,tag_d,tag_e,tag_f `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
);

Map格式的主鍵

對於時序Sink表主鍵,Tablestore引入了Flink的Map類型便於產生時序模型中時序表的_tags列,Map類型可以支援列的改名、簡單函數等映射操作。使用Map時必須保證其中的_tags主鍵聲明位置在第三位。

--建立源表的暫存資料表。
CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_widecolume_source_table',
    'tunnelName' = 'test_widecolume_source_tunnel',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'true', --是否忽略delete操作的資料。
);
--建立結果表的暫存資料表。
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
);

--將源表資料插入到結果表。
INSERT INTO timeseries_sink
    select 
        m_name,
        data_source,
        MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
        `time`,
        cpu_sys,
        cpu_user,
        disk_0,
        disk_1,
        disk_2,
        memory_used,
        net_in,
        net_out 
    from
        timeseries_source;

Realtime Compute作業開發流程

前提條件

  • 已建立AccessKey。具體操作,請參見建立AccessKey

  • 已為Tablestore資料表(源表)建立資料通道。具體操作,請參見建立資料通道

步驟一:建立作業

  1. 登入Realtime Compute控制台

  2. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

  3. 在左側導覽列,單擊SQL開發

  4. 單擊建立

  5. 新增作業草稿對話方塊中,單擊空白的流作業草稿

    Flink全託管也為您提供了豐富的代碼模板和資料同步,每種代碼模板都為您提供了具體的使用情境、程式碼範例和使用指導。您可以直接單擊對應的模板快速地瞭解Flink產品功能和相關文法,實現您的商務邏輯,詳情請參見代碼模板資料同步模板

  6. 單擊下一步

  7. 填寫作業配置資訊。

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    flink-test

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

    vvr-6.0.4-flink-1.15

  8. 單擊建立

步驟二:編寫作業代碼

  1. 建立一個源表(Tablestore資料表)和結果表(Tablestore時序表)的暫存資料表。

    說明

    在生產作業中,建議您盡量減少暫存資料表的使用,直接使用中繼資料管理中已經註冊的表。

    建立一個timeseries_source和timeseries_sink暫存資料表程式碼範例如下:

    CREATE TEMPORARY TABLE timeseries_source (
        measurement STRING,
        datasource STRING,
        tag_a STRING,
        `time` BIGINT,
        binary_value BINARY,
        bool_value BOOLEAN,
        double_value DOUBLE,
        long_value BIGINT,
        string_value STRING,
        tag_b STRING,
        tag_c STRING,
        tag_d STRING,
        tag_e STRING,
        tag_f STRING
    ) 
    WITH (
        'connector' = 'ots',
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
        'instanceName' = 'xxx',
        'tableName' = 'test_widecolume_source_table',
        'tunnelName' = 'test_widecolume_source_tunnel',
        'accessId' = 'xxxxxxxxxxx',
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        --是否忽略delete操作的資料。
        'ignoreDelete' = 'true', 
    );
    
    CREATE TEMPORARY TABLE timeseries_sink (
        measurement STRING,
        datasource STRING,
        tag_a STRING,
        `time` BIGINT,
        binary_value BINARY,
        bool_value BOOLEAN,
        double_value DOUBLE,
        long_value BIGINT,
        string_value STRING,
        tag_b STRING,
        tag_c STRING,
        tag_d STRING,
        tag_e STRING,
        tag_f STRING,
        PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
    ) 
    WITH (
        'connector' = 'ots',
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
        'instanceName' = 'xxx',
        'tableName' = 'test_timeseries_sink_table',
        'accessId' = 'xxxxxxxxxxx',
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'storageType' = 'TIMESERIES',
        'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
    );
  2. 編寫作業邏輯。

    將源表資料插入到結果表的程式碼範例如下:

    INSERT INTO timeseries_sink
        select 
        measurement,
        datasource,
        tag_a,
        `time`,
        binary_value,
        bool_value,
        double_value,
        long_value,
        string_value,
        tag_b,
        tag_c,
        tag_d,
        tag_e,
        tag_f
        from
            timeseries_source;

步驟三:進行更多配置

在作業開發頁面右側,單擊更多配置後,您可以填寫以下參數資訊:

  • 引擎版本:修改您建立作業時選擇的Flink引擎版本。

    說明 從VVR 3.0.3版本(對應Flink 1.12版本)開始,VVP支援同時運行多個不同引擎版本的SQL作業。如果您的作業已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:
    • Flink 1.12版本:停止後啟動作業,系統將自動將引擎升級為vvr-3.0.3-flink-1.12版本。
    • Flink 1.11或Flink 1.10版本:手動將作業引擎版本升級到vvr-3.0.3-flink-1.12vvr-4.0.7-flink-1.13版本後重啟作業,否則會在啟動作業時逾時報錯。
  • 附加依賴檔案:作業中需要使用到的附加依賴,例如臨時函數等。

    說明

    如果沒有VVR的許可權,您可以下載VVR依賴,並在資源上傳頁面進行上傳,然後選擇附加依賴檔案為上傳的VVR依賴即可。具體操作,請參見附錄:配置VVR依賴

步驟四:進行深度檢查

在作業開發頁面頂部,單擊深度檢查,進行語法檢查。

(可選)步驟五:進行作業調試

在作業開發頁面頂部,單擊調試

您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險,詳情請參見作業調試

步驟六:作業部署

在作業開發頁面頂部,單擊部署,在部署新版本對話方塊,可根據需要填寫或選中相關內容,單擊確認部署

說明

Session叢集適用於非生產環境的開發測試環境,您可以使用Session叢集模式部署或調試作業,提高作業JM(Job Manager)資源使用率和提高作業啟動速度。但不推薦您將作業提交至Session叢集中,因為會存在業務穩定性問題。具體操作,請參見步驟一:建立Session叢集

步驟七:啟動並查看Flink計算結果

說明

如果您對作業進行了修改(例如更改SQL代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要先上線,然後停止再啟動。另外,如果作業無法複用State,希望作業全新啟動時,也需要停止後再啟動作業。作業停止詳情請參見作業停止

  1. 在左側導覽列,單擊作業營運

  2. 單擊目標作業名稱操作列中的啟動

    作業啟動參數配置詳情請參見作業啟動。單擊啟動後,您可以看到作業狀態變為運行中,則代表作業運行正常。

  3. 在作業營運詳情頁面,查看Flink計算結果。

    1. 作業營運頁面,單擊目標作業名稱。

    2. 單擊作業探查

    3. 作業記錄頁簽,單擊運行Task Managers頁簽下的Path, ID

    4. 單擊日誌,在頁面搜尋Sink相關的日誌資訊。

附錄:配置VVR依賴

  1. 下載VVR依賴

  2. 上傳VVR依賴。

    1. 登入Realtime Compute控制台

    2. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

    3. 在左側導覽列,單擊資源管理

    4. 單擊上傳資源,選擇要上傳的VVR依賴的JAR包。

  3. 在目標作業開發頁面附加依賴檔案項,選擇目標VVR依賴的JAR包。