DataWorks は、Tablestore ストリームデータソースから増分データを読み取るための Tablestore Stream Reader を提供しています。このトピックでは、Tablestore ストリームデータソースからデータを同期する機能について説明します。
データ同期前に Tablestore ストリーム環境を準備する
Tablestore Stream Reader を使用する前に、ソーステーブルで Stream 機能が有効になっていることを確認してください。デフォルトでは、時系列テーブルでは Stream 機能が有効になっています。テーブルを作成するときに、Stream 機能を有効にすることができます。また、テーブルの作成後に、Tablestore SDK の UpdateTable オペレーションを呼び出して、この機能を有効にすることもできます。次のサンプルコードは、Stream 機能を有効にする方法の例を示しています。
SyncClient client = new SyncClient("", "", "", "");
// 方法 1: テーブルを作成するときに Stream 機能を有効にします。
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 値 24 は、Tablestore が増分データを 24 時間保持することを示します。
client.createTable(createTableRequest);
// 方法 2: テーブルを作成するときに Stream 機能を有効にしない場合は、テーブルの作成後に UpdateTable オペレーションを呼び出して、この機能を有効にすることができます。
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);Tablestore SDK の UpdateTable オペレーションを呼び出して Stream 機能を有効にする場合は、次の点に注意してください。
増分データの有効期限を指定できます。このようにして、Tablestore Stream Reader を使用して Tablestore から増分データを読み取ることができます。Stream 機能を有効にすると、Tablestore サーバーは Tablestore テーブルの操作ログを保存します。各パーティションは操作ログを順番にキューイングします。各操作ログは、指定された有効期限が経過するとリサイクルされます。
Tablestore SDK は、操作ログの読み取りに使用されるいくつかの Stream 関連 API オペレーションを提供します。Tablestore Stream Reader はこれらの API オペレーションを呼び出して増分データを読み取ります。Tablestore Stream Reader を使用して列モードでデータを読み取る場合、Tablestore Stream Reader は増分データを複数の 6 タプルに変換します。各 6 タプルは、pk、colName、version、colValue、opType、および sequenceInfo で構成されます。Tablestore Stream Reader を使用して行モードでデータを読み取る場合、Tablestore Stream Reader は行ごとに増分データを読み取ります。
サポートされている読み取りモードとデータ型
Tablestore Stream Reader を使用すると、Tablestore から列モードまたは行モードで増分データを読み取ることができます。このセクションでは、読み取りモードのデータ読み取りプロセスとサポートされているデータ型について説明します。
列モードでデータを読み取る
Tablestore のマルチバージョンモードでは、テーブルデータは 3 レベル構造 で編成されます。行、列、バージョンです。1 つの行に複数の列を含めることができ、列名は固定されていません。各列に複数のバージョンを含めることができ、各バージョンには特定のタイムスタンプ(バージョン番号)があります。
Tablestore の API オペレーションを呼び出して、データの読み取りと書き込みを行うことができます。Tablestore は、テーブルデータに対して実行された最近の書き込みおよび変更操作を記録して、増分データを記録します。したがって、増分データは一連の操作レコードと見なすことができます。
Tablestore は、次の種類の操作をサポートしています。
PutRow: 行を書き込みます。行が存在する場合は上書きされます。
UpdateRow: 元の行の他のデータを変更せずに、行を更新します。UpdateRow を使用して、列値を追加したり、列のバージョンが存在する場合は列値を上書きしたり、列の特定のバージョンまたはすべてのバージョンを削除したりできます。
DeleteRow: 行を削除します。
Tablestore は、操作のタイプごとに増分データレコードを生成します。Tablestore Stream Reader はこれらのレコードを読み取り、Data Integration でサポートされている形式に変換します。
Tablestore は動的列とマルチバージョンモードをサポートしています。したがって、Tablestore Stream Reader によって生成される行は、Tablestore の行ではなく、列のバージョンです。Tablestore Stream Reader が Tablestore の行からデータを読み取った後、Tablestore Stream Reader はデータを複数の行に変換します。各行には、プライマリキー値、列名、列のバージョン(バージョン番号)のタイムスタンプ、バージョンの値、および操作タイプが含まれます。isExportSequenceInfo パラメーターが true に設定されている場合は、時系列情報も含まれます。
Data Integration でサポートされている形式に変換されたデータには、次の種類の操作が定義されています。
U (UPDATE): 列のバージョンを書き込みます。
DO (DELETE_ONE_VERSION): 列のバージョンを削除します。
DA (DELETE_ALL_VERSION): プライマリキー値と列名に基づいて、列のすべてのバージョンを削除します。
DR (DELETE_ROW): プライマリキー値に基づいて行を削除します。
次の表に、Tablestore Stream Reader によって変換された、2 つのプライマリキー列(pkName1 と pkName2)を持つテーブルのデータを示します。
pkName1 | pkName2 | columnName | timestamp | columnValue | opType |
pk1_V1 | pk2_V1 | col_a | 1441803688001 | col_val1 | U |
pk1_V1 | pk2_V1 | col_a | 1441803688002 | col_val2 | U |
pk1_V1 | pk2_V1 | col_b | 1441803688003 | col_val3 | U |
pk1_V2 | pk2_V2 | col_a | 1441803688000 | - | DO |
pk1_V2 | pk2_V2 | col_b | - | - | DA |
pk1_V3 | pk2_V3 | - | - | - | DR |
pk1_V3 | pk2_V3 | col_a | 1441803688005 | col_val1 | U |
前の例では、Tablestore テーブルの 3 つの行が読み取られ、7 つの行に変換されています。3 つの行のプライマリキーは、(pk1_V1, pk2_V1)、(pk1_V2, pk2_V2)、および (pk1_V3, pk2_V3) です。
プライマリキーが (pk1_V1, pk2_V1) の行では、col_a 列の 2 つのバージョンと col_b 列の 1 つのバージョンが書き込まれます。
プライマリキーが (pk1_V2, pk2_V2) の行では、col_a 列の 1 つのバージョンと col_b 列のすべてのバージョンが削除されます。
プライマリキーが (pk1_V3, pk2_V3) の行では、col_a 列の 1 つのバージョンが書き込まれ、行が削除されます。
行モードでデータを読み取る
ワイドテーブル
Tablestore Stream Reader を使用して、行モードでデータを読み取ることもできます。このモードでは、Tablestore Stream Reader は操作レコードを行として読み取ります。mode パラメーターを設定し、データを読み取る列を指定する必要があります。
"parameter": { // mode を single_version_and_update_only に設定し、isExportSequenceInfo を false に設定し、ビジネス要件に基づいて datasource や table などの他のパラメーターを設定します。 "mode": "single_version_and_update_only", // 読み取りモード。 "column":[ // Tablestore からデータを読み取る列。ビジネス要件に基づいて列を指定できます。 { "name": "uid" // 列の名前。プライマリキー列またはプロパティ列を指定できます。 }, { "name": "name" // 列の名前。プライマリキー列またはプロパティ列を指定できます。 }, ], "isExportSequenceInfo": false, // 時系列情報を読み取るかどかを指定します。mode パラメーターを single_version_and_update_only に設定した場合、このパラメーターは false にのみ設定できます。 }時系列テーブル
時系列テーブルを作成すると、Stream 機能は自動的に有効になります。
Tablestore Stream Reader を使用すると、時系列テーブルから増分データを読み取ることができます。テーブルが時系列テーブルの場合、次のパラメーターを設定する必要があります。
"parameter": { // ビジネス要件に基づいて、次のパラメーターと datasource や table などの他のパラメーターを設定します。 "mode": "single_version_and_update_only", // 読み取りモード。 "isTimeseriesTable":"true", // テーブルが時系列テーブルかどうかを指定します。 "column":[ // Tablestore からデータを読み取る列。ビジネス要件に基づいて列を指定できます。 { "name": "_m_name" // メトリック名列の名前。 }, { "name": "_data_source" // データソース列の名前。 }, { "name": "_tags" // タグ列の名前。タグは文字列型のデータに変換されます。 }, { "name": "tag1", // タグキーの名前。 "is_timeseries_tag":"true" // フィールドがタグの内部フィールドかどうかを指定します。 }, { "name": "time" // タイムスタンプ列の名前。 }, { "name": "name" // プロパティ列の名前。 }, ], "isExportSequenceInfo": false, // 時系列情報を読み取るかどかを指定します。mode パラメーターを single_version_and_update_only に設定した場合、このパラメーターは false にのみ設定できます。 }行ごとに読み取られるデータは、元の行のデータに近くなります。これにより、データのさらなる処理が容易になります。行ごとにデータを読み取る場合は、次の点に注意してください。
読み取られる行は、操作レコードから抽出されます。各行は、書き込みまたは更新操作に対応しています。行の一部の列のみを更新する場合、操作レコードには更新された列のみが含まれます。
各列のバージョン番号(各列のタイムスタンプ)は、読み取ったり削除したりすることはできません。
データ型のマッピング
Tablestore Stream Reader はすべての Tablestore データ型をサポートしています。次の表に、Tablestore Stream Reader がデータ型を変換する際に使用するデータ型のマッピングを示します。
カテゴリ | Tablestore データ型 |
整数 | INTEGER |
浮動小数点 | DOUBLE |
文字列 | STRING |
ブール値 | BOOLEAN |
バイナリ | BINARY |
データ同期タスクの開発: Tablestore からの増分同期のプロシージャガイダンス
設定手順については、「コードレス UI を使用してバッチ同期タスクを設定する」および「コードエディターを使用してバッチ同期タスクを設定する」をご参照ください。
コードエディターを使用してバッチ同期タスクを設定する場合に設定されるすべてのパラメーターと実行されるコードについては、「付録: コードとパラメーター」をご参照ください。
付録: コードとパラメーター
付録: コードエディターを使用してバッチ同期タスクを設定する
コードエディターを使用してバッチ同期タスクを設定する場合は、コードエディターの形式要件に基づいて、関連データソースのリーダーとライターのパラメーターを設定する必要があります。形式要件の詳細については、「コードエディターを使用してバッチ同期タスクを設定する」をご参照ください。次の情報では、コードエディターのリーダーとライターのパラメーターの設定の詳細について説明します。
Tablestore Stream Reader のコード
列モードでデータを読み取る
{ "type":"job", "version":"2.0",// バージョン番号。 "steps":[ { "stepType":"otsstream",// プラグイン名。 "parameter":{ "datasource":"$srcDatasource",// データソースの名前。 "dataTable":"",// テーブルの名前。 "statusTable":"TableStoreStreamReaderStatusTable",// Tablestore Stream Reader がステータスレコードの保存に使用するテーブルの名前。 "maxRetries":30,// Tablestore から増分データを読み取る各リクエストの最大再試行回数。デフォルト値: 30。 "isExportSequenceInfo":false,// 時系列情報を読み取るかどかを指定します。 "startTimeString":"${startTime}${hh}",// 増分データの開始時刻。同期タスクの実行開始時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。 "endTimeString":"${endTime}${hh}"// 増分データの終了時刻。同期タスクの実行完了時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。 }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"// 許容されるダーティデータレコードの最大数。 }, "speed":{ "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。 "concurrent":1,// 並列スレッドの最大数。 "mbps":"12"// 最大伝送速度。単位: MB/秒。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }ワイドテーブルから行モードでデータを読み取る
{ "type":"job", "version":"2.0",// バージョン番号。 "steps":[ { "stepType":"otsstream",// プラグイン名。 "parameter":{ "datasource":"$srcDatasource",// データソースの名前。 "dataTable":"",// テーブルの名前。 "statusTable":"TableStoreStreamReaderStatusTable",// Tablestore Stream Reader がステータスレコードの保存に使用するテーブルの名前。 "maxRetries":30,// Tablestore から増分データを読み取る各リクエストの最大再試行回数。デフォルト値: 30。 "isExportSequenceInfo":false,// 時系列情報を読み取るかどかを指定します。 "startTimeString":"${startTime}${hh}",// 増分データの開始時刻。同期タスクの実行開始時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。 "endTimeString":"${endTime}${hh}"// 増分データの終了時刻。同期タスクの実行完了時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。 "mode": "single_version_and_update_only", "column":[ { "name":"pId" }, { "name": "uId" }, { "name":"col0" }, { "name": "col1" } ], }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"// 許容されるダーティデータレコードの最大数。 }, "speed":{ "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。 "concurrent":1,// 並列スレッドの最大数。 "mbps":"12" // 最大伝送速度。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }時系列テーブルから行モードでデータを読み取る
{ "type":"job", "version":"2.0",// バージョン番号。 "steps":[ { "stepType":"otsstream",// プラグイン名。 "parameter":{ "datasource":"$srcDatasource",// データソースの名前。 "dataTable":"",// テーブルの名前。 "statusTable":"TableStoreStreamReaderStatusTable",// Tablestore Stream Reader がステータスレコードの保存に使用するテーブルの名前。 "maxRetries":30,// Tablestore から増分データを読み取る各リクエストの最大再試行回数。デフォルト値: 30。 "isExportSequenceInfo":false,// 時系列情報を読み取るかどかを指定します。 "startTimeString":"${startTime}${hh}",// 増分データの開始時刻。同期タスクの実行開始時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。 "endTimeString":"${endTime}${hh}"// 増分データの終了時刻。同期タスクの実行完了時刻と同じです。このパラメーターは yyyymmddhh24miss 形式で設定します。 "mode": "single_version_and_update_only", "isTimeseriesTable":"true", "column": [ { "name": "_m_name" }, { "name": "_data_source", }, { "name": "_tags", }, { "name": "string_column", } ] }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"// 許容されるダーティデータレコードの最大数。 }, "speed":{ "throttle":true,// スロットリングを有効にするかどうかを指定します。値 false はスロットリングが無効になっていることを示し、値 true はスロットリングが有効になっていることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。 "concurrent":1,// 並列スレッドの最大数。 "mbps":"12"// 最大伝送速度。単位: MB/秒。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Tablestore Stream Reader のコードのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。追加されたデータソースの名前と同じである必要があります。コードエディターを使用してデータソースを追加できます。 | はい | デフォルト値なし |
dataTable | 増分データを読み取るテーブルの名前。テーブルでは Stream 機能が有効になっている必要があります。テーブルを作成するときに、Stream 機能を有効にすることができます。また、テーブルの作成後に UpdateTable オペレーションを呼び出して、この機能を有効にすることもできます。 | はい | デフォルト値なし |
statusTable | Tablestore Stream Reader がステータスレコードの保存に使用するテーブルの名前。これらのレコードは、不要なデータを見つけて読み取り効率を向上させるのに役立ちます。指定されたテーブルが存在しない場合、Tablestore Stream Reader は自動的にそのようなテーブルを作成します。オフライン読み取りタスクが完了したら、テーブルを削除する必要はありません。テーブル内のステータスレコードは、次の読み取りタスクに使用できます。
TableStoreStreamReaderStatusTable に似た名前を設定できます。名前がビジネス関連のテーブルの名前と異なることを確認してください。 | はい | デフォルト値なし |
startTimestampMillis | 増分データの開始時刻(ミリ秒単位)。開始時刻は、増分データの左閉右開の時間範囲の左境界です。
| いいえ | デフォルト値なし |
endTimestampMillis | 増分データの終了時刻(ミリ秒単位)。終了時刻は、増分データの左閉右開の時間範囲の右境界です。
| いいえ | デフォルト値なし |
date | 読み取るデータが生成された日付。yyyyMMdd 形式(例: 20151111)でこのパラメーターを設定します。date パラメーター、startTimestampMillis パラメーターと endTimestampMillis パラメーター、または startTimeString パラメーターと endTimeString パラメーターを設定する必要があります。たとえば、Alibaba Cloud Data Process Center はタスクを日単位でスケジュールします。したがって、date パラメーターを設定する必要があり、startTimestampMillis パラメーターと endTimestampMillis パラメーター、または startTimeString パラメーターと endTimeString パラメーターを設定する必要はありません。 | いいえ | デフォルト値なし |
isExportSequenceInfo | 時系列情報を読み取るかどかを指定します。時系列情報には、データが書き込まれた時刻が含まれます。デフォルト値は false です。これは、時系列情報が読み取られないことを示します。 | いいえ | false |
maxRetries | Tablestore から増分データを読み取る各リクエストの最大再試行回数。デフォルト値: 30。再試行は特定の間隔で実行されます。30 回の再試行の合計継続時間は約 5 分です。デフォルト設定を維持できます。 | いいえ | 30 |
startTimeString | 増分データの開始時刻(秒単位)。開始時刻は、増分データの左閉右開の時間範囲の左境界です。このパラメーターは | いいえ | デフォルト値なし |
endTimeString | 増分データの終了時刻(秒単位)。終了時刻は、増分データの左閉右開の時間範囲の右境界です。このパラメーターは | いいえ | デフォルト値なし |
enableSeekIterator | Tablestore Stream Reader が増分データの読み取りを開始するオフセットを決定するかどうかを指定します。増分データが頻繁に読み取られる場合、Tablestore Stream Reader は前回データが読み取られたオフセットに基づいてオフセットを自動的に決定します。Tablestore Stream Reader が以前に実行されていない場合は、増分データの開始時刻からデータが読み取られます。デフォルトでは、増分データは 7 日間保存されます。開始時刻に達するまで、データはエクスポートされません。Tablestore Stream Reader の設定に | いいえ | false |
mode | 読み取りモード。このパラメーターを single_version_and_update_only に設定すると、データは行モードで読み取られます。 | いいえ | デフォルト値なし |
isTimeseriesTable | テーブルが時系列テーブルかどうかを指定します。このパラメーターは、mode パラメーターが single_version_and_update_only に設定されている場合にのみ有効になります。 | いいえ | false |
column | mode パラメーターを
説明 行モードでデータを読み取る場合は、このパラメーターを設定する必要があります。設定しない場合、データを読み取ることができません。 |
| デフォルト値なし |