すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Apache Flinkからデータをインポートする

最終更新日:Jun 12, 2024

このトピックでは、オープンソースのApache FlinkからAnalyticDB for MySQL data Warehouse Edition (V3.0) クラスターにデータをインポートする方法について説明します。

前提条件

  • Apache Flinkドライバーがダウンロードされ、すべてのApache Flinkノードの ${Flink展開ディレクトリ}/libディレクトリに展開されます。 Apache Flinkのバージョンに基づいてドライバをダウンロードできます。 次の一覧は、Apache Flinkのバージョンに対応するドライバーパッケージのダウンロードリンクを示しています。

    他のバージョンのApache Flinkのドライバーをダウンロードするには、JDBC SQLコネクタページに移動します。

  • MySQLドライバーがダウンロードされ、すべてのApache Flinkノードの ${Flinkデプロイメントディレクトリ}/libディレクトリにデプロイされます。

    説明

    MySQLドライバーのバージョンは5.1.40以降である必要があります。 MySQLドライバーをダウンロードするには、mysql/mysql-connector-javaページに移動します。

  • すべてのJARパッケージがデプロイされた後、Apache Flinkクラスターが再起動されます。 Apache Flinkクラスターを開始する方法の詳細については、「手順2: クラスターの開始」をご参照ください。

  • AnalyticDB for MySQLクラスターにデータベースとテーブルが作成され、書き込みたいデータが保存されます。 データベースとテーブルの作成方法については、「create database」および「CREATE table」をご参照ください。

    説明
    • この例では、次のステートメントを実行してtpchという名前のデータベースを作成します。

      存在しない場合はデータベースを作成tpch;
    • この例では、次のステートメントを実行してpersonという名前のテーブルを作成します。

      CREATE TABLE IF NOT EXISTS人 (user_id文字列、user_name文字列、age int);
  • AnalyticDB for MySQLクラスターがエラスティックモードの場合、クラスター情報 ページの ネットワーク情報 セクションでEniネットワークをオンにする必要があります。启用ENI网络

使用上の注意

  • このトピックでは、Apache Flink SQLを使用してテーブルを作成し、AnalyticDB for MySQLにデータを書き込む方法についてのみ説明します。 Apache Flink Java Database Connectivity (JDBC) APIを使用してデータを書き込む方法については、「JDBCコネクタ」をご参照ください。

  • このトピックで説明する方法は、Apache Flink 1.11以降にのみ適用できます。 他のApache FlinkバージョンからAnalyticDB for MySQLクラスターにデータを書き込むには、次のいずれかの方法を使用します。

    • Apache Flink 1.9および1.10からデータを書き込む方法については、「Flink v1.10」をご参照ください。

    • Apache Flink 1.8以前からデータを書き込む方法については、「Flink v1.8」をご参照ください。

手順

説明

この例では、CSVファイルを書き込むソースデータとして使用します。

ステップ

説明

ステップ1: データの準備

CSVファイルを作成し、ファイルにデータを書き込み、すべてのApache Flinkノードの /rootディレクトリにファイルをデプロイします。

ステップ2: データの書き込み

SQL文を実行してApache Flinkでソーステーブルと結果テーブルを作成し、そのテーブルを使用してソースデータをAnalyticDB for MySQLクラスターに書き込みます。

ステップ3: データの検証

AnalyticDB for MySQLデータベースにログインして、ソースデータが書き込まれているかどうかを確認します。

ステップ1: データの準備

  1. Apache Flinkノードのルートディレクトリで、vim /root/data.csvコマンドを実行して、data.csvという名前のCSVファイルを作成します。

    CSVファイルには次のデータが含まれています。 追加の行のデータをコピーして、書き込むデータの量を増やすことができます。

    0,json00,20
    1,json01,21
    2,json02,22
    3,json03,23
    4,json04,24
    5,json05,25
    6,json06,26
    7,json07,27
    8,json08,28
    9,json09,29 
  2. CSVファイルが作成されたら、他のApache Flinkノードの /rootディレクトリにファイルをデプロイします。

ステップ2: データの書き込み

  1. Apache Flink SQLアプリケーションを起動して実行します。 詳細については、「SQLクライアントCLIの起動」をご参照ください。

  2. 次のステートメントを実行して、csv_personという名前のソーステーブルを作成します。

    CREATE TABLEが存在しない場合はcsv_person (
      'user_id 'ストリング、
      'user_name' ストリング、
      「年齢」INT
    ) WITH (
      'connector' = 'filesystem' 、
      'path' = 'file:/// root/data.csv '、
      'format' = 'csv' 、
      'csv.ignore-parse-errors' = 'true' 、
      'csv.allow-comments' = 'true'
    ); 
    説明
    • ソーステーブルの列名とデータ型は、AnalyticDB for MySQLクラスターのターゲットテーブルと同じである必要があります。

    • 上記のステートメントのpathフィールドは、data.csvファイルのオンプレミスディレクトリを指定します。 すべてのApache Flinkノード上のファイルのディレクトリが同じであることを確認します。 data.csvファイルがオンプレミスのデバイスに保存されていない場合は、ファイルの実際のディレクトリを指定します。

      上記のステートメントのその他のパラメーターについては、「FileSystem SQL Connector」をご参照ください。

  3. 次のステートメントを実行して、mysql_personという名前の結果テーブルを作成します。

    CREATE TABLE mysql_person (
      user_id文字列、
      user_name文字列、
      年齢INT
    ) WITH (
      'connector' = 'jdbc' 、
      'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true' 、
      'table-name' = '<table_name>' 、
      'username' = '<username>' 、
      'password' = '<password>' 、
      'sink.buffer-flush.max-rows ' = '10' 、
      'sink .buffer-flush.int erval' = '1s'
      );
    説明
    • 結果テーブルの列名とデータ型は、AnalyticDB for MySQLクラスターのターゲットテーブルと同じである必要があります。

    • 次の表に、AnalyticDB for MySQLクラスターに接続するために必要なパラメーターを示します。 オプションパラメーターの詳細については、JDBC SQLコネクタのトピックの「コネクタオプション」を参照してください。

    パラメーター

    説明

    コネクター

    Apache Flinkで使用されるコネクタタイプ。 このパラメーターをjdbcに設定します。

    url

    AnalyticDB for MySQLクラスターのJDBC URL。

    形式: jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true'

    • endpoint: AnalyticDB for MySQLクラスターのエンドポイント。

      説明

      パブリックエンドポイントを使用してAnalyticDB for MySQLクラスターに接続する場合は、パブリックエンドポイントを申請する必要があります。 パブリックエンドポイントの申請方法については、「パブリックエンドポイントの申請またはリリース」トピックの「パブリックエンドポイントの申請またはリリース」を参照してください。

    • db_name: AnalyticDB for MySQLクラスター内のターゲットデータベースの名前。

    • useServerPrepStmts=false&rewriteBatchedStatements=true: AnalyticDB for MySQLクラスターへのデータのバッチ書き込みに必要な設定。 この設定は、書き込みパフォーマンスを向上させ、AnalyticDB for MySQLクラスターの負荷を軽減するために使用されます。

    例: jdbc:mysql:// am-*********** .ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true

    テーブル名

    書き込みたいデータを格納するために使用されるAnalyticDB for MySQLクラスター内のターゲットテーブルの名前。 この例では、宛先テーブルの名前はpersonです。

    ユーザー名

    書き込み権限を持つAnalyticDB for MySQLデータベースアカウントの名前。

    説明
    • SHOW GRANTSステートメントを実行して、現在のアカウントの権限を照会できます。

    • GRANTステートメントを実行して、アカウントに権限を付与できます。

    パスワード

    書き込み権限を持つAnalyticDB for MySQLデータベースアカウントのパスワード。

    sink.buffer-flush.max行

    Apache FlinkからAnalyticDB for MySQLクラスターに一度に書き込むことができる行の最大数。 Apache Flinkはリアルタイムでデータを受信します。 Apache Flinkが受け取るデータ行の数がこのパラメーターの値に達すると、データ行はAnalyticDB for MySQLクラスターにバッチで書き込まれます。 有効な値:

    • 0: Apache Flinkは、sink.buffer-flush.int ervalパラメーターで指定された最大時間間隔に達した場合にのみデータをバッチで書き込みます。

    • 特定の行数を指定する0以外の値。 例: 1000または2000

    説明

    このパラメーターを0に設定しないことを推奨します。 このパラメーターを0に設定すると、書き込みパフォーマンスが低下し、同時クエリ中にAnalyticDB for MySQLクラスターの負荷が増加します。

    sink.buffer-flush.max-rowsパラメーターとsink.buffer-flush.int ervalパラメーターの両方を0以外の値に設定した場合、次のバッチ書き込みルールが適用されます。

    • Apache Flinkが受信するデータ行の数がsink.buffer-flush.max-rowsパラメーターの値に達しても、最大時間間隔がsink.buffer-flush.int ervalパラメーターの値に達していない場合、Apache Flinkは、最大時間間隔の有効期限を待つことなく、データをAnalyticDB for MySQLクラスターにバッチで書き込みます。

    • Apache Flinkが受信するデータ行の数がsink.buffer-flush.max-rowsパラメーターの値に達していないが、最大時間間隔がsink.buffer-flush.int ervalパラメーターの値に達した場合、Apache Flinkは、Apache Flinkが受信するデータの量に関係なく、データをAnalyticDB for MySQLクラスターにバッチで書き込みます。

    sink.buffer-flush.int erval

    Apache FlinkがAnalyticDB for MySQLクラスターにデータをバッチ書き込みするための最大時間間隔。これは、次のバッチ書き込み操作までに必要な最大時間でもあります。 有効な値:

    • 0: Apache Flinkは、sink.buffer-flush.max-rowsパラメーターで指定されたデータ行の最大数に達した場合にのみ、データをバッチで書き込みます。

    • 特定の時間間隔を指定する0以外の値。 例: 1d1h1分1s、または1ms

    説明

    オフピーク時にソースデータの量が少ない場合にデータがタイムリーに書き込まれるように、このパラメーターを0に設定しないことをお勧めします。

  4. INSERT INTO文を実行してデータをインポートします。 主キーの値が重複している場合、データは繰り返し挿入されず、INSERT INTOステートメントはINSERT IGNORE INTOステートメントと同等です。 詳細については、「INSERT INTO」をご参照ください。

    INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;

ステップ3: データの確認

データが書き込まれた後、AnalyticDB for MySQLクラスターのtpchデータベースにログインし、次のステートメントを実行して、ソースデータがpersonテーブルに書き込まれているかどうかを確認します。

SELECT * から人;