在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(例如,Apache HDFS,阿里云OSS)上的数据,利用自身的计算资源对数据进行预处理和导入。本文为您介绍Broker Load导入的使用示例以及常见问题。
背景信息
Broker Load是一种异步的导入方式。您需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。StarRocks支持从外部存储系统导入数据,支持CSV、ORCFile和Parquet等文件格式,建议单次导入数据量在几十GB到上百GB级别。
Broker Load导入
查看Broker实例
阿里云EMR StarRocks集群在创建时已经自动搭建并启动Broker服务,Broker服务位于每个Core节点上。使用以下SQL命令可以查看Broker实例。
SHOW PROC "/brokers"\G
返回信息如下所示。
*************************** 1. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 2. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 3. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 4. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
4 rows in set (0.00 sec)
创建导入任务
语法
StarRocks版本小于2.5.8
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_name broker_properties [PROPERTIES (key1=value1, ... )]
StarRocks版本大于等于2.5.8
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_properties [PROPERTIES (key1=value1, ... )]
参数描述
执行
HELP BROKER LOAD
命令,可以查看创建导入作业的详细语法。Label
导入任务的标识。每个导入任务都有一个唯一的Label。Label是您在导入命令中自定义的或系统自动生成的名称。通过该Label,您可以查看对应导入任务的执行情况,并且Label可以用来防止导入相同的数据。当导入任务状态为FINISHED时,对应的Label就不能再次使用了。当Label对应的导入任务状态为CANCELLED时,可以再次使用该Label提交导入作业。
数据描述类data_desc
数据描述类参数,主要指的是语句中data_desc部分的参数。每组data_desc表述了本次导入涉及到的数据源地址、ETL函数,目标表及分区等信息。
Broker Load支持一次导入任务涉及多张表,每个Broker Load导入任务可通过多个data_desc声明多张表来实现多表导入。每个单独的data_desc可以指定属于该表的数据源地址,可以用多个file_path来指定导入同一个表的多个文件。Broker Load保证了单次导入的多张表之间原子性成功或失败。data_desc常见参数如下所示。
data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY column_separator ] [FORMAT AS file_type] [(col1, ...)] [COLUMNS FROM PATH AS (colx, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate]
相关参数描述如下表所示。
参数
描述
file_path
文件路径可以指定到文件,也可以用星号(*)通配符指定某个目录下的所有文件。中间的目录也可以使用通配符匹配。
可以使用的通配符有? * [] {} ^,使用规则请参见FileSystem。
例如, 通过hdfs://hdfs_host:hdfs_port/user/data/tablename// , 可以匹配tablename下所有分区内的所有文件。通过 hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/ , 可以匹配tablename下4月分区的所有文件。
negative
设置数据取反导入。
该功能适用的场景是当数据表中聚合列的类型均为SUM类型时,如果希望撤销某一批导入的数据,可以通过negative参数导入同一批数据,StarRocks会自动为这批数据在聚合列上数据取反,以达到消除同一批数据的功能。
partition
指定待导入表的Partition信息。
如果待导入数据不属于指定的Partition,则不会被导入。同时,不指定Partition中的数据会被认为是“错误数据”。对于不想导入,也不想记录为“错误数据”的数据,可以使用where predicate来过滤。
column_separator
COLUMNS TERMINATED BY column_separator ,用于指定导入文件中的列分隔符,默认为\t。
如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符为\x01,则列分隔符为\\x01。
file_type
FORMAT AS file_type,用于指定导入文件的类型。例如,parquet、orc、csv,默认值为csv。
parquet类型也可以通过文件后缀名.parquet或者.parq判断。
COLUMNS FROM PATH AS
提取文件路径中的分区字段。
例如,导入文件为/path/col_name=col_value/dt=20210101/file1,其中col_name/dt为表中的列,则将col_value、20210101分别导入到col_name和dt对应的列的代码示例如下。
(col1, col2) COLUMNS FROM PATH AS (col_name, dt)
set column mapping
SET (k1=f1(xx), k2=f2(xx)),data_desc中的SET语句负责设置列函数变换。
如果原始数据的列和表中的列不一一对应,则需要使用该属性。
where predicate
WHERE predicate,data_desc中的WHERE语句负责过滤已经完成transform的数据。
被过滤的数据不会进入容忍率的统计中。如果多个data_desc中声明了关于同一张表的多个条件,则会以AND语义合并这些条件。
导入作业参数
导入作业参数是指Broker Load创建导入语句中属于broker_properties部分的参数。导入作业参数是作用于整个导入作业的。
broker_properties: (key2=value2, ...)
部分参数描述如下表所示。
参数
描述
timeout
导入作业的超时时间(以秒为单位)。
您可以在opt_properties中自行设置每个导入的超时时间。导入任务在设定的时限内未完成则会被系统取消,变为CANCELLED。Broker Load的默认导入超时时间为4小时。
重要通常情况下,不需要您手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。
推荐超时时间的计算方式为:
超时时间 >((总文件大小 (MB)* 待导入的表及相关Roll up表的个数) / (30 * 导入并发数))
公式中的30为目前BE导入的平均速度,表示30 MB/s。例如,如果待导入数据文件为1 GB,待导入表包含2个Rollup表,当前的导入并发数为3,则timeout的最小值为 (1 * 1024 * 3 ) / (10 * 3) = 102 秒。
由于每个StarRocks集群的机器环境不同且集群并发的查询任务也不同,所以StarRocks集群的最慢导入速度需要您根据历史的导入任务速度进行推测。
max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果您希望忽略错误的行,可以设置该参数值大于0,来保证导入可以成功。
计算公式为:
max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )
其中,
dpp.abnorm.ALL
表示数据质量不合格的行数,例如类型不匹配、列数不匹配和长度不匹配等。dpp.abnorm.ALL
指的是导入过程中正确数据的条数,可以通过SHOW LOAD
命令查询导入任务的正确数据量。原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL
load_mem_limit
导入内存限制。默认值为0,表示不限制。
strict_mode
Broker Load导入可以开启Strict Mode模式。开启方式为
properties ("strict_mode" = "true")
。默认关闭。
Strict Mode模式是对于导入过程中的列类型转换进行严格过滤。严格过滤的策略为,对于列类型转换,如果Strict Mode为true,则错误的数据将被过滤掉。错误数据是指原始数据并不为空值,在参与列类型转换后结果为空值的数据。但以下场景除外:
对于导入的某列由函数变换生成时,Strict Mode对其不产生影响。
对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,Strict Mode对其也不产生影响。例如,如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内,Strict Mode对其不产生影响。
创建阿里云OSS导入任务示例
重要在阿里云EMR StarRocks上使用broker作为Broker名称即可。
如果您的StarRocks版本小于2.5.8,则可以按照以下代码创建导入示例;如果您的StarRocks版本大于等于2.5.8,则不添加
WITH BROKER broker
部分内容。
StarRocks版本小于2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) ) WITH BROKER broker ( "fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com" );
StarRocks版本大于等于2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) )
查看导入任务状态
Broker Load导入是异步的,您可以在SHOW LOAD
命令中指定Label来查询对应导入作业的执行状态。具体语法可执行HELP SHOW LOAD
命令查看。
SHOW LOAD
命令只能查看异步导入方式的LOAD任务。同步方式的LOAD任务,例如Stream Load任务,目前无法使用SHOW LOAD
命令查看。
查看导入任务状态示例如下。
show load where label = 'label1'\G
*************************** 1. row ***************************
JobId: 7****
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e272541****
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}
返回参数的描述如下表所示。
参数 | 描述 |
JobId | 导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。 |
Label | 导入任务的标识。 |
State | 导入任务当前所处的阶段。
|
Progress | 导入任务的进度描述。分为ETL和LOAD两种进度,分别对应导入流程的ETL和LOADING两个阶段。目前Broker Load只有LOADING阶段,所以ETL固定显示为N/A,而LOAD的进度范围为0~100%。 LOAD的进度的计算公式为 如果所有导入表均完成导入,此时LOAD的进度为99%, 导入进入到最后生效阶段,待整个导入任务完成后,LOAD的进度才会改为100%。 重要 导入进度并不是线性的,所以如果一段时间内进度没有变化,并不代表导入任务没有执行。 |
Type | 导入任务的类型。Broker Load的Type取值是BROKER。 |
EtlInfo | 主要显示导入的数据量指标unselected.rows,dpp.norm.ALL和dpp.abnorm.ALL。 您可以根据unselected.rows的参数值判断where条件过滤了多少行,根据dpp.norm.ALL和dpp.abnorm.ALL两个指标可以验证当前导入任务的错误率是否超过max-filter-ratio。三个指标之和就是原始数据量的总行数。 |
TaskInfo | 主要显示当前导入任务参数,即创建Broker Load导入任务时您指定的参数,包括cluster,timeout和max-filter-ratio。 |
ErrorMsg | 如果导入任务状态为CANCELLED,则显示失败的原因,包括type和msg两部分。如果导入任务成功则显示N/A。type的取值意义如下:
|
CreateTime | 分别表示导入创建的时间、ETL阶段开始的时间、ETL阶段完成的时间、LOADING阶段开始的时间和整个导入任务完成的时间。
|
EtlStartTime | |
EtlFinishTime | |
LoadStartTime | |
LoadFinishTime | |
URL | 导入任务的错误数据样例,访问URL地址即可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL字段为N/A。 |
JobDetails | 显示作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的BE节点ID,以及未完成的BE节点ID。
其中已处理的原始行数,每5秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以EtlInfo中显示的数据为准。 |
取消导入任务
当Broker Load作业状态不为CANCELLED或FINISHED时,可以手动取消。取消时需要指定待取消导入任务的Label 。可执行HELP CANCEL LOAD
命令查看取消导入命令的语法。
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];
HDFS导入
HDFS导入语法示例
LOAD LABEL db1.label1 ( DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1, tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH BROKER 'broker1' ( "username" = "hdfs_username", "password" = "hdfs_password" ) PROPERTIES ( "timeout" = "3600" );
HDFS认证
社区版本的HDFS支持简单认证和Kerberos认证两种认证方式。
简单认证(Simple):用户的身份由与HDFS建立链接的客户端操作系统决定。
涉及参数如下表。
参数
描述
hadoop.security.authentication
认证方式。默认值为simple。
username
HDFS的用户名。
password
HDFS的密码。
Kerberos认证:客户端的身份由用户自己的Kerberos证书决定。
涉及参数如下表。
参数
描述
hadoop.security.authentication
认证方式。默认值为kerberos。
kerberos_principal
指定Kerberos的Principal。
kerberos_keytab
指定Kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件。
kerberos_keytab_content
指定Kerberos中keytab文件内容经过Base64编码之后的内容。
重要该参数和kerberos_keytab参数只需配置一个。
HDFS HA配置
通过配置NameNode HA,可以在NameNode切换时,自动识别到新的NameNode。配置以下参数用于访问以HA模式部署的HDFS集群。
参数
描述
dfs.nameservices
指定HDFS服务的名称,您可以自定义。
例如,设置dfs.nameservices为my_ha。
dfs.ha.namenodes.xxx
自定义NameNode的名称,多个名称时以逗号(,)分隔。其中xxx为dfs.nameservices中自定义的名称。
例如,设置dfs.ha.namenodes.my_ha为my_nn。
dfs.namenode.rpc-address.xxx.nn
指定NameNode的RPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名称。
例如,设置dfs.namenode.rpc-address.my_ha.my_nn参数值的格式为host:port。
dfs.client.failover.proxy.provider
指定Client连接NameNode的Provider,默认值为org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。
示例如下。
( "dfs.nameservices" = "my-ha", "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2", "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" )
HA模式可以与简单认证、Kerberos认证两种认证方式组合,进行集群访问。例如,通过简单认证方式访问HA HDFS。
( "username"="user", "password"="passwd", "dfs.nameservices" = "my-ha", "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2", "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" )
HDFS集群的配置可以写入hdfs-site.xml文件中,您使用Broker进程读取HDFS集群的信息时,只需要填写集群的文件路径名和认证信息即可。
导入示例
创建测试表,下面是tpch的lineitem。
CREATE TABLE lineitem ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber int, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate date, l_commitdate date, l_receiptdate date, l_shipinstruct string, l_shipmode string, l_comment string ) ENGINE=OLAP DUPLICATE KEY(l_orderkey) DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96 PROPERTIES( "replication_num" = "1" );
创建导入任务。
重要如果您的StarRocks版本小于2.5.8,则可以按照以下代码创建导入示例;如果您的StarRocks版本大于等于2.5.8,则不添加
WITH BROKER broker
部分内容。StarRocks版本小于2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) ) WITH BROKER broker ( "fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "xxx" );
StarRocks版本大于等于2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) )
查看导入任务状态。
show load where label = 'lineitem'\G; *************************** 1. row *************************** JobId: 1**** Label: lineitem State: FINISHED Progress: ETL:100%; LOAD:100% Type: BROKER EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215 TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0 ErrorMsg: NULL CreateTime: 2022-04-13 15:07:53 EtlStartTime: 2022-04-13 15:07:56 EtlFinishTime: 2022-04-13 15:07:56 LoadStartTime: 2022-04-13 15:07:56 LoadFinishTime: 2022-04-13 15:08:06 URL: NULL JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020****":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020****":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072} 2 rows in set (0.00 sec)
导入成功后进行查询操作。
查询表lineitem中的行数。
select count(*) from lineitem;
返回信息如下所示。
+----------+ | count(*) | +----------+ | 6001215 | +----------+ 1 row in set (0.03 sec)
查询表lineitem中的前2行信息。
select * from lineitem limit 2;
返回信息如下所示。
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment | +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ | 69 | 115209 | 7721 | 1 | 48 | 58761.6 | 0.01 | 0.07 | A | F | 1994-08-17 | 1994-08-11 | 1994-09-08 | NONE | TRUCK | regular epitaphs. carefully even ideas hag | | 69 | 104180 | 9201 | 2 | 32 | 37893.76 | 0.08 | 0.06 | A | F | 1994-08-24 | 1994-08-17 | 1994-08-31 | NONE | REG AIR | s sleep carefully bold, | +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ 2 rows in set (0.01 sec)
导入任务并发度
一个作业可以拆成一个或者多个任务,任务之间并行执行。拆分由LOAD语句中的DataDescription来决定。例如:
多个DataDescription对应导入多个不同的表,每个会拆成一个任务。
多个DataDescription对应导入同一个表的不同分区,每个也会拆成一个任务。
每个任务还会拆分成一个或者多个实例,然后将这些实例平均分配到BE上并行执行。实例的拆分由以下FE配置决定:
min_bytes_per_broker_scanner:单个实例处理的最小数据量,默认值为64 MB。
max_broker_concurrency:单个任务最大并发实例数,默认值为100。
load_parallel_instance_num:单个BE上并发实例数,默认值为1个。
实例总数的计算公式为实例的总数 = min(导入文件总大小/单个实例处理的最小数据量,单个任务最大并发实例数,单个BE上并发实例数 * BE数)
。
通常情况下,一个作业只有一个DataDescription,只会拆分成一个任务。任务会拆成与BE数相等的实例,然后分配到所有BE上并行执行。