変更追跡タスクを設定した後、flink-dts-connectorファイルを使用して追跡データを消費できます。 このトピックでは、flink-dts-connectorファイルを使用して追跡データを消費する方法について説明します。
使用上の注意
Data Transmission Service (DTS) は、次のタイプのFlinkプログラムをサポートしています。DataStream APIおよびTable API & SQL。
Table APIとSQLプログラムを使用する場合、変更追跡タスクを設定するたびに1つのテーブルのデータのみを使用できます。 複数のテーブルのデータを使用する場合は、テーブルごとにタスクを設定する必要があります。
手順
この例では、IntelliJ IDEA Community Edition 2020.1 Windowsが使用されます。
変更追跡タスクを作成します。 詳細については、「変更追跡シナリオの概要」の関連トピックをご参照ください。
1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。
flink-dts-connectorファイルをダウンロードして解凍します。
IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。
表示されるダイアログボックスで、flink-dts-connectorファイルが解凍されているディレクトリに移動し、フォルダを展開してpom.xmlファイルを見つけます。
表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。
次の依存関係をpom.xmlファイルに追加します。
<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency>
IntelliJ IDEAで、フォルダを展開してJavaファイルを見つけます。 次に、使用するFlinkコネクタのタイプに基づいてJavaファイルをダブルクリックします。
DataStream APIコネクタを使用する場合は、flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.javaファイルをダブルクリックし、次の操作を実行する必要があります。
IntelliJ IDEAの上部メニューバーで、実行アイコンをクリックします。
表示されるダイアログボックスで、
を選択します。[プログラム引数] フィールドにパラメーターと対応する値を入力し、[実行] をクリックしてflink-dts-connectorを実行します。
説明パラメーターとパラメーター値を取得するためのメソッドの詳細については、このトピックの「パラメーター」をご参照ください。
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
次の図は、Flinkプログラムがソースデータベースからのデータ変更を追跡できることを示しています。
説明データ変更の特定のレコードを照会するには、FlinkプログラムのTask Managerページに移動します。
Table APIとSQLコネクタを使用する場合は、flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.javaファイルをダブルクリックし、次の操作を実行する必要があります。
説明単一の
DtsTableISelectTCaseTest.java
ファイルを使用して、1つの変更追跡タスクのみを構成し、1つのテーブルのデータのみを使用できます。 複数のテーブルのデータを使用する場合は、テーブルごとにタスクを設定する必要があります。次の図に示すように、スラッシュ (
//
) を2つ入力し、コメントを追加します。データの変更を追跡するテーブルの情報を指定します。 SQL文がサポートされています。
変更追跡インスタンスに必要なパラメーターを設定します。 詳細については、このトピックの「パラメーター」セクションをご参照ください。
IntelliJ IDEAの上部メニューバーで、[Run'DtsTableISelectTCaseTest'] をクリックしてflink-dts-connectorを実行します。
次の図は、Flinkプログラムがソースデータベースからのデータ変更を追跡できることを示しています。
説明データ変更の特定のレコードを照会するには、FlinkプログラムのTask Managerページに移動します。
パラメーター
DstExampleファイルのパラメーター | DtsTableISelectTCaseTestファイルのパラメーター | 説明 | パラメータ値を取得するメソッド |
|
| 変更追跡インスタンスのエンドポイントとポート番号。 説明 内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、FlinkプログラムをデプロイするElastic Compute Service (ECS) インスタンスがクラシックネットワーク上にある場合、または変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) にある場合に適用されます。 | DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 ページで、インスタンスの トピック と ネットワーク を表示できます。 |
|
| 変更追跡インスタンスのトピックの名前。 | |
|
| コンシューマーグループの ID です。 | DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、データ消費 をクリックします。 インスタンスのコンシューマーグループ ID /名前とコンシューマーグループのアカウントを表示できます。 説明 消費者グループアカウントのパスワードは、消費者グループの作成時に指定します。 |
|
| コンシューマーグループのユーザー名。 警告 このトピックで説明するflink-dts-connectorファイルを使用していない場合は、このパラメーターを | |
|
| コンシューマーグループのパスワード。 | |
|
| 消費者オフセット。 flink-dts-connectorが最初のデータレコードを消費したときに生成されるタイムスタンプです。 この値は UNIX タイムスタンプです。 例: 1624440043。 説明 コンシューマオフセットは、次のシナリオで使用できます。
| 消費されたデータの消費者オフセットは、変更追跡インスタンスのデータ範囲内でなければなりません。 コンシューマオフセットは、UNIXタイムスタンプに変換する必要があります。 説明
|
非該当 |
| 変更追跡用のオブジェクト。The object for change tracking. | DTSコンソールで、管理する変更追跡インスタンスを見つけ、インスタンスIDをクリックします。 基本情報 または タスク管理 ページの上部で、オブジェクトを表示 をクリックして、変更追跡用のオブジェクトが属するデータベースとテーブルを表示します。 |
よくある質問
エラーメッセージ | 考えられる原因 | 解決策 |
| 増分データを読み取るためにDTSが使用するDStoreモジュールが切り替えられます。 その結果、Flinkプログラムのコンシューマオフセットが失われる。 | Flinkプログラムを再起動する必要はありません。 データ消費を再開するには、Flinkプログラムのコンシューマオフセットをクエリし、DtsExample.javaおよびDtsTableISelectTCaseTest.javaファイルに |