このトピックでは、OSS Selectを使用してデータクエリとデータクエリの利点を高速化するようにSparkを設定する方法について説明します。
背景情報
このトピックでは、CDH 6ベースのApache Impalaを使用してOSSデータをクエリするに基づいてCDH 6クラスターを構築および設定したことを前提としています。${}
のすべてのコンテンツは環境変数です。 これらの環境変数を変更します。手順1: OSSデータの読み取りと書き込みを行うようにSparkを設定する
デフォルトでは、OSS準拠パッケージはSparkのCLASSPATHでは除外されます。 このパッケージをCLASSPATHに追加し、OSSデータを読み書きするようにSparkを設定するには、すべてのCDHノードで次の操作を実行します。
- ${CDH_HOME}/lib/sparkディレクトリに移動します。 次のコマンドを実行します。
[root @ cdh-master spark]# cd jars / [root @ cdh-master jars]# ln -s .. /.. /../jars/hadoop-aliyun-3.0.0-cdh6.0.1.jar hadoop-aliyun.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/aliyun-sdk-oss-2.8.3.jar aliyun-sdk-oss-2.8.3.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/jdom-1.1.jar jdom-1.1.jar
- ${CDH_HOME}/lib/sparkディレクトリに移動します。 クエリを実行します。
[root @ cdh-master spark]# 。/ビン /スパークシェル 警告: ユーザー定義SPARK_HOME (/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/spark) は、検出された (/opt/cloudera/parcels/CDH/lib/spark) をオーバーライドします。 警告: ユーザー定義の場所からsparkクラスを実行します。 デフォルトのログレベルを "WARN" に設定します。 ロギングレベルを調整するには、sc.setLogLevel(newLevel) を使用します。 SparkRの場合は、setLogLevel(newLevel) を使用します。 http:// x.x.x.x:4040で利用可能なSparkコンテキストWeb UI Sparkコンテキストは 'sc' (master = yarn、app id = application_1540878848110_0004) として使用できます。 Sparkセッションは 'spark' として利用できます。 へようこそ _____ /_______ _____ /_ _\ \/ _ \/ _ '/ __/ '_/ /___/ .__/\_,_/_/_/\_\ バージョン2.2.0-cdh6.0.1 /_/ Scalaバージョン2.11.8 (Java HotSpot(TM) 64ビットサーバーVM、Java 1.8.0_152) の使用 式を入力して評価します。 タイプ: より多くの情報のための助け。 scala> val myfile = sc.textFile("oss://{your-bucket-name}/50/store_sales") myfile: org.apache.spark.rdd.RDD[String] = oss://{your-bucket-name}/50/store_sales MapPartitionsRDD[1] at textFile at <console>:24 scala> myfile.count() res0: ロング=144004764 scala> myfile.map(line => line.split('|')).filter(_(0).toInt >= 2451262).take (3) res15: Array[Array[String]]] = Array(Array(Array(2451262、71079、20359、154660、284233、6206、150579、46、512、2160001、84、6.94、11.38、9.33、681.83、783.72、582.96、955.92、5.09、681.83、101.89、106.98、-481.07)) 、Array(2451262、71079、26863、154660、284233、6206、150579、46) 345、2160001、12、67.82、115.29、25.36、2451262、0.00、304.32、813.84、1383.48、21.30、0.00、304.32、325.62、-509.52) 、アレイ (34.67、71079、55852、154660、284233、6206、150579、46、243、2160001、74、32.41、、1.38、0.00、102.12、2398.34、2565.58、0.00、102.12、106.20) 、4.08-2296.22)) scala> myfile.map(line => line.split('|')).filter(_(0) >= "2451262").saveAsTextFile("oss://{your-bucket-name}/spark-oss-test.1")
クエリが正しく実行されると、Sparkがデプロイされます。
手順2: OSS SelectをサポートするようにSparkを設定する
OSS Selectの詳細については、「OSS Select」をご参照ください。 次のセクションでは、s oss-cn-shenzhen.aliyuncs.comをOSSエンドポイントとして使用します。 すべてのCDHノードで次の操作を実行します。
- ここをクリックして、spark-2.2.0-oss-select-0.1.0-SNAPSHOT.tar.gzパッケージを ${CDH_HOME}/jarsディレクトリにダウンロードします。 このパッケージはプレビュー中です。
- ダウンロードしたパッケージを解凍します。
[root @ cdh-master jars]# tar -tvf spark-2.2.0-oss-select-0.1.0-SNAPSHOT.tar.gz drwxr-xr-xルート /ルート0 2018-10-30 17:59 spark-2.2.0-oss-select-0.1.0-SNAPSHOT / -rw-r -- root/root 26514 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/stax-api-1.0.1.jar -rw-r -- root/root 547584 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/aliyun-sdk-oss-3.3.0.jar -rw-r -- root/root 13277 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/aliyun-java-sdk-sts-3.0.0.jar -rw-r -- root/root 116337 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/aliyun-java-sdk-core-3.4.0.jar -rw-r -- root/root 215492 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/aliyun-java-sdk-ram-3.0.0.jar -rw-r -- root/root 67758 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/jettison-1.1.jar -rw-r -- root/root 57264 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/json-20170516.jar -rw-r -- root/root 890168 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/jaxb-impl-2.2.3-1.jar -rw-r -- root/root 458739 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/jersey-core-1.9.jar -rw-r -- root/root 147952 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/jersey-json-1.9.jar -rw-r -- root/root 788137 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/aliyun-java-sdk-ecs-4.2.0.jar -rw-r -- root/root 153115 2018-10-30 16:11 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/jdom-1.1.jar -rw-r -- root/root 65437 2018-10-31 14:41 spark-2.2.0-oss-select-0.1.0-SNAPSHOT/aliyun-oss-select-spark_2.11-0.1.0-SNAPSHOT.jar
- ${CDH_HOME}/lib/spark/jarsディレクトリに移動します。 次のコマンドを実行します。
[root @ cdh-master jars]# pwd /opt/cloudera /小包 /CDH/lib/spark/jars [root @ cdh-master jars]# rm -f aliyun-sdk-oss-2.8.3.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/aliyun-oss-select-spark_2.11-0.1.0-SNAPSHOT.jar aliyun-oss-select-spark_2.11-0.1.0-SNAPSHOT.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/aliyun-java-sdk-core-3.4.0.jar aliyun-java-sdk-core-3.4.0.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/aliyun-java-sdk-ecs-4.2.0.jar aliyun-java-sdk-ecs-4.2.0.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/aliyun-java-sdk-ram-3.0.0.jar aliyun-java-sdk-ram-3.0.0.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/aliyun-java-sdk-sts-3.0.0.jar aliyun-java-sdk-sts-3.0.0.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/aliyun-sdk-oss-3.3.0.jar aliyun-sdk-oss-3.3.0.jar [root @ cdh-master jars]# ln -s .. /.. /../jars/jdom-1.1.jar jdom-1.1.jar
比較テスト
テスト環境: Spark on YARNを比較テストに使用します。 4つのノード・マネージャ・ノードのそれぞれで最大4つのコンテナを実行できます。 各コンテナは、1つのコアのCPUと2 GBのメモリで構成されています。
テストデータ: 名前、会社、年齢の3つの列を含む合計630 MBのサイズ。
ot @ cdh-master jars]# hadoop fs -ls oss:// select-test-sz/people /
10アイテムを見つけました
-rw-rw- 1 63079930 2018-10-30 17:03 oss:// select-test-sz/people/part-00000
-rw-rw- 1 63079930 2018-10-30 17:03 oss:// select-test-sz/people/part-00001
-rw-rw- 1 63079930 2018-10-30 17:05 oss:// select-test-sz/people/part-00002
-rw-rw- 1 63079930 2018-10-30 17:05 oss:// select-test-sz/people/part-00003
-rw-rw- 1 63079930 2018-10-30 17:06 oss:// select-test-sz/people/part-00004
-rw-rw- 1 63079930 2018-10-30 17:12 oss:// select-test-sz/people/part-00005
-rw-rw- 1 63079930 2018-10-30 17:14 oss:// select-test-sz/people/part-00006
-rw-rw- 1 63079930 2018-10-30 17:14 oss:// select-test-sz/people/part-00007
-rw-rw- 1 63079930 2018-10-30 17:15 oss:// select-test-sz/people/part-00008
-rw-rw- 1 63079930 2018-10-30 17:16 oss:// select-test-sz/people/part-00009
[root @ cdh-master spark]# 。/ビン /スパークシェル
警告: ユーザー定義SPARK_HOME (/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/spark) は、検出された (/opt/cloudera/parcels/CDH/lib/spark) をオーバーライドします。
警告: ユーザー定義の場所からsparkクラスを実行します。
デフォルトのログレベルを "WARN" に設定します。
ロギングレベルを調整するには、sc.setLogLevel(newLevel) を使用します。 SparkRの場合は、setLogLevel(newLevel) を使用します。
http:// x.x.x.x:4040で利用可能なSparkコンテキストWeb UI
Sparkコンテキストは 'sc' (master = yarn、app id = application_1540887123331_0008) として使用できます。
Sparkセッションは「Spark」として利用できます。
へようこそ
_____
/_______ _____ /_
_\ \/ _ \/ _ '/ __/ '_/
/___/ .__/\_,_/_/_/\_\ バージョン2.2.0-cdh6.0.1
/_/
Scalaバージョン2.11.8 (Java HotSpot(TM) 64ビットサーバーVM、Java 1.8.0_152) の使用
式を入力して評価します。
タイプ: より多くの情報のための助け。
scala> val sqlContext = spark.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext @ 4bdef487
scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss" +
| "オプション (" +
| "oss.bucket 'select-test-sz', " +
| "oss.prefix 'people', " + // このプレフィックスを持つオブジェクトはこのテーブルに属します
| "oss.schema 'name string, company string, age long '," // like 'column_a long, column_b string'
| "oss.data.format 'csv'," + // 現在はcsvのみをサポートしています
| "oss.input.csv.header 'None'," +
| "oss.input.csv.recordDelimiter '\r\n'," +
| "oss.input.csv.fieldDelimiter ','," +
| "oss.input.csv.com mentChar '#'," +
| "oss.input.csv.quoteChar '\"',"+
| "oss.output.csv.recordDelimiter '\n'," +
| "oss.output.csv.fieldDelimiter ','," +
| "oss.output.csv.quoteChar '\"',"+
| "oss.endpoint 'oss -cn-shenzhen.aliyuncs.com ', " +
| "oss.accessKeyId 'あなたのアクセスキーID', " +
| "oss.accessKeySecret 'Your Access Key Secret')")
res0: org.apache.spark.sql.DataFrame = []
scala> val sql: String = "'Lora % 'のような名前の人からカウント (*) を選択"
sql: String=「Lora % 」のような名前の人からの選択カウント (*)
scala> sqlContext.sql(sql).show()
+ --------
| count(1)|
+ --------
| 31770 |
+ --------
scala> val textFile = sc.textFile("oss:// select-test-sz/people/")
textFile: org.apache.spark.rdd.RDD[String] = oss:// select-test-sz/people/ MapPartitionsRDD[8] at textFile at <console>:24
scala> textFile.map(line => line.split(',')).filter(_(0).startsWith("Lora")).count()
res3: ロング=31770
次の図は、OSS Selectを使用する場合のクエリ間の時間差を示しています。 OSS Selectを使用する場合のクエリ時間は15秒、OSS Selectを使用しない場合のクエリ時間は54秒です。
Spark準拠のOSS Selectパッケージの実装 (プレビュー)
- 定義仕様:
scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss" + | "オプション (" + | "oss.bucket 'select-test-sz', " + | "oss.prefix 'people', " + // このプレフィックスを持つオブジェクトはこのテーブルに属します | "oss.schema 'name string, company string, age long '," // like 'column_a long, column_b string' | "oss.data.format 'csv'," + // 現在はcsvのみをサポートしています | "oss.input.csv.header 'None'," + | "oss.input.csv.recordDelimiter '\r\n'," + | "oss.input.csv.fieldDelimiter ','," + | "oss.input.csv.com mentChar '#'," + | "oss.input.csv.quoteChar '\"',"+ | "oss.output.csv.recordDelimiter '\n'," + | "oss.output.csv.fieldDelimiter ','," + | "oss.output.csv.quoteChar '\"',"+ | "oss.endpoint 'oss -cn-shenzhen.aliyuncs.com ', " + | "oss.accessKeyId 'あなたのアクセスキーID', " + | "oss.accessKeySecret 'Your Access Key Secret')
フィールド 説明 oss.bucket データを含むバケットを入力します。 oss.prefix このプレフィックスを含む名前を持つすべてのオブジェクトは、定義されたTEMPORARY VIEWテーブルにマップされます。 oss.schema 定義したTEMPORARY VIEWテーブルのスキーマを指定します。 スキーマはString型です。 ファイルは、将来、スキーマを指定するために使用されます。 oss.data.format データコンテンツの形式を指定します。 CSV形式がサポートされています。 その他の形式もサポートされます。 oss.input.csv. * CSVオブジェクトを照会するときの入力パラメーターを指定します。 oss.output.csv. * CSVオブジェクトを照会するときの出力パラメーターを指定します。 oss.endpoint バケットが配置されているリージョンへのアクセスに使用するエンドポイントを入力します。 oss.accessKeyId OSSへのアクセスに使用するAccessKey IDを入力します。 oss.accessKeySecret OSSへのアクセスに使用するAccessKeyシークレットを入力します。 説明 基本パラメータのみが定義される。 詳細は、「SelectObject」をご参照ください。 - 次の演算子がフィルター条件でサポートされています。
=,<,>,<=, >=,||, or,not,and,in,like(StringStartsWith,StringEndsWith,StringContains)
。 PrunedFilteredScanでサポートされていない算術演算や文字列連結などの条件を使用してデータをプッシュダウンできない場合は、必要な列のみがOSS Selectにプッシュダウンされます。説明OSS Selectは他のフィルター条件をサポートしています。 詳細は、「SelectObject」をご参照ください。
TPC-Hを使用したクエリの比較
TPC-Hのquery1.sqlは、lineitemテーブルのクエリ、クエリのパフォーマンスのテスト、および構成効果の検証に使用されます。 OSS Selectでさらにデータをフィルタリングできるようにするには、where条件をl_shipdate <= '1998-09-16' からwhere l_shipdate > '1997-09-16' に変更します。 テスト用のデータのサイズは2.27 GBです。 クエリ方法:
- Spark SQL文のみを使用してデータを照会します。
[root @ cdh-master ~]# hadoop fs -ls oss:// select-test-sz/data/lineitem.csv -rw-rw- 1 2441079322 2018-10-31 11:18 oss:// select-test-sz/data/lineitem.csv
- Spark SQL文でOSS Selectを使用してデータを照会します。
scala> org.apache.spark.sql.typesをインポートします。{IntegerType, LongType, StringType, StructField, StructType, DoubleType} org.apache.spark.sql.typesをインポートします。{IntegerType, LongType, StringType, StructField, StructType, DoubleType} scala> org.apache.spark.sqlをインポートします。{行, SQLContext} org.apache.spark.sqlをインポートします。{行, SQLContext} scala> val sqlContext = spark.sqlContext sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext @ 74e2cfc5 scala> val textFile = sc.textFile("oss:// select-test-sz/data/lineitem.csv") textFile: org.apache.spark.rdd.RDD[String] = oss:// select-test-sz/data/lineitem.csv MapPartitionsRDD[1] at textFile at <console>:26 scala> val dataRdd = textFile.map(_.split('|')) dataRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:28 scala> val schema = StructType ( | リスト ( | StructField("L_ORDERKEY" 、LongType、true) 、 | StructField("L_PARTKEY" 、LongType、true) 、 | StructField("L_SUPPKEY" 、LongType、true) 、 | StructField("L_LINENUMBER",IntegerType,true), | StructField("L_QUANTITY",DoubleType,true), | StructField("L_EXTENDEDPRICE",DoubleType,true), | StructField("L_DISCOUNT",DoubleType,true), | StructField("L_TAX",DoubleType,true), | StructField("L_RETURNFLAG" 、StringType、true) 、 | StructField("L_LINESTATUS" 、StringType、true) 、 | StructField("L_SHIPDATE" 、StringType、true) 、 | StructField("L_COMMITDATE",StringType,true), | StructField("L_RECEIPTDATE" 、StringType、true) 、 | StructField("L_SHIPINSTRUCT",StringType,true), | StructField("L_SHIPMODE" 、StringType、true) 、 | StructField("L_COMMENT",StringType,true) | ) |) schema: org.apache.spark.sql.types.StructType = StructType(StructField(L_ORDERKEY,LongType,true), StructField(L_PARTKEY,LongType,true), StructField(L_SUPPKEY,LongType,true), StructField(L_LINENUMBER,IntegerType), StructleH, StructbleField, Discretfield, Unitfield, DiscretField, Unitfield, Unitle, Unidleフィールド (structField (L_TAX,DoubleType,true), StructField(L_RETURNFLAG,StringType,true), StructField(L_LINESTATUS,StringType,true) true)) scala> val dataRowRdd = dataRdd.map(p => Row(p(0).toLong, p(1).toLong, p(2).toLong, p(3).toInt, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15)) dataRowRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30 scala> val dataFrame = sqlContext.createDataFrame(dataRowRdd、スキーマ) dataFrame: org.apache.spark.sql.DataFrame = [L_ORDERKEY: bigint, L_PARTKEY: bigint ... 14その他のフィールド] scala> dataFrame.createOrReplaceTempView("lineitem") scala> spark.sql count(*) as count_order from lineitem where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus ").show() ----------- ---------------------------- -------------------- -------------------- -------------------- -------------------- ------------------- ------------------- ------------- | l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disk | count_order | ----------- ---------------------------- -------------------- -------------------- -------------------- -------------------- ------------------- ------------------- ------------- | N | O | 7.5697385E7 | 1.135107538838699... | 1.078345555027154... | 1.121504616321447... | 25.501957856643052 | 38241.036487881756 | 0.04999335309103123 | 2968297 | ----------- ---------------------------- -------------------- -------------------- -------------------- -------------------- ------------------- ------------------- ------------- scala> sqlContext.sql (「com.aliyun.ossを使用して一時的なビューアイテムを作成」 + | "オプション (" + | "oss.bucket 'select-test-sz', " + | "oss.prefix 'data', " + | "oss.schema 'L_ORDERKEY long, L_PARTKEY long, L_SUPPKEY long, L_LINENUMBER int, L_QUANTITY double, L_EXTENDEDPRICE double, L_DISCOUNT double, L_TAX double, L_RETURNFLAG文字列, L_LINESTATUS文字列, L_SHISTISTATE文字列, L_COMASTINSTRMISTATE文字列, 文字列, 文字列, 文字列 | "oss.data.format 'csv'," + // 現在はcsvのみをサポートしています | "oss.input.csv.header 'None'," + | "oss.input.csv.recordDelimiter '\n'," + | "oss.input.csv.fieldDelimiter '|' 、" + | "oss.input.csv.com mentChar '#'," + | "oss.input.csv.quoteChar '\"',"+ | "oss.output.csv.recordDelimiter '\n'," + | "oss.output.csv.fieldDelimiter ','," + | "oss.output.csv.com mentChar '#'," + | "oss.output.csv.quoteChar '\"',"+ | "oss.endpoint 'oss -cn-shenzhen.aliyuncs.com ', " + | "oss.accessKeyId 'あなたのアクセスキーID', " + | "oss.accessKeySecret 'Your Access Key Secret')") res2: org.apache.spark.sql.DataFrame = [] scala> sqlContext.sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qglty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_discount,_g (_deg_quity), (_(_),_,_,_,_(_) count(*) as count_order from item where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus").show() scala> sqlContext.sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qglty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_discount,_g (_deg_quity), (_(_),_,_,_,_(_) count(*) as count_order from item where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus").show() ----------- ---------------------------- -------------------- -------------------- -------------------- -------------------- ------------------- ------------------- ------------- | l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disk | count_order | ----------- ---------------------------- -------------------- -------------------- -------------------- -------------------- ------------------- ------------------- ------------- | N | O | 7.5697385E7 | 1.135107538838701E11 | 1.078345555027154... | 1.121504616321447... | 25.501957856643052 | 38241.03648788181 | 0.04999335309103024 | 2968297 | ----------- ----------------------------- -------------------- -------------------- -------------------- -------------------- ------------------- ------------------- ------------