全部产品
Search
文档中心

实时计算Flink版:StarRocks

更新时间:Sep 26, 2024

本文为您介绍如何使用StarRocks连接器。

背景信息

StarRocks是新一代极速全场景MPP(Massively Parallel Processing)数据仓库,致力于构建极速和统一分析体验。StarRocks具有以下优势:

  • StarRocks兼容MySQL协议,可以使用MySQL客户端和常用BI工具对接StarRocks来分析数据。

  • StarRocks采用分布式架构:

    • 对数据表进行水平划分并以多副本存储。

    • 集群规模可以灵活伸缩,支持10 PB级别的数据分析。

    • 支持MPP框架,并行加速计算。

    • 支持多副本,具有弹性容错能力。

Flink连接器内部的结果表是通过缓存并批量由Stream Load导入实现,源表是通过批量读取数据实现。StarRocks连接器支持的信息如下。

类别

详情

支持类型

源表和结果表、数据摄入目标端

运行模式

流模式和批模式

数据格式

CSV

特有监控指标

暂无

API种类

Datastream、SQL和数据摄入YAML

是否支持更新或删除结果表数据

前提条件

已创建StarRocks集群,包括EMR的StarRocks或基于ECS的云上自建StarRocks。

使用限制

  • 仅实时计算引擎VVR 6.0.5及以上版本支持StarRocks连接器。

  • StarRocks连接器仅支持at-least-once和exactly-once语义。

SQL

特色功能

EMR的StarRocks支持CTAS&CDAS功能,CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步,详情请参见基于实时计算Flink使用CTAS&CDAS功能同步MySQL数据至StarRocks

语法结构

CREATE TABLE USER_RESULT(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'xxx',
 'table-name' = 'xxx',
 'username' = 'xxx',
 'password' = 'xxx'
 );

WITH参数

类型

参数

说明

数据类型

是否必填

默认值

备注

通用

connector

表类型。

String

固定值为starrocks。

jdbc-url

JDBC连接的URL。

String

指定FE(Front End)的IP和JDBC端口,格式为jdbc:mysql://ip:port

database-name

StarRocks数据库名称。

String

无。

table-name

StarRocks表名称。

String

无。

username

StarRocks连接用户名。

String

无。

password

StarRocks连接密码。

String

无。

starrocks.create.table.properties

StarRocks表属性。

String

设置数据表初始属性,如引擎、副本数等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。

源表独有

scan-url

数据扫描的url。

String

指定FE(Front End)的IP和HTTP端口,格式为fe_ip:http_port;fe_ip:http_port

说明

填写多个IP和端口号时,请使用半角分号(;)进行分隔。

scan.connect.timeout-ms

flink-connector-starrocks连接StarRocks的时间上限。

超过该时间上限,将报错。

String

1000

单位为毫秒。

scan.params.keep-alive-min

查询任务的保活时间。

String

10

无。

scan.params.query-timeout-s

查询任务的超时时间。

如果超过该时间,仍未返回查询结果,则停止查询任务。

String

600

单位为秒。

scan.params.mem-limit-byte

BE节点中单个查询的内存上限。

String

1073741824(1 GB)

单位为字节。

scan.max-retries

查询失败时的最大重试次数。

超过该数量上限,则将报错。

String

1

无。

结果表独有

load-url

数据导入的URL。

String

指定FE(Front End)的IP和HTTP端口,格式为fe_ip:http_port;fe_ip:http_port

说明

填写多个IP和端口号时,请使用半角分号(;)进行分隔。

sink.semantic

数据写入语义。

String

at-least-once

取值如下:

  • at-least-once(默认值):至少一次。

  • exactly-once:恰好一次。

sink.buffer-flush.max-bytes

Buffer可容纳的最大数据量。

String

94371840(90 MB)

取值范围为64 MB~10 GB。

sink.buffer-flush.max-rows

Buffer可容纳的最大数据行数。

String

500000

取值范围为64,000~5000,000。

sink.buffer-flush.interval-ms

Buffer刷新时间间隔。

String

300000

取值范围为1000毫秒~3600000毫秒。

sink.max-retries

最大重试次数。

String

3

取值范围为0~10。

sink.connect.timeout-ms

连接到starrocks的超时时间。

String

1000

取值范围为100~60000。单位为毫秒。

sink.properties.*

结果表属性。

String

Stream Load的参数控制Stream Load导入行为。例如,参数 sink.properties.format表示Stream Load所导入的数据格式,如CSV。更多参数和解释,请参见Stream Load

类型映射

StarRocks字段类型

Flink字段类型

NULL

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

LARGEINT

STRING

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

DECIMALV2

DECIMAL

DECIMAL32

DECIMAL

DECIMAL64

DECIMAL

DECIMAL128

DECIMAL

CHAR

CHAR

VARCHAR

STRING

代码示例

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
  PRIMARY KEY(`runoob_id`)
  NOT ENFORCED
) WITH (
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'connector' = 'starrocks',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;

数据摄入

使用StarRocks Pipeline连接器,您可以轻松地将来自上游数据源的数据记录和表结构变更写入外部StarRocks数据库。StarRocks连接器同时支持社区版与阿里云E-MapReduce Serverless StarRocks全托管版本。

特色功能

  • 自动建库建表

    如果来自上游的数据库及数据表不存在于下游StarRocks实例中,则对应的数据库及数据表会被自动创建。您可以通过table.create.properties.*参数设定自动创建表时的选项。

  • 表结构变更同步

    目前,StarRocks连接器支持自动将建表事件(CreateTableEvent)、增加列事件(AddColumnEvent)和删除列(DropColumnEvent)事件自动应用到下游数据库中。

注意事项

  • 目前StarRocks连接器只支持At-least Once语义,并通过主键表来保证幂等写入。

  • 目前,同步的表必须包含主键。不含主键的表必须通过transform 语句块指定主键方可正常写入下游。例如:

    transform:
      - source-table: ...
        primary-keys: id, ...
  • 自动创建的表分桶键与主键相同,且不可有分区键。

  • 进行表结构变更同步时,新增列只能追加到已有列的尾部。在默认的表结构演化模式Lenient下,会自动将其他位置的插入转换到尾部。

  • 如果您使用的StarRocks版本低于2.5.7,则必须显式地通过table.create.num-buckets参数指定分桶数量。更高版本的StarRocks可以自动设定合适的分桶数

  • 如果您使用的是StarRocks 3.2或更高版本,建议开启table.create.properties.fast_schema_evolution选项来加快表结构变更的速度。

语法结构

source:
  ...

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass

配置项

参数名称

描述

类型

是否必填

默认值

备注

type

连接器的名称。

String

固定值为starrocks

name

Sink的显示名称。

String

无。

jdbc-url

JDBC连接的URL。

String

支持传入多个地址,使用英文逗号 (,) 分隔。例如 jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2,fe_host3:fe_query_port3

load-url

连接到FE节点的HTTP服务URL。

String

支持传入多个地址,使用英文分号 (;) 分隔。例如 fe_host1:fe_http_port1;fe_host2:fe_http_port2

username

连接到 StarRocks时使用的用户名。

String

该用户至少需要具备对目标表的SELECT和INSERT权限。您可以使用StarRocks的GRANT命令赋予相应的权限。

password

连接到 StarRocks时使用的密码。

String

无。

sink.label-prefix

在进行Stream Load导入时使用的标签前缀。

String

无。

sink.connect.timeout-ms

建立HTTP连接时的超时时间。

Integer

30000

单位为毫秒,取值需要介于100 ~ 60000。

sink.wait-for-continue.timeout-ms

从服务器得到100 Continue请求前的超时时间)。

Integer

30000

单位为毫秒。取值需要介于3000 ~ 600000。

sink.buffer-flush.max-bytes

在将数据写入StarRocks前,最多可以在内存中缓存多大量的数据。

Long

157286400

单位为字节,取值需要介于64 MB ~ 10 GB。

说明
  • 该缓存大小被所有表共用。当缓冲区已满时,连接器将选择若干张表进行Flush。

  • 将此参数设置为较大的值可以提高吞吐量,但可能会增加导入时的延迟。

sink.buffer-flush.interval-ms

每张表连续两次Flush之间的间隔时间。

Long

300000

单位为毫秒。

sink.scan-frequency.ms

连续两次检查是否应该进行Flush之间的间隔时间。

Long

50

单位为毫秒。

sink.io.thread-count

在进行 Stream Load导入时的线程数量。

Integer

2

无。

sink.at-least-once.use-transaction-stream-load

是否使用Stream Load事务接口进行导入。

Boolean

true

仅在数据库支持的情况下生效。

sink.properties.*

提供给Sink的额外参数。

String

可以在STREAM LOAD查看支持的参数。

table.create.num-buckets

自动建表时的Bucket数量。

Integer

table.create.properties.*

在自动建表时需要传递的额外参数。

String

例如,可以传递'table.create.properties.fast_schema_evolution' = 'true'来启用快速表结构变更功能。参数详情请参见StarRocks文档

table.schema-change.timeout

执行表结构变更的超时时间。

Duration

30 min

必须设定为整数秒。

说明

如果某个表结构变更操作耗时超过此限制,作业将运行失败。

类型映射

说明

StarRocks并不支持所有的CDC YAML类型,尝试将不支持的类型写入下游会导致作业失败。您可以使用Transform CAST内置函数对不支持的数据进行转换,或是使用Projection语句将其从结果表中移除。详情请参考数据摄入开发参考

CDC类型

StarRocks类型

附注

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP

DATETIME

TIMESTAMP_LTZ

DATETIME

CHAR(n)

(n <= 85 时)

CHAR(n × 3)

CDC中的CHAR类型长度表示字符数,而StarRocks中的CHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的StarRocks CHAR类型长度为原来的3倍。

说明

StarRocks的CHAR类型长度最长不可超过255,因此只有长度不超过85的CDC CHAR类型才会被映射到StarRocks CHAR类型。

CHAR(n)

(n > 85 时)

VARCHAR(n × 3)

CDC中的CHAR类型长度表示字符数,而StarRocks中的CHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的 StarRocks CHAR类型长度为原来的3倍。

说明

StarRocks的CHAR类型长度最长不可超过255,因此长度大于85的CDC CHAR类型会被映射到StarRocks VARCHAR类型。

VARCHAR(n)

VARCHAR(n × 3)

CDC中的CHAR类型长度表示字符数,而StarRocks中的CHAR类型长度表示UTF-8编码后的字节数。通常情况下,一个中文字符经过UTF-8编码后不会超过3字节,因此映射到的StarRocks CHAR类型长度为原来的3倍。