このトピックでは、Kafkaクライアントのデモを使用して追跡データを使用する方法について説明します。 新しいバージョンの変更追跡機能を使用すると、V0.11からV2.7のKafkaクライアントを使用して追跡データを消費できます。
使用上の注意
変更追跡機能を使用するときに自動コミットを有効にすると、一部のデータが消費される前にコミットされることがあります。 これにより、データが失われます。 手動でデータをコミットすることを推奨します。
説明データのコミットに失敗した場合は、クライアントを再起動して、最後に記録された消費チェックポイントからデータを消費し続けることができます。 しかしながら、この期間中に重複データが生成されることがある。 重複データを手動で除外する必要があります。
データはシリアル化され、Avro形式で保存されます。 詳細については、「Record.avsc」をご参照ください。
警告このトピックで説明しているKafkaクライアントを使用していない場合は、Avroスキーマに基づいて追跡されたデータを解析し、解析されたデータを確認する必要があります。
検索単位は、データ送信サービス (DTS) が
offsetForTimes
操作を呼び出すときに2番目になります。 ネイティブKafkaクライアントがこの操作を呼び出す場合、検索単位はミリ秒です。障害復旧などのいくつかの理由により、Kafkaクライアントと変更追跡サーバーの間で一時的な接続が発生する場合があります。 このトピックで説明しているKafkaクライアントを使用していない場合は、Kafkaクライアントにネットワーク再接続機能が必要です。
ネイティブKafkaクライアントを使用して追跡データを使用する場合、DTSで増分データ収集モジュールが変更される可能性があります。 サブスクライブモードでは、KafkaクライアントがDTSサーバーに保存する消費チェックポイントが削除されます。 ビジネス要件に基づいて追跡データを消費するには、消費チェックポイントを指定する必要があります。 サブスクライブモードでデータを消費する場合は、DTSが提供するSDKデモを使用してデータを追跡および消費するか、消費チェックポイントを手動で管理することをお勧めします。 詳細については、このトピックの「SDKデモを使用して追跡データを消費する」および「消費チェックポイントの管理」セクションをご参照ください。
Kafkaクライアントを実行する
Kafka clientデモをダウンロードします。 デモの使用方法の詳細については、「Readme」をご参照ください。
をクリックし、[ZIPのダウンロード] を選択してパッケージをダウンロードします。
バージョン2.0のKafkaクライアントを使用する場合は、subscribe_example-master/javaimpl/pom.xmlファイルのバージョン番号を2.0.0に変更する必要があります。
表1プロセスの説明
ステップ | 関连ディレクトリまたはファイル |
1. ネイティブKafkaコンシューマーを使用して、変更追跡インスタンスから増分データを取得します。 | subscribe_example-master/javaimpl/src/main/java/recordgenerator / |
2. 増分データのイメージを逆シリアル化し、 事前イメージ 、 ポストイメージ 、およびその他の属性。 警告
| subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
3. 逆シリアル化されたデータのdataTypeNumber値を、対応するデータベースのデータ型に変換します。 説明 詳細については、このトピックの次のセクションを参照してください。 | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql / |
手順
次の手順は、Kafkaクライアントを実行して追跡データを消費する方法を示しています。 この例では、IntelliJ IDEA Community Edition 2018.1.4 for Windowsが使用されています。
変更追跡インスタンスを作成します。 詳細については、「変更追跡シナリオの概要」をご参照ください。
1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。
のパッケージをダウンロードします。Kafkaクライアントのデモパッケージを解凍します。
説明をクリックし、[ZIPのダウンロード] を選択してパッケージをダウンロードします。
IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] をクリックします。
表示されるダイアログボックスで、ダウンロードしたデモが存在するディレクトリに移動します。 pom.xmlファイルを見つけます。
表示されるダイアログボックスで、プロジェクトとして開くを選択します。
IntelliJ IDEAのプロジェクトツールウィンドウで、フォルダーをクリックしてKafkaクライアントのデモファイルを検索し、ファイルをダブルクリックします。 ファイル名はNotifyDemoDB.javaです。
パラメーターを指定します。NotifyDemoDB.javaファイルを作成します。
パラメーター
説明
パラメータ値を取得するメソッド
USER_NAME
消費者グループアカウントのユーザー名。
警告このトピックで説明するKafkaクライアントを使用していない場合は、このパラメーターを
<Username>-<Consumer group ID>
の形式で指定する必要があります。 例:dtstest-dtsae ****** bpv
そうしないと、接続は失敗します。DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、データ消費 をクリックします。 表示されるページで、コンシューマグループのIDや名前、アカウントなど、コンシューマグループに関する情報を表示できます。
説明消費者グループアカウントのパスワードは、消費者グループの作成時に指定します。
パスワード名
アカウントのパスワードを入力します。
SID_NAME
コンシューマーグループの ID です。
グループ名
消費者グループの名前です。 このパラメーターをコンシューマーグループIDに設定します。
KAFKA_TOPIC
変更追跡インスタンスの追跡対象トピックの名前。
DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、トピックとネットワークに関する情報を表示できます。
KAFKA_BROKER_URL_NAME
変更追跡インスタンスのエンドポイント。
説明内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、KafkaクライアントをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。
INITIAL_CHECKPOINT_NAME
消費されたデータの消費チェックポイント。 この値は UNIX タイムスタンプです。 例: 1592269238。
説明次の理由で、消費チェックポイントを保存する必要があります。
消費プロセスが中断された場合、Kafkaクライアントで消費チェックポイントを指定してデータ消費を再開できます。 これにより、データの損失を防ぎます。
Kafkaクライアントを起動すると、ビジネス要件に基づいてデータを消費する消費チェックポイントを指定できます。
SUBSCRIBE_MODE_NAMEパラメーターがsubscribeに設定されている場合、指定したINITIAL_CHECKPOINT_NAMEパラメーターは、Kafkaクライアントを初めて起動したときにのみ有効になります。
消費されるデータの消費チェックポイントは、変更追跡インスタンスのデータ範囲内でなければなりません。 消費チェックポイントは、UNIXタイムスタンプに変換する必要があります。
説明変更追跡タスクページの データ範囲 列に変更追跡インスタンスのデータ範囲を表示できます。
検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。
USE_CONFIG_CHECKPOINT_NAME
指定された消費チェックポイントからのデータをクライアントに消費させるかどうかを指定します。 デフォルト値: true。 このパラメーターをtrueに設定すると、受信したが処理されていないデータが失われないようにすることができます。
なし
SUBSCRIBE_MODE_NAME
コンシューマグループに対して2つ以上のKafkaクライアントを実行するかどうかを指定します。 この機能を使用する場合は、このパラメーターをこれらのKafkaクライアントのサブスクライブに設定します。
デフォルト値はassignで、この機能が使用されていないことを示します。 コンシューマグループに1つのKafkaクライアントのみをデプロイすることを推奨します。
なし
IntelliJ IDEAの上部メニューバーで、 クライアントを実行します。
説明IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするには、特定の期間が必要です。
Kafkaクライアントの結果
次の図は、Kafkaクライアントがソースデータベースからのデータ変更を追跡できることを示しています。
NotifyDemoDB.javaファイルの25行目の // log.info(ret)
文字列から2重スラッシュ (//
) を削除できます。 次に、クライアントを再度実行して、データ変更情報を表示します。
よくある質問
Q: Kafkaクライアントの消費チェックポイントを記録する必要があるのはなぜですか?
A: DTSによって記録された消費チェックポイントは、DTSがKafkaクライアントからコミット操作を受信した時点です。 記録された消費チェックポイントは、実際の消費時間とは異なり得る。 ビジネスアプリケーションまたはKafkaクライアントが予期せず中断された場合は、正確な消費チェックポイントを指定してデータの消費を継続できます。 これにより、データの損失や重複データの消費を防ぎます。
消費チェックポイントの管理
DTSでデータ収集モジュールの切り替えをリッスンするようにKafkaクライアントを設定します。
DTSでデータ収集モジュールの切り替えをリッスンするように、Kafkaクライアントのコンシューマープロパティを設定できます。 次のコードは、コンシューマープロパティを設定する方法の例を示しています。
properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());
次のコードは、ClusterSwitchListenerの実装方法の例を示しています。
public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor { private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class); private ClusterResource originClusterResource = null; private ClusterResource currentClusterResource = null; public ConsumerRecords onConsume(ConsumerRecords records) { return records; } public void close() { } public void onCommit(Map offsets) { } public void onUpdate(ClusterResource clusterResource) { synchronized (this) { originClusterResource = currentClusterResource; currentClusterResource = clusterResource; if (null == originClusterResource) { LOG.info("Cluster updated to " + currentClusterResource.clusterId()); } else { if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { LOG.info("Cluster not changed on update:" + clusterResource.clusterId()); } else { LOG.error("Cluster changed"); throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId() + ", consumer require restart"); } } } } public boolean isClusterResourceChanged() { if (null == originClusterResource) { return false; } if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { return false; } return true; } public void configure(Map<String, ?> configs) { } public static class ClusterSwitchException extends KafkaException { public ClusterSwitchException(String message, Throwable cause) { super(message, cause); } public ClusterSwitchException(String message) { super(message); } public ClusterSwitchException(Throwable cause) { super(cause); } public ClusterSwitchException() { super(); } }
DTSのデータ収集モジュールのキャプチャされた切り替えに基づいて、消費チェックポイントを指定します。
次のデータ追跡の開始消費チェックポイントを、クライアントが消費した最新の追跡データエントリのタイムスタンプに設定します。 次のコードは、消費チェックポイントを指定する方法の例を示しています。
try{ //do some action } catch (ClusterSwitchListener.ClusterSwitchException e) { reset(); } // Reset the consumption checkpoint. public reset() { long offset = kafkaConsumer.offsetsForTimes(timestamp); kafkaConsumer.seek(tp,offset); }
説明例の詳細については、「KafkaRecordFetcher」をご参照ください。
MySQLデータ型とdataTypeNumber値のマッピング
MySQLデータ型 | dataTypeNumberの値 |
MYSQL_TYPE_DECIMAL | 0 |
MYSQL_TYPE_INT8 | 1 |
MYSQL_TYPE_INT16 | 2 |
MYSQL_TYPE_INT32 | 3 |
MYSQL_TYPE_FLOAT | 4 |
MYSQL_TYPE_DOUBLE | 5 |
MYSQL_TYPE_NULL | 6 |
MYSQL_TYPE_TIMESTAMP | 7 |
MYSQL_TYPE_INT64 | 8 |
MYSQL_TYPE_INT24 | 9 |
MYSQL_TYPE_DATE | 10 |
MYSQL_TYPE_TIME | 11 |
MYSQL_TYPE_DATETIME | 12 |
MYSQL_TYPE_YEAR | 13 |
MYSQL_TYPE_DATE_NEW | 14 |
MYSQL_TYPE_VARCHAR | 15 |
MYSQL_TYPE_BIT | 16 |
MYSQL_TYPE_TIMESTAMP_NEW | 17 |
MYSQL_TYPE_DATETIME_NEW | 18 |
MYSQL_TYPE_TIME_NEW | 19 |
MYSQL_TYPE_JSON | 245 |
MYSQL_TYPE_DECIMAL_NEW | 246 |
MYSQL_TYPE_ENUM | 247 |
MYSQL_TYPE_SET | 248 |
MYSQL_TYPE_TINY_BLOB | 249 |
MYSQL_TYPE_MEDIUM_BLOB | 250 |
MYSQL_TYPE_LONG_BLOB | 251 |
MYSQL_TYPE_BLOB | 252 |
MYSQL_TYPE_VAR_STRING | 253 |
MYSQL_TYPE_STRING | 254 |
MYSQL_TYPE_GEOMETRY | 255 |
Oracleデータ型とdataTypeNumber値のマッピング
Oracleデータ型 | dataTypeNumberの値 |
VARCHAR2/NVARCHAR2 | 1 |
番号 /フロート | 2 |
LONG | 8 |
日付 | 12 |
RAW | 23 |
LONG_RAW | 24 |
未定 | 29 |
XMLTYPE | 58 |
ROWID | 69 |
CHARとNCHAR | 96 |
BINARY_FLOAT | 100 |
BINARY_DOUBLE | 101 |
CLOB/NCLOB | 112 |
BLOB | 113 |
BFILE | 114 |
TIMESTAMP | 180 |
TIMESTAMP_WITH_TIME_ZONE | 181 |
INTERVAL_YEAR_TO_MONTH | 182 |
INTERVAL_DAY_TO_SECOND | 183 |
ウービッド | 208 |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
PostgreSQLデータ型とdataTypeNumber値のマッピング
PostgreSQLデータ型 | dataTypeNumberの値 |
INT2/SMALLINT | 21 |
INT4/INTEGER /シリアル | 23 |
INT8/BIGINT | 20 |
CHARACTER | 18 |
CHARACTER VARYING | 1043 |
REAL | 700 |
DOUBLE PRECISION | 701 |
NUMERIC | 1700 |
お金 | 790 |
日付 | 1082 |
タイムゾーンなしの時間 /時間 | 1083 |
タイムゾーンでの時間 | 1266 |
タイムゾーンなしのタイムスタンプ /タイムスタンプ | 1114 |
TIMESTAMP WITH TIME ZONE | 1184 |
BYTEA | 17 |
TEXT | 25 |
JSON | 114 |
JSONB | 3082 |
XML | 142 |
UUID | 2950 |
ポイント | 600 |
LSEG | 601 |
PATH | 602 |
ボックス | 603 |
ポリゴン | 604 |
ライン | 628 |
CIDR | 650 |
サークル | 718 |
MACADDR | 829 |
INET | 869 |
INTERVAL | 1186 |
TXID_SNAPSHOT | 2970 |
PG_LSN | 3220 |
TSVECTOR | 3614 |
TSQUERY | 3615 |