すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:flink-dts-connectorを使用した追跡データの消費

最終更新日:Nov 04, 2024

変更追跡タスクを設定した後、flink-dts-connectorファイルを使用して追跡データを消費できます。 このトピックでは、flink-dts-connectorファイルを使用して追跡データを消費する方法について説明します。

使用上の注意

  • Data Transmission Service (DTS) は、次のタイプのFlinkプログラムをサポートしています。DataStream APIおよびTable API & SQL。

  • Table APIとSQLプログラムを使用する場合、変更追跡タスクを設定するたびに1つのテーブルのデータのみを使用できます。 複数のテーブルのデータを使用する場合は、テーブルごとにタスクを設定する必要があります。

手順

この例では、IntelliJ IDEA Community Edition 2020.1 Windowsが使用されます。

  1. 変更追跡タスクを作成します。 詳細については、「変更追跡シナリオの概要」の関連トピックをご参照ください。

  2. 1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。

  3. flink-dts-connectorファイルをダウンロードして解凍します。

  4. IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。

    Open a project

  5. 表示されるダイアログボックスで、flink-dts-connectorファイルが解凍されているディレクトリに移動し、フォルダを展開してpom.xmlファイルを見つけます。

    Find the pom.xml file

  6. 表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。

  7. 次の依存関係をpom.xmlファイルに追加します。

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. IntelliJ IDEAで、フォルダを展開してJavaファイルを見つけます。 次に、使用するFlinkコネクタのタイプに基づいてJavaファイルをダブルクリックします。

    • DataStream APIコネクタを使用する場合は、flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.javaファイルをダブルクリックし、次の操作を実行する必要があります。

      1. IntelliJ IDEAの上部メニューバーで、実行アイコンをクリックします。 Run icon

      2. 表示されるダイアログボックスで、[DtsExample] > [Edit] を選択します。 edit

      3. [プログラム引数] フィールドにパラメーターと対応する値を入力し、[実行] をクリックしてflink-dts-connectorを実行します。

        説明

        パラメーターとパラメーター値を取得するためのメソッドの詳細については、このトピックの「パラメーター」をご参照ください。

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. 次の図は、Flinkプログラムがソースデータベースからのデータ変更を追跡できることを示しています。 Data changes (DataStream API)

        説明

        データ変更の特定のレコードを照会するには、FlinkプログラムのTask Managerページに移動します。

    • Table APIとSQLコネクタを使用する場合は、flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.javaファイルをダブルクリックし、次の操作を実行する必要があります。

      説明

      単一のDtsTableISelectTCaseTest.javaファイルを使用して、1つの変更追跡タスクのみを構成し、1つのテーブルのデータのみを使用できます。 複数のテーブルのデータを使用する場合は、テーブルごとにタスクを設定する必要があります。

      1. 次の図に示すように、スラッシュ (//) を2つ入力し、コメントを追加します。 Add comments

      2. データの変更を追跡するテーブルの情報を指定します。 SQL文がサポートされています。

      3. 変更追跡インスタンスに必要なパラメーターを設定します。 詳細については、このトピックの「パラメーター」セクションをご参照ください。 Parameters for Table API & SQL

      4. IntelliJ IDEAの上部メニューバーで、[Run'DtsTableISelectTCaseTest'] をクリックしてflink-dts-connectorを実行します。

      5. 次の図は、Flinkプログラムがソースデータベースからのデータ変更を追跡できることを示しています。 Data changes (Table API & SQL)

        説明

        データ変更の特定のレコードを照会するには、FlinkプログラムのTask Managerページに移動します。

パラメーター

DstExampleファイルのパラメーター

DtsTableISelectTCaseTestファイルのパラメーター

説明

パラメータ値を取得するメソッド

broker-url

dts.server

変更追跡インスタンスのエンドポイントとポート番号。

説明

内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、FlinkプログラムをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。

DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、インスタンスの トピックネットワーク を表示できます。

トピック

トピック

変更追跡インスタンスのトピックの名前。

sid

dts.sid

コンシューマーグループの ID です。

DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、データ消費 をクリックします。 インスタンスのコンシューマーグループ ID /名前とコンシューマーグループのアカウントを表示できます。

説明

消費者グループアカウントのパスワードは、消費者グループの作成時に指定します。

ユーザー

dts.us er

コンシューマーグループのユーザー名。

警告

このトピックで説明するflink-dts-connectorファイルを使用していない場合は、このパラメーターを <Username>-<Consumer group ID> の形式で指定する必要があります。 そうしないと、接続は失敗します。 例: dtstest-dtsae ****** bpv

パスワード

dts.password

コンシューマーグループのパスワード。

チェックポイント

dts.checkpoint

消費者オフセット。 flink-dts-connectorが最初のデータレコードを消費したときに生成されるタイムスタンプです。 この値は UNIX タイムスタンプです。 例: 1624440043。

説明

コンシューマオフセットは、次のシナリオで使用できます。

  • 消費プロセスが中断された後、消費者オフセットを指定してデータ消費を再開できます。 これにより、データの損失を防ぐことができます。

  • 変更追跡クライアントを起動するときに、ビジネス要件に基づいてデータを消費するための消費者オフセットを指定できます。

消費されたデータの消費者オフセットは、変更追跡インスタンスのデータ範囲内でなければなりません。 コンシューマオフセットは、UNIXタイムスタンプに変換する必要があります。

説明
  • 変更追跡タスクページの データ範囲 列に変更追跡インスタンスのデータ範囲を表示できます。

  • 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

非該当

dts-cdc.table.name

変更追跡用のオブジェクト。The object for change tracking. <データベース名>.<テーブル名> の形式で指定できるテーブルは1つだけです。 例: dtstestdata.order

DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 または タスク管理 ページの上部で、オブジェクトを表示 をクリックして、変更追跡用のオブジェクトが属するデータベースとテーブルを表示します。

よくある質問

エラーメッセージ

考えられる原因

解決策

Cluster changed from *** to ***, consumer require restart.

増分データを読み取るためにDTSが使用するDStoreモジュールが切り替えられます。 その結果、Flinkプログラムのコンシューマオフセットが失われる。

Flinkプログラムを再起動する必要はありません。 データ消費を再開するには、Flinkプログラムのコンシューマオフセットをクエリし、DtsExample.javaおよびDtsTableISelectTCaseTest.javaファイルにcheckpointまたはdts.checkpointパラメーターを再度設定するだけです。