すべてのプロダクト
Search
ドキュメントセンター

:SDKデモを使用して追跡データを消費する

最終更新日:Oct 18, 2024

変更追跡タスクを設定した後、Data Transmission Service (DTS) が提供するSDKデモを使用して、データを追跡および使用できます。 このトピックでは、SDKデモを使用して追跡データを消費する方法について説明します。

手順

重要

このトピックでは、SDKデモを使用して追跡データを消費する方法について説明します。 この例では、Windows用IntelliJ IDEA Community Edition 2020.1が使用されています。

  1. 変更追跡タスクを作成します。 詳細については、「ApsaraDB RDS For MySQLインスタンスからのデータ変更の追跡」、「PolarDB for MySQLクラスターからのデータ変更の追跡」、または「Track data changes from a self-managed Oracle database」をご参照ください。

  2. 1つ以上のコンシューマグループを作成します。 詳細については、「Create consumer groups」をご参照ください。

  3. ビジネス要件に基づいてSDKデモを使用します。

    • (推奨) パッケージ化された新しい変更追跡SDKの使用

      1. IntelliJ IDEAを開き、[新しいプロジェクトの作成] をクリックします。

      2. 作成したプロジェクトで、pom.xmlファイルを見つけます。

      3. 次の依存関係をpom.xmlファイルに追加します。

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        説明

        変更追跡SDKの最新バージョンは、dts-new-subscribe-sdkページで確認できます。

      4. 新しい変更追跡SDKを使用します。 デモコードの詳細については、「SDKデモコード」をご参照ください。

    • コードをカスタマイズして新しい変更追跡SDKを使用する

      1. SDKデモパッケージをダウンロードし、パッケージを解凍します。

        説明

        パッケージをダウンロードするには、code /> [ZIPのダウンロード] を選択します。

      2. パッケージが解凍されているディレクトリに移動します。 次に、テキストエディターを使用してpom.xmlファイルを開き、SDKのバージョンを最新に変更します。 Set the SDK version

        重要

        変更追跡SDKの最新バージョンは、Maven Webサイトから入手できます。 詳細については、変更追跡SDKのMavenページをご覧ください。

      3. IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。 Open a project

      4. 表示されるダイアログボックスで、パッケージが解凍されているディレクトリに移動し、pom.xmlファイルを見つけます。 次に、[OK] をクリックします。 Find the pom.xml file

      5. 表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。

      6. IntelliJ IDEAで、フォルダを展開してJavaファイルを見つけます。 次に、を使用するモードに基づいてJavaファイルをダブルクリックします。 SDKクライアントを使用します。DTSCusumerAssignDemo. javaおよびDTSCusumerSubscribeDemo. java Javaファイルが利用可能です。 Java files of the client

        説明

        DTSは、SDKクライアントを使用する次のモードをサポートしています。

        • ASSIGNモード: メッセージのグローバルな順序を確保するために、DTSは追跡対象の各トピックに1つのパーティション (パーティション0) のみを割り当てます。 SDKクライアントをASSIGNモードで使用する場合は、SDKクライアントを1つだけ起動することをお勧めします。

        • SUBSCRIBEモード: メッセージのグローバルな順序を確保するために、DTSは追跡対象の各トピックに1つのパーティション (パーティション0) のみを割り当てます。 SUBSCRIBEモードでは、コンシューマーグループ内の複数のSDKクライアントを同時に起動して、ディザスタリカバリを実装できます。 コンシューマーグループ内のSDKクライアントが失敗した場合、他のSDKクライアントはランダムに自動的にパーティション0に割り当てられ、データ消費が再開されます。

  4. Javaファイルのコードに必要なパラメーターを設定します。

    assigndemo

    表 1. 次の表に、必要なパラメーターを示します。

    パラメーター

    説明

    取得する方法

    brokerUrl

    変更追跡インスタンスのエンドポイントとポート番号。

    説明

    内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、SDKクライアントをデプロイするElastic Compute Service (ECS) インスタンスが、クラシックネットワークまたは変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) に属している場合に適用されます。

    新しいDTSコンソールで、インスタンスIDをクリックします。 [基本情報] ページで、[ネットワーク] セクションでエンドポイントとポート番号を取得できます。 Network

    トピック

    変更追跡インスタンスのトピックの名前。

    DTSコンソールで、インスタンスIDをクリックします。 [基本情報] ページで、[基本情報] セクションで追跡対象のトピックを取得できます。 topic

    sid

    コンシューマーグループの ID です。

    DTSコンソールで、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、[データの使用] をクリックします。 コンシューマーグループのIDとアカウントを取得できます。

    説明

    コンシューマーグループアカウントのパスワードは、コンシューマーグループの作成時に自動的に指定されます。

    Consumer group account

    userName

    消費者グループのアカウント。

    警告

    このトピックで説明するSDKクライアントを使用していない場合は、このパラメーターを <Username>-<Consumer group ID> の形式で指定する必要があります。 例: dtstest-dtsae ****** bpv そうしないと、接続は失敗します。

    パスワード

    アカウントのパスワードを入力します。

    initCheckpoint

    消費チェックポイント。 SDKクライアントが最初のデータレコードを消費したときのタイムスタンプです。 この値は UNIX タイムスタンプです。 例: 1620962769。

    説明

    消費チェックポイントは、次のシナリオで使用できます。

    • 消費プロセスが中断された場合、消費チェックポイントを指定してデータ消費を再開できます。 これにより、データの損失を防ぐことができます。

    • 変更追跡クライアントを起動するときに、オンデマンドでデータを消費する消費チェックポイントを指定できます。

    次の図に示すように、消費チェックポイントは変更追跡インスタンスのデータ範囲内にある必要があります。 消費チェックポイントは、UNIXタイムスタンプに変換する必要があります。 Data range

    説明

    検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

    ConsumerContext.ConsumerSubscribeMode

    SDKクライアントを使用するモード。 有効な値:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGNモードでは、コンシューマグループ内の1つのSDKクライアントのみが追跡データを消費できます。

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBEモードでは、コンシューマーグループ内の複数のSDKクライアントを同時に起動して、ディザスタリカバリを実装できます。

    非該当

  5. IntelliJ IDEAの上部のナビゲーションバーで、[Run] > [Run] を選択してクライアントを実行します。

    説明

    IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするのに時間がかかります。

    • 以下の図に結果を示します。 この結果は、SDKクライアントがソースデータベースからのデータ変更を追跡できることを示しています。 Consume data

    • SDKクライアントは、消費されたデータに関する情報を定期的に計算して表示します。 この情報には、送受信されるデータレコードの総数、データの総量、および1秒あたりの要求数 (RPS) が含まれます。 Information about the consumed data

      表 2. 次の表に、情報のパラメーターを示します。

      パラメーター

      説明

      outCounts

      SDKクライアントによって消費されたデータレコードの総数。

      outBytes

      SDKクライアントによって消費されたデータの合計量。 単位はバイトです。

      outRps

      SDKクライアントがデータを消費するRPS。

      outBps

      SDKクライアントがデータを消費する1秒あたりの送信ビット数。

      inBytes

      DTSサーバーによって送信されるデータの合計量。 単位はバイトです。

      DStoreRecordQueue

      DTSサーバーがデータを送信するときの現在のデータキャッシュキューのサイズ。

      カウント

      DTSサーバーによって送信されたデータレコードの総数。

      inRps

      DTSサーバーがデータを送信するRPS。

      __dt

      SDKクライアントがデータを受信したときに生成されるタイムスタンプ。 単位:ミリ秒。

      DefaultUserRecordQueue

      シリアル化後の現在のデータキャッシュキューのサイズ。

消費チェックポイントを保存して照会する

SDKクライアントが初めて起動または再起動された場合、または内部再試行が発生した場合は、 消費チェックポイント データ消費を開始または再開します。 次の表に、さまざまなシナリオで消費チェックポイントを管理およびクエリする方法を示します。 これにより、データの損失や重複データを防ぎ、オンデマンドでデータを消費できます。

シナリオ

SDKクライアントの使用モード

照会方法

消費チェックポイントの照会

割り当ておよびSUBSCRIBE

  • SDKクライアントは、5秒ごとに消費チェックポイントを保存し、消費チェックポイントをDTSサーバーに送信します。 最後の消費チェックポイントを照会するには、次の方法を使用できます。

    • SDKクライアントが存在するサーバーのlocalCheckpointStoreファイルを見つけます。

    • 変更追跡インスタンスの [データの使用] ページに移動します。

  • consumerContext.javaファイルのsetUserRegisteredStore(newUserMetaStore()) パラメーターのデータベースなどの外部永続共有ストレージメディアを設定した場合、ストレージメディアは5秒ごとに消費チェックポイントを保存します。 ストレージメディアを使用して、消費チェックポイントを照会できます。

SDKクライアントを初めて起動するときは、データを消費するための消費チェックポイントを指定する必要があります。

割り当ておよびSUBSCRIBE

SDKクライアントを使用するモードに基づいて、DTConsumerAssignDemo. javaまたはDTConsumerSubscribeDemo. javaファイルを選択します。 次に、initCheckpointパラメーターを指定してデータを消費します。 詳細については、「34」をご参照ください。

内部再試行が発生した場合、データ消費を再開するには、前のデータレコードの消費チェックポイントを指定する必要があります。

割り当て

次の手順を実行して、前のデータレコードの消費チェックポイントを見つけます。

  1. consumerContext.javaファイルのsetUserRegisteredStore(newUserMetaStore()) パラメーターで設定した外部ストレージメディアを見つけます。

  2. SDKクライアントが存在するサーバーのlocalCheckpointStoreファイルを見つけます。

  3. DTSSucumerSubscribeDemo. javaファイルのinitCheckpointパラメーターで指定した開始タイムスタンプを見つけます。

サブスクリプション

次の手順を実行して、前のデータレコードの消費チェックポイントを見つけます。

  1. consumerContext.javaファイルのsetUserRegisteredStore(newUserMetaStore()) パラメーターで設定した外部ストレージメディアを見つけます。

  2. DTSサーバー (DStore) の保存された消費チェックポイントを見つけます。

  3. DTSSucumerSubscribeDemo. javaファイルのinitCheckpointパラメーターで指定した開始タイムスタンプを見つけます。

  4. DTSサーバー (新しいDStore) の開始消費チェックポイントを使用します。

SDKクライアントを再起動した後、データ消費を再開するには、最後のデータレコードの消費チェックポイントを指定する必要があります。

割り当て

consumerContext.javaファイルのsetForceUseCheckpointパラメーターの設定を確認し、消費チェックポイントを照会します。

  • パラメーターがtrueに設定されている場合、SDKクライアントが再起動されるたびに、initCheckpointパラメーターの値が消費チェックポイントとして使用されます。

  • パラメーターがfalseに設定されているか、指定されていない場合は、次の手順を実行して、前のデータレコードの消費チェックポイントを見つけます。

    1. SDKクライアントが存在するサーバーのlocalCheckpointStoreファイルを見つけます。

    2. DTSサーバー (DStore) の保存された消費チェックポイントを見つけます。

    3. consumerContext.javaファイルのsetUserRegisteredStore(newUserMetaStore()) パラメーターで設定した外部ストレージメディアを見つけます。

サブスクリプション

このモードでは、consumerContext.javaファイルのsetForceUseCheckpointパラメーターの設定は有効になりません。 次の手順を実行して、前のデータレコードの消費チェックポイントを見つけます。

  1. consumerContext.javaファイルのsetUserRegisteredStore(newUserMetaStore()) パラメーターで設定した外部ストレージメディアを見つけます。

  2. DTSサーバー (DStore) の保存された消費チェックポイントを見つけます。

  3. DTSSucumerSubscribeDemo. javaファイルのinitCheckpointパラメーターで指定した開始タイムスタンプを見つけます。

  4. DTSサーバー (新しいDStore) の開始消費チェックポイントを使用します。

トラブルシューティング

問題

エラーメッセージ

原因

解決策

接続に失敗しました。

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrlパラメーターの値が無効です。

brokerUrluserName、およびpasswordパラメーターに有効な値を入力します。 詳細については、「」をご参照ください。次の表に、必要なパラメーターを示します。.

telnet real node *** failed, please check the network

ブローカーアドレスを実際のIPアドレスにリダイレクトすることはできません。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

指定されたユーザー名またはパスワードは無効です。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.javaファイルのsetUseCheckpointパラメーターはtrueに設定されていますが、消費チェックポイントは変更追跡インスタンスのデータ範囲内にありません。

変更追跡インスタンスのデータ範囲内で消費チェックポイントを指定します。 詳細については、「」をご参照ください。次の表に、必要なパラメーターを示します。.

データ消費の応答時間が増加しました。

非該当

原因を分析するには、DStoreRecordQueueおよびDefaultUserRecordQueueパラメーターを照会します。 詳細については、「」をご参照ください。次の表に、情報のパラメーターを示します。.

  • DStoreRecordQueueパラメーターの値が0の場合、DTSサーバーがデータをプルする速度は低下します。

  • DefaultUserRecordQueueパラメーターの値がデフォルト値の512の場合、SDKクライアントがデータを消費する速度は低下します。