すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:Kafkaクライアントを使用して追跡データを消費する

最終更新日:Jan 22, 2025

このトピックでは、Kafkaクライアントのデモを使用して追跡データを使用する方法について説明します。 新しいバージョンの変更追跡機能を使用すると、V0.11からV2.7のKafkaクライアントを使用して追跡データを消費できます。

使用上の注意

  • 変更追跡機能を使用するときに自動コミットを有効にすると、一部のデータが消費される前にコミットされることがあります。 これにより、データが失われます。 手動でデータをコミットすることを推奨します。

    説明

    データのコミットに失敗した場合は、クライアントを再起動して、最後に記録された消費チェックポイントからデータを消費し続けることができます。 しかしながら、この期間中に重複データが生成されることがある。 重複データを手動で除外する必要があります。

  • データはシリアル化され、Avro形式で保存されます。 詳細については、「Record.avsc」をご参照ください。

    警告

    このトピックで説明しているKafkaクライアントを使用していない場合は、Avroスキーマに基づいて追跡データを解析する必要があります。

  • 検索単位は、データ送信サービス (DTS) がoffsetForTimes操作を呼び出すときに2番目になります。 ネイティブKafkaクライアントがこの操作を呼び出す場合、検索単位はミリ秒です。

  • 障害復旧などのいくつかの理由により、Kafkaクライアントと変更追跡サーバーの間で一時的な接続が発生する場合があります。 このトピックで説明しているKafkaクライアントを使用していない場合は、Kafkaクライアントにネットワーク再接続機能が必要です。

Kafkaクライアントを実行する

Kafka clientデモをダウンロードします。 デモの使用方法の詳細については、「Readme」をご参照ください。

説明
  • codeアイコンをクリックし、[ZIPのダウンロード] を選択してパッケージをダウンロードします。

  • バージョン2.0のKafkaクライアントを使用する場合は、subscribe_example-master/javaimpl/pom.xmlファイルのバージョン番号を2.0.0に変更する必要があります。

kafka2.0

表 1. 次の表に、Kafkaクライアントを実行する手順を示します。

ステップ

ファイルまたはディレクトリ

1. ネイティブKafkaコンシューマーを使用して、変更追跡インスタンスから増分データを取得します。

subscribe_example-master/javaimpl/src/main/java/recordgenerator /

2. 増分データのイメージを逆シリアル化し、 事前イメージポストイメージ 、およびその他の属性。

警告
  • ソースインスタンスが自己管理型Oracleデータベースの場合、すべての列に対して補足ログを有効にする必要があります。 これは、クライアントが追跡されたデータをうまく消費できることを保証し、プレ画像とポスト画像の完全性を保証する。

  • ソースインスタンスが自己管理型Oracleデータベースでない場合、DTSはプレイメージの整合性を保証しません。 取得したプレイメージを確認することを推奨します。

subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

3. 逆シリアル化されたデータのdataTypeNumber値を、対応するデータベースのデータ型に変換します。

subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql /

手順

次の手順は、Kafkaクライアントを実行して追跡データを消費する方法を示しています。 この例では、IntelliJ IDEA Community Edition 2018.1.4 for Windowsが使用されています。

  1. 変更追跡タスクを作成します。 詳細については、「ApsaraDB RDS For MySQLインスタンスからのデータ変更の追跡」、「PolarDB for MySQLクラスターからのデータ変更の追跡」、または「自己管理型Oracleデータベースからのデータ変更の追跡」をご参照ください。

  2. 1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。

  3. Kafkaクライアントデモをダウンロードし、パッケージを解凍します。

    説明

    codeアイコンをクリックし、[ZIPのダウンロード] を選択してパッケージをダウンロードします。

  4. IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] をクリックします。

    Open a project

  5. 表示されるダイアログボックスで、ダウンロードしたデモが存在するディレクトリに移動します。 pom.xmlファイルを見つけます。

    Open the pom.xml file

  6. 表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。

  7. IntelliJ IDEAのプロジェクトツールウィンドウで、フォルダーをクリックしてKafkaクライアントのデモファイルを検索し、ファイルをダブルクリックします。 ファイル名はNotifyDemoDB.javaです。

  8. NotifyDemoDB.javaファイルのパラメーターを設定します。

    Configure the parameters

    パラメーター

    説明

    パラメータ値を取得するメソッド

    USER_NAME

    消費者グループのアカウント。

    警告

    このトピックで説明するKafkaクライアントを使用していない場合は、<Username>-<Consumer group ID> の形式でアカウントを指定する必要があります。 例: dtstest-dtsae ****** bpv そうしないと、接続は失敗します。

    DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、[データの使用] をクリックします。 表示されるページで、コンシューマーグループIDと対応するユーザー名を取得できます。

    説明

    コンシューマーグループアカウントのパスワードは、コンシューマーグループの作成時に自動的に指定されます。

    View the consumer group ID and username

    パスワード名

    アカウントのパスワードを入力します。

    SID_NAME

    コンシューマーグループの ID です。

    グループ名

    消費者グループの名前です。 このパラメーターをコンシューマーグループIDに設定します。

    KAFKA_TOPIC

    変更追跡インスタンスのトピック。

    DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 [タスク管理] ページで、トピックとネットワーク情報を取得できます。 Obtain the topic and network information

    KAFKA_BROKER_URL_NAME

    変更追跡インスタンスのエンドポイント。

    説明

    内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、KafkaクライアントをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。

    INITIAL_CHECKPOINT_NAME

    消費されたデータの消費チェックポイント。 この値は UNIX タイムスタンプです。 例: 1592269238。

    説明
    • 次の理由で、消費チェックポイントを保存する必要があります。

      • 消費プロセスが中断された場合、Kafkaクライアントで消費チェックポイントを指定してデータ消費を再開できます。 これにより、データの損失を防ぎます。

      • Kafkaクライアントを起動すると、オンデマンドでデータを消費するための消費チェックポイントを指定できます。

    • SUBSCRIBE_MODE_NAMEパラメーターがsubscribeに設定されている場合、指定したINITIAL_CHECKPOINT_NAMEパラメーターは、Kafkaクライアントを初めて起動したときにのみ有効になります。

    次の図に示すように、消費されるデータの消費チェックポイントは、変更追跡インスタンスのデータ範囲内にある必要があります。 消費チェックポイントは、UNIXタイムスタンプに変換する必要があります。 Data range

    説明

    検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

    USE_CONFIG_CHECKPOINT_NAME

    指定された消費チェックポイントからのデータをクライアントに消費させるかどうかを指定します。 デフォルト値: true。 このパラメーターをtrueに設定すると、受信したが処理されていないデータが失われないようにすることができます。

    非該当

    SUBSCRIBE_MODE_NAME

    コンシューマグループに対して2つ以上のKafkaクライアントを実行するかどうかを指定します。 この機能を使用する場合は、このパラメーターをこれらのKafkaクライアントのサブスクライブに設定します。

    デフォルト値はassignで、この機能が使用されていないことを示します。 コンシューマグループに1つのKafkaクライアントのみをデプロイすることを推奨します。

    非該当

  9. IntelliJ IDEAの上部メニューバーで、[Run] > [Run] を選択してクライアントを実行します。

    説明

    IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするには、特定の期間が必要です。

Kafkaクライアントの結果

次の図は、Kafkaクライアントがソースデータベースからのデータ変更を追跡できることを示しています。

Results on the Kafka client

NotifyDemoDB.javaファイルの25行目の // log.info(ret); 文字列から2重スラッシュ (//) を削除できます。 次に、クライアントを再度実行して、データ変更情報を表示します。

よくある質問

  • Q: Kafkaクライアントの消費チェックポイントを記録する必要があるのはなぜですか?

    A: DTSによって記録された消費チェックポイントは、DTSがKafkaクライアントからコミット操作を受信した時点です。 記録された消費チェックポイントは、実際の消費時間とは異なり得る。 ビジネスアプリケーションまたはKafkaクライアントが予期せず中断された場合は、正確な消費チェックポイントを指定してデータの消費を継続できます。 これにより、データの損失や重複データの消費を防ぎます。

MySQLデータ型とdataTypeNumber値のマッピング

MySQLデータ型

dataTypeNumberの値

MYSQL_TYPE_DECIMAL

0

MYSQL_TYPE_INT8

1

MYSQL_TYPE_INT16

2

MYSQL_TYPE_INT32

3

MYSQL_TYPE_FLOAT

4

MYSQL_TYPE_DOUBLE

5

MYSQL_TYPE_NULL

6

MYSQL_TYPE_TIMESTAMP

7

MYSQL_TYPE_INT64

8

MYSQL_TYPE_INT24

9

MYSQL_TYPE_DATE

10

MYSQL_TYPE_TIME

11

MYSQL_TYPE_DATETIME

12

MYSQL_TYPE_YEAR

13

MYSQL_TYPE_DATE_NEW

14

MYSQL_TYPE_VARCHAR

15

MYSQL_TYPE_BIT

16

MYSQL_TYPE_TIMESTAMP_NEW

17

MYSQL_TYPE_DATETIME_NEW

18

MYSQL_TYPE_TIME_NEW

19

MYSQL_TYPE_JSON

245

MYSQL_TYPE_DECIMAL_NEW

246

MYSQL_TYPE_ENUM

247

MYSQL_TYPE_SET

248

MYSQL_TYPE_TINY_BLOB

249

MYSQL_TYPE_MEDIUM_BLOB

250

MYSQL_TYPE_LONG_BLOB

251

MYSQL_TYPE_BLOB

252

MYSQL_TYPE_VAR_STRING

253

MYSQL_TYPE_STRING

254

MYSQL_TYPE_GEOMETRY

255

Oracleデータ型とdataTypeNumber値のマッピング

Oracleデータ型

dataTypeNumberの値

VARCHAR2およびNVARCHAR2

1

番号とフロート

2

LONG

8

日付

12

RAW

23

LONG_RAW

24

未定

29

XMLTYPE

58

ROWID

69

CHARとNCHAR

96

BINARY_FLOAT

100

BINARY_DOUBLE

101

CLOBおよびNCLOB

112

BLOB

113

BFILE

114

TIMESTAMP

180

TIMESTAMP_WITH_TIME_ZONE

181

INTERVAL_YEAR_TO_MONTH

182

INTERVAL_DAY_TO_SECOND

183

ウービッド

208

TIMESTAMP_WITH_LOCAL_TIME_ZONE

231

PostgreSQLデータ型とdataTypeNumber値のマッピング

PostgreSQLデータ型

dataTypeNumberの値

INT2とSMALLINT

21

INT4、INTEGER、およびシリアル

23

INT8およびBIGINT

20

CHARACTER

18

CHARACTER VARYING

1043

REAL

700

DOUBLE PRECISION

701

NUMERIC

1700

お金

790

日付

1082

タイムゾーンなしの時間と時間

1083

タイムゾーンでの時間

1266

タイムゾーンなしのタイムスタンプとタイムスタンプ

1114

TIMESTAMP WITH TIME ZONE

1184

BYTEA

17

TEXT

25

JSON

114

JSONB

3082

XML

142

UUID

2950

ポイント

600

LSEG

601

PATH

602

ボックス

603

ポリゴン

604

ライン

628

CIDR

650

サークル

718

MACADDR

829

INET

869

INTERVAL

1186

TXID_SNAPSHOT

2970

PG_LSN

3220

TSVECTOR

3614

TSQUERY

3615