Apache Hudiテーブル形式は、Object Storage Service (OSS) に基づいて使用でき、UPDATE、DELETE、およびINSERT操作をサポートします。 AnalyticDB for MySQLはHudiと統合されています。 これにより、Spark SQLを使用してHudi外部テーブルを読み書きできます。 このトピックでは、Spark SQLを使用してHudi外部テーブルを読み書きする方法について説明します。
前提条件
AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。
説明予約済みストレージリソースが0を超えるAnalyticDB計算ユニット (ACU) を持つAnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。
AnalyticDB for MySQLのジョブリソースグループが作成されます。 Data Lakehouse Editionクラスター。 詳細については、「リソースグループの作成」をご参照ください。
AnalyticDB for MySQLのデータベースアカウントが作成されました Data Lakehouse Editionクラスター。
手順1: SQL Developmentページに移動します
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
左側のナビゲーションウィンドウで、
を選択します。SQLConsoleタブで、Sparkエンジンとジョブリソースグループを選択します。
ステップ2: データベースとHudi外部テーブルを作成する
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
次の文を実行してデータベースを作成します。 データベースが既に存在する場合は、この手順をスキップできます。
CREATE DATABASE adb_external_db_hudi location 'oss://<bucket_name>/test/'; /* The location that is used to store the database. Replace the parameter value with your OSS path. */
次のステートメントを実行して、Hudi外部テーブルを作成します。
CREATE TABLE adb_external_db_hudi.test_hudi_tbl ( `id` int, `name` string, `age` int ) using hudi tblproperties (primaryKey = 'id', preCombineField = 'age') partitioned by (age) location 'oss://<bucket_name>/test/table/'; /* The location that is used to store the external table. Replace the parameter value with your OSS path. */
重要データベースと外部テーブルの格納に使用されるバケットは同じである必要があります。
外部テーブルのOSSパスには、データベースのディレクトリレベルより少なくとも1つ多いディレクトリレベルが必要です。 外部テーブルの場所は、データベースパス内にある必要があります。
外部テーブルを作成するときは、primaryKeyパラメーターを使用して主キーを指定する必要があります。 preCombineFieldパラメーターはオプションです。 このパラメーターを指定せずにUPDATE操作を実行すると、エラーメッセージが返されます。
ステップ3: Hudi外部テーブルへのデータの書き込み
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
INSERT
次のいずれかの方法を使用して、Hudi外部テーブルにデータを書き込みます。
方法1: INSERT INTOステートメントを実行する
INSERT INTO adb_external_db_hudi.test_hudi_tbl values(1, 'lisa', 10),(2, 'jams', 10);
方法2: INSERT OVERWRITEステートメントを実行する
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl values (1, 'lisa', 10), (2, 'jams', 20);
方法3: INSERT OVERWRITEステートメントを実行してデータを静的パーティションに書き込む
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl partition(age=10) values(1, 'anna');
方法4: INSERT OVERWRITEステートメントを実行して、動的パーティションにデータを書き込む
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl partition (age) values (1, 'bom', 10);
UPDATE
次の文を実行してデータを更新します。 この例では、id値が2の行のname列の値がboxに更新されます。
UPDATE adb_external_db_hudi.test_hudi_tbl SET name = 'box' where id = 2;
DELETE
次の文を実行してデータを削除します。 この例では、id値が1の行が削除されます。
DELETE FROM adb_external_db_hudi.test_hudi_tbl where id = 1;
同時実行制御
Hudi外部テーブルは、ロックプロバイダーベースの同時実行制御メカニズムを使用して、DML操作の実行中の同時実行の競合を防ぎます。 異なるデータ範囲への複数の同時書き込みが許可されます。 書き込みの競合を防ぎ、データの正確性と一貫性を確保するために、書き込みたいデータに重複するエントリが含まれていないことを確認してください。 同時実行制御を有効にするには、次のパラメーターを設定する必要があります。 Hudiの同時実行制御メカニズムの詳細については、「同時実行制御」をご参照ください。
オープンソースのHudi JARパッケージを使用する場合、MdsBasedLockProviderクラスを使用して同時実行制御を実装することはできません。
set hoodie.cleaner.policy.failed.writes=LAZY;
set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL;
set hoodie.write.lock.provider=org.apache.hudi.sync.adb.MdsBasedLockProvider;
下表に、各パラメーターを説明します。
パラメーター名 | パラメーター値 | 必須 | 説明 | |
hoodie.cleaner.policy.failed.writes | レイジー | あり | 失敗した書き込みに対するダーティデータのクリーンアップポリシー。 LAZYの値は、不完全な書き込みがすぐにはクリーンアップされないことを指定します。 失敗した書き込みは、ハートビートの有効期限が切れるとバッチクリーンアップされます。 この設定は、複数の同時書き込みを伴うシナリオに適しています。 | |
hoodie.write.concurrency.mo de | OPTIMISTIC_CONCURRENCY_CONTROL | あり | 書き込み操作の同時実行モード。 OPTIMISTIC_CONCURRENCY_CONTROLの値は、Hudi外部テーブルで複数の書き込みが実行された場合に、各書き込みが完了する前にシステムが書き込みの競合をチェックすることを指定します。 競合が検出された場合、書き込みは失敗します。 | |
hoodie.write.lock.provider | org.apache.hudi.sync.adb.MdsBasedLockProvider | あり | ロックプロバイダークラスの名前。 ビジネス要件に基づいてロックプロバイダーを指定できます。 ロックプロバイダークラスは、 |
ステップ4: データの照会
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
Spark SQLステートメントを実行すると、実行の成功または失敗を示すメッセージが返されますが、データは返されません。 データを表示するには、Spark JAR開発ページに移動し、[アプリケーション] タブのアプリケーションに対応する [操作] 列の [ログ] をクリックします。 詳細については、Sparkエディターのトピックの「Sparkアプリケーションに関する情報の表示」をご参照ください。
次のステートメントを実行して、Hudi外部テーブルのデータを照会します。
SELECT * FROM adb_external_db_hudi.test_hudi_tbl;