すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Tablestore コネクタ

最終更新日:Jan 07, 2025

このトピックでは、Tablestore コネクタの使用方法について説明します。

背景情報

Tablestore は、大量の構造化データの保存に最適化された、テーブルベースの低コストサーバーレスストレージサービスです。 Tablestore を使用すると、ミリ秒単位でオンラインデータをクエリおよび取得し、保存されたデータを多次元で分析できます。 Tablestore は、大量の請求書、インスタントメッセージング(IM)、IoT、車載インターネット(IoV)、リスク管理、インテリジェントレコメンデーションなど、さまざまなシナリオに適しています。 Tablestore は、IoT アプリケーション向けに高度に最適化されたエンドツーエンドのストレージソリューションも提供します。 詳細については、「Tablestore とは」をご参照ください。

次の表に、Tablestore コネクタでサポートされている機能を示します。

項目

説明

実行モード

ストリーミングモード

API タイプ

SQL API

テーブルタイプ

ソーステーブル、ディメンションテーブル、および結果テーブル

データ形式

該当なし

メトリック

  • ソーステーブルのメトリック: なし

  • ディメンションテーブルのメトリック: なし

  • 結果テーブルのメトリック:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「メトリック」をご参照ください。

結果テーブルでのデータの更新または削除

サポートされています

前提条件

Tablestore インスタンスが購入され、Tablestore テーブルが作成されていること。 詳細については、「Tablestore の使用」をご参照ください。

制限

Ververica Runtime(VVR)3.0.0 以降を使用する Apache Flink 用 Realtime Compute のみ、Tablestore コネクタをサポートしています。

構文

  • 結果テーブルを作成するためのステートメント

    CREATE TABLE ots_sink (
      name VARCHAR,
      age BIGINT,
      birthday BIGINT,
      primary key(name,age) not enforced
    ) WITH (
      'connector'='ots',
      'instanceName'='<yourInstanceName>',  // Tablestore インスタンス名
      'tableName'='<yourTableName>', // Tablestore テーブル名
      'accessId'='${ak_id}', // AccessKey ID
      'accessKey'='${ak_secret}', // AccessKey Secret
      'endPoint'='<yourEndpoint>', // エンドポイント
      'valueColumns'='birthday' // 挿入するカラム名
    );
    説明

    Tablestore の結果テーブルにはプライマリキーを指定する必要があります。 最新の出力データが Tablestore の結果テーブルに追加され、テーブルデータが更新されます。

  • ディメンションテーブルを作成するためのステートメント

    CREATE TABLE ots_dim (
      id int,
      len int,
      content STRING
    ) WITH (
      'connector'='ots',
      'endPoint'='<yourEndpoint>', // エンドポイント
      'instanceName'='<yourInstanceName>', // Tablestore インスタンス名
      'tableName'='<yourTableName>', // Tablestore テーブル名
      'accessId'='${ak_id}', // AccessKey ID
      'accessKey'='${ak_secret}' // AccessKey Secret
    );
  • ソーステーブルを作成するためのステートメント

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR
    ) WITH (
      'connector'='ots',
      'endPoint' ='<yourEndpoint>', // エンドポイント
      'instanceName' = 'flink-source', // Tablestore インスタンス名
      'tableName' ='flink_source_table', // Tablestore テーブル名
      'tunnelName' = 'flinksourcestream', // Tunnel 名
      'accessId' ='${ak_id}', // AccessKey ID
      'accessKey' ='${ak_secret}', // AccessKey Secret
      'ignoreDelete' = 'false' // 削除操作を無視するかどうか
    );

    データの消費が必要なフィールド、および Tunnel Service の戻りデータの OtsRecordType フィールドと OtsRecordTimestamp フィールドは、属性カラムとして読み書きできます。 次の表に、フィールドを示します。

    フィールド

    Apache Flink 用 Realtime Compute でのマッピングフィールド

    説明

    OtsRecordType

    type

    データ操作タイプ。

    OtsRecordTimestamp

    timestamp

    データ操作時刻。 単位: マイクロ秒。

    説明

    全データを読み取る場合、OtsRecordTimestamp パラメータの値は 0 に設定されます。

    OtsRecordType フィールドと OtsRecordTimestamp フィールドを読み取る場合は、Apache Flink 用 Realtime Compute が提供する METADATA キーワードを使用して、Tablestore ソーステーブルから属性フィールドを取得できます。 次の例は、DDL ステートメントを示しています。

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR,
      record_type STRING METADATA FROM 'type',  // レコードタイプ
      record_timestamp BIGINT METADATA FROM 'timestamp' // レコードタイムスタンプ
    ) WITH (
      ...
    );

WITH 句のパラメータ

  • 共通パラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    String

    はい

    デフォルト値なし

    値を ots に設定します。

    instanceName

    Tablestore インスタンスの名前。

    String

    はい

    デフォルト値なし

    該当なし。

    endPoint

    Tablestore インスタンスのエンドポイント。

    String

    はい

    デフォルト値なし

    詳細については、「エンドポイント」をご参照ください。

    tableName

    テーブルの名前

    String

    はい

    デフォルト値なし

    該当なし。

    accessId

    Alibaba Cloud アカウントまたは Resource Access Management(RAM)ユーザーの AccessKey ID。

    String

    はい

    デフォルト値なし

    詳細については、「リファレンス」トピックの「アカウントの AccessKey ID と AccessKey シークレットに関する情報を表示するにはどうすればよいですか?」セクションをご参照ください。

    重要

    AccessKey ペアを保護するために、キー管理方式を使用して AccessKey ID を設定することをお勧めします。 詳細については、「変数とキーの管理」をご参照ください。

    accessKey

    Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。

    String

    はい

    デフォルト値なし

    詳細については、「リファレンス」トピックの「アカウントの AccessKey ID と AccessKey シークレットに関する情報を表示するにはどうすればよいですか?」セクションをご参照ください。

    重要

    AccessKey ペアを保護するために、キー管理方式を使用して AccessKey シークレットを設定することをお勧めします。 詳細については、「変数とキーの管理」をご参照ください。

    connectTimeout

    Tablestore コネクタが Tablestore に接続するためのタイムアウト期間。

    Integer

    いいえ

    30000

    単位: ミリ秒。

    socketTimeout

    Tablestore コネクタが Tablestore に接続するためのソケットタイムアウト期間。

    Integer

    いいえ

    30000

    単位: ミリ秒。

    ioThreadCount

    I/O スレッドの数。

    Integer

    いいえ

    4

    該当なし。

    callbackThreadPoolSize

    コールバックスレッドプールのサイズ。

    Integer

    いいえ

    4

    該当なし。

  • ソーステーブル専用のパラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    tunnelName

    Tablestore ソーステーブルのトンネル名。

    String

    はい

    デフォルト値なし

    事前に Tablestore コンソールでトンネルを作成する必要があります。 トンネルを作成する際には、トンネル名とトンネルタイプを指定します。 トンネルタイプは、増分、フル、または差分です。 トンネルの作成方法の詳細については、「クイックスタート」トピックの「トンネルの作成」セクションをご参照ください。

    ignoreDelete

    削除操作を無視するかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true: 削除操作は無視されます。

    • false (デフォルト): 削除操作は無視されません。

    skipInvalidData

    ダーティデータを無視するかどうかを指定します。 ダーティデータが無視されない場合、システムがダーティデータを処理するときにエラーが報告されます。

    Boolean

    いいえ

    false

    有効な値:

    • true: ダーティデータは無視されます。

    • false (デフォルト): ダーティデータは無視されません。

    説明

    VVR 8.0.4 以降を使用する Apache Flink 用 Realtime Compute のみ、このパラメータをサポートしています。

    retryStrategy

    再試行ポリシー。

    Enum

    いいえ

    TIME

    有効な値:

    • TIME: retryTimeoutMs パラメータで指定されたタイムアウト期間が終了するまで、システムは継続的に再試行します。

    • COUNT: retryCount パラメータで指定された最大再試行回数に達するまで、システムは継続的に再試行します。

    retryCount

    最大再試行回数。

    Integer

    いいえ

    3

    retryStrategy パラメータを COUNT に設定した場合、このパラメータを指定できます。

    retryTimeoutMs

    再試行のタイムアウト期間。

    Integer

    いいえ

    180000

    retryStrategy パラメータを TIME に設定した場合、このパラメータを指定できます。 単位: ミリ秒。

    streamOriginColumnMapping

    元のカラム名と関連する実際のカラム名の間のマッピング。

    String

    いいえ

    デフォルト値なし

    元のカラム名と関連する実際のカラム名をコロン(:)で区切ります。 複数のマッピングをカンマ(,)で区切ります。 例: origin_col1:col1,origin_col2:col2

    outputSpecificRowType

    特定の行タイプを渡すかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • false: 特定の行タイプを渡しません。 すべてのデータは INSERT タイプです。

    • true: 特定の行タイプを渡します。 データは INSERT、DELETE、または UPDATE_AFTER タイプです。

  • 結果テーブル専用のパラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    retryIntervalMs

    再試行間隔。

    Integer

    いいえ

    1000

    単位: ミリ秒。

    maxRetryTimes

    最大再試行回数。

    Integer

    いいえ

    10

    該当なし。

    valueColumns

    挿入するカラムの名前。

    String

    はい

    デフォルト値なし

    ID フィールドや NAME フィールドなど、複数のフィールドをカンマ(,)で区切ります。

    bufferSize

    結果テーブルにデータが書き込まれる前にバッファに保存できるデータレコードの最大数。

    Integer

    いいえ

    5000

    該当なし。

    batchWriteTimeoutMs

    書き込みタイムアウト期間。

    Integer

    いいえ

    5000

    単位: ミリ秒。 batchWriteTimeoutMs

    パラメータで指定された期間内にキャッシュされたデータレコード数が上限に達しない場合、キャッシュされたすべてのデータが結果テーブルに書き込まれます。

    batchSize

    一度に書き込むことができるデータレコードの数。

    Integer

    いいえ

    100

    最大値: 200。

    ignoreDelete

    削除操作を無視するかどうかを指定します。

    Boolean

    いいえ

    False

    該当なし。

    autoIncrementKey

    自動インクリメントプライマリキーカラムの名前。 結果テーブルに自動インクリメントプライマリキーカラムが含まれている場合、このパラメータを設定して自動インクリメントプライマリキーカラムの名前を指定できます。

    String

    いいえ

    デフォルト値なし

    結果テーブルに自動インクリメントプライマリキーカラムがない場合は、このパラメータを設定する必要はありません。

    説明

    VVR 8.0.4 以降を使用する Apache Flink 用 Realtime Compute のみ、このパラメータをサポートしています。

    overwriteMode

    データ上書きモード。

    Enum

    いいえ

    PUT

    有効な値:

    • PUT: データは PUT モードで Tablestore テーブルに書き込まれます。

    • UPDATE: データは UPDATE モードで Tablestore テーブルに書き込まれます。

    説明

    動的カラムモードでは、UPDATE モードのみがサポートされています。

    defaultTimestampInMillisecond

    Tablestore テーブルにデータを書き込むために使用されるデフォルトのタイムスタンプ。

    Long

    いいえ

    -1

    このパラメータを指定しない場合、現在のシステム時刻のタイムスタンプが使用されます。

    dynamicColumnSink

    動的カラムモードを有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    動的カラムモードは、テーブルでカラムが指定されておらず、デプロイ状況に基づいてカラムがテーブルに挿入されるシナリオに適しています。 最初のいくつかのカラムは、テーブル作成ステートメントでプライマリキーとして定義されます。 最後の 2 つのカラムの最初のカラムの値はカラム名として使用され、最後のカラムの値は前のカラムの値として使用され、最後の 2 つのカラムのデータ型は STRING である必要があります。

    説明

    動的カラムモードを有効にする場合、自動インクリメントプライマリキーカラムはサポートされておらず、overwriteMode パラメータを UPDATE に設定する必要があります。

    checkSinkTableMeta

    結果テーブルのメタデータを確認するかどうかを指定します。

    Boolean

    いいえ

    true

    このパラメータを true に設定すると、システムは Tablestore テーブルのプライマリキーカラムがテーブル作成ステートメントで指定されたプライマリキーと同じかどうかを確認します。

    enableRequestCompression

    データ書き込み中にデータ圧縮を有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    該当なし。

  • ディメンションテーブル専用のパラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    retryIntervalMs

    再試行間隔。

    Integer

    いいえ

    1000

    単位: ミリ秒。

    maxRetryTimes

    最大再試行回数。

    Integer

    いいえ

    10

    該当なし。

    cache

    キャッシュポリシー。

    String

    いいえ

    ALL

    有効な値:

    • None: データはキャッシュされません。

    • LRU: ディメンションテーブルの特定のデータのみがキャッシュされます。 システムがデータレコードを受信するたびに、システムはキャッシュを検索します。 システムがキャッシュ内でレコードを見つけられない場合、システムは物理ディメンションテーブルでデータレコードを検索します。

      このキャッシュポリシーを使用する場合は、cacheSize パラメータと cacheTTLMs パラメータを設定する必要があります。

    • ALL (デフォルト): ディメンションテーブルのすべてのデータがキャッシュされます。 ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。 これにより、ディメンションテーブルの後続のすべてのクエリでキャッシュが検索されます。 キーが存在しない場合、システムはキャッシュ内でデータレコードを見つけられません。 システムは、キャッシュエントリが期限切れになった後、キャッシュ内のすべてのデータをリロードします。

      リモートテーブルのデータ量が少なく、多数の欠落キーが存在する場合は、このパラメータを ALL に設定することをお勧めします。 ソーステーブルとディメンションテーブルは、ON 句に基づいて関連付けることができません。 このキャッシュポリシーを使用する場合は、cacheTTLMs パラメータと cacheReloadTimeBlackList パラメータを設定する必要があります。

      説明

      cache パラメータを ALL に設定する場合は、システムがディメンションテーブルからデータを非同期にロードするため、テーブルを結合するためのノードのメモリを増やす必要があります。 増加したメモリサイズは、リモートテーブルの 2 倍です。

    cacheSize

    キャッシュできるデータレコードの最大数。

    Integer

    いいえ

    デフォルト値なし

    cache パラメータを LRU に設定した場合、このパラメータを指定できます。

    説明

    このパラメータの値は、キャッシュできるデータレコードの最大数です。

    cacheTTLMs

    キャッシュタイムアウト期間。

    Integer

    いいえ

    デフォルト値なし

    単位: ミリ秒。 cacheTTLMs パラメータの構成は、cache パラメータの値によって異なります。

    • cache パラメータを None に設定した場合、cacheTTLMs パラメータは空のままにすることができます。 これは、キャッシュエントリが期限切れにならないことを示します。

    • cache パラメータを LRU に設定した場合、cacheTTLMs パラメータはキャッシュのタイムアウト期間を指定します。 デフォルトでは、キャッシュエントリは期限切れになりません。

    • cache パラメータを ALL に設定した場合、cacheTTLMs パラメータはシステムがキャッシュを更新する間隔を指定します。 デフォルトでは、キャッシュはリロードされません。

    cacheEmpty

    空の結果をキャッシュするかどうかを指定します。

    Boolean

    いいえ

    デフォルト値なし

    • true: 空の結果はキャッシュされます。

    • false: 空の結果はキャッシュされません。

    cacheReloadTimeBlackList

    キャッシュが更新されない期間。 このパラメータは、cache パラメータが ALL に設定されている場合に有効になります。 このパラメータに指定した期間中は、キャッシュは更新されません。 このパラメータは、ダブル 11 などの大規模なオンラインプロモーションイベントに適しています。

    String

    いいえ

    デフォルト値なし

    次の例は、値の形式を示しています。2017-10-24 14:00 -> 2017-10-24 15:002017-11-10 23:30 -> 2017-11-11 08:00。 次のルールに基づいて区切り文字を使用します。

    • 複数の期間をカンマ(,)で区切ります。

    • 各期間の開始時刻と終了時刻を、ハイフン(-)と閉じ山かっこ(>)の組み合わせである矢印(->)で区切ります。

    async

    非同期モードでデータ同期を有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    • true: 非同期モードでのデータ同期が有効になります。 デフォルトでは、非同期モードでデータを同期する場合、データはソートされません。

    • false (デフォルト): 非同期モードでのデータ同期は無効になります。

データ型マッピング

  • ソーステーブル

    Tablestore のフィールドのデータ型

    Apache Flink 用 Realtime Compute のフィールドのデータ型

    INTEGER

    BIGINT

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    DOUBLE

    DOUBLE

    BINARY

    BINARY

  • 結果テーブル

    VARBINARYVARCHARSMALLINTINTEGERBIGINTDOUBLE

    Apache Flink 用 Realtime Compute のフィールドのデータ型

    Tablestore のフィールドのデータ型

    BINARY

    BINARY

    BINARY

    CHAR

    STRING

    STRING

    TINYINT

    INTEGER

    INTEGER

    INTEGER

    INTEGER

    FLOAT

    DOUBLE

    DOUBLE

    BOOLEAN

    BOOLEAN

サンプルコード

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH 
  'connector'='ots',
  'endPoint' ='<yourEndpoint>', // エンドポイント
  'instanceName' = 'flink-source', // Tablestore インスタンス名
  'tableName' ='flink_source_table', // Tablestore テーブル名
  'tunnelName' = 'flinksourcestream', // Tunnel 名
  'accessId' ='${ak_id}', // AccessKey ID
  'accessKey' ='${ak_secret}', // AccessKey Secret
  'ignoreDelete' = 'false', // 削除操作を無視するかどうか
  'skipInvalidData' ='false' // 無効なデータをスキップするかどうか
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>', // エンドポイント
  'instanceName'='flink-sink', // Tablestore インスタンス名
  'tableName'='flink_sink_table', // Tablestore テーブル名
  'accessId'='${ak_id}', // AccessKey ID
  'accessKey'='${ak_secret}', // AccessKey Secret
  'valueColumns'='customerid,customername', // 挿入するカラム名
  'autoIncrementKey'='${auto_increment_primary_key_name}' // 自動インクリメントプライマリキーの名前
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;