全部产品
Search
文档中心

实时数仓Hologres:使用Flink导入

更新时间:Dec 17, 2024

实时计算Flink版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于Apache Flink构建的企业级高性能的实时大数据处理系统。Hologres与Flink深度连通,支持实时写入Flink的数据,可以实时查询写入的数据,帮助您快速搭建实时数仓。

形态说明

  • 阿里云实时计算Flink版不进行业务存储,所有的数据均来自于外部存储系统持有的数据。阿里云实时计算Flink版支持的数据存储类型如下:

    • 源表

    源表指输入至Flink的数据输入源。Flink的源表指定为Hologres时,使用的是批量导入而非流式导入,Hologres会将全表的数据统一扫描一次后再输出至下游,扫描完成后本次作业结束。

    • 维表

    维表一般适用于点查询场景(Lookup by Key),因此在Hologres中,维表建议使用行存储,并且JOIN的字段必须是完整的主键字段。

    • 结果表

    结果表用于接收并存放经过Flink计算的结果数据,为下游数据继续消费提供各类读写接口。

  • 阿里云实时计算Flink版还支持很多企业级高级能力,通过与Hologres深度集成,提供以下创新能力:

    • Hologres Binlog消费

    使用消息队列的模式消费Hologres表的Change Log。

    • Flink Catalog

    Flink支持导入Hologres元数据为Catalog,在Flink全托管控制台直接读取Hologres元数据,不用再手动注册Hologres表,可以提高开发效率且能保证表结构的正确性。

    • Schema Evolution

    Flink全托管支持Schema Evolution,在Flink读取JSON数据时,可以自动解析类型,自动创建对应表的列,支持数据模型的动态演化。

  • Hologres支持的Flink产品形态及功能如下表所示。

    产品形态

    数据存储类型

    企业级高级能力

    描述

    源表

    结果表

    维表

    Hologres Binlog消费

    Flink Catalog

    Schema Evolution

    Flink半托管

    支持行存储及列存储,Binlog源表建议使用行存储或行列共存。

    支持行存储及列存储

    建议使用行存储或行列共存

    支持

    支持

    支持

    使用EMR Studio开发平台。

    Blink独享(已停售)

    支持行存储及列存储,Binlog源表建议使用行存储或行列共存。

    支持行存储及列存储

    建议使用行存储或行列共存

    Hologres V0.8版本只支持行存储,V0.9及以上版本支持行存储及列存储。建议使用行存储。

    不支持

    不支持

    使用Bayes开发平台。

    推荐使用阿里云Flink全托管。

    开源Flink1.10

    不支持

    支持行存储及列存储

    不支持

    不支持

    不支持

    不支持

    -

    开源Flink1.11及以上版本

    不支持

    支持行存储及列存储

    建议使用行存储

    不支持

    不支持

    不支持

    从开源Flink1.11版本开始,Hologres代码已开源。详细内容请参见GitHub

    开源Flink1.13及以上版本

    支持批量源表

    支持行存储及列存储

    建议使用行存储

    不支持

    不支持

    不支持

    从开源Flink1.11版本开始,Hologres代码已开源。详细内容请参见GitHub

Hologres Connector Release Note

Flink版本

阿里云实时计算VVR版本

Hologres版本

更新信息

相关文档

1.17

8.0.9~8.0.10

2.1.x

2.2.x

3.0.x

  • 修复共享连接池时,注册新的client可能死锁的问题。

  • 消费binlog从状态恢复,不再强制检查table id。

实时计算Flink-Hologres连接器

1.17

8.0.8

2.1.x

2.2.x

结果表:

  • 新增sink.delete-strategy参数,在原有ignoredelete的基础上,丰富了对UPDATE BEFORE类型记录的处理方式。

实时计算Flink-Hologres连接器

1.17

8.0.7

2.1.x

维表:

  • 修复维表字段较多时,频繁获取元数据导致作业上线超时的问题。

通用:

  • 修复不同的表使用了多个不同的用户,共享连接池时抛出权限不足异常的问题。

实时计算Flink版实时消费Hologres

1.17

8.0.6

2.1.x

源表:

  • Hologres V2.1版本起下线Holohub模式,不再支持Flink通过Holohub模式消费Hologres Binlog。Flink VVR 8.0.6版本对应的Hologres Connector,支持在Hologres实例版本升级至V2.1后,自动将Holohub模式切换为JDBC模式,详情请参见Flink/Blink实时消费Hologres Binlog

通用:

  • 支持通过type-mapping.timestamp-converting.legacy参数启用正确读写Flink TIMESTAMP_LTZ类型,详情请参见Flink与Hologres时区说明

1.17

8.0.5

2.0.x

2.1.x

源表:

  • Hologres实例为2.1及以上版本时,使用JDBC消费Binlog不再需要创建slot,详情请参见详见通过JDBC消费Hologres Binlog。因此,从该版本开始,当判断Hologres实例的版本大于2.1时,也将不再自动创建publication和slot。

结果表:

  • 新增deduplication.enabled参数,默认为true。当设置为false时,表示结果表在写入聚合过程中可以选择不进行去重。该功能适用于对上游CDC流的完全回放等场景。

  • 无主键表支持bulkload写入,相比原有的jdbc_copy,写入时使用更少的Hologres资源。

通用:

  • 支持通过connection.ssl.mode以及connection.ssl.root-cert.location参数启用传输加密。

  • 为内部JDBC连接增加超时参数,以防止在服务端异常重启等情况下,客户端连接出现死等的问题。

1.17

8.0.4

2.0.x

2.1.x

源表:

  • 在使用JDBC消费Binlog时会自动创建publication。然而,当对表进行重建时,之前创建的publication并不会被自动删除,从而导致无法消费重建表的Binlog。在此版本中,针对这类情况进行了处理,系统将会自动删除残留的publication。

通用:

  • 为了增加连接池参数的默认值,现在同一任务中的Hologres维表和结果表将共享连接池。

1.17

8.0.3

2.0.x

2.1.x

通用:

  • 无论Hologres实例版本,维表和结果表都已不再支持RPC模式。若选择RPC模式,将自动切换为jdbc_fixed模式,如果实例版本较低,建议升级版本。

实时数仓Hologres连接器

1.15

6.0.7

  • 1.3.x

  • 2.0.x

  • 源表:

    适配Hologres V2.0版本,检测到用户连接到的是2.0及以上版本的Hologres实例,则自动切换Holohub Binlog模式为jdbc binlog模式。

  • 维表:

    适配Hologres V2.0版本,检测到用户连接到的是2.0及以上版本的Hologres实例,则自动切换RPC模式为jdbc_fixed模式。

  • 结果表:

    • 适配Hologres V2.0版本,检测到用户连接到的是2.0及以上版本的Hologres实例,则自动切换RPC模式为jdbc_fixed模式。

    • 新增部分列更新能力,仅支持插入Flink INSERT语句中声明的字段,使用此能力可以更加简洁地表达宽表Merge场景。

  • 通用:

    Connector中发生record convert异常时,会打印出来源数据以及目前convert的结果,方便排查脏数据问题。

  • 问题修复:

    • 修复用户作业中不同实例或者数据库使用相同connectionPoolName不抛出异常的问题。

    • 修复6.0.6版本中维表字符串类型有null值时抛出空指针的问题。

实时数仓Hologres连接器

1.15

6.0.6

1.3.x

源表:

  • JDBC模式消费Hologres Binlog不再强制设置Slotname参数,支持创建默认Slot,从Holohub模式切换更平滑。

  • 新增enable_filter_push_down参数,批量源表不再默认下推filter条件,需要设置此参数值为true

实时数仓Hologres连接器

1.15

6.0.5

1.3.x

  • 通用:作业启动时,会在TaskManager日志中打印所有参数信息,方便排查问题。

  • CTAS/CDAS:支持字段类型宽容模式,使用此模式上游发生数据类型修改事件时,只要所修改类型与原类型的归一化类型相同,都视作修改成功。

  • Hologres Catalog:丰富ALTER TABLE语法,支持修改Hologres物理表属性,支持修改表名、增加列、列重命名和修改列注释。

1.15

6.0.3~6.0.4

1.3.x

源表:

  • 新增JDBC模式消费Hologres Binlog,此模式支持更多的数据类型,且支持自定义账号。

  • 批量源表以及全增量源表的全量阶段,支持Filter下推。

结果表:

支持以Fixed Copy的模式写入,Fixed Copy是Hologres1.3版本新增的能力,相比通过JDBC模式进行写入,Fixed Copy方式可以实现更高的吞吐(因为是流模式)、更低的数据延时和更低的客户端内存消耗(因为不攒批)。

Hologres Catalog:

  • 支持创建Catalog时进行默认的表属性设置。

sdkMode参数:Hologres不同类型的表都有多种模式可以选择,此版本开始统一整理为sdkMode参数。

1.13

4.0.18

1.1及以上版本

修复Sink表上报Metrics影响写入性能的问题。

1.13以及1.15

4.0.15以及6.0.2

1.1及以上版本

源表:

  • 从该版本开始,批量源表默认大小写敏感。

  • 支持配置批量源表Scan操作所在事务的超时时间。

  • 修复批量源表解析复杂字符串可能失败的问题。

  • 全增量源表新增Upsert模式。

维表:

Hologres维表支持异步请求的超时时间(asyncTimeoutMs)设置。

结果表:

  • 支持PARTITION BY语法,CTAS创建Hologres结果表时可以通过PARTITION BY定义分区表。

  • Metrcis支持currentSendTime指标。

1.13

4.0.13

1.1及以上版本

  • 源表支持全增量一体源表消费。

  • 支持Datastream API。

1.13

4.0.11

0.10及以上版本

支持CTAS、CDAS。

1.13

4.0.8

0.10及以上版本

结果表、源表、维表支持Hologres Catalog。

管理Hologres Catalog

1.13

3.0.0

0.9及以上版本

支持实时消费Hologres。

Flink全托管

Hologres Connector相关已知缺陷和修复版本

  • 缺陷及修复说明

    • 影响版本会明确标注,不在范围内的没有对应缺陷。

    • 影响版本标记为“不涉及”,表示问题可能是Hologres引擎的缺陷,而非Connector的问题。

  • 缺陷等级说明

    • P0:建议立即升级,一旦触发会影响线上的使用(如查询正确性、写入成功率等操作)。

    • P1:推荐升级,提前规避相关问题。

    • P2:选择性升级,偶尔发生的问题,具备应该改写方法,或重启即可修复。

等级

缺陷描述

影响版本

修复版本

解决方法

P1

JDBC模式消费Binlog,出现Binlog Convert Failed异常,或者部分shard的数据读取停止在某个时刻。原因是Hologres实例的Gateway收到后端超时的异常信息时,将异常返回给客户端的过程中会存在问题,导致读取数据卡住或数据解析失败报错。

不涉及

不涉及

一般在作业反压时会更容易出现,如果作业存在数据读取卡住的问题,可以选择重启作业并从最近的checkpoint恢复。

要彻底解决该问题,需要将Hologres版本升级到2.2.21及以上版本。

P2

作业上线慢或者超时,查看Thread Dump卡在 GetTableSchema处。

不涉及

不涉及

可能存在多种情况,可以根据以下步骤依次排查:

  1. 请先验证Flink集群和Hologres实例的网络连通性

  2. 调整 jdbcRetryCount参数为1,防止内部重试无法看到异常根因。

  3. Hologres 2.0版本之前,在用户DDL频繁时,存在元数据清理不及时的问题,可能导致查询表的Meta变慢,建议将Hologres版本升级至2.1及以上版本。

P0

通过FixedFE(对应connector中的jdbc_fixed模式)写入Hologres的 Text,Json或Jsonb类型时,如果数据源有不合法的字符,可能抛出预期外的异常,进一步导致当前连接所在FE节点重启,当前FE的连接中断。

不涉及

不涉及

如果无法保证上游字符串的合法性,建议使用jdbc模式写入,并且为结果表开启

remove-u0000-in-text.enabled参数。

或者将Hologres版本升级至3.0及以上版本,以继续使用jdbc_fixed模式。

P1

在进行JDBC维表一对多join时,Flink Task出现内存使用过高或者OOM 的情况。

不涉及

不涉及

Hologres1.3版本,使用prefix scan,在查询结果大于jdbcScanFetchSize时,出现批量查询不结束的情况。绕过方法:将jdbcScanFetchSize设置为一个大值,例如100000

要彻底解决该问题,需要升级Hologres实例版本至 2.0及以上版本。

P1

Binlog作业有状态恢复时,抛出the table id parsed from checkpoint is different from the current table id的异常。原因是作业在之前运行过程中,进行了TRUNCATE或者重建表,而checkpoint中存储的是作业首次启动时的table id,导致与当前的table id不匹配。

8.0.4

8.0.9

从8.0.9版本开始,不再强制检查table id,而是仅输出WARN级别日志,以允许作业从最新状态恢复。但仍然不推荐在binlog表作业运行时进行重建表操作。对binlog来说,这类操作会导致之前的binlog完全丢失。

P2

作业运行过程中发生反压,查看ThreadDump发现Execution Pool卡在close或者start方法。原因是共享连接池多个client复用同一个连接池时,可能存在死锁,导致连接池无法正常关闭。

8.0.5

8.0.9

建议升级版本。

P2

如果对源表进行了DELETE FROM操作之后进行全增量消费,由于增量阶段没有数据,导致全量阶段会从头开始消费所有的binlog。

8.0.6及之前版本

8.0.7

建议升级或者指定其实时间进行增量消费。

P1

维表字段数量较多时,作业上线超时。

8.0.6

8.0.7

建议升级版本。

P0

当在批量源表中开启enable_filter_push_down参数后,Filter不生效,导致读取的数据包含了一些应该被过滤掉的数据。

说明

全增量和Binlog源表不存在此问题。

8.0.5及之前版本

8.0.6

建议升级版本。

P0

通过FixedFE(对应Connector中的jdbc_fixed模式)写入JSON或JSONB类型数据到Hologres时,如果数据源有不合法的JSON或JSONB字段,会导致当前连接所在FE节点重启,当前FE的连接中断。

8.0.5及之前版本

暂无

如果无法保证上游JSON或JSONB字符串的合法性,建议使用JDBC模式写入。

P1

JDBC维表一对多Join时,内部出现的连接失败等异常无法抛出,表现为异步Join节点反压且数据不再流动,发生概率较小。

6.0.7及之前版本

8.0.3

建议升级版本,也可以通过重启作业暂时恢复。

P1

通过JDBC模式消费Binlog时,存在内存泄漏问题。可能的表现为作业启动时消费速率较高,之后持续下降。

6.0.7及之前版本

6.0.7

建议升级版本,对于DataStream作业,需要使用6.0.7-1版本的依赖。

P0

JDBC模式写入的定时Flush(由jdbcWriteFlushInterval参数控制)捕获的异常在下条数据写入时才会抛出,当用户写入流量较小时,异常被捕获还未抛出期间有可能进行成功的checkpoint。下次失败时会从这个不合理成功的checkpoint恢复,从而可能出现丢数据的情况。

6.0.6及之前版本

6.0.7

流量较小时容易出发此缺陷,建议升级版本,或者调整jdbcWriteFlushInterval时间大于checkpoint的间隔时间。

P2

JDBC模式消费Binlog不设置Slotname时,系统会自动创建一个Slotname。若当表名含有特殊字符或Schema名称时,自动创建非法Slotname,无法使用,且作业将会抛出syntax error异常。

6.0.6

6.0.7

建议升级版本,对于DataStream作业,需要使用6.0.7-1版本的依赖。

P1

用户作业中不同的Hologres实例或数据库使用相同的connectionPoolName时,出现找不到表等异常。

6.0.6及之前版本

6.0.7

同一个作业使用的Hologres实例或者数据库不同时,使用不同的connectionPoolName

P1

维表字符串类型有null值时,会抛出NPE异常。

6.0.6

6.0.7

建议升级版本。

P0

Hologres源表默认开启Filter下推,但如果作业也使用了Hologres维表,且写入的DML中包含对维表非主键字段的过滤条件时,维表的Filter也会被错误地下推,可能导致维表Join出现错误结果。

6.0.3~6.0.5

6.0.6

建议升级版本。

P0

多个结果表的mutatetype不同,但使用同一个connectionPoolName参数以复用连接池时,可能出现mutatetype配置被覆盖的情况,导致配置不生效。

6.0.2及之前版本

6.0.3

将全部结果表的mutatetype都设置为InsertOrUpdate,或者为mutatetype不同的表设置不同的connectionPoolName

P1

Binlog源表DDL中声明hg_binlog_timestamp_us字段时抛出NPE异常。

6.0.2

6.0.3

不使用所述字段,或者升级版本。

P1

汇报Metrics会影响结果表的写入性能,排查表现为Sink节点的thread dump会卡在reportWriteLatency

4.0.15~4.0.17

4.0.18

选择没有受影响的版本。

P2

批量源表读取字符或字符数组类型,其中比较特殊字符时,解析字符串失败。

4.0.14及之前版本

4.0.15

清理源表中的脏数据,或者升级版本。

P2

全增量一体化源表DDL中声明hg_binlog等Binlog独有字段时,消费不到全量数据。

4.0.13

4.0.14

不使用全增量功能,或者升级版本。