すべてのプロダクト
Search
ドキュメントセンター

Tablestore:チュートリアル (TimeSeriesモデル)

最終更新日:Dec 28, 2024

このトピックでは、Realtime Compute for Apache Flinkを使用して、ワイドカラムモデルのTablestoreテーブルからTablestore時系列テーブルにデータを書き込む方法について説明します。

背景情報

TimeSeriesモデルは、時系列データの特性に基づいて設計されています。このモデルは、IoTデバイスの監視などのシナリオに適しており、デバイスによって収集されたデータやマシンの監視データを保存するために使用できます。詳細については、TimeSeriesモデルを参照してください。

TablestoreのTimeSeriesモデルでは、2次元時系列テーブルを使用して時系列データを保存します。

各行は、時系列の特定の時点におけるデータを表します。時系列識別子とタイムスタンプは、行のプライマリキーカラムであり、タイムスタンプ以下の時系列のデータポイントは、行のデータカラムです。1つの行に複数のデータカラムを含めることができます。時系列識別子は、測定値、データソース、およびタグで構成されます。タイムスタンプは、特定の時点を識別します。

使用上の注意

  • Realtime Compute for Apache Flinkの各TaskManagerの計算能力を最大限に活用するために、各TaskManagerに2つの計算ユニット(CU)と4 GBのメモリを設定することをお勧めします。TaskManagerは1秒あたり10,000行を書き込むことができます。

  • ソーステーブルのパーティション数が大きい場合は、Realtime Compute for Apache Flinkで並列度を16未満に設定することをお勧めします。書き込み速度は並列度に比例して増加します。

  • Realtime Compute for Apache FlinkインスタンスとTablestoreインスタンスは、同じ仮想プライベートクラウド(VPC)内にある必要があります。TablestoreインスタンスにはVPCエンドポイントを使用する必要があります。

  • Realtime Compute for Apache Flinkを使用して、中国(杭州)、中国(上海)、中国(北京)、中国(張家口)、中国(ウランチャブ)、中国(深セン)、中国(香港)、ドイツ(フランクフルト)、米国(バージニア)、シンガポールのリージョンで、ワイドカラムモデルのTablestoreテーブルからTablestore時系列テーブルにデータを書き込むことができます。

Tablestore結果テーブル

Realtime Compute for Apache Flinkでは、Tablestore時系列テーブルを使用して結果を保存できます。詳細については、Tablestoreコネクタを参照してください。

TimeSeriesモデルの時系列テーブルには、次のプライマリキーカラムがあります:_m_name_data_source_tags_time。時系列テーブルを結果テーブルとして使用する場合、4つのプライマリキーカラムを指定する必要があります。その他の設定は、データテーブルを結果テーブルとして使用する場合と同じです。WITH句のパラメータ、SINKテーブルのプライマリキー、およびMap形式のプライマリキーを使用して、時系列テーブルのプライマリキーカラムを指定できます。上記の3つの方法を同時に使用して時系列テーブルのプライマリキーカラムを指定する場合、WITH句のパラメータで指定されたプライマリキーカラムが最も優先されます。

WITH句のパラメータ

次のサンプルコードは、WITH句のパラメータを使用してDDL構文を定義する方法の例を示しています。

-- ソーステーブルの一時テーブルを作成します。 CREATE TEMPORARY TABLE timeseries_source (    measurement STRING,    datasource STRING,    tag_a STRING,    `time` BIGINT,    binary_value BINARY,    bool_value BOOLEAN,    double_value DOUBLE,    long_value BIGINT,    string_value STRING,    tag_b STRING,    tag_c STRING,    tag_d STRING,    tag_e STRING,    tag_f STRING) WITH (    'connector' = 'ots',    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',    'instanceName' = 'xxx',    'tableName' = 'test_widecolume_source_table',    'tunnelName' = 'test_widecolume_source_tunnel',    'accessId' ='xxxxxxxxxxx',    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',    'ignoreDelete' = 'true', // 削除操作が実行されたデータを無視するかどうかを指定します。);-- 結果テーブルの一時テーブルを作成します。 CREATE TEMPORARY TABLE timeseries_sink (    measurement STRING,    datasource STRING,    tag_a STRING,    `time` BIGINT,    binary_value BINARY,    bool_value BOOLEAN,    double_value DOUBLE,    long_value BIGINT,    string_value STRING,    tag_b STRING,    tag_c STRING,    tag_d STRING,    tag_e STRING,    tag_f STRING,    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED) WITH (    'connector' = 'ots',    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',    'instanceName' = 'xxx',    'tableName' = 'test_timeseries_sink_table',    'accessId' ='xxxxxxxxxxx',    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',    'storageType' = 'TIMESERIES',    'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}');-- ソーステーブルから結果テーブルにデータを挿入します。 INSERT INTO timeseries_sink    select     measurement,    datasource,    tag_a,    `time`,    binary_value,    bool_value,    double_value,    long_value,    string_value,    tag_b,    tag_c,    tag_d,    tag_e,    tag_f    from        timeseries_source;

次の表に、WITH句のパラメータを示します。

パラメータ

適用モデル

必須

説明

connector

両方のモデル

はい

コネクタのタイプ。値はotsで、変更できません。

endPoint

両方のモデル

はい

Tablestoreインスタンスのエンドポイント。値は、インスタンスのVPCエンドポイントである必要があります。詳細については、エンドポイントを参照してください。

instanceName

両方のモデル

はい

Tablestoreインスタンスの名前。

テーブル名

両方のモデル

はい

Tablestore のデータテーブルまたは時系列テーブルの名前。

tableName

tunnelName

ワイドカラムモデル

はい

Tablestoreデータテーブルのトンネル名。トンネルの作成方法の詳細については、トンネルの作成を参照してください。

tunnelName

ワイドカラムモデル

はい

Alibaba CloudアカウントまたはRAMユーザーのAccessKey IDとAccessKeyシークレット。AccessKeyペアの取得方法の詳細については、AccessKeyペアの作成を参照してください。

アクセスキー

両方のモデル

はい

ignoreDelete

ワイドカラムモデル

いいえ

削除操作が実行されたリアルタイムデータを無視するかどうかを指定します。このパラメーターは省略可能です。既定値: false。データテーブルをソーステーブルとして使用する場合は、ビジネス要件に基づいてこのパラメーターを設定できます。

storageType

両方のモデル

はい

テーブルのタイプ。有効な値:

  • WIDE_COLUMN (デフォルト): データテーブル。

    データテーブルをソーステーブルとして使用する場合は、このパラメーターを WIDE_COLUMN に設定するか、このパラメーターを空のままにします。

  • TIMESERIES: 時系列テーブル。

    時系列テーブルを結果テーブルとして使用する場合は、このパラメーターを TIMESERIES に設定します。

accessId

両方のモデル

はい

時系列テーブルの一時テーブルの主キー列として指定する列。このパラメーターの値は、JSON 形式のキーと値のペアを使用して指定します。例: {"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}

指定する主キー列の型は、時系列テーブルの主キー列の型と同じである必要があります。タグの主キー列は複数の列で構成できます。

SINKテーブルの主キー

このセクションでは、結果テーブルとして使用される時系列テーブルのDDL構文の例を示します。最初の主キー列は_m_name列で、測定名を指定します。2番目の主キー列は_data_source列で、データソースを指定します。最後の主キー列は_time列で、タイムスタンプを指定します。中間の主キー列は_tags列で、時系列のタグを指定します。

次のサンプルコードは、SINKテーブルの主キーを使用してDDL構文を定義する方法の例を示しています。

CREATE TEMPORARY TABLE timeseries_sink (    measurement STRING,    datasource STRING,    tag_a STRING,    `time` BIGINT,    binary_value BINARY,    bool_value BOOLEAN,    double_value DOUBLE,    long_value BIGINT,    string_value STRING,    tag_b STRING,    tag_c STRING,    tag_d STRING,    tag_e STRING,    tag_f STRING    PRIMARY KEY(measurement, datasource, tag_a,tag_b,tag_c,tag_d,tag_e,tag_f `time`) NOT ENFORCED) WITH (    'connector' = 'ots',    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',    'instanceName' = 'xxx',    'tableName' = 'test_timeseries_sink_table',    'accessId' ='xxxxxxxxxxx',    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',    'storageType' = 'TIMESERIES',);

マップ形式の主キー

結果テーブルとして使用される時系列テーブルの主キー列の場合、TablestoreはFlinkのMapデータ型を提供して、TimeSeriesモデルの時系列テーブルの_tags列の生成を容易にします。Mapデータ型は、列の名前変更や単純な関数などのマッピング操作をサポートしています。Mapを使用する場合は、_tags主キー列が3番目の位置にあることを確認してください。

-- ソーステーブルの一時テーブルを作成します。 CREATE TEMPORARY TABLE timeseries_source (    measurement STRING,    datasource STRING,    tag_a STRING,    `time` BIGINT,    binary_value BINARY,    bool_value BOOLEAN,    double_value DOUBLE,    long_value BIGINT,    string_value STRING,    tag_b STRING,    tag_c STRING,    tag_d STRING,    tag_e STRING,    tag_f STRING) WITH (    'connector' = 'ots',    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',    'instanceName' = 'xxx',    'tableName' = 'test_widecolume_source_table',    'tunnelName' = 'test_widecolume_source_tunnel',    'accessId' ='xxxxxxxxxxx',    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',    'ignoreDelete' = 'true', // 削除操作が実行されたデータを無視するかどうかを指定します。);-- 結果テーブルの一時テーブルを作成します。 CREATE TEMPORARY TABLE timeseries_sink (    measurement STRING,    datasource STRING,    tags Map<String, String>,     `time` BIGINT,    binary_value BINARY,    bool_value BOOLEAN,    double_value DOUBLE,    long_value BIGINT,    string_value STRING,    tag_b STRING,    tag_c STRING,    tag_d STRING,    tag_e STRING,    tag_f STRING,    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED) WITH (    'connector' = 'ots',    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',    'instanceName' = 'xxx',    'tableName' = 'test_timeseries_sink_table',    'accessId' ='xxxxxxxxxxx',    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',    'storageType' = 'TIMESERIES',);-- ソーステーブルから結果テーブルにデータを挿入します。 INSERT INTO timeseries_sink    select         m_name,        data_source,        MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,        `time`,        cpu_sys,        cpu_user,        disk_0,        disk_1,        disk_2,        memory_used,        net_in,        net_out     from        timeseries_source;

リアルタイム コンピューティング ドラフトの開発

前提条件

  • AccessKeyペアが作成されます。詳細については、AccessKeyペアの作成を参照してください。

  • ソーステーブルとして使用されるTablestoreデータテーブルのトンネルが作成されます。トンネルの作成方法の詳細については、トンネルの作成を参照してください。

ステップ 1: SQL ドラフトを作成する

  1. Realtime Compute for Apache Flink コンソール (Realtime Compute for Apache Flink コンソール) にログインします。

  2. フルマネージド Flink タブで、管理するワークスペースを見つけ、アクション 列の コンソール をクリックします。

  3. 左側のナビゲーションペインで、SQL エディター をクリックします。

  4. SQL エディターページの左上隅にある 新規 をクリックします。

  5. 新規ドラフト ダイアログボックスで、空のストリームドラフト をクリックします。

    フルマネージド Flink は、さまざまなコードテンプレートとデータ同期テンプレートを提供します。各コードテンプレートは、特定のシナリオ、コードサンプル、および手順を提供します。テンプレートをクリックして、Realtime Compute for Apache Flink の機能と関連構文について学習し、ビジネスロジックを実装できます。詳細については、コードテンプレート および データ同期テンプレート を参照してください。

  6. 次へ をクリックします。

  7. ドラフトのパラメーターを構成します。次の表にパラメーターを示します。

    パラメーター

    説明

    名前

    作成するドラフトの名前。

    説明

    ドラフト名は、現在のプロジェクト内で一意である必要があります。

    flink-test

    場所

    ドラフトのコードファイルが保存されるフォルダー。

    既存のフォルダーの右側にある 新建文件夹 アイコンをクリックして、サブフォルダーを作成することもできます。

    開発

    エンジンバージョン

    ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点については、エンジンバージョン を参照してください。

    vvr-6.0.4-flink-1.15

  8. 作成 をクリックします。

ステップ 2: ドラフトのコードを作成する

  1. Tablestore ソーステーブルと結果テーブルの一時テーブルを作成します。

    説明

    ドラフトを作成する際は、一時テーブルの使用回数を最小限に抑えることをお勧めします。また、カタログに登録されているテーブルを使用することをお勧めします。

    次のサンプルコードは、ソーステーブル用に timeseries_source という名前の一時テーブルと、結果テーブル用に timeseries_sink という名前の一時テーブルを作成する方法の例を示しています。

    CREATE TEMPORARY TABLE timeseries_source (    measurement STRING,    datasource STRING,    tag_a STRING,    `time` BIGINT,    binary_value BINARY,    bool_value BOOLEAN,    double_value DOUBLE,    long_value BIGINT,    string_value STRING,    tag_b STRING,    tag_c STRING,    tag_d STRING,    tag_e STRING,    tag_f STRING) WITH (    'connector' = 'ots',    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',    'instanceName' = 'xxx',    'tableName' = 'test_widecolume_source_table',    'tunnelName' = 'test_widecolume_source_tunnel',    'accessId' = 'xxxxxxxxxxx',    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',    'ignoreDelete' = 'true', // 削除操作が実行されたデータを無視するかどうかを指定します。);CREATE TEMPORARY TABLE timeseries_sink (    measurement STRING,    datasource STRING,    tag_a STRING,    `time` BIGINT,    binary_value BINARY,    bool_value BOOLEAN,    double_value DOUBLE,    long_value BIGINT,    string_value STRING,    tag_b STRING,    tag_c STRING,    tag_d STRING,    tag_e STRING,    tag_f STRING,    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED) WITH (    'connector' = 'ots',    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',    'instanceName' = 'xxx',    'tableName' = 'test_timeseries_sink_table',    'accessId' = 'xxxxxxxxxxx',    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',    'storageType' = 'TIMESERIES',    'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}');
  2. ドラフトロジックを記述します。

    次のサンプルコードは、ソーステーブルから結果テーブルにデータを挿入する方法の例を示しています。

    INSERT INTO timeseries_sink    select     measurement,    datasource,    tag_a,    `time`,    binary_value,    bool_value,    double_value,    long_value,    string_value,    tag_b,    tag_c,    tag_d,    tag_e,    tag_f    from        timeseries_source;

手順 3: [構成] タブでパラメーターを構成する

SQL エディターページの右側にある構成タブをクリックし、次のパラメーターを構成します。

  • エンジンバージョン: Flinkエンジンのバージョン。ドラフトの作成時に選択したバージョンを変更できます。

    重要

    VVR 3.0.3 以降のバージョンでは、Ververica Platform (VVP) を使用して、異なるエンジンバージョンを使用する SQL ジョブを同時に実行できます。VVR 3.0.3 を使用する Flink エンジンのバージョンは Flink 1.12 です。ジョブのエンジンバージョンが Flink 1.12 以前の場合は、ジョブで使用されているエンジンバージョンに基づいて、次の操作を実行してエンジンバージョンを更新できます。

    • Flink 1.12: ジョブを停止してから再起動します。その後、システムはジョブのエンジンバージョンをvvr-3.0.3-flink-1.12に自動的に更新します。

    • Flink 1.11 または Flink 1.10: ジョブのエンジンバージョンをvvr-3.0.3-flink-1.12またはvvr-4.0.8-flink-1.13に手動で更新してから、ジョブを再起動します。そうしないと、ジョブの開始時にタイムアウトエラーが発生します。

  • 追加の依存関係: ドラフトで使用される追加の依存関係 (一時関数など)。

    説明

    Ververica Runtime (VVR) に対する権限がない場合は、VVR の依存関係をダウンロードし、[アーティファクトのアップロード] ページで VVR の依存関係をアップロードしてから、アップロードした VVR の依存関係を追加の依存関係として選択できます。詳細については、付録: VVR の依存関係の構成を参照してください。

ステップ 4: 構文チェックを実行する

SQL エディターページの右上隅にある検証をクリックして、構文チェックを実行します。

(オプション) ステップ 5: ドラフトをデバッグする

SQL エディターページの右上隅にあるデバッグをクリックします。

デバッグ機能を有効にして、デプロイ実行をシミュレートし、出力を確認し、SELECT ステートメントと INSERT ステートメントのビジネスロジックを検証できます。この機能は、開発効率を向上させ、データ品質低下のリスクを軽減します。詳細については、ドラフトのデバッグを参照してください。

ステップ 6: ドラフトをデプロイする

SQL エディターページの右上隅にあるデプロイをクリックします。ドラフトのデプロイダイアログボックスで、パラメーターを構成し、確認をクリックします。部署

説明

セッションクラスターは、開発環境やテスト環境などの非運用環境に適しています。セッションクラスターにドラフトをデプロイまたはデバッグして、JobManager のリソース使用率を向上させ、デプロイの起動を高速化できます。ただし、セッションクラスターにドラフトをデプロイしないことをお勧めします。セッションクラスターにドラフトをデプロイすると、安定性の問題が発生する可能性があります。詳細については、デプロイのデバッグトピックの「ステップ 1: セッションクラスターを作成する」の手順を参照してください。

ステップ 7: ドラフトのデプロイを開始し、計算結果を表示する

説明

デプロイの SQL コードを変更したり、WITH 句にパラメーターを追加または削除したり、デプロイのバージョンを変更したりする場合は、デプロイを再公開してキャンセルしてから、変更を有効にするためにデプロイを再起動する必要があります。デプロイが失敗し、状態データを再利用して復旧できない場合は、デプロイをキャンセルしてから再起動する必要があります。詳細については、デプロイのキャンセルを参照してください。

  1. 左側のナビゲーションペインで、デプロイをクリックします。

  2. 開始するデプロイメントを見つけ、アクション列の開始をクリックします。

    デプロイの起動パラメーターの構成方法の詳細については、デプロイの開始を参照してください。[開始] をクリックすると、デプロイの状態が実行中に変わります。これは、デプロイが想定どおりに実行されていることを示します。

  3. [デプロイ] ページで、計算結果を表示します。

    1. 左側のナビゲーションペインで、デプロイをクリックします。[デプロイ] ページで、管理するデプロイの名前をクリックします。

    2. 診断タブをクリックします。

    3. ログタブで、実行中のタスクマネージャーをクリックし、パス、ID列の値をクリックします。

    4. ログをクリックします。[ログ] タブで、シンクに関連するログを検索します。

付録: VVR 依存関係の構成

  1. VVR 依存関係をダウンロードします。

  2. VVR 依存関係をアップロードします。

    1. Realtime Compute for Apache Flink コンソールにログインします。

    2. フルマネージド Flink タブで、管理するワークスペースを見つけ、アクション 列の コンソール をクリックします。

    3. 左側のナビゲーションペインで、成果物 をクリックします。

    4. 成果物をアップロード をクリックし、VVR 依存関係が格納されている JAR パッケージを選択します。

  3. ドラフトエディターページの 追加の依存関係 セクションで、VVR 依存関係が格納されている JAR パッケージを選択します。