AnalyticDB for MySQLでは、Realtime Compute for Apache Flinkを使用してバイナリログをサブスクライブできます。 これにより、データベースのデータ変更を取得して処理し、効率的なデータ同期とストリームコンピューティングを実装できます。 このトピックでは、Realtime Compute for Apache Flinkを使用してAnalyticDB for MySQLバイナリログをサブスクライブする方法について説明します。
前提条件
AnalyticDB for MySQLのクラスター Data Lakehouse EditionまたはData Warehouse Edition in elastic modeが作成されます。
AnalyticDB for MySQLクラスターのマイナーバージョンは3 .2.1.0以降です。
説明AnalyticDB for MySQLのマイナーバージョンを照会するには Data Lakehouse Editionクラスターで、
SELECT adb_version();
文を実行します。 クラスターのマイナーバージョンを更新するには、テクニカルサポートにお問い合わせください。AnalyticDB For MySQL Data Warehouse Editionクラスターのマイナーバージョンを表示および更新する方法については、「クラスターのマイナーバージョンの更新」をご参照ください。
Realtime Compute for Apache FlinkはVerverica Runtime (VVR) 8.0.4以降を使用します。
フルマネージド型Flinkワークスペースは、AnalyticDB for MySQLクラスターと同じ仮想プライベートクラウド (VPC) に作成されます。 詳細については、「クラスターの作成」および「Realtime Compute For Apache Flinkの有効化」をご参照ください。
フルマネージドFlinkワークスペースのCIDRブロックがAnalyticDB for MySQLクラスターのIPアドレスホワイトリストに追加されます。 詳細については、「ネットワーク接続に関するFAQ」トピックの「ホワイトリストを設定するにはどうすればよいですか」のセクションを参照してください。およびIPアドレスホワイトリストの設定。
制限事項
バイナリログ機能は、AnalyticDB for MySQLのテーブルに対してのみ有効にできます。
Realtime Compute for Apache Flinkは、すべての基本データ型とJSON複合データ型のAnalyticDB for MySQLバイナリログを処理できます。 AnalyticDB For MySQLでサポートされているデータ型については、「基本データ型」をご参照ください。
Realtime Compute for Apache Flinkは、AnalyticDB for MySQLバイナリログのパーティションテーブルおよびDDL操作に対する自動パーティション削除操作のレコードを処理しません。
手順1: バイナリログ機能の有効化
テーブルのバイナリログ機能を有効にします。 この例では、source_tableという名前のテーブルが使用されます。
テーブルを作成するときにバイナリログ機能を有効にする
CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;
既存のテーブルのバイナリログ機能を有効にする
ALTER TABLE source_table BINLOG=true;
(オプション) バイナリログ情報を表示します。
説明次のSQL文を実行してバイナリログファイルに関する情報を照会する場合、バイナリログ機能が有効になっているが、Realtime Compute for Apache Flinkがバイナリログをサブスクライブしていない場合、ログ情報は表示されません。 ログ情報は、Realtime Compute for Apache Flinkがバイナリログをサブスクライブしている場合にのみ表示されます。
最新のバイナリログオフセットを照会するためのSQL文:
SHOW MASTER STATUS FOR source_table;
AnalyticDB for MySQLクラスター内のテーブルのすべてのバイナリログファイルに関する情報を照会するSQL文:
SHOW BINARY LOGS FOR source_table;
(オプション) バイナリログの保持期間を変更します。
binlog_ttl
パラメーターを変更して、バイナリログの保存期間を変更できます。 次の文を実行して、source_tableテーブルのバイナリログの保持期間を1日に変更します。ALTER TABLE source_table binlog_ttl='1d';
binlog_ttl
パラメーターは、次の形式の値をサポートします。Pure number: ミリ秒単位の期間。 例えば、60は60ミリ秒を指定する。
Number + s: 秒単位の時間。 例えば、30sは30秒を指定する。
Number + h: 時間単位の期間。 例えば、2hは2時間を指定する。
番号 + d: 日数単位の期間。 例えば、1dは1日を指定する。
手順2: Realtime Compute for Apache Flinkのコネクタの設定
[完全管理Flink] タブで、管理するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 [コネクタ] をクリックします。
[コネクタ] ページで、 [カスタムコネクタの作成] をクリックします。
カスタムコネクタJARパッケージをアップロードします。 ダウンロードリンク: AnalyticDB for MySQLコネクタ
アップロードが完了したら、 [次へ] をクリックします。
[完了] をクリックします。 作成されたカスタムコネクタがコネクタリストに表示されます。
ステップ3: バイナリログの購読
Realtime Compute for Apache Flinkコンソールにログインし、SQLドラフトを作成します。 詳細については、「Flink SQLデプロイメント」トピックの「ステップ1: SQLドラフトの作成」セクションを参照してください。
AnalyticDB for MySQLクラスターに接続し、source_tableテーブルからバイナリログデータを読み取るために使用されるソーステーブルを作成します。
説明Realtime Compute for Apache FlinkのDDLステートメントで指定されたプライマリキーは、プライマリキーの名前を含め、AnalyticDB for MySQLクラスターのテーブルで指定されたプライマリキーと同じである必要があります。 そうしないと、データの精度に影響します。
フィールドのデータ型は、Realtime Compute for Apache FlinkとAnalyticDB for MySQLの間で互換性がある必要があります。 データ型マッピングの詳細については、このトピックの「データ型マッピング」を参照してください。
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );
WITH句のパラメーターを次の表に示します。
パラメーター
必須
デフォルト値
データ型
説明
コネクター
はい
なし
STRING
カスタムコネクタの名前。
このパラメーターを
adb-mysql-cd
に設定します。hostname
はい
なし
STRING
AnalyticDB for MySQLクラスターへの接続に使用されるVPCエンドポイント。
username
はい
なし
STRING
AnalyticDB for MySQLクラスターのデータベースアカウントの名前。
password
はい
なし
STRING
AnalyticDB for MySQLクラスターのデータベースアカウントのパスワード。
データベース名
はい
なし
STRING
AnalyticDB for MySQLクラスター内のデータベースの名前。
バイナリログ機能はAnalyticDB for MySQLのテーブルにのみ実装されているため、データベース名を1つだけ指定します。
テーブル名
はい
なし
STRING
AnalyticDB for MySQLクラスターのデータベースにあるテーブルの名前。
バイナリログ機能はAnalyticDB for MySQLのテーブルにのみ実装されているため、テーブル名を1つだけ指定します。
port
いいえ
3306
INTEGER
クラスターへの接続に使用されるポート番号。
scan.incremental.snapshot.enabled
いいえ
true
BOOLEAN
増分スナップショット機能を有効にするかどうかを指定します。
デフォルトでは、この機能は有効になっています。 増分スナップショット機能は、テーブルスナップショットを読み取る新しい方法を提供します。 元のスナップショット方法と比較して、増分スナップショット機能には次の利点があります。
同時スナップショット読み取りをサポートします。
スナップショットの読み取り時にチャンクの粒度でチェックポイントをサポートします。
スナップショットを読み取る前にデータベースロック権限は必要ありません。
scan.incremental.snapshot.chunk.size
いいえ
8096
INTEGER
テーブルスナップショットのチャンクサイズ。チャンク内の行数です。
テーブルの増分スナップショット機能を有効にすると、テーブルは読み取り用に複数のチャンクに分割されます。
scan.snapshot.fetch.size
いいえ
1024
INTEGER
テーブルスナップショットを読み取るたびに読み取ることができる行の最大数。
scan.startup.mode
いいえ
初期化 (initial)
STRING
データが消費されるモード。
有効な値:
initial (デフォルト): 完全な履歴データをスキャンし、コネクタの最初の起動時に最新のバイナリログデータを読み取ります。
earliest-offset: 完全な履歴データをスキャンせず、アクセス可能なバイナリログデータの読み取りを開始します。
specific-offset: 履歴データ全体をスキャンせず、指定されたオフセットからバイナリログデータの読み取りを開始します。
scan.startup.specific-offset.file
パラメーターを設定してデータ消費のバイナリログファイルの名前を指定し、scan.startup.specific-offset.pos
パラメーターを設定してデータ消費の開始元のオフセットを指定できます。
scan.startup.specific-offset.file
✕
なし
STRING
scan.startup.mo deパラメーターをspecific-offsetに設定した場合のデータ消費用のバイナリログファイルの名前。
最新のバイナリログファイル名を取得するには、
SHOW MASTER STATUES for table_name
ステートメントを実行します。scan.startup.specific-offset.pos
✕
なし
LONG
scan.startup.mo deパラメーターをspecific-offsetに設定した場合のデータ消費開始時のオフセット。
最新のバイナリログオフセットを取得するには、
SHOW MASTER STATUES for table_name
ステートメントを実行します。scan.startup.specific-offset.skip-events
✕
なし
LONG
データ消費が開始される指定されたオフセットの後にスキップするイベントの数。
scan.startup.specific-offset.skip-rows
✕
なし
LONG
指定されたオフセットの後にスキップしてデータ消費を開始する行数。
server-time-zone
✕
なし
STRING
AnalyticDB for MySQLクラスターで使用されるセッションのタイムゾーン。
例: "アジア /上海" 。 このパラメーターは、データの読み取り時にAnalyticDB for MySQLのTIMESTAMPタイプのデータをSTRINGタイプに変換する方法を決定します。 このパラメーターを指定しない場合、
ZONELD.SYSTEMDEFAULT()
関数を使用してAnalyticDB for MySQLクラスターのタイムゾーンが決定されます。debezium.min.row.count.to.stream.result
いいえ
1000
INTEGER
ストリーム処理の行数のしきい値。 テーブルの行数が指定された値より大きい場合、コネクタは結果に対してストリーム処理を実行します。
このパラメーターを
0
に設定すると、テーブルサイズのチェックがスキップされ、スナップショットの読み取り中にすべての結果に対してストリーム処理が実行されます。connect.timeout
いいえ
30 秒
期間
AnalyticDB for MySQLクラスターへの接続の最大タイムアウト期間。
デフォルトの単位: 秒。
connect.max-retries
いいえ
3
INTEGER
AnalyticDB for MySQLクラスターへの接続が失敗した後の最大再試行回数。
処理されたデータを格納する宛先テーブルを作成します。 この例では、AnalyticDB for MySQLテーブルが作成されます。 Realtime Compute For Apache Flinkでサポートされているコネクタの詳細については、「サポートされているコネクタ」トピックの「サポートされているコネクタ」を参照してください。
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )
処理されたデータをAnalyticDB for MySQLのtarget_tableテーブルに書き込むために、手順3で作成したテーブルに接続する結果テーブルを作成します。
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );
結果テーブルのWITH句のパラメーターと関連するデータ型のマッピングについては、「AnalyticDB For MySQL V3.0 connector」をご参照ください。
取得したソースデータの変更を結果テーブルに同期し、結果テーブルのデータを宛先に同期します。
INSERT INTO adb_sink SELECT * FROM adb_source;
[保存] をクリックします。
[検証] をクリックします。
ドラフトのSQLセマンティクス、ネットワーク接続、およびドラフトで使用されるテーブルのメタデータ情報を確認します。 計算結果で [SQLアドバイス] をクリックして、SQLリスクと関連する最適化の提案に関する情報を表示することもできます。
(オプション) [デバッグ] をクリックします。
デバッグ機能を有効にして、展開の実行をシミュレートし、出力を確認し、SELECTおよびINSERTステートメントのビジネスロジックを検証できます。 この機能により、開発効率が向上し、データ品質が低下するリスクが軽減されます。 詳細については、「デプロイメントのデバッグ」をご参照ください。
[デプロイ] をクリックします。 詳細については、「SQLジョブのデプロイ」をご参照ください。
ドラフト開発と構文チェックが完了したら、ドラフトをデプロイしてデータを本番環境に公開できます。 ドラフトがデプロイされた後、[Deployments] ページでドラフトのデプロイを開始してデプロイを実行できます。 詳細については、「デプロイの開始」をご参照ください。
データ型マッピング
次の表に、AnalyticDB for MySQLとRealtime Compute for Apache Flinkの間のデータ型マッピングを示します。
AnalyticDB for MySQLでサポートされているデータタイプ | Realtime Compute for Apacheでサポートされているデータ型Flink |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p、s) またはNUMERIC(p、s) | デシマル (p,s) |
VARCHAR | STRING |
BINARY | BYTES |
日付 | 日付 |
時間 | 時間 |
日付時刻 | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
ポイント | STRING |
JSON | STRING |