すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Spark分散SQLエンジンを使用したSpark SQLジョブの開発

最終更新日:Jun 11, 2024

データをリアルタイムで分析する場合、またはJava Database Connectivity (JDBC) APIを使用してSpark SQLジョブを開発する場合は、AnalyticDB for MySQLが提供するSpark分散SQLエンジンを使用してSQLジョブを開発できます。 Spark分散SQLエンジンを使用すると、SQL文を使用してデータを簡単に分析、処理、クエリし、SQLの実行効率を向上できます。 このトピックでは、Spark分散SQLエンジンを起動して接続する方法について説明します。

概要

AnalyticDB for MySQL Sparkは、オープンソースのSpark分散SQLエンジンがSpark SQLジョブを開発するためのマネージドサービスを提供します。 Spark分散SQLエンジンは、Sparkに組み込まれ、JDBC APIをサポートする分散SQL実行エンジンです。 JDBC APIを使用して、標準のHive ThriftServerをサポートするクライアントからSpark分散SQLエンジンに接続できます。

複数のSpark分散SQLエンジンを起動できます。 Spark分散SQLエンジンは、相互に分離された実行環境と独立したURLを持ちますが、分散SQLエンジンは同じネットワーク帯域幅を共有します。 この場合、複数のSpark分散SQLエンジンを使用すると、大量のデータのインポートには適していません。 Spark分散SQLエンジンの詳細については、「分散SQLエンジン」をご参照ください。

前提条件

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターは、Object Storage Service (OSS) バケットと同じリージョンに作成されます。

  • ジョブのリソースグループがAnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターに作成されます。 詳細については、「リソースグループの作成」をご参照ください。

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスター用にデータベースアカウントが作成されます。

  • BeelineクライアントとJavaアプリケーションを実行するためのJava 8開発環境がインストールされています。

  • クライアントのIPアドレスがAnalyticDB for MySQLクラスターのホワイトリストに追加されます。 詳細は、ホワイトリストの構成をご参照ください。

Spark分散SQLエンジンの起動

  1. AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

  2. 左側のナビゲーションウィンドウで、[ジョブの開発] > [Spark JARの開発] を選択します。

  3. Sparkエディターの上部で、ジョブリソースグループとSQLEngineジョブタイプを選択します。

  4. Sparkエディターで次のコードを実行します。

    -- Spark SQLEngineの起動例を以下に示します。 コンテンツを変更し、sparkプログラムを実行します。
    CONF spark.adb.eni.enabled=true;
    CONF spark.adb.eni.vswitchId=vsw-bp1wh6lp1abs3fi0 ****;
    CONF spark.adb.eni.securityGroupId=sg-bp15ncw8v0624xe ****;
    CONF spark.driver.resourceSpec=medium;
    CONF spark.exe cutor.instances=1;
    CONF spark.exe cutor.resourceSpec=small;
    CON F spark.app.name=Spark SQLEngine;
    CONF spark.sql.hive.metastore.version=adb;
    CONF spark.kubernetes.driverEnv.HIVE_SERVER2_USER=AdbSpark14 ****;
    CONF spark.kubernetes.driverEnv.HIVE_SERVER2_PASSWORD=Spark23 ****;
    CONF spark.adb.sessionTTLSeconds=1200; 

    下表に、各パラメーターを説明します。

    パラメーター

    必須

    説明

    spark.adb.eni.enabled

    特定の条件が満たされればはい

    Elastic Network Interface (ENI) 機能を有効にするかどうかを指定します。

    • IPアドレスベースのURLを使用してSpark分散SQLエンジンに接続する場合は、このパラメーターをtrueに設定します。

    • エンドポイントベースのURLを使用してSpark分散SQLエンジンに接続する場合、このパラメーターを設定する必要はありません。

    spark.adb.eni.vswitchId

    特定の条件が満たされればはい

    ENIに関連付けられているvSwitchのID。

    • IPアドレスベースのURLを使用してSpark分散SQLエンジンに接続する場合、このパラメーターをクライアントが存在する仮想プライベートクラウド (VPC) のvSwitch IDに設定します。

    • エンドポイントベースのURLを使用してSpark分散SQLエンジンに接続する場合、このパラメーターを設定する必要はありません。

    spark.adb.eni.securityGroupId

    特定の条件が満たされればはい

    ENIに関連付けられているセキュリティグループのID。

    • IPアドレスベースのURLを使用してSpark分散SQLエンジンに接続する場合、このパラメーターをクライアントが存在するVPCのセキュリティグループIDに設定します。

    • エンドポイントベースのURLを使用してSpark分散SQLエンジンに接続する場合、このパラメーターを設定する必要はありません。

    spark.app.name

    Sparkアプリケーションの名前。

    spark.sql.hive.metastore.version

    アクセスするメタデータのバージョン。 有効な値:

    • adb: AnalyticDB for MySQLのバージョン。

    • <hive_version>: Hiveメタストアのバージョン。

    説明
    • Apache SparkでサポートされているHiveバージョンについては、「Sparkの設定」をご参照ください。

    • 自己管理型のHiveメタストアにアクセスするには、デフォルト設定を標準のApache Spark設定に置き換えることができます。 詳細については、「Spark設定」をご参照ください。

    spark.kubernetes.driverEnv.HIVE_SERVER2_USER

    任意

    Spark分散SQLエンジンのユーザー名。 ユーザー名には、大文字、小文字、および数字を含める必要があります。 ユーザー名の長さに制限はありません。

    spark.kubernetes.driverEnv.HIVE_SERVER2_PASSWORD

    任意

    Spark分散SQLエンジンのパスワード。 パスワードには、大文字、小文字、および数字を含める必要があります。 パスワードの長さに制限はありません。

    spark.adb.sessionTTLSeconds

    任意

    Spark分散SQLエンジンの生存時間 (TTL) 。 単位は秒です。 デフォルト値: 1200 デフォルト値は、Spark分散SQLエンジンが最後のSQLコードブロックが実行されてから1,200秒後に自動的に破棄されることを指定します。

    説明

    Spark分散SQLエンジンが破棄されると、Spark JARジョブのステータスが完了に変わります。

    その他のパラメーターについては、「Sparkアプリケーション設定パラメーター」をご参照ください。

  5. [今すぐ実行] をクリックします。

    重要

    Spark JARジョブのステータスは、[アプリケーション] タブで確認できます。 Spark分散SQLエンジンに接続できるのは、Spark JARジョブが [実行中] 状態の場合のみです。

Spark分散SQLエンジンへの接続

IPアドレスベースのURLまたはエンドポイントベースのURLを使用して、標準のHive ThriftServerをサポートするクライアントからSpark分散SQLエンジンに接続できます。 クライアントの詳細については、「HiveServer2クライアント」をご参照ください。

IPアドレスベースのURLまたはエンドポイントベースのURLを使用してSpark分散SQLエンジンに接続する場合の使用法に関する注意事項:

  • (推奨) IPアドレスベースのURLを使用してSpark分散SQLエンジンに接続する: spark.adb.eni.enabledパラメーターをtrueに設定してENI機能を有効にし、クライアントとENIが同じVPCに存在する場合、IPアドレスベースのURLを使用して分散Spark SQLエンジンに接続できます。 この場合、Spark分散SQLエンジンのIPアドレスベースのURLを取得する必要があります。 クライアントが別のVPCにある場合、spark.adb. ENI. vswitchIdパラメーターで指定されたeniのvSwitch IDを変更し、Spark分散SQLエンジンを再起動する必要があります。

  • エンドポイントベースのURLを使用してSpark分散SQLエンジンに接続する: クライアントとAnalyticDB for MySQLクラスターが同じVPCにある場合、またはインターネット経由で接続を確立する場合は、エンドポイントベースのURLを使用してSpark分散SQLエンジンに接続できます。 この場合、Spark分散SQLエンジンのエンドポイントベースのURLを取得する必要があります。

手順1: Spark分散SQLエンジンの接続URLの取得

Spark分散SQLエンジンのIPアドレスベースのURLの取得

  1. AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

  2. 左側のナビゲーションウィンドウで、[ジョブの開発] > [Spark JARの開発] を選択します。

  3. [アプリケーション] タブで、管理するSparkアプリケーションを見つけ、[操作] 列の [ログ] をクリックして、Spark分散SQLエンジンのIPアドレスベースのURLを取得します。

    jdbc:hive2:// 172.XX. XX.XX:10001/<db>;transportMode=http;httpPath=s202401301607hz ************. cliservice

Spark分散SQLエンジンのエンドポイントベースのURLの取得

  1. AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

  2. 左側のナビゲーションウィンドウで、[ジョブの開発] > [Spark JARの開発] を選択します。

  3. [アプリケーション] タブで、管理するSparkアプリケーションを見つけ、[操作] 列の [詳細] をクリックして、Spark分散SQLエンジンのエンドポイントベースのURLを取得します。

    • インターネット経由でSpark分散SQLエンジンに接続する場合は、Spark JDBCパブリック接続URLパラメーターに対応するエンドポイントベースのURLを使用する必要があります。

    • VPC経由でSpark分散SQLエンジンに接続する場合は、Spark JDBC VPC connect URLパラメーターに対応するエンドポイントベースのURLを使用する必要があります。

    重要
    • Spark分散SQLエンジンを初めて起動すると、システムはネットワークを初期化するのに約1分かかります。 ネットワークの初期化後、エンドポイントベースのURLを取得できます。

    • パブリックエンドポイントの申請またはリリース時に、エンドポイントベースのURLを作成することはできません。 Spark分散SQLエンジンを複数回起動した後にエンドポイントベースのURLを作成できない場合、 チケットを起票します。

手順2: Spark分散SQLエンジンへの接続

この例では、BeelineクライアントとJavaアプリケーションを使用して、Spark分散SQLエンジンに接続します。

Beelineクライアントを使用したSpark分散SQLエンジンへの接続

  1. Spark分散SQLエンジンに接続します。

    構文:

    ! 接続 <接続URL> <ユーザー名> <パスワード> 
    • 接続URL: 手順1で取得したSpark分散SQLエンジンの接続URL。

    • Username: Spark分散SQLエンジンのユーザー名。

    • パスワード: Spark分散SQLエンジンのパスワード。

    例:

    ! 接続jdbc:hive2:// jdbc:hive2:// amv-bp1c3em7b2e **** -spark.ads.aliyuncs.com:10001/adb_test;transportMode=http;httpPath=s202401301607hz ***********. cliservice AdbSpark14 ****
    重要
    • spark分散sqlエンジンの起動時にSpark. SQL. hive.metastore.versionパラメーターをadbに設定した場合、接続URLにAnalyticDB for MySQLクラスターの既存のデータベースを含める必要があります。 接続URLに既存のデータベースを含めない場合、既定のデータベースが使用され、接続は失敗します。

    • ネットワークジッタが発生すると、ThriftServer HTTPプロトコルはHTTPステータスコード509を返します。これは、アクセスが拒否されたことを示し、接続を自動的に再試行しません。 Spark分散SQLエンジンに接続するには、コマンドを再実行する必要があります。

    • 自動的に生成されるtransportMode=http;httpPath=s202401301607hzdf852 *************. cliserviceパラメーターを変更または削除することはできません。 接続URLの末尾に追加の設定パラメーターを追加できます。

    サンプル結果:

    接続先: Spark SQL (バージョン3.2.0)
    ドライバ: Hive JDBC (バージョン2.3.9)
    トランザクションの分離: TRANSACTION_REPEATABLE_READ 
  2. Spark分散SQLエンジンを使用してSpark SQL文を実行します。

    ショーテーブル;

Javaアプリケーションを使用してSpark分散SQLエンジンに接続する

  1. 環境変数を設定します。

    export HIVEURL="jdbc:hive2:// jdbc:hive2:// amv-bp1c3em7b2e **** -spark.ads.aliyuncs.com:10001/adb_test;transportMode=http;httpPath=s202401301607hz ************. cliservice"
    エクスポートHIVEUSER="AdbSpark14 ****"
    エクスポートHIVEPASSWORD="Spark23 ****" 
    • HIVEURL: 手順1で取得したSpark分散SQLエンジンの接続URL。

    • HIVEUSER: Spark分散SQLエンジンのユーザー名。

    • HIVEPASSWORD: Spark分散SQLエンジンのパスワード。

  2. Mavenの依存関係をpom.xmlファイルに追加します。 サンプルコード:

    <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-jdbc</artifactId>
          <version>2.3.9</version>
        </dependency> 
  3. Spark分散SQLエンジンに接続し、Spark SQLステートメントを実行します。

    パッケージcom.aliyun;
    java.sql.Connectionをインポートします。java.sql.DriverManagerをインポートします。java.sql.ResultSetをインポートします。java.sql.Statementをインポートします。パブリッククラスTestHive {
        public static void main(String[] args) throws Exception {
            String hiveUrl = System.getenv("HIVEURL");;
            文字列username = System.getenv("HIVEUSER");
            文字列パスワード=System.getenv("HIVEPASSWORD");
            // ドライバがインストールされています。
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            // 手順1で取得した接続URLを使用して、Spark分散SQLエンジンに接続します。
            String url = String.format(hiveUrl);
            接続con = DriverManager.getConnection(url、ユーザー名、パスワード);
            Statement stmt = con.createStatement();
            // SHOW TABLESステートメントを実行します。
            ResultSet tables = stmt.exe cuteQuery("show tables");
            while (tables.next()) {
                System.out.println(tables.getString("tableName"));
            }
            // テーブルデータを照会します。
            ResultSet rows = stmt.exe cuteQuery("select * from test");
            while (rows.next()) {
                System.out.println(rows.getString(0));
            }
        }
    }
    重要
    • spark分散sqlエンジンの起動時にSpark. SQL. hive.metastore.versionパラメーターをadbに設定した場合、接続URLにAnalyticDB for MySQLクラスターの既存のデータベースを含める必要があります。 接続URLに既存のデータベースを含めない場合、既定のデータベースが使用され、接続は失敗します。

    • ネットワークジッタが発生すると、ThriftServer HTTPプロトコルはHTTPステータスコード509を返します。これは、アクセスが拒否されたことを示し、接続を自動的に再試行しません。 Spark分散SQLエンジンに接続するには、コマンドを再実行する必要があります。

    • 自動的に生成されるtransportMode=http;httpPath=s202401301607hzdf852 *************. cliserviceパラメーターを変更または削除することはできません。 接続URLの末尾に追加の設定パラメーターを追加できます。