This topic describes how to synchronize incremental data from an ApsaraDB RDS database
to MaxCompute. You can refer to this topic to synchronize incremental data in different
scenarios.
Background information
Data to be synchronized can be classified into data that does not change after it is
generated and data that is constantly updated after it is generated. For example, log
entries do not change after they are generated, whereas user data may be constantly
updated as user status changes.
If a node produces the same result no matter how many times it is run, you can
safely schedule the node to rerun, and you can delete dirty data and rerun the node
if an error occurs. This principle is called idempotence. To keep writes idempotent,
write data to a separate table or partition each time, or overwrite the historical
data in an existing table or partition.
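For example, a rerun-safe write in MaxCompute can use INSERT OVERWRITE on a dedicated partition, so that rerunning the statement replaces the partition content instead of appending duplicate rows. The following is a minimal sketch in which the staging table src_oplog_staging is a hypothetical name:
-- Rerunning this statement always leaves the partition in the same state,
-- because it overwrites the partition instead of appending to it.
insert overwrite table ods_oplog partition (ds='20161113')
select optime, uname, action, status from src_oplog_staging;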
In this topic, the scheduled date of a data synchronization node is set to November
14, 2016, and historical data is synchronized to the ds=20161113 partition on this
day. In the incremental synchronization scenario, automatic scheduling is configured
to synchronize incremental data to the ds=20161114 partition on the early morning
of November 15, 2016. The optime field indicates the time when a data record is modified. This field is used to determine
whether a data record is incremental data.
Usage notes
- Incremental synchronization is not supported for specific types of data sources, such
as HBase and OTSStream data sources. You can refer to the topics for the Reader plug-ins
of the related data sources to check whether incremental synchronization is supported.
- The parameters that you must configure vary based on the Reader plug-ins that you
use to synchronize incremental data. For more information, see Supported data source types, readers, and writers.
- For information about how to configure a batch synchronization node to synchronize
incremental data, see Synchronize incremental data.
Synchronize the incremental data of data that does not change after generation
For data that does not change after it is generated, you can partition a table based
on the pattern in which data is generated. In most cases, you can partition a table
by date, such as one partition per day.
- Execute the following statements to prepare data in the ApsaraDB RDS database:
drop table if exists oplog;
create table if not exists oplog(
    optime datetime,
    uname varchar(50),
    action varchar(50),
    status varchar(10)
);
insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');
The preceding data entries are used as historical data. You must first synchronize
all historical data to the ds=20161113 partition.
- In the Scheduled Workflow pane of the DataStudio page, expand the created workflow, right-click Table under MaxCompute, and then select Create Table.
- In the Create Table dialog box, set the Name parameter to ods_oplog and click Create.
- On the configuration tab of the ods_oplog table, click DDL. In the DDL dialog box, enter the following statement to create a MaxCompute table:
-- Create a MaxCompute table and partition the table by day.
create table if not exists ods_oplog(
optime datetime,
uname string,
action string,
status string
) partitioned by (ds string);
- Configure a data synchronization node to synchronize historical data. For more information,
see Configure a batch synchronization node by using the codeless UI.
After the test on the data synchronization node is complete, click Properties in the right-side navigation pane of the configuration tab of the node. On the Properties
tab, select Skip Execution for the Recurrence parameter and commit or deploy the node again to prevent the node
from being automatically scheduled.
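To verify that the historical data was loaded as expected, you can query the historical partition of the destination table (a quick check based on the table created in the preceding steps):
select * from ods_oplog where ds='20161113';
-- Expected: the two historical rows for LiLei and HanMM.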
- Execute the following statements to insert data into the source table in the ApsaraDB
RDS database as incremental data:
insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed');
insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
- Configure a data synchronization node to synchronize incremental data.
When you configure the source for the node, enter date_format(optime,'%Y%m%d')=${bdp.system.bizdate} in the Filter field. When you configure the destination for the node, enter ${bdp.system.bizdate} in the Partition Key Column field.
Note After you specify a filter condition, you can query data that is inserted into the
source table on November 14, 2016 and synchronize the data to the incremental data
partition of the destination table on the early morning of November 15, 2016.
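On November 15, 2016, the scheduling system replaces ${bdp.system.bizdate} with 20161114. The query that is issued against the source table is therefore conceptually equivalent to the following sketch; the exact SQL that the Reader plug-in generates may differ:
select optime, uname, action, status
from oplog
where date_format(optime,'%Y%m%d') = '20161114';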
- View the incremental synchronization result.
Click the Properties tab in the right-side navigation pane of the configuration tab of the node. On the
Properties tab, set the Scheduling Cycle parameter to Day. After you commit or deploy
the data synchronization node that is used to synchronize incremental data, the node
is automatically scheduled to run on the next day. After the node is successfully
run, you can view the data in the destination MaxCompute table.
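For example, you can run the following query against the destination table (based on the partition layout used in this topic):
select * from ods_oplog where ds='20161114';
-- Expected: the three incremental rows for Jim, Kate, and Lily.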
Synchronize the incremental data of data that is constantly updated after generation
Data warehouses have a time-variant characteristic. Therefore, we recommend that you
synchronize the full data of tables whose records change, such as user and order
tables, on a daily basis. This way, a full snapshot is stored every day, and you can
retrieve both historical and current data.
In some special scenarios, you may need to synchronize only incremental data on a
daily basis. MaxCompute does not allow you to change data by using the UPDATE statement.
Therefore, you must use other methods to synchronize incremental data. This section
describes how to synchronize full data on a daily basis and how to synchronize full
data at a time and then incremental data on a daily basis.
- Execute the following statements to prepare data:
drop table if exists user;
create table if not exists user(
    uid int,
    uname varchar(50),
    deptno int,
    gender varchar(1),
    optime datetime
);
-- Insert historical data.
insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
-- Insert incremental data. CURRENT_TIMESTAMP is used so that the optime
-- DATETIME column receives both the current date and the current time.
update user set deptno=101,optime=CURRENT_TIMESTAMP where uid = 2; -- Change a null value to a non-null value.
update user set deptno=104,optime=CURRENT_TIMESTAMP where uid = 3; -- Change a non-null value to another non-null value.
update user set deptno=null,optime=CURRENT_TIMESTAMP where uid = 4; -- Change a non-null value to a null value.
delete from user where uid = 5;
insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIMESTAMP);
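After these statements are executed, the source table is in the following state, which you can confirm with a simple query. The expected rows are derived from the statements above:
select uid, uname, deptno, gender, optime from user order by uid;
-- Expected rows (optime of changed rows is the current timestamp):
-- 1, LiLei, 100,  M   -- unchanged historical row
-- 2, HanMM, 101,  F   -- deptno changed from null to 101
-- 3, Jim,   104,  M   -- deptno changed from 102 to 104
-- 4, Kate,  null, F   -- deptno changed from 103 to null
-- 6, Lucy,  105,  F   -- newly inserted row; uid 5 (Lily) was deleted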
- Synchronize data.
- Synchronize full data on a daily basis.
- Execute the following statement to create a MaxCompute table. For more information,
see Create a MaxCompute table.
-- Synchronize full data.
create table ods_user_full(
uid bigint,
uname string,
deptno bigint,
gender string,
optime DATETIME
) partitioned by (ds string);
- Configure a data synchronization node to synchronize full data.
Note Set the Scheduling Cycle parameter to Day because full data must be synchronized on
a daily basis.
- Run the data synchronization node and view the data in the destination MaxCompute
table after the synchronization is complete.
When full synchronization is performed on a daily basis, incremental data is also
synchronized every day. You can view the data results in the table after the node
is automatically scheduled to run on the next day.
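Because every partition holds a full snapshot, you can compare the data of two days directly. The following sketch uses partition values that match the scenario dates in this topic:
select ds, uid, uname, deptno, gender from ods_user_full
where ds in ('20161113','20161114')
order by ds, uid;
-- Each ds partition holds the full snapshot of the source table as of that business date.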
- Synchronize incremental data on a daily basis.
We recommend that you use this synchronization mode only in special scenarios, such
as scenarios in which the DELETE statement is not used and rows are logically deleted
by executing UPDATE statements instead. In other scenarios, this mode may cause data
inconsistency, because deleted rows cannot be captured as incremental data. In
addition, if you use this mode, you must merge the new data and the historical data
after each synchronization.
Prepare data
Create two tables. One table is used to store all the latest data, and the other table
is used to store incremental data.
-- Create a result table that stores the latest full data.
create table dw_user_inc(
    uid bigint,
    uname string,
    deptno bigint,
    gender string,
    optime DATETIME
);
-- Create an incremental data table.
create table ods_user_inc(
    uid bigint,
    uname string,
    deptno bigint,
    gender string,
    optime DATETIME
);
- Configure a data synchronization node to write full data to the result table.
Note You need to run the node only once. After you run the node, click the Properties tab in the right-side navigation pane of the configuration tab of the node, and select
Skip Execution for the Recurrence parameter on the Properties tab.
- Configure a data synchronization node to write incremental data to the incremental
data table. To filter the data, specify the filter condition date_format(optime,'%Y%m%d')=${bdp.system.bizdate}.
- Execute the following statements to merge data:
insert overwrite table dw_user_inc
select
-- Select every column. If the incremental data table contains a row with the same uid, the source row changed, and the row in the incremental data table is the latest version, so it takes precedence.
case when b.uid is not null then b.uid else a.uid end as uid,
case when b.uid is not null then b.uname else a.uname end as uname,
case when b.uid is not null then b.deptno else a.deptno end as deptno,
case when b.uid is not null then b.gender else a.gender end as gender,
case when b.uid is not null then b.optime else a.optime end as optime
from
dw_user_inc a
full outer join ods_user_inc b
on a.uid = b.uid;
View the merge result. The result shows that the deleted data entry is not synchronized.
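For example (a check based on the merge statement above):
select uid, uname, deptno from dw_user_inc order by uid;
-- uid 5 (Lily) still appears in the result, because the DELETE on the source
-- table produces no incremental row and therefore cannot be detected by the merge.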
Daily incremental synchronization has the benefit that only a small amount of data
needs to be synchronized every day. However, data inconsistency may occur, and an
extra merge computation is required after each synchronization.
We recommend that you synchronize constantly changing data in full mode on a daily
basis. In addition, you can specify a lifecycle for the historical data. This way, the historical data can be automatically deleted
after it is retained for a specific period of time.
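For example, the following MaxCompute statement sets a lifecycle on the full table so that partitions are automatically reclaimed after they have not been modified for the specified number of days. The retention period of 30 days is an illustrative value:
alter table ods_user_full set lifecycle 30;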