すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:SDKデモを使用して追跡データを消費する

最終更新日:Nov 04, 2024

変更追跡タスクと消費者グループを作成した後、コードを記述するか、Data Transmission Service (DTS) が提供するSDKデモを使用して、データを追跡して消費することができます。 このトピックでは、SDKデモを使用して追跡データを消費する方法について説明します。

重要

このトピックでは、Java用のSDKデモを使用します。 PythonおよびGo向けのSDKデモの詳細については、GitHubの「aliyun-dts-subscribe-demo」をご参照ください。

手順

説明

このトピックでは、SDKデモを使用して追跡データを消費する方法について説明します。 この例では、Windows用IntelliJ IDEA Community Edition 2020.1が使用されています。

  1. 変更追跡タスクを作成します。 詳細については、「変更追跡シナリオの概要」の関連トピックをご参照ください。

  2. 1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。

  3. ビジネス要件に基づいてSDKデモを使用します。

    重要

    追跡データを使用するときは、DefaultUserRecordのcommitメソッドを呼び出してオフセットをコミットする必要があります。 それ以外の場合、データは繰り返し消費されます。

    • (推奨) パッケージ化された新しい変更追跡SDKの使用

      1. IntelliJ IDEAを開き、[新しいプロジェクトの作成] をクリックします。

      2. 作成したプロジェクトで、pom.xmlファイルを見つけます。

      3. 次の依存関係をpom.xmlファイルに追加します。

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency> 
        重要
        • 変更追跡SDKの最新バージョンは、dts-new-subscribe-sdkページで確認できます。

        • dts-new-subscribe-sdkパッケージは、次のネイティブ依存関係をカプセル化します。

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency> 
        • バージョン2.0.0のdts-new-subscribe-sdkパッケージは、バージョン2.7.0のkafka-clients依存関係をカプセル化しています。 以前のバージョンのdts-new-subscribe-sdkパッケージは、バージョン1.0.0のkafka-clients依存関係をカプセル化しています。

      4. 新しい変更追跡SDKを使用します。 デモの詳細については、GitHubの「aliyun-dts-subscribe-sdk-java」をご参照ください。

    • コードをカスタマイズして新しい変更追跡SDKを使用する

      1. SDKデモパッケージをダウンロードし、パッケージを解凍します。

        説明

        codeアイコンをクリックし、[ZIPのダウンロード] を選択してパッケージをダウンロードします。

      2. パッケージが解凍されているディレクトリに移動します。 次に、テキストエディターを使用してpom.xmlファイルを開き、SDKのバージョンを最新に変更します。设置SDK版本

        重要
        • 変更追跡SDKの最新バージョンは、dts-new-subscribe-sdkページで確認できます。

        • dts-new-subscribe-sdkパッケージは、次のネイティブ依存関係をカプセル化します。

          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>{version}</version>
          </dependency> 
        • バージョン2.0.0のdts-new-subscribe-sdkパッケージは、バージョン2.7.0のkafka-clients依存関係をカプセル化しています。 以前のバージョンのdts-new-subscribe-sdkパッケージは、バージョン1.0.0のkafka-clients依存関係をカプセル化しています。

      3. IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。打开工程

      4. 表示されるダイアログボックスで、パッケージが解凍されているディレクトリに移動し、フォルダをクリックしてpom.xmlファイルを見つけます。 次に、[OK] をクリックします。找到项目对象模型文件

      5. 表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。

      6. IntelliJ IDEAで、フォルダをクリックしてJavaファイルを見つけます。 次に、を使用するモードに基づいてJavaファイルをダブルクリックします。 SDKクライアントを使用します。DTSSucumerAssignDemo. javaまたはDTSSucumerSubscribeDemo. javaファイルを選択できます。java客户端文件

        説明

        DTSは、SDKクライアントを使用する次のモードをサポートしています。

        • ASSIGNモード: メッセージのグローバルな順序を確保するために、DTSは追跡対象の各トピックに1つのパーティション (パーティション0) のみを割り当てます。 ASSIGNモードでSDKクライアントを使用する場合は、SDKクライアントを1つだけ起動することをお勧めします。

        • SUBSCRIBEモード: メッセージのグローバルな順序を確保するために、DTSは追跡対象の各トピックに1つのパーティション (パーティション0) のみを割り当てます。 SUBSCRIBEモードでは、冗長性を高めるために、コンシューマーグループ内の複数のSDKクライアントを一度に起動できます。 コンシューマーグループ内のSDKクライアントが失敗した場合、他のSDKクライアントはランダムに自動的にパーティション0に割り当てられ、データ消費が再開されます。

  4. Javaファイルのコードで必要なパラメーターを指定します。

    assigndemo

    表 1. 必須パラメーター

    パラメーター

    説明

    パラメータ値を取得するメソッド

    brokerUrl

    変更追跡インスタンスのエンドポイントとポート番号。

    説明
    • 内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、SDKクライアントをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。

    • パブリックエンドポイントを使用しないことを推奨します。

    DTSコンソールの [変更トラッキング] ページで、管理する変更トラッキングインスタンスを見つけ、インスタンスIDをクリックします。 左側のウィンドウで、基本情報 タブをクリックして、ネットワーク セクションで変更追跡インスタンスのエンドポイントとポート番号を取得します。

    トピック

    変更追跡インスタンスの追跡対象トピックの名前。

    DTSコンソールの [変更トラッキング] ページで、管理する変更トラッキングインスタンスを見つけ、インスタンスIDをクリックします。 左側のウィンドウで、基本情報 タブをクリックして、基本情報 セクションで追跡対象のトピックの名前を取得します。

    sid

    消費者グループID。

    DTSコンソールの [変更トラッキング] ページで、管理する変更トラッキングインスタンスを見つけ、インスタンスIDをクリックします。 左側のウィンドウで、データ消費 タブをクリックして、コンシューマーグループ ID /名前アカウント 情報を表示します。

    説明

    消費者グループアカウントのパスワードは、消費者グループの作成時に指定します。

    userName

    消費者グループアカウントのユーザー名。

    警告

    このトピックで説明するSDKクライアントを使用していない場合は、このパラメーターを <Username>-<Consumer group ID> の形式で指定する必要があります。 例: dtstest-dtsae ****** bpv そうしないと、接続は失敗します。

    パスワード

    消費者グループアカウントのパスワード。

    initCheckpoint

    コンシューマオフセット。SDKクライアントが最初のデータレコードを消費したときに生成されるタイムスタンプです。 この値は UNIX タイムスタンプです。 例: 1620962769。

    説明

    コンシューマオフセットは、次のシナリオで使用できます。

    • 消費プロセスが中断された場合、消費者オフセットを指定してデータ消費を再開できます。 これにより、データの損失を防ぎます。

    • 変更追跡クライアントを起動するときに、ビジネス要件に基づいてデータを消費するための消費者オフセットを指定できます。

    消費されたデータの消費者オフセットは、変更追跡インスタンスのデータ範囲内でなければなりません。 コンシューマオフセットは、UNIXタイムスタンプに変換する必要があります。

    説明
    • 変更トラッキングページの データ範囲 列に変更トラッキングインスタンスのデータ範囲を表示できます。

    • 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

    ConsumerContext.ConsumerSubscribeMode

    SDKクライアントを使用するモード。 有効な値:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGNモードでは、コンシューマグループ内で1つのSDKクライアントのみを起動して、追跡データを消費できます。

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBEモードでは、冗長性を高めるために、1つのコンシューマーグループ内の複数のSDKクライアントを一度に起動できます。

    非該当

  5. IntelliJ IDEAの上部メニューバーで、実行 > 実行SDKクライアントを実行します。

    説明

    IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするには一定の時間がかかります。

    • 次の図は、SDKクライアントがソースデータベースからのデータ変更を追跡できることを示す結果を示しています。消费数据

    • SDKクライアントは、消費されたデータに関する情報を定期的に計算して表示します。 この情報には、送受信されるデータレコードの総数、データの総量、および1秒あたりの要求数 (RPS) が含まれます。统计信息

      表 2. 消費されたデータに関する情報のパラメーター

      パラメーター

      説明

      outCounts

      SDKクライアントによって消費されたデータレコードの総数。

      outBytes

      SDKクライアントによって消費されたデータの合計量。 単位はバイトです。

      outRps

      SDKクライアントがデータを消費するときのRPS。

      outBps

      SDKクライアントがデータを消費するときに1秒あたりに送信されるビット数。

      inBytes

      DTSサーバーによって送信されたデータの合計量。 単位はバイトです。

      DStoreRecordQueue

      DTSサーバーがデータを送信するときのデータキャッシュキューのサイズ。

      カウント

      DTSサーバーによって送信されたデータレコードの総数。

      inRps

      DTSサーバーがデータを送信するときのRPS。

      __dt

      SDKクライアントがデータを受信したときに生成されるタイムスタンプ。 単位:ミリ秒。

      DefaultUserRecordQueue

      シリアル化後のデータキャッシュキューのサイズ。

コンシューマーオフセットの管理

SDKクライアントが初めて起動または再起動された場合、または内部再試行が発生した場合は、 消費者オフセット データ消費を開始または再開します。 次の表に、さまざまなシナリオでコンシューマオフセットを管理およびクエリする方法を示します。 これにより、データの損失やデータの重複を防ぎ、ビジネス要件に基づいてデータを消費できます。

SDKクライアントのコンシューマーオフセットをリセットする必要がある場合は、SDKクライアントを使用するモードに基づいて、次の表を参照してコンシューマーオフセットを照会およびリセットできます。

シナリオ

SDKクライアントを使用するモード

実装

コンシューマオフセットをクエリする必要があります。

割り当ておよびSUBSCRIBE

  • SDKクライアントは、5秒ごとにコンシューマーオフセットを保存し、コンシューマーオフセットをDTSサーバーに送信します。 最後のコンシューマオフセットを照会するには、次の方法を使用できます。

    • SDKクライアントがデプロイされているサーバーのlocalCheckpointStoreファイルを見つけます。

    • 変更追跡インスタンスの [データの使用] タブに移動します。

  • consumerContext.javaファイルのsetUserRegisteredStore(new UserMetaStore()) メソッドを呼び出してデータベースなどの外部永続共有ストレージメディアを指定した場合、ストレージメディアは5秒ごとにコンシューマオフセットを保存します。 ストレージメディアからコンシューマーオフセットを照会できます。

SDKクライアントを初めて起動するときは、データを消費するためのコンシューマーオフセットを指定する必要があります。

割り当ておよびSUBSCRIBE

SDKクライアントを使用するモードに基づいて、DTConsumerAssignDemo. javaまたはDTConsumerSubscribeDemo. javaファイルを選択します。 次に、initCheckpointパラメーターを指定してデータを消費します。 詳細については、このトピックの手順セクションの手順3および手順4を参照してください。

内部再試行が発生した場合、データ消費を再開するには、前のデータレコードのコンシューマオフセットを指定する必要があります。

割り当て

次の手順を実行して、前のデータレコードのコンシューマオフセットを照会します。

  1. consumerContext.javaファイルのsetUserRegisteredStore(new UserMetaStore()) メソッドを呼び出して、指定した外部記憶媒体を見つけます。

  2. SDKクライアントがデプロイされているサーバーのlocalCheckpointStoreファイルを見つけます。

  3. DTSSucumerSubscribeDemo. javaファイルのinitCheckpointパラメーターを使用して指定した開始タイムスタンプを見つけます。

サブスクリプション

次の手順を実行して、前のデータレコードのコンシューマオフセットを照会します。

  1. consumerContext.javaファイルのsetUserRegisteredStore(new UserMetaStore()) メソッドを呼び出して、指定した外部記憶媒体を見つけます。

  2. DStoreモジュールによって保存されたコンシューマーオフセットを見つけます。 DStoreモジュールは、増分データを読み取るためにDTSによって使用されます。

    説明

    このコンシューマオフセットは、SDKクライアントを使用してcommitメソッドを呼び出すことによってのみ更新できます。

  3. DTSSucumerSubscribeDemo. javaファイルのinitCheckpointパラメーターを使用して指定した開始タイムスタンプを見つけます。

  4. 新しいDStoreモジュールによって保存された開始コンシューマオフセットを使用します。

    重要

    DStoreモジュールの切り替えが発生した場合、新しいDStoreモジュールはSDKクライアントの最後のコンシューマーオフセットを保存できません。 SDKクライアントは、以前の消費者オフセットからの追跡データの消費を開始することができる。 SDKクライアントのコンシューマーオフセットを保存するために、永続的なストレージメディアを指定することを推奨します。 詳細については、このトピックの「コンシューマーオフセットを保存する永続的なストレージメディアの指定」をご参照ください。

SDKクライアントを再起動した後、データ消費を再開するには、前のデータレコードのコンシューマーオフセットを指定する必要があります。

割り当て

consumerContext.javaファイルのsetForceUseCheckpointパラメーターの値を確認し、コンシューマーオフセットを照会します。

  • パラメーターがtrueに設定されている場合、SDKクライアントが再起動されるたびに、initCheckpointパラメーターの値がコンシューマーオフセットとして使用されます。

  • パラメーターがfalseに設定されているか、指定されていない場合は、次の手順を実行して、前のデータレコードのコンシューマーオフセットを照会します。

    1. SDKクライアントがデプロイされているサーバーのlocalCheckpointStoreファイルを見つけます。

    2. DStoreモジュールによって保存されたコンシューマーオフセットを見つけます。 DStoreモジュールは、増分データを読み取るためにDTSによって使用されます。

      説明

      このコンシューマオフセットは、SDKクライアントを使用してcommitメソッドを呼び出すことによってのみ更新できます。

    3. consumerContext.javaファイルのsetUserRegisteredStore(new UserMetaStore()) メソッドを呼び出して、指定した外部記憶媒体を見つけます。

サブスクリプション

このモードでは、consumerContext.javaファイルのsetForceUseCheckpointパラメーターの設定は有効になりません。 次の手順を実行して、前のデータレコードのコンシューマオフセットを照会します。

  1. consumerContext.javaファイルのsetUserRegisteredStore(new UserMetaStore()) メソッドを呼び出して、指定した外部記憶媒体を見つけます。

  2. DStoreモジュールによって保存されたコンシューマーオフセットを見つけます。 DStoreモジュールは、増分データを読み取るためにDTSによって使用されます。

    説明

    このコンシューマオフセットは、SDKクライアントを使用してcommitメソッドを呼び出すことによってのみ更新できます。

  3. DTSSucumerSubscribeDemo. javaファイルのinitCheckpointパラメーターを使用して指定した開始タイムスタンプを見つけます。

  4. 新しいDStoreモジュールによって保存された開始コンシューマオフセットを使用します。

コンシューマオフセットを保存するための永続的なストレージメディアの指定

DStoreモジュールの切り替えがディザスタリカバリのためにトリガーされた場合、特にSUBSCRIBEモードでは、新しいDStoreモジュールはSDKクライアントの最後のコンシューマーオフセットを保存できません。 SDKクライアントは、以前の消費者オフセットからの追跡データの消費を開始することができる。 その結果、履歴データは繰り返し消費される。 たとえば、元のDStoreモジュールのコンシューマーオフセット範囲は、2023年11月11日の08:00:00から年11月12日の08:00:00までで、SDKクライアントのコンシューマーオフセットは2023年11月12日の08:00:00です。 切り替え後、新しいDStoreモジュールのコンシューマーオフセット範囲は、2023年11月8日の10:00:00から2023年11月12日の08:01:00までです。 この場合、SDKクライアントは、新しいDStoreモジュールの開始コンシューマーオフセットである2023年11月8日の10:00:00からデータの消費を開始します。 その結果、履歴データは繰り返し消費される。

切り替えシナリオで履歴データが繰り返し使用されないようにするには、永続的なストレージメディアを指定して、SDKクライアントのコンシューマーオフセットを保存することを推奨します。 次のサンプルコードは参考用です。 ビジネス要件に基づいてコードを変更できます。

  1. AbstractUserMetaStore() メソッドを継承して実装するUserMetaStore() メソッドを作成します。

    たとえば、UserMetaStore() メソッドを呼び出して、コンシューマーオフセットを保存するMySQLデータベースを指定できます。 Javaのサンプルコード:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. consumerContext.javaファイルのsetUserRegisteredStore(new UserMetaStore()) メソッドを呼び出して外部記憶媒体を指定します。

よくある質問

  • 変更追跡インスタンスに接続できない場合はどうすればよいですか?

    報告されたエラーメッセージに基づいて問題をトラブルシューティングします。 詳細については、このトピックの「トラブルシューティング」セクションをご参照ください。

  • 永続的に保存されるコンシューマーオフセットから返されるデータの形式は何ですか?

    コンシューマオフセットが永続的に格納されると、JSON形式のデータがコンシューマオフセットから返されます。 永続的に格納される消費者オフセットは、UNIXタイムスタンプである。 コンシューマオフセットをSDKに直接インポートして使用できます。 次の応答では、パラメータ「timestamp」の値1700709977は、永続的に記憶される消費者オフセットを示す。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
  • 複数のSDKクライアントを使用して、変更追跡タスクで追跡されたデータを同時に使用できますか?

    いいえ。 SUBSCRIBEモードでは、一度に複数のSDKクライアントを起動できます。 ただし、データを使用できるのは1つのクライアントだけです。

  • PythonとGoのSDKデモを使用して追跡データを消費できますか?

    はい。 PythonおよびGo向けのSDKデモの詳細については、GitHubの「dts-subscribe-demo」をご参照ください。

トラブルシューティング

問題

エラーメッセージ

原因

解決策

接続に失敗しました。

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrlパラメーターの値が無効です。

brokerUrluserName、およびpasswordパラメーターに有効な値を入力します。 詳細については、このトピックの「表1」をご参照ください。

telnet real node *** failed, please check the network

ブローカーアドレスを実際のIPアドレスにリダイレクトすることはできません。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

指定されたユーザー名またはパスワードは無効です。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.javaファイルのsetUseCheckpointパラメーターはtrueに設定されていますが、コンシューマーオフセットは変更追跡インスタンスのデータ範囲内にありません。

変更追跡インスタンスのデータ範囲内のコンシューマオフセットを指定します。 詳細については、このトピックの「表1」をご参照ください。

データ消費の応答時間が増加しました。

非該当

  • 原因を分析するには、DStoreRecordQueueおよびDefaultUserRecordQueueパラメーターを照会します。 詳細については、このトピックの「表2」をご参照ください。

    • DStoreRecordQueueパラメーターの値が0の場合、DTSサーバーがデータをプルする速度は低下します。

    • DefaultUserRecordQueueパラメーターの値がデフォルト値の512の場合、SDKクライアントがデータを消費する速度は低下します。

  • サンプルコードのinitCheckpointパラメーターの値を変更して、ビジネス要件に基づいてコンシューマオフセットをリセットします。