Deltaは、Object Storage Service (OSS) に基づいて使用できるデータレイクテーブル形式で、UPDATE、DELETE、およびINSERT操作をサポートします。 AnalyticDB for MySQLはDeltaと統合されています。 これにより、Spark SQLを使用してDelta外部テーブルを読み書きできます。 このトピックでは、Spark SQLを使用してDelta外部テーブルを読み書きする方法について説明します。
前提条件
AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。
AnalyticDB for MySQLのジョブリソースグループが作成されます。 Data Lakehouse Editionクラスター。 詳細については、「リソースグループの作成」をご参照ください。
AnalyticDB for MySQLのデータベースアカウントが作成されました Data Lakehouse Editionクラスター。
使用上の注意
XIHEエンジンは、デルタテーブルの読み書きには使用できません。
AnalyticDB for MySQL Sparkは、対応するDeltaバージョンと統合および更新されますが、Deltaの問題と異なるDeltaバージョン間の非互換性のトラブルシューティングについては責任を負いません。
デルタ外部テーブルの読み取りと書き込み
AnalyticDB for MySQLに組み込まれているDeltaパッケージは、Spark SQLを使用してDelta外部テーブルを読み書きするのに役立ちます。 組み込みのDeltaパッケージのバージョン (2.0.2) が読み取りおよび書き込み要件を満たさない場合は、カスタムのDeltaパッケージバージョンを設定できます。
AnalyticDB for MySQLの組み込みのDeltaパッケージを使用する
ステップ1: SQL Developmentページへ
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
左側のナビゲーションウィンドウで、
を選択します。[SQLConsole] タブで、Sparkエンジンとジョブリソースグループを選択します。
手順2: 外部データベースとDelta外部テーブルの作成
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
次の文を実行してデータベースを作成します。 データベースが既に存在する場合は、この手順をスキップします。
CREATE DATABASE if not exists external_delta_db location "oss://<bucket_name>/test/"; /* The location where you want to create the database. Replace the parameter value with your OSS path. */
次のステートメントを実行して、Delta外部テーブルを作成します。
存在しない場合は
CREATE TABLE if not exists external_delta_db.delta_test_tbl ( id int, name string, age int ) using delta partitioned by (age) location "oss://<bucket_name>/test/delta_test_tbl";
手順3: データをDelta外部テーブルに書き込む
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
インサート
次のいずれかの方法を使用して、Delta外部テーブルにデータを書き込みます。
方法1: INSERT INTOステートメントを実行する
INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
方法2: INSERT OVERWRITEステートメントを実行する
INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
方法3: INSERT OVERWRITEステートメントを実行してデータを静的パーティションに書き込む
INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
方法4: INSERT OVERWRITEステートメントを実行して、動的パーティションにデータを書き込む
INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);
更新
次の文を実行してデータを更新します。 この例では、id値が1であるname列のデータがボックスに更新されます。
UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;
削除
次の文を実行してデータを削除します。 この例では、id値が1の行が削除されます。
DELETE FROM external_delta_db.delta_test_tbl where id = 1;
ステップ4: データの照会
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
Spark SQLステートメントを実行すると、実行の成功または失敗を示すメッセージが返されますが、データは返されません。 データを表示するには、Spark JAR開発ページに移動し、[アプリケーション] タブのアプリケーションに対応する [操作] 列の [ログ] をクリックします。 詳細については、Sparkエディターのトピックの「Sparkアプリケーションに関する情報の表示」をご参照ください。
次のステートメントを実行して、Delta外部テーブルのデータを照会します。
SELECT * FROM external_delta_db.delta_test_tbl;
カスタムデルタパッケージの使用
バージョンの非互換性を防ぐために、AnalyticDB for MySQL Spark 3.2.0と互換性のあるDeltaパッケージバージョンを使用することを推奨します。
ステップ1: SQL Developmentページへ
AnalyticDB for MySQL コンソールにログインします。
ページの左上隅で、リージョンを選択します。
左側のナビゲーションペインで、クラスターリスト をクリックします。
Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、
を選択します。[SQLConsole] タブで、Sparkエンジンとジョブリソースグループを選択します。
手順2: 外部データベースとDelta外部テーブルの作成
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
次の文を実行してデータベースを作成します。 データベースが既に存在する場合は、この手順をスキップします。
add jar oss://<bucket_name>/path/to/delta-core_xx.jar; /* A custom Delta package. You must manually upload the package to OSS. */ add jar oss://<bucket_name>/path/to/delta-storage-xx.jar; /* A custom Delta package. You must manually upload the package to OSS. */ SET spark.adb.connectors=oss; /* The connector of AnalyticDB for MySQL Spark. Set the connector to OSS. */ SET spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension; /* An open source Spark parameter. */ SET spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; /* An open source Spark parameter. */ CREATE DATABASE if not exists external_delta_db location "oss://<bucket_name>/test/"; /* The location where you want to create the database. Replace the parameter value with your OSS path. */
次のステートメントを実行して、Delta外部テーブルを作成します。
存在しない場合は
CREATE TABLE if not exists external_delta_db.delta_test_tbl ( id int, name string, age int ) using delta partitioned by (age) location "oss://<bucket_name>/test/delta_test_tbl";
手順3: データをDelta外部テーブルに書き込む
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
インサート
次のいずれかの方法を使用して、Delta外部テーブルにデータを書き込みます。
方法1: INSERT INTOステートメントを実行する
INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
方法2: INSERT OVERWRITEステートメントを実行する
INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
方法3: INSERT OVERWRITEステートメントを実行してデータを静的パーティションに書き込む
INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
方法4: INSERT OVERWRITEステートメントを実行して、動的パーティションにデータを書き込む
INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);
更新
次の文を実行してデータを更新します。 この例では、id値が1であるname列のデータがボックスに更新されます。
UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;
削除
次の文を実行してデータを削除します。 この例では、id値が1の行が削除されます。
DELETE FROM external_delta_db.delta_test_tbl where id = 1;
ステップ4: データの照会
次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。
Spark SQLステートメントを実行すると、実行の成功または失敗を示すメッセージが返されますが、データは返されません。 データを表示するには、Spark JAR開発ページに移動し、[アプリケーション] タブのアプリケーションに対応する [操作] 列の [ログ] をクリックします。 詳細については、Sparkエディターのトピックの「Sparkアプリケーションに関する情報の表示」をご参照ください。
次のステートメントを実行して、Delta外部テーブルのデータを照会します。
SELECT * FROM external_delta_db.delta_test_tbl;