使用通道服务、DataWorks或者DataX将表格存储数据表中的数据同步到另一个数据表。
前提条件
已创建目标数据表,目标数据表的列必须与源数据表中待迁移的列一一对应。具体操作,请参见创建数据表。
如果要实现跨账号、跨地域数据迁移,请使用DataX工具通过互联网或者通过云企业网连通VPC进行操作。关于使用云企业网的具体操作,请参见云企业网快速入门。
使用通道服务迁移同步
创建源数据表的通道后,使用SDK进行迁移同步。迁移过程中可以自定义业务处理逻辑对数据进行处理。
前提条件
已确定要使用的Endpoint。具体操作,请参见初始化OTSClient。
已配置密钥。具体操作,请参见初始化OTSClient。
已将密钥配置到环境变量中。具体操作,请参见初始化OTSClient。
表格存储使用OTS_AK_ENV环境变量名表示阿里云账号或者RAM用户的AccessKey ID,使用OTS_SK_ENV环境变量名表示对应AccessKey Secret,请根据实际配置。
操作步骤
使用表格存储控制台或SDK创建源数据表的通道并记录通道ID,具体操作请分别参见快速入门或通过SDK使用通道服务。
使用SDK迁移数据。
示例代码如下:
public class TunnelTest { public static void main(String[] args){ String accessKeyId = System.getenv("OTS_AK_ENV"); String accessKeySecret = System.getenv("OTS_SK_ENV"); TunnelClient tunnelClient = new TunnelClient("endpoint", accessKeyId,accessKeySecret,"instanceName"); TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); //tunnelId可以在表格存储控制台通道管理页面查看或者调用describeTunnelRequest查询。 TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); worker.shutdown(); tunnelClient.shutdown(); } } public static class SimpleProcessor implements IChannelProcessor{ //目标tablestore连接对象。 TunnelClient tunnelClient = new TunnelClient("endpoint", "accessKeyId","accessKeySecret","instanceName"); @Override public void process(ProcessRecordsInput processRecordsInput) { //ProcessRecordsInput中返回了增量或全量数据。 List<StreamRecord> list = processRecordsInput.getRecords(); for(StreamRecord streamRecord : list){ switch (streamRecord.getRecordType()){ case PUT: //自定义业务处理逻辑。 //putRow break; case UPDATE: //updateRow break; case DELETE: //deleteRow break; } System.out.println(streamRecord.toString()); } } @Override public void shutdown() { } } }
使用DataWorks或者DataX迁移同步
通过DataWorks或者DataX实现表格存储数据的迁移同步,此处以DataWorks为例介绍迁移操作。
步骤一:新增表格存储数据源
分别以源数据表和目标数据表所在实例新增表格存储数据源。
进入数据集成页面。
以项目管理员身份登录DataWorks控制台。
在左侧导航栏,单击工作空间列表后,选择地域。
在工作空间列表页面,在目标工作空间操作列选择快速进入>数据集成。
在左侧导航栏,单击数据源。
在数据源页面,单击新增数据源。
在新增数据源对话框,找到Tablestore区块,单击Tablestore。
在新增OTS数据源对话框,根据下表配置数据源参数。
参数
说明
数据源名称
数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。
数据源描述
对数据源进行简单描述,不得超过80个字符。
Endpoint
Tablestore实例的服务地址。更多信息,请参见服务地址。
如果Tablestore实例和目标数据源的资源在同一个地域,填写VPC地址;如果Tablestore实例和目标数据源的资源不在同一个地域,填写公网地址。
Table Store实例名称
Tablestore实例的名称。更多信息,请参见实例。
AccessKey ID
阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret。获取方式请参见创建AccessKey。
AccessKey Secret
测试资源组连通性。
创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。
重要数据同步时,一个任务只能使用一种资源组。资源组列表默认仅显示独享数据集成资源组,为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。
如果未创建资源组,请单击新建独享数据集成资源组进行创建。具体操作,请参见新增和使用独享数据集成资源组。
单击相应资源组操作列的测试连通性,当连通状态为可连通时,表示连通成功。
测试连通性通过后,单击完成。
在数据源列表中,可以查看新建的数据源。
步骤三:新建同步任务节点
进入数据开发页面。
以项目管理员身份登录DataWorks控制台。
选择地域,在左侧导航栏,单击工作空间列表。
在工作空间列表页面,在目标工作空间操作列选择快速进入>数据开发。
在DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。
如果需要新建业务流程,请参见创建业务流程。
在数据集成节点上右键选择新建节点 > 离线同步。
在新建节点对话框,选择路径并填写节点名称。
单击确认。
在数据集成节点下会显示新建的离线同步节点。
步骤四:配置离线同步任务并启动
在数据集成节点下,双击打开新建的离线同步任务节点。
配置同步网络链接。
选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。
重要数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。
在网络与资源配置步骤,选择数据来源为Tablestore,并选择数据源名称为新增的源数据源。
选择资源组。
选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。
重要请与新增数据源时选择的资源组保持一致。
选择数据去向为Tablestore,并选择数据源名称为新增的目标数据源。
系统会自动测试资源组与所选数据源之间连通性。
测试可连通后,单击下一步。
在提示对话框,单击确认使用脚本模式。
重要表格存储仅支持脚本模式。当存在不支持向导模式的数据源时,如果继续编辑任务,将强制使用脚本模式进行编辑。
任务转为脚本模式后,将无法转为向导模式。
配置任务并保存。
全量数据的同步需要使用到Tablestore Reader与Tablestore Writer插件。脚本配置规则请参见Tablestore数据源。
在配置任务步骤,编辑脚本。
配置Tablestore Reader
Tablestore Reader插件实现了从Tablestore读取数据,通过您指定的抽取数据范围,可以方便地实现数据增量抽取的需求。具体操作,请参见附录:Reader脚本Demo与参数说明。
配置Tablestore Writer
Tablestore Writer通过Tablestore官方Java SDK连接到Tablestore服务端,并通过SDK写入Tablestore服务端 。Tablestore Writer本身对于写入过程进行了诸多优化,包括写入超时重试、异常写入重试、批量提交等功能。具体操作,请参见附录:Writer脚本Demo与参数说明。
按【Ctrl+S】保存脚本。
说明执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。
执行同步任务。
说明全量数据一般只需要同步一次,无需配置调度属性。
单击图标。
在参数对话框,选择运行资源组的名称。
单击运行。
运行结束后,在同步任务的运行日志页签,单击Detail log url对应的链接后。在任务的详细运行日志页面,查看
Current task status
对应的状态。当
Current task status
的值为FINISH时,表示任务运行完成。