Synchronize data from a self-managed Oracle database to a DataHub project

Updated at: 2024-10-17 10:53
Important

This topic contains important information on necessary precautions. We recommend that you read this topic carefully before proceeding.

DataHub is a real-time data distribution platform that is designed to process streaming data. You can publish and subscribe to streaming data in DataHub and distribute the data to other platforms. DataHub allows you to analyze and use streaming data. This topic describes how to synchronize data from a self-managed Oracle database to a DataHub project by using Data Transmission Service (DTS). After you synchronize data, you can use big data services such as Realtime Compute for Apache Flink to analyze the data in real time.

Prerequisites

  • The engine version of the self-managed Oracle database is 9i, 10g, 11g, 12c, 18c, or 19c.

  • The self-managed Oracle database is running in ARCHIVELOG mode. Archived log files are accessible, and an appropriate retention period is set for archived log files. For more information, see Managing Archived Redo Log Files.

  • The supplemental logging feature is enabled for the self-managed Oracle database, and the SUPPLEMENTAL_LOG_DATA_PK and SUPPLEMENTAL_LOG_DATA_UI parameters are set to Yes. For more information, see Supplemental Logging. You can verify the ARCHIVELOG mode and the supplemental logging settings by running the queries shown after this list.

  • A DataHub project is created to receive the synchronized data. For more information, see Create a project.
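
You can run the following queries to verify the preceding prerequisites. This is a minimal verification sketch that uses standard Oracle dynamic performance views; it only checks the settings and does not replace a proper log retention strategy.

  -- Confirm ARCHIVELOG mode and the supplemental logging settings.
  -- Expected: LOG_MODE = 'ARCHIVELOG' and 'YES' for both supplemental logging columns.
  select log_mode, supplemental_log_data_pk, supplemental_log_data_ui from v$database;

  -- Check how far back archived logs are still available on disk.
  select min(first_time) as oldest_available_archive from v$archived_log where deleted = 'NO';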

Precautions

Category

Description

Limits on the source database

  • Requirements for the objects to be synchronized:

    • The tables to be synchronized must have PRIMARY KEY or UNIQUE constraints, and the values of the constrained fields must be unique. Otherwise, the destination database may contain duplicate data records.

    • If the version of your Oracle database is 12c or later, the names of the tables to be synchronized cannot exceed 30 bytes in length.

    • If you select tables as the objects to be synchronized and you want to edit the tables in the destination database, such as renaming tables or columns, you can synchronize up to 1,000 tables in a single data synchronization task. If you run a task to synchronize more than 1,000 tables, a request error occurs. In this case, we recommend that you configure multiple tasks to synchronize the tables in batches or configure a task to synchronize the entire database.

  • If the source database is an Oracle RAC database connected over Express Connect, you must specify a VIP for the database when you configure the data synchronization task.

  • If the self-managed Oracle database is an Oracle RAC database, you can use only a VIP rather than a Single Client Access Name (SCAN) IP address when you configure the data synchronization task. After you specify the VIP, node failover of the Oracle RAC database is not supported.

  • The redo logging and archive logging features must be enabled.

    Note

    If you perform only incremental data synchronization, the redo logs and archive logs of the source database must be stored for more than 24 hours. If you perform both full data synchronization and incremental data synchronization, the redo logs and archive logs of the source database must be stored for at least seven days. Otherwise, Data Transmission Service (DTS) may fail to obtain the redo logs and archive logs, which causes the task to fail, or even data inconsistency or data loss. After the full data synchronization is complete, you can set the retention period to more than 24 hours. Make sure that you set the retention period of redo logs and archive logs based on the preceding requirements. Otherwise, the service level agreement (SLA) of DTS does not guarantee service reliability or performance.

  • If you perform a primary/secondary switchover on the source database when the data synchronization task is running, the task fails.

  • The data synchronization task fails if the source database contains an empty string of the VARCHAR2 type and the corresponding column in the destination database has a NOT NULL constraint. Empty strings of the VARCHAR2 type are processed as null values in Oracle databases.

  • During data synchronization, do not update fields of the LONG type. Otherwise, the data synchronization task fails.

Other limits

  • The objects of schema synchronization must be tables.

    Warning

    In this scenario, DTS does not support schema synchronization for triggers. We recommend that you delete the triggers of the source database to prevent data inconsistency caused by the triggers. For more information, see Configure a data synchronization or migration task for a source database that contains a trigger.

  • A single string in the destination DataHub project cannot exceed 2 MB in length.

  • Only tables can be selected as the objects to be synchronized.

  • DTS calculates the synchronization latency based on the timestamp of the latest synchronized data in the destination database and the current timestamp in the source database. If no DML operation is performed on the source database for an extended period of time, the synchronization latency may be inaccurate. If the synchronization latency is too high, you can perform a DML operation on the source database to update the latency.

    Note

    If you select an entire database as the object to be synchronized, you can create a heartbeat table that is updated or receives data every second. A sketch of such a table is shown after this list.

  • During data synchronization, we recommend that you use only DTS to write data to the destination database. This prevents data inconsistency between the source and destination databases. For example, if you use tools other than DTS to write data to the destination database, data loss may occur in the destination database when you use DMS to perform online DDL operations.
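
To create the heartbeat table mentioned in the preceding note, you can use statements similar to the following sketch. The table name dts_heartbeat and the job name are placeholder examples, the one-second interval matches the note, and the account that creates the job needs the CREATE JOB privilege.

  create table dts_heartbeat (id number primary key, update_time date);
  insert into dts_heartbeat values (1, sysdate);
  commit;

  -- Update the heartbeat row once per second by using an Oracle Scheduler job.
  begin
    dbms_scheduler.create_job(
      job_name        => 'DTS_HEARTBEAT_JOB',
      job_type        => 'PLSQL_BLOCK',
      job_action      => 'begin update dts_heartbeat set update_time = sysdate where id = 1; commit; end;',
      repeat_interval => 'FREQ=SECONDLY;INTERVAL=1',
      enabled         => true);
  end;
  /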

Supported synchronization topologies

  • One-way one-to-one synchronization

  • One-way one-to-many synchronization

  • One-way cascade synchronization

  • One-way many-to-one synchronization

For more information about the synchronization topologies that are supported by DTS, see Synchronization topologies.

SQL operations that can be synchronized

INSERT, UPDATE, and DELETE

Permissions required for database accounts

Database

Required permissions

References

Self-managed Oracle database

Database administrator (DBA)

CREATE USER and GRANT

Enable logging and grant fine-grained permissions to an Oracle database account

Important

If you need to synchronize data from an Oracle database but the database administrator (DBA) permission cannot be granted to the database account, you can enable archive logging and supplemental logging, and grant fine-grained permissions to the account.

  1. Enable archive logging and supplemental logging.

    Log type

    Procedure

    Archive logging

    Execute the following statements to enable archive logging:

    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;
    archive log list;

    Supplemental logging

    Enable supplemental logging at the database or table level based on your business requirements.

    Note

    You can enable database-level supplemental logging to ensure the stability of Data Transmission Service (DTS) tasks. You can enable table-level supplemental logging to reduce the disk usage of the source Oracle database.

    • Enable database-level supplemental logging

      1. Execute the following statement to enable minimal supplemental logging:

        alter database add supplemental log data;
      2. Execute the following statement to enable primary key and unique key supplemental logging at the database level:

        alter database add supplemental log data (primary key,unique index) columns;
    • Enable table-level supplemental logging

      1. Execute the following statement to enable minimal supplemental logging:

        alter database add supplemental log data;
      2. Enable table-level supplemental logging by using one of the following methods:

        • Execute the following statement to enable primary key supplemental logging at the table level:

          alter table table_name add supplemental log data (primary key) columns;
        • Execute the following statement to enable table-level supplemental logging for all columns:

          alter table table_name add supplemental log data (all) columns;
  2. Grant fine-grained permissions to an Oracle database account.

    Oracle versions 9i to 11g:

    # Create a database account named rdsdt_dtsacct and grant permissions to the account.
    create user rdsdt_dtsacct IDENTIFIED BY rdsdt_dtsacct;
    grant create session to rdsdt_dtsacct;
    grant connect to rdsdt_dtsacct;
    grant resource to rdsdt_dtsacct;
    grant execute on sys.dbms_logmnr to rdsdt_dtsacct;
    grant select on V_$LOGMNR_LOGS to rdsdt_dtsacct;
    grant select on all_objects to rdsdt_dtsacct;
    grant select on all_tab_cols to rdsdt_dtsacct;
    grant select on dba_registry to rdsdt_dtsacct;
    grant select any table to rdsdt_dtsacct;
    grant select any transaction to rdsdt_dtsacct;
    -- v$log privileges
    grant select on v_$log to rdsdt_dtsacct;
    -- v$logfile privileges
    grant select on v_$logfile to rdsdt_dtsacct;
    -- v$archived_log privileges
    grant select on v_$archived_log to rdsdt_dtsacct;
    -- v$parameter privileges
    grant select on v_$parameter to rdsdt_dtsacct;
    -- v$database privileges
    grant select on v_$database to rdsdt_dtsacct;
    -- v$active_instances privileges
    grant select on v_$active_instances to rdsdt_dtsacct;
    -- v$instance privileges
    grant select on v_$instance to rdsdt_dtsacct;
    -- v$logmnr_contents privileges
    grant select on v_$logmnr_contents to rdsdt_dtsacct;
    -- system tables
    grant select on sys.USER$ to rdsdt_dtsacct;
    grant select on SYS.OBJ$ to rdsdt_dtsacct;
    grant select on SYS.COL$ to rdsdt_dtsacct;
    grant select on SYS.IND$ to rdsdt_dtsacct;
    grant select on SYS.ICOL$ to rdsdt_dtsacct;
    grant select on SYS.CDEF$ to rdsdt_dtsacct;
    grant select on SYS.CCOL$ to rdsdt_dtsacct;
    grant select on SYS.TABPART$ to rdsdt_dtsacct;
    grant select on SYS.TABSUBPART$ to rdsdt_dtsacct;
    grant select on SYS.TABCOMPART$ to rdsdt_dtsacct;
    grant select on v$database to rdsdt_dtsacct;
    grant select on dba_objects to rdsdt_dtsacct;
    grant select on DBA_TAB_COMMENTS to rdsdt_dtsacct;
    grant select on dba_tab_cols to rdsdt_dtsacct;
    Oracle versions 12c to 19c that use the multitenant architecture:

    # Switch to the pluggable database (PDB). Create a database account named rdsdt_dtsacct and grant permissions to the account.
    ALTER SESSION SET container = ORCLPDB1;
    create user rdsdt_dtsacct IDENTIFIED BY rdsdt_dtsacct;
    grant create session to rdsdt_dtsacct;
    grant connect to rdsdt_dtsacct;
    grant resource to rdsdt_dtsacct;
    grant execute on sys.dbms_logmnr to rdsdt_dtsacct;
    grant select on all_objects to rdsdt_dtsacct;
    grant select on all_tab_cols to rdsdt_dtsacct;
    grant select on dba_registry to rdsdt_dtsacct;
    grant select any table to rdsdt_dtsacct;
    grant select any transaction to rdsdt_dtsacct;
    -- v$log privileges
    grant select on v_$log to rdsdt_dtsacct;
    -- v$logfile privileges
    grant select on v_$logfile to rdsdt_dtsacct;
    -- v$archived_log privileges
    grant select on v_$archived_log to rdsdt_dtsacct;
    -- v$parameter privileges
    grant select on v_$parameter to rdsdt_dtsacct;
    -- v$database privileges
    grant select on v_$database to rdsdt_dtsacct;
    -- v$active_instances privileges
    grant select on v_$active_instances to rdsdt_dtsacct;
    -- v$instance privileges
    grant select on v_$instance to rdsdt_dtsacct;
    -- v$logmnr_contents privileges
    grant select on v_$logmnr_contents to rdsdt_dtsacct;
    grant select on sys.USER$ to rdsdt_dtsacct;
    grant select on SYS.OBJ$ to rdsdt_dtsacct;
    grant select on SYS.COL$ to rdsdt_dtsacct;
    grant select on SYS.IND$ to rdsdt_dtsacct;
    grant select on SYS.ICOL$ to rdsdt_dtsacct;
    grant select on SYS.CDEF$ to rdsdt_dtsacct;
    grant select on SYS.CCOL$ to rdsdt_dtsacct;
    grant select on SYS.TABPART$ to rdsdt_dtsacct;
    grant select on SYS.TABSUBPART$ to rdsdt_dtsacct;
    grant select on SYS.TABCOMPART$ to rdsdt_dtsacct;
    -- V$PDBS privileges
    grant select on V_$PDBS to rdsdt_dtsacct;
    grant select on v$database to rdsdt_dtsacct;
    grant select on dba_objects to rdsdt_dtsacct;
    grant select on DBA_TAB_COMMENTS to rdsdt_dtsacct;
    grant select on dba_tab_cols to rdsdt_dtsacct;
    grant select_catalog_role TO rdsdt_dtsacct;
    
    # Switch to the container database (CDB). Create a database account and grant permissions to the account.
    ALTER SESSION SET container = CDB$ROOT;
    
    # Create a database account named rdsdt_dtsacct and grant permissions to the account. You must modify the default parameters of the Oracle database.
    alter session set "_ORACLE_SCRIPT"=true;
    create user rdsdt_dtsacct IDENTIFIED BY rdsdt_dtsacct;
    grant create session to rdsdt_dtsacct;
    grant connect to rdsdt_dtsacct;
    grant select on v_$logmnr_contents to rdsdt_dtsacct;
    grant LOGMINING TO rdsdt_dtsacct;
    grant EXECUTE_CATALOG_ROLE to rdsdt_dtsacct;
    grant execute on sys.dbms_logmnr to rdsdt_dtsacct;
    grant select on v$database to rdsdt_dtsacct;
    grant select on dba_objects to rdsdt_dtsacct;
    grant select on DBA_TAB_COMMENTS to rdsdt_dtsacct;
    grant select on dba_tab_cols to rdsdt_dtsacct;
    Oracle versions 12c to 19c that use a non-multitenant architecture:

    # Create a database account named rdsdt_dtsacct and grant permissions to the account.
    create user rdsdt_dtsacct IDENTIFIED BY rdsdt_dtsacct;
    grant create session to rdsdt_dtsacct;
    grant connect to rdsdt_dtsacct;
    grant resource to rdsdt_dtsacct;
    grant select on V_$LOGMNR_LOGS to rdsdt_dtsacct;
    grant select on all_objects to rdsdt_dtsacct;
    grant select on all_tab_cols to rdsdt_dtsacct;
    grant select on dba_registry to rdsdt_dtsacct;
    grant select any table to rdsdt_dtsacct;
    grant select any transaction to rdsdt_dtsacct;
    -- v$log privileges
    grant select on v_$log to rdsdt_dtsacct;
    -- v$logfile privileges
    grant select on v_$logfile to rdsdt_dtsacct;
    -- v$archived_log privileges
    grant select on v_$archived_log to rdsdt_dtsacct;
    -- v$parameter privileges
    grant select on v_$parameter to rdsdt_dtsacct;
    -- v$database privileges
    grant select on v_$database to rdsdt_dtsacct;
    -- v$active_instances privileges
    grant select on v_$active_instances to rdsdt_dtsacct;
    -- v$instance privileges
    grant select on v_$instance to rdsdt_dtsacct;
    -- v$logmnr_contents privileges
    grant select on v_$logmnr_contents to rdsdt_dtsacct;
    grant select on sys.USER$ to rdsdt_dtsacct;
    grant select on SYS.OBJ$ to rdsdt_dtsacct;
    grant select on SYS.COL$ to rdsdt_dtsacct;
    grant select on SYS.IND$ to rdsdt_dtsacct;
    grant select on SYS.ICOL$ to rdsdt_dtsacct;
    grant select on SYS.CDEF$ to rdsdt_dtsacct;
    grant select on SYS.CCOL$ to rdsdt_dtsacct;
    grant select on SYS.TABPART$ to rdsdt_dtsacct;
    grant select on SYS.TABSUBPART$ to rdsdt_dtsacct;
    grant select on SYS.TABCOMPART$ to rdsdt_dtsacct;
    grant LOGMINING TO rdsdt_dtsacct;
    grant EXECUTE_CATALOG_ROLE to rdsdt_dtsacct;
    grant execute on sys.dbms_logmnr to rdsdt_dtsacct;
    grant select on v$database to rdsdt_dtsacct;
    grant select on dba_objects to rdsdt_dtsacct;
    grant select on DBA_TAB_COMMENTS to rdsdt_dtsacct;
    grant select on dba_tab_cols to rdsdt_dtsacct;
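
    After you grant the permissions, you can optionally confirm them from a session with DBA privileges. This is a plain sanity check against the standard data dictionary views, not a step that DTS requires:

    select * from dba_sys_privs where grantee = 'RDSDT_DTSACCT';
    select * from dba_role_privs where grantee = 'RDSDT_DTSACCT';
    select count(*) from dba_tab_privs where grantee = 'RDSDT_DTSACCT';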

Procedure

  1. Purchase a data synchronization instance. For more information, see Purchase a DTS instance.

    Note

    On the buy page, set Source Instance to Oracle and set Destination Instance to DataHub.

  2. Log on to the DTS console.

    Note

    If you are redirected to the Data Management (DMS) console, you can click the old version icon to go to the previous version of the DTS console.

  3. In the left-side navigation pane, click Data Synchronization.

  4. In the upper part of the Synchronization Tasks page, select the region in which the destination instance resides.

  5. Find the data synchronization instance and click Configure Task in the Actions column.

  6. Configure the source and destination instances.

    Section

    Parameter

    Description

    N/A

    Synchronization Task Name

    DTS automatically generates a task name. We recommend that you specify an informative name to identify the task. You do not need to use a unique task name.

    Source Instance Details

    Instance Type

    Select Self-managed Database on ECS.

    Note

    If your source database is a self-managed database, you must deploy the network environment for the database. For more information, see Preparation overview.

    Instance Region

    The source region that you selected on the buy page. You cannot change the value of this parameter.

    ECS Instance ID

    Select the ID of the Elastic Compute Service (ECS) instance that hosts the self-managed Oracle database.

    Database Type

    The value of this parameter is set to Oracle and cannot be changed.

    Port Number

    Enter the service port number of the self-managed Oracle database. The default port number is 1521.

    Instance Type

    • Non-RAC Instance: If you select this option, you must specify the SID parameter.

    • RAC Instance: You cannot select this option because RAC instances are not supported.

    Database Account

    Enter the account of the self-managed Oracle database. For information about the permissions that are required for the account, see Permissions required for database accounts.

    Database Password

    Enter the password of the database account.

    Destination Instance Details

    Instance Type

    The value of this parameter is set to DataHub and cannot be changed.

    Instance Region

    The destination region that you selected on the buy page. You cannot change the value of this parameter.

    Project

    Select the name of the DataHub project.

  7. In the lower-right corner of the page, click Set Whitelist and Next.

    • If the source or destination database is an Alibaba Cloud database instance, such as an ApsaraDB RDS for MySQL or ApsaraDB for MongoDB instance, DTS automatically adds the CIDR blocks of DTS servers to the IP address whitelist of the instance.

    • If the source or destination database is a self-managed database hosted on an Elastic Compute Service (ECS) instance, DTS automatically adds the CIDR blocks of DTS servers to the security group rules of the ECS instance, and you must make sure that the ECS instance can access the database.

    • If the self-managed database is hosted on multiple ECS instances, you must manually add the CIDR blocks of DTS servers to the security group rules of each ECS instance.

    • If the source or destination database is a self-managed database that is deployed in a data center or provided by a third-party cloud service provider, you must manually add the CIDR blocks of DTS servers to the IP address whitelist of the database to allow DTS to access the database. For more information, see Add the CIDR blocks of DTS servers.

    Warning

    If the CIDR blocks of DTS servers are automatically or manually added to the whitelist of the database or instance, or to the ECS security group rules, security risks may arise. Therefore, before you use DTS to synchronize data, you must understand and acknowledge the potential risks and take preventive measures, including but not limited to the following measures: enhancing the security of your username and password, limiting the ports that are exposed, authenticating API calls, regularly checking the whitelist or ECS security group rules and forbidding unauthorized CIDR blocks, or connecting the database to DTS by using Express Connect, VPN Gateway, or Smart Access Gateway.

  8. Select the synchronization policy and the objects to be synchronized.

    Parameter

    Description

    Initial Synchronization

    Select Initial Schema Synchronization.

    Note

    After you select Initial Schema Synchronization, DTS synchronizes the schemas of the required objects (such as tables) to the destination DataHub instance.

    Select the objects to be synchronized

    Select one or more objects from the Available section and click the Rightwards arrow icon to move the objects to the Selected section.

    Note
    • You can select only tables as the objects to be synchronized.

    • By default, after an object is synchronized to the destination cluster, the name of the object remains unchanged. You can use the object name mapping feature to rename the objects that are synchronized to the destination instance. For more information, see Rename an object to be synchronized.

    Whether to enable the new naming rules for additional columns

    After DTS synchronizes data to DataHub, DTS adds additional columns to the destination topic. If the names of additional columns are the same as the names of existing columns in the destination topic, data synchronization fails. Select Yes or No to specify whether you want to enable the new naming rules for additional columns.

    Warning

    Before you specify this parameter, check whether additional columns and existing columns in the destination topic have name conflicts. For more information, see Modify the naming rules for additional columns.

    Rename Databases and Tables

    You can use the object name mapping feature to rename the objects that are synchronized to the destination instance. For more information, see Object name mapping.

    Retry Time for Failed Connections

    By default, if DTS fails to connect to the source or destination database, DTS retries within the next 720 minutes (12 hours). You can specify the retry time based on your needs. If DTS reconnects to the source and destination databases within the specified time, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

    Note

    When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time based on your business needs. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.

  9. Optional: In the Selected section, move the pointer over the destination topic and click Edit. In the dialog box that appears, set the shard key. The shard key is used for partitioning.

  10. In the lower-right corner of the page, click Precheck.

    Note
    • Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.

    • If the task fails to pass the precheck, you can click the Hint icon next to each failed item to view details.

      • After you troubleshoot the issues based on the details, initiate a new precheck.

      • If you do not need to troubleshoot the issues, ignore the failed items and initiate a new precheck.

  11. Close the Precheck dialog box after the following message is displayed: Precheck Passed. Then, the data synchronization task starts.

  12. Wait until initial synchronization is complete and the data synchronization task enters the Synchronizing state.

    You can view the status of the data synchronization task on the Synchronization Tasks page.

Schema of a DataHub topic

When DTS synchronizes incremental data to a DataHub topic, DTS adds additional columns to store metadata in the topic. The following figure shows the schema of a DataHub topic.

Note

In this example, id, name, and address are data fields. Because the previous version of the naming rules for additional columns is used, DTS adds the dts_ prefix to the original data fields that are synchronized from the source database to the destination database. If you use the new naming rules for additional columns, DTS does not add prefixes to the original data fields.

Figure: Schema of a DataHub topic

The following table describes the additional columns in the DataHub topic.

Previous additional column name

New additional column name

Type

Description

dts_record_id

new_dts_sync_dts_record_id

String

The unique ID of the incremental log entry.

Note
  • By default, the ID auto-increments for each new log entry. In disaster recovery scenarios, rollback may occur, and the ID may not auto-increment. Therefore, some IDs may be duplicated.

  • If an UPDATE operation is performed, DTS generates two incremental log entries to record the pre-update and post-update values. The values of the dts_record_id field in the two incremental log entries are the same.

dts_operation_flag

new_dts_sync_dts_operation_flag

String

The operation type. Valid values:

  • I: an INSERT operation.

  • D: a DELETE operation.

  • U: an UPDATE operation.

  • F: full data synchronization.

dts_instance_id

new_dts_sync_dts_instance_id

String

The server ID of the database.

dts_db_name

new_dts_sync_dts_db_name

String

The database name.

dts_table_name

new_dts_sync_dts_table_name

String

The table name.

dts_utc_timestamp

new_dts_sync_dts_utc_timestamp

String

The operation timestamp displayed in UTC. It is also the timestamp of the log file.

dts_before_flag

new_dts_sync_dts_before_flag

String

Indicates whether the column values are pre-update values. Valid values: Y and N.

dts_after_flag

new_dts_sync_dts_after_flag

String

Indicates whether the column values are post-update values. Valid values: Y and N.

Additional information about the dts_before_flag and dts_after_flag fields

The values of the dts_before_flag and dts_after_flag fields in an incremental log entry vary based on operation types:

  • INSERT

    For an INSERT operation, the column values are the newly inserted record values (post-update values). The value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y.


  • UPDATE

    DTS generates two incremental log entries for an UPDATE operation. The two incremental log entries have the same values for the dts_record_id, dts_operation_flag, and dts_utc_timestamp fields.

    The first log entry records the pre-update values. Therefore, the value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N. The second log entry records the post-update values. Therefore, the value of the dts_before_flag field is N, and the value of the dts_after_flag field is Y.


  • DELETE

    For a DELETE operation, the column values are the deleted record values (pre-update values). The value of the dts_before_flag field is Y, and the value of the dts_after_flag field is N.

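As a combined example, the following hypothetical UPDATE statement produces two records in the destination topic. The table, column values, and record layout are illustrative only, and the previous naming rules for additional columns are assumed, so the data field name carries the dts_ prefix.

  -- Source operation: the name of the record with id 1 changes from 'Alice' to 'Bob'.
  update customer set name = 'Bob' where id = 1;

  -- Resulting records in the DataHub topic:
  -- Record 1 (pre-update values):  dts_operation_flag = 'U', dts_before_flag = 'Y', dts_after_flag = 'N', dts_name = 'Alice'
  -- Record 2 (post-update values): dts_operation_flag = 'U', dts_before_flag = 'N', dts_after_flag = 'Y', dts_name = 'Bob'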

What to do next

After you configure the data synchronization task, you can use Alibaba Cloud Realtime Compute for Apache Flink to analyze the data that is synchronized to the DataHub project. For more information, see What is Alibaba Cloud Realtime Compute for Apache Flink?
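
As an illustration, a Realtime Compute for Apache Flink deployment can read the synchronized records through its DataHub connector. The following DDL is only a sketch: the connector option names vary across connector versions, and the endpoint, project, topic, and credentials are placeholders, so verify them against the connector documentation before use.

  -- Hypothetical Flink SQL source table over the DataHub topic.
  create temporary table datahub_source (
    id      varchar,
    name    varchar,
    address varchar,
    dts_operation_flag varchar
  ) with (
    'connector' = 'datahub',
    'endPoint'  = '<your-datahub-endpoint>',
    'project'   = '<your-project>',
    'topic'     = '<your-topic>',
    'accessId'  = '<your-accesskey-id>',
    'accessKey' = '<your-accesskey-secret>'
  );

  select * from datahub_source;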
