このトピックでは、オープンソースのApache FlinkからAnalyticDB for MySQL data Warehouse Edition (V3.0) クラスターにデータをインポートする方法について説明します。
前提条件
Apache Flinkドライバーがダウンロードされ、すべてのApache Flinkノードの ${Flink展開ディレクトリ}/libディレクトリに展開されます。 Apache Flinkのバージョンに基づいてドライバをダウンロードできます。 次の一覧は、Apache Flinkのバージョンに対応するドライバーパッケージのダウンロードリンクを示しています。
Apache Flink 1.11: flink-connector-jdbc_2.11-1.11.0.jar
Apache Flink 1.12: flink-connector-jdbc_2.11-1.12.0.jar
Apache Flink 1.13: flink-connector-jdbc_2.11-1.13.0.jar
他のバージョンのApache Flinkのドライバーをダウンロードするには、JDBC SQLコネクタページに移動します。
MySQLドライバーがダウンロードされ、すべてのApache Flinkノードの ${Flinkデプロイメントディレクトリ}/libディレクトリにデプロイされます。
説明MySQLドライバーのバージョンは5.1.40以降である必要があります。 MySQLドライバーをダウンロードするには、mysql/mysql-connector-javaページに移動します。
すべてのJARパッケージがデプロイされた後、Apache Flinkクラスターが再起動されます。 Apache Flinkクラスターを開始する方法の詳細については、「手順2: クラスターの開始」をご参照ください。
AnalyticDB for MySQLクラスターにデータベースとテーブルが作成され、書き込みたいデータが保存されます。 データベースとテーブルの作成方法については、「create database」および「CREATE table」をご参照ください。
説明この例では、次のステートメントを実行して
tpch
という名前のデータベースを作成します。存在しない場合はデータベースを作成tpch;
この例では、次のステートメントを実行して
person
という名前のテーブルを作成します。CREATE TABLE IF NOT EXISTS人 (user_id文字列、user_name文字列、age int);
AnalyticDB for MySQLクラスターがエラスティックモードの場合、クラスター情報 ページの ネットワーク情報 セクションでEniネットワークをオンにする必要があります。
使用上の注意
このトピックでは、Apache Flink SQLを使用してテーブルを作成し、AnalyticDB for MySQLにデータを書き込む方法についてのみ説明します。 Apache Flink Java Database Connectivity (JDBC) APIを使用してデータを書き込む方法については、「JDBCコネクタ」をご参照ください。
このトピックで説明する方法は、Apache Flink 1.11以降にのみ適用できます。 他のApache FlinkバージョンからAnalyticDB for MySQLクラスターにデータを書き込むには、次のいずれかの方法を使用します。
Apache Flink 1.9および1.10からデータを書き込む方法については、「Flink v1.10」をご参照ください。
Apache Flink 1.8以前からデータを書き込む方法については、「Flink v1.8」をご参照ください。
手順
この例では、CSVファイルを書き込むソースデータとして使用します。
ステップ | 説明 |
CSVファイルを作成し、ファイルにデータを書き込み、すべてのApache Flinkノードの /rootディレクトリにファイルをデプロイします。 | |
SQL文を実行してApache Flinkでソーステーブルと結果テーブルを作成し、そのテーブルを使用してソースデータをAnalyticDB for MySQLクラスターに書き込みます。 | |
AnalyticDB for MySQLデータベースにログインして、ソースデータが書き込まれているかどうかを確認します。 |
ステップ1: データの準備
Apache Flinkノードのルートディレクトリで、
vim /root/data.csv
コマンドを実行して、data.csvという名前のCSVファイルを作成します。CSVファイルには次のデータが含まれています。 追加の行のデータをコピーして、書き込むデータの量を増やすことができます。
0,json00,20 1,json01,21 2,json02,22 3,json03,23 4,json04,24 5,json05,25 6,json06,26 7,json07,27 8,json08,28 9,json09,29
CSVファイルが作成されたら、他のApache Flinkノードの /rootディレクトリにファイルをデプロイします。
ステップ2: データの書き込み
Apache Flink SQLアプリケーションを起動して実行します。 詳細については、「SQLクライアントCLIの起動」をご参照ください。
次のステートメントを実行して、
csv_person
という名前のソーステーブルを作成します。CREATE TABLEが存在しない場合はcsv_person ( 'user_id 'ストリング、 'user_name' ストリング、 「年齢」INT ) WITH ( 'connector' = 'filesystem' 、 'path' = 'file:/// root/data.csv '、 'format' = 'csv' 、 'csv.ignore-parse-errors' = 'true' 、 'csv.allow-comments' = 'true' );
説明ソーステーブルの列名とデータ型は、AnalyticDB for MySQLクラスターのターゲットテーブルと同じである必要があります。
上記のステートメントの
path
フィールドは、data.csvファイルのオンプレミスディレクトリを指定します。 すべてのApache Flinkノード上のファイルのディレクトリが同じであることを確認します。 data.csvファイルがオンプレミスのデバイスに保存されていない場合は、ファイルの実際のディレクトリを指定します。上記のステートメントのその他のパラメーターについては、「FileSystem SQL Connector」をご参照ください。
次のステートメントを実行して、
mysql_person
という名前の結果テーブルを作成します。CREATE TABLE mysql_person ( user_id文字列、 user_name文字列、 年齢INT ) WITH ( 'connector' = 'jdbc' 、 'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true' 、 'table-name' = '<table_name>' 、 'username' = '<username>' 、 'password' = '<password>' 、 'sink.buffer-flush.max-rows ' = '10' 、 'sink .buffer-flush.int erval' = '1s' );
説明結果テーブルの列名とデータ型は、AnalyticDB for MySQLクラスターのターゲットテーブルと同じである必要があります。
次の表に、AnalyticDB for MySQLクラスターに接続するために必要なパラメーターを示します。 オプションパラメーターの詳細については、JDBC SQLコネクタのトピックの「コネクタオプション」を参照してください。
パラメーター
説明
コネクター
Apache Flinkで使用されるコネクタタイプ。 このパラメーターを
jdbc
に設定します。url
AnalyticDB for MySQLクラスターのJDBC URL。
形式:
jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true'
endpoint
: AnalyticDB for MySQLクラスターのエンドポイント。説明パブリックエンドポイントを使用してAnalyticDB for MySQLクラスターに接続する場合は、パブリックエンドポイントを申請する必要があります。 パブリックエンドポイントの申請方法については、「パブリックエンドポイントの申請またはリリース」トピックの「パブリックエンドポイントの申請またはリリース」を参照してください。
db_name
: AnalyticDB for MySQLクラスター内のターゲットデータベースの名前。useServerPrepStmts=false&rewriteBatchedStatements=true
: AnalyticDB for MySQLクラスターへのデータのバッチ書き込みに必要な設定。 この設定は、書き込みパフォーマンスを向上させ、AnalyticDB for MySQLクラスターの負荷を軽減するために使用されます。
例:
jdbc:mysql:// am-*********** .ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true
テーブル名
書き込みたいデータを格納するために使用されるAnalyticDB for MySQLクラスター内のターゲットテーブルの名前。 この例では、宛先テーブルの名前は
person
です。ユーザー名
書き込み権限を持つAnalyticDB for MySQLデータベースアカウントの名前。
説明SHOW GRANTSステートメントを実行して、現在のアカウントの権限を照会できます。
GRANTステートメントを実行して、アカウントに権限を付与できます。
パスワード
書き込み権限を持つAnalyticDB for MySQLデータベースアカウントのパスワード。
sink.buffer-flush.max行
Apache FlinkからAnalyticDB for MySQLクラスターに一度に書き込むことができる行の最大数。 Apache Flinkはリアルタイムでデータを受信します。 Apache Flinkが受け取るデータ行の数がこのパラメーターの値に達すると、データ行はAnalyticDB for MySQLクラスターにバッチで書き込まれます。 有効な値:
0: Apache Flinkは、
sink.buffer-flush.int erval
パラメーターで指定された最大時間間隔に達した場合にのみデータをバッチで書き込みます。特定の行数を指定する0以外の値。 例: 1000または2000。
説明このパラメーターを0に設定しないことを推奨します。 このパラメーターを0に設定すると、書き込みパフォーマンスが低下し、同時クエリ中にAnalyticDB for MySQLクラスターの負荷が増加します。
sink.buffer-flush.max-rows
パラメーターとsink.buffer-flush.int erval
パラメーターの両方を0以外の値に設定した場合、次のバッチ書き込みルールが適用されます。Apache Flinkが受信するデータ行の数が
sink.buffer-flush.max-rows
パラメーターの値に達しても、最大時間間隔がsink.buffer-flush.int erval
パラメーターの値に達していない場合、Apache Flinkは、最大時間間隔の有効期限を待つことなく、データをAnalyticDB for MySQLクラスターにバッチで書き込みます。Apache Flinkが受信するデータ行の数が
sink.buffer-flush.max-rows
パラメーターの値に達していないが、最大時間間隔がsink.buffer-flush.int erval
パラメーターの値に達した場合、Apache Flinkは、Apache Flinkが受信するデータの量に関係なく、データをAnalyticDB for MySQLクラスターにバッチで書き込みます。
sink.buffer-flush.int erval
Apache FlinkがAnalyticDB for MySQLクラスターにデータをバッチ書き込みするための最大時間間隔。これは、次のバッチ書き込み操作までに必要な最大時間でもあります。 有効な値:
0: Apache Flinkは、
sink.buffer-flush.max-rows
パラメーターで指定されたデータ行の最大数に達した場合にのみ、データをバッチで書き込みます。特定の時間間隔を指定する0以外の値。 例: 1d、1h、1分、1s、または1ms。
説明オフピーク時にソースデータの量が少ない場合にデータがタイムリーに書き込まれるように、このパラメーターを0に設定しないことをお勧めします。
INSERT INTO
文を実行してデータをインポートします。 主キーの値が重複している場合、データは繰り返し挿入されず、INSERT INTOステートメントはINSERT IGNORE INTO
ステートメントと同等です。 詳細については、「INSERT INTO」をご参照ください。INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;
ステップ3: データの確認
データが書き込まれた後、AnalyticDB for MySQLクラスターのtpch
データベースにログインし、次のステートメントを実行して、ソースデータがperson
テーブルに書き込まれているかどうかを確認します。
SELECT * から人;