すべてのプロダクト
Search
ドキュメントセンター

Tablestore:ワイドカラムモデルのチュートリアル

最終更新日:Dec 28, 2024

Tablestore のテーブルは、Realtime Compute for Apache Flink のソーステーブルおよび結果テーブルとして使用できます。Realtime Compute for Apache Flink を使用して Tablestore テーブルのデータを処理し、結果を別の Tablestore テーブルに保存できます。

背景情報

Realtime Compute for Apache Flink では、Tunnel Service のトンネルをストリーミングデータのソースとして使用できます。各データレコードは JSON に似た形式です。例:

{
  "OtsRecordType": "PUT", 
  "OtsRecordTimestamp": 1506416585740836, 
  "PrimaryKey": [
    {
      "ColumnName": "pk_1", 
      "Value": 1506416585881590900
    },
    {
      "ColumnName": "pk_2", 
      "Value": "string_pk_value"
    }
  ],
  "Columns": [
    {
      "OtsColumnType": "Put", 
      "ColumnName": "attr_0",
      "Value": "hello_table_store",
    },
    {
      "OtsColumnType": "DELETE_ONE_VERSION", 
      "ColumnName": "attr_1"
    }
  ]
}

フィールド

説明

OtsRecordType

操作タイプ。有効な値:

  • PUT: データを追加します。

  • UPDATE: データを更新します。

  • DELETE: データを削除します。

OtsRecordTimestamp

データ操作時刻。単位: マイクロ秒。Realtime Compute for Apache Flink に全データを読み込ませる場合は、このフィールドを 0 に設定します。

PrimaryKey

プライマリキーの設定。このフィールドの値は JSON 配列です。テーブルのプライマリキー列の設定に基づいて、1 ~ 4 つのプライマリキー列を指定できます。各プライマリキー列には、次のフィールドを指定する必要があります。

  • ColumnName: プライマリキー列の名前。

  • Value: プライマリキー列の値。

Columns

属性列の設定。このフィールドの値は JSON 配列です。各属性列には、次のフィールドを指定できます。

  • OtsColumnType: 列操作のタイプ。有効な値: PUT、DELETE_ONE_VERSION、DELETE_ALL_VERSION。

  • ColumnName: 属性列の名前。

  • Value: 属性列の値。

    OtsColumnType フィールドを DELETE_ONE_VERSION または DELETE_ALL_VERSION に設定した場合、このフィールドを指定する必要はありません。

Tablestore ソーステーブル

Realtime Compute for Apache Flinkでは、ソーステーブルのDDLステートメントを使用して、TablestoreとRealtime Compute for Apache Flink間のフィールドのデータ型マッピングに基づいて、Tablestoreのソーステーブルのプライマリキーと属性列の値を読み取ることができます。詳細については、Tablestoreコネクタをご参照ください。

DDL 構文

次のサンプルコードは、ソーステーブルの DDL 構文の例を示しています。

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' // 削除操作を無視するかどうかを指定します。
);

Tunnel Service は、消費されていないユーザーデータと OtsRecordType および OtsRecordTimestamp フィールドを返し、Realtime Compute for Apache Flink がデータとフィールドを属性列として読み取れるようにします。次の表に、フィールドを示します。

フィールド

Realtime Compute for Apache Flink でマッピングされたフィールド

説明

OtsRecordType

type

データ操作タイプ。

OtsRecordTimestamp

timestamp

データ操作時刻。単位: マイクロ秒。Realtime Compute for Apache Flink に全データを読み込ませる場合は、このフィールドを 0 に設定します。

Realtime Compute for Apache Flink に OtsRecordType および OtsRecordTimestamp フィールドを読み込ませる場合は、Realtime Compute for Apache Flink が提供する METADATA キーワードを使用して、Tablestore ソーステーブルから属性フィールドを取得できます。次の例は、DDL ステートメントを示しています。

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    record_type STRING METADATA FROM 'type',
    record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
    ...
);

WITH 句のパラメータ

パラメータ

必須

説明

connector

はい

ソーステーブルのタイプ。値は ots で、変更できません。

endPoint

はい

Tablestore インスタンスのエンドポイント。詳細については、エンドポイント を参照してください。

instanceName

はい

Tablestore インスタンスの名前。

tableName

はい

Tablestore ソーステーブルの名前。

tunnelName

はい

Tablestore ソーステーブルのトンネル名。トンネルの作成方法については、トンネルの作成 を参照してください。

accessId

はい

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレット。AccessKey ペアの取得方法については、AccessKey ペアの取得 を参照してください。

accessKey

はい

ignoreDelete

いいえ

削除操作を無視するかどうかを指定します。デフォルト値: false。削除操作は無視されません。

skipInvalidData

いいえ

不正なデータを無視するかどうかを指定します。デフォルト値: false。不正なデータは無視されません。

不正なデータが無視されない場合、システムが不正なデータを処理するときにエラーが報告されます。不正なデータを無視するには、このパラメータを true に設定します。

Tablestore と Realtime Compute for Apache Flink 間のフィールドのデータ型マッピング

Tablestore のフィールドデータ型

Realtime Compute for Apache Flink のフィールドデータ型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

Tablestore 結果テーブル

Realtime Compute for Apache Flink では、結果を Tablestore テーブルに格納できます。詳細については、Tablestore コネクタを参照してください。

DDL 構文

次のサンプルコードは、結果テーブルの DDL 構文の例を示しています。

説明

Tablestore 結果テーブルには、プライマリキーと 1 つ以上の属性列を指定する必要があります。

CREATE TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
     ...
);

WITH 句のパラメータ

パラメータ

必須

説明

connector

はい

結果テーブルのタイプ。値は ots で、変更できません。

endPoint

はい

Tablestore インスタンスのエンドポイント。詳細については、エンドポイント を参照してください。

instanceName

はい

Tablestore インスタンスの名前。

tableName

はい

Tablestore 結果テーブルの名前。

tunnelName

はい

Tablestore 結果テーブルのトンネル名。トンネルの作成方法については、トンネルの作成 を参照してください。

accessId

はい

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレット。AccessKey ペアの取得方法については、AccessKey ペアの取得 を参照してください。

accessKey

はい

valueColumns

はい

挿入する列の名前。複数の列を挿入する場合は、列名をコンマ (,) で区切ります。例: ID,NAME

bufferSize

いいえ

データが結果テーブルに書き込まれる前にバッファに格納できるデータレコードの最大数。デフォルト値: 5000。バッファ内のデータレコード数が 5,000 に達すると、データが結果テーブルに書き込まれます。

batchWriteTimeoutMs

いいえ

書き込みタイムアウト期間。単位: ミリ秒。デフォルト値: 5000。バッファ内のデータレコード数が 5,000 ミリ秒以内に bufferSize パラメータで指定された値に達しない場合、バッファ内のすべてのデータが結果テーブルに書き込まれます。

batchSize

いいえ

一度に結果テーブルに書き込むことができるデータレコードの数。デフォルト値: 100。

retryIntervalMs

いいえ

2 回の連続する再試行の間隔。単位: ミリ秒。デフォルト値: 1000。

maxRetryTimes

いいえ

最大再試行回数。デフォルト値: 100。

ignoreDelete

いいえ

削除操作を無視するかどうかを指定します。デフォルト値: false。削除操作は無視されません。

autoIncrementKey

いいえ

自動インクリメントプライマリキー列の名前。結果テーブルに自動インクリメントプライマリキー列が含まれている場合は、このパラメータを設定して自動インクリメントプライマリキー列の名前を指定できます。

defaultTimestampInMillisecond

いいえ

結果テーブルに書き込まれるデータのバージョン番号。単位: ミリ秒。このパラメータを設定しない場合、システムはデータの書き込み時刻をバージョン番号として使用します。

Tablestore と Realtime Compute for Apache Flink 間のフィールドのデータ型マッピング

Realtime Compute for Apache Flink のフィールドデータ型

Tablestore のフィールドデータ型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

SQL ステートメントのサンプル

ソーステーブルからデータを読み取る

次の SQL ステートメントのサンプルは、バッチでソーステーブルからデータを読み取る方法の例を示しています。

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' // 削除操作を無視するかどうかを指定します。
);
SELECT * FROM tablestore_stream LIMIT 100;

結果テーブルにデータを同期する

次の SQL ステートメントのサンプルは、updateRow 操作を呼び出して結果データを結果テーブルに書き込む方法の例を示しています。

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' // 削除操作を無視するかどうかを指定します。
);

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

リアルタイムコンピューティングジョブを開発する

前提条件

  • AccessKey ペアが作成されていること。詳細については、AccessKey ペアの作成 を参照してください。

  • Tablestore ソーステーブルのトンネルが作成されていること。トンネルの作成方法については、トンネルの作成 を参照してください。

ステップ 1: SQL ドラフトを作成する

  1. Realtime Compute for Apache Flink コンソール にログインします。

  2. フルマネージド Flink タブで、管理するワークスペースを見つけ、アクション 列の コンソール をクリックします。

  3. 左側のナビゲーションペインで、SQL エディタ をクリックします。

  4. SQL エディタページの左上隅にある 新規 をクリックします。

  5. 新規ドラフト ダイアログボックスで、空のストリームドラフト をクリックします。

    フルマネージド Flink は、さまざまなコードテンプレートとデータ同期テンプレートを提供します。各コードテンプレートは、特定のシナリオ、コードサンプル、および手順を提供します。テンプレートをクリックすると、Realtime Compute for Apache Flink の機能と関連構文について学習し、ビジネスロジックを実装できます。詳細については、コードテンプレートデータ同期テンプレート を参照してください。

  6. 次へ をクリックします。

  7. ドラフトのパラメータを設定します。次の表に、パラメータを示します。

    パラメータ

    説明

    名前

    作成するドラフトの名前。

    説明

    ドラフト名は、現在のプロジェクト内で一意である必要があります。

    flink-test

    場所

    ドラフトのコードファイルが格納されるフォルダ。

    既存のフォルダの右側にある 新建文件夹 アイコンをクリックして、サブフォルダを作成することもできます。

    開発

    エンジンバージョン

    ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、各バージョンのライフサイクルにおける重要な時点の詳細については、エンジンバージョン を参照してください。

    vvr-6.0.4-flink-1.15

  8. 作成 をクリックします。

ステップ 2: ドラフトのコードを記述する

  1. 一時ソーステーブルと一時結果テーブルを作成します。

    説明

    ドラフトを作成する際は、一時テーブルの使用回数を最小限に抑えることをお勧めします。また、カタログに登録されているテーブルを使用することをお勧めします。

    次のサンプルコードは、tablestore_stream という名前の一時ソーステーブルと ots_sink という名前の一時結果テーブルを作成する方法の例を示しています。

    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector'='ots',
        'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
        'instanceName' = 'flink-source',
        'tableName' ='flink_source_table',
        'tunnelName' = 'flinksourcestream',
        'accessId' ='xxxxxxxxxxx',
        'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'ignoreDelete' = 'false' // 削除操作を無視するかどうかを指定します。
    );
    
    CREATE TEMPORARY TABLE ots_sink (
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED
    ) WITH (
        'connector'='ots',
        'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
        'instanceName'='flink-sink',
        'tableName'='flink_sink_table',
        'accessId'='xxxxxxxxxxx',
        'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'valueColumns'='customerid,customername'
    );
  2. ドラフトロジックを記述します。

    次のサンプルコードは、ソーステーブルから結果テーブルにデータを挿入する方法の例を示しています。

    INSERT INTO ots_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

ステップ 3: [構成] タブでパラメータを設定する

SQL エディタページの右側にある 構成 タブをクリックし、次のパラメータを設定します。

  • エンジンバージョン: Flink エンジンのバージョン。ドラフトの作成時に選択したバージョンを変更できます。

    重要

    VVR 3.0.3 以降のバージョンでは、Ververica Platform (VVP) を使用して、異なるエンジンバージョンを使用する SQL ジョブを同時に実行できます。VVR 3.0.3 を使用する Flink エンジンのバージョンは Flink 1.12 です。ジョブのエンジンバージョンが Flink 1.12 以前の場合は、ジョブで使用されているエンジンバージョンに基づいて、次の操作を実行してエンジンバージョンを更新できます。

    • Flink 1.12: ジョブを停止してから再起動します。その後、システムはジョブのエンジンバージョンを vvr-3.0.3-flink-1.12 に自動的に更新します。

    • Flink 1.11 または Flink 1.10: ジョブのエンジンバージョンを vvr-3.0.3-flink-1.12 または vvr-4.0.8-flink-1.13 に手動で更新してから、ジョブを再起動します。そうしないと、ジョブの開始時にタイムアウトエラーが発生します。

  • 追加の依存関係: ドラフトで使用される追加の依存関係 (一時関数など)。

ステップ 4: 構文チェックを実行する

SQL エディタページの右上隅にある 検証 をクリックして、構文チェックを実行します。

(オプション) ステップ 5: ドラフトをデバッグする

SQL エディタページの右上隅にある デバッグ をクリックします。

デバッグ機能を有効にして、デプロイ実行をシミュレートし、出力をチェックし、SELECT ステートメントと INSERT ステートメントのビジネスロジックを検証できます。この機能により、開発効率が向上し、データ品質低下のリスクが軽減されます。詳細については、ドラフトのデバッグ を参照してください。

ステップ 6: ドラフトをデプロイする

SQL エディタページの右上隅にある デプロイ をクリックします。ドラフトのデプロイ ダイアログボックスで、パラメータを設定し、確認 をクリックします。部署

説明

セッションクラスタは、開発環境やテスト環境などの非本番環境に適しています。セッションクラスタにドラフトをデプロイまたはデバッグして、JobManager のリソース使用率を向上させ、デプロイの起動を高速化できます。ただし、セッションクラスタにドラフトをデプロイしないことをお勧めします。セッションクラスタにドラフトをデプロイすると、安定性の問題が発生する可能性があります。詳細については、デプロイのデバッグ トピックの「ステップ 1: セッションクラスタを作成する」の手順を参照してください。

ステップ 7: ドラフトのデプロイを開始し、計算結果を表示する

説明

デプロイの SQL コードを変更したり、WITH 句にパラメータを追加または削除したり、デプロイのバージョンを変更したりする場合は、デプロイを再公開およびキャンセルしてから、変更を有効にするためにデプロイを再起動する必要があります。デプロイが失敗し、状態データを再利用して復旧できない場合は、デプロイをキャンセルしてから再起動する必要があります。詳細については、デプロイのキャンセル を参照してください。

  1. 左側のナビゲーションペインで、デプロイ をクリックします。

  2. 開始するデプロイを見つけ、アクション 列の 開始 をクリックします。

    デプロイ起動パラメータの設定方法の詳細については、デプロイの開始 を参照してください。[開始] をクリックすると、デプロイステータスが 実行中 に変わります。これは、デプロイが想定どおりに実行されていることを示します。

  3. [デプロイ] ページで、計算結果を表示します。

    1. 左側のナビゲーションペインで、デプロイ をクリックします。[デプロイ] ページで、管理するデプロイの名前をクリックします。

    2. 診断 タブをクリックします。

    3. ログ タブで、実行中のタスクマネージャ をクリックし、パス、ID 列の値をクリックします。

    4. ログ をクリックします。[ログ] タブで、シンクに関連するログを検索します。