全部產品
Search
文件中心

E-MapReduce:Spark Load

更新時間:Nov 12, 2024

Spark Load通過外部的Spark資源實現對匯入資料的預先處理,提高StarRocks巨量資料量的匯入效能並且節省StarRocks叢集的計算資源。Spark Load主要用於初次遷移、巨量資料量匯入StarRocks的情境(資料量可到TB層級)。本文為您介紹Spark Load匯入的基本概念、基本原理、使用樣本、最佳實務以及常見問題。

背景資訊

Spark Load是一種非同步匯入方式,您需要通過MySQL協議建立Spark類型匯入任務,並可以通過SHOW LOAD命令查看匯入結果。

說明

本文圖片和部分內容來源於開源StarRocks的Bulk load using Apache Spark

基本概念

  • Spark ETL:在匯入流程中主要負責資料的ETL工作,包括全域字典構建(BITMAP類型)、分區、排序和彙總等。

  • Broker:是一個獨立的無狀態進程。封裝了檔案系統介面,提供StarRocks讀取遠端儲存系統中檔案的能力。

  • 全域字典:儲存了資料從原始值到編碼值對應的資料結構,原始值可以是任意資料類型,而編碼後的值為整型。全域字典主要應用於精確去重預計算的情境。

基本原理

使用者通過MySQL用戶端提交Spark類型匯入任務,FE記錄中繼資料並返回使用者提交成功。

Spark Load的主要流程如下圖所示。Spark Load

Spark Load任務的執行主要分為以下幾個階段:

  1. 向FE提交Spark Load任務。

  2. FE調度提交ETL任務到Spark叢集執行。

  3. Spark叢集執行ETL完成對匯入資料的預先處理,包括全域字典構建(BITMAP類型)、分區、排序和彙總等。

  4. ETL任務完成後,FE擷取預先處理過的每個分區的資料路徑,並調度相關的BE執行Push任務。

  5. BE通過Broker讀取資料,轉化為StarRocks儲存格式。

  6. FE調度生效版本,完成匯入任務。

全域字典

適用情境

目前StarRocks中BITMAP列是使用類庫Roaringbitmap實現的,而Roaringbitmap的輸入資料類型只能是整型,因此如果要在匯入流程中實現對於BITMAP列的預計算,則需要將輸入資料的類型轉換成整型。在StarRocks現有的匯入流程中,全域字典的資料結構是基於Hive表實現的,儲存了原始值到編碼值的映射。

構建流程

  1. 讀取上遊資料來源的資料,產生一張Hive暫存資料表,記為hive-table。

  2. 從hive-table中抽取待去重欄位的去重值,產生一張新的Hive表,記為distinct-value-table。

  3. 建立一張全域字典表,記為dict-table。一列為原始值,一列為編碼後的值。

  4. 將distinct-value-table與dict-table進行LEFT JOIN,計算出新增的去重值集合,然後對這個集合使用視窗函數進行編碼,此時去重列原始值就多了一列編碼後的值,最後將這兩列的資料寫回dict-table。

  5. 將dict-table與hive-table進行JOIN,完成hive-table中原始值替換成整型編碼值的工作。

  6. hive-table會被下一步資料預先處理的流程所讀取,經過計算後匯入到StarRocks中。

資料預先處理

資料預先處理的基本流程如下:

  1. 從資料來源讀取資料,上遊資料來源可以是HDFS檔案,也可以是Hive表。

  2. 對讀取到的資料完成欄位對應、運算式計算,並根據分區資訊產生分桶欄位bucket-id。

  3. 根據StarRocks表的Rollup中繼資料產生RollupTree。

  4. 遍曆RollupTree,進行分層的彙總操作,下一個層級的Rollup可以由上一個層級的Rollup計算得來。

  5. 每次完成彙總計算後,會根據bucket-id對資料進行分桶然後寫入HDFS中。

  6. 後續Broker會拉取HDFS中的檔案然後匯入StarRocks BE節點中。

基本操作

配置ETL叢集

Spark作為一種外部計算資源在StarRocks中用來完成ETL工作,未來可能還有其他的外部資源會加入到StarRocks中使用。例如,Spark或GPU用於查詢,HDFS或S3用於外部儲存,MapReduce用於ETL等,因此引入Resource Management來管理StarRocks使用的這些外部資源。

提交Spark匯入任務之前,需要配置執行ETL任務的Spark叢集。操作文法如下所示。

-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
 type = spark,
 spark_conf_key = spark_conf_value,
 working_dir = path,
 broker = broker_name,
 broker.property_key = property_value
);

-- drop spark resource
DROP RESOURCE resource_name;

-- show resources
SHOW RESOURCES
SHOW PROC "/resources";

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
  • 建立資源

    resource-name為StarRocks中配置的Spark資源的名字。

    PROPERTIES是Spark資源的相關參數,參數描述如下表所示,更多參數描述請參見Spark Configuration

    參數

    描述

    type

    資源類型。必填參數,目前僅支援Spark。

    Spark相關參數

    spark.master

    必填參數,目前支援yarn。

    spark.submit.deployMode

    Spark程式的部署模式。必填參數,支援cluster和client兩種。

    spark.hadoop.fs.defaultFS

    Master為YARN時必填。

    spark.hadoop.yarn.resourcemanager.address

    單點Resource Manager地址。

    spark.hadoop.yarn.resourcemanager.ha.enabled

    Resource Manager啟用HA。預設值為true。

    spark.hadoop.yarn.resourcemanager.ha.rm-ids

    Resource Manager邏輯ID列表。

    spark.hadoop.yarn.resourcemanager.hostname.rm-id

    對於每個rm-id,指定Resource Manager對應的主機名稱。

    說明

    HA Resource Manager只需配置spark.hadoop.yarn.resourcemanager.hostname.rm-idspark.hadoop.yarn.resourcemanager.address.rm-id中的任意一個。

    spark.hadoop.yarn.resourcemanager.address.rm-id

    對於每個rm-id,指定host:port以供用戶端提交作業。

    說明

    HA Resource Manager只需配置spark.hadoop.yarn.resourcemanager.hostname.rm-idspark.hadoop.yarn.resourcemanager.address.rm-id中的任意一個。

    working_dir

    ETL使用的目錄。

    說明

    Spark作為ETL資源使用時必填。例如,hdfs://host:port/tmp/starrocks

    broker

    Broker名字。

    說明

    Spark作為ETL資源使用時必填。需要使用ALTER SYSTEM ADD BROKER命令提前完成配置。

    broker.property_key

    Broker讀取ETL產生中間檔案時需要指定的認證資訊等。

    建立資源樣本如下所示。

    -- yarn cluster模式
    CREATE EXTERNAL RESOURCE "spark0"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.jars" = "xxx.jar,yyy.jar",
        "spark.files" = "/tmp/aaa,/tmp/bbb",
        "spark.executor.memory" = "1g",
        "spark.yarn.queue" = "queue0",
        "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
        "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
        "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
        "broker" = "broker0",
        "broker.username" = "user0",
        "broker.password" = "password0"
    );
    
    -- yarn HA cluster模式
    CREATE EXTERNAL RESOURCE "spark1"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
        "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
        "spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
        "spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
        "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
        "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
        "broker" = "broker1"
    );
    
    -- HDFS HA cluster模式
    CREATE EXTERNAL RESOURCE "spark2"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
        "spark.hadoop.fs.defaultFS" = "hdfs://myha",
        "spark.hadoop.dfs.nameservices" = "myha",
        "spark.hadoop.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
        "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
        "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
        "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
        "working_dir" = "hdfs://myha/tmp/starrocks",
        "broker" = "broker2",
        "broker.dfs.nameservices" = "myha",
        "broker.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
        "broker.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
        "broker.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
        "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
  • 查看資源

    show resources;

    普通賬戶只能看到自己有USAGE-PRIV使用許可權的資源。root和admin賬戶可以看到所有的資源。

  • 資源許可權

    資源許可權通過GRANT REVOKE來管理,目前僅支援USAGE-PRIV使用許可權。您可以將USAGE-PRIV許可權賦予某個使用者或者某個角色,角色的使用與之前一致。樣本如下。

    -- 授予spark0資源的使用許可權給使用者user0
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
    
    -- 授予spark0資源的使用許可權給角色role0
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
    
    -- 授予所有資源的使用許可權給使用者user0
    GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
    
    -- 授予所有資源的使用許可權給角色role0
    GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
    
    -- 撤銷使用者user0的spark0資源使用許可權
    REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

配置Spark用戶端

FE底層通過執行spark-submit命令提交Spark任務,因此需要為FE配置Spark用戶端,建議使用官方2.4.5或以上版本的Spark 2.x,Spark下載地址下載完成後,請按照以下步驟完成配置:

  1. 配置SPARK-HOME環境變數

    將Spark用戶端放在FE同一台機器上的目錄下,並在FE的設定檔中配置spark_home_default_dir指向此目錄,此配置項的值預設為FE根目錄下的lib/spark2x路徑,此配置項不可為空白。

  2. 配置Spark依賴包

    將Spark用戶端下的jars檔案夾內所有JAR包歸檔打包成一個ZIP檔案,並在FE的設定檔中配置spark_resource_path指向此ZIP檔案。如果此配置項為空白,則FE會嘗試尋找FE根目錄下的lib/spark2x/jars/spark-2x.zip檔案,如果沒有找到則會報檔案不存在的錯誤。

    當提交Spark Load任務時,會將歸檔好的依賴檔案上傳至遠端倉庫,預設倉庫路徑掛在working_dir/{cluster_id}目錄下,並以--spark-repository--{resource-name}命名,表示叢集內的一個Resource對應一個遠端倉庫,遠端倉庫目錄結構參考如下。

    ---spark-repository--spark0/
       |---archive-1.0.0/
       |   |---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0-jar-with-dependencies.jar
       |   |---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip
       |---archive-1.1.0/
       |   |---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0-jar-with-dependencies.jar
       |   |---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip
       |---archive-1.2.0/
       |   |-...

除了Spark依賴(預設以spark-2x.zip命名),FE還會上傳DPP的依賴包至遠端倉庫。如果此次Spark Load提交的所有依賴檔案都已存在遠端倉庫,則不需要再上傳依賴,節省下了每次重複上傳大量檔案的時間。

配置YARN用戶端

FE底層通過YARN命令擷取正在啟動並執行Application的狀態,以及終止Application,因此需要為FE配置YARN用戶端,建議使用官方2.5.2或以上版本的Hadoop 2.x。Hadoop下載地址,下載完成後,請按照以下步驟完成配置:

  1. 配置YARN可執行檔路徑

    將下載好的YARN用戶端放在FE同一台機器的目錄下,並在FE設定檔中配置yarn_client_path參數,指向YARN的二進位可執行檔,預設為FE根目錄下的lib/yarn-client/hadoop/bin/yarn路徑。

  2. 配置產生YARN所需的設定檔的路徑(可選)

    當FE通過YARN用戶端擷取Application的狀態,或者終止Application時,預設會在FE根目錄下的lib/yarn-config路徑下產生執行yarn命令所需的設定檔,此路徑可以通過在FE設定檔配置yarn_config_dir參數修改,目前產生的設定檔包括core-site.xmlyarn-site.xml

建立匯入任務

  • 建立文法

    LOAD LABEL load_label
        (data_desc, ...)
    WITH RESOURCE resource_name
    [resource_properties]
    [PROPERTIES (key1=value1, ... )]
    
    * load_label:
        db_name.label_name
    
    * data_desc:
        DATA INFILE ('file_path', ...)
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [COLUMNS TERMINATED BY separator ]
        [(col1, ...)]
        [COLUMNS FROM PATH AS (col2, ...)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
        DATA FROM TABLE hive_external_tbl
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
    * resource_properties:
     (key2=value2, ...)

    建立匯入的詳細文法可以執行HELP SPARK LOAD命令查看協助。Spark Load的建立匯入文法中參數意義如下:

    • Label

      匯入任務的標識。每個匯入任務,都有一個在單DataBase內部唯一的Label。具體規則與Broker Load一致。

    • 資料描述類參數

      目前支援的資料來源有CSV和Hive table。其他規則與Broker Load一致。

    • 匯入作業參數

      匯入作業參數主要指的是Spark Load建立匯入語句中的屬於opt_properties部分的參數。匯入作業參數是作用於整個匯入作業的。規則與Broker Load一致。

    • Spark資源參數

      Spark資源需要提前配置到StarRocks系統中並且賦予使用者USAGE-PRIV許可權後才能使用Spark Load。當您有臨時性的需求,例如增加任務使用的資源而修改Spark configs時,可以設定以下參數,設定僅對本次任務生效,並不影響StarRocks叢集中已有的配置。

      WITH RESOURCE 'spark0'
      (
         "spark.driver.memory" = "1g",
         "spark.executor.memory" = "3g"
      )
    • 資料來源為Hive表時的匯入

      如果期望在匯入流程中將Hive表作為資料來源,則需要先建立一張類型為Hive的外部表格,然後提交匯入命令時指定外部表格的表名即可。

    • 匯入流程構建全域字典

      適用於StarRocks表彙總列的資料類型為BITMAP類型。在Load命令中指定需要構建全域字典的欄位即可,格式為StarRocks欄位名稱=bitmap_dict(hive表欄位名稱)

      重要

      目前只有在上遊資料來源為Hive表時才支援全域字典的構建。

  • 樣本:

    • 上遊資料來源為HDFS檔案時建立匯入任務的情況

      LOAD LABEL db1.label1 
      (
          DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/starRocks/test/ml/file1")
          INTO TABLE tbl1
          COLUMNS TERMINATED BY ","
          (tmp_c1,tmp_c2)
          SET
          (
              id=tmp_c2,
              name=tmp_c1
          ),
          DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/starRocks/test/ml/file2")
          INTO TABLE tbl2
          COLUMNS TERMINATED BY ","
          (col1, col2)
          where col1 > 1
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
    • 上遊資料來源是Hive表時建立匯入任務的情況

      1. 建立Hive資源。

        CREATE EXTERNAL RESOURCE hive0
        properties
        (
            "type" = "hive",
            "hive.metastore.uris" = "thrift://emr-header-1.cluster-xxx:9083"
        );
      2. 建立Hive外部表格。

        CREATE EXTERNAL TABLE hive_t1
        (
            k1 INT,
            K2 SMALLINT,
            k3 varchar(50),
            uuid varchar(100)
        )
        ENGINE=hive
        properties
        (
            "resource" = "hive0",
            "database" = "tmp",
            "table" = "t1"
        );
      3. 提交load命令,要求匯入的StarRocks表中的列必須在Hive外部表格中存在。

        LOAD LABEL db1.label1
        (
            DATA FROM TABLE hive_t1
            INTO TABLE tbl1
            SET
            (
                uuid=bitmap_dict(uuid)
            )
        )
        WITH RESOURCE 'spark0'
        (
            "spark.executor.memory" = "2g",
            "spark.shuffle.compress" = "true"
        )
        PROPERTIES
        (
            "timeout" = "3600"
        );

查看匯入任務

Spark Load和Broker Load都是非同步匯入方式。您必須將建立匯入的Label記錄下來,並且在SHOW LOAD命令中使用此Label來查看匯入結果。查看匯入的命令在所有匯入方式中是通用的,具體文法可執行HELP SHOW LOAD命令查看。

執行以下命令,查看匯入任務。

show load order by createtime desc limit 1\G

返回資訊如下。

 *************************** 1. row ***************************
  JobId: 76391
  Label: label1
  State: FINISHED
 Progress: ETL:100%; LOAD:100%
  Type: SPARK
 EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
 TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
 ErrorMsg: N/A
 CreateTime: 2019-07-27 11:46:42
 EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:49:44
 LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
  URL: http://1.1.*.*:8089/proxy/application_1586619723848_0035/
 JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}

返回結果中涉及到的參數如下表所示。

參數

描述

State

匯入任務當前所處的階段。

任務提交之後狀態為PENDING,提交Spark ETL之後狀態變為ETL,ETL完成之後FE調度BE執行push操作,狀態變為LOADING,push完成並且版本生效後狀態變為FINISHED。

匯入任務的最終階段有CANCELLED和FINISHED兩個狀態,當Load Job處於這兩個階段時匯入完成。其中CANCELLED為匯入失敗,FINISHED為匯入成功。

Progress

匯入任務的進度描述。包括ETL和LOAD兩種進度,對應了匯入流程的ETL和LOADING兩個階段。

LOAD的進度範圍為0~100%。

LOAD進度 = 當前已完成所有replica匯入的tablet個數 / 本次匯入任務的總tablet個數* 100%
說明
  • 如果所有匯入表均完成匯入,此時LOAD的進度為99%,匯入進入到最後生效階段,整個匯入完成後,LOAD的進度才會變為100%。

  • 因為匯入進度並不是線性,所以如果一段時間內進度沒有變化,並不代表匯入沒有在執行。

Type

匯入任務的類型。Spark Load為SPARK。

CreateTime

匯入任務的建立時間。

EtlStartTime

ETL階段開始的時間。

EtlFinishTime

ETL階段完成的時間。

LoadStartTime

LOADING階段開始的時間。

LoadFinishTime

整個匯入任務完成的時間。

JobDetails

顯示作業的詳細運行狀態,包括匯入檔案的個數、總大小(位元組)、子任務個數、已處理的原始行數等。樣本如下。

 {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}

URL

可以複製輸入到瀏覽器,跳轉至相應Application的Web頁面。

其餘返回結果集中參數含義可以參見Broker Load,詳情請參見Broker Load

查看Spark Launcher提交日誌

Spark任務提交過程中產生的詳細日誌,日誌預設儲存在FE根目錄下log/spark_launcher_log路徑下,並以spark-launcher-{load-job-id}-{label}.log格式命名,日誌會在此目錄下儲存一段時間,當FE中繼資料中的匯入資訊被清理時,相應的日誌也會被清理,預設儲存時間為3天。

取消匯入任務

當Spark Load作業狀態不為CANCELLED或FINISHED時,您可以手動取消。取消時需要指定待取消匯入任務的Label。取消匯入命令文法可以執行HELP CANCEL LOAD命令查看。

相關係統配置

以下配置屬於Spark Load的系統層級配置,也就是作用於所有Spark Load匯入任務的配置,主要通過修改fe.conf來調整配置值。

參數

描述

enable-spark-load

開啟Spark Load和建立Resource功能。

預設值為false,表示關閉此功能。

spark-load-default-timeout-second

任務預設逾時時間。

預設值為259200秒(3天)。

spark-home-default-dir

Spark用戶端路徑。

預設值為fe/lib/spark2x

spark-launcher-log-dir

打包好的Spark依賴檔案路徑。

預設值為空白。

spark-launcher-log-dir

Spark用戶端的提交日誌存放的目錄。

預設值為fe/log/spark-launcher-log

yarn-client-path

YARN二進位可執行檔路徑。

預設值為fe/lib/yarn-client/hadoop/bin/yarn

yarn-config-dir

YARN設定檔產生路徑。

預設值為fe/lib/yarn-config

最佳實務

使用Spark Load最適合的情境是未經處理資料在檔案系統(HDFS)中,資料量在幾十GB到TB層級。小資料量還是建議使用Stream Load或者Broker Load。

完整Spark Load匯入樣本,請參見03_sparkLoad2StarRocks.md