为了更好地满足各种不同的业务场景,StarRocks支持多种数据模型,StarRocks中存储的数据需要按照特定的模型进行组织。本文为您介绍数据导入的基本概念、原理、系统配置、不同导入方式的适用场景,以及一些最佳实践案例和常见问题。

背景信息

数据导入功能是将原始数据按照相应的模型进行清洗转换并加载到StarRocks中,方便查询使用。StarRocks提供了多种导入方式,您可以根据数据量大小或导入频率等要求选择最适合自己业务需求的导入方式。

StarRocks导入方式与各数据源关系图如下。StarRocks schematic diagram
您可以根据不同的数据来源选择不同的导入方式:
  • 离线数据导入:如果数据源是Hive或HDFS,推荐使用Broker Load。如果数据表很多导入比较麻烦可以使用Hive外表,性能会比Broker load导入效果差,但是可以避免数据搬迁。如果单表的数据量特别大,或者需要做为全局数据字典来精确去重可以考虑使用Spark Load
  • 实时数据导入:日志数据和业务数据库的Binlog同步到Kafka后,优先推荐通过Routine Load导入StarRocks。如果导入过程中有复杂的多表关联和ETL预处理可以使用Flink(Flink Connector)处理以后,再通过Stream Load写入StarRocks。
  • 程序写入StarRocks:推荐使用Stream Load,可以参见Stream Load中Java或Python的Demo。
  • 文本文件导入:推荐使用Stream Load
  • MySQL数据导入:推荐使用MySQL外表,通过insert into new_table select * from external_table的方式导入。
  • StarRocks内部导入:推荐使用Insert Into方式导入,跟外部调度器配合实现简单的ETL处理。
说明 本文图片和部分内容来源于开源StarRocks的Overview of data loading

注意事项

向StarRocks导入数据时,通常会采用程序对接的方式。以下是导入数据时的一些注意事项:
  • 选择合适的导入方式:根据数据量大小、导入频次或数据源所在位置选择导入方式。

    例如,如果原始数据存放在HDFS上,则使用Broker load导入。

  • 确定导入方式的协议:如果选择了Broker Load导入方式,则外部系统需要能使用MySQL协议定期提交和查看导入作业。
  • 确定导入方式的类型:导入方式分为同步或异步。如果是异步导入方式,外部系统在提交创建导入后,必须调用查看导入命令,根据查看导入命令的结果来判断导入是否成功。
  • 制定Label生成策略:Label生成策略需满足对每一批次数据唯一且固定的原则。
  • 保证Exactly-Once:外部系统需要保证数据导入的At-Least-Once,StarRocks的Label机制可以保证数据导入的At-Most-Once,即可整体上保证数据导入的Exactly-Once。

基本概念

名词描述
导入作业读取用户提交的源数据并进行清洗转换后,将数据导入到StarRocks系统中。导入完成后,数据即可被用户查询到。
Label用于标识一个导入作业,所有导入作业都有一个Label。

Label可由用户指定或系统自动生成。Label在一个数据库内是唯一的,一个Label仅可用于一个成功的导入作业。当一个Label对应的导入作业成功后,不可再重复使用该Label提交导入作业。如果某Label对应的导入作业失败,则该Label可以被再使用。该机制可以保证Label对应的数据最多被导入一次,即At-Most-Once语义。

原子性StarRocks中所有导入方式都提供原子性保证,即同一个导入作业内的所有有效数据要么全部生效,要么全部不生效,不会出现仅导入部分数据的情况。此处的有效数据不包括由于类型转换错误等数据质量问题而被过滤的数据,数据质量问题可以参见数据导入常见问题
MySQL和HTTP协议StarRocks提供MySQL协议和HTTP协议两种访问协议接口来提交作业。
Broker LoadBroker导入,即通过部署的Broker程序读取外部数据源(例如HDFS)中的数据,并导入到StarRocks。Broker进程利用自身的计算资源对数据进行预处理导入。
Spark LoadSpark导入,即通过外部资源(例如Spark)对数据进行预处理生成中间文件,StarRocks读取中间文件导入。Spark Load是一种异步的导入方式,您需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。
FEFrontend,StarRocks系统的元数据和调度节点。在导入流程中主要负责导入执行计划的生成和导入任务的调度工作。
BEBackend,StarRocks系统的计算和存储节点。在导入流程中主要负责数据的ETL和存储。
TabletStarRocks表的逻辑分片,一个表按照分区、分桶规则可以划分为多个分片,详情请参见数据分布

基本原理

导入执行流程如下图所示。StarRocks flow chart
一个导入作业主要分为以下五个阶段。
阶段描述
PENDING非必须。该阶段是指用户提交导入作业后,等待FE调度执行。

Broker Load和Spark Load包括该步骤。

ETL非必须。该阶段执行数据的预处理,包括清洗、分区、排序和聚合等。

Spark Load包括该步骤,他使用外部计算资源Spark完成ETL。

LOADING该阶段先对数据进行清洗和转换,然后将数据发送给BE处理。当数据全部导入后,进入等待生效过程,此时导入作业依旧是LOADING状态。
FINISHED在导入作业涉及的所有数据均生效后,作业的状态变成FINISHED,FINISHED后导入的数据均可查询。FINISHED是导入作业的最终状态。
CANCELLED在导入作业状态变为FINISHED之前,作业随时可能被取消并进入CANCELLED状态,例如,您手动取消或导入出现错误等。CANCELLED也是导入作业的一种最终状态。
数据导入格式如表。
类型描述
整型类TINYINT、SMALLINT、INT、BIGINT、LARGEINT。例如:1,1000,1234。
浮点类FLOAT、DOUBLE、DECIMAL。例如:1.1,0.23,0.356。
日期类DATE、DATETIME。例如:2017-10-03,2017-06-13 12:34:03。
字符串类CHAR、VARCHAR。例如:I am a student,a。

导入方式

为适配不同的数据导入需求,StarRocks系统提供了5种不同的导入方式,以支持不同的数据源(例如HDFS、Kafka和本地文件等),或者按不同的方式导入数据,StarRocks目前导入数据的方式分为同步导入和异步导入两种。

所有导入方式都支持CSV数据格式。其中Broker Load还支持Parquet和ORC数据格式。

导入方式介绍

导入方式描述导入类型
Broker Load通过Broker进程访问并读取外部数据源,然后采用MySQL协议向StarRocks创建导入作业。提交的作业将异步执行,您可以通过SHOW LOAD命令查看导入结果。

Broker Load适用于源数据在Broker进程可访问的存储系统(例如HDFS)中,数据量为几十GB到上百GB,详细信息请参见Broker Load

异步导入
Spark Load通过外部的Spark资源实现对导入数据的预处理,提高StarRocks大数据量的导入性能并且节省StarRocks集群的计算资源。Spark Load是一种异步导入方式,需要通过MySQL协议创建导入作业,并通过SHOW LOAD查看导入结果。

Spark Load适用于初次迁移大数据量(可达到TB级别)到StarRocks的场景,且源数据在Spark可访问的存储系统(例如HDFS)中,详细信息请参见Spark Load

异步导入
Stream Load是一种同步执行的导入方式。您可以通过HTTP协议发送请求将本地文件或数据流导入到StarRocks中,并等待系统返回导入的结果状态,从而判断导入是否成功。

Stream Load适用于导入本地文件,或通过程序导入数据流中的数据,详细信息请参见Stream Load

同步导入
Routine LoadRoutine Load(例行导入)提供了一种自动从指定数据源进行数据导入的功能。您可以通过MySQL协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(例如Kafka)中读取数据并导入到StarRocks中,详细信息请参见Routine Load异步导入
Insert Into类似MySQL中的Insert语句,StarRocks提供INSERT INTO tbl SELECT ...;的方式从StarRocks的表中读取数据并导入到另一张表,或者通过INSERT INTO tbl VALUES(...);插入单条数据,详细信息请参见Insert Into同步导入

导入类型

重要 如果是外部程序接入StarRocks的导入功能,需要先判断使用导入方式是哪类,然后再确定接入逻辑。
  • 同步导入

    同步导入方式即用户创建导入任务,StarRocks同步执行,执行完成后返回导入结果。用户可以通过该结果判断导入是否成功。

    操作步骤:
    1. 用户(外部系统)创建导入任务。
    2. StarRocks返回导入结果。
    3. 用户(外部系统)判断导入结果。如果导入结果为失败,则可以再次创建导入任务。
  • 异步导入

    异步导入方式即用户创建导入任务后,StarRocks直接返回创建成功。创建成功不代表数据已经导入成功。导入任务会被异步执行,用户在创建成功后,需要通过轮询的方式发送查看命令查看导入作业的状态。如果创建失败,则可以根据失败信息,判断是否需要再次创建。

    操作步骤:
    1. 用户(外部系统)创建导入任务。
    2. StarRocks返回创建任务的结果。
    3. 用户(外部系统)判断创建任务的结果,如果成功则进入步骤4;如果失败则可以回到步骤1,重新尝试创建导入任务。
    4. 用户(外部系统)轮询查看任务状态,直至状态变为FINISHED或CANCELLED。

适用场景

场景描述
HDFS导入如果HDFS导入源数据存储在HDFS中,当数据量为几十GB到上百GB时,则可以采用Broker Load方法向StarRocks导入数据。此时要求部署的Broker进程可以访问HDFS数据源。导入数据的作业异步执行,您可以通过SHOW LOAD命令查看导入结果。

如果源数据存储在HDSF中,当数据量达到TB级别时,则可以采用Spark Load方法向StarRocks导入数据。此时要求部署的Spark进程可以访问HDFS数据源。导入数据的作业异步执行,您可以通过SHOW LOAD命令查看导入结果。

对于其他外部数据源,只要Broker或Spark进程能读取对应数据源,也可以采用Broker Load或Spark Load方法导入数据。

本地文件导入数据存储在本地文件中,数据量小于10 GB,可以采用Stream Load方法将数据快速导入StarRocks系统。采用HTTP协议创建导入作业,作业同步执行,您可以通过HTTP请求的返回值判断导入是否成功。
Kafka导入数据来自于Kafka等流式数据源,需要向StarRocks系统导入实时数据时,可以采用Routine Load方法。您通过MySQL协议创建例行导入作业,StarRocks持续不断地从Kafka中读取并导入数据。
Insert Into导入手工测试及临时数据处理时可以使用Insert Into方法向StarRocks表中写入数据。

其中,INSERT INTO tbl SELECT ...;语句是从StarRocks的表中读取数据并导入到另一张表,INSERT INTO tbl VALUES(...);语句是向指定表里插入单条数据。

内存限制

您可以通过设置参数来限制单个导入作业的内存使用,以防止导入占用过多的内存而导致系统OOM。不同导入方式限制内存的方式略有不同,详情可以参见各个导入方式的文档。

一个导入作业通常会分布在多个BE上执行,内存参数限制的是一个导入作业在单个BE上的内存使用,而不是在整个集群的内存使用。同时,每个BE会设置可用于导入作业的内存总上限,详情请参见通用系统配置。配置限制了所有在该BE上运行的导入任务的总体内存使用上限。

较小的内存限制可能会影响导入效率,因为导入流程可能会因为内存达到上限而频繁的将内存中的数据写回磁盘。而过大的内存限制可能导致当导入并发较高时系统OOM。所以需要根据需求合理地设置内存参数。

通用系统配置

FE配置

以下配置属于FE的系统配置,可以通过FE的配置文件fe.conf来修改。

参数描述
max_load_timeout_second导入超时时间的最大、最小取值范围,均以秒为单位。默认的最大超时时间为3天,最小超时时间为1秒。您自定义的导入超时时间不可超过该范围。该参数通用于所有类型的导入任务。
min_load_timeout_second
desired_max_waiting_jobs等待队列可以容纳的最多导入任务数目,默认值为100。

例如,FE中处于PENDING状态(即等待执行)的导入任务数目达到该值,则新的导入请求会被拒绝。此配置仅对异步执行的导入有效,如果处于等待状态的异步导入任务数达到限额,则后续创建导入的请求会被拒绝。

max_running_txn_num_per_db每个数据库中正在运行的导入任务的最大个数(不区分导入类型、统一计数),默认值为100。

当数据库中正在运行的导入任务超过最大值时,后续的导入任务不会被执行。如果是同步作业,则作业会被拒绝;如果是异步作业,则作业会在队列中等待。

label_keep_max_second导入任务记录的保留时间。

已经完成的(FINISHED或CANCELLED)导入任务记录会在StarRocks系统中保留一段时间,时间长短则由此参数决定。参数默认值为3天。该参数通用于所有类型的导入任务。

BE配置

以下配置属于BE的系统配置,可以通过BE的配置文件be.conf来修改。

参数描述
push_write_mbytes_per_secBE上单个Tablet的写入速度限制。默认值是10,即10MB/s。

根据Schema以及系统的不同,通常BE对单个Tablet的最大写入速度大约在10~30MB/s之间。您可以适当调整该参数来控制导入速度。

write_buffer_size导入数据在BE上会先写入到一个内存块,当该内存块达到阈值后才会写回磁盘。默认值为100 MB。

过小的阈值可能导致BE上存在大量的小文件。您可以适当提高该阈值减少文件数量。但过大的阈值可能导致RPC超时,详细请参见参数tablet_writer_rpc_timeout_sec

tablet_writer_rpc_timeout_sec导入过程中,发送一个Batch(1024行)的RPC超时时间。默认为600秒。

因为该RPC可能涉及多个分片内存块的写盘操作,所以可能会因为写盘导致RPC超时,可以适当调整超时时间来减少超时错误(例如send batch fail)。同时,如果调大参数write_buffer_size,则tablet_writer_rpc_timeout_sec参数也需要适当调大。

streaming_load_rpc_max_alive_time_sec在导入过程中,StarRocks会为每个Tablet开启一个Writer,用于接收数据并写入。该参数指定了Writer的等待超时时间。默认为600秒。

如果在参数指定时间内Writer没有收到任何数据,则Writer会被自动销毁。当系统处理速度较慢时,Writer可能长时间接收不到下一批数据,导致导入报错TabletWriter add batch with unknown id。此时可适当调大该参数。

load_process_max_memory_limit_percent分别为最大内存和最大内存百分比,限制了单个BE上可用于导入任务的内存上限。系统会在两个参数中取较小者,作为最终的BE导入任务内存使用上限。
  • load_process_max_memory_limit_percent:表示对BE总内存限制的百分比。默认为80。总内存限制mem_limit默认为80%,表示对物理内存的百分比。即假设物理内存为M,则默认导入内存限制为M * 80% * 80%。
  • load_process_max_memory_limit_bytes:默认为100 GB。
load_process_max_memory_limit_bytes