本文为您介绍如何在数据摄入YAML作业中,使用实时数仓Hologres连接器进行数据同步。
背景信息
实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres YAML连接器支持的信息如下。
类别 | 详情 |
支持类型 | 数据摄入目标端(Sink) |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | YAML |
是否支持更新或删除结果表数据 | 是 |
功能说明
功能 | 详情 |
支持实时同步整库(或者多张表)的全量和增量数据到每张对应的结果表中。 | |
在实时同步整库数据的同时,还支持将每张源表的表结构变更(增加列、删除列、重命名列等)实时同步到结果表中。 | |
支持使用正则表达式定义库名,匹配数据源的多个分库下的源表,合并后同步到下游每张对应表名的结果表中。 | |
支持将上游的一张表写入到Hologres分区表。 | |
采用多种数据映射策略,将上游数据类型映射为更宽的Hologres数据类型。 |
语法结构
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
参数说明
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
type | sink类型。 | String | 是 | 无 | 固定值为 |
name | sink名称。 | String | 否 | 无 | 无。 |
dbname | 数据库名称。 | String | 是 | 无 | 无。 |
username | 用户名,请填写阿里云账号的AccessKey ID。 | String | 是 | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要 为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理。 |
password | 密码,请填写阿里云账号的AccessKey Secret。 | String | 是 | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要 为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理。 |
endpoint | Hologres服务地址。 | String | 是 | 无 | 详情请参见访问域名。 |
jdbcRetryCount | 当连接故障时,写入和查询的重试次数。 | Integer | 否 | 10 | 无。 |
jdbcRetrySleepInitMs | 每次重试的固定等待时间。 | Long | 否 | 1000 | 单位为毫秒。实际重试的等待时间的计算公式为 |
jdbcRetrySleepStepMs | 每次重试的累加等待时间。 | Long | 否 | 5000 | 单位为毫秒。实际重试的等待时间的计算公式为 |
jdbcConnectionMaxIdleMs | JDBC连接的空闲时间。 | Long | 否 | 60000 | 单位为毫秒。超过这个空闲时间,连接就会断开释放掉。 |
jdbcMetaCacheTTL | 本地缓存TableSchema信息的过期时间。 | Long | 否 | 60000 | 单位为毫秒。 |
jdbcMetaAutoRefreshFactor | 如果缓存的剩余时间小于触发时间,则系统会自动刷新缓存。 | Integer | 否 | 4 | 缓存的剩余时间计算方法:缓存的剩余时间=缓存的过期时间 - 缓存已经存活的时间。缓存自动刷新后,则从0开始重新计算缓存的存活时间。 触发时间计算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor两个参数的比值。 |
mutatetype | 数据写入模式。 | String | 否 | INSERT_OR_UPDATE | 如果Hologres物理表已设置主键,则Hologres Sink通过主键确保Exactly-once语义。当同主键数据出现多次时,您需要设置mutatetype参数确定更新结果表的方式,mutatetype取值如下:
|
createparttable | 当写入分区表时,是否根据分区值自动创建不存在的分区表。 | Boolean | 否 | false | 无。 |
sink.delete-strategy | 撤回消息的处理方式。 | String | 否 | 无 | 参数取值如下:
|
jdbcWriteBatchSize | JDBC模式,Hologres Sink节点数据攒批条数(不是来一条数据处理一条,而是攒一批再处理)的最大值。 | Integer | 否 | 256 | 单位为数据行数。 说明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 |
jdbcWriteBatchByteSize | JDBC模式,Hologres Sink节点数据攒批字节数(不是来一条数据处理一条,而是攒一批再处理)的最大值。 | Long | 否 | 2097152(2*1024*1024)字节,即2 MB | 说明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 |
jdbcWriteFlushInterval | JDBC模式,Hologres Sink节点数据攒批写入Hologres的最长等待时间。 | Long | 否 | 10000 | 单位为毫秒。 说明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。 |
ignoreNullWhenUpdate | 当mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。 | Boolean | 否 | false | 参数取值如下:
|
jdbcEnableDefaultForNotNullColumn | 如果将Null值写入Hologres表中Not Null且无默认值的字段,是否允许连接器帮助填充一个默认值。 | Boolean | 否 | true | 参数取值如下:
|
remove-u0000-in-text.enabled | 如果写入时字符串类型包含\u0000非法字符,是否允许连接器帮助去除。 | Boolean | 否 | false | 参数取值如下:
|
deduplication.enabled | jdbc及jdbc_fixed模式写入攒批过程中,是否进行去重。 | Boolean | 否 | true | 参数取值如下:
|
sink.type-normalize-strategy | 数据映射策略。 | String | 否 | STANDARD | 当Hologres sink转换上游数据到Hologres类型时的策略。
|
table_property.* | Hologres物理表属性。 | String | 否 | 无 | 创建Hologres表时,允许在WITH参数中设置物理表属性,合理的表属性设置可以有助于系统高效地组织和查询数据。 警告 table_property.distribution_key默认为主键值,不要轻易设置,会影响写入数据的正确性。 |
类型映射
通过配置项sink.type-normalize-strategy
设置转换上游数据到Hologres类型时的策略。
建议您在首次启动YAML作业时开启sink.type-normalize-strategy。如果启动后再开启sink.type-normalize-strategy,需要删除下游表并且将作业无状态重启才能生效。
目前数组类型仅支持INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR和VARCHAR。
Hologres不支持numeric类型作为主键,因此如果主键类型被映射为numeric,会被转化为varchar类型。
STANDARD
当sink.type-normalize-strategy为STANDARD时,类型映射如下:
Flink CDC类型 | Hologres类型 |
CHAR | bpchar |
STRING | text |
VARCHAR | text(长度大于10485760时) |
varchar(长度不大于10485760时) | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int2 |
SMALLINT | |
INTEGER | int4 |
BIGINT | int8 |
FLOAT | float4 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | 各种类型的数组 |
MAP | 不支持 |
ROW | 不支持 |
BROADEN
当sink.type-normalize-strategy为BROADEN时,将Flink CDC类型转换为更广泛的Hologres类型。数据映射如下:
Flink CDC类型 | Hologres类型 |
CHAR | text |
STRING | |
VARCHAR | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | float8 |
DOUBLE | |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | 各种类型的数组 |
MAP | 不支持 |
ROW | 不支持 |
ONLY_BIGINT_OR_TEXT
当sink.type-normalize-strategy
为ONLY_BIGINT_OR_TEXT时,将所有Flink CDC类型转换为Hologres中的BIGINT或STRING类型。类型映射如下:
Flink CDC类型 | Hologres类型 |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
BOOLEAN | text |
BINARY | |
VARBINARY | |
DECIMAL | |
FLOAT | |
DOUBLE | |
DATE | |
TIME_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | |
ARRAY | 各种类型的数组 |
MAP | 不支持 |
ROW | 不支持 |
分区表写入
Hologres Sink支持分区表写入,搭配Transform可以将上游数据写入到Hologres分区表中。写入时需要注意:
分区键(Partition Key)必须为主键的一部分,如果采用上游非主键中的一个作为分区表,可能会导致上下游主键不一致。数据同步时,如果上下游主键不一致,会导致数据不一致。
Hologres支持将TEXT、VARCHAR以及INT类型的数据作为分区键(Partition Key),V1.3.22及以上版本支持将DATE类型设为分区键。
需要设置createparttable为true, 才能自动创建分区子表,否则用户需要手动创建分区子表。
示例请参见分区表写入示。
表结构变更同步
CDC Yaml Pipeline作业在处理表结构变更时有不同的策略,通过pipeline级别的配置项schema.change.behavior
来设置。schema.change.behavior
取值有IGNORE、LENIENT、TRY_EVOLVE、EVOLVE 和 EXCEPTION。Hologres Sink目前不支持TRY_EVOLVE策略。其中LENIENT和EVOLVE涉及到表结构变更,接下来会说明如何处理不同表结构变更事件(Schema Change Event)。
LENIENT(默认)
LENIENT模式下支持的Schema变更策略详情如下:
添加可空列:会自动在结果表Schema末尾添加对应的列,并自动同步新增列的数据。
删除可空列:不会直接在结果表中删除该列,而是将该列的数据自动填充为NULL值。
添加非空列:会自动在结果表Schema末尾添加对应的列,并自动同步新增列的数据,新增的列会默认设置为可空列,对于添加列发生之前的数据自动设置为NULL值。
重命名列:被看作为添加列和删除列。直接在结果表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。例如,如果col_a重命名为col_b,则会在结果表末尾添加col_b,并自动将col_a的数据填充为NULL值。
列类型变更:不支持。由于Hologres不支持列类型变更,需要搭配sink.type-normalize-strategy使用。
暂不支持同步以下Schema的变更:
主键或索引等约束的变更。
非空列的删除。
从NOT NULL转为NULLABLE变更。
EVOLVE
EVOLVE模式下支持的Schema变更策略详情如下:
添加可空列:支持
删除可空列:不支持。
添加非空列:会在结果表添加可空列。
重命名列:支持,会在结果表将原有列重命名。
列类型变更:不支持。由于Hologres不支持列类型变更,需要搭配sink.type-normalize-strategy使用。
暂不支持同步以下Schema的变更:
主键或索引等约束的变更。
非空列的删除。
从NOT NULL转为NULLABLE变更。
在EVOLVE模式下,如果在未删除结果表的情况下无状态重启,有可能出现上游数据与结果表的结构不一致的情况导致作业失败,需要用户手动调整下游表结构。
开启EVOLVE模式示例请参见开启EVOLVE模式。
代码示例
宽类型映射
通过配置项sink.type-normalize-strategy
设置宽类型映射。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
分区表写入
将上游时间戳类型的create_time字段转化为日期类型,作为Hologres表的分区键。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
transform:
- source-table: test_db.test_source_table
projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
primary-keys: id, create_time, partition_key
partition-keys: partition_key
description: add partition key
pipeline:
name: MySQL to Hologres Pipeline
开启EVOLVE模式
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
pipeline:
name: MySQL to Hologres Pipeline
schema.change.behavior: evolve
单表同步
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
整库同步
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
分库分表合并
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db.user
pipeline:
name: MySQL to Hologres Pipeline
同步到指定schema
Hologres的Schema对应MySQL的Database,可以执行结果表的Schema。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db2.user\.*r
pipeline:
name: MySQL to Hologres Pipeline
不重启同步新增表
如果想在作业运行的过程中实时同步新增表,设置scan.binlog.newly-added-table.enable = true.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
重启新增存量表
如果想要新增同步存量表,设置scan.newly-added-table.enabled = true后重启作业。
如果作业先设置scan.binlog.newly-added-table.enabled为true捕获新增表,不可以再通过scan.newly-added-table.enabled = true重启捕获存量表,否则会有数据重复发送。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
整库同步时排除部分表
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
tables.exclude: test_db.table1
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
相关文档
source、sink、transform和route模块的开发参考,详情请参见数据摄入开发参考。
数据摄入YAML作业开发的操作步骤,详情请参见数据摄入YAML作业开发(公测中)。