すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Realtime Compute for Apache Flinkを使用してAnalyticDB for MySQLバイナリログをサブスクライブする

最終更新日:Sep 30, 2024

AnalyticDB for MySQLでは、Realtime Compute for Apache Flinkを使用してバイナリログをサブスクライブできます。 これにより、データベースのデータ変更を取得して処理し、効率的なデータ同期とストリームコンピューティングを実装できます。 このトピックでは、Realtime Compute for Apache Flinkを使用してAnalyticDB for MySQLバイナリログをサブスクライブする方法について説明します

前提条件

  • AnalyticDB for MySQLのクラスター Data Lakehouse EditionまたはData Warehouse Edition in elastic modeが作成されます。

  • AnalyticDB for MySQLクラスターのマイナーバージョンは3 .2.1.0以降です。

    説明
    • AnalyticDB for MySQLのマイナーバージョンを照会するには Data Lakehouse Editionクラスターで、SELECT adb_version(); 文を実行します。 クラスターのマイナーバージョンを更新するには、テクニカルサポートにお問い合わせください。

    • AnalyticDB For MySQL Data Warehouse Editionクラスターのマイナーバージョンを表示および更新する方法については、「クラスターのマイナーバージョンの更新」をご参照ください。

  • Realtime Compute for Apache FlinkはVerverica Runtime (VVR) 8.0.4以降を使用します。

  • フルマネージド型Flinkワークスペースは、AnalyticDB for MySQLクラスターと同じ仮想プライベートクラウド (VPC) に作成されます。 詳細については、「クラスターの作成」および「Realtime Compute For Apache Flinkの有効化」をご参照ください。

  • フルマネージドFlinkワークスペースのCIDRブロックがAnalyticDB for MySQLクラスターのIPアドレスホワイトリストに追加されます。 詳細については、「ネットワーク接続に関するFAQ」トピックの「ホワイトリストを設定するにはどうすればよいですか」のセクションを参照してください。およびIPアドレスホワイトリストの設定

制限事項

  • バイナリログ機能は、AnalyticDB for MySQLのテーブルに対してのみ有効にできます。

  • Realtime Compute for Apache Flinkは、すべての基本データ型とJSON複合データ型のAnalyticDB for MySQLバイナリログを処理できます。 AnalyticDB For MySQLでサポートされているデータ型については、「基本データ型」をご参照ください。

  • Realtime Compute for Apache Flinkは、AnalyticDB for MySQLバイナリログのパーティションテーブルおよびDDL操作に対する自動パーティション削除操作のレコードを処理しません。

手順1: バイナリログ機能の有効化

  1. テーブルのバイナリログ機能を有効にします。 この例では、source_tableという名前のテーブルが使用されます。

    テーブルを作成するときにバイナリログ機能を有効にする

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )DISTRIBUTED BY HASH (id) BINLOG=true;

    既存のテーブルのバイナリログ機能を有効にする

    ALTER TABLE source_table BINLOG=true;
  2. (オプション) バイナリログ情報を表示します。

    説明

    次のSQL文を実行してバイナリログファイルに関する情報を照会する場合、バイナリログ機能が有効になっているが、Realtime Compute for Apache Flinkがバイナリログをサブスクライブしていない場合、ログ情報は表示されません。 ログ情報は、Realtime Compute for Apache Flinkがバイナリログをサブスクライブしている場合にのみ表示されます。

    • 最新のバイナリログオフセットを照会するためのSQL文:

      SHOW MASTER STATUS FOR source_table;
    • AnalyticDB for MySQLクラスター内のテーブルのすべてのバイナリログファイルに関する情報を照会するSQL文:

      SHOW BINARY LOGS FOR source_table;
  3. (オプション) バイナリログの保持期間を変更します。

    binlog_ttlパラメーターを変更して、バイナリログの保存期間を変更できます。 次の文を実行して、source_tableテーブルのバイナリログの保持期間を1日に変更します。

    ALTER TABLE source_table binlog_ttl='1d';

    binlog_ttlパラメーターは、次の形式の値をサポートします。

    • Pure number: ミリ秒単位の期間。 例えば、60は60ミリ秒を指定する。

    • Number + s: 秒単位の時間。 例えば、30sは30秒を指定する。

    • Number + h: 時間単位の期間。 例えば、2hは2時間を指定する。

    • 番号 + d: 日数単位の期間。 例えば、1dは1日を指定する。

手順2: Realtime Compute for Apache Flinkのコネクタの設定

  1. Realtime Compute for Apache Flinkコンソールにログインします。

  2. [完全管理Flink] タブで、管理するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで、 [コネクタ] をクリックします。

  4. [コネクタ] ページで、 [カスタムコネクタの作成] をクリックします。

  5. カスタムコネクタJARパッケージをアップロードします。 ダウンロードリンク: AnalyticDB for MySQLコネクタ

  6. アップロードが完了したら、 [次へ] をクリックします。

  7. [完了] をクリックします。 作成されたカスタムコネクタがコネクタリストに表示されます。

ステップ3: バイナリログの購読

  1. Realtime Compute for Apache Flinkコンソールにログインし、SQLドラフトを作成します。 詳細については、「Flink SQLデプロイメント」トピックの「ステップ1: SQLドラフトの作成」セクションを参照してください。

  2. AnalyticDB for MySQLクラスターに接続し、source_tableテーブルからバイナリログデータを読み取るために使用されるソーステーブルを作成します。

    説明
    • Realtime Compute for Apache FlinkのDDLステートメントで指定されたプライマリキーは、プライマリキーの名前を含め、AnalyticDB for MySQLクラスターのテーブルで指定されたプライマリキーと同じである必要があります。 そうしないと、データの精度に影響します。

    • フィールドのデータ型は、Realtime Compute for Apache FlinkAnalyticDB for MySQLの間で互換性がある必要があります。 データ型マッピングの詳細については、このトピックの「データ型マッピング」を参照してください。

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    WITH句のパラメーターを次の表に示します。

    パラメーター

    必須

    デフォルト値

    データ型

    説明

    コネクター

    はい

    なし

    STRING

    カスタムコネクタの名前。

    このパラメーターをadb-mysql-cdに設定します。

    hostname

    はい

    なし

    STRING

    AnalyticDB for MySQLクラスターへの接続に使用されるVPCエンドポイント。

    username

    はい

    なし

    STRING

    AnalyticDB for MySQLクラスターのデータベースアカウントの名前。

    password

    はい

    なし

    STRING

    AnalyticDB for MySQLクラスターのデータベースアカウントのパスワード。

    データベース名

    はい

    なし

    STRING

    AnalyticDB for MySQLクラスター内のデータベースの名前。

    バイナリログ機能はAnalyticDB for MySQLのテーブルにのみ実装されているため、データベース名を1つだけ指定します。

    テーブル名

    はい

    なし

    STRING

    AnalyticDB for MySQLクラスターのデータベースにあるテーブルの名前。

    バイナリログ機能はAnalyticDB for MySQLのテーブルにのみ実装されているため、テーブル名を1つだけ指定します。

    port

    いいえ

    3306

    INTEGER

    クラスターへの接続に使用されるポート番号。

    scan.incremental.snapshot.enabled

    いいえ

    true

    BOOLEAN

    増分スナップショット機能を有効にするかどうかを指定します。

    デフォルトでは、この機能は有効になっています。 増分スナップショット機能は、テーブルスナップショットを読み取る新しい方法を提供します。 元のスナップショット方法と比較して、増分スナップショット機能には次の利点があります。

    • 同時スナップショット読み取りをサポートします。

    • スナップショットの読み取り時にチャンクの粒度でチェックポイントをサポートします。

    • スナップショットを読み取る前にデータベースロック権限は必要ありません。

    scan.incremental.snapshot.chunk.size

    いいえ

    8096

    INTEGER

    テーブルスナップショットのチャンクサイズ。チャンク内の行数です。

    テーブルの増分スナップショット機能を有効にすると、テーブルは読み取り用に複数のチャンクに分割されます。

    scan.snapshot.fetch.size

    いいえ

    1024

    INTEGER

    テーブルスナップショットを読み取るたびに読み取ることができる行の最大数。

    scan.startup.mode

    いいえ

    初期化 (initial)

    STRING

    データが消費されるモード。

    有効な値:

    • initial (デフォルト): 完全な履歴データをスキャンし、コネクタの最初の起動時に最新のバイナリログデータを読み取ります。

    • earliest-offset: 完全な履歴データをスキャンせず、アクセス可能なバイナリログデータの読み取りを開始します。

    • specific-offset: 履歴データ全体をスキャンせず、指定されたオフセットからバイナリログデータの読み取りを開始します。 scan.startup.specific-offset.fileパラメーターを設定してデータ消費のバイナリログファイルの名前を指定し、scan.startup.specific-offset.posパラメーターを設定してデータ消費の開始元のオフセットを指定できます。

    scan.startup.specific-offset.file

    なし

    STRING

    scan.startup.mo deパラメーターをspecific-offsetに設定した場合のデータ消費用のバイナリログファイルの名前。

    最新のバイナリログファイル名を取得するには、SHOW MASTER STATUES for table_nameステートメントを実行します。

    scan.startup.specific-offset.pos

    なし

    LONG

    scan.startup.mo deパラメーターをspecific-offsetに設定した場合のデータ消費開始時のオフセット。

    最新のバイナリログオフセットを取得するには、SHOW MASTER STATUES for table_nameステートメントを実行します。

    scan.startup.specific-offset.skip-events

    なし

    LONG

    データ消費が開始される指定されたオフセットの後にスキップするイベントの数。

    scan.startup.specific-offset.skip-rows

    なし

    LONG

    指定されたオフセットの後にスキップしてデータ消費を開始する行数。

    server-time-zone

    なし

    STRING

    AnalyticDB for MySQLクラスターで使用されるセッションのタイムゾーン。

    例: "アジア /上海" 。 このパラメーターは、データの読み取り時にAnalyticDB for MySQLのTIMESTAMPタイプのデータをSTRINGタイプに変換する方法を決定します。 このパラメーターを指定しない場合、ZONELD.SYSTEMDEFAULT() 関数を使用してAnalyticDB for MySQLクラスターのタイムゾーンが決定されます。

    debezium.min.row.count.to.stream.result

    いいえ

    1000

    INTEGER

    ストリーム処理の行数のしきい値。 テーブルの行数が指定された値より大きい場合、コネクタは結果に対してストリーム処理を実行します。

    このパラメーターを0に設定すると、テーブルサイズのチェックがスキップされ、スナップショットの読み取り中にすべての結果に対してストリーム処理が実行されます。

    connect.timeout

    いいえ

    30 秒

    期間

    AnalyticDB for MySQLクラスターへの接続の最大タイムアウト期間。

    デフォルトの単位: 秒。

    connect.max-retries

    いいえ

    3

    INTEGER

    AnalyticDB for MySQLクラスターへの接続が失敗した後の最大再試行回数。

  3. 処理されたデータを格納する宛先テーブルを作成します。 この例では、AnalyticDB for MySQLテーブルが作成されます。 Realtime Compute For Apache Flinkでサポートされているコネクタの詳細については、「サポートされているコネクタ」トピックの「サポートされているコネクタ」を参照してください。

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )
  4. 処理されたデータをAnalyticDB for MySQLのtarget_tableテーブルに書き込むために、手順3で作成したテーブルに接続する結果テーブルを作成します。

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    結果テーブルのWITH句のパラメーターと関連するデータ型のマッピングについては、「AnalyticDB For MySQL V3.0 connector」をご参照ください。

  5. 取得したソースデータの変更を結果テーブルに同期し、結果テーブルのデータを宛先に同期します。

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
  6. [保存] をクリックします。

  7. [検証] をクリックします。

    ドラフトのSQLセマンティクス、ネットワーク接続、およびドラフトで使用されるテーブルのメタデータ情報を確認します。 計算結果で [SQLアドバイス] をクリックして、SQLリスクと関連する最適化の提案に関する情報を表示することもできます。

  8. (オプション) [デバッグ] をクリックします。

    デバッグ機能を有効にして、展開の実行をシミュレートし、出力を確認し、SELECTおよびINSERTステートメントのビジネスロジックを検証できます。 この機能により、開発効率が向上し、データ品質が低下するリスクが軽減されます。 詳細については、「デプロイメントのデバッグ」をご参照ください。

  9. [デプロイ] をクリックします。 詳細については、「SQLジョブのデプロイ」をご参照ください。

    ドラフト開発と構文チェックが完了したら、ドラフトをデプロイしてデータを本番環境に公開できます。 ドラフトがデプロイされた後、[Deployments] ページでドラフトのデプロイを開始してデプロイを実行できます。 詳細については、「デプロイの開始」をご参照ください。

データ型マッピング

次の表に、AnalyticDB for MySQLRealtime Compute for Apache Flinkの間のデータ型マッピングを示します。

AnalyticDB for MySQLでサポートされているデータタイプ

Realtime Compute for Apacheでサポートされているデータ型Flink

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p、s) またはNUMERIC(p、s)

デシマル (p,s)

VARCHAR

STRING

BINARY

BYTES

日付

日付

時間

時間

日付時刻

TIMESTAMP

TIMESTAMP

TIMESTAMP

ポイント

STRING

JSON

STRING