Spark Load通過外部的Spark資源實現對匯入資料的預先處理,提高StarRocks巨量資料量的匯入效能並且節省StarRocks叢集的計算資源。Spark Load主要用於初次遷移、巨量資料量匯入StarRocks的情境(資料量可到TB層級)。本文為您介紹Spark Load匯入的基本概念、基本原理、使用樣本、最佳實務以及常見問題。
背景資訊
Spark Load是一種非同步匯入方式,您需要通過MySQL協議建立Spark類型匯入任務,並可以通過SHOW LOAD
命令查看匯入結果。
本文圖片和部分內容來源於開源StarRocks的Bulk load using Apache Spark。
基本概念
Spark ETL:在匯入流程中主要負責資料的ETL工作,包括全域字典構建(BITMAP類型)、分區、排序和彙總等。
Broker:是一個獨立的無狀態進程。封裝了檔案系統介面,提供StarRocks讀取遠端儲存系統中檔案的能力。
全域字典:儲存了資料從原始值到編碼值對應的資料結構,原始值可以是任意資料類型,而編碼後的值為整型。全域字典主要應用於精確去重預計算的情境。
基本原理
使用者通過MySQL用戶端提交Spark類型匯入任務,FE記錄中繼資料並返回使用者提交成功。
Spark Load的主要流程如下圖所示。
Spark Load任務的執行主要分為以下幾個階段:
向FE提交Spark Load任務。
FE調度提交ETL任務到Spark叢集執行。
Spark叢集執行ETL完成對匯入資料的預先處理,包括全域字典構建(BITMAP類型)、分區、排序和彙總等。
ETL任務完成後,FE擷取預先處理過的每個分區的資料路徑,並調度相關的BE執行Push任務。
BE通過Broker讀取資料,轉化為StarRocks儲存格式。
FE調度生效版本,完成匯入任務。
全域字典
適用情境
目前StarRocks中BITMAP列是使用類庫Roaringbitmap實現的,而Roaringbitmap的輸入資料類型只能是整型,因此如果要在匯入流程中實現對於BITMAP列的預計算,則需要將輸入資料的類型轉換成整型。在StarRocks現有的匯入流程中,全域字典的資料結構是基於Hive表實現的,儲存了原始值到編碼值的映射。
構建流程
讀取上遊資料來源的資料,產生一張Hive暫存資料表,記為hive-table。
從hive-table中抽取待去重欄位的去重值,產生一張新的Hive表,記為distinct-value-table。
建立一張全域字典表,記為dict-table。一列為原始值,一列為編碼後的值。
將distinct-value-table與dict-table進行LEFT JOIN,計算出新增的去重值集合,然後對這個集合使用視窗函數進行編碼,此時去重列原始值就多了一列編碼後的值,最後將這兩列的資料寫回dict-table。
將dict-table與hive-table進行JOIN,完成hive-table中原始值替換成整型編碼值的工作。
hive-table會被下一步資料預先處理的流程所讀取,經過計算後匯入到StarRocks中。
資料預先處理
資料預先處理的基本流程如下:
從資料來源讀取資料,上遊資料來源可以是HDFS檔案,也可以是Hive表。
對讀取到的資料完成欄位對應、運算式計算,並根據分區資訊產生分桶欄位bucket-id。
根據StarRocks表的Rollup中繼資料產生RollupTree。
遍曆RollupTree,進行分層的彙總操作,下一個層級的Rollup可以由上一個層級的Rollup計算得來。
每次完成彙總計算後,會根據bucket-id對資料進行分桶然後寫入HDFS中。
後續Broker會拉取HDFS中的檔案然後匯入StarRocks BE節點中。
基本操作
配置ETL叢集
Spark作為一種外部計算資源在StarRocks中用來完成ETL工作,未來可能還有其他的外部資源會加入到StarRocks中使用。例如,Spark或GPU用於查詢,HDFS或S3用於外部儲存,MapReduce用於ETL等,因此引入Resource Management來管理StarRocks使用的這些外部資源。
提交Spark匯入任務之前,需要配置執行ETL任務的Spark叢集。操作文法如下所示。
-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value
);
-- drop spark resource
DROP RESOURCE resource_name;
-- show resources
SHOW RESOURCES
SHOW PROC "/resources";
-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
建立資源
resource-name為StarRocks中配置的Spark資源的名字。
PROPERTIES是Spark資源的相關參數,參數描述如下表所示,更多參數描述請參見Spark Configuration。
參數
描述
type
資源類型。必填參數,目前僅支援Spark。
Spark相關參數
spark.master
必填參數,目前支援yarn。
spark.submit.deployMode
Spark程式的部署模式。必填參數,支援cluster和client兩種。
spark.hadoop.fs.defaultFS
Master為YARN時必填。
spark.hadoop.yarn.resourcemanager.address
單點Resource Manager地址。
spark.hadoop.yarn.resourcemanager.ha.enabled
Resource Manager啟用HA。預設值為true。
spark.hadoop.yarn.resourcemanager.ha.rm-ids
Resource Manager邏輯ID列表。
spark.hadoop.yarn.resourcemanager.hostname.rm-id
對於每個rm-id,指定Resource Manager對應的主機名稱。
說明HA Resource Manager只需配置spark.hadoop.yarn.resourcemanager.hostname.rm-id或spark.hadoop.yarn.resourcemanager.address.rm-id中的任意一個。
spark.hadoop.yarn.resourcemanager.address.rm-id
對於每個rm-id,指定host:port以供用戶端提交作業。
說明HA Resource Manager只需配置spark.hadoop.yarn.resourcemanager.hostname.rm-id或spark.hadoop.yarn.resourcemanager.address.rm-id中的任意一個。
working_dir
ETL使用的目錄。
說明Spark作為ETL資源使用時必填。例如,hdfs://host:port/tmp/starrocks。
broker
Broker名字。
說明Spark作為ETL資源使用時必填。需要使用
ALTER SYSTEM ADD BROKER
命令提前完成配置。broker.property_key
Broker讀取ETL產生中間檔案時需要指定的認證資訊等。
建立資源樣本如下所示。
-- yarn cluster模式 CREATE EXTERNAL RESOURCE "spark0" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.jars" = "xxx.jar,yyy.jar", "spark.files" = "/tmp/aaa,/tmp/bbb", "spark.executor.memory" = "1g", "spark.yarn.queue" = "queue0", "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032", "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000", "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks", "broker" = "broker0", "broker.username" = "user0", "broker.password" = "password0" ); -- yarn HA cluster模式 CREATE EXTERNAL RESOURCE "spark1" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true", "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2", "spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1", "spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2", "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000", "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks", "broker" = "broker1" ); -- HDFS HA cluster模式 CREATE EXTERNAL RESOURCE "spark2" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032", "spark.hadoop.fs.defaultFS" = "hdfs://myha", "spark.hadoop.dfs.nameservices" = "myha", "spark.hadoop.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2", "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port", "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port", "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", "working_dir" = "hdfs://myha/tmp/starrocks", "broker" = "broker2", "broker.dfs.nameservices" = "myha", "broker.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2", "broker.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port", "broker.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port", "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );
查看資源
show resources;
普通賬戶只能看到自己有USAGE-PRIV使用許可權的資源。root和admin賬戶可以看到所有的資源。
資源許可權
資源許可權通過GRANT REVOKE來管理,目前僅支援USAGE-PRIV使用許可權。您可以將USAGE-PRIV許可權賦予某個使用者或者某個角色,角色的使用與之前一致。樣本如下。
-- 授予spark0資源的使用許可權給使用者user0 GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%"; -- 授予spark0資源的使用許可權給角色role0 GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0"; -- 授予所有資源的使用許可權給使用者user0 GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%"; -- 授予所有資源的使用許可權給角色role0 GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0"; -- 撤銷使用者user0的spark0資源使用許可權 REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";
配置Spark用戶端
FE底層通過執行spark-submit命令提交Spark任務,因此需要為FE配置Spark用戶端,建議使用官方2.4.5或以上版本的Spark 2.x,Spark下載地址下載完成後,請按照以下步驟完成配置:
配置SPARK-HOME環境變數
將Spark用戶端放在FE同一台機器上的目錄下,並在FE的設定檔中配置spark_home_default_dir指向此目錄,此配置項的值預設為FE根目錄下的lib/spark2x路徑,此配置項不可為空白。
配置Spark依賴包
將Spark用戶端下的jars檔案夾內所有JAR包歸檔打包成一個ZIP檔案,並在FE的設定檔中配置spark_resource_path指向此ZIP檔案。如果此配置項為空白,則FE會嘗試尋找FE根目錄下的lib/spark2x/jars/spark-2x.zip檔案,如果沒有找到則會報檔案不存在的錯誤。
當提交Spark Load任務時,會將歸檔好的依賴檔案上傳至遠端倉庫,預設倉庫路徑掛在working_dir/{cluster_id}目錄下,並以
--spark-repository--{resource-name}
命名,表示叢集內的一個Resource對應一個遠端倉庫,遠端倉庫目錄結構參考如下。---spark-repository--spark0/ |---archive-1.0.0/ | |---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0-jar-with-dependencies.jar | |---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip |---archive-1.1.0/ | |---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0-jar-with-dependencies.jar | |---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip |---archive-1.2.0/ | |-...
除了Spark依賴(預設以spark-2x.zip命名),FE還會上傳DPP的依賴包至遠端倉庫。如果此次Spark Load提交的所有依賴檔案都已存在遠端倉庫,則不需要再上傳依賴,節省下了每次重複上傳大量檔案的時間。
配置YARN用戶端
FE底層通過YARN命令擷取正在啟動並執行Application的狀態,以及終止Application,因此需要為FE配置YARN用戶端,建議使用官方2.5.2或以上版本的Hadoop 2.x。Hadoop下載地址,下載完成後,請按照以下步驟完成配置:
配置YARN可執行檔路徑
將下載好的YARN用戶端放在FE同一台機器的目錄下,並在FE設定檔中配置yarn_client_path參數,指向YARN的二進位可執行檔,預設為FE根目錄下的lib/yarn-client/hadoop/bin/yarn路徑。
配置產生YARN所需的設定檔的路徑(可選)
當FE通過YARN用戶端擷取Application的狀態,或者終止Application時,預設會在FE根目錄下的lib/yarn-config路徑下產生執行yarn命令所需的設定檔,此路徑可以通過在FE設定檔配置yarn_config_dir參數修改,目前產生的設定檔包括core-site.xml和yarn-site.xml。
建立匯入任務
建立文法
LOAD LABEL load_label (data_desc, ...) WITH RESOURCE resource_name [resource_properties] [PROPERTIES (key1=value1, ... )] * load_label: db_name.label_name * data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY separator ] [(col1, ...)] [COLUMNS FROM PATH AS (col2, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] DATA FROM TABLE hive_external_tbl [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] * resource_properties: (key2=value2, ...)
建立匯入的詳細文法可以執行
HELP SPARK LOAD
命令查看協助。Spark Load的建立匯入文法中參數意義如下:Label
匯入任務的標識。每個匯入任務,都有一個在單DataBase內部唯一的Label。具體規則與Broker Load一致。
資料描述類參數
目前支援的資料來源有CSV和Hive table。其他規則與Broker Load一致。
匯入作業參數
匯入作業參數主要指的是Spark Load建立匯入語句中的屬於opt_properties部分的參數。匯入作業參數是作用於整個匯入作業的。規則與Broker Load一致。
Spark資源參數
Spark資源需要提前配置到StarRocks系統中並且賦予使用者USAGE-PRIV許可權後才能使用Spark Load。當您有臨時性的需求,例如增加任務使用的資源而修改Spark configs時,可以設定以下參數,設定僅對本次任務生效,並不影響StarRocks叢集中已有的配置。
WITH RESOURCE 'spark0' ( "spark.driver.memory" = "1g", "spark.executor.memory" = "3g" )
資料來源為Hive表時的匯入
如果期望在匯入流程中將Hive表作為資料來源,則需要先建立一張類型為Hive的外部表格,然後提交匯入命令時指定外部表格的表名即可。
匯入流程構建全域字典
適用於StarRocks表彙總列的資料類型為BITMAP類型。在Load命令中指定需要構建全域字典的欄位即可,格式為
StarRocks欄位名稱=bitmap_dict(hive表欄位名稱)
。重要目前只有在上遊資料來源為Hive表時才支援全域字典的構建。
樣本:
上遊資料來源為HDFS檔案時建立匯入任務的情況
LOAD LABEL db1.label1 ( DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/starRocks/test/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1,tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/starRocks/test/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
上遊資料來源是Hive表時建立匯入任務的情況
建立Hive資源。
CREATE EXTERNAL RESOURCE hive0 properties ( "type" = "hive", "hive.metastore.uris" = "thrift://emr-header-1.cluster-xxx:9083" );
建立Hive外部表格。
CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) ) ENGINE=hive properties ( "resource" = "hive0", "database" = "tmp", "table" = "t1" );
提交load命令,要求匯入的StarRocks表中的列必須在Hive外部表格中存在。
LOAD LABEL db1.label1 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=bitmap_dict(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
查看匯入任務
Spark Load和Broker Load都是非同步匯入方式。您必須將建立匯入的Label記錄下來,並且在SHOW LOAD
命令中使用此Label來查看匯入結果。查看匯入的命令在所有匯入方式中是通用的,具體文法可執行HELP SHOW LOAD
命令查看。
執行以下命令,查看匯入任務。
show load order by createtime desc limit 1\G
返回資訊如下。
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:49:44
LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://1.1.*.*:8089/proxy/application_1586619723848_0035/
JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
返回結果中涉及到的參數如下表所示。
參數 | 描述 |
State | 匯入任務當前所處的階段。 任務提交之後狀態為PENDING,提交Spark ETL之後狀態變為ETL,ETL完成之後FE調度BE執行push操作,狀態變為LOADING,push完成並且版本生效後狀態變為FINISHED。 匯入任務的最終階段有CANCELLED和FINISHED兩個狀態,當Load Job處於這兩個階段時匯入完成。其中CANCELLED為匯入失敗,FINISHED為匯入成功。 |
Progress | 匯入任務的進度描述。包括ETL和LOAD兩種進度,對應了匯入流程的ETL和LOADING兩個階段。 LOAD的進度範圍為0~100%。
說明
|
Type | 匯入任務的類型。Spark Load為SPARK。 |
CreateTime | 匯入任務的建立時間。 |
EtlStartTime | ETL階段開始的時間。 |
EtlFinishTime | ETL階段完成的時間。 |
LoadStartTime | LOADING階段開始的時間。 |
LoadFinishTime | 整個匯入任務完成的時間。 |
JobDetails | 顯示作業的詳細運行狀態,包括匯入檔案的個數、總大小(位元組)、子任務個數、已處理的原始行數等。樣本如下。
|
URL | 可以複製輸入到瀏覽器,跳轉至相應Application的Web頁面。 |
其餘返回結果集中參數含義可以參見Broker Load,詳情請參見Broker Load。
查看Spark Launcher提交日誌
Spark任務提交過程中產生的詳細日誌,日誌預設儲存在FE根目錄下log/spark_launcher_log路徑下,並以spark-launcher-{load-job-id}-{label}.log格式命名,日誌會在此目錄下儲存一段時間,當FE中繼資料中的匯入資訊被清理時,相應的日誌也會被清理,預設儲存時間為3天。
取消匯入任務
當Spark Load作業狀態不為CANCELLED或FINISHED時,您可以手動取消。取消時需要指定待取消匯入任務的Label。取消匯入命令文法可以執行HELP CANCEL LOAD
命令查看。
相關係統配置
以下配置屬於Spark Load的系統層級配置,也就是作用於所有Spark Load匯入任務的配置,主要通過修改fe.conf來調整配置值。
參數 | 描述 |
enable-spark-load | 開啟Spark Load和建立Resource功能。 預設值為false,表示關閉此功能。 |
spark-load-default-timeout-second | 任務預設逾時時間。 預設值為259200秒(3天)。 |
spark-home-default-dir | Spark用戶端路徑。 預設值為fe/lib/spark2x。 |
spark-launcher-log-dir | 打包好的Spark依賴檔案路徑。 預設值為空白。 |
spark-launcher-log-dir | Spark用戶端的提交日誌存放的目錄。 預設值為fe/log/spark-launcher-log。 |
yarn-client-path | YARN二進位可執行檔路徑。 預設值為fe/lib/yarn-client/hadoop/bin/yarn。 |
yarn-config-dir | YARN設定檔產生路徑。 預設值為fe/lib/yarn-config。 |
最佳實務
使用Spark Load最適合的情境是未經處理資料在檔案系統(HDFS)中,資料量在幾十GB到TB層級。小資料量還是建議使用Stream Load或者Broker Load。
完整Spark Load匯入樣本,請參見03_sparkLoad2StarRocks.md。