全部产品
Search
文档中心

数据传输服务 DTS:PolarDB PostgreSQL版(兼容Oracle)集群同步至阿里云消息队列Kafka版

更新时间:Oct 25, 2024

本文介绍如何使用数据传输服务DTS(Data Transmission Service),将PolarDB PostgreSQL版(兼容Oracle)集群的数据同步至阿里云消息队列Kafka版

前提条件

  • PolarDB PostgreSQL版(兼容Oracle)集群中wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑编码所需的信息。设置方法,请参见设置集群参数

  • 已创建存储空间大于源PolarDB PostgreSQL版(兼容Oracle)实例已占用存储空间的目标阿里云消息队列Kafka版实例。

    说明

    源库和目标库支持的版本,请参见同步方案概览

  • 目标阿里云消息队列Kafka版实例中已创建用于接收同步数据的Topic,请参见步骤一:创建Topic

注意事项

类型

说明

源库限制

  • 带宽要求:源库所属的服务器需具备足够出口带宽,否则将影响数据同步速率。

  • 待同步的表需具备主键或唯一约束,且字段具有唯一性,否则可能会导致目标数据库中出现重复数据。

  • 如同步对象为表级别,且需进行编辑(如表列名映射),则单次同步任务待同步的表数量超过1000时,建议您拆分待同步的表,分批配置为多个任务,或者配置整库的同步任务,否则任务提交后可能会显示请求报错。

  • 需开启WAL日志。如为增量同步任务,DTS要求源数据库的WAL日志保存24小时以上,如为全量同步和增量同步任务,DTS要求源数据库的WAL日志至少保留7天以上(您可在全量同步完成后将WAL日志保存时间设置为24小时以上),否则DTS可能因无法获取WAL日志而导致任务失败,极端情况下甚至可能会导致数据不一致或丢失。由于您所设置的WAL日志保存时间低于DTS要求的时间进而导致的问题,不在DTS的SLA保障范围内。

  • 若源库有长事务,在增量同步任务运行时可能会导致源库长事务提交前的预写日志WAL(Write-Ahead Logging)无法清理而堆积,从而造成源库磁盘空间不足。

  • 源库的操作限制:

    • 在库表结构同步和全量同步阶段,请勿执行库或表结构变更的DDL操作,否则数据同步任务会失败。

    • 如仅执行全量数据同步,请勿向源实例中写入新的数据,否则会导致源和目标数据不一致。为实时保持数据一致性,建议选择结构同步、全量数据同步和增量数据同步。

    • 为保障同步任务的正常进行,避免主备切换导致的逻辑订阅中断,需要PolarDB PostgreSQL版(兼容Oracle)支持并开启Logical Replication Slot Failover。设置方式,请参见逻辑复制槽故障转移(Logical Replication Slot Failover)

其他限制

  • 单个数据同步任务只能同步一个数据库,如需同步多个数据库,您需要为每个数据库配置数据同步任务。

  • 增量数据同步期间,DTS会在源库中创建前缀为dts_sync_的replication slot用于复制数据。通过该replication slot,DTS可以获取源库15分钟内的增量日志。

    说明

    当释放同步任务或同步失败时,DTS会主动清理该replication slot;如果源库发生了主备切换,则需要您登录备库来手动清理。

  • 在数据同步期间,若目标Kafka发生了扩容或缩容,您需要重启迁移实例。

  • 为保障增量数据同步延迟时间展示的准确性,DTS会在源库中新增一个表,表名为dts_postgres_heartbeat,结构及内容如下图所示。表结构

  • 执行数据同步前需评估源库和目标库的性能,同时建议在业务低峰期执行数据同步(例如源库和目标库的CPU负载在30%以下)。否则全量数据同步时DTS占用源和目标库一定读写资源,可能会导致数据库的负载上升。

  • DTS会尝试恢复七天之内同步失败的任务。因此业务切换至目标实例前,请务必结束或释放该任务,或者回收DTS访问目标实例账号的写权限。避免该任务被自动恢复后,源端数据覆盖目标实例的数据。

  • 若实例运行失败,DTS技术支持人员将在8小时内尝试恢复该实例。在恢复失败实例的过程中,可能会对该实例进行重启、调整参数等操作。

    说明

    在调整参数时,仅会修改实例的参数,不会对数据库中的参数进行修改。可能修改的参数,包括但不限于修改实例参数中的参数。

费用说明

同步类型

链路配置费用

库表结构同步

不收费。

增量数据同步

收费,详情请参见计费概述

支持增量同步的SQL操作

操作类型

SQL操作语句

DML

INSERT、UPDATE、DELETE

DDL

  • CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE、TRUNCATE TABLE

  • CREATE VIEW、ALTER VIEW、DROP VIEW

  • CREATE PROCEDURE、ALTER PROCEDURE、DROP PROCEDURE

  • CREATE FUNCTION、DROP FUNCTION

  • CREATE INDEX、DROP INDEX

说明

不支持DDL同步的有以下场景:

  • DDL中CASCADE和RESTRICT等附加信息不会被同步。

  • 一个事务中包含DML和DDL,则该DDL不会被同步。

  • 一个事务中同时包含在同步对象的DDL和不在同步对象的DDL。

  • 通过设置SET session_replication_role = replica会话下执行的DDL不支持同步。

  • 通过调用FUNCTION等方式执行的DDL。

  • 省略schema的DDL(会使用 show search_path 默认为public)。

  • DDL语句中包含if not exist的场景。

数据库账号的权限要求

数据库

权限要求

账号创建及授权方法

PolarDB PostgreSQL版(兼容Oracle)集群

高权限账号

创建和管理数据库账号

操作步骤

  1. 进入目标地域的同步任务列表页面(二选一)。

    通过DTS控制台进入

    1. 登录数据传输服务DTS控制台

    2. 在左侧导航栏,单击数据同步

    3. 在页面左上角,选择同步实例所属地域。

    通过DMS控制台进入

    说明

    实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式控制台自定义DMS界面布局与样式

    1. 登录DMS数据管理服务

    2. 在顶部菜单栏中,选择集成与开发 > 数据传输(DTS) > 数据同步

    3. 同步任务右侧,选择同步实例所属地域。

  2. 单击创建任务,进入任务配置页面。

  3. 可选:在页面右上角,单击试用新版配置页

    说明
    • 若您已进入新版配置页(页面右上角的按钮为返回旧版配置页),则无需执行此操作。

    • 新版配置页和旧版配置页部分参数有差异,建议使用新版配置页。

  4. 配置源库及目标库信息。

    说明

    目标阿里云消息队列Kafka版实例的参数获取方式,请参见配置消息队列Kafka版实例的参数

    类别

    配置

    说明

    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息

    选择DMS数据库实例

    您可以按实际需求,选择是否使用已有实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要配置下方的数据库信息。

    说明

    数据库类型

    选择PolarDB(兼容Oracle)

    接入方式

    选择云实例

    实例地区

    选择源PolarDB PostgreSQL版(兼容Oracle)集群所属的地域。

    是否跨阿里云账号

    本示例使用当前阿里云账号下的数据库实例,需选择不跨账号

    实例ID

    选择源PolarDB PostgreSQL版(兼容Oracle)集群的ID。

    数据库名称

    填入源PolarDB PostgreSQL版(兼容Oracle)集群中待同步对象所属数据库的名称。

    数据库账号

    填入源PolarDB PostgreSQL版(兼容Oracle)集群的数据库账号,权限要求请参见数据库账号的权限要求

    数据库密码

    填入该数据库账号对应的密码。

    目标库信息

    选择DMS数据库实例

    您可以按实际需求,选择是否使用已有实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要配置下方的数据库信息。

    说明

    数据库类型

    选择Kafka

    接入方式

    选择专线/VPN网关/智能网关

    说明

    此同步实例中的阿里云消息队列Kafka版暂不支持以云实例的方式接入到DTS,需要将其作为自建Kafka进行配置。

    实例地区

    选择目标阿里云消息队列Kafka版实例所属地域。

    已和目标端数据库联通的VPC

    选择目标阿里云消息队列Kafka版实例所属的专有网络ID。

    域名或IP地址

    填入目标阿里云消息队列Kafka版实例默认接入点中的任意一个IP地址。

    端口

    填入目标阿里云消息队列Kafka版实例的服务端口,默认为9092

    数据库账号

    本示例无需填写。

    数据库密码

    Kafka版本

    请根据Kafka实例版本,选择对应的版本信息。

    连接方式

    请根据业务及安全需求,选择非加密连接SCRAM-SHA-256

    Topic

    在下拉框中选择接收数据的Topic。

    存储DDL的Topic

    在下拉框中选择用于存储DDL信息的Topic。如果未指定,DDL信息默认存储在Topic选择的Topic中。

    是否使用Kafka Schema Registry

    Kafka Schema Registry是元数据提供服务层,提供了一个RESTful接口,用于存储和检索Avro Schema。

    • :不使用Kafka Schema Registry。

    • :使用Kafka Schema Registry。您需要在Schema Registry URI或IP文本框输入Avro Schema在Kafka Schema Registry注册的URL或IP。

  5. 配置完成后,单击页面下方的测试连接以进行下一步

    如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB版等),DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单中;如果源或目标数据库是ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添加到ECS的安全规则中,您还需确保自建数据库没有限制ECS的访问(若数据库是集群部署在多个ECS实例,您需要手动将DTS服务对应地区的IP地址添到其余每个ECS的安全规则中);如果源或目标数据库是IDC自建数据库或其他云数据库,则需要您手动添加对应地区DTS服务的IP地址,以允许来自DTS服务器的访问。DTS服务的IP地址,请参见DTS服务器的IP地址段

    警告

    DTS自动添加或您手动添加DTS服务的公网IP地址段可能会存在安全风险,一旦使用本产品代表您已理解和确认其中可能存在的安全风险,并且需要您做好基本的安全防护,包括但不限于加强账号密码强度防范、限制各网段开放的端口号、内部各API使用鉴权方式通信、定期检查并限制不需要的网段,或者使用通过内网(专线/VPN网关/智能网关)的方式接入。

  6. 配置任务对象。

    1. 对象配置页面,配置待同步的对象。

      配置

      说明

      同步类型

      建议同时选中库表结构同步增量同步

      说明

      不支持全量同步

      目标已存在表的处理模式

      • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。

        说明

        如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射

      • 忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。

        警告

        选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:

        • 表结构一致的情况下,如在目标库遇到与源库主键或唯一键的值相同的记录:

          • 全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。

          • 增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。

        • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败,请谨慎操作。

      投递到Kafka的数据格式

      请根据业务需求选择同步到Kafka实例中的数据存储格式。

      • 如果您选择DTS Avro,根据DTS Avro的Schema定义进行数据解析,Schema定义详情请参见DTS Avro的schema定义

      • 如果您选择Shareplex Json,Shareplex Json的参数说明和示例请参见Shareplex Json

      Kafka压缩格式

      根据需求选择Kafka压缩消息的压缩格式。

      • LZ4(默认):压缩率较低,压缩速率较高。

      • GZIP:压缩率较高,压缩速率较低。

        说明

        对CPU的消耗较高。

      • Snappy:压缩率中等,压缩速率中等。

      投递到Kafka Partition策略

      请根据业务需求选择同步的策略,策略信息请参见Kafka Partition迁移策略说明

      目标库对象名称大小写策略

      您可以配置目标实例中同步对象的库名、表名和列名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略

      源库对象

      源库对象框中单击待同步对象,然后单击向右将其移动至已选择对象框。

      说明

      同步对象的选择粒度为表。

      已选择对象

      本示例无需额外配置。您可以使用映射功能,设置源表在目标Kafka实例中的Topic名称、Topic的Partition数量、Partition Key等信息。更多信息,请参见映射信息

      说明
      • 如果使用了对象名映射功能,可能会导致依赖这个对象的其他对象同步失败。

      • 如需选择增量同步的SQL操作,请在已选择对象中右击待同步对象,并在弹出的对话框中选择所需同步的SQL操作。支持的操作,请参见支持增量同步的SQL操作

    2. 单击下一步高级配置,进行高级参数配置。

      配置

      说明

      选择调度该任务的专属集群

      DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS同步任务。更多信息,请参见什么是DTS专属集群

      源库、目标库无法连接后的重试时间

      在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。

      说明
      • 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。

      • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

      源库、目标库出现其他问题后的重试时间

      在同步任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,同步任务将自动恢复。否则,同步任务将会失败。

      重要

      源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。

      是否限制增量同步速率

      您也可以根据实际情况,选择是否对增量同步任务进行限速设置(设置每秒增量同步的行数RPS每秒增量同步的数据量(MB)BPS),以缓解目标库的压力。

      环境标签

      您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。

      配置ETL功能

      选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL

      监控告警

      是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。

  7. 保存任务并进行预检查。

    • 若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数

    • 若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

    • 如果预检查产生警告:

      • 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

      • 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情确认屏蔽确定重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。

  8. 预检查通过率显示为100%时,单击下一步购买

  9. 购买实例。

    1. 购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。

      类别

      参数

      说明

      信息配置

      计费方式

      • 预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。

      • 后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。

      资源组配置

      实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理

      链路规格

      DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明

      订购时长

      在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。

      说明

      该选项仅在付费类型为预付费时出现。

    2. 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》

    3. 单击购买并启动,并在弹出的确认对话框,单击确定

      您可在数据同步界面查看具体任务进度。

映射信息

  1. 已选择对象区域框中,将鼠标指针放置在目标Topic名(表级别)上。

  2. 单击目标Topic名后出现的编辑

  3. 在弹出的编辑表对话框中,配置映射信息。

    配置

    说明

    表名称

    源表同步到的目标Topic名称,默认为源库及目标库配置阶段在目标库信息选择的Topic

    重要
    • 目标库为阿里云消息队列Kafka版实例时,设置的Topic名称必须在目标Kafka实例中真实存在,否则将导致数据同步失败。

    • 若您修改了表名称,数据将会被写入到您填写的Topic中。

    过滤条件

    详情请参见设置过滤条件

    设置新建Topic的Partition数量

    数据写入到目标Topic时的分区数。

    Partition Key

    投递到Kafka Partition策略选择为按主键的hash值投递到不同Partition时,您需要配置本参数,指定单个或多个列作为Partition Key来计算Hash值,DTS将根据计算得到的Hash值将不同的行投递到目标Topic的各Partition中。否则,该投递策略在增量写入阶段将无法生效。

    说明

    取消选中全表同步后,才可以勾选Partition Key

  4. 单击确定