全部产品
Search
文档中心

实时计算Flink版:实时数仓Hologres数据摄入YAML连接器(公测中)

更新时间:Oct 10, 2024

本文为您介绍如何在数据摄入YAML作业中,使用实时数仓Hologres连接器进行数据同步。

背景信息

实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres YAML连接器支持的信息如下。

类别

详情

支持类型

数据摄入目标端(Sink)

运行模式

流模式和批模式

数据格式

暂不支持

特有监控指标

  • numRecordsOut

  • numRecordsOutPerSecond

说明

指标含义详情,请参见监控指标说明

API种类

YAML

是否支持更新或删除结果表数据

功能说明

功能

详情

整库同步

支持实时同步整库(或者多张表)的全量和增量数据到每张对应的结果表中。

表结构变更同步

在实时同步整库数据的同时,还支持将每张源表的表结构变更(增加列、删除列、重命名列等)实时同步到结果表中。

分库分表合并

支持使用正则表达式定义库名,匹配数据源的多个分库下的源表,合并后同步到下游每张对应表名的结果表中。

分区表写入

支持将上游的一张表写入到Hologres分区表。

类型映射

采用多种数据映射策略,将上游数据类型映射为更宽的Hologres数据类型。

语法结构

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

参数说明

参数

说明

数据类型

是否必填

默认值

备注

type

sink类型。

String

固定值为hologres

name

sink名称。

String

无。

dbname

数据库名称。

String

无。

username

用户名,请填写阿里云账号的AccessKey ID。

String

详情请参见如何查看AccessKey ID和AccessKey Secret信息?

重要

为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理

password

密码,请填写阿里云账号的AccessKey Secret。

String

详情请参见如何查看AccessKey ID和AccessKey Secret信息?

重要

为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理

endpoint

Hologres服务地址。

String

详情请参见访问域名

jdbcRetryCount

当连接故障时,写入和查询的重试次数。

Integer

10

无。

jdbcRetrySleepInitMs

每次重试的固定等待时间。

Long

1000

单位为毫秒。实际重试的等待时间的计算公式为jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs

jdbcRetrySleepStepMs

每次重试的累加等待时间。

Long

5000

单位为毫秒。实际重试的等待时间的计算公式为jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs

jdbcConnectionMaxIdleMs

JDBC连接的空闲时间。

Long

60000

单位为毫秒。超过这个空闲时间,连接就会断开释放掉。

jdbcMetaCacheTTL

本地缓存TableSchema信息的过期时间。

Long

60000

单位为毫秒。

jdbcMetaAutoRefreshFactor

如果缓存的剩余时间小于触发时间,则系统会自动刷新缓存。

Integer

4

缓存的剩余时间计算方法:缓存的剩余时间=缓存的过期时间 - 缓存已经存活的时间。缓存自动刷新后,则从0开始重新计算缓存的存活时间。

触发时间计算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor两个参数的比值。

mutatetype

数据写入模式。

String

INSERT_OR_UPDATE

如果Hologres物理表已设置主键,则Hologres Sink通过主键确保Exactly-once语义。当同主键数据出现多次时,您需要设置mutatetype参数确定更新结果表的方式,mutatetype取值如下:

  • INSERT_OR_IGNORE:保留首次出现的数据,忽略后续所有数据。

  • INSERT_OR_REPLACE:后出现的数据整行替换已有数据。

  • INSERT_OR_UPDATE:更新已有数据的部分列。例如一张表有a、b、c和d四个字段,a是PK(Primary Key),写入Hologres时只写入a和b两个字段,在PK重复的情况下,系统只会更新b字段,c和d保持不变。

createparttable

当写入分区表时,是否根据分区值自动创建不存在的分区表。

Boolean

false

无。

sink.delete-strategy

撤回消息的处理方式。

String

参数取值如下:

  • IGNORE_DELETE:忽略Update Before和Delete消息。适用于仅需插入或更新数据,而无需删除数据的场景。

  • CHANGELOG_STANDARD:Flink框架按照 Flink SQL Changelog的工作原理运行,不忽略删除操作,并通过先删除数据再插入的方式执行更新操作,以确保数据准确性。适用于不涉及局部更新的场景

jdbcWriteBatchSize

JDBC模式,Hologres Sink节点数据攒批条数(不是来一条数据处理一条,而是攒一批再处理)的最大值。

Integer

256

单位为数据行数。

说明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

jdbcWriteBatchByteSize

JDBC模式,Hologres Sink节点数据攒批字节数(不是来一条数据处理一条,而是攒一批再处理)的最大值。

Long

2097152(2*1024*1024)字节,即2 MB

说明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

jdbcWriteFlushInterval

JDBC模式,Hologres Sink节点数据攒批写入Hologres的最长等待时间。

Long

10000

单位为毫秒。

说明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

ignoreNullWhenUpdate

mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。

Boolean

false

参数取值如下:

  • false(默认值):将Null值写到Hologres结果表里。

  • true:忽略更新写入数据中的Null值。

jdbcEnableDefaultForNotNullColumn

如果将Null值写入Hologres表中Not Null且无默认值的字段,是否允许连接器帮助填充一个默认值。

Boolean

true

参数取值如下:

  • true(默认值):允许连接器填充默认值并写入,规则如下。

    • 如果字段是String类型,则默认写为空("")。

    • 如果字段是Number类型,则默认写为0。

    • 如果是Date、timestamp或timestamptz时间类型字段,则默认写为1970-01-01 00:00:00

  • false:不填充默认值,写Null到Not Null字段时,会抛出异常。

remove-u0000-in-text.enabled

如果写入时字符串类型包含\u0000非法字符,是否允许连接器帮助去除。

Boolean

false

参数取值如下:

  • false(默认值):连接器不对数据进行操作,但碰到脏数据时写入可能抛出如下异常,ERROR: invalid byte sequence for encoding "UTF8": 0x00

    此时需要在源表提前处理脏数据,或者在SQL中定义脏数据处理逻辑。

  • true:连接器会帮助去除字符串类型中的\u0000,防止写入抛出异常。

deduplication.enabled

jdbc及jdbc_fixed模式写入攒批过程中,是否进行去重。

Boolean

true

参数取值如下:

  • true(默认值):如果一批数据中有主键相同的数据,默认进行去重,只保留最后一条到达的数据。以两个字段,其中第一个字段为主键的数据举例:

    • INSERT (1,'a')INSERT (1,'b')两条记录先后到达,去重之后只保留后到达的(1,'b')写入Hologres结果表中。

    • Hologres结果表中已经存在记录(1,'a'),此时DELETE (1,'a')INSERT (1,'b')两条记录先后到达,只保留后到达的(1,'b')写入hologres中,表现为直接更新,而不是先删除再插入。

  • false:在攒批过程中不进行去重,如果发现新到的数据和目前攒批的数据中存在主键相同的情况,先将攒批数据写入,写入完成之后再继续写入新到的数据。

sink.type-normalize-strategy

数据映射策略。

String

STANDARD

当Hologres sink转换上游数据到Hologres类型时的策略。

  • STANDARD:根据标准将Flink CDC类型转换为PG类型。

  • BROADEN:将Flink CDC类型转换为更广泛的Hologres类型。

  • ONLY_BIGINT_OR_TEXT:将所有Flink CDC类型转换为Hologres中的BIGINT或STRING类型。

table_property.*

Hologres物理表属性。

String

创建Hologres表时,允许在WITH参数中设置物理表属性,合理的表属性设置可以有助于系统高效地组织和查询数据。

警告

table_property.distribution_key默认为主键值,不要轻易设置,会影响写入数据的正确性。

类型映射

通过配置项sink.type-normalize-strategy设置转换上游数据到Hologres类型时的策略。

说明
  • 建议您在首次启动YAML作业时开启sink.type-normalize-strategy。如果启动后再开启sink.type-normalize-strategy,需要删除下游表并且将作业无状态重启才能生效。

  • 目前数组类型仅支持INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR和VARCHAR。

  • Hologres不支持numeric类型作为主键,因此如果主键类型被映射为numeric,会被转化为varchar类型。

STANDARD

当sink.type-normalize-strategy为STANDARD时,类型映射如下:

Flink CDC类型

Hologres类型

CHAR

bpchar

STRING

text

VARCHAR

text(长度大于10485760时)

varchar(长度不大于10485760时)

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int2

SMALLINT

INTEGER

int4

BIGINT

int8

FLOAT

float4

DOUBLE

float8

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

各种类型的数组

MAP

不支持

ROW

不支持

BROADEN

当sink.type-normalize-strategy为STANDARD时,将Flink CDC类型转换为更广泛的Hologres类型。数据映射如下:

Flink CDC类型

Hologres类型

CHAR

text

STRING

VARCHAR

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int8

SMALLINT

INTEGER

BIGINT

FLOAT

float8

DOUBLE

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

各种类型的数组

MAP

不支持

ROW

不支持

ONLY_BIGINT_OR_TEXT

sink.type-normalize-strategy为ONLY_BIGINT_OR_TEXT时,将所有Flink CDC类型转换为Hologres中的BIGINT或STRING类型。类型映射如下:

Flink CDC类型

Hologres类型

TINYINT

int8

SMALLINT

INTEGER

BIGINT

BOOLEAN

text

BINARY

VARBINARY

DECIMAL

FLOAT

DOUBLE

DATE

TIME_WITHOUT_TIME_ZONE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_WITH_LOCAL_TIME_ZONE

ARRAY

各种类型的数组

MAP

不支持

ROW

不支持

分区表写入

Hologres Sink支持分区表写入,搭配Transform可以将上游数据写入到Hologres分区表中。写入时需要注意:

  • 分区键(Partition Key)必须为主键的一部分,如果采用上游非主键中的一个作为分区表,可能会导致上下游主键不一致。数据同步时,如果上下游主键不一致,会导致数据不一致。

  • Hologres支持将TEXTVARCHAR以及INT类型的数据作为分区键(Partition Key),V1.3.22及以上版本支持将DATE类型设为分区键。

  • 需要设置createparttable为true, 才能自动创建分区子表,否则用户需要手动创建分区子表。

分区表写入示例请参见分区表写入示例分区表写入

表结构变更同步

CDC Yaml Pipeline作业在处理表结构变更时有不同的策略,通过pipeline级别的配置项schema.change.behavior来设置。schema.change.behavior取值有IGNORE、LENIENT、TRY_EVOLVE、EVOLVE 和 EXCEPTION。Hologres Sink目前不支持TRY_EVOLVE策略。其中LENIENT和EVOLVE涉及到表结构变更,接下来会说明如何处理不同表结构变更事件(Schema Change Event)。

LENIENT(默认)

LENIENT模式下支持的Schema变更策略详情如下:

  • 添加可空列:会自动在结果表Schema末尾添加对应的列,并自动同步新增列的数据。

  • 删除可空列:不会直接在结果表中删除该列,而是将该列的数据自动填充为NULL值。

  • 添加非空列:会自动在结果表Schema末尾添加对应的列,并自动同步新增列的数据,新增的列会默认设置为可空列,对于添加列发生之前的数据自动设置为NULL值。

  • 重命名列:被看作为添加列和删除列。直接在结果表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。例如,如果col_a重命名为col_b,则会在结果表末尾添加col_b,并自动将col_a的数据填充为NULL值。

  • 列类型变更:不支持。由于Hologres不支持列类型变更,需要搭配sink.type-normalize-strategy使用。

  • 暂不支持同步以下Schema的变更:

    • 主键或索引等约束的变更。

    • 非空列的删除。

    • 从NOT NULL转为NULLABLE变更。

EVOLVE

EVOLVE模式下支持的Schema变更策略详情如下:

  • 添加可空列:支持

  • 删除可空列:不支持。

  • 添加非空列:会在结果表添加可空列。

  • 重命名列:支持,会在结果表将原有列重命名。

  • 列类型变更:不支持。由于Hologres不支持列类型变更,需要搭配sink.type-normalize-strategy使用。

  • 暂不支持同步以下Schema的变更:

    • 主键或索引等约束的变更。

    • 非空列的删除。

    • 从NOT NULL转为NULLABLE变更。

警告

在EVOLVE模式下,如果在未删除结果表的情况下无状态重启,有可能出现上游数据与结果表的结构不一致的情况导致作业失败,需要用户手动调整下游表结构。

开启EVOLVE模式示例请参见开启EVOLVE模式

代码示例

宽类型映射

通过配置项sink.type-normalize-strategy设置宽类型映射。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

分区表写入

将上游时间戳类型的create_time字段转化为日期类型,作为Hologres表的分区键。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 

pipeline:
  name: MySQL to Hologres Pipeline

开启EVOLVE模式

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

单表同步

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

整库同步

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

分库分表合并

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

同步到指定schema

Hologres的Schema对应MySQL的Database,可以执行结果表的Schema。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db2.user\.*r

pipeline:
  name: MySQL to Hologres Pipeline

不重启同步新增表

如果想在作业运行的过程中实时同步新增表,设置scan.binlog.newly-added-table.enable = true.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

重启新增存量表

如果想要新增同步存量表,设置scan.newly-added-table.enabled = true后重启作业。

警告

如果作业先设置scan.binlog.newly-added-table.enabled为true捕获新增表,不可以再通过scan.newly-added-table.enabled = true重启捕获存量表,否则会有数据重复发送。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

整库同步时排除部分表

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

相关文档