全部产品
Search
文档中心

实时计算Flink版:实时数仓Hologres

更新时间:Oct 15, 2024

本文为您介绍如何使用实时数仓Hologres连接器。

背景信息

实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres连接器支持的信息如下。

类别

详情

支持类型

源表、维表和结果表

运行模式

流模式和批模式

数据格式

暂不支持

特有监控指标

  • 源表:

    • numRecordsIn

    • numRecordsInPerSecond

  • 结果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    说明

    指标含义详情,请参见监控指标说明

API种类

Datastream和SQL

是否支持更新或删除结果表数据

特色功能

源表

功能

详情

实时消费Hologres

  • 支持作为非Binlog Source读取Hologres全量数据。

  • 支持实时消费Hologres的Binlog数据。

    • 支持非CDC模式消费。

    • 支持CDC模式消费。

    • 支持全增量一体源表消费。

获取更多信息,详情请参见Flink/Blink实时消费Hologres Binlog

结果表

功能

详情

流式语义

支持写入Changelog消息。

宽表Merge和局部更新功能

只更新修改部分的数据,而非整行更新。

作为CTAS和CDAS的目标端

支持实时同步单表、整库的数据以及相应的表结构变更到Hologres表中。

插入部分列

说明

仅实时计算引擎VVR 6.0.7及以上版本支持。

支持将Flink INSERT DML中指定的列名下推给连接器,从而仅更新指定的列。

前提条件

已创建Hologres表,详情请参见创建Hologres表

使用限制

  • 通用:

  • 源表独有:

    • Flink默认以批模式读取Hologres源表数据,即只扫描一次Hologres全表,扫描结束,消费结束,新到Hologres源表的数据不会被读取。从VVR 3.0.0版本开始,支持实时消费Hologres数据,详情请参见实时计算Flink版实时消费Hologres。从VVR 6.0.3版本开始,Hologres源表在批模式读取时支持filter下推,详见源表参数enable_filter_push_down

    • 实时计算引擎8.0以下版本Hologres CDC模式暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见MySQL/Hologres CDC源表不支持窗口函数,如何实现类似每分钟聚合统计的需求?

  • 结果表独有:无。

  • 维表独有:

    • 维表建议使用主键作为Join条件,对于此类主键点查的维表,创建Hologres表时建议选择行存模式,列存模式对于点查场景性能开销较大。选择行存模式创建维表时必须设置主键,并且将主键设置为Clustering Key才可以工作。详情请参见CREATE TABLE

    • 如果业务需要,无法使用主键作为Join条件,对于此类非主键点查的维表(即一对多的查询),创建Hologres表时建议选择列存模式,并合理设置分布键Distribution Key以及Event Time Column(Segment Key)以优化查询性能,详情请参见表存储格式:列存、行存、行列共存

    • VVR 4.0以下版本仅支持对维表主键点查的维表Join,VVR 4.0及以上版本,jdbc模式支持维表的非主键点查。

注意事项

  • 使用了rpc模式时,VVR版本升级注意事项:

    Hologres 2.0版本下线了rpc(用于维表与结果表)模式,全面转为jdbc相关模式(目前包括jdbc、jdbc_fixed和jdbc_copy等),rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据,切换为jdbc模式后,可以通过设置'jdbcWriteBatchSize'='1'防止去重,或者升级到VVR 8.0.5版本配置deduplication.enabled参数为false。

    如果您作业中存在使用了rpc模式读写Hologres的情况,此时如果您需要将VVR 4.x升级到VVR 6.x或VVR 8.x,请按照以下情况进行处理:

    • 升级到VVR 6.0.4~6.0.6版本,可能会抛出异常。推荐维表和结果表使用jdbc_fixed或jdbc模式。

    • 升级到VVR 6.0.7及以上版本,无需您做任何处理,Flink系统会自动替换rpc为jdbc相关模式。

  • 使用binlog源表且未指定jdbc模式时,VVR版本升级注意事项:

    Hologres 2.0版本开始有限支持holohub(用于Binlog源表)模式,Hologres 2.1版本彻底下线了holohub模式,全面转为jdbc模式。

    如果您作业中存在消费binlog源表的情况,而且binlog源表未通过sdkmode='jdbc'指定jdbc模式,默认会使用holohub模式。此时如果您需要将VVR 4.x升级到VVR 6.x或VVR 8.x,请按照以下情况进行处理:

    • 如果Hologres版本是2.0。

      • 升级到VVR 6.0.7~VVR 8.0.3版本,仍然可以继续使用holohub模式。

      • 升级到VVR 8.0.4及以上版本,可能抛出权限不足的异常,需要特别配置用户权限,详情见实时计算Flink版实时消费Hologres

    • 如果Hologres版本是2.1。

      • 升级到VVR 6.0.7~VVR 8.0.4版本,可能无法正常消费Binlog,建议升级到VVR 8.0.5。

      • 升级到VVR 8.0.5及以上版本,无需您做任何处理,Flink系统会自动替换holohub模式为jdbc模式。

语法结构

CREATE TABLE hologres_table (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkmode' = 'jdbc'
);

WITH参数

说明

仅Flink实时计算引擎VVR 4.0.11及以上版本支持所有jdbc开头的参数。

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为hologres

    dbname

    数据库名称。

    String

    Hologres V2.0版本推出了全新的弹性高可用实例形态,将计算资源分解为不同的计算组(Virtual Warehouse),更好的服务于高可用部署。不同的计算组使用相同的Endpoint,您可以通过在dbname参数后添加特定的后缀来指定连接某个计算组。例如某张维表希望连接特定的计算组read_warehouse,可以通过'dbname' = 'db_test@read_warehouse' 方式指定。

    说明

    仅JDBC相关模式支持使用计算组,详见源表、维表和结果表WITH参数中的sdkMode参数。

    tablename

    表名称。

    String

    如果Schema不为Public时,则tablename需要填写为schema.tableName

    username

    用户名,请填写阿里云账号的AccessKey ID。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理

    password

    密码,请填写阿里云账号的AccessKey Secret。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理

    endpoint

    Hologres服务地址。

    String

    详情请参见访问域名

    connection.ssl.mode

    是否启用SSL(Secure Sockets Layer)传输加密,以及启用采用何种模式。

    String

    disable

    参数取值如下:

    • disable(默认值):不启用传输加密。

    • require:启用SSL,只对数据链路加密。

    • verify-ca:启用SSL,加密数据链路,同时使用CA证书验证Hologres服务端的真实性。

    • verify-full:启用SSL,加密数据链路,使用CA证书验证Hologres服务端的真实性,同时比对证书内的CN或DNS与连接时配置的Hologres连接地址是否一致。

    说明
    • VVR 8.0.5及以上版本开始支持此参数。

    • Hologres自1.1版本起支持SSL传输加密的require模式,2.1版本起新增支持verify-ca和verify-full模式。详见传输加密

    • 当配置为verify-ca或者verify-full时,需要同时配置connection.ssl.root-cert.location参数。

    connection.ssl.root-cert.location

    当传输加密模式需要证书时,配置证书的路径。

    String

    当connection.ssl.mode配置为verify-ca或者verify-full时,需要同时配置CA证书的路径。证书可以使用实时计算控制台的资源上传功能上传至平台,上传后文件存放在/flink/usrlib目录下。例如,需要使用的CA证书文件名为certificate.crt,则上传后参数取值应该为 '/flink/usrlib/certificate.crt'

    说明

    jdbcRetryCount

    当连接故障时,写入和查询的重试次数。

    Integer

    10

    无。

    jdbcRetrySleepInitMs

    每次重试的固定等待时间。

    Long

    1000

    实际重试的等待时间的计算公式为jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs。单位为毫秒。

    jdbcRetrySleepStepMs

    每次重试的累加等待时间。

    Long

    5000

    实际重试的等待时间的计算公式为jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs。单位为毫秒。

    jdbcConnectionMaxIdleMs

    JDBC连接的空闲时间。

    Long

    60000

    超过这个空闲时间,连接就会断开释放掉。单位为毫秒。

    jdbcMetaCacheTTL

    本地缓存TableSchema信息的过期时间。

    Long

    60000

    单位为毫秒。

    jdbcMetaAutoRefreshFactor

    如果缓存的剩余时间小于触发时间,则系统会自动刷新缓存。

    Integer

    4

    缓存的剩余时间计算方法:缓存的剩余时间=缓存的过期时间 - 缓存已经存活的时间。缓存自动刷新后,则从0开始重新计算缓存的存活时间。

    触发时间计算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor两个参数的比值。

    type-mapping.timestamp-converting.legacy

    Flink和Hologres之间是否进行时间类型的相互转换。

    Boolean

    true

    参数取值如下:

    • true:不进行相互转换。时区转换将采用运行环境中的JVM时区。

    • false(推荐):进行相互转换。时区转换将使用Flink所配置的时区。

    说明
    • 仅实时计算引擎VVR 8.0.6及以上版本支持该参数。

    • Flink和Hologres的时区详情,请参见Flink与Hologres时区说明

    • property-version=0时,默认值为true;property-version=1时,默认值为false。

    property-version

    Connector参数版本。

    Integer

    0

    可填的值为0和1,默认值为0。

    说明
    • 仅VVR 8.0.6及以上版本支持配置该参数。

    • 在不同参数版本里,可用的参数集合和参数的默认值可能不同。如果存在区别,区别详情会在参数的说明部分描述。

    • 推荐使用参数版本1。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    field_delimiter

    导出数据时,不同行之间使用的分隔符。

    String

    "\u0002"

    无。

    binlog

    是否消费Binlog数据。

    Boolean

    false

    参数取值如下:

    • true:消费Binlog数据。

    • false(默认值):不消费Binlog数据。

    说明

    实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。

    • property-version=0时,默认值为false。

    • property-version=1时,默认值为true。

    sdkMode

    SDK模式。

    String

    holohub

    参数取值如下:

    • holohub(默认值):使用holohub模式消费binlog。

    • jdbc:使用jdbc模式消费binlog。

    • jdbc_fixed: 使用fixed jdbc模式消费binlog,与jdbc模式的区别在于不受连接数限制。目前此模式暂不支持消费开起数据脱敏功能的database的binlog,详情请参见数据脱敏

    详情请参见Binlog Source表

    版本推荐参数如下:

    VVR 6.0.3及以下版本:不支持配置该参数。

    VVR 6.0.4~6.0.6版本:推荐使用默认配置,即holohub。

    VVR 6.0.7及以上版本:推荐配置为jdbc。

    VVR 8.0.4~8.0.5版本:系统会自动切换为jdbc。

    VVR 8.0.6及以上版本:系统会根据Hologres版本进行自动切换。

    • 2.1.27及以上版本,为jdbc_fixed模式。

    • 2.1.0~2.1.26版本,为jdbc模式

    • Hologres实例为2.0版本,Flink系统会尝试使用jdbc模式,如果存在权限不足等异常,会自动选择holohub模式启动。

    说明
    • VVR 6.0.7及以上版本:

      • Hologres实例为2.0以下版本,Flink系统采用您配置的参数值。

      • Hologres实例为2.0及以上版本,由于Hologres 2.0版本下线了holohub服务,此时如果您配置了holohub,Flink系统自动切换为jdbc。但是如果您配置为jdbc,Flink系统采用jdbc。

    • VVR 8.0.4~8.0.5版本:

    jdbcBinlogSlotName

    JDBC模式的binlog源表的Slot名称。创建方法请参见JDBC模式Binlog源表

    String

    仅在sdkMode配置为jdbc时生效,如果用户未配置,连接器会默认创建一个Slot来使用。详见JDBC模式Binlog源表

    说明

    Hologres实例2.1版本起,不再需要配置此参数,连接器也不会尝试自动创建。

    binlogMaxRetryTimes

    读取Binlog数据出错后的重试次数。

    Integer

    60

    无。

    binlogRetryIntervalMs

    读取Binlog数据出错后的重试时间间隔。

    Long

    2000

    单位为毫秒。

    binlogBatchReadSize

    批量读取Binlog的数据行数。

    Integer

    100

    无。

    cdcMode

    是否采用CDC模式读取Binlog数据。

    Boolean

    false

    参数取值如下:

    • true:CDC模式读取Binlog数据。

    • false(默认值):非CDC模式读取Binlog数据。

    说明

    实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。

    • property-version=0时,默认值为false。

    • property-version=1时,默认值为true。

    upsertSource

    源表是否使用upsert类型的Changelog。

    Boolean

    false

    仅在CDC模式下生效。参数取值如下:

    • true:仅支持Upsert类型,包括INSERT、DELETE和UPDATE_AFTER。

    • false(默认值):支持所有类型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。

    说明

    如果下游包含回撤算子(例如使用ROW_NUMBER OVER WINDOW去重),则需要设置upsertSource为true,此时源表会以Upsert方式从Hologres中读取数据。

    binlogStartupMode

    Binlog数据消费模式。

    String

    earliestOffset

    参数取值如下:

    • initial:先全量消费数据,再读取Binlog开始增量消费。

    • earliestOffset(默认值):从最早的Binlog开始消费。

    • timestamp:从设置的startTime开始消费Binlog。

    说明

    如果设置了startTime或者在启动界面选择了启动时间,则binlogStartupMode强制使用timestamp模式,其他消费模式不生效,即startTime参数优先级更高。

    说明
    • 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

    • 实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。

    • property-version=0时,默认值为false。

    • property-version=1时,默认值为true。

    startTime

    启动位点的时间。

    String

    格式为yyyy-MM-dd hh:mm:ss。如果没有设置该参数,且作业没有从State恢复,则从最早的Binlog开始消费Hologres数据。

    jdbcScanFetchSize

    扫描时攒批大小。

    Integer

    256

    无。

    jdbcScanTimeoutSeconds

    扫描操作超时时间。

    Integer

    60

    单位为秒。

    jdbcScanTransactionSessionTimeoutSeconds

    扫描操作所在事务的超时时间。

    Integer

    600秒(0表示不超时)

    对应Hologres的GUC参数idle_in_transaction_session_timeout,详情请参见GUC参数

    说明

    仅实时计算引擎Flink1.13-vvr-4.0.15及以上版本支持该参数。

    enable_filter_push_down

    全量读取阶段是否进行filter下推。

    Boolean

    false

    参数取值如下:

    • false(默认值):不进行filter下推。

    • true:读取全量数据时,将支持的过滤条件下推到Hologres执行。包括非Binlog Source全量读取以及Binlog Source使用全增量一体化消费模式时的全量阶段。

      重要

      实时计算引擎Flink1.15-vvr-6.0.3到Flink1.15-vvr-6.0.5默认会进行filter下推,但如果作业使用了hologres维表,且写入的DML中包含有对维表非主键字段的过滤条件时,维表的filter会被错误的下推,可能导致维表join出现错误结果。因此推荐使用实时计算引擎Flink1.15-vvr-6.0.6及以上版本,并在源表增加此参数来开启过滤条件下推功能。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sdkMode

    SDK模式。

    String

    jdbc

    参数取值如下:

    • jdbc:默认值,表示使用jdbc模式进行写入。

    • jdbc_copy:是否使用fixed copy方式写入。

      fixed copy是hologres1.3新增的能力,相比通过insert方法(jdbc模式)进行写入,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批)。但此模式暂不支持delete数据,也不支持写入分区父表,不支持ignoreNullWhenUpdate参数。

    • rpc:表示使用rpc模式进行写入,与useRpcMode参数一致,与jdbc模式的区别在于不占用连接数,并且不支持写入Hologres的JSONB、RoarinBitmap类型。

    • jdbc_fixed(beta功能):表示使用fixed jdbc方式进行写入,

      需要Hologres引擎版本大于等于1.3,与jdbc模式的区别在于不占用连接数,不支持写入Hologres的Jsonb,RoarinBitmap类型。目前此模式暂不支持写入开启数据脱敏功能的database,详情请参见数据脱敏

    版本推荐参数如下:

    VVR 6.0.3以下版本:不支持配置该参数。

    VVR 6.0.4及以上版本:推荐配置为jdbc。

    说明
    • VVR 6.0.7~VVR 8.0.1版本:

      • 如果Hologres实例为2.0以下版本,Flink系统采用您配置的参数值。

      • 如果Hologres实例为2.0及以上版本,由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为rpc,Flink系统将自动切换该参数值为jdbc_fixed,但是如果您配置为其他值,Flink系统将采用您配置的参数值。

      • rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据,切换为jdbc模式后,可以通过设置'jdbcWriteBatchSize'='1'防止去重。

    • VVR 8.0.3及以上版本

      • 自此版本开始,无论Hologres实例版本,都不再支持rpc模式,如果选择rpc模式,将自动切换该参数值为jdbc_fixed且设置'jdbcWriteBatchSize'='1'防止去重。

    bulkload

    是否采用bulkload写入。

    Boolean

    false

    仅在sdkMode设置为jdbc_copy时生效。bulkload写入目前仅适用于无主键表或者主键保证不重复的有主键表(主键重复会抛出异常),相比默认的jdbc_copy,写入使用更少的Hologres资源。

    说明

    仅实时计算引擎VVR 8.0.5及以上版本且Hologres实例为2.1及以上版本支持该参数。

    useRpcMode

    是否通过RPC方式使用Hologres连接器。

    Boolean

    false

    参数取值如下:

    • true:使用RPC方式使用Hologres连接器。

      与sdkMode参数设置为rpc效果相同,通过RPC方式会降低SQL连接数。

    • false(默认值):使用JDBC方式使用Hologres连接器。

      通过JDBC方式会占用SQL连接,导致JDBC连接数增加。

    说明
    • Hologres 2.0版本下线了rpc模式,推荐使用sdkMode参数来选择jdbc或者jdbc_fixed模式。

    • 实时计算引擎VVR 6.0.7及VVR 8.0.1版本在检测到Hologres实例是2.0及以上版本时,会自动切换rpc模式为jdbc_fixed模式。

    • 实时计算引擎VVR 8.0.3及以上版本会自动切换rpc模式为jdbc_fixed模式。

    • rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据变化记录,推荐使用实时计算引擎VVR 8.0.5及以上版本,jdbc模式可以配置deduplication.enabled参数为false不进行去重。

    • property-version=1时,该参数下线。

    mutatetype

    数据写入模式。

    String

    insertorignore

    详情请参见流式语义

    说明

    实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。

    • property-version=0时,默认值为insertorignore。

    • property-version=1时,默认值为insertorupdate。

    partitionrouter

    是否写入分区表。

    Boolean

    false

    无。

    createparttable

    当写入分区表时,是否根据分区值自动创建不存在的分区表。

    Boolean

    false

    RPC模式下,如果分区值中存在短划线(-),暂不支持自动创建分区表。

    说明
    • Hologres实例1.3.22及以上版本开始支持使用Date类型做分区键。实时计算引擎VVR 8.0.3及以上版本,支持使用Date类型做分区键时自动创建分区表。

    • 请确保分区值不会出现脏数据,否则会创建错误的分区表导致Failover,建议慎用该参数。

    • 当sdk_mode设置为jdbc_copy时,不支持写入分区父表。

    ignoredelete

    是否忽略撤回消息。

    Boolean

    true

    说明
    • 仅在使用流式语义时生效。

    • 实时计算引擎VVR 8.0.8及以上版本推荐使用sink.delete-strategy参数替换该参数。两个参数同时配置时,只有sink.delete-strategy参数生效。

    • 实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。

      • property-version=0时,默认值为true。

      • property-version=1时,默认值为false。

    sink.delete-strategy

    撤回消息的处理策略。

    String

    参数取值如下:

    • IGNORE_DELETE:忽略Update Before和Delete消息。适用于仅需插入或更新数据,而无需删除数据的场景。

    • NON_PK_FIELD_TO_NULL:忽略Update Before消息,并将Delete消息执行为将非主键字段更新为null。适用于希望在局部更新操作中执行删除操作而不影响其他列的场景。

    • DELETE_ROW_ON_PK:忽略Update Before消息,并将Delete消息执行为根据主键删除整行。适用于在局部更新过程中,希望执行删除整行操作,从而影响其他列的场景。

    • CHANGELOG_STANDARD:Flink框架按照 Flink SQL Changelog的工作原理运行,不忽略删除操作,并通过先删除数据再插入的方式执行更新操作,以确保数据准确性。适用于不涉及局部更新的场景

    说明
    • 仅实时计算引擎VVR 8.0.8及以上版本支持该参数。

    • 启用NON_PK_FIELD_TO_NULL选项可能会导致记录中只有主键,其他所有列都为null。

    connectionSize

    单个Flink结果表任务所创建的JDBC连接池大小。

    Integer

    3

    如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。

    jdbcWriteBatchSize

    JDBC模式,Hologres Sink节点数据攒批条数(不是来一条数据处理一条,而是攒一批再处理)的最大值。

    Integer

    256

    单位为数据行数。

    说明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

    jdbcWriteBatchByteSize

    JDBC模式,Hologres Sink节点数据攒批字节数(不是来一条数据处理一条,而是攒一批再处理)的最大值。

    Long

    2097152(2*1024*1024)字节,即2 MB

    说明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

    jdbcWriteFlushInterval

    JDBC模式,Hologres Sink节点数据攒批写入Hologres的最长等待时间。

    Long

    10000

    单位为毫秒。

    说明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

    ignoreNullWhenUpdate

    mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。

    Boolean

    false

    取值说明如下:

    • false(默认值):将Null值写到Hologres结果表里。

    • true:忽略更新写入数据中的Null值。

    说明
    • 仅Flink计算引擎VVR 4.0及以上版本支持该参数。

    • 当sdk_mode设置为jdbc_copy时,不支持此参数。

    connectionPoolName

    连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。

    String

    取值为非'default'的任意字符串。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectionSize参数也需要相同。

    说明
    • VVR 4.0.12以下版本:不支持配置该参数。

    • VVR 4.0.12~VVR 8.0.3版本:默认不共享,每个表使用自己的连接池。

    • VVR 8.0.4以上版本:同一个作业中endpoint相同的表会默认共享连接池。作业中表数量较多时连接数可能相对不足影响性能,这种情况下推荐为不同的表设置不同的connectionPoolName。

    • 此参数可以按需配置,比如作业中有维表A,B以及结果表C,D,E五张hologres表,可以A表和B表使用'pool1',C表和D表使用'pool2',E表流量较大,单独使用'pool3'。

    jdbcEnableDefaultForNotNullColumn

    如果将Null值写入Hologres表中Not Null且无默认值的字段,是否允许连接器帮助填充一个默认值。

    Boolean

    true

    参数取值如下:

    • true(默认值):允许连接器填充默认值并写入,规则如下。

      • 如果字段是String类型,则默认写为空("")。

      • 如果字段是Number类型,则默认写为0。

      • 如果是Date、timestamp或timestamptz时间类型字段,则默认写为1970-01-01 00:00:00

    • false:不填充默认值,写Null到Not Null字段时,会抛出异常。

    remove-u0000-in-text.enabled

    如果写入时字符串类型包含\u0000非法字符,是否允许连接器帮助去除。

    Boolean

    false

    参数取值如下:

    • false(默认值):连接器不对数据进行操作,但碰到脏数据时写入可能抛出如下异常,ERROR: invalid byte sequence for encoding "UTF8": 0x00

      此时需要在源表提前处理脏数据,或者在SQL中定义脏数据处理逻辑。

    • true:连接器会帮助去除字符串类型中的\u0000,防止写入抛出异常。

    重要
    • 实时计算引擎VVR 8.0.1及以上版本,仅sdkMode='jdbc'场景下支持该参数。

    • 实时计算引擎VVR 8.0.8及以上版本,仅sdkMode='jdbc_copy'sdkMode='jdbc'场景下支持该参数。

    partial-insert.enabled

    是否只插入INSERT语句中定义的字段。

    Boolean

    false

    参数取值如下:

    • false(默认值):无论INSERT语句中声明了哪些字段,都会更新结果表DDL中定义的所有字段,对于未在INSERT语句中声明的字段,会被更新为null。

    • true:将INSERT语句中定义的字段下推给连接器,从而可以只对声明的字段进行更新或插入。

    说明
    • 仅实时计算引擎VVR 6.0.7及以上版本支持该参数。

    • 此参数仅在mutatetype参数配置为InsertOrUpdate时生效。

    deduplication.enabled

    jdbc及jdbc_fixed模式写入攒批过程中,是否进行去重。

    Boolean

    true

    参数取值如下:

    • true(默认值):如果一批数据中有主键相同的数据,默认进行去重,只保留最后一条到达的数据。以两个字段,其中第一个字段为主键的数据举例:

      • INSERT (1,'a')INSERT (1,'b')两条记录先后到达,去重之后只保留后到达的(1,'b')写入Hologres结果表中。

      • Hologres结果表中已经存在记录(1,'a'),此时DELETE (1,'a')INSERT (1,'b')两条记录先后到达,只保留后到达的(1,'b')写入hologres中,表现为直接更新,而不是先删除再插入。

    • false:在攒批过程中不进行去重,如果发现新到的数据和目前攒批的数据中存在主键相同的情况,先将攒批数据写入,写入完成之后再继续写入新到的数据。

    说明
    • 仅实时计算引擎VVR 8.0.5及以上版本支持该参数。

    • 不允许攒批去重时,极端情况下(例如所有数据的主键都相同)写入会退化为不攒批的单条写入,对性能有一定影响。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sdkMode

    SDK模式。

    String

    jdbc

    参数取值如下:

    • jdbc(默认值):表示使用jdbc模式进行查询,支持主键点查和非主键的查询,但是非主键的查询对性能影响较大,查询较慢。

    • rpc:表示使用rpc模式进行点查,与useRpcMode参数一致,仅支持主键点查,即维表的主键字段必须与Flink Join On的字段完全匹配,与jdbc模式的区别在于不占用连接数,不支持读取Hologres的Jsonb,RoarinBitmap类型。

    • jdbc_fixed:(beta功能,需要hologres引擎版本大于等于1.3)表示使用fixed jdbc方式进行点查,与jdbc模式的区别在于不占用连接数,且不支持读取Hologres的Jsonb,RoarinBitmap类型。仅支持主键点查,即维表的主键字段必须与Flink Join On的字段完全匹配。目前此模式暂不支持查询开起数据脱敏功能的database,详情请参见数据脱敏

    版本推荐参数如下:

    VVR 6.0.3以下版本:不支持配置该参数。

    VVR 6.0.4及以上版本:推荐配置为jdbc。

    说明
    • 在VVR 6.0.6版本,SDK模式选择为jdbc时,如果维表字符串类型的查询结果中包含null值,可能抛出空指针异常,此时推荐您使用rpc模式绕过。

    • VVR 6.0.7~VVR 8.0.1版本:

      • 如果Hologres实例为2.0以下版本,Flink系统将采用您配置的参数值。

      • 如果Hologres实例为2.0及以上版本,由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为了rpc,Flink系统自动将参数值切换为jdbc_fixed。但是如果您配置为其他值,Flink系统将采用您配置的参数值。

    • VVR 8.0.3及以上版本:

      • 自此版本开始,无论Hologres实例版本,都不再支持rpc模式,如果选择rpc模式,将自动切换该参数值为jdbc_fixed。

    useRpcMode

    是否通过RPC方式使用Hologres连接器。

    Boolean

    false

    参数取值如下:

    • true:使用RPC方式使用Hologres连接器。与sdkMode参数设置为rpc效果相同。通过RPC方式会降低SQL连接数。

    • false(默认值):使用JDBC方式使用Hologres连接器。

      通过JDBC方式会占用SQL连接,导致JDBC链接数增加。

    说明
    • Hologres 2.0版本下线了rpc了服务,推荐使用sdkMode参数来选择jdbc或者jdbc_fixed模式。

    • 实时计算引擎VVR 6.0.7及VVR 8.0.1版本在检测到Hologres实例是2.0及以上版本时,会自动切换rpc模式为jdbc_fixed模式。

    • 实时计算引擎VVR 8.0.3及以上版本会自动切换rpc模式为jdbc_fixed模式。

    connectionSize

    单个Flink维表任务所创建的JDBC连接池大小。

    Integer

    3

    如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。

    connectionPoolName

    连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。

    String

    取值为非'default'的任意字符串。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectionSize参数也需要相同。

    您可以按需配置此参数,例如作业中有维表A,B以及结果表C,D,E五张hologres表,可以A表和B表使用pool1,C表和D表使用pool2,E表流量较大,单独使用pool3。

    说明
    • VVR 4.0.12以下版本:不支持配置该参数。

    • VVR 4.0.12~VVR 8.0.3版本:默认不共享,每个表使用自己的连接池。

    • VVR 8.0.4以上版本:同一个作业中Endpoint相同的表会默认共享连接池。作业中表数量较多时连接数可能相对不足影响性能,这种情况下推荐为不同的表设置不同的connectionPoolName。

    jdbcReadBatchSize

    点查Hologres维表时,攒批处理的最大条数。

    Integer

    128

    无。

    jdbcReadBatchQueueSize

    维表点查请求缓冲队列大小。

    Integer

    256

    无。

    jdbcReadTimeoutMs

    维表点查的超时时间。

    Long

    默认值为0,表示不会超时

    仅vvr 4.0.15-flink 1.13及以上版本、vvr 6.0.2-flink 1.15及以上版本支持该参数。

    jdbcReadRetryCount

    维表点查超时时的重试次数。

    Interger

    • VVR 8.0.5以下版本:1

    • VVR 8.0.5及以上版本:10

    • 仅VVR 6.0.3以上版本支持该参数。

    • 本参数与jdbcRetryCount不同,后者是指连接发生异常时的重试次数。

    jdbcScanFetchSize

    在一对多join(即没有使用完整主键)时使用scan接口,scan攒批处理数据的条数。

    Integer

    256

    无。

    jdbcScanTimeoutSeconds

    scan操作的超时时间。

    Integer

    60

    单位为秒。

    cache

    缓存策略。

    String

    见备注列。

    Hologres仅支持None和LRU两种缓存策略,取值详情请参见背景信息

    说明

    Cache默认值和VVR版本有关:

    • VVR 4.x版本及以上版本,默认值为None。

    • VVR 4.x版本以下版本,默认值为LRU。

    cacheSize

    缓存大小。

    Integer

    10000

    选择LRU缓存策略后,可以设置缓存大小。单位为条。

    cacheTTLMs

    缓存更新时间间隔。

    Long

    见备注列。

    单位为毫秒。cacheTTLMs默认值和cache的配置有关:

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    cacheEmpty

    是否缓存join结果为空的数据。

    Boolean

    true

    • true(默认值):表示缓存join结果为空的数据。

    • false:表示不缓存join结果为空的数据。

    async

    是否异步返回数据。

    Boolean

    false

    • true:表示异步返回数据。

    • false(默认值):表示不进行异步返回数据。

    说明

    异步返回数据是无序的。

类型映射

Flink与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射

使用示例

源表示例

非Binlog Source表

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' --该参数可选。
  'sdkmode' = 'jdbc'
);

CREATE TEMPORARY TABLE blackhole_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT 
) WITH (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink 
SELECT name, age, birthday
from hologres_source;

Binlog Source表

Hologres连接器支持实时消费Hologres,即实时消费Hologres的Binlog数据。Flink实时消费Hologres详情请参见实时计算Flink版实时消费Hologres

结果表示例

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);

INSERT INTO hologres_sink SELECT * from datagen_source;

维表示例

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector' = 'hologres',
   ...
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

特色功能详解

流式语义

流处理,也称为流数据或流事件处理,即对一系列无界数据或事件连续处理。执行流数据或流事件处理的系统通常允许您指定一种可靠性模式或处理语义,保证整个系统处理数据的准确性,因为网络或设备故障等可能会导致数据丢失。

根据Hologres Sink的配置和Hologres表的属性,流式语义分为以下两种:

  • Exactly-once(仅一次):即使在发生各种故障的情况下,系统只处理一次数据或事件。

  • At-least-once(至少一次):如果在系统完全处理之前丢失了数据或事件,则从源头重新传输,因此可以多次处理数据或事件。如果第一次重试成功,则不必进行后续重试。

在Hologres结果表中使用流式语义,需要注意以下几点:

  • 如果Hologres物理表未设置主键,则Hologres Sink使用At-least-once语义。

  • 如果Hologres物理表已设置主键,则Hologres Sink通过主键确保Exactly-once语义。当同主键数据出现多次时,您需要设置mutatetype参数确定更新结果表的方式,mutatetype取值如下:

    • insertorignore(默认值):保留首次出现的数据,忽略后续所有数据。

    • insertorreplace:后出现的数据整行替换已有数据。

    • insertorupdate:更新已有数据的部分列。例如一张表有a、b、c和d四个字段,a是PK(Primary Key),写入Hologres时只写入a和b两个字段,在PK重复的情况下,系统只会更新b字段,c和d保持不变。

    说明
    • mutatetype设置为insertorreplaceinsertorupdate时,系统根据主键更新数据。

    • Flink定义的结果表中的数据列数不一定要和Hologres物理表的列数一致,您需要保证缺失的列没有非空约束,即列值可以为Null,否则会报错。

  • 默认情况下,Hologres Sink只能向一张表导入数据。如果导入数据至分区表的父表,即使导入成功,也会查询数据失败。您可以设置参数partitionRouter为true,开启自动将数据路由到对应分区表的功能。注意事项如下:

    • tablename参数需要填写为父表的表名。

    • 如果没有提前创建分区表,需要在WITH参数中启用createparttable=true,从而支持自动创建分区表,否则会导入失败。

宽表Merge和局部更新功能

在把多个流的数据写到一张Hologres宽表的场景中,会涉及到宽表Merge和数据的局部更新。下面通过示例来介绍实现宽表merge的两种方式。

说明

本场景的两种实现方式均具有如下限制:

  • 宽表必须有主键。

  • 每个数据流的数据都必须包含完整的主键字段。

  • 列存模式的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary Encoding功能。

方式一(推荐)

说明

仅实时计算引擎VVR 6.0.7及以上版本支持使用此方式。

假设有两个Flink数据流,一个数据流中包含a、b和c字段,另一个数据流中包含a、d和e字段,Hologres宽表WIDE_TABLE包含a、b、c、d和e字段,其中a字段为主键。具体操作如下:

  1. 使用Flink SQL创建一张Hologres结果表,并声明a、b、c、d、e五个字段,映射至宽表WIDE_TABLE。

  2. 结果表的属性设置:

    • mutatetype设置为insertorupdate,可以根据主键更新数据。

    • ignoredelete设置为true,防止回撤消息产生Delete请求。实时计算引擎VVR 8.0.8及以上版本推荐使用sink.delete-strategy配置局部更新时的各种策略。

  3. 将两个Flink数据流的数据分别INSERT至对应的结果表中。

    // 已经定义的source1和source2
    CREATE TEMPORARY TABLE hologres_sink ( -- 声明a,b,c,d,e五个字段
      a BIGINT, 
      b STRING,
      c STRING,
      d STRING,
      e STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>',  -- hologres宽表,包含a,b,c,d,e五个字段
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype'='insertorupdate',    -- 根据主键更新数据
      'ignoredelete'='true',            -- 忽略回撤消息产生的Delete请求
      'partial-insert.enabled'='true'   -- 开启部分列更新参数,支持仅更新INSERT语句中生命的字段
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hologres_sink(a,b,c) select * from source1;  -- 声明只插入a,b,c三个字段
    INSERT INTO hologres_sink(a,d,e) select * from source2;  -- 声明只插入a,d,e三个字段
    END;

方式二

说明

仅实时计算引擎VVR 6.0.6及以下版本支持使用此方式。

假设有两个Flink数据流,一个数据流中包含a、b和c字段,另一个数据流中包含a、d和e字段,Hologres宽表WIDE_TABLE包含a、b、c、d和e字段,其中a字段为主键。具体操作如下:

  1. 使用Flink SQL创建两张Hologres结果表,其中一张表只声明a、b和c字段,另一张表只声明a、d和e字段。这两张表都映射至宽表WIDE_TABLE。

  2. 两张结果表的属性设置:

    • mutatetype设置为insertorupdate,可以根据主键更新数据。

    • ignoredelete设置为true,防止回撤消息产生Delete请求。实时计算引擎VVR 8.0.8及以上版本推荐使用sink.delete-strategy配置局部更新时的各种策略。

  3. 将两个Flink数据流的数据分别INSERT至对应的结果表中。

    // 已经定义的source1和source2
    
    CREATE TEMPORARY TABLE hologres_sink_1 ( -- 只声明a,b,c三个字段
      a BIGINT, 
      b STRING,
      c STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>',  -- hologres宽表,包含a,b,c,d,e五个字段
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype' = 'insertorupdate',    -- 根据主键更新数据
      'ignoredelete' = 'true'             -- 忽略回撤消息产生的Delete请求
    );
    
    CREATE TEMPORARY TABLE hologres_sink_2 ( -- 只声明a,d,e三个字段
      a BIGINT, 
      d STRING,
      e STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>',  -- hologres宽表,包含a,b,c,d,e五个字段
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype'='insertorupdate',    -- 根据主键更新数据
      'ignoredelete'='true'             -- 忽略回撤消息产生的Delete请求
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hologres_sink_1 select * from source1;
    INSERT INTO hologres_sink_2 select * from source2;
    END;

作为CTAS和CDAS的目标端

Hologres支持实时同步单表或整库级别的数据,在同步过程之中如果上游的表结构发生了变更也会实时同步到Hologres表中。新数据流到Hologres表时,Flink会先触发Hologres修改相应的表结构,然后再将数据写入到相应的表中。以上过程全部由Flink自动完成,您无需关心实现细节,详情请参见CREATE TABLE AS(CTAS)语句数据库实时入仓快速入门

DataStream API

重要

通过DataStream的方式读写数据时,需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了Hologres DataStream连接器。VVR 6.0.7请使用其中的1.15-vvr-6.0.7-1版本的依赖。VVR 8.0.7的依赖请通过ververica-connector-hologres-1.17-vvr-8.0.7.jar下载,在本地调试时,需要使用相应的Uber JAR,详见本地运行和调试包含连接器的作业,VVR 8.0.7对应的Uber JAR为ververica-connector-hologres-1.17-vvr-8.0.7-uber.jar

Hologres源表

VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。

VVR 4.0.15

// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
  jdbcOptions.getTable(), schema.getFieldNames());

// 构建HologresBulkreadInputFormat读取表数据。
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
  .print();
env.execute();

VVR 6.0.7&VVR 8.0.7

// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");

// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);

HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions,  schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
  .print();
env.execute();

XML

Maven中央库中已经放置了Hologres DataStream连接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Hologres Binlog源表

VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。

VVR 4.0.15

// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);

// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);

// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
  schema,
  config,
  jdbcOptions,
  recordConverter,
  startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();

VVR 6.0.7

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 设置或创建默认slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
  && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 构建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
  jdbcOptions.getTable(),
  schema,
  new HologresConnectionParam(config),
  cdcMode,
  Collections.emptySet());

// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
  new HologresConnectionParam(config),
  schema,
  config,
  jdbcOptions,
  startTimeMs,
  StartupMode.TIMESTAMP,
  recordConverter,
  "",
  -1);

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();

VVR 8.0.7

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
  new HologresConnectionParam(config),
  schema,
  config,
  jdbcOptions,
  startTimeMs,
  StartupMode.INITIAL,
  "",
  "",
  -1,
  Collections.emptySet());

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
说明
  • 方法buildRecordConverter不在VVR Connector依赖中,是示例代码中提供的方法。

  • Hologres Binlog注意事项和实现原理等详情,请参见Binlog Source表

Hologres结果表

VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。

VVR 4.0.15

// 初始化读取的表的Schema。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .build();

// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);

// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter =
  buildHologresWriter(schema, config, hologresConnectionParam);

// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
  .addSink(sinkFunction);
env.execute();

VVR 6.0.7&VVR 8.0.7

// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .build();

// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");

HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
  hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
  .addSink(sinkFunction);

env.execute();
说明

方法buildHologresWriter不在VVR Connector依赖中,是示例代码中提供的方法。

Flink与Hologres时区说明

时间类型

产品

类型

说明

Flink

Flink TIMESTAMP

表示没有时区信息的日期和时间,描述年、 月、日、小时、分钟、秒和小数秒对应的时间戳。可以通过一个字符串来指定,例如1970-01-01 00:00:04.001

Flink TIMESTAMP_LTZ

用于描述时间线上的绝对时间点,使用long保存从epoch至今的毫秒数,使用int保存毫秒中的纳秒数。epoch时间是从Java的标准epoch时间开始计算。在计算和可视化时, 每个TIMESTAMP_LTZ类型的数据都使用Session (会话)中配置的时区。可以用于跨时区的计算,因为它是一个基于epoch的绝对时间点,代表的就是不同时区的同一个绝对时间点。

相同的TIMESTAMP_LTZ值,在不同的时区可能会反映出不同的本地TIMESTAMP,例如:如果一个TIMESTAMP_LTZ值为2024-03-19T04:00:00Z,在上海时区(UTC+8)的本地时间戳会表示为2024-03-19T12:00:00,而在格林威治时区(UTC+0)则表示为2024-03-19T04:00:00

Hologres

TIMESTAMP

类似于Flink的TIMESTAMP类型,表示没有时区信息的日期和时间。存储数据时不会改变,即使客户端的时区发生变化,存储的值也保持不变。例如:2022-01-01 01:01:01.123456

TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ)

类似于Flink的TIMESTAMP_LTZ类型,它带有时区信息。Hologres存储TIMESTAMPTZ数据时,会将其转换为UTC时区的值。当查询数据时,Hologres会根据客户端的时区配置参数将UTC时区的值转换为客户端时区的值。

例如北京(UTC+8)时区的时间戳2022-02-01 10:33:20.125+08。在Hologres中存储为TIMESTAMPTZ类型时,其值会是2022-02-01 10:33:20.125+08

时间类型映射

  • 实时计算引擎VVR 8.0.6及以上版本且type-mapping.timestamp-converting.legacy=false时,支持所有时间类型间的相互转换。

    Flink

    Hologres

    详情

    TIMESTAMP

    TIMESTAMP

    之间相互转换是直接的,不涉及时区转换。因此推荐采用该数据映射。

    TIMESTAMP LTZ

    TIMESTAMPTZ

    TIMESTAMP

    TIMESTAMPTZ

    之间的转换涉及时区转换。为了在转换中保持准确性,需要通过配置项参数table.local-time-zone设置Flink时区,配置项参数设置方法请参见如何配置作业运行参数?

    例如当设置'table.local-time-zone': 'Asia/Shanghai'时,表示Flink时区为上海(+8时区)时,Flink TIMESTAMP类型的数据为2022-01-01 01:01:01.123456,写入Hologres TIMESTAMP TZ的数值为2022-01-01 01:01:01.123456+8。

    TIMESTAMP LTZ

    TIMESTAMP

  • 实时计算引擎VVR8.0.6及以上版本且type-mapping.timestamp-converting.legacy=true时或者VVR 8.0.5及以下版本,除TIMESTAMP间转化,其他类型相互转化可能会出现数据偏差问题。

    Flink

    Hologres

    备注

    TIMESTAMP

    TIMESTAMP

    之间相互转换是直接的,不涉及时区转换。因此推荐采用该数据映射。

    TIMESTAMP LTZ

    TIMESTAMPTZ

    读写Hologres数据时都当作无时区时间进行处理,可能会存在数据偏差

    例如,Flink TIMESTAMP_LTZ类型的数值为2024-03-19T04:00:00Z,在上海(+8时区)对应的实际无时区时间为2024-03-19T12:00:00,但是写入时将2024-03-19T04:00:00当作无时区时间,写入Hologres TIMESTAMPTZ的数值为2024-03-19T04:00:00+08,数值偏差8小时。

    TIMESTAMP

    TIMESTAMPTZ

    时区转换默认采用的是运行环境的JVM时区,而不是Flink时区,这与Flink内部计算的时区转换格式不同。当Flink时区与机器的JVM时区不一致时,会导致数据存在偏差,建议采用Flink时区进行Hologres数据的读写。

    TIMESTAMP LTZ

    TIMESTAMP

相关文档