全部产品
Search
文档中心

数据传输服务 DTS:使用Kafka客户端消费订阅数据

更新时间:Dec 05, 2024

新版数据订阅支持使用0.11版本至2.7版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。

注意事项

  • 使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。

    说明

    如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。

  • 数据以Avro序列化存储,详细格式请参见Record.avsc文档。

    警告

    如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析时,可能出现解析的数据有误,您需要自行验证数据的正确性。

  • 关于offsetForTimes接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。

  • 由于数据订阅服务端会因容灾等原因导致网络闪断,若您未使用本文提供的Kafka客户端,您使用的Kafka客户端需具备网络重试能力。

  • 若您使用原生的Kafka客户端消费订阅数据,则可能会在DTS发生增量数据采集模块切换行为,从而使subscribe模式下订阅客户端保存在服务端的消费位点被清除,您需要手动调整订阅的消费位点以实现按需消费数据。若您需要使用subscribe模式建议使用DTS提供的订阅SDK消费订阅数据,或者自行管理位点,详情请参见使用SDK示例代码消费订阅数据管理位点

Kafka客户端运行流程说明

请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。

说明
  • 单击code,然后选择Download ZIP下载文件。

  • 如需使用Kafka客户端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。

kafka2.0

表 1. 运行流程说明

步骤

相关目录或文件

1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。

subscribe_example-master/javaimpl/src/main/java/recordgenerator/

2、将获取的增量数据镜像执行反序列化,并从中获取 前镜像后镜像 和其他属性。

警告
  • 如源实例为自建Oracle数据库,则为确保客户端成功消费订阅数据,并保证前后镜像完整性,您需要开启全列补偿日志。

  • 如源实例不为自建Oracle数据库,则DTS暂时不能保证前镜像的完整性,建议您对所获得的前镜像进行校验。

subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

3、将反序列化后的数据中的dataTypeNumber字段转换为对应数据库的字段类型。

subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

操作步骤

本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。

  1. 创建新版数据订阅通道,详情请参见订阅方案概览中的相关配置文档。

  2. 创建一个或多个消费组,详情请参见新增消费组

  3. 下载Kafka客户端Demo代码,然后解压该文件。

    说明

    单击code,然后选择Download ZIP下载文件。

  4. 打开IntelliJ IDEA软件,然后单击Open

    打开项目

  5. 在弹出的对话框中,定位至Kafka客户端Demo代码下载的目录,参照下图依次展开文件夹,找到项目对象模型文件:pom.xml

    打开项目文件

  6. 在弹出对话框中,选择Open as Project

  7. 在IntelliJ IDEA软件界面,依次展开文件夹,找到并双击打开Kafka客户端Demo文件:NotifyDemoDB.java

  8. 设置NotifyDemoDB.java文件中的各参数对应的值。

    设置参数值

    参数

    说明

    获取方式

    USER_NAME

    消费组的账号。

    警告

    如您未使用本文提供的客户端,请按照<消费组的账号>-<消费组ID>的格式设置用户名(例如:dtstest-dtsae******bpv),否则无法正常连接。

    在DTS控制台单击目标订阅实例ID,然后单击左侧导航栏的数据消费,您可以获取到消费组ID/名称和消费组的账号信息。

    说明

    消费组账号的密码已在您新建消费组时指定。

    PASSWORD_NAME

    该账号的密码。

    SID_NAME

    消费组ID。

    GROUP_NAME

    消费组名称,需保持和消费组ID相同(即本参数也填入消费组ID)。

    KAFKA_TOPIC

    数据订阅通道的订阅Topic。

    在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取到订阅Topic网络信息。

    KAFKA_BROKER_URL_NAME

    数据订阅通道的网络地址信息。

    说明
    • 如果您部署Kafka客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。

    • 不建议使用公网地址。

    INITIAL_CHECKPOINT_NAME

    消费的数据时间点,格式为Unix时间戳,例如1592269238。

    说明
    • 您需要自行保存时间点信息,以便:

      • 当业务程序中断后,传入已消费的数据时间点继续消费数据,防止数据丢失。

      • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。

    • SUBSCRIBE_MODE_NAMEsubscribe时,传入的INITIAL_CHECKPOINT_NAME仅在订阅客户端首次启动时生效。

    消费的数据时间点必须在订阅实例的数据范围之内,并需转化为Unix时间戳。

    说明
    • 您可以在订阅任务列表的数据范围列,查看订阅实例的数据范围。

    • Unix时间戳转换工具可用搜索引擎获取。

    USE_CONFIG_CHECKPOINT_NAME

    默认取值为true,即强制使用指定的数据时间点来消费数据,避免丢失已接收到的但未处理的数据。

    SUBSCRIBE_MODE_NAME

    一个消费组下支持同时启动两个及以上Kafka客户端,如需实现该功能,请将所有客户端该参数的值设置为subscribe

    默认值为assign,即不使用该功能,建议只部署一个客户端。

  9. 在IntelliJ IDEA软件界面的顶部,选择Run > Run运行该客户端。

    说明

    首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。

执行结果

运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。

Kafka客户端订阅结果

您也可以去除NotifyDemoDB.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);中的//),然后再次运行该客户端即可查看详细的数据变更信息。

kafka

常见问题

  • Q:为什么需要自行记录客户端的消费位点?

    A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。

管理位点

  1. 配置订阅客户端监听DTS的数据采集模块的切换行为。

    通过配置订阅客户端的Consumer的properties,监听DTS的数据采集模块的切换行为,实现方法的主要内容如下所示:

    properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());

    ClusterSwitchListener的实现方法如下所示:

    public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor {
        private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class);
        private ClusterResource originClusterResource = null;
        private ClusterResource currentClusterResource = null;
    
        public ConsumerRecords onConsume(ConsumerRecords records) {
            return records;
        }
    
    
        public void close() {
        }
    
        public void onCommit(Map offsets) {
        }
    
    
        public void onUpdate(ClusterResource clusterResource) {
            synchronized (this) {
                originClusterResource = currentClusterResource;
                currentClusterResource = clusterResource;
                if (null == originClusterResource) {
                    LOG.info("Cluster updated to " + currentClusterResource.clusterId());
                } else {
                    if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                        LOG.info("Cluster not changed on update:" + clusterResource.clusterId());
                    } else {
                        LOG.error("Cluster changed");
                        throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId()
                                + ", consumer require restart");
                    }
                }
            }
        }
    
        public boolean isClusterResourceChanged() {
            if (null == originClusterResource) {
                return false;
            }
            if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                return false;
            }
            return true;
        }
    
        public void configure(Map<String, ?> configs) {
        }
    
        public static class ClusterSwitchException extends KafkaException {
            public ClusterSwitchException(String message, Throwable cause) {
                super(message, cause);
            }
    
            public ClusterSwitchException(String message) {
                super(message);
            }
    
            public ClusterSwitchException(Throwable cause) {
                super(cause);
            }
    
            public ClusterSwitchException() {
                super();
            }
    
        }
  2. 处理捕获到DTS数据采集模块的切换行为。

    将实际消费的最后一条订阅数据的时间位点(timestamp),设置为下一次订阅的初始位点,实现方法的主要内容如下所示:

    try{
       //do some action
    } catch (ClusterSwitchListener.ClusterSwitchException e) {
       reset();
    }
    
    //重置位点
    public reset() {
      long offset = kafkaConsumer.offsetsForTimes(timestamp);
      kafkaConsumer.seek(tp,offset);
    }
    说明

    实现方法示例,请参见KafkaRecordFetcher

MySQL字段类型与dataTypeNumber数值的对应关系

MySQL字段类型

对应dataTypeNumber数值

MYSQL_TYPE_DECIMAL

0

MYSQL_TYPE_INT8

1

MYSQL_TYPE_INT16

2

MYSQL_TYPE_INT32

3

MYSQL_TYPE_FLOAT

4

MYSQL_TYPE_DOUBLE

5

MYSQL_TYPE_NULL

6

MYSQL_TYPE_TIMESTAMP

7

MYSQL_TYPE_INT64

8

MYSQL_TYPE_INT24

9

MYSQL_TYPE_DATE

10

MYSQL_TYPE_TIME

11

MYSQL_TYPE_DATETIME

12

MYSQL_TYPE_YEAR

13

MYSQL_TYPE_DATE_NEW

14

MYSQL_TYPE_VARCHAR

15

MYSQL_TYPE_BIT

16

MYSQL_TYPE_TIMESTAMP_NEW

17

MYSQL_TYPE_DATETIME_NEW

18

MYSQL_TYPE_TIME_NEW

19

MYSQL_TYPE_JSON

245

MYSQL_TYPE_DECIMAL_NEW

246

MYSQL_TYPE_ENUM

247

MYSQL_TYPE_SET

248

MYSQL_TYPE_TINY_BLOB

249

MYSQL_TYPE_MEDIUM_BLOB

250

MYSQL_TYPE_LONG_BLOB

251

MYSQL_TYPE_BLOB

252

MYSQL_TYPE_VAR_STRING

253

MYSQL_TYPE_STRING

254

MYSQL_TYPE_GEOMETRY

255

Oracle字段类型与dataTypeNumber数值的对应关系

Oracle字段类型

对应dataTypeNumber数值

VARCHAR2/NVARCHAR2

1

NUMBER/FLOAT

2

LONG

8

DATE

12

RAW

23

LONG_RAW

24

UNDEFINED

29

XMLTYPE

58

ROWID

69

CHAR、NCHAR

96

BINARY_FLOAT

100

BINARY_DOUBLE

101

CLOB/NCLOB

112

BLOB

113

BFILE

114

TIMESTAMP

180

TIMESTAMP_WITH_TIME_ZONE

181

INTERVAL_YEAR_TO_MONTH

182

INTERVAL_DAY_TO_SECOND

183

UROWID

208

TIMESTAMP_WITH_LOCAL_TIME_ZONE

231

PostgreSQL字段类型与dataTypeNumber数值的对应关系

PostgreSQL字段类型

对应dataTypeNumber数值

INT2/SMALLINT

21

INT4/INTEGER/SERIAL

23

INT8/BIGINT

20

CHARACTER

18

CHARACTER VARYING

1043

REAL

700

DOUBLE PRECISION

701

NUMERIC

1700

MONEY

790

DATE

1082

TIME/TIME WITHOUT TIME ZONE

1083

TIME WITH TIME ZONE

1266

TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE

1114

TIMESTAMP WITH TIME ZONE

1184

BYTEA

17

TEXT

25

JSON

114

JSONB

3082

XML

142

UUID

2950

POINT

600

LSEG

601

PATH

602

BOX

603

POLYGON

604

LINE

628

CIDR

650

CIRCLE

718

MACADDR

829

INET

869

INTERVAL

1186

TXID_SNAPSHOT

2970

PG_LSN

3220

TSVECTOR

3614

TSQUERY

3615