このトピックでは、Tablestore コネクタの使用方法について説明します。
背景情報
Tablestore は、大量の構造化データの保存に最適化された、テーブルベースの低コストサーバーレスストレージサービスです。 Tablestore を使用すると、ミリ秒単位でオンラインデータをクエリおよび取得し、保存されたデータを多次元で分析できます。 Tablestore は、大量の請求書、インスタントメッセージング(IM)、IoT、車載インターネット(IoV)、リスク管理、インテリジェントレコメンデーションなど、さまざまなシナリオに適しています。 Tablestore は、IoT アプリケーション向けに高度に最適化されたエンドツーエンドのストレージソリューションも提供します。 詳細については、「Tablestore とは」をご参照ください。
次の表に、Tablestore コネクタでサポートされている機能を示します。
項目 | 説明 |
実行モード | ストリーミングモード |
API タイプ | SQL API |
テーブルタイプ | ソーステーブル、ディメンションテーブル、および結果テーブル |
データ形式 | 該当なし |
メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
結果テーブルでのデータの更新または削除 | サポートされています |
前提条件
Tablestore インスタンスが購入され、Tablestore テーブルが作成されていること。 詳細については、「Tablestore の使用」をご参照ください。
制限
Ververica Runtime(VVR)3.0.0 以降を使用する Apache Flink 用 Realtime Compute のみ、Tablestore コネクタをサポートしています。
構文
結果テーブルを作成するためのステートメント
CREATE TABLE ots_sink ( name VARCHAR, age BIGINT, birthday BIGINT, primary key(name,age) not enforced ) WITH ( 'connector'='ots', 'instanceName'='<yourInstanceName>', // Tablestore インスタンス名 'tableName'='<yourTableName>', // Tablestore テーブル名 'accessId'='${ak_id}', // AccessKey ID 'accessKey'='${ak_secret}', // AccessKey Secret 'endPoint'='<yourEndpoint>', // エンドポイント 'valueColumns'='birthday' // 挿入するカラム名 );
説明Tablestore の結果テーブルにはプライマリキーを指定する必要があります。 最新の出力データが Tablestore の結果テーブルに追加され、テーブルデータが更新されます。
ディメンションテーブルを作成するためのステートメント
CREATE TABLE ots_dim ( id int, len int, content STRING ) WITH ( 'connector'='ots', 'endPoint'='<yourEndpoint>', // エンドポイント 'instanceName'='<yourInstanceName>', // Tablestore インスタンス名 'tableName'='<yourTableName>', // Tablestore テーブル名 'accessId'='${ak_id}', // AccessKey ID 'accessKey'='${ak_secret}' // AccessKey Secret );
ソーステーブルを作成するためのステートメント
CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='<yourEndpoint>', // エンドポイント 'instanceName' = 'flink-source', // Tablestore インスタンス名 'tableName' ='flink_source_table', // Tablestore テーブル名 'tunnelName' = 'flinksourcestream', // Tunnel 名 'accessId' ='${ak_id}', // AccessKey ID 'accessKey' ='${ak_secret}', // AccessKey Secret 'ignoreDelete' = 'false' // 削除操作を無視するかどうか );
データの消費が必要なフィールド、および Tunnel Service の戻りデータの
OtsRecordType
フィールドとOtsRecordTimestamp
フィールドは、属性カラムとして読み書きできます。 次の表に、フィールドを示します。フィールド
Apache Flink 用 Realtime Compute でのマッピングフィールド
説明
OtsRecordType
type
データ操作タイプ。
OtsRecordTimestamp
timestamp
データ操作時刻。 単位: マイクロ秒。
説明全データを読み取る場合、OtsRecordTimestamp パラメータの値は 0 に設定されます。
OtsRecordType
フィールドとOtsRecordTimestamp
フィールドを読み取る場合は、Apache Flink 用 Realtime Compute が提供する METADATA キーワードを使用して、Tablestore ソーステーブルから属性フィールドを取得できます。 次の例は、DDL ステートメントを示しています。CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, record_type STRING METADATA FROM 'type', // レコードタイプ record_timestamp BIGINT METADATA FROM 'timestamp' // レコードタイムスタンプ ) WITH ( ... );
WITH 句のパラメータ
共通パラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
connector
テーブルのタイプ。
String
はい
デフォルト値なし
値を
ots
に設定します。instanceName
Tablestore インスタンスの名前。
String
はい
デフォルト値なし
該当なし。
endPoint
Tablestore インスタンスのエンドポイント。
String
はい
デフォルト値なし
詳細については、「エンドポイント」をご参照ください。
tableName
テーブルの名前
String
はい
デフォルト値なし
該当なし。
accessId
Alibaba Cloud アカウントまたは Resource Access Management(RAM)ユーザーの AccessKey ID。
String
はい
デフォルト値なし
詳細については、「リファレンス」トピックの「アカウントの AccessKey ID と AccessKey シークレットに関する情報を表示するにはどうすればよいですか?」セクションをご参照ください。
重要AccessKey ペアを保護するために、キー管理方式を使用して AccessKey ID を設定することをお勧めします。 詳細については、「変数とキーの管理」をご参照ください。
accessKey
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。
String
はい
デフォルト値なし
詳細については、「リファレンス」トピックの「アカウントの AccessKey ID と AccessKey シークレットに関する情報を表示するにはどうすればよいですか?」セクションをご参照ください。
重要AccessKey ペアを保護するために、キー管理方式を使用して AccessKey シークレットを設定することをお勧めします。 詳細については、「変数とキーの管理」をご参照ください。
connectTimeout
Tablestore コネクタが Tablestore に接続するためのタイムアウト期間。
Integer
いいえ
30000
単位: ミリ秒。
socketTimeout
Tablestore コネクタが Tablestore に接続するためのソケットタイムアウト期間。
Integer
いいえ
30000
単位: ミリ秒。
ioThreadCount
I/O スレッドの数。
Integer
いいえ
4
該当なし。
callbackThreadPoolSize
コールバックスレッドプールのサイズ。
Integer
いいえ
4
該当なし。
ソーステーブル専用のパラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
tunnelName
Tablestore ソーステーブルのトンネル名。
String
はい
デフォルト値なし
事前に Tablestore コンソールでトンネルを作成する必要があります。 トンネルを作成する際には、トンネル名とトンネルタイプを指定します。 トンネルタイプは、増分、フル、または差分です。 トンネルの作成方法の詳細については、「クイックスタート」トピックの「トンネルの作成」セクションをご参照ください。
ignoreDelete
削除操作を無視するかどうかを指定します。
Boolean
いいえ
false
有効な値:
true: 削除操作は無視されます。
false (デフォルト): 削除操作は無視されません。
skipInvalidData
ダーティデータを無視するかどうかを指定します。 ダーティデータが無視されない場合、システムがダーティデータを処理するときにエラーが報告されます。
Boolean
いいえ
false
有効な値:
true: ダーティデータは無視されます。
false (デフォルト): ダーティデータは無視されません。
説明VVR 8.0.4 以降を使用する Apache Flink 用 Realtime Compute のみ、このパラメータをサポートしています。
retryStrategy
再試行ポリシー。
Enum
いいえ
TIME
有効な値:
TIME: retryTimeoutMs パラメータで指定されたタイムアウト期間が終了するまで、システムは継続的に再試行します。
COUNT: retryCount パラメータで指定された最大再試行回数に達するまで、システムは継続的に再試行します。
retryCount
最大再試行回数。
Integer
いいえ
3
retryStrategy パラメータを COUNT に設定した場合、このパラメータを指定できます。
retryTimeoutMs
再試行のタイムアウト期間。
Integer
いいえ
180000
retryStrategy パラメータを TIME に設定した場合、このパラメータを指定できます。 単位: ミリ秒。
streamOriginColumnMapping
元のカラム名と関連する実際のカラム名の間のマッピング。
String
いいえ
デフォルト値なし
元のカラム名と関連する実際のカラム名をコロン(:)で区切ります。 複数のマッピングをカンマ(,)で区切ります。 例:
origin_col1:col1,origin_col2:col2
。outputSpecificRowType
特定の行タイプを渡すかどうかを指定します。
Boolean
いいえ
false
有効な値:
false: 特定の行タイプを渡しません。 すべてのデータは INSERT タイプです。
true: 特定の行タイプを渡します。 データは INSERT、DELETE、または UPDATE_AFTER タイプです。
結果テーブル専用のパラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
retryIntervalMs
再試行間隔。
Integer
いいえ
1000
単位: ミリ秒。
maxRetryTimes
最大再試行回数。
Integer
いいえ
10
該当なし。
valueColumns
挿入するカラムの名前。
String
はい
デフォルト値なし
ID フィールドや NAME フィールドなど、複数のフィールドをカンマ(,)で区切ります。
bufferSize
結果テーブルにデータが書き込まれる前にバッファに保存できるデータレコードの最大数。
Integer
いいえ
5000
該当なし。
batchWriteTimeoutMs
書き込みタイムアウト期間。
Integer
いいえ
5000
単位: ミリ秒。 batchWriteTimeoutMs
パラメータで指定された期間内にキャッシュされたデータレコード数が上限に達しない場合、キャッシュされたすべてのデータが結果テーブルに書き込まれます。batchSize
一度に書き込むことができるデータレコードの数。
Integer
いいえ
100
最大値: 200。
ignoreDelete
削除操作を無視するかどうかを指定します。
Boolean
いいえ
False
該当なし。
autoIncrementKey
自動インクリメントプライマリキーカラムの名前。 結果テーブルに自動インクリメントプライマリキーカラムが含まれている場合、このパラメータを設定して自動インクリメントプライマリキーカラムの名前を指定できます。
String
いいえ
デフォルト値なし
結果テーブルに自動インクリメントプライマリキーカラムがない場合は、このパラメータを設定する必要はありません。
説明VVR 8.0.4 以降を使用する Apache Flink 用 Realtime Compute のみ、このパラメータをサポートしています。
overwriteMode
データ上書きモード。
Enum
いいえ
PUT
有効な値:
PUT: データは PUT モードで Tablestore テーブルに書き込まれます。
UPDATE: データは UPDATE モードで Tablestore テーブルに書き込まれます。
説明動的カラムモードでは、UPDATE モードのみがサポートされています。
defaultTimestampInMillisecond
Tablestore テーブルにデータを書き込むために使用されるデフォルトのタイムスタンプ。
Long
いいえ
-1
このパラメータを指定しない場合、現在のシステム時刻のタイムスタンプが使用されます。
dynamicColumnSink
動的カラムモードを有効にするかどうかを指定します。
Boolean
いいえ
false
動的カラムモードは、テーブルでカラムが指定されておらず、デプロイ状況に基づいてカラムがテーブルに挿入されるシナリオに適しています。 最初のいくつかのカラムは、テーブル作成ステートメントでプライマリキーとして定義されます。 最後の 2 つのカラムの最初のカラムの値はカラム名として使用され、最後のカラムの値は前のカラムの値として使用され、最後の 2 つのカラムのデータ型は STRING である必要があります。
説明動的カラムモードを有効にする場合、自動インクリメントプライマリキーカラムはサポートされておらず、overwriteMode パラメータを UPDATE に設定する必要があります。
checkSinkTableMeta
結果テーブルのメタデータを確認するかどうかを指定します。
Boolean
いいえ
true
このパラメータを true に設定すると、システムは Tablestore テーブルのプライマリキーカラムがテーブル作成ステートメントで指定されたプライマリキーと同じかどうかを確認します。
enableRequestCompression
データ書き込み中にデータ圧縮を有効にするかどうかを指定します。
Boolean
いいえ
false
該当なし。
ディメンションテーブル専用のパラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
retryIntervalMs
再試行間隔。
Integer
いいえ
1000
単位: ミリ秒。
maxRetryTimes
最大再試行回数。
Integer
いいえ
10
該当なし。
cache
キャッシュポリシー。
String
いいえ
ALL
有効な値:
None: データはキャッシュされません。
LRU: ディメンションテーブルの特定のデータのみがキャッシュされます。 システムがデータレコードを受信するたびに、システムはキャッシュを検索します。 システムがキャッシュ内でレコードを見つけられない場合、システムは物理ディメンションテーブルでデータレコードを検索します。
このキャッシュポリシーを使用する場合は、cacheSize パラメータと cacheTTLMs パラメータを設定する必要があります。
ALL (デフォルト): ディメンションテーブルのすべてのデータがキャッシュされます。 ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。 これにより、ディメンションテーブルの後続のすべてのクエリでキャッシュが検索されます。 キーが存在しない場合、システムはキャッシュ内でデータレコードを見つけられません。 システムは、キャッシュエントリが期限切れになった後、キャッシュ内のすべてのデータをリロードします。
リモートテーブルのデータ量が少なく、多数の欠落キーが存在する場合は、このパラメータを ALL に設定することをお勧めします。 ソーステーブルとディメンションテーブルは、ON 句に基づいて関連付けることができません。 このキャッシュポリシーを使用する場合は、cacheTTLMs パラメータと cacheReloadTimeBlackList パラメータを設定する必要があります。
説明cache パラメータを ALL に設定する場合は、システムがディメンションテーブルからデータを非同期にロードするため、テーブルを結合するためのノードのメモリを増やす必要があります。 増加したメモリサイズは、リモートテーブルの 2 倍です。
cacheSize
キャッシュできるデータレコードの最大数。
Integer
いいえ
デフォルト値なし
cache パラメータを LRU に設定した場合、このパラメータを指定できます。
説明このパラメータの値は、キャッシュできるデータレコードの最大数です。
cacheTTLMs
キャッシュタイムアウト期間。
Integer
いいえ
デフォルト値なし
単位: ミリ秒。 cacheTTLMs パラメータの構成は、cache パラメータの値によって異なります。
cache パラメータを None に設定した場合、cacheTTLMs パラメータは空のままにすることができます。 これは、キャッシュエントリが期限切れにならないことを示します。
cache パラメータを LRU に設定した場合、cacheTTLMs パラメータはキャッシュのタイムアウト期間を指定します。 デフォルトでは、キャッシュエントリは期限切れになりません。
cache パラメータを ALL に設定した場合、cacheTTLMs パラメータはシステムがキャッシュを更新する間隔を指定します。 デフォルトでは、キャッシュはリロードされません。
cacheEmpty
空の結果をキャッシュするかどうかを指定します。
Boolean
いいえ
デフォルト値なし
true: 空の結果はキャッシュされます。
false: 空の結果はキャッシュされません。
cacheReloadTimeBlackList
キャッシュが更新されない期間。 このパラメータは、cache パラメータが ALL に設定されている場合に有効になります。 このパラメータに指定した期間中は、キャッシュは更新されません。 このパラメータは、ダブル 11 などの大規模なオンラインプロモーションイベントに適しています。
String
いいえ
デフォルト値なし
次の例は、値の形式を示しています。2017-10-24 14:00 -> 2017-10-24 15:00、2017-11-10 23:30 -> 2017-11-11 08:00。 次のルールに基づいて区切り文字を使用します。
複数の期間をカンマ(,)で区切ります。
各期間の開始時刻と終了時刻を、ハイフン(-)と閉じ山かっこ(>)の組み合わせである矢印(->)で区切ります。
async
非同期モードでデータ同期を有効にするかどうかを指定します。
Boolean
いいえ
false
true: 非同期モードでのデータ同期が有効になります。 デフォルトでは、非同期モードでデータを同期する場合、データはソートされません。
false (デフォルト): 非同期モードでのデータ同期は無効になります。
データ型マッピング
ソーステーブル
Tablestore のフィールドのデータ型
Apache Flink 用 Realtime Compute のフィールドのデータ型
INTEGER
BIGINT
STRING
STRING
BOOLEAN
BOOLEAN
DOUBLE
DOUBLE
BINARY
BINARY
結果テーブル
Apache Flink 用 Realtime Compute のフィールドのデータ型
Tablestore のフィールドのデータ型
BINARY
BINARY
VARBINARY BINARY
CHAR
STRING
VARCHAR STRING
TINYINT
INTEGER
SMALLINT INTEGER
INTEGER INTEGER
BIGINT INTEGER
FLOAT
DOUBLE
DOUBLE DOUBLE
BOOLEAN
BOOLEAN
サンプルコード
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>', // エンドポイント
'instanceName' = 'flink-source', // Tablestore インスタンス名
'tableName' ='flink_source_table', // Tablestore テーブル名
'tunnelName' = 'flinksourcestream', // Tunnel 名
'accessId' ='${ak_id}', // AccessKey ID
'accessKey' ='${ak_secret}', // AccessKey Secret
'ignoreDelete' = 'false', // 削除操作を無視するかどうか
'skipInvalidData' ='false' // 無効なデータをスキップするかどうか
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>', // エンドポイント
'instanceName'='flink-sink', // Tablestore インスタンス名
'tableName'='flink_sink_table', // Tablestore テーブル名
'accessId'='${ak_id}', // AccessKey ID
'accessKey'='${ak_secret}', // AccessKey Secret
'valueColumns'='customerid,customername', // 挿入するカラム名
'autoIncrementKey'='${auto_increment_primary_key_name}' // 自動インクリメントプライマリキーの名前
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;