すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Tablestore ストリームデータソース

最終更新日:Jan 11, 2025

DataWorks は、Tablestore ストリームデータソースから増分データを読み取るための Tablestore Stream Reader を提供しています。このトピックでは、Tablestore ストリームデータソースからデータを同期する機能について説明します。

データ同期前に Tablestore ストリーム環境を準備する

Tablestore Stream Reader を使用する前に、ソーステーブルで Stream 機能が有効になっていることを確認してください。デフォルトでは、時系列テーブルでは Stream 機能が有効になっています。テーブルを作成するときに、Stream 機能を有効にすることができます。また、テーブルの作成後に、Tablestore SDK の UpdateTable オペレーションを呼び出して、この機能を有効にすることもできます。次のサンプルコードは、Stream 機能を有効にする方法の例を示しています。

SyncClient client = new SyncClient("", "", "", "");
// 方法 1: テーブルを作成するときに Stream 機能を有効にします。
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 値 24 は、Tablestore が増分データを 24 時間保持することを示します。
client.createTable(createTableRequest);
// 方法 2: テーブルを作成するときに Stream 機能を有効にしない場合は、テーブルの作成後に UpdateTable オペレーションを呼び出して、この機能を有効にすることができます。
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);

Tablestore SDK の UpdateTable オペレーションを呼び出して Stream 機能を有効にする場合は、次の点に注意してください。

  • 増分データの有効期限を指定できます。このようにして、Tablestore Stream Reader を使用して Tablestore から増分データを読み取ることができます。Stream 機能を有効にすると、Tablestore サーバーは Tablestore テーブルの操作ログを保存します。各パーティションは操作ログを順番にキューイングします。各操作ログは、指定された有効期限が経過するとリサイクルされます。

  • Tablestore SDK は、操作ログの読み取りに使用されるいくつかの Stream 関連 API オペレーションを提供します。Tablestore Stream Reader はこれらの API オペレーションを呼び出して増分データを読み取ります。Tablestore Stream Reader を使用して列モードでデータを読み取る場合、Tablestore Stream Reader は増分データを複数の 6 タプルに変換します。各 6 タプルは、pk、colName、version、colValue、opType、および sequenceInfo で構成されます。Tablestore Stream Reader を使用して行モードでデータを読み取る場合、Tablestore Stream Reader は行ごとに増分データを読み取ります。

サポートされている読み取りモードとデータ型

Tablestore Stream Reader を使用すると、Tablestore から列モードまたは行モードで増分データを読み取ることができます。このセクションでは、読み取りモードのデータ読み取りプロセスとサポートされているデータ型について説明します。

列モードでデータを読み取る

Tablestore のマルチバージョンモードでは、テーブルデータは 3 レベル構造 で編成されます。行、列、バージョンです。1 つの行に複数の列を含めることができ、列名は固定されていません。各列に複数のバージョンを含めることができ、各バージョンには特定のタイムスタンプ(バージョン番号)があります。

Tablestore の API オペレーションを呼び出して、データの読み取りと書き込みを行うことができます。Tablestore は、テーブルデータに対して実行された最近の書き込みおよび変更操作を記録して、増分データを記録します。したがって、増分データは一連の操作レコードと見なすことができます。

Tablestore は、次の種類の操作をサポートしています。

  • PutRow: 行を書き込みます。行が存在する場合は上書きされます。

  • UpdateRow: 元の行の他のデータを変更せずに、行を更新します。UpdateRow を使用して、列値を追加したり、列のバージョンが存在する場合は列値を上書きしたり、列の特定のバージョンまたはすべてのバージョンを削除したりできます。

  • DeleteRow: 行を削除します。

Tablestore は、操作のタイプごとに増分データレコードを生成します。Tablestore Stream Reader はこれらのレコードを読み取り、Data Integration でサポートされている形式に変換します。

Tablestore は動的列とマルチバージョンモードをサポートしています。したがって、Tablestore Stream Reader によって生成される行は、Tablestore の行ではなく、列のバージョンです。Tablestore Stream Reader が Tablestore の行からデータを読み取った後、Tablestore Stream Reader はデータを複数の行に変換します。各行には、プライマリキー値、列名、列のバージョン(バージョン番号)のタイムスタンプ、バージョンの値、および操作タイプが含まれます。isExportSequenceInfo パラメーターが true に設定されている場合は、時系列情報も含まれます。

Data Integration でサポートされている形式に変換されたデータには、次の種類の操作が定義されています。

  • U (UPDATE): 列のバージョンを書き込みます。

  • DO (DELETE_ONE_VERSION): 列のバージョンを削除します。

  • DA (DELETE_ALL_VERSION): プライマリキー値と列名に基づいて、列のすべてのバージョンを削除します。

  • DR (DELETE_ROW): プライマリキー値に基づいて行を削除します。

次の表に、Tablestore Stream Reader によって変換された、2 つのプライマリキー列(pkName1 と pkName2)を持つテーブルのデータを示します。

pkName1

pkName2

columnName

timestamp

columnValue

opType

pk1_V1

pk2_V1

col_a

1441803688001

col_val1

U

pk1_V1

pk2_V1

col_a

1441803688002

col_val2

U

pk1_V1

pk2_V1

col_b

1441803688003

col_val3

U

pk1_V2

pk2_V2

col_a

1441803688000

-

DO

pk1_V2

pk2_V2

col_b

-

-

DA

pk1_V3

pk2_V3

-

-

-

DR

pk1_V3

pk2_V3

col_a

1441803688005

col_val1

U

前の例では、Tablestore テーブルの 3 つの行が読み取られ、7 つの行に変換されています。3 つの行のプライマリキーは、(pk1_V1, pk2_V1)、(pk1_V2, pk2_V2)、および (pk1_V3, pk2_V3) です。

  • プライマリキーが (pk1_V1, pk2_V1) の行では、col_a 列の 2 つのバージョンと col_b 列の 1 つのバージョンが書き込まれます。

  • プライマリキーが (pk1_V2, pk2_V2) の行では、col_a 列の 1 つのバージョンと col_b 列のすべてのバージョンが削除されます。

  • プライマリキーが (pk1_V3, pk2_V3) の行では、col_a 列の 1 つのバージョンが書き込まれ、行が削除されます。

行モードでデータを読み取る

  • ワイドテーブル

    Tablestore Stream Reader を使用して、行モードでデータを読み取ることもできます。このモードでは、Tablestore Stream Reader は操作レコードを行として読み取ります。mode パラメーターを設定し、データを読み取る列を指定する必要があります。

    "parameter": {
      // mode を single_version_and_update_only に設定し、isExportSequenceInfo を false に設定し、ビジネス要件に基づいて datasource や table などの他のパラメーターを設定します。
      "mode": "single_version_and_update_only", // 読み取りモード。
      "column":[  // Tablestore からデータを読み取る列。ビジネス要件に基づいて列を指定できます。
              {
                 "name": "uid"  // 列の名前。プライマリキー列またはプロパティ列を指定できます。
              },
              {
                 "name": "name"  // 列の名前。プライマリキー列またはプロパティ列を指定できます。
              },
      ],
      "isExportSequenceInfo": false, // 時系列情報を読み取るかどかを指定します。mode パラメーターを single_version_and_update_only に設定した場合、このパラメーターは false にのみ設定できます。
    }
  • 時系列テーブル

    時系列テーブルを作成すると、Stream 機能は自動的に有効になります。

    Tablestore Stream Reader を使用すると、時系列テーブルから増分データを読み取ることができます。テーブルが時系列テーブルの場合、次のパラメーターを設定する必要があります。

    "parameter": {
      // ビジネス要件に基づいて、次のパラメーターと datasource や table などの他のパラメーターを設定します。
      "mode": "single_version_and_update_only", // 読み取りモード。
      "isTimeseriesTable":"true", // テーブルが時系列テーブルかどうかを指定します。
      "column":[  // Tablestore からデータを読み取る列。ビジネス要件に基づいて列を指定できます。
              {
                "name": "_m_name" // メトリック名列の名前。
              },
              {
                "name": "_data_source" // データソース列の名前。
              },
              {
                "name": "_tags" // タグ列の名前。タグは文字列型のデータに変換されます。
              },
              {
                "name": "tag1", // タグキーの名前。
                "is_timeseries_tag":"true"  // フィールドがタグの内部フィールドかどうかを指定します。
              },
              {
                "name": "time" // タイムスタンプ列の名前。
              },
              {
                 "name": "name" // プロパティ列の名前。
              },
      ],
      "isExportSequenceInfo": false, // 時系列情報を読み取るかどかを指定します。mode パラメーターを single_version_and_update_only に設定した場合、このパラメーターは false にのみ設定できます。
    }

    行ごとに読み取られるデータは、元の行のデータに近くなります。これにより、データのさらなる処理が容易になります。行ごとにデータを読み取る場合は、次の点に注意してください。

    • 読み取られる行は、操作レコードから抽出されます。各行は、書き込みまたは更新操作に対応しています。行の一部の列のみを更新する場合、操作レコードには更新された列のみが含まれます。

    • 各列のバージョン番号(各列のタイムスタンプ)は、読み取ったり削除したりすることはできません。

データ型のマッピング

Tablestore Stream Reader はすべての Tablestore データ型をサポートしています。次の表に、Tablestore Stream Reader がデータ型を変換する際に使用するデータ型のマッピングを示します。

カテゴリ

Tablestore データ型

整数

INTEGER

浮動小数点

DOUBLE

文字列

STRING

ブール値

BOOLEAN

バイナリ

BINARY

データ同期タスクの開発: Tablestore からの増分同期のプロシージャガイダンス

付録: コードとパラメーター

付録: コードエディターを使用してバッチ同期タスクを設定する

コードエディターを使用してバッチ同期タスクを設定する場合は、コードエディターの形式要件に基づいて、関連データソースのリーダーとライターのパラメーターを設定する必要があります。形式要件の詳細については、「コードエディターを使用してバッチ同期タスクを設定する」をご参照ください。次の情報では、コードエディターのリーダーとライターのパラメーターの設定の詳細について説明します。

Tablestore Stream Reader のコード

  • 列モードでデータを読み取る

    {
        "type":"job",
        "version":"2.0",// バージョン番号。
        "steps":[
            {
                "stepType":"otsstream",// プラグイン名。
                "parameter":{
                    "datasource":"$srcDatasource",// データソースの名前。
                    "dataTable":"",// テーブルの名前。
                    "statusTable":"TableStoreStreamReaderStatusTable",// Tablestore Stream Reader がステータスレコードの保存に使用するテーブルの名前。
                    "maxRetries":30,// Tablestore から増分データを読み取る各リクエストの最大再試行回数。デフォルト値: 30。
                    "isExportSequenceInfo":false,// 時系列情報を読み取るかどかを指定します。
                    "startTimeString":"${startTime}${hh}",// 増分データの開始時刻。同期タスクの実行開始時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。
                    "endTimeString":"${endTime}${hh}"// 増分データの終了時刻。同期タスクの実行完了時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。
                },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// 許容されるダーティデータレコードの最大数。
            },
            "speed":{
                "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
                "concurrent":1,// 並列スレッドの最大数。
                "mbps":"12"// 最大伝送速度。単位: MB/秒。
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • ワイドテーブルから行モードでデータを読み取る

    {
        "type":"job",
        "version":"2.0",// バージョン番号。
        "steps":[
            {
                "stepType":"otsstream",// プラグイン名。
                "parameter":{
                    "datasource":"$srcDatasource",// データソースの名前。
                    "dataTable":"",// テーブルの名前。
                    "statusTable":"TableStoreStreamReaderStatusTable",// Tablestore Stream Reader がステータスレコードの保存に使用するテーブルの名前。
                    "maxRetries":30,// Tablestore から増分データを読み取る各リクエストの最大再試行回数。デフォルト値: 30。
                    "isExportSequenceInfo":false,// 時系列情報を読み取るかどかを指定します。
                    "startTimeString":"${startTime}${hh}",// 増分データの開始時刻。同期タスクの実行開始時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。
                    "endTimeString":"${endTime}${hh}"// 増分データの終了時刻。同期タスクの実行完了時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。
                    "mode": "single_version_and_update_only",
                    "column":[
                            {
                                "name":"pId"
                            },
                            {
                                "name": "uId"
                            },
                            {
                                "name":"col0"
                            },
                            {
                                "name": "col1"
                            }
                        ],
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// 許容されるダーティデータレコードの最大数。
            },
            "speed":{
                "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
                "concurrent":1,// 並列スレッドの最大数。
                "mbps":"12" // 最大伝送速度。
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 時系列テーブルから行モードでデータを読み取る

    {
        "type":"job",
        "version":"2.0",// バージョン番号。
        "steps":[
            {
                "stepType":"otsstream",// プラグイン名。
                "parameter":{
                    "datasource":"$srcDatasource",// データソースの名前。
                    "dataTable":"",// テーブルの名前。
                    "statusTable":"TableStoreStreamReaderStatusTable",// Tablestore Stream Reader がステータスレコードの保存に使用するテーブルの名前。
                    "maxRetries":30,// Tablestore から増分データを読み取る各リクエストの最大再試行回数。デフォルト値: 30。
                    "isExportSequenceInfo":false,// 時系列情報を読み取るかどかを指定します。
                    "startTimeString":"${startTime}${hh}",// 増分データの開始時刻。同期タスクの実行開始時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。
                    "endTimeString":"${endTime}${hh}"// 増分データの終了時刻。同期タスクの実行完了時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。
                    "mode": "single_version_and_update_only",
                    "isTimeseriesTable":"true",
                    "column": [
                              {
                                "name": "_m_name"
                              },
                              {
                                "name": "_data_source",
                              },
                              {
                                "name": "_tags",
                              },
                              {
                                "name": "string_column",
                              }
                        ]
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// 許容されるダーティデータレコードの最大数。
            },
            "speed":{
                "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
                "concurrent":1,// 並列スレッドの最大数。
                "mbps":"12"// 最大伝送速度。単位: MB/秒。
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }

Tablestore Stream Reader のコードのパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディターを使用してデータソースを追加できます。

はい

デフォルト値なし

dataTable

増分データを読み取るテーブルの名前。テーブルでは Stream 機能が有効になっている必要があります。テーブルを作成するときに、Stream 機能を有効にすることができます。また、テーブルの作成後に UpdateTable オペレーションを呼び出して、この機能を有効にすることもできます。

はい

デフォルト値なし

statusTable

Tablestore Stream Reader がステータスレコードの保存に使用するテーブルの名前。これらのレコードは、不要なデータを見つけて読み取り効率を向上させるのに役立ちます。指定されたテーブルが存在しない場合、Tablestore Stream Reader は自動的にそのようなテーブルを作成します。オフライン読み取りタスクが完了したら、テーブルを削除する必要はありません。テーブル内のステータスレコードは、次の読み取りタスクに使用できます。

  • ステータステーブルを作成する必要はありません。テーブル名を提供するだけで済みます。Tablestore Stream Reader は、インスタンス上にステータステーブルを作成しようとします。そのようなテーブルが存在しない場合、Tablestore Stream Reader は自動的にテーブルを作成します。テーブルが既に存在する場合、Tablestore Stream Reader はテーブルのメタデータが期待どおりかどうかを確認します。メタデータが期待どおりでない場合、Tablestore Stream Reader はエラーを報告します。

  • 読み取りタスクが完了したら、テーブルを削除する必要はありません。テーブル内のステータスレコードは、次の読み取りタスクに使用できます。

  • テーブルでは Time-To-Live (TTL) が有効になっており、TTL が経過するとデータは自動的に期限切れになります。このようにして、テーブルには少量のデータが保存されます。

  • 同じステータステーブルを使用して、dataTable パラメーターで指定され、同じインスタンスによって管理される複数のテーブルのステータスレコードを保存できます。ステータスレコードは互いに独立しています。

TableStoreStreamReaderStatusTable に似た名前を設定できます。名前がビジネス関連のテーブルの名前と異なることを確認してください。

はい

デフォルト値なし

startTimestampMillis

増分データの開始時刻(ミリ秒単位)。開始時刻は、増分データの左閉右開の時間範囲の左境界です。

  • Tablestore Stream Reader は、startTimestampMillis パラメーターで指定された時刻に基づいて、statusTable パラメーターで指定されたテーブル内のステータスレコードを検索し、その時刻からデータを読み取ります。

  • Tablestore Stream Reader がステータスレコードを見つけられない場合、Tablestore Stream Reader はシステムによって保持されている増分データを最初のエントリから読み取り、startTimestampMillis パラメーターで指定された時刻より後に書き込まれたデータをスキップします。

いいえ

デフォルト値なし

endTimestampMillis

増分データの終了時刻(ミリ秒単位)。終了時刻は、増分データの左閉右開の時間範囲の右境界です。

  • Tablestore Stream Reader は、startTimestampMillis パラメーターで指定された時刻からデータのエクスポートを開始し、データレコードのタイムスタンプが endTimestampMillis パラメーターで指定された時刻以上になるとデータのエクスポートを停止します。

  • すべての増分データが読み取られると、Tablestore Stream Reader は endTimestampMillis パラメーターで指定された時刻より前にデータの読み取りを停止します。

いいえ

デフォルト値なし

date

読み取るデータが生成された日付。yyyyMMdd 形式(例: 20151111)でこのパラメーターを設定します。date パラメーター、startTimestampMillis パラメーターと endTimestampMillis パラメーター、または startTimeString パラメーターと endTimeString パラメーターを設定する必要があります。たとえば、Alibaba Cloud Data Process Center はタスクを日単位でスケジュールします。したがって、date パラメーターを設定する必要があり、startTimestampMillis パラメーターと endTimestampMillis パラメーター、または startTimeString パラメーターと endTimeString パラメーターを設定する必要はありません。

いいえ

デフォルト値なし

isExportSequenceInfo

時系列情報を読み取るかどかを指定します。時系列情報には、データが書き込まれた時刻が含まれます。デフォルト値は false です。これは、時系列情報が読み取られないことを示します。

いいえ

false

maxRetries

Tablestore から増分データを読み取る各リクエストの最大再試行回数。デフォルト値: 30。再試行は特定の間隔で実行されます。30 回の再試行の合計継続時間は約 5 分です。デフォルト設定を維持できます。

いいえ

30

startTimeString

増分データの開始時刻(秒単位)。開始時刻は、増分データの左閉右開の時間範囲の左境界です。このパラメーターは yyyymmddhh24miss 形式で設定します。

いいえ

デフォルト値なし

endTimeString

増分データの終了時刻(秒単位)。終了時刻は、増分データの左閉右開の時間範囲の右境界です。このパラメーターは yyyymmddhh24miss 形式で設定します。

いいえ

デフォルト値なし

enableSeekIterator

Tablestore Stream Reader が増分データの読み取りを開始するオフセットを決定するかどうかを指定します。増分データが頻繁に読み取られる場合、Tablestore Stream Reader は前回データが読み取られたオフセットに基づいてオフセットを自動的に決定します。Tablestore Stream Reader が以前に実行されていない場合は、増分データの開始時刻からデータが読み取られます。デフォルトでは、増分データは 7 日間保存されます。開始時刻に達するまで、データはエクスポートされません。Tablestore Stream Reader の設定に "enableSeekIterator": true を追加すると、Tablestore Stream Reader が増分データの読み取りを開始する開始時刻を見つけるのに役立ちます。

いいえ

false

mode

読み取りモード。このパラメーターを single_version_and_update_only に設定すると、データは行モードで読み取られます。

いいえ

デフォルト値なし

isTimeseriesTable

テーブルが時系列テーブルかどうかを指定します。このパラメーターは、mode パラメーターが single_version_and_update_only に設定されている場合にのみ有効になります。

いいえ

false

column

mode パラメーターを single_version_and_update_only に設定したときにデータを読み取る列の名前。サンプルコード:

"column":[
    {"name":"pk1"},
	{"name":"col1"},
	{"name":"col2","dataType":"new"},
	{"name":"col2","dataType":"old"},
	{"name":"col2","dataType":"latest"}
],
  • name フィールドは、データを読み取る列の名前を指定します。このフィールドは必須です。

  • dataType フィールドは、読み取るデータのタイプを指定します。デフォルトのタイプは new です。このフィールドはオプションです。dataType フィールドは、次の列挙型をサポートしています。

    • new: 更新後の現在の列のデータを示します。

    • old: 更新前の現在の列のデータを示します。

    • latest: 現在の列の最新のデータを示します。

説明

行モードでデータを読み取る場合は、このパラメーターを設定する必要があります。設定しない場合、データを読み取ることができません。

  • 行モードでは必須

  • 列モードでは任意

デフォルト値なし