数据传输服务DTS(Data Transmission Service)支持将云数据库MongoDB版(副本集架构)实例的增量数据同步至函数计算FC的指定函数。您可以编写函数代码,结合同步至函数中的数据,对数据进行二次加工。
前提条件
注意事项
类型 | 说明 |
源库限制 |
|
其他限制 |
|
特殊情况 | 当源库为自建MongoDB时:
说明 如果同步对象选择为整库,您还可以创建心跳表,心跳表每秒定期更新或者写入数据。 |
费用说明
同步类型 | 链路配置费用 |
增量数据同步 | 收费,详情请参见计费概述。 |
支持同步的操作
同步类型 | 说明 |
增量同步 | 使用Oplog增量同步不支持在任务开始运行后新建的数据库,支持同步的增量更新如下:
使用ChangeStream支持同步的增量更新如下:
|
数据库账号的权限要求
数据库 | 所需权限 | 创建及授权方式 |
源云数据库MongoDB版 | 待同步库、admin库和local库的read权限。 |
操作步骤
进入目标地域的同步任务列表页面(二选一)。
通过DTS控制台进入
登录数据传输服务DTS控制台。
在左侧导航栏,单击数据同步。
在页面左上角,选择同步实例所属地域。
通过DMS控制台进入
说明实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式控制台和自定义DMS界面布局与样式。
登录DMS数据管理服务。
在顶部菜单栏中,选择
。在同步任务右侧,选择同步实例所属地域。
单击创建任务,进入任务配置页面。
可选:在页面右上角,单击试用新版配置页。
说明若您已进入新版配置页(页面右上角的按钮为返回旧版配置页),则无需执行此操作。
新版配置页和旧版配置页部分参数有差异,建议使用新版配置页。
配置源库及目标库信息。
类别
配置
说明
无
任务名称
DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
源库信息
选择已有连接信息
您可以按实际需求,选择是否使用已有数据库实例。
如使用已有实例,下方数据库信息将自动填入,您无需重复输入。
如不使用已有实例,您需要配置下方的数据库信息。
说明您可以在数据连接管理页面或新版配置页面,将数据库录入DTS。更多信息,请参见数据连接管理。
DMS控制台的配置项为选择DMS数据库实例,您可以单击新增DMS数据库实例或在控制台首页将数据库录入DMS。更多信息,请参见云数据库录入和他云/自建数据库录入。
数据库类型
选择MongoDB。
接入方式
选择云实例。
实例地区
选择源云数据库MongoDB版实例所属地域。
是否跨阿里云账号
本示例为同一阿里云账号间的同步,选择不跨账号。
架构类型
选择副本集架构。
迁移方式
请根据实际情况,选择增量数据同步的方式。
Oplog(推荐):
若源库已开启Oplog日志,则支持此选项。
说明本地自建MongoDB和云数据库MongoDB版默认已开启Oplog日志,且使用此方式同步增量数据时增量同步任务的延迟较小(拉取日志的速度较快),因此推荐选择Oplog。
ChangeStream:
若源库已开启变更流(Change Streams),则支持此选项。
说明源库为Amazon DocumentDB(非弹性集群)时,仅支持选择ChangeStream。
源库架构类型选择为分片集群架构,无需填写Shard账号和Shard密码。
实例ID
选择源云数据库MongoDB版实例ID。
鉴权数据库名称
填入源云数据库MongoDB版实例数据库账号所属的数据库名称,若未修改过则为默认的admin。
数据库账号
填入源云数据库MongoDB版实例的数据库账号,权限要求请参见数据库账号的权限要求。
数据库密码
填入该数据库账号对应的密码。
连接方式
DTS支持非加密连接、SSL安全连接和Mongo Atlas SSL三种连接方式。连接方式的选项与接入方式和架构类型有关,请以控制台为准。
说明架构类型为分片集群架构,且迁移方式为Oplog的MongoDB数据库,不支持SSL安全连接。
若源库为自建(接入方式不为云实例)副本集架构的MongoDB数据库,并且选择了SSL安全连接,DTS还支持上传CA证书对连接进行校验。
目标库信息
选择已有连接信息
您可以按实际需求,选择是否使用已有数据库实例。
如使用已有实例,下方数据库信息将自动填入,您无需重复输入。
如不使用已有实例,您需要配置下方的数据库信息。
说明您可以在数据连接管理页面或新版配置页面,将数据库录入DTS。更多信息,请参见数据连接管理。
DMS控制台的配置项为选择DMS数据库实例,您可以单击新增DMS数据库实例或在控制台首页将数据库录入DMS。更多信息,请参见云数据库录入和他云/自建数据库录入。
数据库类型
选择函数计算 FC。
接入方式
选择云实例。
实例地区
默认与源库实例地区一致,且不支持修改。
服务
选择目标函数计算FC的服务名称。
函数
选择目标函数计算FC用于接收数据的函数。
服务版本和别名
请根据实际情况进行选择。
默认版本:服务版本固定为LATEST。
指定版本:您还需选择服务版本。
指定别名:您还需选择服务别名。
说明目标函数计算FC的专有名词,请参见基本概念。
配置完成后,在页面下方单击测试连接以进行下一步。
说明请确保DTS服务的IP地址段能够被自动或手动添加至源库和目标库的安全设置中,以允许DTS服务器的访问。更多信息,请参见添加DTS服务器的IP地址段。
若源库或目标库为自建数据库(接入方式不是云实例),则还需要在弹出的DTS服务器访问授权对话框单击测试连接。
配置任务对象。
在对象配置页面,配置待同步的对象。
配置
说明
同步类型
仅支持增量同步,且默认已选中。
数据格式
同步到FC函数中的数据存储格式,当前仅支持Canal Json。
说明Canal Json的参数说明和示例,请参见Canal Json说明。
源库对象
在源库对象框中单击待同步对象,然后单击将其移动至已选择对象框。
说明同步对象的选择粒度为库或集合。
已选择对象
请在已选择对象框中确认待同步的数据。
说明如需移除已选择的对象,可以在已选择对象框中勾选目标对象,并单击进行移除。
如需按库或集合级别筛选需要同步的增量更新操作,请在已选择对象框中右键单击待同步的对象,并在弹出的对话框中进行选择。
单击下一步高级配置,进行高级参数配置。
配置
说明
选择调度该任务的专属集群
DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS同步任务。更多信息,请参见什么是DTS专属集群。
源库、目标库无法连接后的重试时间
在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。
说明针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。
由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。
源库、目标库出现其他问题后的重试时间
在同步任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,同步任务将自动恢复。否则,同步任务将会失败。
重要源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。
是否限制增量同步速率
您也可以根据实际情况,选择是否对增量同步任务进行限速设置(设置每秒增量同步的行数RPS和每秒增量同步的数据量(MB)BPS),以缓解目标库的压力。
环境标签
您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。
配置ETL功能
选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL。
是:配置ETL功能,并在文本框中填写数据处理语句,详情请参见在DTS迁移或同步任务中配置ETL。
否:不配置ETL功能。
监控告警
是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。
不设置:不设置告警。
设置:设置告警,您还需要设置告警阈值和告警通知。更多信息,请参见在配置任务过程中配置监控告警。
保存任务并进行预检查。
若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数。
若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查。
说明在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。
如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
如果预检查产生警告:
对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情、确认屏蔽、确定、重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。
购买实例。
预检查通过率显示为100%时,单击下一步购买。
在购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。
类别
参数
说明
信息配置
计费方式
预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。
后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。
资源组配置
实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理。
链路规格
DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明。
订购时长
在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。
说明该选项仅在付费类型为预付费时出现。
配置完成后,阅读并勾选《数据传输(按量付费)服务条款》。
单击购买并启动,并在弹出的确认对话框,单击确定。
您可在数据同步界面查看具体任务进度。
后续步骤
若待同步的单条数据大于16 MB,导致DTS任务报错,您可以修改同步对象或者使用ETL功能,过滤大字段的数据。操作方法,请参见在已有同步任务上修改ETL配置和修改同步对象。
根据实际需求,编写函数代码。更多信息,请参见代码开发概述。
目标服务接收的数据格式
FC接收到的数据类型为Object,源端增量数据以数组的方式存储在Records字段中,数组中的每一个元素为一条Object类型的数据记录。其中Object字段和含义如下表所示。
FC接收到的数据有DML和DDL两种:
DDL:记录更改数据库的结构信息,如CreateIndex、CreateCollection、DropIndex、DropCollection等。
DML:记录管理数据库中的数据信息,如INSERT、UPDATE、DELETE。
字段 | 类型 | 说明 |
| Boolean | 是否为DDL操作。
|
| String | SQL操作的类型。
|
| String | MongoDB的数据库名。 |
| String | MongoDB的集合名。 |
| String | MongoDB的主键名,固定为_id。 |
| Long | 操作在源库的执行时间,13位Unix时间戳,单位为毫秒。 说明 Unix时间戳转换工具可用搜索引擎获取。 |
| Long | 操作开始写入到目标库的时间,13位Unix时间戳,单位为毫秒。 说明 Unix时间戳转换工具可用搜索引擎获取。 |
| Object Array | Array中仅包含一个元素,类型为Object,其中Key为doc,value的类型为Json String。 说明 将value反序列化即可得到数据记录。 |
| Int | 操作的序列号。 |
DDL操作数据格式示例
创建集合
SQL语句
db.createCollection("testCollection")
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056437000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056437510
}]
}
删除集合
SQL语句
db.testCollection.drop()
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"drop": "testCollection"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056577000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056577789
}]
}
创建索引
SQL语句
db.testCollection.createIndex({name:1})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056670000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': 'testCollection',
'ts': 1694056670719
}]
}
删除索引
SQL语句
db.testCollection.dropIndex({name:1})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
}],
'pkNames': ['_id'],
'type': 'DDL',
'es': 1694056817000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': True,
'table': '$cmd',
'ts': 1694056818035
}]
}
DML操作数据格式示例
插入数据
SQL语句
// 批量插入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// 逐条插入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784427
}, {
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
}],
'pkNames': ['_id'],
'type': 'INSERT',
'es': 1694054783000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054784428
}]
}
更新数据
SQL语句
db.user.update({"name":"jack"},{$set:{"age":30}})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"$set": {"age": 30}}'
}],
'pkNames': ['_id'],
'old': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'type': 'UPDATE',
'es': 1694054989000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694054990555
}]
}
删除数据
SQL语句
db.user.remove({"name":"jack"})
FC接收到的数据
{
'Records': [{
'data': [{
'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
}],
'pkNames': ['_id'],
'type': 'DELETE',
'es': 1694055452000,
'database': 'MongoDBTest',
'id': 0,
'isDdl': False,
'table': 'user',
'ts': 1694055452852
}]
}