Hologres与实时计算Blink独享模式(原产品线)深度融合,支持使用Connector的方式写入数据至Hologres结果表,您可以立即查询写入的数据。本文为您介绍实时计算Blink独享模式(原产品线)如何写入数据至Hologres结果表。
使用限制
不同Blink独享模式的版本开发语义不同,在使用之前,请先确定Blink独享模式的版本,并根据版本示例使用。
请确保开通的实时计算与Hologres地域一致,以免连接失败。
Blink独享模式3.6之前的版本未内置Hologres Connector,实时写入数据至Hologres需要引用JAR文件,请您使用自助升级或加入Hologres钉钉交流群反馈,详情请参见如何获取更多的在线支持?。
说明建议您升级至3.6及以上的版本进行作业。
Blink独享模式3.7版本支持自动创建Hologres分区表,但是您需要在作业中配置
createparttable='true'
。同时,使用分区表的注意事项如下:Hologres当前仅支持List分区。
创建分区表时,需要显示指定的分区列。目前仅支持text和int4类型的分区列,并且分区的值不能包含短划线(-),例如
2020-09-12
。如果分区表配置了主键(pk),则分区列必须是pk的一部分。
创建分区子表时,子表分区列的值必须为固定值。
写入分区子表的数据对应的分区列值,必须与子表创建时定义的值严格匹配,否则会报错。
当前不支持DEFAULT分区功能。
当导入数据的Hologres目标表设置了主键,实时写入的默认语义不会按照主键进行更新,后续导入的主键数据如果重复,则会被丢弃。
Hologres为异步写入数据,您在进行作业时需要添加
blink.checkpoint.fail_on_checkpoint_error=true
配置,当作业发生异常时才会触发Failover。Blink3.7.6及以上版本不需要添加该参数。
DDL语义
创建Hologres结果表的语句如下。
create table Hologres_sink(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='hologres',
dbname='<yourDbname>', --Hologres的数据库名称。
tablename='<yourTablename>', --Hologres用于接收数据的表名称。
username='<yourUsername>', --当前阿里云账号的AccessKey ID。
password='<yourPassword>', --当前阿里云账号的AccessKey Secret。
endpoint='<yourEndpoint>'); --当前Hologres实例VPC网络的Endpoint。
WITH参数
参数 | 描述 | 示例 |
type | 结果表的类型。 固定值为hologres。 | hologres |
endpoint | Hologres实例的VPC网络地址。 进入Hologres管理控制台,在目标实例详情页的网络信息区域获取Endpoint。Endpoint需包含端口号,格式为ip:port。 | demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80 |
username | AccessKey ID 您可以单击AccessKey 管理,获取AccessKey ID。 | xxxxm3FMWaxxxx |
password | AccessKey Secret 您可以单击AccessKey 管理,获取AccessKey Secret。 | xxxxm355fffaxxxx |
dbname | 当前Hologres的数据库名称。 | Holodb |
tablename | 当前Hologres数据库的表名称。 | blink_test |
arraydelimiter | Hologres Sink支持将一个STRING字段按照field_delimiter切分为数组导入Hologres。 默认值为\u0002。 | \u0002 |
mutatetype | 数据写入模式,详情请参见实时数仓Hologres结果表。 默认值为insertorignore。 | insertorignore |
ignoredelete | 是否忽略回撤消息。
说明 该参数仅在使用流式语义时生效。 默认为false。 通常Flink的Groupby会产生回撤消息,回撤消息传输到Hologres Connector时会产生Delete请求。 | false |
partitionrouter | 是否写入分区表。
默认为false。 | false |
createparttable | 当写入分区表时,是否根据分区值自动创建分区表。Blink独享 3.7及以上版本支持该功能。 默认值为false。 重要 请您谨慎使用该功能,确保分区值不会出现脏数据,从而导致创建了错误的分区表。 | false |
arraydelimiter、mutatetype、ignoredelete、partitionrouter及createparttable参数未在DDL示例语句中展示,如果您在实际应用中需要使用相应参数,可参考上述表格中的参数描述。
实时写入数据至Hologres普通结果表
Hologres创建表。
在Hologres中创建一张用于接收数据的表。示例建表SQL语句如下。
create table blink_test (a int, b text, c text, d float8, e bigint);
创建实时计算作业。
登录实时计算控制台。
创建实时计算作业。
实时计算Blink 3.6及以上版本已支持Hologres数据源,您可以直接调用,示例SQL语句如下。
create table randomSource (a int, b VARCHAR, c VARCHAR, d DOUBLE, e BIGINT) with (type = 'random'); create table test ( a int, b VARCHAR, c VARCHAR, PRIMARY KEY (a) ) with ( type = 'hologres', `endpoint` = '$ip:$port', --当前Hologres的VPC网络地址以及端口号。 `username` = '当前阿里云账号的AccessKey ID', `password` = '当前阿里云账号的AccessKey Secret', `dbname` = '当前Hologres的数据库名称', `tablename` = 'blink_test'--当前Hologres接收数据的表名称。 ); insert into test select a,b,c from randomSource;
上线作业。
完成新建作业后,单击编辑框的语法检查,如果显示成功,则表明语法正确。
单击保存保存作业。
单击上线,提交作业至生产环境。根据业务情况填写作业的上线配置。
启动作业。
提交作业至生产环境后,您需要手动启动作业。
在阿里实时计算开发平台页面顶部菜单栏右侧,单击运维,跳转至运维界面,选择需要启动的作业,单击右上角的启动。
Hologres实时查询数据。
查询Hologres中用于接收数据的表,就可以实时获取到已写入的数据。示例查询SQL语句如下。
select * from blink_test;
如何使用宽表Merge/局部更新功能
对于常见的多个流的数据写入至一张Hologres宽表的场景,具体使用方法如下:
假设Hologres有一张宽表WIDE_TABLE,有A、B、C、D、E几列,其中A字段是主键,Flink一个流包含数据A、B、C,另一个流包含数据A、D、E。
使用Flink SQL声明两张Hologres结果表,其中一张表只声明字段A、B、C,另一张表只声明字段A、D、E,这两张表都映射至《WIDE_TABLE》。
两张结果表的mutatetype属性都设置成insertorupdate。
两张结果表的ignoredelete属性都设置成true,防止回撤消息产生Delete请求。
将两个流的数据分别Insert至两张结果表中。
该场景的具体使用限制如下:
宽表必须有主键。
每个流的数据都必须包含完整主键字段。
列存表的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary encoding。
实时写入数据至Hologres的分区结果表
Hologres支持通过调用实时数据API接口,直接将数据写入分区父表中,对应的分区数据将会自动路由至分区子表。您可以直接写入数据至分区表。实时数据API的描述,详情请参见实时数据API。
使用限制如下:
Hologres当前版本仅支持List分区。
创建分区表时,需要显示指定的分区列,分区列的类型仅支持text和 int4。
如果设置了主键,分区列必须为主键的一部分。
创建分区子表时,子表分区列的值必须为固定值。
写入分区子表的数据对应的分区列值,必须严格与创建子表时定义的值匹配,否则会报错。
Hologres当前不支持默认分区。
Hologres创建分区表。
在Hologres中创建一张用于接收数据的分区表,并创建对应的分区子表。示例建表SQL语句如下。
--创建分区父表test_message和对应的分区子表。 drop table if exists test_message; begin; create table test_message ( "bizdate" text NOT NULL, "tag" text NOT NULL, "id" int4 NOT NULL, "title" text NOT NULL, "body" text, PRIMARY KEY (bizdate,tag,id) ) PARTITION BY LIST (bizdate); commit;
说明执行命令时,
${bizdate}
参数需要替换为实际值。Blink独享模式3.7版本才支持自动创建分区,如果您使用的是Blink独享模式3.7以下的版本,需要在Hologres中提前创建分区子表,否则会导入数据失败。
Blink独享创建作业。
在Blink独享模式中创建作业的示例语句如下。
说明以下示例适用于独享在Blink独享模式3.7及以上版本。如果您使用的是在Blink独享模式3.7以下版本,请升级至3.7及以上版本,或者删除自动创建分区子表的配置
`createparttable` = 'true'
。create table test_message_src( tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'random', `interval` = '10', `count` = '100' ); create table test_message_sink ( bizdate VARCHAR, tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'hologres', `endpoint` = '$ip:$port', --Hologres实例的VPC网络地址。 `username` ='<AccessID>', --当前阿里云账号的AccessKey ID。 `password` = '<AccessKey>', --当前阿里云账号的AccessKey Secret。 `dbname` = '<DBname>', --当前Hologres的数据库名称。 `tablename` = '<Tablename>', --当前Hologres数据库的表名称。 `partitionrouter` = 'true', --写入数据至Hologres的分区表。 `createparttable` = 'true', --自动创建Hologres的分区子表。 ); insert into test_message_sink select "20200327",* from test_message_src; insert into test_message_sink select "20200328",* from test_message_src;
上线并启动作业。
请参考实时写入数据至Hologres结果表章节中的上线作业和启动作业步骤。
Hologres实时查询数据。
查询Hologres中用于接收数据的表,就可以实时获取到已写入的数据。示例查询SQL语句如下。
select * from test_message; select * from test_message where bizdate = '20200327';
数据类型映射
Blink独享与Hologres的数据类型映射,请参见数据类型汇总。