全部產品
Search
文件中心

E-MapReduce:Broker Load

更新時間:Jul 01, 2024

在Broker Load模式下,通過部署的Broker程式,StarRocks可讀取對應資料來源(例如,Apache HDFS,阿里雲OSS)上的資料,利用自身的計算資源對資料進行預先處理和匯入。本文為您介紹Broker Load匯入的使用樣本以及常見問題。

背景資訊

Broker Load是一種非同步匯入方式。您需要通過MySQL協議建立匯入,並通過查看匯入命令檢查匯入結果。StarRocks支援從外部儲存系統匯入資料,支援CSV、ORCFile和Parquet等檔案格式,建議單次匯入資料量在幾十GB到上百GB層級。

Broker Load匯入

查看Broker執行個體

阿里雲EMR StarRocks叢集在建立時已經自動搭建並啟動Broker服務,Broker服務位於每個Core節點上。使用以下SQL命令可以查看Broker執行個體。

SHOW PROC "/brokers"\G

返回資訊如下所示。

*************************** 1. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 2. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 3. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 4. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
4 rows in set (0.00 sec)

建立匯入任務

  • 文法

    StarRocks版本小於2.5.8

    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_name broker_properties
        [PROPERTIES (key1=value1, ... )]

    StarRocks版本大於等於2.5.8

    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_properties
        [PROPERTIES (key1=value1, ... )]
  • 參數描述

    執行HELP BROKER LOAD命令,可以查看建立匯入作業的詳細文法。

    • Label

      匯入任務的標識。每個匯入任務都有一個唯一的Label。Label是您在匯入命令中自訂的或系統自動產生的名稱。通過該Label,您可以查看對應匯入任務的執行情況,並且Label可以用來防止匯入相同的資料。當匯入任務狀態為FINISHED時,對應的Label就不能再次使用了。當Label對應的匯入任務狀態為CANCELLED時,可以再次使用該Label提交匯入作業。

    • 資料描述類data_desc

      資料描述類參數,主要指的是語句中data_desc部分的參數。每組data_desc表述了本次匯入涉及到的資料來源地址、ETL函數,目標表及分區等資訊。

      Broker Load支援一次匯入任務涉及多張表,每個Broker Load匯入任務可通過多個data_desc聲明多張表來實現多表匯入。每個單獨的data_desc可以指定屬於該表的資料來源地址,可以用多個file_path來指定匯入同一個表的多個檔案。Broker Load保證了單次匯入的多張表之間原子性成功或失敗。data_desc常見參數如下所示。

      data_desc:
          DATA INFILE ('file_path', ...)
          [NEGATIVE]
          INTO TABLE tbl_name
          [PARTITION (p1, p2)]
          [COLUMNS TERMINATED BY column_separator ]
          [FORMAT AS file_type]
          [(col1, ...)]
          [COLUMNS FROM PATH AS (colx, ...)]
          [SET (k1=f1(xx), k2=f2(xx))]
          [WHERE predicate]

      相關參數描述如下表所示。

      參數

      描述

      file_path

      檔案路徑可以指定到檔案,也可以用星號(*)萬用字元指定某個目錄下的所有檔案。中間的目錄也可以使用萬用字元匹配。

      可以使用的萬用字元有? * [] {} ^,使用規則請參見FileSystem

      例如, 通過hdfs://hdfs_host:hdfs_port/user/data/tablename// , 可以匹配tablename下所有分區內的所有檔案。通過 hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/ , 可以匹配tablename下4月分區的所有檔案。

      negative

      設定資料取反匯入。

      該功能適用的情境是當資料表中彙總列的類型均為SUM類型時,如果希望撤銷某一批匯入的資料,可以通過negative參數匯入同一批資料,StarRocks會自動為這批資料在彙總列上資料取反,以達到消除同一批資料的功能。

      partition

      指定待匯入表的Partition資訊。

      如果待匯入資料不屬於指定的Partition,則不會被匯入。同時,不指定Partition中的資料會被認為是“錯誤資料”。對於不想匯入,也不想記錄為“錯誤資料”的資料,可以使用where predicate來過濾。

      column_separator

      COLUMNS TERMINATED BY column_separator ,用於指定匯入檔案中的資料行分隔符號,預設為\t。

      如果是不可見字元,則需要加\x作為首碼,使用十六進位來表示分隔字元。例如,Hive檔案的分隔字元為\x01,則資料行分隔符號為\\x01。

      file_type

      FORMAT AS file_type,用於指定匯入檔案的類型。例如,parquetorccsv,預設值為csv

      parquet類型也可以通過檔案尾碼名.parquet或者.parq判斷。

      COLUMNS FROM PATH AS

      提取檔案路徑中的分區欄位。

      例如,匯入檔案為/path/col_name=col_value/dt=20210101/file1,其中col_name/dt為表中的列,則將col_value、20210101分別匯入到col_name和dt對應的列的程式碼範例如下。

      (col1, col2)
      COLUMNS FROM PATH AS (col_name, dt)

      set column mapping

      SET (k1=f1(xx), k2=f2(xx)),data_desc中的SET語句負責設定列函數變換。

      如果未經處理資料的列和表中的列不一一對應,則需要使用該屬性。

      where predicate

      WHERE predicate,data_desc中的WHERE語句負責過濾已經完成transform的資料。

      被過濾的資料不會進入容忍率的統計中。如果多個data_desc中聲明了關於同一張表的多個條件,則會以AND語義合并這些條件。

    • 匯入作業參數

      匯入作業參數是指Broker Load建立匯入語句中屬於broker_properties部分的參數。匯入作業參數是作用於整個匯入作業的。

      broker_properties:
          (key2=value2, ...)

      部分參數描述如下表所示。

      參數

      描述

      timeout

      匯入作業的逾時時間(以秒為單位)。

      您可以在opt_properties中自行設定每個匯入的逾時時間。匯入任務在設定的時限內未完成則會被系統取消,變為CANCELLED。Broker Load的預設匯入逾時時間為4小時。

      重要

      通常情況下,不需要您手動設定匯入任務的逾時時間。當在預設逾時時間內無法完成匯入時,可以手動設定任務的逾時時間。

      推薦逾時時間的計算方式為:逾時時間 >((總檔案大小 (MB)* 待匯入的表及相關Roll up表的個數) / (30 * 匯入並發數))

      公式中的30為目前BE匯入的平均速度,表示30 MB/s。例如,如果待匯入資料檔案為1 GB,待匯入表包含2個Rollup表,當前的匯入並發數為3,則timeout的最小值為 (1 * 1024 * 3 ) / (10 * 3) = 102 秒。

      由於每個StarRocks叢集的機器環境不同且叢集並發的查詢任務也不同,所以StarRocks叢集的最慢匯入速度需要您根據歷史的匯入任務速度進行推測。

      max_filter_ratio

      匯入任務的最大容忍率,預設為0容忍,取值範圍是0~1。當匯入的錯誤率超過該值,則匯入失敗。如果您希望忽略錯誤的行,可以設定該參數值大於0,來保證匯入可以成功。

      計算公式為:max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )

      其中,dpp.abnorm.ALL表示資料品質不合格的行數,例如類型不符、列數不匹配和長度不匹配等。dpp.abnorm.ALL指的是匯入處理程序中正確資料的條數,可以通過SHOW LOAD命令查詢匯入任務的正確資料量。

      原始檔案的行數 = dpp.abnorm.ALL + dpp.norm.ALL

      load_mem_limit

      匯入記憶體限制。預設值為0,表示不限制。

      strict_mode

      Broker Load匯入可以開啟Strict Mode模式。開啟方式為properties ("strict_mode" = "true")

      預設關閉。

      Strict Mode模式是對於匯入處理程序中的列類型轉換進行嚴格過濾。嚴格過濾的策略為,對於列類型轉換,如果Strict Mode為true,則錯誤的資料將被過濾掉。錯誤資料是指未經處理資料並不為空白值,在參與列類型轉換後結果為空白值的資料。但以下情境除外:

      • 對於匯入的某列由函數變換產生時,Strict Mode對其不產生影響。

      • 對於匯入的某列類型包含範圍限制的,如果未經處理資料能正常通過類型轉換,但無法通過範圍限制的,Strict Mode對其也不產生影響。例如,如果類型是decimal(1,0),未經處理資料為10,則屬於可以通過類型轉換但不在列聲明的範圍內,Strict Mode對其不產生影響。

  • 建立阿里雲OSS匯入任務樣本

    重要
    • 在阿里雲EMR StarRocks上使用broker作為Broker名稱即可。

    • 如果您的StarRocks版本小於2.5.8,則可以按照以下代碼建立匯入樣本;如果您的StarRocks版本大於等於2.5.8,則不添加WITH BROKER broker部分內容。

    StarRocks版本小於2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
    );

    StarRocks版本大於等於2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )

查看匯入任務狀態

Broker Load匯入是非同步,您可以在SHOW LOAD命令中指定Label來查詢對應匯入作業的執行狀態。具體文法可執行HELP SHOW LOAD命令查看。

重要

SHOW LOAD命令只能查看非同步匯入方式的LOAD任務。同步方式的LOAD任務,例如Stream Load任務,目前無法使用SHOW LOAD命令查看。

查看匯入任務狀態樣本如下。

show load where label = 'label1'\G
*************************** 1. row ***************************
         JobId: 7****
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e272541****
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

返回參數的描述如下表所示。

參數

描述

JobId

匯入任務的唯一ID,每個匯入任務的JobId都不同,由系統自動產生。與Label不同的是,JobId永遠不會相同,而Label則可以在匯入任務失敗後被複用。

Label

匯入任務的標識。

State

匯入任務當前所處的階段。

  • PENDING:表示當前置入任務正在等待被執行。

  • LOADING:表示正在執行中。

  • CANCELLED:表示匯入失敗。

  • FINISHED:表示匯入成功。

Progress

匯入任務的進度描述。分為ETL和LOAD兩種進度,分別對應匯入流程的ETL和LOADING兩個階段。目前Broker Load只有LOADING階段,所以ETL固定顯示為N/A,而LOAD的進度範圍為0~100%。

LOAD的進度的計算公式為LOAD進度 = 當前完成匯入的表個數 / 本次匯入任務設計的總表個數 * 100%

如果所有匯入表均完成匯入,此時LOAD的進度為99%, 匯入進入到最後生效階段,待整個匯入任務完成後,LOAD的進度才會改為100%。

重要

匯入進度並不是線性,所以如果一段時間內進度沒有變化,並不代表匯入任務沒有執行。

Type

匯入任務的類型。Broker Load的Type取值是BROKER。

EtlInfo

主要顯示匯入的資料量指標unselected.rowsdpp.norm.ALLdpp.abnorm.ALL

您可以根據unselected.rows的參數值判斷where條件過濾了多少行,根據dpp.norm.ALLdpp.abnorm.ALL兩個指標可以驗證當前置入任務的錯誤率是否超過max-filter-ratio。三個指標之和就是未經處理資料量的總行數。

TaskInfo

主要顯示當前置入任務參數,即建立Broker Load匯入任務時您指定的參數,包括cluster,timeout和max-filter-ratio。

ErrorMsg

如果匯入任務狀態為CANCELLED,則顯示失敗的原因,包括type和msg兩部分。如果匯入任務成功則顯示N/A。type的取值意義如下:

  • USER-CANCEL:取消的任務。

  • ETL-RUN-FAIL:在ETL階段失敗的匯入任務。

  • ETL-QUALITY-UNSATISFIED:資料品質不合格,即錯誤資料率超過了max-filter-ratio。

  • LOAD-RUN-FAIL:在LOADING階段失敗的匯入任務。

  • TIMEOUT:沒在逾時時間內完成的匯入任務。

  • UNKNOWN:未知的匯入錯誤。

CreateTime

分別表示匯入建立的時間、ETL階段開始的時間、ETL階段完成的時間、LOADING階段開始的時間和整個匯入任務完成的時間。

  • 由於Broker Load匯入沒有ETL階段,所以EtlStartTimeEtlFinishTimeLoadStartTime被設定為同一個值。

  • 如果匯入任務長時間停留在CreateTime,而LoadStartTime為N/A ,則說明目前置入任務堆積嚴重,您可以減少匯入提交的頻率。

    LoadFinishTime - CreateTime = 整個匯入任務所消耗時間
    
    LoadFinishTime - LoadStartTime = 整個Broker load匯入任務執行時間 = 整個匯入任務所消耗時間 - 匯入任務等待的時間

EtlStartTime

EtlFinishTime

LoadStartTime

LoadFinishTime

URL

匯入任務的錯誤資料範例,訪問URL地址即可擷取本次匯入的錯誤資料範例。當本次匯入不存在錯誤資料時,URL欄位為N/A。

JobDetails

顯示作業的詳細運行狀態。包括匯入檔案的個數、總大小(位元組)、子任務個數、已處理的原始行數,運行子任務的BE節點ID,以及未完成的BE節點ID。

{"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

其中已處理的原始行數,每5秒更新一次。該行數僅用於展示當前的進度,不代表最終實際的處理行數。實際處理行數以EtlInfo中顯示的資料為準。

取消匯入任務

當Broker Load作業狀態不為CANCELLED或FINISHED時,可以手動取消。取消時需要指定待取消匯入任務的Label 。可執行HELP CANCEL LOAD命令查看取消匯入命令的文法。

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];

HDFS匯入

  • HDFS匯入文法樣本

    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1, tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
    
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH BROKER 'broker1'
    (
        "username" = "hdfs_username",
        "password" = "hdfs_password"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
  • HDFS認證

    社區版本的HDFS支援簡單認證和Kerberos認證兩種認證方式。

    • 簡單認證(Simple):使用者的身份由與HDFS建立連結的用戶端作業系統決定。

      涉及參數如下表。

      參數

      描述

      hadoop.security.authentication

      認證方式。預設值為simple。

      username

      HDFS的使用者名稱。

      password

      HDFS的密碼。

    • Kerberos認證:用戶端的身份由使用者自己的Kerberos認證決定。

      涉及參數如下表。

      參數

      描述

      hadoop.security.authentication

      認證方式。預設值為kerberos。

      kerberos_principal

      指定Kerberos的Principal。

      kerberos_keytab

      指定Kerberos的keytab檔案路徑。該檔案必須為Broker進程所在伺服器上的檔案。

      kerberos_keytab_content

      指定Kerberos中keytab檔案內容經過Base64編碼之後的內容。

      重要

      該參數和kerberos_keytab參數只需配置一個。

  • HDFS HA配置

    通過配置NameNode HA,可以在NameNode切換時,自動識別到新的NameNode。配置以下參數用於訪問以HA模式部署的HDFS叢集。

    參數

    描述

    dfs.nameservices

    指定HDFS服務的名稱,您可以自訂。

    例如,設定dfs.nameservices為my_ha。

    dfs.ha.namenodes.xxx

    自訂NameNode的名稱,多個名稱時以逗號(,)分隔。其中xxx為dfs.nameservices中自訂的名稱。

    例如,設定dfs.ha.namenodes.my_ha為my_nn。

    dfs.namenode.rpc-address.xxx.nn

    指定NameNode的RPC地址資訊。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名稱。

    例如,設定dfs.namenode.rpc-address.my_ha.my_nn參數值的格式為host:port。

    dfs.client.failover.proxy.provider

    指定Client串連NameNode的Provider,預設值為org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

    樣本如下。

    (
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    HA模式可以與簡單認證、Kerberos認證兩種認證方式組合,進行叢集訪問。例如,通過簡單認證方式訪問HA HDFS。

    (
        "username"="user",
        "password"="passwd",
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    HDFS叢集的配置可以寫入hdfs-site.xml檔案中,您使用Broker進程讀取HDFS叢集的資訊時,只需要填寫叢集的檔案路徑名和認證資訊即可。

匯入樣本

  1. 建立測試表,下面是tpch的lineitem。

    CREATE TABLE lineitem (
      l_orderkey bigint,
      l_partkey bigint,
      l_suppkey bigint,
      l_linenumber int,
      l_quantity double,
      l_extendedprice double,
      l_discount double,
      l_tax double,
      l_returnflag string,
      l_linestatus string,
      l_shipdate date,
      l_commitdate date,
      l_receiptdate date,
      l_shipinstruct string,
      l_shipmode string,
      l_comment string
    )
    ENGINE=OLAP
    DUPLICATE KEY(l_orderkey)
    DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
    PROPERTIES(
      "replication_num" = "1"
    );
  2. 建立匯入任務。

    重要

    如果您的StarRocks版本小於2.5.8,則可以按照以下代碼建立匯入樣本;如果您的StarRocks版本大於等於2.5.8,則不添加WITH BROKER broker部分內容。

    StarRocks版本小於2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "xxx"
    );

    StarRocks版本大於等於2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
  3. 查看匯入任務狀態。

    show load where label = 'lineitem'\G;
    
    *************************** 1. row ***************************
             JobId: 1****
             Label: lineitem
             State: FINISHED
          Progress: ETL:100%; LOAD:100%
              Type: BROKER
           EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215
          TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
          ErrorMsg: NULL
        CreateTime: 2022-04-13 15:07:53
      EtlStartTime: 2022-04-13 15:07:56
     EtlFinishTime: 2022-04-13 15:07:56
     LoadStartTime: 2022-04-13 15:07:56
    LoadFinishTime: 2022-04-13 15:08:06
               URL: NULL
        JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020****":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020****":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072}
    2 rows in set (0.00 sec)
  4. 匯入成功後進行查詢操作。

    • 查詢表lineitem中的行數。

      select count(*) from lineitem;

      返回資訊如下所示。

      +----------+
      | count(*) |
      +----------+
      |  6001215 |
      +----------+
      1 row in set (0.03 sec)
    • 查詢表lineitem中的前2行資訊。

      select * from lineitem limit 2;

      返回資訊如下所示。

      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment                                  |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      |         69 |    115209 |      7721 |            1 |         48 |         58761.6 |       0.01 |  0.07 | A            | F            | 1994-08-17 | 1994-08-11   | 1994-09-08    | NONE           | TRUCK      | regular epitaphs. carefully even ideas hag |
      |         69 |    104180 |      9201 |            2 |         32 |        37893.76 |       0.08 |  0.06 | A            | F            | 1994-08-24 | 1994-08-17   | 1994-08-31    | NONE           | REG AIR    | s sleep carefully bold,                    |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      2 rows in set (0.01 sec)

匯入任務並發度

一個作業可以拆成一個或者多個任務,任務之間並存執行。拆分由LOAD語句中的DataDescription來決定。例如:

  • 多個DataDescription對應匯入多個不同的表,每個會拆成一個任務。

  • 多個DataDescription對應匯入同一個表的不同分區,每個也會拆成一個任務。

每個任務還會拆分成一個或者多個執行個體,然後將這些執行個體平均分配到BE上並存執行。執行個體的拆分由以下FE配置決定:

  • min_bytes_per_broker_scanner:單個執行個體處理的最小資料量,預設值為64 MB。

  • max_broker_concurrency:單個任務最大並發執行個體數,預設值為100。

  • load_parallel_instance_num:單個BE上並發執行個體數,預設值為1個。

執行個體總數的計算公式為執行個體的總數 = min(匯入檔案總大小/單個執行個體處理的最小資料量,單個任務最大並發執行個體數,單個BE上並發執行個體數 * BE數)

通常情況下,一個作業只有一個DataDescription,只會拆分成一個任務。任務會拆成與BE數相等的執行個體,然後分配到所有BE上並存執行。