新版数据订阅支持使用0.11版本至2.7版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。
注意事项
使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。
说明如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。
数据以Avro序列化存储,详细格式请参见Record.avsc文档。
警告如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析时,可能出现解析的数据有误,您需要自行验证数据的正确性。
关于
offsetForTimes
接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。由于数据订阅服务端会因容灾等原因导致网络闪断,若您未使用本文提供的Kafka客户端,您使用的Kafka客户端需具备网络重试能力。
若您使用原生的Kafka客户端消费订阅数据,则可能会在DTS发生增量数据采集模块切换行为,从而使subscribe模式下订阅客户端保存在服务端的消费位点被清除,您需要手动调整订阅的消费位点以实现按需消费数据。若您需要使用subscribe模式建议使用DTS提供的订阅SDK消费订阅数据,或者自行管理位点,详情请参见使用SDK示例代码消费订阅数据和管理位点。
Kafka客户端运行流程说明
请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。
单击,然后选择Download ZIP下载文件。
如需使用Kafka客户端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。
表 1. 运行流程说明
步骤 | 相关目录或文件 |
1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。 | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
2、将获取的增量数据镜像执行反序列化,并从中获取 前镜像 、 后镜像 和其他属性。 警告
| subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
3、将反序列化后的数据中的dataTypeNumber字段转换为对应数据库的字段类型。 | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。
创建新版数据订阅通道,详情请参见订阅方案概览中的相关配置文档。
创建一个或多个消费组,详情请参见新增消费组。
下载Kafka客户端Demo代码,然后解压该文件。
说明单击,然后选择Download ZIP下载文件。
打开IntelliJ IDEA软件,然后单击Open。
在弹出的对话框中,定位至Kafka客户端Demo代码下载的目录,参照下图依次展开文件夹,找到项目对象模型文件:pom.xml。
在弹出对话框中,选择Open as Project。
在IntelliJ IDEA软件界面,依次展开文件夹,找到并双击打开Kafka客户端Demo文件:NotifyDemoDB.java。
设置NotifyDemoDB.java文件中的各参数对应的值。
参数
说明
获取方式
USER_NAME
消费组的账号。
警告如您未使用本文提供的客户端,请按照
<消费组的账号>-<消费组ID>
的格式设置用户名(例如:dtstest-dtsae******bpv
),否则无法正常连接。在DTS控制台单击目标订阅实例ID,然后单击左侧导航栏的数据消费,您可以获取到消费组ID/名称和消费组的账号信息。
说明消费组账号的密码已在您新建消费组时指定。
PASSWORD_NAME
该账号的密码。
SID_NAME
消费组ID。
GROUP_NAME
消费组名称,需保持和消费组ID相同(即本参数也填入消费组ID)。
KAFKA_TOPIC
数据订阅通道的订阅Topic。
在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取到订阅Topic和网络信息。
KAFKA_BROKER_URL_NAME
数据订阅通道的网络地址信息。
说明如果您部署Kafka客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
不建议使用公网地址。
INITIAL_CHECKPOINT_NAME
消费的数据时间点,格式为Unix时间戳,例如1592269238。
说明您需要自行保存时间点信息,以便:
当业务程序中断后,传入已消费的数据时间点继续消费数据,防止数据丢失。
在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
若SUBSCRIBE_MODE_NAME为subscribe时,传入的INITIAL_CHECKPOINT_NAME仅在订阅客户端首次启动时生效。
消费的数据时间点必须在订阅实例的数据范围之内,并需转化为Unix时间戳。
说明您可以在订阅任务列表的数据范围列,查看订阅实例的数据范围。
Unix时间戳转换工具可用搜索引擎获取。
USE_CONFIG_CHECKPOINT_NAME
默认取值为true,即强制使用指定的数据时间点来消费数据,避免丢失已接收到的但未处理的数据。
无
SUBSCRIBE_MODE_NAME
一个消费组下支持同时启动两个及以上Kafka客户端,如需实现该功能,请将所有客户端该参数的值设置为subscribe。
默认值为assign,即不使用该功能,建议只部署一个客户端。
无
在IntelliJ IDEA软件界面的顶部,选择 运行该客户端。
说明首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。
执行结果
运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。
您也可以去除NotifyDemoDB.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);
中的//
),然后再次运行该客户端即可查看详细的数据变更信息。
常见问题
Q:为什么需要自行记录客户端的消费位点?
A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。
管理位点
配置订阅客户端监听DTS的数据采集模块的切换行为。
通过配置订阅客户端的Consumer的properties,监听DTS的数据采集模块的切换行为,实现方法的主要内容如下所示:
properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());
ClusterSwitchListener的实现方法如下所示:
public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor { private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class); private ClusterResource originClusterResource = null; private ClusterResource currentClusterResource = null; public ConsumerRecords onConsume(ConsumerRecords records) { return records; } public void close() { } public void onCommit(Map offsets) { } public void onUpdate(ClusterResource clusterResource) { synchronized (this) { originClusterResource = currentClusterResource; currentClusterResource = clusterResource; if (null == originClusterResource) { LOG.info("Cluster updated to " + currentClusterResource.clusterId()); } else { if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { LOG.info("Cluster not changed on update:" + clusterResource.clusterId()); } else { LOG.error("Cluster changed"); throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId() + ", consumer require restart"); } } } } public boolean isClusterResourceChanged() { if (null == originClusterResource) { return false; } if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { return false; } return true; } public void configure(Map<String, ?> configs) { } public static class ClusterSwitchException extends KafkaException { public ClusterSwitchException(String message, Throwable cause) { super(message, cause); } public ClusterSwitchException(String message) { super(message); } public ClusterSwitchException(Throwable cause) { super(cause); } public ClusterSwitchException() { super(); } }
处理捕获到DTS数据采集模块的切换行为。
将实际消费的最后一条订阅数据的时间位点(timestamp),设置为下一次订阅的初始位点,实现方法的主要内容如下所示:
try{ //do some action } catch (ClusterSwitchListener.ClusterSwitchException e) { reset(); } //重置位点 public reset() { long offset = kafkaConsumer.offsetsForTimes(timestamp); kafkaConsumer.seek(tp,offset); }
说明实现方法示例,请参见KafkaRecordFetcher。
MySQL字段类型与dataTypeNumber数值的对应关系
MySQL字段类型 | 对应dataTypeNumber数值 |
MYSQL_TYPE_DECIMAL | 0 |
MYSQL_TYPE_INT8 | 1 |
MYSQL_TYPE_INT16 | 2 |
MYSQL_TYPE_INT32 | 3 |
MYSQL_TYPE_FLOAT | 4 |
MYSQL_TYPE_DOUBLE | 5 |
MYSQL_TYPE_NULL | 6 |
MYSQL_TYPE_TIMESTAMP | 7 |
MYSQL_TYPE_INT64 | 8 |
MYSQL_TYPE_INT24 | 9 |
MYSQL_TYPE_DATE | 10 |
MYSQL_TYPE_TIME | 11 |
MYSQL_TYPE_DATETIME | 12 |
MYSQL_TYPE_YEAR | 13 |
MYSQL_TYPE_DATE_NEW | 14 |
MYSQL_TYPE_VARCHAR | 15 |
MYSQL_TYPE_BIT | 16 |
MYSQL_TYPE_TIMESTAMP_NEW | 17 |
MYSQL_TYPE_DATETIME_NEW | 18 |
MYSQL_TYPE_TIME_NEW | 19 |
MYSQL_TYPE_JSON | 245 |
MYSQL_TYPE_DECIMAL_NEW | 246 |
MYSQL_TYPE_ENUM | 247 |
MYSQL_TYPE_SET | 248 |
MYSQL_TYPE_TINY_BLOB | 249 |
MYSQL_TYPE_MEDIUM_BLOB | 250 |
MYSQL_TYPE_LONG_BLOB | 251 |
MYSQL_TYPE_BLOB | 252 |
MYSQL_TYPE_VAR_STRING | 253 |
MYSQL_TYPE_STRING | 254 |
MYSQL_TYPE_GEOMETRY | 255 |
Oracle字段类型与dataTypeNumber数值的对应关系
Oracle字段类型 | 对应dataTypeNumber数值 |
VARCHAR2/NVARCHAR2 | 1 |
NUMBER/FLOAT | 2 |
LONG | 8 |
DATE | 12 |
RAW | 23 |
LONG_RAW | 24 |
UNDEFINED | 29 |
XMLTYPE | 58 |
ROWID | 69 |
CHAR、NCHAR | 96 |
BINARY_FLOAT | 100 |
BINARY_DOUBLE | 101 |
CLOB/NCLOB | 112 |
BLOB | 113 |
BFILE | 114 |
TIMESTAMP | 180 |
TIMESTAMP_WITH_TIME_ZONE | 181 |
INTERVAL_YEAR_TO_MONTH | 182 |
INTERVAL_DAY_TO_SECOND | 183 |
UROWID | 208 |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
PostgreSQL字段类型与dataTypeNumber数值的对应关系
PostgreSQL字段类型 | 对应dataTypeNumber数值 |
INT2/SMALLINT | 21 |
INT4/INTEGER/SERIAL | 23 |
INT8/BIGINT | 20 |
CHARACTER | 18 |
CHARACTER VARYING | 1043 |
REAL | 700 |
DOUBLE PRECISION | 701 |
NUMERIC | 1700 |
MONEY | 790 |
DATE | 1082 |
TIME/TIME WITHOUT TIME ZONE | 1083 |
TIME WITH TIME ZONE | 1266 |
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114 |
TIMESTAMP WITH TIME ZONE | 1184 |
BYTEA | 17 |
TEXT | 25 |
JSON | 114 |
JSONB | 3082 |
XML | 142 |
UUID | 2950 |
POINT | 600 |
LSEG | 601 |
PATH | 602 |
BOX | 603 |
POLYGON | 604 |
LINE | 628 |
CIDR | 650 |
CIRCLE | 718 |
MACADDR | 829 |
INET | 869 |
INTERVAL | 1186 |
TXID_SNAPSHOT | 2970 |
PG_LSN | 3220 |
TSVECTOR | 3614 |
TSQUERY | 3615 |