すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Spark SQLを使用したDelta外部テーブルの読み書き

最終更新日:Sep 10, 2024

Deltaは、Object Storage Service (OSS) に基づいて使用できるデータレイクテーブル形式で、UPDATE、DELETE、およびINSERT操作をサポートします。 AnalyticDB for MySQLはDeltaと統合されています。 これにより、Spark SQLを使用してDelta外部テーブルを読み書きできます。 このトピックでは、Spark SQLを使用してDelta外部テーブルを読み書きする方法について説明します。

前提条件

  • AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。

  • AnalyticDB for MySQLのジョブリソースグループが作成されます。 Data Lakehouse Editionクラスター。 詳細については、「リソースグループの作成」をご参照ください。

  • AnalyticDB for MySQLのデータベースアカウントが作成されました Data Lakehouse Editionクラスター。

使用上の注意

  • XIHEエンジンは、デルタテーブルの読み書きには使用できません。

  • AnalyticDB for MySQL Sparkは、対応するDeltaバージョンと統合および更新されますが、Deltaの問題と異なるDeltaバージョン間の非互換性のトラブルシューティングについては責任を負いません。

デルタ外部テーブルの読み取りと書き込み

AnalyticDB for MySQLに組み込まれているDeltaパッケージは、Spark SQLを使用してDelta外部テーブルを読み書きするのに役立ちます。 組み込みのDeltaパッケージのバージョン (2.0.2) が読み取りおよび書き込み要件を満たさない場合は、カスタムのDeltaパッケージバージョンを設定できます。

AnalyticDB for MySQLの組み込みのDeltaパッケージを使用する

ステップ1: SQL Developmentページへ

  1. AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。   [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

  2. 左側のナビゲーションウィンドウで、[ジョブ開発] > [SQL開発] を選択します。

  3. [SQLConsole] タブで、Sparkエンジンとジョブリソースグループを選択します。

手順2: 外部データベースとDelta外部テーブルの作成

説明

次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。

  1. 次の文を実行してデータベースを作成します。 データベースが既に存在する場合は、この手順をスキップします。

    CREATE DATABASE if not exists external_delta_db
    location "oss://<bucket_name>/test/";      /* The location where you want to create the database. Replace the parameter value with your OSS path. */
  2. 次のステートメントを実行して、Delta外部テーブルを作成します。

    存在しない場合は

    CREATE TABLE if not exists external_delta_db.delta_test_tbl (
      id int, 
      name string, 
      age int
    ) using delta 
    partitioned by (age) 
    location "oss://<bucket_name>/test/delta_test_tbl";

手順3: データをDelta外部テーブルに書き込む

説明

次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。

インサート

次のいずれかの方法を使用して、Delta外部テーブルにデータを書き込みます。

  • 方法1: INSERT INTOステートメントを実行する

    INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
  • 方法2: INSERT OVERWRITEステートメントを実行する

    INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
  • 方法3: INSERT OVERWRITEステートメントを実行してデータを静的パーティションに書き込む

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
  • 方法4: INSERT OVERWRITEステートメントを実行して、動的パーティションにデータを書き込む

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);

更新

次の文を実行してデータを更新します。 この例では、id値が1であるname列のデータがボックスに更新されます。

UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;

削除

次の文を実行してデータを削除します。 この例では、id値が1の行が削除されます。

DELETE FROM external_delta_db.delta_test_tbl where id = 1;

ステップ4: データの照会

説明
  • 次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。

  • Spark SQLステートメントを実行すると、実行の成功または失敗を示すメッセージが返されますが、データは返されません。 データを表示するには、Spark JAR開発ページに移動し、[アプリケーション] タブのアプリケーションに対応する [操作] 列の [ログ] をクリックします。 詳細については、Sparkエディターのトピックの「Sparkアプリケーションに関する情報の表示」をご参照ください。

次のステートメントを実行して、Delta外部テーブルのデータを照会します。

SELECT * FROM external_delta_db.delta_test_tbl;

カスタムデルタパッケージの使用

重要

バージョンの非互換性を防ぐために、AnalyticDB for MySQL Spark 3.2.0と互換性のあるDeltaパッケージバージョンを使用することを推奨します。

ステップ1: SQL Developmentページへ

  1. AnalyticDB for MySQL コンソールにログインします。

  2. ページの左上隅で、リージョンを選択します。

  3. 左側のナビゲーションペインで、クラスターリスト をクリックします。

  4. Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスター ID をクリックします。

  5. 左側のナビゲーションウィンドウで、[ジョブ開発] > [SQL開発] を選択します。

  6. [SQLConsole] タブで、Sparkエンジンとジョブリソースグループを選択します。

手順2: 外部データベースとDelta外部テーブルの作成

説明

次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。

  1. 次の文を実行してデータベースを作成します。 データベースが既に存在する場合は、この手順をスキップします。

    add jar oss://<bucket_name>/path/to/delta-core_xx.jar;    /* A custom Delta package. You must manually upload the package to OSS. */
    
    add jar oss://<bucket_name>/path/to/delta-storage-xx.jar; /* A custom Delta package. You must manually upload the package to OSS. */
    
    SET spark.adb.connectors=oss;   /* The connector of AnalyticDB for MySQL Spark. Set the connector to OSS. */ 
    
    SET spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension;  /* An open source Spark parameter. */
    
    SET spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;   /* An open source Spark parameter. */
    
    CREATE DATABASE if not exists external_delta_db
    location "oss://<bucket_name>/test/";          /* The location where you want to create the database. Replace the parameter value with your OSS path. */
  2. 次のステートメントを実行して、Delta外部テーブルを作成します。

    存在しない場合は

    CREATE TABLE if not exists external_delta_db.delta_test_tbl (
      id int, 
      name string, 
      age int
    ) using delta 
    partitioned by (age) 
    location "oss://<bucket_name>/test/delta_test_tbl";

手順3: データをDelta外部テーブルに書き込む

説明

次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。

インサート

次のいずれかの方法を使用して、Delta外部テーブルにデータを書き込みます。

  • 方法1: INSERT INTOステートメントを実行する

    INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
  • 方法2: INSERT OVERWRITEステートメントを実行する

    INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
  • 方法3: INSERT OVERWRITEステートメントを実行してデータを静的パーティションに書き込む

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
  • 方法4: INSERT OVERWRITEステートメントを実行して、動的パーティションにデータを書き込む

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);

更新

次の文を実行してデータを更新します。 この例では、id値が1であるname列のデータがボックスに更新されます。

UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;

削除

次の文を実行してデータを削除します。 この例では、id値が1の行が削除されます。

DELETE FROM external_delta_db.delta_test_tbl where id = 1;

ステップ4: データの照会

説明
  • 次のSQL文は、バッチモードまたは対話モードで実行できます。 詳細については、「Spark SQL実行モード」をご参照ください。

  • Spark SQLステートメントを実行すると、実行の成功または失敗を示すメッセージが返されますが、データは返されません。 データを表示するには、Spark JAR開発ページに移動し、[アプリケーション] タブのアプリケーションに対応する [操作] 列の [ログ] をクリックします。 詳細については、Sparkエディターのトピックの「Sparkアプリケーションに関する情報の表示」をご参照ください。

次のステートメントを実行して、Delta外部テーブルのデータを照会します。

SELECT * FROM external_delta_db.delta_test_tbl;