全部产品
Search
文档中心

实时计算Flink版:CDC问题

更新时间:Oct 14, 2024

本文介绍CDC相关的常见问题。

Flink CDC作业失败后能不能彻底退出,而不是重启?

您可以修改Flink配置,指定具体的重启策略。例如可以通过如下配置,指定最多尝试重启两次,且在下一次尝试启动前会等待10秒。如果两次启动都失败则作业失败退出,不会持续重启。

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 s

MySQL/Hologres CDC源表不支持窗口函数,如何实现类似每分钟聚合统计的需求?

可以通过非窗口聚合的方式实现类似的效果。具体方法为:

  1. 使用DATE_FORMAT函数,将时间字段转换成分钟粒度的字符串,作为窗口值。

  2. 根据窗口值进行GROUP BY聚合。

例如,统计每个店铺每分钟的订单数和销量,实现代码如下。

SELECT 
    shop_id, 
    DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm') AS window,
    COUNT(*) AS order_count, 
    SUM(price) AS amount 
FROM order_mysql_cdc 
GROUP BY shop_id, window

MySQL CDC表只能作为Source吗?

是的。MySQL CDC源表可以读取MySQL数据库表中的全量和增量数据,只能作为Source使用。而MySQL表可以用作维表或者结果表。

MySQL CDC读取全量数据后,不读增量数据是为什么?

问题详情

问题原因

解决方案

只读全量,不读增量。

MySQL CDC配置读取的是RDS MySQL 5.6备库或者只读实例时,可能出现这个问题。因为RDS MySQL 5.6该类型实例没有向日志文件里写入数据,导致下游同步工具无法读取增量的变更信息。

建议您使用可写实例或者升级RDS MySQL至更高版本。

MySQL全量数据读取完后一直卡住。

MySQL CDC全量阶段读取时间过长,导致最后一个分片数据量过大,出现OOM问题,作业Failover后卡住。

增加MySQL Source端的并发,加快全量读取的速度。

在MySQL CDC在做全量读到增量读的切换时,如果全量读是多并发,则在进入增量前要多等一个Checkpoint,来确保全量的数据已经写入到下游后再读取增量数据,从而保证数据的正确性。如果您设置的Checkpoint间隔时间比较大,例如20分钟,则会导致作业20分钟后才开始同步增量数据。

您需要根据业务情况设置合理的Checkpoint间隔时间。

MySQL CDC使用table-name正则表达式不能解析逗号,怎么办?

  • 报错原因

    例如配置'table-name' = 't_process_wi_history_\d{1,2}',报错如下。报错参数

  • 报错原因

    Debezium使用逗号作为分隔符,不支持带逗号的正则表达式,所以解析会报错。

  • 解决方案

    建议您使用'table-name' = '(t_process_wi_history_\d{1}|t_process_wi_history_\d{2})'进行配置。

作业重启时,MySQL CDC源表会从作业停止时的位置消费,还是从作业配置的位置重新消费?

作业重启时,您可以自由选择启动策略。如果选择全新启动,MySQL CDC源表会从配置的位置重新消费。如果选择从最新状态恢复,MySQL CDC源表会从作业停止时的位置开始消费。

比如,作业配置为从Binlog位点{file=mysql-bin.01, position=40}启动作业,作业运行一段时间后停止,此时消费到Binlog位点{file=mysql-bin.01, position=210}。如果选择全新启动,MySQL CDC源表会重新从Binlog位点{file=mysql-bin.01, position=40}消费。如果选择从最新状态恢复,则会从Binlog位点{file=mysql-bin.01, position=210}开始消费。

重要

作业重启时,请保证所需Binlog在服务器上没有因过期被清理,否则会报错。

MySQL CDC源表如何工作,会对数据库造成什么影响?

在启动模式为initial(默认值)时,MySQL CDC源表会先通过JDBC连接MySQL,使用SELECT语句读取全量的数据,并记录Binlog的位点。全量阶段结束后,再从记录的位点开始读取Binlog中的增量数据。

全量阶段时,由于通过SELECT语句查询数据,MySQL服务的查询压力可能会增加。增量阶段时,需要通过Binlog Client连接MySQL读取Binlog,当使用的数据表增加时,可能出现连接数过多的问题。可以通过如下MySQL查询来查看最大连接数:

show variables like '%max_connections%';

如何跳过Snapshot阶段,只从变更数据开始读取?

可以通过WITH参数scan.startup.mode来控制,您可以指定从最早可用的Binlog位点消费、从最新的Binlog位点消费、指定时间戳消费或指定具体的Binlog位点消费,详情可参见WITH参数:scan.startup.mode

如何读取一个分库分表的MySQL数据库?

如果MySQL是一个分库分表的数据库,分成了user_00、user_02和user_99等多个表,且所有表的schema一致。则可以通过table-name选项,指定一个正则表达式来匹配读取多张表,例如设置table-name为user_.*,监控所有user_前缀的表。database-name选项也支持该功能,但需要所有的表schema一致。

全表读取阶段效率慢、存在反压,应该如何解决?

可能是下游节点处理太慢导致反压了。因此您需要先排查下游节点是否存在反压。如果存在,则需要先解决下游节点的反压问题。您可以通过以下方式处理:

  • 增加并发数。

  • 开启minibatch等聚合优化参数(下游聚合节点)。

如何判断MySQL CDC作业是否已完成全量数据同步?

  • 您可以通过监控告警页面currentEmitEventTimeLag指标来判断作业是否完成了全量数据同步。

    currentEmitEventTimeLag指标记录的是Source发送一条记录到下游节点的时间点和该记录在数据库中生成时间点的差值,用于衡量数据从数据库中产生到离开Source节点的延迟。指标

    currentEmitEventTimeLag指标取值含义如下:

    • 当该指标小于等于0时,则代表作业还在全量数据同步阶段。

    • 当该指标大于0时,则代表作业完成了全量数据同步,进入了Binlog读取阶段。

  • 在MySQL CDC源表所在的TM日志中排查是否有BinlogSplitReader is created日志来判断是否读取完了全量数据,例如下图所示。

    日志

多个CDC作业导致数据库压力过大怎么办?

MySQL CDC源表需要连接数据库读取Binlog,当源表数量逐渐增加,数据库压力也会逐渐增加。为了解决数据库压力过大的问题,可以考虑通过将表同步到Kafka消息队列中,再通过消费Kafka中数据进行解耦。详情请参见MySQL整库同步Kafka

如果是通过CTAS方式同步数据导致的数据库压力过大,可以将多个CTAS作业合并为一个作业运行。在配置相同的情况下,为每一个MySQL CDC源表配置相同Server ID,可以实现数据源的复用,从而减小数据库的压力。详情请参见代码示例四:多CTAS语句

使用MySQL CDC,为什么数据量不大,但是flink读取时候消耗了大量带宽?

  • 问题详情

    MySQL的源表数据更新量不大,但是Flink在读取数据时消耗了大量的带宽。

  • 问题原因

    在MySQL中,Binlog是整个实例级别的,因此它会记录所有数据库和所有表的变更。也就是说,如果您的MySQL数据库下有三张表,即使Flink作业只涉及其中一张表的变更,Binlog也会包含这三张表的所有变更记录。

    使用Flink CDC时,虽然底层Binlog包含整个MySQL实例的所有变更数据,Flink CDC可以通过配置只过滤并读取特定表的变更记录。这个过滤过程是在Debezium或Flink CDC连接器层面完成的,而不是在MySQL层面完成的。

  • 解决方案

    虽然无法解决Binlog的数据变更存储机制,但是可以通过Source复用避免更多的带宽使用,详情请参考开启CDC Source复用

使用MySQL CDC,增量阶段读取出来的timestamp字段时区相差8小时,怎么回事呢?

  • 在解析Binlog数据中的timestamp字段时,CDC作业里配置server-time-zone参数,如果这个参数没有和您的MySQL服务器时区一致,就会出现这个问题。

  • 在DataStream中使用了自定义序列化器,例如MyDeserializer implements DebeziumDeserializationSchema。当自定义序列化器解析timestamp类型的数据时,出现该问题。可以参考RowDataDebeziumDeserializeSchema中对timestamp类型的解析,在serverTimeZone处给定时区信息。

      private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
              if (dbzObj instanceof Long) {
                  switch (schema.name()) {
                      case Timestamp.SCHEMA_NAME:
                         return TimestampData.fromEpochMillis((Long) dbzObj);
                      case MicroTimestamp.SCHEMA_NAME:
                         long micro = (long) dbzObj;
                         return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
                      case NanoTimestamp.SCHEMA_NAME:
                         long nano = (long) dbzObj;
                         return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
                  }
             }
             LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
             return TimestampData.fromLocalDateTime(localDateTime);
        }

MySQL CDC支持监听从库吗?从库需要如何配置?

是的,为了支持MySQL CDC监听从库,需要对从库进行如下配置。配置完成后,从库在接收主库同步过来的数据时,会将这些数据写入从库自身的Binlog文件中。

log-slave-updates = 1

如果主库启用了GTID模式,那么从库也必须启用GTID模式。对主从库进行如下配置。

gtid_mode = on
enforce_gtid_consistency = on

怎么获取数据库中的DDL事件?

使用社区版本的CDC连接器时,您可以通过DataStream API使用MySqlSource,配置includeSchemaChanges(true)参数来获取DDL事件。获取到DDL事件后,再编写相应的代码进行后续处理。代码示例如下。

MySqlSource<xxx> mySqlSource =
 MySqlSource.<xxx>builder()
 .hostname(...)
 .port(...)
 .databaseList("<databaseName>")
 .tableList("<databaseName>.<tableName>")
 .username(...)
 .password(...)
 .serverId(...)
 .deserializer(...)
 .includeSchemaChanges(true) // 配置获取DDL事件的参数
 .build();
 ... // 其他处理逻辑 

Flink CDC支持MySQL整库同步吗?怎么做?

支持的,实时计算Flink版提供了CTAS或CDAS语法来支持整库同步,详情请参见CREATE TABLE AS(CTAS)语句CREATE DATABASE AS(CDAS)语句

说明

因为RDS MySQL 5.6该类型实例没有向日志文件里写入数据,导致下游同步工具无法读取增量的变更信息。

同一个实例下,某个库的表无法同步增量数据,其他库都可以,为什么?

因为MySQL服务器可以配置Binlog过滤器,忽略了某些库的Binlog。您可以通过show master status命令查看Binlog_Ignore_DB和Binlog_Do_DB。查看结果示例如下。

mysql> show master status;
+------------------+----------+--------------+------------------+----------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |  Executed_Gtid_Set   |
+------------------+----------+--------------+------------------+----------------------+
| mysql-bin.000006 |     4594 |              |                  | xxx:1-15             |
+------------------+----------+--------------+------------------+----------------------+

使用DataStream API构建MySQL CDC Source时如何配置tableList选项?

tableList要求表名包含数据库名,而不是DataStream API中的表名。对于MySQL CDC Source,可以配置tableList("yourDatabaseName.yourTableName")参数。​

MongoDB CDC全量读取阶段,作业失败后,可以从checkpoint继续读取吗?

在WITH参数中配置'scan.incremental.snapshot.enabled'= 'true'参数,可以从checkpoint恢复读取数据。

MongoDB CDC支持全量+增量读和只读增量吗?

支持,默认为全量+增量读取;在WITH参数中配置'scan.startup.mode' = 'latest-offset'参数将读取数据模式变为只读增量。

MongoDB CDC支持订阅多个集合吗?

仅支持订阅整个库的集合,不支持筛选部分集合功能。例如,在WITH参数中设置'database' = 'mgdb',并且'collection' = '',则表示会订阅mgdb数据库下的所有集合。

MongoDB CDC支持设置多并发度读取吗?

如果启用了scan.incremental.snapshot.enabled配置,则在初始快照阶段将支持并发读取。

MongoDB CDC支持的MongoDB版本是哪些?

MongoDB CDC基于Change Stream特性实现,Change Stream是MongoDB 3.6新增的特性。MongoDB CDC理论上支持3.6及更高的版本,建议使用版本>=4.0。如果在低于3.6的版本上运行,可能会出现错误Unrecognized pipeline stage name: '$changeStream'

MongoDB CDC支持的MongoDB运行模式是什么样的?

Change Stream需要MongoDB在副本集或分片集群架构下运行,在本地测试时,可以使用单节点副本集架构,通过执行rs.initiate()命令来初始化。但需要注意,在单节点架构下执行CDC可能会出现错误The $changestage is only supported on replica sets

MongoDB CDC是否支持Debezium相关的参数?

不支持的,因为MongoDB CDC连接器是在Flink CDC项目中独立开发,并不依赖Debezium项目。

其他组件可以成功使用相同的用户名和密码进行连接,为何MongoDB CDC会报错并提示用户名密码错误?

因为该用户凭证是在所需连接的数据库下创建的。如果需要正常访问,需要在WITH参数里添加 'connection.options' = 'authSource=用户所在的DB'

MongoDB CDC支持从Checkpoint恢复吗?原理是怎么样的呢?

支持,checkpoint会记录Change Stream的Resume Token,可以通过Resume Token重新恢复Change Stream。其中Resume Token对应oplog.rs(MongoDB变更日志集合)的位置,oplog.rs是一个固定容量的集合。

当Resume Token对应的记录在oplog.rs中不存在时,可能会出现无效Resume Token的异常,您可以设置合适oplog.rs的集合大小,避免oplog.rs保留时间过短,详情请参见更改Oplog的大小

另外,新达到的变更记录和心跳记录可以刷新Resume Token。

MongoDB CDC支持输出-U(update_before,更新前镜像值)消息吗?

  • 在MongoDB  6.0及以上版本中,若数据库开启了前像或后像功能,您可以在SQL作业中配置参数'scan.full-changelog' = 'true',使得数据源能够输出-U消息,从而省去ChangelogNormalize。

  • 在MongoDB 6.0以下版本中,由于MongoDB原始的oplog.rs包含INSERT、UPDATE、REPLACE和DELETE这四种操作类型,没有保留更新前的信息,因此无法直接输出-U类型消息。在Flink中,只能实现基于update的语义。使用MongoDBTableSource时,Flink planner会自动进行ChangelogNormalize优化,补齐缺失的-U消息,输出完整的+I、 -U、+U、-D四种消息,ChangelogNormalize优化的代价是该节点会保存之前所有key的状态。如果是DataStream作业直接使用MongoDBSource,没有Flink planner的优化,将不会自动进行ChangelogNormalize优化,所以不能直接获取-U消息。若想要获取更新前镜像值,您需要自己管理状态。如果不希望自己管理状态,您可以将MongoDBTableSource中原始的oplog.rs转换为ChangelogStream或者RetractStream,并借助Flink planner的优化能力补齐更新前的镜像值,示例如下。

     tEnv.executeSql("CREATE TABLE orders ( ... ) WITH ( 'connector'='mongodb-cdc',... )");
    
     Table table = tEnv.from("orders")
     .select($("*"));
    
     tEnv.toChangelogStream(table)
     .print()
     .setParallelism(1);
    
     env.execute();

如何配置参数以过滤作业中的非法日期脏数据?

可以在Postgres CDC的WITH参数中配置如下参数来过滤脏数据。

  • 配置'debezium.event.deserialization.failure.handling.mode'='warn'参数,跳过脏数据,将脏数据打印到WARN日志里。

  • 配置'debezium.event.deserialization.failure.handling.mode'='ignore'参数,跳过脏数据,不打印脏数据到日志。

Postgres CDC提示未传输TOAST数据,是什么原因?

请确保副本身份是完整的。TOAST数据相对较大,为了节省WAL的大小,如果TOAST数据没有发生变化,您配置了'debezium.schema.refresh.mode'='columns_diff_exclude_unchanged_toast'参数,wal2json插件不会将TOAST数据带到更新后的数据中。

发现PG服务器磁盘使用率高,WAL日志不释放是什么原因?

Postgres CDC只会在checkpoint完成的时候更新Postgres slot中的LSN。如果发现磁盘使用率高的情况下,请先确认Postgres的checkpoint是否开启并查看数据库是否有其他未使用或同步延迟的slot。

Postgres CDC同步Postgres中DECIMAL类型数据精度超过最大精度时,会返回什么结果?

Postgres CDC中如果收到DECIMAL类型数据的精度大于在Postgres CDC中声明的类型的精度时,会将数据处理成NULL。此时,您可以配置'debezium.decimal.handling.mode' = 'string'参数,将读取的数据用字符串类型来进行处理。

在DataStream API中构建Postgres CDC Source时如何配置tableList选项?

tableList要求表名使用模式名,而不是DataStream API中的表名。对于Postgres CDC Source,tableList选项值为my_schema.my_table。

为什么无法下载flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar,Maven仓库为什么没有xxx-SNAPSHOT依赖?

和主流的Maven项目版本管理相同,xxx-SNAPSHOT版本对应开发分支的代码。如果需要使用这个版本,您需要下载源码并编译对应的JAR。您可以使用稳定版本,例如flink-sql-connector-mysql-cdc-2.1.0.jar,可以直接从Maven中心仓库中获取并使用。

使用flink-sql-connector-xxx.jar和使用flink-connector-xxx.jar有什么区别?

Flink CDC各个连接器的包命名规则和Flink其他连接器的包命名规则是保持一致的。

  • flink-sql-connector-xx是FAT JAR,除了连接器的代码外,还将连接器依赖的所有第三方包shade后打入FAT JAR,提供给SQL作业使用,您只需要在lib目录下添加该FAT JAR即可。

  • flink-connector-xx只包含该连接器的代码,不包含其所需的依赖,供DataStream作业使用,您需要自己管理所需的第三方包依赖,有冲突的依赖需要进行exclude和shade处理。

为什么Maven仓库里找不到2.x版本?

Flink CDC项目从2.0.0版本将group id从com.alibaba.ververica改成com.ververica,所以在Maven仓库找2.x版本的包时,路径是/com/ververica

DataStream API使用JsonDebeziumDeserializationSchema反序列化器时,数值类型显示是一堆字符串,怎么办?

因为Debezium在解析数值类型时有不同的转换方式,详情请参见Debezium connector for MySQL。在Flink CDC配置的转换代码如下。

Properties properties = new Properties();
....
properties.setProperty("bigint.unsigned.handling.mode","long");
properties.setProperty("decimal.handling.mode","double");

MySqlSource.<String>builder()
 .hostname(config.getHostname())
 ....
 .debeziumProperties(properties);

报错:Replication slot "xxxx" is active

  • 报错详情

    当您的Postgres CDC作业结束后,可能会遇到slot未被正确释放的情况。

  • 解决方案

    您可以采用以下两种方法释放slot:

    • 在PostgreSQL中执行以下命令手动释放slot。

      select pg_drop_replication_slot('rep_slot');

      如果遇到错误“ERROR: replication slot "rep_slot" is active for PID 162564”,则表示slot正在被进程(PID)占用。您需要先终止该进程,再释放slot。释放命令如下所示。

      select pg_terminate_backend(162564);
      select pg_drop_replication_slot('rep_slot');
    • 自动清理slot。在作业的Postgres Source配置中添加'debezium.slot.drop.on.stop' = 'true'参数,确保当CDC作业停止时,slot会被自动清理。

      警告

      如果开启自动清理slot,  会导致Wal Log被回收,当作业再次启动时,会导致数据丢失,无法保证At-Least Once语义。

报错:Lock wait timeout exceeded; try restarting transaction

  • 报错详情

    org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; try restarting transaction Error code: 1205; SQLSTATE: 40001.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218)
        at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:857)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
    Caused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1200)
        at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:554)
        at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:497)
        at io.debezium.connector.mysql.SnapshotReader.readTableSchema(SnapshotReader.java:888)
        at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:550)
        ... 3 more
  • 报错原因

    当MySQL CDC源表不开启增量快照读取时,申请锁时可能发生的超时错误。

  • 解决方案

    升级到VVR 4.0.8及以上版本即可,新版本默认使用无锁算法,不需要申请锁。

报错:Cause by:java.lang.ArrayIndexOutOfBoundsException

  • 报错详情报错情况

  • 报错原因

    VVR 4.0.12以下版本依赖的Binlog读取工具存在问题,导致报错。

  • 解决方案

    VVR 4.0.12及以上版本已修复此问题,建议升级版本解决。

报错:Caused by: io.debezium.DebeziumException: Received DML 'xxx' for processing, binlog probably contains events generated with statement or mixed based replication format

  • 报错详情

    Caused by: io.debezium.DebeziumException: Received DML 'insert into gd_chat_fetch_log (
    
    id,
    c_cursor,
    d_timestamp,
    msg_cnt,
    state,
    ext1,
    ext2,
    cost_time
    
    ) values (
    null,
    null,
    '2022-03-23 16:51:00.616',
    0,
    1,
    null,
    null,
    0
    )' for processing, binlog probably contains events generated with statement or mixed based replication format
  • 报错原因

    Binlog格式是Mixed导致报错。MySQL CDC源表要求Binlog的格式为ROW。

  • 解决方案

    1. 使用show global variables like "binlog_format"命令,查看Binlog的格式。

      说明

      show variables like "binlog_format"命令只能查看当前的Binlog格式,具有局限性。

    2. 如果Binlog格式不是ROW,您需要在MySQL Server端将Binlog格式修改为ROW。详情请参见Setting The Binary Log Format

    3. 重启作业。

报错:Encountered change event for table xxx.xxx whose schema isn't known to this connector

  • 报错详情报错详情

    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Encountered change event 'Event{header=EventHeaderV4{timestamp=xxx, eventType=TABLE_MAP, serverId=xxx, headerLength=xxx, dataLength=xxx, nextPosition=xxx, flags=xxx}, data=TableMapEventData{tableId=xxx, database='xxx', table='xxx', columnTypes=xxx, xxx..., columnMetadata=xxx,xxx..., columnNullability={xxx,xxx...}, eventMetadata=null}}' at offset {ts_sec=xxx, file=mysql-bin.xxx, pos=xxx, gtids=xxx, server_id=xxx, event=xxx} for table xxx.xxx whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
    Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=30946 --stop-position=31028 --verbose mysql-bin.004419
    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin.xxx/xxx
    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Failed due to error: Error processing binlog event
    org.apache.kafka.connect.errors.ConnectException: Encountered change event for table statistic.apk_info whose schema isn't known to this connector
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:607) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1104) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:955) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
    Caused by: org.apache.kafka.connect.errors.ConnectException: Encountered change event for table xxx.xxx whose schema isn't known to this connector
        at io.debezium.connector.mysql.BinlogReader.informAboutUnknownTableIfRequired(BinlogReader.java:875) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleUpdateTableMetadata(BinlogReader.java:849) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:590) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        ... 5 more
  • 报错原因

    • 当作业中使用的某些数据库,您没有其对应的某些权限时,可能出现该错误。

    • 当使用了'debezium.snapshot.mode'='never'时,可能出现该错误。因为这会导致从Binlog开头读取数据,但是Binlog开头的变更事件对应的Table Schema和当前表的Schema可能不匹配,所以会报该错误。

    • 遇到Debezium解析不了的变更,例如Debezium无法解析`DEFAULT (now())`,可能出现该错误。

  • 解决方案

    • 先确认您使用的数据库用户是否有对应作业中全部数据库的相应权限,详细授权操作请参见配置MySQL

    • 不建议使用'debezium.snapshot.mode'='never',可以通过'debezium.inconsistent.schema.handling.mode' = 'warn'参数避免报错。

    • 继续观察日志,查询io.debezium.connector.mysql.MySqlSchema WARN的log,会打印出具体无法解析的变更详情,例如Debezium解析不了`DEFAULT (now())`。

报错:org.apache.kafka.connect.errors.DataException: xxx is not a valid field name

  • 报错详情

    org.apache.kafka.connect.errors.DataException: xxx is not a valid field name
        at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
        at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambdaşcreateRowConverter$508c5858$1(RowDataDebeziumDeserializeSchema.java:369)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambdaşwrapIntoNullableConverter$7b91dc26$1(RowDataDebeziumDeserializeSchema.java:394)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:127) 
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:102)
        at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:124)
        at io.debezium.embedded.ConvertingEngineBuilder.lambdaşnotifying$2(ConvertingEngineBuilder.java:82) 
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812) 
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutorsWorker.run ThreadPoolExecutor.java:622) 
        at java.lang.Thread.run(Thread.java: 834)
  • 报错原因

    分库分表中的某张物理表缺少了您在MySQL CDC表中定义的字段,导致Schema不一致。

    例如您使用正则表达式mydb.users_\d{3}去监控mydb数据库下users_001,users_002,……,users_999这些表,您在MySQL CDC表的DDL中声明了user_name字段,但如果users_002表中无user_name字段,在解析到users_002表的Binlog时就会出现该异常。

  • 解决方案

    针对分库分表场景,分库分表里的每个表中必须包含MySQL CDC DDL中声明的字段。

    此外,您也可以将作业升级到VVR 6.0.2及以上版本,VVR 6.0.2及以上版本的MySQL CDC会自动使用分库分表中最宽的Schema解决该问题。

报错:Caused by: java.sql.SQLSyntaxErrorException: Unknown storage engine 'BLACKHOLE'

  • 报错详情错误详情

  • 报错原因

    在解析MySQL 5.6的DDL时,存在不支持的语法导致报错。

  • 解决方案

    • 您可以在MySQL CDC表的WITH参数中加上'debezium.database.history.store.only.monitored.tables.ddl'='true''debezium.database.exclude.list'='mysql'两个参数来避免报错。

    • 您也可以将作业升级到VVR 6.0.2及以上版本,VVR 6.0.2及以上版本的MySQL CDC对DDL解析提供了更好的支持。

报错:Caused by: org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs ..., but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.

  • 报错详情

    org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.000064', pos=89887992, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed
        at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133)
        at io.debezium.connector.common. BaseSourceTask.start (BaseSourceTask.java:106) 
        at io.debezium.embedded.EmbeddedEngine.run (EmbeddedEngine.java:758) 
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
        at java.util.concurrent.ThreadPoolExecutor. runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • 报错原因和解决方案

    报错原因

    解决方案

    作业正在读取的Binlog文件在MySQL服务器已经被清理掉时,会出现该报错。这种情况一般由于MySQL服务器上Binlog的保留时间太短。

    可以将Binlog的保留时间调大,比如7天。具体命令如下。

    show variables like 'expire_logs_days';
    set global expire_logs_days=7;

    MySQL CDC作业消费Binlog太慢,例如下游的聚合算子或者Sink算子长时间出现反压,反压传递到source,导致source无法消费数据。

    需要对作业资源调优,让source恢复正常消费即可。

    阿里云RDS MySQL的日志保留策略一般有两个条件:最长18个小时,最大占用30%存储空间。两个条件满足任何一个都会触发清理删除,如果写入较多导致超过30%的存储空间,可能导致Binlog清理而不可用。

    需要调整RDS MySQL的Binlog过期策略,使得Binlog能正常被读取。

    通过只读实例消费CDC数据时,RDS的只读实例不保证Binlog可用(本地最短可能只保留10秒,然后上传OSS)。如果配置读取只读实例,一旦作业Failover 10s内无法恢复,就会发生报错。

    不建议MySQL CDC源表读取RDS的只读实例数据。

    说明

    是否是只读实例可以通过hostname区分,hostname为rr开头的是只读实例,rm开头的是正常的实例。

    RDS MySQL发生了内部迁移操作,导致报错。

    需要全新启动作业,以重新读取数据。

报错:EventDataDeserializationException: Failed to deserialize data of EventHeaderV4.... Caused by: java.net.SocketException: Connection reset

  • 报错详情

    EventDataDeserializationException: Failed to deserialize data of EventHeaderV4 .... Caused by: java.net.SocketException: Connection reset
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:304)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:227)
        at io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:252)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:934)
    ... 3 more
    Caused by: java.io.EOFException
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read (ByteArrayInputStream.java:192)
        at java.io.InputSt ream.read (InputStream.java:170)
        at java.io.InputSt ream.skip (InputStream.java:224)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:301)
    ...    6 more
  • 报错原因

    • 网络问题导致。

    • 作业反压导致。

      当Flink作业存在反压时,CDC Source中使用的Binlog Client会因为反压的存在而无法继续读取数据。为了尽量减少数据库上残留的连接数,MySQL当某个Binlog Client连接不活跃时间超过数据库上配置的超时时间之后,会自动切断连接,导致作业异常。

    • 数据库net_write_timeout参数配置过小导致。net_write_timeout参数的默认值是60s,如果配置过小会导致服务端主动断开链接。

  • 解决方案

    • 如果是网络问题导致的,可以尝试在CDC Source上增加配置'debezium.connect.keep.alive.interval.ms' = '40000'来解决。如果允许修改数据库的配置,也可以增加net_write_timeout参数配置值。具体说明请参见参数调优建议

    • 如果是作业反压问题导致的,需要调节作业解决反压问题。

    • 实时计算引擎8.0.7及以上版本增加了该异常的重试,可以尝试使用实时计算引擎8.0.7及以上版本运行作业。

报错:The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.

  • 报错详情

    org.apache.kafka.connect.errors.ConnectException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDS that the slave requires. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241) 
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
        at io.debezium.connector.mysql.BinlogReadersReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1142) 
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:962)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839)
        at java.lang.Thread.run(Thread. java:834)
    Caused by: com.github.shyiko.mysql. binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
        at com.github.shyiko.mysql.binlog. BinaryLogClient.listenForEventPackets(BinaryLogClient.java:926) 
    ... 3 more
  • 报错原因

    全量阶段读取时间过长,导致全量阶段结束开始读Binlog时,之前记录的Gtid位点已经被MySQL清理掉了。

  • 解决方案

    建议延长Binlog的清理时间或调大Binlog文件大小。调节清理时间的命令如下。

    mysql> show variables like 'expire_logs_days';
    mysql> set global expire_logs_days=7;

报错:java.lang.IllegalStateException: The "before" field of UPDATE/DELETE message is null,please check the Postgres table has been set REPLICA IDENTITY to FULL level.

  • 报错详情

    java.lang.IllegalStateException: The "before" feild of UPDATE/DELETE message is null,please check the Postgres table has been set REPLICA IDENTITY to FULL level. You can update the setting by running the command in Postgres 'ALTER TABLE xxx.xxx REPLICA IDENTITY FULL'. Please see more in Debezium documentation:https:debezium.io/documentation/reference/1.2/connectors/postresql.html#postgresql-replica-identity
        at com.alibaba.ververica.cdc.connectors.postgres.table.PostgresValueValidator.validate(PostgresValueValidator.java:46)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:113)
        at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:158)
        at io.debezium.embedded.ConvertingEngineBuilder.lambdaşnotifying$2(ConvertingEngineBuilder.java:82)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutorSWorker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • 报错原因

    Postgres表的REPLICA IDENTITY不为FULL时,会出现该报错。

  • 解决方案

    根据提示执行ALTER TABLE yourTableName REPLICA IDENTITY FULL;,如果执行后作业重启依旧报错,可以尝试加上'debezium.slot.drop.on.stop' = 'true'参数解决。

报错:Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: xxx and table-name: xxxx

  • 报错原因

    • 配置的表名无法在数据库中找到时,出现该报错。

    • Flink作业里包含不同数据库的表,当使用的账号没有其中某些数据库权限时,出现该报错。

  • 解决方案

    1. 检查对应表名是否在数据库中存在。

    2. 为账号添加作业中所有数据库的对应权限。

报错:com.github.shyiko.mysql.binlog.network.ServerException

  • 报错详情

    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
  • 报错原因

    MySQL CDC在启动全量读取之前记录Binlog位点,等全量读取结束后再从Binlog位点读取增量数据。该报错一般是因为全量读取耗时太长,超过了MySQL Binlog的淘汰周期,导致MySQL Binlog位点的数据已经被MySQL清理掉了。

  • 解决方案

    查看MySQL Binlog的清理规则,例如时间、存储空间和文件个数等,建议保留Binlog一天以上,RDS Binlog详情请参见删除本地日志(Binlog)

    说明

    :VVR 4.0.8及以上版本,MySQL CDC支持并发读取全量数据,可以提高全量数据的读取速度,针对该问题会起到缓解作用。

报错:The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'

  • 报错详情

    MySQL CDC源表在VVR 4.0.x版本语法检查时报错详情如下。

    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
  • 报错原因

    在MySQL CDC DDL WITH参数中,您未设置主键(Primary Key)信息。因为VVR 6.0.x版本和VVR 4.0.8及以上版本,新增支持按PK分片,进行多并发读取数据的功能。

    重要

    VVR 4.0.8以下版本,MySQL CDC源表仅支持单并发读取数据。

  • 解决方案

    • VVR 6.0.x版本或VVR 4.0.8及以上版本,如果您需要多并发读取MySQL数据,可以在DDL中添加PK信息。

    • VVR 4.0.8以下版本,MySQL CDC源表不支持多并发读取MySQL数据,需要在DDL中添加scan.incremental.snapshot.enabled参数,且把该参数值设置为false,无需设置PK信息。

报错:java.io.EOFException: SSL peer shut down incorrectly

  • 报错详情

    Caused by: java.io.EOFException: SSL peer shut down incorrectly
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302]
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302]
        at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?]
        at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?]
        at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?]
        at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?]
        at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?]
        ... 14 more
  • 报错原因

    在MySQL 8.0.27版本,MySQL数据库默认连接需要使用SSL协议,但JDBC默认的访问方式不通过SSL协议连接数据库,导致报错。

  • 解决方案

    • 如果可以升级到VVR 6.0.2及以上版本,在MySQL CDC表的with参数中添加参数'jdbc.properties.useSSL'='false'可以解决该问题。

    • 如果声明的表只是做维表,可以在MySQL表的WITH参数中将connector设置为rds,同时在URL参数中追加characterEncoding=utf-8&useSSL=false,例如:

      'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'

报错:com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master

  • 报错详情

    Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
        at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[?:?]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
    Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx, the last byte read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146) ~[?:?]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx, the last byte read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
  • 报错原因

    MySQL CDC源表读取数据时,需要保证每个并行度有单独的server-id,且每个server-id全局唯一。当使用的server-id与同一作业CDC源表、其他作业CDC源表或其他同步工具使用server-id冲突时,导致报错。

  • 解决方案

    需要为MySQL CDC源表的每个并行度配置全局唯一的server-id,详细操作请参见MySQL CDC源表注意事项

报错:TableMapEventDataDeserializer.readMetadata的NullPointerException

  • 报错详情

    Causedby:java.lang.NullPointerException
        atcom.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.readMetadata(TableMapEventDataDeserializer.java:81)
    atcom.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:42)
    atcom.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:27)
    atcom.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303)
    atcom.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeTableMapEventData(EventDeserializer.java:281)
    atcom.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:228)
    atio.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
    atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:952)
    ...3more
  • 报错原因

    8.0.18及以上版本MySQL新添加了一些数据类型,但是Flink解析Binlog部分没有兼容新的数据类型。

  • 解决方案

    VVR 6.0.6及以上版本针对MySQL新增的数据类型进行了兼容,您可以升级VVR版本解决。

MySQL全量过程中增加列后报错NullPointerException

  • 报错详情

    Caused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=iplus.retail_detail, splitId='iplus.retail_detail:68', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[212974500236****], splitEnd=[213118153601****], highWatermark=null} error due to java.lang.NullPointerException.
      at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:361)
      at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:293)
      at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:124)
      at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:86)
      at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
      at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
      ... 6 more
  • 报错原因

    全量阶段的表结构是在作业启动的时候确定,且Schema会被记录在系统检查点中。如果在读全量的过程中增加了列,Schema会无法匹配,就会抛出此异常。

  • 解决方案

    停止作业并删除同步的下游表后,无状态启动该作业。

报错 :The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.000064', pos=89887992, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed

  • 报错详情

    出现该报错表明CDC作业试图读取的Binlog文件时,在MySQL服务器上文件已经被清理掉。

  • 报错原因

    • MySQL服务器上设置的Binlog文件过期时间太短导致文件被自动清理。

    • CDC作业处理Binlog的速度过慢。

  • 解决方案

    • 增加Binlog的保留时间可以避免文件过期时间太短这个问题,例如,将其设置为保留7天。

      mysql> show variables like 'expire_logs_days';
      mysql> set global expire_logs_days=7;
    • 针对作业处理Binlog的速度过慢这个问题,可以分配更多资源给Flink作业帮助加快处理速度。

报错:Mysql8.0 Public Key Retrieval is not allowed

  • 报错原因

    因为用户配置的MySQL用户使用的是SHA256密码认证方式,并且需要TLS等协议传输密码。

  • 解决方案

    允许MySQL用户支持原始密码方式访问。更改验证方式的命令如下。

    mysql> ALTER USER 'username'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
    mysql> FLUSH PRIVILEGES;