すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:SDKデモを使用して、PolarDB-X 1.0インスタンスから追跡されたデータを消費する

最終更新日:Nov 04, 2024

変更追跡タスクを設定した後、Data Transmission Service (DTS) が提供するSDKデモを使用して、データを追跡および使用できます。 このトピックでは、SDKデモを使用して、分散データベースから追跡されたデータを使用する方法について説明します。 サポートされているソースデータベースは、PolarDB for Xscale (PloarDB-X) 1.0インスタンスとData Management (DMS) 論理データベースです。

前提条件

  • Java Development Kit (JDK) V1.8がインストールされています。

  • IntelliJ IDEAがインストールされています。

使用上の注意

Resource Access Management (RAM) ユーザーとしてデータを追跡して使用する場合は、RAMユーザーにAliyunDTSFullAccess権限と、ソースオブジェクトにアクセスするための権限が必要です。 権限を付与する方法の詳細については、「システムポリシーを使用してRAMユーザーにDTSインスタンスを管理する権限を付与する」および「RAMユーザーに権限を付与する」をご参照ください。

手順

このトピックでは、SDKデモを使用して、PolarDB-X 1.0インスタンスから追跡されたデータを消費する方法について説明します。 この例では、Windows用IntelliJ IDEA Community Edition 2020.1が使用されています。

  1. 変更追跡タスクを作成します。 詳細については、「ApsaraDB RDS For MySQLインスタンスからのデータ変更の追跡」をご参照ください。

  2. 1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。

  3. SDKデモパッケージをダウンロードし、パッケージを解凍します。

  4. IntelliJ IDEAでプロジェクトとして使用するファイルを開きます。

    1. IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。

      Open a project

    2. 表示されるダイアログボックスで、パッケージが解凍されているディレクトリに移動します。 次に、フォルダを開き、pom.xmlファイルをダブルクリックします。

      1

    3. 表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。

  5. IntelliJ IDEAで、フォルダを展開してJavaファイルを見つけます。 次に、を使用するモードに基づいてJavaファイルをダブルクリックします。 SDKクライアントを使用します。このシナリオでは、DistributedDTSSuumerDemoを選択します。

    Find the Java file

  6. Javaファイルのコードに必要なパラメーターを設定します。

    public static void main(String[] args) throws ClientException {
            // Configure a change tracking task for a distributed database such as a PolarDB-X 1.0 instance. Set the parameters that are used to specify your AccessKey pair, instance ID, task ID, and consumer groups. 
            String accessKeyId = "LTA***********99reZ";
            String accessKeySecret = "****************";
            String regionId = "cn-hangzhou";
            String dtsInstanceId = "dtse5212sed162****";
            String jobId = "l791216x16d****";
            String sid = "dtsip412t13160****";
            String userName = "xftest";
            String password = "******";
            String proxyUrl = "dts-cn-****.com:18001";
            // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
            String checkpoint = "1639620090";
    
            // Convert physical database/table name to logical database/table name
            boolean mapping = true;
            // if force use config checkpoint when start. for checkpoint reset, only assign mode works
            boolean isForceUseInitCheckpoint = false;
    
            ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
            DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                    jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                    checkpoint, isForceUseInitCheckpoint, mapping);
            demo.start();
        }

    パラメーター

    説明

    パラメータ値を取得するメソッド

    accessKeyId

    AccessKey ID。

    AccessKeyペアの取得方法の詳細については、「AccessKeyペアの作成と取得」をご参照ください。

    accessKeySecret

    AccessKeyシークレット。

    regionId

    変更追跡タスクが存在するリージョンのID。

    DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、インスタンスが存在するリージョンを取得できます。 たとえば、インスタンスが中国 (杭州) リージョンにある場合、パラメーターをcn-Hangzhouに設定します。 詳細については、「サポートされているリージョン」をご参照ください。

    dtsInstanceId

    変更追跡インスタンスのID。

    DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、DTS インスタンス ID を表示できます。

    jobId

    変更追跡タスクのID。

    DescribeDtsJobsを呼び出して、タスクIDを照会できます。

    sid

    コンシューマーグループの ID です。

    DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、データ消費 をクリックします。 インスタンスのコンシューマーグループ ID /名前とコンシューマーグループのアカウントを表示できます。

    説明

    消費者グループアカウントのパスワードは、消費者グループの作成時に指定します。

    userName

    消費者グループのアカウント。

    パスワード

    コンシューマーグループのパスワード。

    proxyUrl

    変更追跡インスタンスのエンドポイントとポート番号。

    説明

    内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、SDKクライアントをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。

    DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、インスタンスの ネットワーク を表示できます。

    チェックポイント

    消費者オフセット。 SDKクライアントが最初のデータレコードを消費したときに生成されるタイムスタンプです。 値はUNIXタイムスタンプ (秒単位) です。

    説明

    コンシューマオフセットは、次のシナリオで使用できます。

    • 消費プロセスが中断された場合、消費者オフセットを指定してデータ消費を再開できます。 これにより、データの損失を防ぐことができます。

    • 変更追跡クライアントを起動するときに、ビジネス要件に基づいてデータを消費するための消費者オフセットを指定できます。

    消費されたデータの消費者オフセットは、変更追跡インスタンスのデータ範囲内でなければなりません。 コンシューマオフセットは、UNIXタイムスタンプに変換する必要があります。

    説明
    • 変更追跡タスクページの データ範囲 列に変更追跡インスタンスのデータ範囲を表示できます。

    • 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

  7. IntelliJ IDEAの上部メニューバーで、[Run] > [Run] を選択してクライアントを実行します。

    説明

    IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするのに時間がかかります。

    • この結果は、SDKクライアントがソースインスタンスからのデータ変更を追跡できることを示しています。

    • SDKクライアントは、消費されたデータに関する情報を定期的に計算して表示します。 この情報には、送受信されるデータレコードの総数、データの総量、および1秒あたりの要求数 (RPS) が含まれます。

      表 1. 次の表に、情報のパラメーターを示します。

      パラメーター

      説明

      outCounts

      SDKクライアントによって消費されたデータレコードの総数。

      outBytes

      SDKクライアントによって消費されたデータの合計量。 単位はバイトです。

      outRps

      SDKクライアントがデータを消費するRPS。

      outBps

      SDKクライアントがデータを消費する1秒あたりの送信ビット数。

      カウント

      なし。

      inBytes

      DTSサーバーによって送信されるデータの合計量。 単位はバイトです。

      DStoreRecordQueue

      DTSサーバーがデータを送信するときの現在のデータキャッシュキューのサイズ。

      カウント

      DTSサーバーによって送信されたデータレコードの総数。

      inRps

      DTSサーバーがデータを送信するRPS。

      inBps

      DTSサーバーがデータを送信するときに1秒あたりに送信されるビット数。

      __dt

      SDKクライアントがデータを受信したときに生成されるタイムスタンプ。 単位:ミリ秒。

      DefaultUserRecordQueue

      シリアル化後の現在のデータキャッシュキューのサイズ。

  8. オプション: 追跡するデータのデータ型を変更するには、buildRecordListener() メソッドのコードを変更するか、カスタムクラスを使用します。

    public static Map<String, RecordListener> buildRecordListener() {
            // user can impl their own listener
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
    
                    OperationType operationType = record.getOperationType();
    
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
    
                        // consume record
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
    
                        recordPrintListener.consume(record);
    
                        //commit method push the checkpoint update
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }