Tablestore のテーブルは、Realtime Compute for Apache Flink のソーステーブルおよび結果テーブルとして使用できます。Realtime Compute for Apache Flink を使用して Tablestore テーブルのデータを処理し、結果を別の Tablestore テーブルに保存できます。
背景情報
Realtime Compute for Apache Flink では、Tunnel Service のトンネルをストリーミングデータのソースとして使用できます。各データレコードは JSON に似た形式です。例:
{
"OtsRecordType": "PUT",
"OtsRecordTimestamp": 1506416585740836,
"PrimaryKey": [
{
"ColumnName": "pk_1",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_2",
"Value": "string_pk_value"
}
],
"Columns": [
{
"OtsColumnType": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
},
{
"OtsColumnType": "DELETE_ONE_VERSION",
"ColumnName": "attr_1"
}
]
}
フィールド | 説明 |
OtsRecordType | 操作タイプ。有効な値:
|
OtsRecordTimestamp | データ操作時刻。単位: マイクロ秒。Realtime Compute for Apache Flink に全データを読み込ませる場合は、このフィールドを 0 に設定します。 |
PrimaryKey | プライマリキーの設定。このフィールドの値は JSON 配列です。テーブルのプライマリキー列の設定に基づいて、1 ~ 4 つのプライマリキー列を指定できます。各プライマリキー列には、次のフィールドを指定する必要があります。
|
Columns | 属性列の設定。このフィールドの値は JSON 配列です。各属性列には、次のフィールドを指定できます。
|
Tablestore ソーステーブル
Realtime Compute for Apache Flinkでは、ソーステーブルのDDLステートメントを使用して、TablestoreとRealtime Compute for Apache Flink間のフィールドのデータ型マッピングに基づいて、Tablestoreのソーステーブルのプライマリキーと属性列の値を読み取ることができます。詳細については、Tablestoreコネクタをご参照ください。
DDL 構文
次のサンプルコードは、ソーステーブルの DDL 構文の例を示しています。
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' // 削除操作を無視するかどうかを指定します。
);
Tunnel Service は、消費されていないユーザーデータと OtsRecordType および OtsRecordTimestamp フィールドを返し、Realtime Compute for Apache Flink がデータとフィールドを属性列として読み取れるようにします。次の表に、フィールドを示します。
フィールド | Realtime Compute for Apache Flink でマッピングされたフィールド | 説明 |
OtsRecordType | type | データ操作タイプ。 |
OtsRecordTimestamp | timestamp | データ操作時刻。単位: マイクロ秒。Realtime Compute for Apache Flink に全データを読み込ませる場合は、このフィールドを 0 に設定します。 |
Realtime Compute for Apache Flink に OtsRecordType および OtsRecordTimestamp フィールドを読み込ませる場合は、Realtime Compute for Apache Flink が提供する METADATA キーワードを使用して、Tablestore ソーステーブルから属性フィールドを取得できます。次の例は、DDL ステートメントを示しています。
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
WITH 句のパラメータ
パラメータ | 必須 | 説明 |
connector | はい | ソーステーブルのタイプ。値は ots で、変更できません。 |
endPoint | はい | Tablestore インスタンスのエンドポイント。詳細については、エンドポイント を参照してください。 |
instanceName | はい | Tablestore インスタンスの名前。 |
tableName | はい | Tablestore ソーステーブルの名前。 |
tunnelName | はい | Tablestore ソーステーブルのトンネル名。トンネルの作成方法については、トンネルの作成 を参照してください。 |
accessId | はい | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレット。AccessKey ペアの取得方法については、AccessKey ペアの取得 を参照してください。 |
accessKey | はい | |
ignoreDelete | いいえ | 削除操作を無視するかどうかを指定します。デフォルト値: false。削除操作は無視されません。 |
skipInvalidData | いいえ | 不正なデータを無視するかどうかを指定します。デフォルト値: false。不正なデータは無視されません。 不正なデータが無視されない場合、システムが不正なデータを処理するときにエラーが報告されます。不正なデータを無視するには、このパラメータを true に設定します。 |
Tablestore と Realtime Compute for Apache Flink 間のフィールドのデータ型マッピング
Tablestore のフィールドデータ型 | Realtime Compute for Apache Flink のフィールドデータ型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
Tablestore 結果テーブル
Realtime Compute for Apache Flink では、結果を Tablestore テーブルに格納できます。詳細については、Tablestore コネクタを参照してください。
DDL 構文
次のサンプルコードは、結果テーブルの DDL 構文の例を示しています。
Tablestore 結果テーブルには、プライマリキーと 1 つ以上の属性列を指定する必要があります。
CREATE TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
...
);
WITH 句のパラメータ
パラメータ | 必須 | 説明 |
connector | はい | 結果テーブルのタイプ。値は ots で、変更できません。 |
endPoint | はい | Tablestore インスタンスのエンドポイント。詳細については、エンドポイント を参照してください。 |
instanceName | はい | Tablestore インスタンスの名前。 |
tableName | はい | Tablestore 結果テーブルの名前。 |
tunnelName | はい | Tablestore 結果テーブルのトンネル名。トンネルの作成方法については、トンネルの作成 を参照してください。 |
accessId | はい | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレット。AccessKey ペアの取得方法については、AccessKey ペアの取得 を参照してください。 |
accessKey | はい | |
valueColumns | はい | 挿入する列の名前。複数の列を挿入する場合は、列名をコンマ (,) で区切ります。例: |
bufferSize | いいえ | データが結果テーブルに書き込まれる前にバッファに格納できるデータレコードの最大数。デフォルト値: 5000。バッファ内のデータレコード数が 5,000 に達すると、データが結果テーブルに書き込まれます。 |
batchWriteTimeoutMs | いいえ | 書き込みタイムアウト期間。単位: ミリ秒。デフォルト値: 5000。バッファ内のデータレコード数が 5,000 ミリ秒以内に bufferSize パラメータで指定された値に達しない場合、バッファ内のすべてのデータが結果テーブルに書き込まれます。 |
batchSize | いいえ | 一度に結果テーブルに書き込むことができるデータレコードの数。デフォルト値: 100。 |
retryIntervalMs | いいえ | 2 回の連続する再試行の間隔。単位: ミリ秒。デフォルト値: 1000。 |
maxRetryTimes | いいえ | 最大再試行回数。デフォルト値: 100。 |
ignoreDelete | いいえ | 削除操作を無視するかどうかを指定します。デフォルト値: false。削除操作は無視されません。 |
autoIncrementKey | いいえ | 自動インクリメントプライマリキー列の名前。結果テーブルに自動インクリメントプライマリキー列が含まれている場合は、このパラメータを設定して自動インクリメントプライマリキー列の名前を指定できます。 |
defaultTimestampInMillisecond | いいえ | 結果テーブルに書き込まれるデータのバージョン番号。単位: ミリ秒。このパラメータを設定しない場合、システムはデータの書き込み時刻をバージョン番号として使用します。 |
Tablestore と Realtime Compute for Apache Flink 間のフィールドのデータ型マッピング
Realtime Compute for Apache Flink のフィールドデータ型 | Tablestore のフィールドデータ型 |
BINARY | BINARY |
VARBINARY | BINARY |
CHAR | STRING |
VARCHAR | STRING |
TINYINT | INTEGER |
SMALLINT | INTEGER |
INTEGER | INTEGER |
BIGINT | INTEGER |
FLOAT | DOUBLE |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
SQL ステートメントのサンプル
ソーステーブルからデータを読み取る
次の SQL ステートメントのサンプルは、バッチでソーステーブルからデータを読み取る方法の例を示しています。
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' // 削除操作を無視するかどうかを指定します。
);
SELECT * FROM tablestore_stream LIMIT 100;
結果テーブルにデータを同期する
次の SQL ステートメントのサンプルは、updateRow 操作を呼び出して結果データを結果テーブルに書き込む方法の例を示しています。
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' // 削除操作を無視するかどうかを指定します。
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='xxxxxxxxxxx',
'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
リアルタイムコンピューティングジョブを開発する
前提条件
AccessKey ペアが作成されていること。詳細については、AccessKey ペアの作成 を参照してください。
Tablestore ソーステーブルのトンネルが作成されていること。トンネルの作成方法については、トンネルの作成 を参照してください。
ステップ 1: SQL ドラフトを作成する
Realtime Compute for Apache Flink コンソール にログインします。
フルマネージド Flink タブで、管理するワークスペースを見つけ、アクション 列の コンソール をクリックします。
左側のナビゲーションペインで、SQL エディタ をクリックします。
SQL エディタページの左上隅にある 新規 をクリックします。
新規ドラフト ダイアログボックスで、空のストリームドラフト をクリックします。
フルマネージド Flink は、さまざまなコードテンプレートとデータ同期テンプレートを提供します。各コードテンプレートは、特定のシナリオ、コードサンプル、および手順を提供します。テンプレートをクリックすると、Realtime Compute for Apache Flink の機能と関連構文について学習し、ビジネスロジックを実装できます。詳細については、コードテンプレート と データ同期テンプレート を参照してください。
次へ をクリックします。
ドラフトのパラメータを設定します。次の表に、パラメータを示します。
パラメータ
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は、現在のプロジェクト内で一意である必要があります。
flink-test
場所
ドラフトのコードファイルが格納されるフォルダ。
既存のフォルダの右側にある アイコンをクリックして、サブフォルダを作成することもできます。
開発
エンジンバージョン
ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、各バージョンのライフサイクルにおける重要な時点の詳細については、エンジンバージョン を参照してください。
vvr-6.0.4-flink-1.15
作成 をクリックします。
ステップ 2: ドラフトのコードを記述する
一時ソーステーブルと一時結果テーブルを作成します。
説明ドラフトを作成する際は、一時テーブルの使用回数を最小限に抑えることをお勧めします。また、カタログに登録されているテーブルを使用することをお勧めします。
次のサンプルコードは、tablestore_stream という名前の一時ソーステーブルと ots_sink という名前の一時結果テーブルを作成する方法の例を示しています。
CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'ignoreDelete' = 'false' // 削除操作を無視するかどうかを指定します。 ); CREATE TEMPORARY TABLE ots_sink ( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED ) WITH ( 'connector'='ots', 'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com', 'instanceName'='flink-sink', 'tableName'='flink_sink_table', 'accessId'='xxxxxxxxxxx', 'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'valueColumns'='customerid,customername' );
ドラフトロジックを記述します。
次のサンプルコードは、ソーステーブルから結果テーブルにデータを挿入する方法の例を示しています。
INSERT INTO ots_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
ステップ 3: [構成] タブでパラメータを設定する
SQL エディタページの右側にある 構成 タブをクリックし、次のパラメータを設定します。
エンジンバージョン: Flink エンジンのバージョン。ドラフトの作成時に選択したバージョンを変更できます。
重要VVR 3.0.3 以降のバージョンでは、Ververica Platform (VVP) を使用して、異なるエンジンバージョンを使用する SQL ジョブを同時に実行できます。VVR 3.0.3 を使用する Flink エンジンのバージョンは Flink 1.12 です。ジョブのエンジンバージョンが Flink 1.12 以前の場合は、ジョブで使用されているエンジンバージョンに基づいて、次の操作を実行してエンジンバージョンを更新できます。
Flink 1.12: ジョブを停止してから再起動します。その後、システムはジョブのエンジンバージョンを vvr-3.0.3-flink-1.12 に自動的に更新します。
Flink 1.11 または Flink 1.10: ジョブのエンジンバージョンを vvr-3.0.3-flink-1.12 または vvr-4.0.8-flink-1.13 に手動で更新してから、ジョブを再起動します。そうしないと、ジョブの開始時にタイムアウトエラーが発生します。
追加の依存関係: ドラフトで使用される追加の依存関係 (一時関数など)。
ステップ 4: 構文チェックを実行する
SQL エディタページの右上隅にある 検証 をクリックして、構文チェックを実行します。
(オプション) ステップ 5: ドラフトをデバッグする
SQL エディタページの右上隅にある デバッグ をクリックします。
デバッグ機能を有効にして、デプロイ実行をシミュレートし、出力をチェックし、SELECT ステートメントと INSERT ステートメントのビジネスロジックを検証できます。この機能により、開発効率が向上し、データ品質低下のリスクが軽減されます。詳細については、ドラフトのデバッグ を参照してください。
ステップ 6: ドラフトをデプロイする
SQL エディタページの右上隅にある デプロイ をクリックします。ドラフトのデプロイ ダイアログボックスで、パラメータを設定し、確認 をクリックします。
セッションクラスタは、開発環境やテスト環境などの非本番環境に適しています。セッションクラスタにドラフトをデプロイまたはデバッグして、JobManager のリソース使用率を向上させ、デプロイの起動を高速化できます。ただし、セッションクラスタにドラフトをデプロイしないことをお勧めします。セッションクラスタにドラフトをデプロイすると、安定性の問題が発生する可能性があります。詳細については、デプロイのデバッグ トピックの「ステップ 1: セッションクラスタを作成する」の手順を参照してください。
ステップ 7: ドラフトのデプロイを開始し、計算結果を表示する
デプロイの SQL コードを変更したり、WITH 句にパラメータを追加または削除したり、デプロイのバージョンを変更したりする場合は、デプロイを再公開およびキャンセルしてから、変更を有効にするためにデプロイを再起動する必要があります。デプロイが失敗し、状態データを再利用して復旧できない場合は、デプロイをキャンセルしてから再起動する必要があります。詳細については、デプロイのキャンセル を参照してください。
左側のナビゲーションペインで、デプロイ をクリックします。
開始するデプロイを見つけ、アクション 列の 開始 をクリックします。
デプロイ起動パラメータの設定方法の詳細については、デプロイの開始 を参照してください。[開始] をクリックすると、デプロイステータスが 実行中 に変わります。これは、デプロイが想定どおりに実行されていることを示します。
[デプロイ] ページで、計算結果を表示します。
左側のナビゲーションペインで、デプロイ をクリックします。[デプロイ] ページで、管理するデプロイの名前をクリックします。
診断 タブをクリックします。
ログ タブで、実行中のタスクマネージャ をクリックし、パス、ID 列の値をクリックします。
ログ をクリックします。[ログ] タブで、シンクに関連するログを検索します。