DataHub is a real-time data distribution platform that is designed to process streaming data. You can publish and subscribe to streaming data in DataHub and distribute the data to other platforms. DataHub allows you to analyze and use streaming data. This topic describes how to synchronize data from a self-managed Oracle database to a DataHub project by using Data Transmission Service (DTS). After you synchronize data, you can use big data services such as Realtime Compute for Apache Flink to analyze the data in real time.
Prerequisites
The engine version of the self-managed Oracle database is 9i, 10g, 11g, 12c, 18c, or 19c.
The self-managed Oracle database is running in ARCHIVELOG mode. Archived log files are accessible, and an appropriate retention period is set for archived log files. For more information, see Managing Archived Redo Log Files.
The supplemental logging feature is enabled for the self-managed Oracle database, and the SUPPLEMENTAL_LOG_DATA_PK and SUPPLEMENTAL_LOG_DATA_UI parameters are set to Yes. For more information, see Supplemental Logging. You can verify this setting and the ARCHIVELOG mode by running the queries shown after this list.
A DataHub project is created to receive the synchronized data. For more information, see Create a project.
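The first two prerequisites can be checked directly from SQL*Plus. The following queries are a minimal verification sketch that uses the standard V$DATABASE view; they assume that you are connected with an account that can query this view.
-- Check that the database runs in ARCHIVELOG mode (expected value: ARCHIVELOG).
select log_mode from v$database;
-- Check that primary key and unique key supplemental logging are enabled (expected values: YES).
select supplemental_log_data_pk, supplemental_log_data_ui from v$database;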
Precautions
Category | Description |
Limits on the source database | |
Other limits | |
Supported synchronization topologies
One-way one-to-one synchronization
One-way one-to-many synchronization
One-way cascade synchronization
One-way many-to-one synchronization
For more information about the synchronization topologies that are supported by DTS, see Synchronization topologies.
SQL operations that can be synchronized
INSERT, UPDATE, and DELETE
Permissions required for database accounts
Database | Required permissions | References |
Self-managed Oracle database | Database administrator (DBA) | CREATE USER and GRANT |
Note: If the DBA permission cannot be granted to the database account, you can enable archive logging and supplemental logging and grant fine-grained permissions to the account. For more information, see Enable logging and grant fine-grained permissions to an Oracle database account.
Enable logging and grant fine-grained permissions to an Oracle database account
If you need to synchronize data from an Oracle database but the database administrator (DBA) permission cannot be granted to the database account, you can enable archive logging and supplemental logging, and grant fine-grained permissions to the account.
Enable archive logging and supplemental logging.
Log type
Procedure
Archive logging
Execute the following statements to enable archive logging:
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
archive log list;
Supplemental logging
Enable supplemental logging at the database or table level based on your business requirements.
You can enable database-level supplemental logging to ensure the stability of Data Transmission Service (DTS) tasks. You can enable table-level supplemental logging to reduce the disk usage of the source Oracle database.
Enable database-level supplemental logging
Execute the following statement to enable minimal supplemental logging:
alter database add supplemental log data;
Execute the following statement to enable primary key and unique key supplemental logging at the database level:
alter database add supplemental log data (primary key,unique index) columns;
Enable table-level supplemental logging
Execute the following statement to enable minimal supplemental logging:
alter database add supplemental log data;
Enable table-level supplemental logging by using one of the following methods:
Execute the following statement to enable primary key supplemental logging at the table level:
alter table table_name add supplemental log data (primary key) columns;
Execute the following statement to enable table-level supplemental logging for all columns:
alter table table_name add supplemental log data (all) columns;
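To confirm that the table-level supplemental log groups were created, you can query the ALL_LOG_GROUPS data dictionary view. The following query is only a verification sketch; replace TABLE_NAME with the name of your table.
-- List the supplemental log groups that exist on the table.
select log_group_name, log_group_type from all_log_groups where table_name = 'TABLE_NAME';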
Grant fine-grained permissions to an Oracle database account.
Oracle versions 9i to 11g:
-- Create a database account named rdsdt_dtsacct and grant permissions to the account.
create user rdsdt_dtsacct IDENTIFIED BY rdsdt_dtsacct;
grant create session to rdsdt_dtsacct;
grant connect to rdsdt_dtsacct;
grant resource to rdsdt_dtsacct;
grant execute on sys.dbms_logmnr to rdsdt_dtsacct;
grant select on V_$LOGMNR_LOGS to rdsdt_dtsacct;
grant select on all_objects to rdsdt_dtsacct;
grant select on all_tab_cols to rdsdt_dtsacct;
grant select on dba_registry to rdsdt_dtsacct;
grant select any table to rdsdt_dtsacct;
grant select any transaction to rdsdt_dtsacct;
-- v$log privileges
grant select on v_$log to rdsdt_dtsacct;
-- v$logfile privileges
grant select on v_$logfile to rdsdt_dtsacct;
-- v$archived_log privileges
grant select on v_$archived_log to rdsdt_dtsacct;
-- v$parameter privileges
grant select on v_$parameter to rdsdt_dtsacct;
-- v$database privileges
grant select on v_$database to rdsdt_dtsacct;
-- v$active_instances privileges
grant select on v_$active_instances to rdsdt_dtsacct;
-- v$instance privileges
grant select on v_$instance to rdsdt_dtsacct;
-- v$logmnr_contents privileges
grant select on v_$logmnr_contents to rdsdt_dtsacct;
-- system tables
grant select on sys.USER$ to rdsdt_dtsacct;
grant select on SYS.OBJ$ to rdsdt_dtsacct;
grant select on SYS.COL$ to rdsdt_dtsacct;
grant select on SYS.IND$ to rdsdt_dtsacct;
grant select on SYS.ICOL$ to rdsdt_dtsacct;
grant select on SYS.CDEF$ to rdsdt_dtsacct;
grant select on SYS.CCOL$ to rdsdt_dtsacct;
grant select on SYS.TABPART$ to rdsdt_dtsacct;
grant select on SYS.TABSUBPART$ to rdsdt_dtsacct;
grant select on SYS.TABCOMPART$ to rdsdt_dtsacct;
grant select on v$database to rdsdt_dtsacct;
grant select on dba_objects to rdsdt_dtsacct;
grant select on DBA_TAB_COMMENTS to rdsdt_dtsacct;
grant select on dba_tab_cols to rdsdt_dtsacct;
Oracle versions 12c to 19c that use the multitenant architecture:
-- Switch to the pluggable database (PDB). Create a database account named rdsdt_dtsacct and grant permissions to the account.
ALTER SESSION SET container = ORCLPDB1;
create user rdsdt_dtsacct IDENTIFIED BY rdsdt_dtsacct;
grant create session to rdsdt_dtsacct;
grant connect to rdsdt_dtsacct;
grant resource to rdsdt_dtsacct;
grant execute on sys.dbms_logmnr to rdsdt_dtsacct;
grant select on all_objects to rdsdt_dtsacct;
grant select on all_tab_cols to rdsdt_dtsacct;
grant select on dba_registry to rdsdt_dtsacct;
grant select any table to rdsdt_dtsacct;
grant select any transaction to rdsdt_dtsacct;
-- v$log privileges
grant select on v_$log to rdsdt_dtsacct;
-- v$logfile privileges
grant select on v_$logfile to rdsdt_dtsacct;
-- v$archived_log privileges
grant select on v_$archived_log to rdsdt_dtsacct;
-- v$parameter privileges
grant select on v_$parameter to rdsdt_dtsacct;
-- v$database privileges
grant select on v_$database to rdsdt_dtsacct;
-- v$active_instances privileges
grant select on v_$active_instances to rdsdt_dtsacct;
-- v$instance privileges
grant select on v_$instance to rdsdt_dtsacct;
-- v$logmnr_contents privileges
grant select on v_$logmnr_contents to rdsdt_dtsacct;
grant select on sys.USER$ to rdsdt_dtsacct;
grant select on SYS.OBJ$ to rdsdt_dtsacct;
grant select on SYS.COL$ to rdsdt_dtsacct;
grant select on SYS.IND$ to rdsdt_dtsacct;
grant select on SYS.ICOL$ to rdsdt_dtsacct;
grant select on SYS.CDEF$ to rdsdt_dtsacct;
grant select on SYS.CCOL$ to rdsdt_dtsacct;
grant select on SYS.TABPART$ to rdsdt_dtsacct;
grant select on SYS.TABSUBPART$ to rdsdt_dtsacct;
grant select on SYS.TABCOMPART$ to rdsdt_dtsacct;
-- V$PDBS privileges
grant select on V_$PDBS to rdsdt_dtsacct;
grant select on v$database to rdsdt_dtsacct;
grant select on dba_objects to rdsdt_dtsacct;
grant select on DBA_TAB_COMMENTS to rdsdt_dtsacct;
grant select on dba_tab_cols to rdsdt_dtsacct;
grant select_catalog_role TO rdsdt_dtsacct;
-- Switch to the container database (CDB). Create a database account and grant permissions to the account.
ALTER SESSION SET container = CDB$ROOT;
-- Create a database account named rdsdt_dtsacct and grant permissions to the account. You must modify the default parameters of the Oracle database.
alter session set "_ORACLE_SCRIPT"=true;
create user rdsdt_dtsacct IDENTIFIED BY rdsdt_dtsacct;
grant create session to rdsdt_dtsacct;
grant connect to rdsdt_dtsacct;
grant select on v_$logmnr_contents to rdsdt_dtsacct;
grant LOGMINING TO rdsdt_dtsacct;
grant EXECUTE_CATALOG_ROLE to rdsdt_dtsacct;
grant execute on sys.dbms_logmnr to rdsdt_dtsacct;
grant select on v$database to rdsdt_dtsacct;
grant select on dba_objects to rdsdt_dtsacct;
grant select on DBA_TAB_COMMENTS to rdsdt_dtsacct;
grant select on dba_tab_cols to rdsdt_dtsacct;
Oracle versions 12c to 19c that use a non-multitenant architecture:
-- Create a database account named rdsdt_dtsacct and grant permissions to the account.
create user rdsdt_dtsacct IDENTIFIED BY rdsdt_dtsacct;
grant create session to rdsdt_dtsacct;
grant connect to rdsdt_dtsacct;
grant resource to rdsdt_dtsacct;
grant select on V_$LOGMNR_LOGS to rdsdt_dtsacct;
grant select on all_objects to rdsdt_dtsacct;
grant select on all_tab_cols to rdsdt_dtsacct;
grant select on dba_registry to rdsdt_dtsacct;
grant select any table to rdsdt_dtsacct;
grant select any transaction to rdsdt_dtsacct;
-- v$log privileges
grant select on v_$log to rdsdt_dtsacct;
-- v$logfile privileges
grant select on v_$logfile to rdsdt_dtsacct;
-- v$archived_log privileges
grant select on v_$archived_log to rdsdt_dtsacct;
-- v$parameter privileges
grant select on v_$parameter to rdsdt_dtsacct;
-- v$database privileges
grant select on v_$database to rdsdt_dtsacct;
-- v$active_instances privileges
grant select on v_$active_instances to rdsdt_dtsacct;
-- v$instance privileges
grant select on v_$instance to rdsdt_dtsacct;
-- v$logmnr_contents privileges
grant select on v_$logmnr_contents to rdsdt_dtsacct;
grant select on sys.USER$ to rdsdt_dtsacct;
grant select on SYS.OBJ$ to rdsdt_dtsacct;
grant select on SYS.COL$ to rdsdt_dtsacct;
grant select on SYS.IND$ to rdsdt_dtsacct;
grant select on SYS.ICOL$ to rdsdt_dtsacct;
grant select on SYS.CDEF$ to rdsdt_dtsacct;
grant select on SYS.CCOL$ to rdsdt_dtsacct;
grant select on SYS.TABPART$ to rdsdt_dtsacct;
grant select on SYS.TABSUBPART$ to rdsdt_dtsacct;
grant select on SYS.TABCOMPART$ to rdsdt_dtsacct;
grant LOGMINING TO rdsdt_dtsacct;
grant EXECUTE_CATALOG_ROLE to rdsdt_dtsacct;
grant execute on sys.dbms_logmnr to rdsdt_dtsacct;
grant select on v$database to rdsdt_dtsacct;
grant select on dba_objects to rdsdt_dtsacct;
grant select on DBA_TAB_COMMENTS to rdsdt_dtsacct;
grant select on dba_tab_cols to rdsdt_dtsacct;
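After you run one of the preceding scripts, you can confirm what the rdsdt_dtsacct account actually received. The following queries against the standard DBA_* data dictionary views are a minimal verification sketch; run them as a privileged user.
-- System privileges granted to the account, such as CREATE SESSION and SELECT ANY TABLE.
select privilege from dba_sys_privs where grantee = 'RDSDT_DTSACCT';
-- Roles granted to the account, such as CONNECT and RESOURCE.
select granted_role from dba_role_privs where grantee = 'RDSDT_DTSACCT';
-- Object privileges granted to the account, such as SELECT on the V_$ views.
select owner, table_name, privilege from dba_tab_privs where grantee = 'RDSDT_DTSACCT';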
Procedure
Purchase a data synchronization instance. For more information, see Purchase a DTS instance.
On the buy page, set Source Instance to Oracle and set Destination Instance to DataHub.
Log on to the DTS console.
If you are redirected to the Data Management (DMS) console, you can click the icon in the lower-right corner to go to the previous version of the DTS console.
In the left-side navigation pane, click Data Synchronization.
In the upper part of the Synchronization Tasks page, select the region in which the destination instance resides.
Find the data synchronization instance and click Configure Task in the Actions column.
Configure the source and destination instances.
Section
Parameter
Description
N/A
Synchronization Task Name
DTS automatically generates a task name. We recommend that you specify an informative name to identify the task. You do not need to use a unique task name.
Source Instance Details
Instance Type
Select Self-managed Database on ECS.
If your source database is a self-managed database, you must deploy the network environment for the database. For more information, see Preparation overview.
Instance Region
The source region that you selected on the buy page. You cannot change the value of this parameter.
ECS Instance ID
Select the ID of the Elastic Compute Service (ECS) instance that hosts the self-managed Oracle database.
Database Type
The value of this parameter is set to Oracle and cannot be changed.
Port Number
Enter the service port number of the self-managed Oracle database. The default port number is 1521.
Instance Type
Non-RAC Instance: If you select this option, you must specify the SID parameter.
RAC Instance: You cannot select this option because RAC instances are not supported.
Database Account
Enter the account of the self-managed Oracle database. For information about the permissions that are required for the account, see Permissions required for database accounts.
Database Password
Enter the password of the database account.
Destination Instance Details
Instance Type
The value of this parameter is set to DataHub and cannot be changed.
Instance Region
The destination region that you selected on the buy page. You cannot change the value of this parameter.
Project
Select the name of the DataHub project.
In the lower-right corner of the page, click Set Whitelist and Next.
If the source or destination database is an Alibaba Cloud database instance, such as an ApsaraDB RDS for MySQL or ApsaraDB for MongoDB instance, DTS automatically adds the CIDR blocks of DTS servers to the IP address whitelist of the instance.
If the source or destination database is a self-managed database hosted on an Elastic Compute Service (ECS) instance, DTS automatically adds the CIDR blocks of DTS servers to the security group rules of the ECS instance, and you must make sure that the ECS instance can access the database. If the self-managed database is hosted on multiple ECS instances, you must manually add the CIDR blocks of DTS servers to the security group rules of each ECS instance.
If the source or destination database is a self-managed database that is deployed in a data center or provided by a third-party cloud service provider, you must manually add the CIDR blocks of DTS servers to the IP address whitelist of the database to allow DTS to access the database. For more information, see Add the CIDR blocks of DTS servers.
If the CIDR blocks of DTS servers are automatically or manually added to the whitelist of the database or instance, or to the ECS security group rules, security risks may arise. Therefore, before you use DTS to synchronize data, you must understand and acknowledge the potential risks and take preventive measures, including but not limited to the following measures: enhancing the security of your username and password, limiting the ports that are exposed, authenticating API calls, regularly checking the whitelist or ECS security group rules and forbidding unauthorized CIDR blocks, or connecting the database to DTS by using Express Connect, VPN Gateway, or Smart Access Gateway.
Select the synchronization policy and the objects to be synchronized.
Parameter
Description
Initial Synchronization
Select Initial Schema Synchronization.
After you select Initial Schema Synchronization, DTS synchronizes the schemas of the required objects (such as tables) to the destination DataHub instance.
Select the objects to be synchronized
Select one or more objects from the Available section and click the rightwards arrow icon to move the objects to the Selected section.
You can select only tables as the objects to be synchronized.
By default, after an object is synchronized to the destination cluster, the name of the object remains unchanged. You can use the object name mapping feature to rename the objects that are synchronized to the destination instance. For more information, see Rename an object to be synchronized.
Whether to enable the new naming rules for additional columns
After DTS synchronizes data to DataHub, DTS adds additional columns to the destination topic. If the names of additional columns are the same as the names of existing columns in the destination topic, data synchronization fails. Select Yes or No to specify whether you want to enable the new naming rules for additional columns.
Before you specify this parameter, check whether additional columns and existing columns in the destination topic have name conflicts. For more information, see Modify the naming rules for additional columns.
Rename Databases and Tables
You can use the object name mapping feature to rename the objects that are synchronized to the destination instance. For more information, see Object name mapping.
Retry Time for Failed Connections
By default, if DTS fails to connect to the source or destination database, DTS retries within the next 720 minutes (12 hours). You can specify the retry time based on your needs. If DTS reconnects to the source and destination databases within the specified time, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.
When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time based on your business needs. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.
Optional: In the Selected section, move the pointer over the destination topic and click Edit. In the dialog box that appears, set the shard key. The shard key is used for partitioning.
In the lower-right corner of the page, click Precheck.
Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.
If the task fails to pass the precheck, you can click the icon next to each failed item to view details.
After you troubleshoot the issues based on the details, initiate a new precheck.
If you do not need to troubleshoot the issues, ignore the failed items and initiate a new precheck.
Close the Precheck dialog box after the following message is displayed: Precheck Passed. Then, the data synchronization task starts.
Wait until initial synchronization is complete and the data synchronization task enters the Synchronizing state.
You can view the status of the data synchronization task on the Synchronization Tasks page.
Schema of a DataHub topic
When DTS synchronizes incremental data to a DataHub topic, DTS adds additional columns to store metadata in the topic. The following figure shows the schema of a DataHub topic.
In this example, id, name, and address are data fields. Because the previous version of the naming rules for additional columns is used, DTS adds the dts_ prefix to the data fields, including the original data fields that are synchronized from the source database to the destination database. If you use the new naming rules for additional columns, DTS does not add prefixes to the original data fields that are synchronized from the source database to the destination database.
The following table describes the additional columns in the DataHub topic.
Previous additional column name | New additional column name | Type | Description |
dts_record_id |  | String | The unique ID of the incremental log entry. |
dts_operation_flag |  | String | The operation type. Valid values: I (INSERT), D (DELETE), and U (UPDATE). |
dts_instance_id |  | String | The server ID of the database. |
dts_db_name |  | String | The database name. |
dts_table_name |  | String | The table name. |
dts_utc_timestamp |  | String | The operation timestamp displayed in UTC. It is also the timestamp of the log file. |
dts_before_flag |  | String | Indicates whether the column values are pre-update values. Valid values: Y and N. |
dts_after_flag |  | String | Indicates whether the column values are post-update values. Valid values: Y and N. |
Additional information about the dts_before_flag and dts_after_flag fields
The values of the dts_before_flag and dts_after_flag fields in an incremental log entry vary based on the operation type:
INSERT
For an INSERT operation, the column values are the newly inserted record values (post-update values). The value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y.
UPDATE
DTS generates two incremental log entries for an UPDATE operation. The two incremental log entries have the same values for the dts_record_id, dts_operation_flag, and dts_utc_timestamp fields.
The first log entry records the pre-update values. Therefore, the value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N. The second log entry records the post-update values. Therefore, the value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y.
DELETE
For a DELETE operation, the column values are the deleted record values (pre-update values). The value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N.
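As a hypothetical illustration of the UPDATE case, assume that a source table named customer is updated as follows. The table name and column values are examples only and are not part of the procedure above.
-- Source operation on the Oracle database (example values).
update customer set address = 'Hangzhou' where id = 1;
-- DTS would write two records to the DataHub topic for this operation:
--   Record 1 (pre-update image):  id=1, address='Beijing',  dts_before_flag='Y', dts_after_flag='N'
--   Record 2 (post-update image): id=1, address='Hangzhou', dts_before_flag='N', dts_after_flag='Y'
-- Both records share the same dts_record_id, dts_operation_flag, and dts_utc_timestamp values.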
What to do next
After you configure the data synchronization task, you can use Alibaba Cloud Realtime Compute for Apache Flink to analyze the data that is synchronized to the DataHub project. For more information, see What is Alibaba Cloud Realtime Compute for Apache Flink?
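For reference, a DataHub source table in Realtime Compute is typically declared with DDL similar to the following sketch. The field list matches the example topic above, but the WITH parameter names follow the Blink-style DataHub connector and are assumptions here; check the Realtime Compute documentation for the exact options of your engine version, and replace the placeholder values with your own project settings.
-- Hypothetical DataHub source table for the synchronized topic (Blink-style DDL; parameter names are assumptions).
create table datahub_source (
  id        varchar,
  name      varchar,
  address   varchar,
  dts_operation_flag varchar,
  dts_before_flag    varchar,
  dts_after_flag     varchar
) with (
  type = 'datahub',
  endPoint = '<datahub_endpoint>',        -- placeholder
  project = '<your_datahub_project>',     -- placeholder
  topic = '<your_datahub_topic>',         -- placeholder
  accessId = '<your_access_key_id>',      -- placeholder
  accessKey = '<your_access_key_secret>'  -- placeholder
);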