すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:Kafkaクライアントを使用して追跡データを消費する

最終更新日:Nov 04, 2024

このトピックでは、Kafkaクライアントのデモを使用して追跡データを使用する方法について説明します。 新しいバージョンの変更追跡機能を使用すると、V0.11からV2.7のKafkaクライアントを使用して追跡データを消費できます。

使用上の注意

  • 変更追跡機能を使用するときに自動コミットを有効にすると、一部のデータが消費される前にコミットされることがあります。 これにより、データが失われます。 手動でデータをコミットすることを推奨します。

    説明

    データのコミットに失敗した場合は、クライアントを再起動して、最後に記録された消費チェックポイントからデータを消費し続けることができます。 しかしながら、この期間中に重複データが生成されることがある。 重複データを手動で除外する必要があります。

  • データはシリアル化され、Avro形式で保存されます。 詳細については、「Record.avsc」をご参照ください。

    警告

    このトピックで説明しているKafkaクライアントを使用していない場合は、Avroスキーマに基づいて追跡されたデータを解析し、解析されたデータを確認する必要があります。

  • 検索単位は、データ送信サービス (DTS) がoffsetForTimes操作を呼び出すときに2番目になります。 ネイティブKafkaクライアントがこの操作を呼び出す場合、検索単位はミリ秒です。

  • 障害復旧などのいくつかの理由により、Kafkaクライアントと変更追跡サーバーの間で一時的な接続が発生する場合があります。 このトピックで説明しているKafkaクライアントを使用していない場合は、Kafkaクライアントにネットワーク再接続機能が必要です。

  • ネイティブKafkaクライアントを使用して追跡データを使用する場合、DTSで増分データ収集モジュールが変更される可能性があります。 サブスクライブモードでは、KafkaクライアントがDTSサーバーに保存する消費チェックポイントが削除されます。 ビジネス要件に基づいて追跡データを消費するには、消費チェックポイントを指定する必要があります。 サブスクライブモードでデータを消費する場合は、DTSが提供するSDKデモを使用してデータを追跡および消費するか、消費チェックポイントを手動で管理することをお勧めします。 詳細については、このトピックの「SDKデモを使用して追跡データを消費する」および「消費チェックポイントの管理」セクションをご参照ください。

Kafkaクライアントを実行する

Kafka clientデモをダウンロードします。 デモの使用方法の詳細については、「Readme」をご参照ください。

説明
  • codeをクリックし、[ZIPのダウンロード] を選択してパッケージをダウンロードします。

  • バージョン2.0のKafkaクライアントを使用する場合は、subscribe_example-master/javaimpl/pom.xmlファイルのバージョン番号を2.0.0に変更する必要があります。

kafka2.0

表1プロセスの説明

ステップ

関连ディレクトリまたはファイル

1. ネイティブKafkaコンシューマーを使用して、変更追跡インスタンスから増分データを取得します。

subscribe_example-master/javaimpl/src/main/java/recordgenerator /

2. 増分データのイメージを逆シリアル化し、 事前イメージポストイメージ 、およびその他の属性。

警告
  • ソースインスタンスが自己管理型Oracleデータベースの場合、すべての列に対して補足ログを有効にする必要があります。 これは、クライアントが追跡されたデータをうまく消費できることを保証し、プレ画像とポスト画像の完全性を保証する。

  • ソースインスタンスが自己管理型Oracleデータベースでない場合、DTSはプレイメージの整合性を保証しません。 取得したプレイメージを確認することを推奨します。

subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

3. 逆シリアル化されたデータのdataTypeNumber値を、対応するデータベースのデータ型に変換します。

subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql /

手順

次の手順は、Kafkaクライアントを実行して追跡データを消費する方法を示しています。 この例では、IntelliJ IDEA Community Edition 2018.1.4 for Windowsが使用されています。

  1. 変更追跡インスタンスを作成します。 詳細については、「変更追跡シナリオの概要」をご参照ください。

  2. 1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。

  3. のパッケージをダウンロードします。Kafkaクライアントのデモパッケージを解凍します。

    説明

    codeをクリックし、[ZIPのダウンロード] を選択してパッケージをダウンロードします。

  4. IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] をクリックします。

    打开项目

  5. 表示されるダイアログボックスで、ダウンロードしたデモが存在するディレクトリに移動します。 pom.xmlファイルを見つけます。

    打开项目文件

  6. 表示されるダイアログボックスで、プロジェクトとして開くを選択します。

  7. IntelliJ IDEAのプロジェクトツールウィンドウで、フォルダーをクリックしてKafkaクライアントのデモファイルを検索し、ファイルをダブルクリックします。 ファイル名はNotifyDemoDB.javaです。

  8. パラメーターを指定します。NotifyDemoDB.javaファイルを作成します。

    设置参数值

    パラメーター

    説明

    パラメータ値を取得するメソッド

    USER_NAME

    消費者グループアカウントのユーザー名。

    警告

    このトピックで説明するKafkaクライアントを使用していない場合は、このパラメーターを <Username>-<Consumer group ID> の形式で指定する必要があります。 例: dtstest-dtsae ****** bpv そうしないと、接続は失敗します。

    DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、データ消費 をクリックします。 表示されるページで、コンシューマグループのIDや名前、アカウントなど、コンシューマグループに関する情報を表示できます。

    説明

    消費者グループアカウントのパスワードは、消費者グループの作成時に指定します。

    パスワード名

    アカウントのパスワードを入力します。

    SID_NAME

    コンシューマーグループの ID です。

    グループ名

    消費者グループの名前です。 このパラメーターをコンシューマーグループIDに設定します。

    KAFKA_TOPIC

    変更追跡インスタンスの追跡対象トピックの名前。

    DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、トピックとネットワークに関する情報を表示できます。

    KAFKA_BROKER_URL_NAME

    変更追跡インスタンスのエンドポイント。

    説明

    内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、KafkaクライアントをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。

    INITIAL_CHECKPOINT_NAME

    消費されたデータの消費チェックポイント。 この値は UNIX タイムスタンプです。 例: 1592269238。

    説明
    • 次の理由で、消費チェックポイントを保存する必要があります。

      • 消費プロセスが中断された場合、Kafkaクライアントで消費チェックポイントを指定してデータ消費を再開できます。 これにより、データの損失を防ぎます。

      • Kafkaクライアントを起動すると、ビジネス要件に基づいてデータを消費する消費チェックポイントを指定できます。

    • SUBSCRIBE_MODE_NAMEパラメーターがsubscribeに設定されている場合、指定したINITIAL_CHECKPOINT_NAMEパラメーターは、Kafkaクライアントを初めて起動したときにのみ有効になります。

    消費されるデータの消費チェックポイントは、変更追跡インスタンスのデータ範囲内でなければなりません。 消費チェックポイントは、UNIXタイムスタンプに変換する必要があります。

    説明
    • 変更追跡タスクページの データ範囲 列に変更追跡インスタンスのデータ範囲を表示できます。

    • 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

    USE_CONFIG_CHECKPOINT_NAME

    指定された消費チェックポイントからのデータをクライアントに消費させるかどうかを指定します。 デフォルト値: true。 このパラメーターをtrueに設定すると、受信したが処理されていないデータが失われないようにすることができます。

    なし

    SUBSCRIBE_MODE_NAME

    コンシューマグループに対して2つ以上のKafkaクライアントを実行するかどうかを指定します。 この機能を使用する場合は、このパラメーターをこれらのKafkaクライアントのサブスクライブに設定します。

    デフォルト値はassignで、この機能が使用されていないことを示します。 コンシューマグループに1つのKafkaクライアントのみをデプロイすることを推奨します。

    なし

  9. IntelliJ IDEAの上部メニューバーで、実行 > 実行クライアントを実行します。

    説明

    IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするには、特定の期間が必要です。

Kafkaクライアントの結果

次の図は、Kafkaクライアントがソースデータベースからのデータ変更を追跡できることを示しています。

Kafka客户端订阅结果

NotifyDemoDB.javaファイルの25行目の // log.info(ret) 文字列から2重スラッシュ (//) を削除できます。 次に、クライアントを再度実行して、データ変更情報を表示します。

よくある質問

  • Q: Kafkaクライアントの消費チェックポイントを記録する必要があるのはなぜですか?

    A: DTSによって記録された消費チェックポイントは、DTSがKafkaクライアントからコミット操作を受信した時点です。 記録された消費チェックポイントは、実際の消費時間とは異なり得る。 ビジネスアプリケーションまたはKafkaクライアントが予期せず中断された場合は、正確な消費チェックポイントを指定してデータの消費を継続できます。 これにより、データの損失や重複データの消費を防ぎます。

消費チェックポイントの管理

  1. DTSでデータ収集モジュールの切り替えをリッスンするようにKafkaクライアントを設定します。

    DTSでデータ収集モジュールの切り替えをリッスンするように、Kafkaクライアントのコンシューマープロパティを設定できます。 次のコードは、コンシューマープロパティを設定する方法の例を示しています。

    properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());

    次のコードは、ClusterSwitchListenerの実装方法の例を示しています。

    public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor {
        private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class);
        private ClusterResource originClusterResource = null;
        private ClusterResource currentClusterResource = null;
    
        public ConsumerRecords onConsume(ConsumerRecords records) {
            return records;
        }
    
    
        public void close() {
        }
    
        public void onCommit(Map offsets) {
        }
    
    
        public void onUpdate(ClusterResource clusterResource) {
            synchronized (this) {
                originClusterResource = currentClusterResource;
                currentClusterResource = clusterResource;
                if (null == originClusterResource) {
                    LOG.info("Cluster updated to " + currentClusterResource.clusterId());
                } else {
                    if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                        LOG.info("Cluster not changed on update:" + clusterResource.clusterId());
                    } else {
                        LOG.error("Cluster changed");
                        throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId()
                                + ", consumer require restart");
                    }
                }
            }
        }
    
        public boolean isClusterResourceChanged() {
            if (null == originClusterResource) {
                return false;
            }
            if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                return false;
            }
            return true;
        }
    
        public void configure(Map<String, ?> configs) {
        }
    
        public static class ClusterSwitchException extends KafkaException {
            public ClusterSwitchException(String message, Throwable cause) {
                super(message, cause);
            }
    
            public ClusterSwitchException(String message) {
                super(message);
            }
    
            public ClusterSwitchException(Throwable cause) {
                super(cause);
            }
    
            public ClusterSwitchException() {
                super();
            }
    
        }
  2. DTSのデータ収集モジュールのキャプチャされた切り替えに基づいて、消費チェックポイントを指定します。

    次のデータ追跡の開始消費チェックポイントを、クライアントが消費した最新の追跡データエントリのタイムスタンプに設定します。 次のコードは、消費チェックポイントを指定する方法の例を示しています。

    try{
       //do some action
    } catch (ClusterSwitchListener.ClusterSwitchException e) {
       reset();
    }
    
    // Reset the consumption checkpoint.
    public reset() {
      long offset = kafkaConsumer.offsetsForTimes(timestamp);
      kafkaConsumer.seek(tp,offset);
    }
    説明

    例の詳細については、「KafkaRecordFetcher」をご参照ください。

MySQLデータ型とdataTypeNumber値のマッピング

MySQLデータ型

dataTypeNumberの値

MYSQL_TYPE_DECIMAL

0

MYSQL_TYPE_INT8

1

MYSQL_TYPE_INT16

2

MYSQL_TYPE_INT32

3

MYSQL_TYPE_FLOAT

4

MYSQL_TYPE_DOUBLE

5

MYSQL_TYPE_NULL

6

MYSQL_TYPE_TIMESTAMP

7

MYSQL_TYPE_INT64

8

MYSQL_TYPE_INT24

9

MYSQL_TYPE_DATE

10

MYSQL_TYPE_TIME

11

MYSQL_TYPE_DATETIME

12

MYSQL_TYPE_YEAR

13

MYSQL_TYPE_DATE_NEW

14

MYSQL_TYPE_VARCHAR

15

MYSQL_TYPE_BIT

16

MYSQL_TYPE_TIMESTAMP_NEW

17

MYSQL_TYPE_DATETIME_NEW

18

MYSQL_TYPE_TIME_NEW

19

MYSQL_TYPE_JSON

245

MYSQL_TYPE_DECIMAL_NEW

246

MYSQL_TYPE_ENUM

247

MYSQL_TYPE_SET

248

MYSQL_TYPE_TINY_BLOB

249

MYSQL_TYPE_MEDIUM_BLOB

250

MYSQL_TYPE_LONG_BLOB

251

MYSQL_TYPE_BLOB

252

MYSQL_TYPE_VAR_STRING

253

MYSQL_TYPE_STRING

254

MYSQL_TYPE_GEOMETRY

255

Oracleデータ型とdataTypeNumber値のマッピング

Oracleデータ型

dataTypeNumberの値

VARCHAR2/NVARCHAR2

1

番号 /フロート

2

LONG

8

日付

12

RAW

23

LONG_RAW

24

未定

29

XMLTYPE

58

ROWID

69

CHARとNCHAR

96

BINARY_FLOAT

100

BINARY_DOUBLE

101

CLOB/NCLOB

112

BLOB

113

BFILE

114

TIMESTAMP

180

TIMESTAMP_WITH_TIME_ZONE

181

INTERVAL_YEAR_TO_MONTH

182

INTERVAL_DAY_TO_SECOND

183

ウービッド

208

TIMESTAMP_WITH_LOCAL_TIME_ZONE

231

PostgreSQLデータ型とdataTypeNumber値のマッピング

PostgreSQLデータ型

dataTypeNumberの値

INT2/SMALLINT

21

INT4/INTEGER /シリアル

23

INT8/BIGINT

20

CHARACTER

18

CHARACTER VARYING

1043

REAL

700

DOUBLE PRECISION

701

NUMERIC

1700

お金

790

日付

1082

タイムゾーンなしの時間 /時間

1083

タイムゾーンでの時間

1266

タイムゾーンなしのタイムスタンプ /タイムスタンプ

1114

TIMESTAMP WITH TIME ZONE

1184

BYTEA

17

TEXT

25

JSON

114

JSONB

3082

XML

142

UUID

2950

ポイント

600

LSEG

601

PATH

602

ボックス

603

ポリゴン

604

ライン

628

CIDR

650

サークル

718

MACADDR

829

INET

869

INTERVAL

1186

TXID_SNAPSHOT

2970

PG_LSN

3220

TSVECTOR

3614

TSQUERY

3615