変更追跡タスクを設定した後、Data Transmission Service (DTS) が提供するSDKデモを使用して、データを追跡および使用できます。 このトピックでは、SDKデモを使用して、分散データベースから追跡されたデータを使用する方法について説明します。 サポートされているソースデータベースは、PolarDB for Xscale (PloarDB-X) 1.0インスタンスとData Management (DMS) 論理データベースです。
前提条件
Java Development Kit (JDK) V1.8がインストールされています。
IntelliJ IDEAがインストールされています。
使用上の注意
Resource Access Management (RAM) ユーザーとしてデータを追跡して使用する場合は、RAMユーザーにAliyunDTSFullAccess権限と、ソースオブジェクトにアクセスするための権限が必要です。 権限を付与する方法の詳細については、「システムポリシーを使用してRAMユーザーにDTSインスタンスを管理する権限を付与する」および「RAMユーザーに権限を付与する」をご参照ください。
手順
このトピックでは、SDKデモを使用して、PolarDB-X 1.0インスタンスから追跡されたデータを消費する方法について説明します。 この例では、Windows用IntelliJ IDEA Community Edition 2020.1が使用されています。
変更追跡タスクを作成します。 詳細については、「ApsaraDB RDS For MySQLインスタンスからのデータ変更の追跡」をご参照ください。
1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。
SDKデモパッケージをダウンロードし、パッケージを解凍します。
IntelliJ IDEAでプロジェクトとして使用するファイルを開きます。
IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。
表示されるダイアログボックスで、パッケージが解凍されているディレクトリに移動します。 次に、フォルダを開き、pom.xmlファイルをダブルクリックします。
表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。
IntelliJ IDEAで、フォルダを展開してJavaファイルを見つけます。 次に、を使用するモードに基づいてJavaファイルをダブルクリックします。 SDKクライアントを使用します。このシナリオでは、DistributedDTSSuumerDemoを選択します。
Javaファイルのコードに必要なパラメーターを設定します。
public static void main(String[] args) throws ClientException { // Configure a change tracking task for a distributed database such as a PolarDB-X 1.0 instance. Set the parameters that are used to specify your AccessKey pair, instance ID, task ID, and consumer groups. String accessKeyId = "LTA***********99reZ"; String accessKeySecret = "****************"; String regionId = "cn-hangzhou"; String dtsInstanceId = "dtse5212sed162****"; String jobId = "l791216x16d****"; String sid = "dtsip412t13160****"; String userName = "xftest"; String password = "******"; String proxyUrl = "dts-cn-****.com:18001"; // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019)) String checkpoint = "1639620090"; // Convert physical database/table name to logical database/table name boolean mapping = true; // if force use config checkpoint when start. for checkpoint reset, only assign mode works boolean isForceUseInitCheckpoint = false; ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN; DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId, jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl, checkpoint, isForceUseInitCheckpoint, mapping); demo.start(); }
パラメーター
説明
パラメータ値を取得するメソッド
accessKeyId
AccessKey ID。
AccessKeyペアの取得方法の詳細については、「AccessKeyペアの作成と取得」をご参照ください。
accessKeySecret
AccessKeyシークレット。
regionId
変更追跡タスクが存在するリージョンのID。
DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、インスタンスが存在するリージョンを取得できます。 たとえば、インスタンスが中国 (杭州) リージョンにある場合、パラメーターを
cn-Hangzhou
に設定します。 詳細については、「サポートされているリージョン」をご参照ください。dtsInstanceId
変更追跡インスタンスのID。
DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、DTS インスタンス ID を表示できます。
jobId
変更追跡タスクのID。
DescribeDtsJobsを呼び出して、タスクIDを照会できます。
sid
コンシューマーグループの ID です。
DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、データ消費 をクリックします。 インスタンスのコンシューマーグループ ID /名前とコンシューマーグループのアカウントを表示できます。
説明消費者グループアカウントのパスワードは、消費者グループの作成時に指定します。
userName
消費者グループのアカウント。
パスワード
コンシューマーグループのパスワード。
proxyUrl
変更追跡インスタンスのエンドポイントとポート番号。
説明内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、SDKクライアントをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。
DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、インスタンスの ネットワーク を表示できます。
チェックポイント
消費者オフセット。 SDKクライアントが最初のデータレコードを消費したときに生成されるタイムスタンプです。 値はUNIXタイムスタンプ (秒単位) です。
説明コンシューマオフセットは、次のシナリオで使用できます。
消費プロセスが中断された場合、消費者オフセットを指定してデータ消費を再開できます。 これにより、データの損失を防ぐことができます。
変更追跡クライアントを起動するときに、ビジネス要件に基づいてデータを消費するための消費者オフセットを指定できます。
消費されたデータの消費者オフセットは、変更追跡インスタンスのデータ範囲内でなければなりません。 コンシューマオフセットは、UNIXタイムスタンプに変換する必要があります。
説明変更追跡タスクページの データ範囲 列に変更追跡インスタンスのデータ範囲を表示できます。
検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。
IntelliJ IDEAの上部メニューバーで、 を選択してクライアントを実行します。
説明IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするのに時間がかかります。
この結果は、SDKクライアントがソースインスタンスからのデータ変更を追跡できることを示しています。
SDKクライアントは、消費されたデータに関する情報を定期的に計算して表示します。 この情報には、送受信されるデータレコードの総数、データの総量、および1秒あたりの要求数 (RPS) が含まれます。
表 1. 次の表に、情報のパラメーターを示します。
パラメーター
説明
outCounts
SDKクライアントによって消費されたデータレコードの総数。
outBytes
SDKクライアントによって消費されたデータの合計量。 単位はバイトです。
outRps
SDKクライアントがデータを消費するRPS。
outBps
SDKクライアントがデータを消費する1秒あたりの送信ビット数。
カウント
なし。
inBytes
DTSサーバーによって送信されるデータの合計量。 単位はバイトです。
DStoreRecordQueue
DTSサーバーがデータを送信するときの現在のデータキャッシュキューのサイズ。
カウント
DTSサーバーによって送信されたデータレコードの総数。
inRps
DTSサーバーがデータを送信するRPS。
inBps
DTSサーバーがデータを送信するときに1秒あたりに送信されるビット数。
__dt
SDKクライアントがデータを受信したときに生成されるタイムスタンプ。 単位:ミリ秒。
DefaultUserRecordQueue
シリアル化後の現在のデータキャッシュキューのサイズ。
オプション: 追跡するデータのデータ型を変更するには、
buildRecordListener()
メソッドのコードを変更するか、カスタムクラスを使用します。public static Map<String, RecordListener> buildRecordListener() { // user can impl their own listener RecordListener mysqlRecordPrintListener = new RecordListener() { @Override public void consume(DefaultUserRecord record) { OperationType operationType = record.getOperationType(); if (operationType.equals(OperationType.INSERT) || operationType.equals(OperationType.UPDATE) || operationType.equals(OperationType.DELETE) || operationType.equals(OperationType.HEARTBEAT)) { // consume record RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL); recordPrintListener.consume(record); //commit method push the checkpoint update record.commit(""); } } }; return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener); }