全部产品
Search
文档中心

开源大数据平台E-MapReduce:Broker Load

更新时间:Dec 07, 2023

在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(例如,Apache HDFS,阿里云OSS)上的数据,利用自身的计算资源对数据进行预处理和导入。本文为您介绍Broker Load导入的使用示例以及常见问题。

背景信息

Broker Load是一种异步的导入方式。您需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。StarRocks支持从外部存储系统导入数据,支持CSV、ORCFile和Parquet等文件格式,建议单次导入数据量在几十GB到上百GB级别。

Broker Load导入

查看Broker实例

阿里云EMR StarRocks集群在创建时已经自动搭建并启动Broker服务,Broker服务位于每个Core节点上。使用以下SQL命令可以查看Broker实例。

SHOW PROC "/brokers"\G

返回信息如下所示。

*************************** 1. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 2. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 3. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 4. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
4 rows in set (0.00 sec)

创建导入任务

  • 语法

    StarRocks版本小于2.5.8

    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_name broker_properties
        [PROPERTIES (key1=value1, ... )]

    StarRocks版本大于等于2.5.8

    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_properties
        [PROPERTIES (key1=value1, ... )]
  • 参数描述

    执行HELP BROKER LOAD命令,可以查看创建导入作业的详细语法。

    • Label

      导入任务的标识。每个导入任务都有一个唯一的Label。Label是您在导入命令中自定义的或系统自动生成的名称。通过该Label,您可以查看对应导入任务的执行情况,并且Label可以用来防止导入相同的数据。当导入任务状态为FINISHED时,对应的Label就不能再次使用了。当Label对应的导入任务状态为CANCELLED时,可以再次使用该Label提交导入作业。

    • 数据描述类data_desc

      数据描述类参数,主要指的是语句中data_desc部分的参数。每组data_desc表述了本次导入涉及到的数据源地址、ETL函数,目标表及分区等信息。

      Broker Load支持一次导入任务涉及多张表,每个Broker Load导入任务可通过多个data_desc声明多张表来实现多表导入。每个单独的data_desc可以指定属于该表的数据源地址,可以用多个file_path来指定导入同一个表的多个文件。Broker Load保证了单次导入的多张表之间原子性成功或失败。data_desc常见参数如下所示。

      data_desc:
          DATA INFILE ('file_path', ...)
          [NEGATIVE]
          INTO TABLE tbl_name
          [PARTITION (p1, p2)]
          [COLUMNS TERMINATED BY column_separator ]
          [FORMAT AS file_type]
          [(col1, ...)]
          [COLUMNS FROM PATH AS (colx, ...)]
          [SET (k1=f1(xx), k2=f2(xx))]
          [WHERE predicate]

      相关参数描述如下表所示。

      参数

      描述

      file_path

      文件路径可以指定到文件,也可以用星号(*)通配符指定某个目录下的所有文件。中间的目录也可以使用通配符匹配。

      可以使用的通配符有? * [] {} ^,使用规则请参见FileSystem

      例如, 通过hdfs://hdfs_host:hdfs_port/user/data/tablename// , 可以匹配tablename下所有分区内的所有文件。通过 hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/ , 可以匹配tablename下4月分区的所有文件。

      negative

      设置数据取反导入。

      该功能适用的场景是当数据表中聚合列的类型均为SUM类型时,如果希望撤销某一批导入的数据,可以通过negative参数导入同一批数据,StarRocks会自动为这批数据在聚合列上数据取反,以达到消除同一批数据的功能。

      partition

      指定待导入表的Partition信息。

      如果待导入数据不属于指定的Partition,则不会被导入。同时,不指定Partition中的数据会被认为是“错误数据”。对于不想导入,也不想记录为“错误数据”的数据,可以使用where predicate来过滤。

      column_separator

      COLUMNS TERMINATED BY column_separator ,用于指定导入文件中的列分隔符,默认为\t。

      如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符为\x01,则列分隔符为\\x01。

      file_type

      FORMAT AS file_type,用于指定导入文件的类型。例如,parquetorccsv,默认值为csv

      parquet类型也可以通过文件后缀名.parquet或者.parq判断。

      COLUMNS FROM PATH AS

      提取文件路径中的分区字段。

      例如,导入文件为/path/col_name=col_value/dt=20210101/file1,其中col_name/dt为表中的列,则将col_value、20210101分别导入到col_name和dt对应的列的代码示例如下。

      (col1, col2)
      COLUMNS FROM PATH AS (col_name, dt)

      set column mapping

      SET (k1=f1(xx), k2=f2(xx)),data_desc中的SET语句负责设置列函数变换。

      如果原始数据的列和表中的列不一一对应,则需要使用该属性。

      where predicate

      WHERE predicate,data_desc中的WHERE语句负责过滤已经完成transform的数据。

      被过滤的数据不会进入容忍率的统计中。如果多个data_desc中声明了关于同一张表的多个条件,则会以AND语义合并这些条件。

    • 导入作业参数

      导入作业参数是指Broker Load创建导入语句中属于broker_properties部分的参数。导入作业参数是作用于整个导入作业的。

      broker_properties:
          (key2=value2, ...)

      部分参数描述如下表所示。

      参数

      描述

      timeout

      导入作业的超时时间(以秒为单位)。

      您可以在opt_properties中自行设置每个导入的超时时间。导入任务在设定的时限内未完成则会被系统取消,变为CANCELLED。Broker Load的默认导入超时时间为4小时。

      重要

      通常情况下,不需要您手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。

      推荐超时时间的计算方式为:超时时间 >((总文件大小 (MB)* 待导入的表及相关Roll up表的个数) / (30 * 导入并发数))

      公式中的30为目前BE导入的平均速度,表示30 MB/s。例如,如果待导入数据文件为1 GB,待导入表包含2个Rollup表,当前的导入并发数为3,则timeout的最小值为 (1 * 1024 * 3 ) / (10 * 3) = 102 秒。

      由于每个StarRocks集群的机器环境不同且集群并发的查询任务也不同,所以StarRocks集群的最慢导入速度需要您根据历史的导入任务速度进行推测。

      max_filter_ratio

      导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果您希望忽略错误的行,可以设置该参数值大于0,来保证导入可以成功。

      计算公式为:max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )

      其中,dpp.abnorm.ALL表示数据质量不合格的行数,例如类型不匹配、列数不匹配和长度不匹配等。dpp.abnorm.ALL指的是导入过程中正确数据的条数,可以通过SHOW LOAD命令查询导入任务的正确数据量。

      原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL

      load_mem_limit

      导入内存限制。默认值为0,表示不限制。

      strict_mode

      Broker Load导入可以开启Strict Mode模式。开启方式为properties ("strict_mode" = "true")

      默认关闭。

      Strict Mode模式是对于导入过程中的列类型转换进行严格过滤。严格过滤的策略为,对于列类型转换,如果Strict Mode为true,则错误的数据将被过滤掉。错误数据是指原始数据并不为空值,在参与列类型转换后结果为空值的数据。但以下场景除外:

      • 对于导入的某列由函数变换生成时,Strict Mode对其不产生影响。

      • 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,Strict Mode对其也不产生影响。例如,如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内,Strict Mode对其不产生影响。

  • 创建阿里云OSS导入任务示例

    重要
    • 在阿里云EMR StarRocks上使用broker作为Broker名称即可。

    • 如果您的StarRocks版本小于2.5.8,则可以按照以下代码创建导入示例;如果您的StarRocks版本大于等于2.5.8,则不添加WITH BROKER broker部分内容。

    StarRocks版本小于2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
    );

    StarRocks版本大于等于2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )

查看导入任务状态

Broker Load导入是异步的,您可以在SHOW LOAD命令中指定Label来查询对应导入作业的执行状态。具体语法可执行HELP SHOW LOAD命令查看。

重要

SHOW LOAD命令只能查看异步导入方式的LOAD任务。同步方式的LOAD任务,例如Stream Load任务,目前无法使用SHOW LOAD命令查看。

查看导入任务状态示例如下。

show load where label = 'label1'\G
*************************** 1. row ***************************
         JobId: 7****
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e272541****
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

返回参数的描述如下表所示。

参数

描述

JobId

导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。

Label

导入任务的标识。

State

导入任务当前所处的阶段。

  • PENDING:表示当前导入任务正在等待被执行。

  • LOADING:表示正在执行中。

  • CANCELLED:表示导入失败。

  • FINISHED:表示导入成功。

Progress

导入任务的进度描述。分为ETL和LOAD两种进度,分别对应导入流程的ETL和LOADING两个阶段。目前Broker Load只有LOADING阶段,所以ETL固定显示为N/A,而LOAD的进度范围为0~100%。

LOAD的进度的计算公式为LOAD进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数 * 100%

如果所有导入表均完成导入,此时LOAD的进度为99%, 导入进入到最后生效阶段,待整个导入任务完成后,LOAD的进度才会改为100%。

重要

导入进度并不是线性的,所以如果一段时间内进度没有变化,并不代表导入任务没有执行。

Type

导入任务的类型。Broker Load的Type取值是BROKER。

EtlInfo

主要显示导入的数据量指标unselected.rowsdpp.norm.ALLdpp.abnorm.ALL

您可以根据unselected.rows的参数值判断where条件过滤了多少行,根据dpp.norm.ALLdpp.abnorm.ALL两个指标可以验证当前导入任务的错误率是否超过max-filter-ratio。三个指标之和就是原始数据量的总行数。

TaskInfo

主要显示当前导入任务参数,即创建Broker Load导入任务时您指定的参数,包括cluster,timeout和max-filter-ratio。

ErrorMsg

如果导入任务状态为CANCELLED,则显示失败的原因,包括type和msg两部分。如果导入任务成功则显示N/A。type的取值意义如下:

  • USER-CANCEL:取消的任务。

  • ETL-RUN-FAIL:在ETL阶段失败的导入任务。

  • ETL-QUALITY-UNSATISFIED:数据质量不合格,即错误数据率超过了max-filter-ratio。

  • LOAD-RUN-FAIL:在LOADING阶段失败的导入任务。

  • TIMEOUT:没在超时时间内完成的导入任务。

  • UNKNOWN:未知的导入错误。

CreateTime

分别表示导入创建的时间、ETL阶段开始的时间、ETL阶段完成的时间、LOADING阶段开始的时间和整个导入任务完成的时间。

  • 由于Broker Load导入没有ETL阶段,所以EtlStartTimeEtlFinishTimeLoadStartTime被设置为同一个值。

  • 如果导入任务长时间停留在CreateTime,而LoadStartTime为N/A ,则说明目前导入任务堆积严重,您可以减少导入提交的频率。

    LoadFinishTime - CreateTime = 整个导入任务所消耗时间
    
    LoadFinishTime - LoadStartTime = 整个Broker load导入任务执行时间 = 整个导入任务所消耗时间 - 导入任务等待的时间

EtlStartTime

EtlFinishTime

LoadStartTime

LoadFinishTime

URL

导入任务的错误数据样例,访问URL地址即可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL字段为N/A。

JobDetails

显示作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的BE节点ID,以及未完成的BE节点ID。

{"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

其中已处理的原始行数,每5秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以EtlInfo中显示的数据为准。

取消导入任务

当Broker Load作业状态不为CANCELLED或FINISHED时,可以手动取消。取消时需要指定待取消导入任务的Label 。可执行HELP CANCEL LOAD命令查看取消导入命令的语法。

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];

HDFS导入

  • HDFS导入语法示例

    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1, tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
    
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH BROKER 'broker1'
    (
        "username" = "hdfs_username",
        "password" = "hdfs_password"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
  • HDFS认证

    社区版本的HDFS支持简单认证和Kerberos认证两种认证方式。

    • 简单认证(Simple):用户的身份由与HDFS建立链接的客户端操作系统决定。

      涉及参数如下表。

      参数

      描述

      hadoop.security.authentication

      认证方式。默认值为simple。

      username

      HDFS的用户名。

      password

      HDFS的密码。

    • Kerberos认证:客户端的身份由用户自己的Kerberos证书决定。

      涉及参数如下表。

      参数

      描述

      hadoop.security.authentication

      认证方式。默认值为kerberos。

      kerberos_principal

      指定Kerberos的Principal。

      kerberos_keytab

      指定Kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件。

      kerberos_keytab_content

      指定Kerberos中keytab文件内容经过Base64编码之后的内容。

      重要

      该参数和kerberos_keytab参数只需配置一个。

  • HDFS HA配置

    通过配置NameNode HA,可以在NameNode切换时,自动识别到新的NameNode。配置以下参数用于访问以HA模式部署的HDFS集群。

    参数

    描述

    dfs.nameservices

    指定HDFS服务的名称,您可以自定义。

    例如,设置dfs.nameservices为my_ha。

    dfs.ha.namenodes.xxx

    自定义NameNode的名称,多个名称时以逗号(,)分隔。其中xxx为dfs.nameservices中自定义的名称。

    例如,设置dfs.ha.namenodes.my_ha为my_nn。

    dfs.namenode.rpc-address.xxx.nn

    指定NameNode的RPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名称。

    例如,设置dfs.namenode.rpc-address.my_ha.my_nn参数值的格式为host:port。

    dfs.client.failover.proxy.provider

    指定Client连接NameNode的Provider,默认值为org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

    示例如下。

    (
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    HA模式可以与简单认证、Kerberos认证两种认证方式组合,进行集群访问。例如,通过简单认证方式访问HA HDFS。

    (
        "username"="user",
        "password"="passwd",
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    HDFS集群的配置可以写入hdfs-site.xml文件中,您使用Broker进程读取HDFS集群的信息时,只需要填写集群的文件路径名和认证信息即可。

导入示例

  1. 创建测试表,下面是tpch的lineitem。

    CREATE TABLE lineitem (
      l_orderkey bigint,
      l_partkey bigint,
      l_suppkey bigint,
      l_linenumber int,
      l_quantity double,
      l_extendedprice double,
      l_discount double,
      l_tax double,
      l_returnflag string,
      l_linestatus string,
      l_shipdate date,
      l_commitdate date,
      l_receiptdate date,
      l_shipinstruct string,
      l_shipmode string,
      l_comment string
    )
    ENGINE=OLAP
    DUPLICATE KEY(l_orderkey)
    DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
    PROPERTIES(
      "replication_num" = "1"
    );
  2. 创建导入任务。

    重要

    如果您的StarRocks版本小于2.5.8,则可以按照以下代码创建导入示例;如果您的StarRocks版本大于等于2.5.8,则不添加WITH BROKER broker部分内容。

    StarRocks版本小于2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "xxx"
    );

    StarRocks版本大于等于2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
  3. 查看导入任务状态。

    show load where label = 'lineitem'\G;
    
    *************************** 1. row ***************************
             JobId: 1****
             Label: lineitem
             State: FINISHED
          Progress: ETL:100%; LOAD:100%
              Type: BROKER
           EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215
          TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
          ErrorMsg: NULL
        CreateTime: 2022-04-13 15:07:53
      EtlStartTime: 2022-04-13 15:07:56
     EtlFinishTime: 2022-04-13 15:07:56
     LoadStartTime: 2022-04-13 15:07:56
    LoadFinishTime: 2022-04-13 15:08:06
               URL: NULL
        JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020****":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020****":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072}
    2 rows in set (0.00 sec)
  4. 导入成功后进行查询操作。

    • 查询表lineitem中的行数。

      select count(*) from lineitem;

      返回信息如下所示。

      +----------+
      | count(*) |
      +----------+
      |  6001215 |
      +----------+
      1 row in set (0.03 sec)
    • 查询表lineitem中的前2行信息。

      select * from lineitem limit 2;

      返回信息如下所示。

      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment                                  |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      |         69 |    115209 |      7721 |            1 |         48 |         58761.6 |       0.01 |  0.07 | A            | F            | 1994-08-17 | 1994-08-11   | 1994-09-08    | NONE           | TRUCK      | regular epitaphs. carefully even ideas hag |
      |         69 |    104180 |      9201 |            2 |         32 |        37893.76 |       0.08 |  0.06 | A            | F            | 1994-08-24 | 1994-08-17   | 1994-08-31    | NONE           | REG AIR    | s sleep carefully bold,                    |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      2 rows in set (0.01 sec)

导入任务并发度

一个作业可以拆成一个或者多个任务,任务之间并行执行。拆分由LOAD语句中的DataDescription来决定。例如:

  • 多个DataDescription对应导入多个不同的表,每个会拆成一个任务。

  • 多个DataDescription对应导入同一个表的不同分区,每个也会拆成一个任务。

每个任务还会拆分成一个或者多个实例,然后将这些实例平均分配到BE上并行执行。实例的拆分由以下FE配置决定:

  • min_bytes_per_broker_scanner:单个实例处理的最小数据量,默认值为64 MB。

  • max_broker_concurrency:单个任务最大并发实例数,默认值为100。

  • load_parallel_instance_num:单个BE上并发实例数,默认值为1个。

实例总数的计算公式为实例的总数 = min(导入文件总大小/单个实例处理的最小数据量,单个任务最大并发实例数,单个BE上并发实例数 * BE数)

通常情况下,一个作业只有一个DataDescription,只会拆分成一个任务。任务会拆成与BE数相等的实例,然后分配到所有BE上并行执行。