StarRocks支持从本地直接导入数据,支持CSV文件格式,数据量在10 GB以下。本文为您介绍Stream Load导入的基本原理、使用示例和最佳实践。

背景信息

Stream Load是一种同步的导入方式,通过发送HTTP请求将本地文件或数据流导入到StarRocks中。Stream Load同步执行导入并返回导入结果。您可以直接通过请求的返回值判断导入是否成功。

基本概念

Coordinator:协调节点。负责接收数据并分发数据到其他数据节点,导入完成后返回结果。

基本原理

Stream Load通过HTTP协议提交导入命令。如果提交到FE节点,则FE节点会通过HTTP Redirect指令将请求转发给某一个BE节点,您也可以直接提交导入命令给某一指定BE节点。该BE节点作为Coordinator节点,将数据按表Schema划分并分发数据到相关的BE节点。导入的最终结果由Coordinator节点返回给用户。

Stream Load的主要流程如下图所示。Stream Load

导入示例

创建导入任务

Stream Load通过HTTP协议提交和传输数据。本示例通过curl命令展示如何提交导入任务。您也可以通过其他HTTP Client进行操作。

  • 语法
    curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT \
        http://fe_host:http_port/api/{db}/{table}/_stream_load
    说明
    • 当前支持HTTP chunked与非chunked两种上传方式,对于非chunked方式,必须要有Content-Length来标示上传的内容长度,保证数据的完整性。
    • 建议设置Expect Header字段内容为100-continue,可以在某些出错场景下避免不必要的数据传输。

    Header中支持的属性见下表的导入任务参数描述,格式为-H "key1:value1"。如果同时有多个任务参数,需要用多个-H来指示,类似于-H "key1:value1" -H "key2:value2"……

    创建导入任务的详细语法可以通过HELP STREAM LOAD命令查看。Stream Load中所有与导入任务相关的参数均设置在Header中。相关参数描述如下表所示。

    参数描述
    签名参数user:passwdStream Load创建导入任务使用的是HTTP协议,已通过Basic access authentication进行签名。StarRocks系统会根据签名来验证用户身份和导入权限。
    导入任务参数label 导入任务的标签,相同标签的数据无法多次导入。

    您可以通过指定Label的方式来避免一份数据重复导入的问题。当前StarRocks系统会保留最近30分钟内成功完成的任务的Label。

    column_separator用于指定导入文件中的列分隔符,默认为\t。

    如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"

    row_delimiter指定导入文件中的行分隔符,默认为\n。
    重要 curl命令无法传递\n,换行符手动指定为\n时,shell会先传递反斜线(\),然后传递n而不是直接传递换行符\n。

    Bash支持另一种转义字符串语法,传递\n和\t时,使用美元符号和全角单引号($')启动字符串并以半角单引号(')结束字符串。例如,-H $'row_delimiter:\n'

    columns用于指定导入文件中的列和Table中列的对应关系。
    如果源文件中的列正好对应表中的内容,则无需指定该参数。如果源文件与表Schema不对应,则需要该参数来配置数据转换规则。列有两种形式,一种是直接对应于导入文件中的字段,可以直接使用字段名表示,一种需要通过计算得出。
    • 示例1:表中有3列c1, c2, c3,源文件中的3列依次对应的是c3,c2,c1,则需要指定-H "columns: c3, c2, c1"
    • 示例2:表中有3列c1, c2, c3,源文件中前3列与表中的列一一对应,但是还有多余1列,则需要指定-H "columns: c1, c2, c3, temp",最后1列随意指定名称用于占位即可。
    • 示例3:表中有3列year, month, day,源文件中只有一个时间列,为2018-06-01 01:02:03格式,则可以指定 -H "columns: col, year = year(col), month=month(col), day=day(col)"完成导入。
    where用于抽取部分数据。用户如需将不需要的数据过滤掉,那么可以通过设定这个选项来达到。

    例如,只导入k1列等于20180601的数据,则可以在导入时指定-H "where: k1 = 20180601"

    max_filter_ratio最大容忍可过滤(例如,因为数据不规范等原因而过滤)的数据比例。默认零容忍。
    说明 此处数据不规范的数据不包括通过WHERE条件过滤的数据。
    partitions用于指定该导入所涉及的Partition。

    如果您能够确定数据对应的Partition,则推荐指定该项。不满足指定分区的数据将被过滤掉。例如,指定导入到p1和p2分区,可以指定-H "partitions: p1, p2"

    timeout指定导入的超时时间。默认是600秒。

    设置范围为1~259200,单位为秒。

    strict_mode指定此次导入是否开启严格模式,默认为开启。

    关闭方式为-H "strict_mode: false"

    timezone指定本次导入所使用的时区。默认为东八区。

    该参数会影响所有导入涉及和时区有关的函数结果。

    exec_mem_limit导入内存限制。默认值为2 GB。
  • 示例
    curl --location-trusted -u root -T date -H "label:123" \
        http://abc.com:8030/api/test/date/_stream_load
  • 返回结果

    导入任务完成后,Stream Load会以JSON格式返回导入任务的相关内容,返回结果示例如下。

    {
        "TxnId": 1003,
        "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
        "Status": "Success",
        "ExistingJobStatus": "FINISHED", // optional
        "Message": "OK",
        "NumberTotalRows": 1000000,
        "NumberLoadedRows": 1000000,
        "NumberFilteredRows": 1,
        "NumberUnselectedRows": 0,
        "LoadBytes": 40888898,
        "LoadTimeMs": 2144,
        "ErrorURL": "[http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005](http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005)"
    }
    参数描述
    TxnId导入的事务ID。用户可不感知。
    Label导入的Label。由用户指定或系统自动生成。
    Status导入完成状态。
    • Success:表示导入成功。
    • Publish Timeout:表示导入已经完成,只是数据可能会延迟可见,无需重试。
    • Label Already Exists:Label重复,需更换Label。
    • Fail:导入失败。
    ExistingJobStatus已存在Label对应的导入作业的状态。该字段只有当Status为Label Already Exists时才会显示。您可以通过该状态,知晓已存在Label对应的导入作业的状态。
    • RUNNING:表示作业在执行中。
    • FINISHED:表示作业成功。
    Message导入状态的详细说明。导入失败时会返回具体的失败原因。
    NumberTotalRows从数据流中读取到的总行数。
    NumberLoadedRows导入任务的数据行数,仅在导入状态为Success时有效。
    NumberFilteredRows导入任务过滤掉的行数,即数据质量不合格的行。
    NumberUnselectedRows通过Where条件被过滤掉的行数。
    LoadBytes导入任务的源文件数据量大小。
    LoadTimeMs导入任务所用的时间,单位为ms。
    ErrorURL被过滤数据的具体内容,仅保留前1000条数据。如果导入任务失败,可以直接用以下方式获取被过滤的数据并进行分析,以调整导入任务。
    wget http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005

取消导入任务

Stream Load无法手动取消,Stream Load在超时或者导入错误后会被系统自动取消。

最佳实践

应用场景

Stream Load的最佳使用场景是原始文件在内存中或者存储在本地磁盘中。由于Stream Load是一种同步的导入方式,所以当您希望用同步方式获取导入结果时,也可以使用该导入方式。

数据量

由于Stream Load是由BE发起的导入并分发数据,建议的导入数据量在1 GB到10 GB之间。系统默认的最大Stream Load导入数据量为10 GB,所以导入超过10 GB的文件需要修改BE的配置项streaming_load_max_mb。例如,待导入文件大小为15 GB,则可以修改BE的配置项streaming_load_max_mb大于15 GB即可。

Stream Load的默认超时为300秒,按照StarRocks目前最大的导入限速来看,导入超过3 GB大小的文件就需要修改导入任务默认的超时时间了。例如,导入一个10 GB的文件,timeout应该设置为1000s。

导入任务超时时间 = 导入数据量 / 10M/s ,具体的平均导入速度需要您根据自己的集群情况计算。

完整示例

数据情况:数据在客户端本地磁盘路径/home/store-sales中,导入的数据量约为15 GB,希望导入到数据库bj-sales的表store-sales中。

集群情况:Stream Load的并发数不受集群大小影响。

示例如下:
  1. 因为导入文件大小超过默认的最大导入大小10 GB,所以需要修改BE的配置文件BE.conf

    例如,修改参数streaming_load_max_mb,将最大导入大小调整为16000。

  2. 计算大概的导入时间是否超过默认timeout值,导入时间为15000 / 10 = 1500s,如果超过了默认的timeout时间,则需要修改FE的配置FE.conf,修改参数stream_load_default_timeout_second,将导入时间调整为1500。
  3. 创建导入任务。
    curl --location-trusted -u user:password -T /home/store_sales \
        -H "label:abc" [http://abc.com:8000/api/bj_sales/store_sales/_stream_load](http://abc.com:8000/api/bj_sales/store_sales/_stream_load)

代码集成示例