変更追跡タスクと消費者グループを作成した後、コードを記述するか、Data Transmission Service (DTS) が提供するSDKデモを使用して、データを追跡して消費することができます。 このトピックでは、SDKデモを使用して追跡データを消費する方法について説明します。
このトピックでは、Java用のSDKデモを使用します。 PythonおよびGo向けのSDKデモの詳細については、GitHubの「aliyun-dts-subscribe-demo」をご参照ください。
手順
PolarDB-X 1.0インスタンスまたはデータ管理 (DMS) 論理データベースから追跡されたデータを使用する方法については、「SDKデモを使用してPolarDB-X 1.0インスタンスから追跡されたデータを使用する」をご参照ください。
Resource Access Management (RAM) ユーザーとしてデータを追跡して使用する場合、RAMユーザーにはAliyunDTSFullAccessアクセス許可と、ソースオブジェクトにアクセスするためのアクセス許可が必要です。 権限を付与する方法の詳細については、「システムポリシーを使用してRAMユーザーにDTSインスタンスを管理する権限を付与する」および「RAMユーザーに権限を付与する」をご参照ください。
消費者グループは互いに独立しています。
このトピックでは、SDKデモを使用して追跡データを消費する方法について説明します。 この例では、Windows用IntelliJ IDEA Community Edition 2020.1が使用されています。
変更追跡タスクを作成します。 詳細については、「変更追跡シナリオの概要」の関連トピックをご参照ください。
1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。
ビジネス要件に基づいてSDKデモを使用します。
重要追跡データを使用するときは、DefaultUserRecordのcommitメソッドを呼び出してオフセットをコミットする必要があります。 それ以外の場合、データは繰り返し消費されます。
(推奨) パッケージ化された新しい変更追跡SDKの使用
IntelliJ IDEAを開き、[新しいプロジェクトの作成] をクリックします。
作成したプロジェクトで、pom.xmlファイルを見つけます。
次の依存関係をpom.xmlファイルに追加します。
<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>{dts_new_sdk_version}</version> </dependency>
重要変更追跡SDKの最新バージョンは、dts-new-subscribe-sdkページで確認できます。
dts-new-subscribe-sdkパッケージは、次のネイティブ依存関係をカプセル化します。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>{version}</version> </dependency>
バージョン2.0.0のdts-new-subscribe-sdkパッケージは、バージョン2.7.0のkafka-clients依存関係をカプセル化しています。 以前のバージョンのdts-new-subscribe-sdkパッケージは、バージョン1.0.0のkafka-clients依存関係をカプセル化しています。
新しい変更追跡SDKを使用します。 デモの詳細については、GitHubの「aliyun-dts-subscribe-sdk-java」をご参照ください。
コードをカスタマイズして新しい変更追跡SDKを使用する
SDKデモパッケージをダウンロードし、パッケージを解凍します。
説明アイコンをクリックし、[ZIPのダウンロード] を選択してパッケージをダウンロードします。
パッケージが解凍されているディレクトリに移動します。 次に、テキストエディターを使用してpom.xmlファイルを開き、SDKのバージョンを最新に変更します。
重要変更追跡SDKの最新バージョンは、dts-new-subscribe-sdkページで確認できます。
dts-new-subscribe-sdkパッケージは、次のネイティブ依存関係をカプセル化します。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>{version}</version> </dependency>
バージョン2.0.0のdts-new-subscribe-sdkパッケージは、バージョン2.7.0のkafka-clients依存関係をカプセル化しています。 以前のバージョンのdts-new-subscribe-sdkパッケージは、バージョン1.0.0のkafka-clients依存関係をカプセル化しています。
IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。
表示されるダイアログボックスで、パッケージが解凍されているディレクトリに移動し、フォルダをクリックしてpom.xmlファイルを見つけます。 次に、[OK] をクリックします。
表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。
IntelliJ IDEAで、フォルダをクリックしてJavaファイルを見つけます。 次に、を使用するモードに基づいてJavaファイルをダブルクリックします。 SDKクライアントを使用します。DTSSucumerAssignDemo. javaまたはDTSSucumerSubscribeDemo. javaファイルを選択できます。
説明DTSは、SDKクライアントを使用する次のモードをサポートしています。
ASSIGNモード: メッセージのグローバルな順序を確保するために、DTSは追跡対象の各トピックに1つのパーティション (パーティション0) のみを割り当てます。 ASSIGNモードでSDKクライアントを使用する場合は、SDKクライアントを1つだけ起動することをお勧めします。
SUBSCRIBEモード: メッセージのグローバルな順序を確保するために、DTSは追跡対象の各トピックに1つのパーティション (パーティション0) のみを割り当てます。 SUBSCRIBEモードでは、冗長性を高めるために、コンシューマーグループ内の複数のSDKクライアントを一度に起動できます。 コンシューマーグループ内のSDKクライアントが失敗した場合、他のSDKクライアントはランダムに自動的にパーティション0に割り当てられ、データ消費が再開されます。
Javaファイルのコードで必要なパラメーターを指定します。
表 1. 必須パラメーター
パラメーター
説明
パラメータ値を取得するメソッド
brokerUrl
変更追跡インスタンスのエンドポイントとポート番号。
説明内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、SDKクライアントをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。
パブリックエンドポイントを使用しないことを推奨します。
DTSコンソールの [変更トラッキング] ページで、管理する変更トラッキングインスタンスを見つけ、インスタンスIDをクリックします。 左側のウィンドウで、基本情報 タブをクリックして、ネットワーク セクションで変更追跡インスタンスのエンドポイントとポート番号を取得します。
トピック
変更追跡インスタンスの追跡対象トピックの名前。
DTSコンソールの [変更トラッキング] ページで、管理する変更トラッキングインスタンスを見つけ、インスタンスIDをクリックします。 左側のウィンドウで、基本情報 タブをクリックして、基本情報 セクションで追跡対象のトピックの名前を取得します。
sid
消費者グループID。
DTSコンソールの [変更トラッキング] ページで、管理する変更トラッキングインスタンスを見つけ、インスタンスIDをクリックします。 左側のウィンドウで、データ消費 タブをクリックして、コンシューマーグループ ID /名前と アカウント 情報を表示します。
説明消費者グループアカウントのパスワードは、消費者グループの作成時に指定します。
userName
消費者グループアカウントのユーザー名。
警告このトピックで説明するSDKクライアントを使用していない場合は、このパラメーターを
<Username>-<Consumer group ID>
の形式で指定する必要があります。 例:dtstest-dtsae ****** bpv
そうしないと、接続は失敗します。パスワード
消費者グループアカウントのパスワード。
initCheckpoint
コンシューマオフセット。SDKクライアントが最初のデータレコードを消費したときに生成されるタイムスタンプです。 この値は UNIX タイムスタンプです。 例: 1620962769。
説明コンシューマオフセットは、次のシナリオで使用できます。
消費プロセスが中断された場合、消費者オフセットを指定してデータ消費を再開できます。 これにより、データの損失を防ぎます。
変更追跡クライアントを起動するときに、ビジネス要件に基づいてデータを消費するための消費者オフセットを指定できます。
消費されたデータの消費者オフセットは、変更追跡インスタンスのデータ範囲内でなければなりません。 コンシューマオフセットは、UNIXタイムスタンプに変換する必要があります。
説明変更トラッキングページの データ範囲 列に変更トラッキングインスタンスのデータ範囲を表示できます。
検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。
ConsumerContext.ConsumerSubscribeMode
SDKクライアントを使用するモード。 有効な値:
ConsumerContext.ConsumerSubscribeMode.ASSIGN
: ASSIGNモードでは、コンシューマグループ内で1つのSDKクライアントのみを起動して、追跡データを消費できます。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE
: SUBSCRIBEモードでは、冗長性を高めるために、1つのコンシューマーグループ内の複数のSDKクライアントを一度に起動できます。
非該当
IntelliJ IDEAの上部メニューバーで、 SDKクライアントを実行します。
説明IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするには一定の時間がかかります。
次の図は、SDKクライアントがソースデータベースからのデータ変更を追跡できることを示す結果を示しています。
SDKクライアントは、消費されたデータに関する情報を定期的に計算して表示します。 この情報には、送受信されるデータレコードの総数、データの総量、および1秒あたりの要求数 (RPS) が含まれます。
表 2. 消費されたデータに関する情報のパラメーター
パラメーター
説明
outCounts
SDKクライアントによって消費されたデータレコードの総数。
outBytes
SDKクライアントによって消費されたデータの合計量。 単位はバイトです。
outRps
SDKクライアントがデータを消費するときのRPS。
outBps
SDKクライアントがデータを消費するときに1秒あたりに送信されるビット数。
inBytes
DTSサーバーによって送信されたデータの合計量。 単位はバイトです。
DStoreRecordQueue
DTSサーバーがデータを送信するときのデータキャッシュキューのサイズ。
カウント
DTSサーバーによって送信されたデータレコードの総数。
inRps
DTSサーバーがデータを送信するときのRPS。
__dt
SDKクライアントがデータを受信したときに生成されるタイムスタンプ。 単位:ミリ秒。
DefaultUserRecordQueue
シリアル化後のデータキャッシュキューのサイズ。
コンシューマーオフセットの管理
SDKクライアントが初めて起動または再起動された場合、または内部再試行が発生した場合は、 消費者オフセット データ消費を開始または再開します。 次の表に、さまざまなシナリオでコンシューマオフセットを管理およびクエリする方法を示します。 これにより、データの損失やデータの重複を防ぎ、ビジネス要件に基づいてデータを消費できます。
SDKクライアントのコンシューマーオフセットをリセットする必要がある場合は、SDKクライアントを使用するモードに基づいて、次の表を参照してコンシューマーオフセットを照会およびリセットできます。
シナリオ | SDKクライアントを使用するモード | 実装 |
コンシューマオフセットをクエリする必要があります。 | 割り当ておよびSUBSCRIBE |
|
SDKクライアントを初めて起動するときは、データを消費するためのコンシューマーオフセットを指定する必要があります。 | 割り当ておよびSUBSCRIBE | SDKクライアントを使用するモードに基づいて、DTConsumerAssignDemo. javaまたはDTConsumerSubscribeDemo. javaファイルを選択します。 次に、 |
内部再試行が発生した場合、データ消費を再開するには、前のデータレコードのコンシューマオフセットを指定する必要があります。 | 割り当て | 次の手順を実行して、前のデータレコードのコンシューマオフセットを照会します。
|
サブスクリプション | 次の手順を実行して、前のデータレコードのコンシューマオフセットを照会します。
| |
SDKクライアントを再起動した後、データ消費を再開するには、前のデータレコードのコンシューマーオフセットを指定する必要があります。 | 割り当て | consumerContext.javaファイルの
|
サブスクリプション | このモードでは、consumerContext.javaファイルの
|
コンシューマオフセットを保存するための永続的なストレージメディアの指定
DStoreモジュールの切り替えがディザスタリカバリのためにトリガーされた場合、特にSUBSCRIBEモードでは、新しいDStoreモジュールはSDKクライアントの最後のコンシューマーオフセットを保存できません。 SDKクライアントは、以前の消費者オフセットからの追跡データの消費を開始することができる。 その結果、履歴データは繰り返し消費される。 たとえば、元のDStoreモジュールのコンシューマーオフセット範囲は、2023年11月11日の08:00:00から年11月12日の08:00:00までで、SDKクライアントのコンシューマーオフセットは2023年11月12日の08:00:00です。 切り替え後、新しいDStoreモジュールのコンシューマーオフセット範囲は、2023年11月8日の10:00:00から2023年11月12日の08:01:00までです。 この場合、SDKクライアントは、新しいDStoreモジュールの開始コンシューマーオフセットである2023年11月8日の10:00:00からデータの消費を開始します。 その結果、履歴データは繰り返し消費される。
切り替えシナリオで履歴データが繰り返し使用されないようにするには、永続的なストレージメディアを指定して、SDKクライアントのコンシューマーオフセットを保存することを推奨します。 次のサンプルコードは参考用です。 ビジネス要件に基づいてコードを変更できます。
AbstractUserMetaStore()
メソッドを継承して実装するUserMetaStore()
メソッドを作成します。たとえば、UserMetaStore() メソッドを呼び出して、コンシューマーオフセットを保存するMySQLデータベースを指定できます。 Javaのサンプルコード:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } }
consumerContext.javaファイルの
setUserRegisteredStore(new UserMetaStore())
メソッドを呼び出して外部記憶媒体を指定します。
よくある質問
変更追跡インスタンスに接続できない場合はどうすればよいですか?
報告されたエラーメッセージに基づいて問題をトラブルシューティングします。 詳細については、このトピックの「トラブルシューティング」セクションをご参照ください。
永続的に保存されるコンシューマーオフセットから返されるデータの形式は何ですか?
コンシューマオフセットが永続的に格納されると、JSON形式のデータがコンシューマオフセットから返されます。 永続的に格納される消費者オフセットは、UNIXタイムスタンプである。 コンシューマオフセットをSDKに直接インポートして使用できます。 次の応答では、パラメータ
「timestamp」
の値1700709977
は、永続的に記憶される消費者オフセットを示す。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
複数のSDKクライアントを使用して、変更追跡タスクで追跡されたデータを同時に使用できますか?
いいえ。 SUBSCRIBEモードでは、一度に複数のSDKクライアントを起動できます。 ただし、データを使用できるのは1つのクライアントだけです。
PythonとGoのSDKデモを使用して追跡データを消費できますか?
はい。 PythonおよびGo向けのSDKデモの詳細については、GitHubの「dts-subscribe-demo」をご参照ください。
トラブルシューティング
問題 | エラーメッセージ | 原因 | 解決策 |
接続に失敗しました。 |
|
|
|
| ブローカーアドレスを実際のIPアドレスにリダイレクトすることはできません。 | ||
| 指定されたユーザー名またはパスワードは無効です。 | ||
| consumerContext.javaファイルの | 変更追跡インスタンスのデータ範囲内のコンシューマオフセットを指定します。 詳細については、このトピックの「表1」をご参照ください。 | |
データ消費の応答時間が増加しました。 | 非該当 |
|