MaxCompute: Migrate data from ApsaraDB RDS to MaxCompute based on dynamic partitioning

Last Updated: Nov 20, 2024

This topic describes how to use the data integration and data synchronization features of DataWorks to migrate data from ApsaraDB RDS to MaxCompute based on dynamic partitioning.

Prerequisites

  • The DataWorks environment is prepared.

    1. MaxCompute and DataWorks are activated. For more information, see Activate MaxCompute and DataWorks.

    2. A workflow is created in your workspace in the DataWorks console. In this topic, a workflow is created in a workspace that is in basic mode. For more information, see Create a workflow.

  • Connections to the source and destination data stores are created.

Migrate data from ApsaraDB RDS to MaxCompute based on dynamic partitioning

After the preparations are complete, configure a batch synchronization node to migrate data from ApsaraDB RDS to MaxCompute based on dynamic partitioning every day at a scheduled point in time. For more information about how to configure a batch synchronization node, see Overview.

  1. Log on to the DataWorks console.

  2. Create a destination table in MaxCompute.

    1. In the left-side navigation pane, click Workspaces.

    2. On the Workspaces page, find the desired workspace and choose Shortcuts > Data Development in the Actions column.

    3. Right-click the workflow that you created and choose new > MaxCompute > table.

    4. On the create a table page, select the engine type and enter a table name.

    5. On the table editing page, click DDL Statement.

    6. In the DDL dialog box, enter the following CREATE TABLE statement and click Generate Table Schema.

      CREATE TABLE IF NOT EXISTS ods_user_info_d (
          uid STRING COMMENT 'User ID',
          gender STRING COMMENT 'Gender',
          age_range STRING COMMENT 'Age range',
          zodiac STRING COMMENT 'Zodiac sign'
      )
      PARTITIONED BY (
          dt STRING
      );
    7. Click Submit to Production Environment.

  3. Create a batch synchronization node.

    1. Go to the data analytics page. Right-click the specified workflow and choose new > data integration > offline synchronization.

    2. In the create a node dialog box, enter a node name and click submit.

    3. Configure the source and destination for the batch synchronization node.

  4. Configure the partition parameter.

    1. Click the Properties tab in the right-side navigation pane.

    2. In the Scheduling Parameter section of the Properties tab, configure the parameters. The default value of the Parameter Value parameter is ${bizdate}, which is in the yyyymmdd format.

      Note

      The value of the Parameter Value parameter in the Scheduling Parameter section on the Properties tab corresponds to the value of the Partition Key Column parameter in the Destination section on the node configuration page. When the synchronization node is scheduled and run, the partition parameter of the destination table is replaced with the data timestamp, which is the date one day before the node is run. By default, the node therefore migrates the data generated on the day before it runs. To use the date on which the node is run as the partition value instead, you must customize the partition parameter.

      You can specify a date in one of the following formats for the partition parameter:

      • Next N years: $[add_months(yyyymmdd,12*N)]

      • Previous N years: $[add_months(yyyymmdd,-12*N)]

      • Next N months: $[add_months(yyyymmdd,N)]

      • Previous N months: $[add_months(yyyymmdd,-N)]

      • Next N weeks: $[yyyymmdd+7*N]

      • Previous N weeks: $[yyyymmdd-7*N]

      • Next N days: $[yyyymmdd+N]

      • Previous N days: $[yyyymmdd-N]

      • Next N hours: $[hh24miss+N/24]

      • Previous N hours: $[hh24miss-N/24]

      • Next N minutes: $[hh24miss+N/24/60]

      • Previous N minutes: $[hh24miss-N/24/60]

      Note
      • Use brackets ([]) to edit the formula for calculating the value of a custom scheduling parameter. For example, you can change the format to key1=$[yyyy-mm-dd].

      • The default unit of the calculation result is day. For example, $[hh24miss-N/24/60] indicates the calculation result of (yyyymmddhh24miss-(N/24/60 × 1 day)). You can obtain the hour, minute, and second in the hh24miss format.

      • The unit of add_months is month. For example, $[add_months(yyyymmdd,12*N)-M/24/60] indicates the calculation result of (yyyymmddhh24miss+(12 × N × 1 month))-(M/24/60 × 1 day). You can obtain the year, month, and day in the yyyymmdd format.
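
The expressions listed above amount to date arithmetic on the scheduling time of the node. The following Python sketch mimics a few of them so that the results can be checked by hand. It is an illustration only: the add_months helper, its end-of-month clamping, and the sample scheduling time are assumptions made for this example, not DataWorks code.

```python
import calendar
from datetime import datetime, timedelta


def add_months(ts: datetime, months: int) -> datetime:
    """Shift ts by whole months; clamp the day to the target month's last day (assumed behavior)."""
    month_index = ts.year * 12 + (ts.month - 1) + months
    year, month0 = divmod(month_index, 12)
    month = month0 + 1
    last_day = calendar.monthrange(year, month)[1]
    return ts.replace(year=year, month=month, day=min(ts.day, last_day))


# Hypothetical scheduling time of the node.
scheduled = datetime(2018, 9, 14, 0, 30, 0)

# $[yyyymmdd-1]: previous 1 day
previous_day = (scheduled - timedelta(days=1)).strftime("%Y%m%d")
# $[yyyymmdd+7*2]: next 2 weeks
next_two_weeks = (scheduled + timedelta(days=7 * 2)).strftime("%Y%m%d")
# $[add_months(yyyymmdd,-1)]: previous 1 month
previous_month = add_months(scheduled, -1).strftime("%Y%m%d")
# $[add_months(yyyymmdd,12*1)]: next 1 year
next_year = add_months(scheduled, 12 * 1).strftime("%Y%m%d")
# $[hh24miss-1/24]: previous 1 hour, that is, 1/24 of a day
previous_hour = (scheduled - timedelta(hours=1)).strftime("%H%M%S")

print(previous_day, next_two_weeks, previous_month, next_year, previous_hour)
```

With the sample scheduling time, the previous-day expression yields 20180913, which matches the partition value used later in this topic.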

  5. Click the run icon to run the code.

  6. View the results in the operation log.

Generate retroactive data

If ApsaraDB RDS contains a large amount of historical data that was generated before the node is run, all of that data needs to be migrated to MaxCompute and the corresponding partitions need to be created automatically. To generate retroactive data for the current synchronization node, you can use the data backfill feature of DataWorks Operation Center.

  1. Filter historical data in ApsaraDB RDS by date.

    You can configure the Filter parameter in the Source section to filter data in ApsaraDB RDS.

  2. Generate retroactive data for the node. For more information, see Backfill data for an auto triggered node and view data backfill instances generated for the node.

  3. View the process of extracting data from ApsaraDB RDS on the Run Log tab.

    The logs indicate that Partition 20180913 is automatically created in MaxCompute.

  4. Verify the execution result. Execute the following statement on the MaxCompute client to check whether the data is written to MaxCompute:

    SELECT COUNT(*) FROM ods_user_info_d WHERE dt = '20180913';
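
A backfill run over a range of data timestamps is expected to produce one partition per day. The Python sketch below lists the dt values to expect for a given range, so that they can be compared with the partitions that appear in MaxCompute. The function name and the sample range are hypothetical and chosen only so that the range ends at the partition mentioned in the log above.

```python
from datetime import date, timedelta


def expected_partitions(first: date, last: date) -> list[str]:
    """Return the dt values (yyyymmdd) for every data timestamp from first to last, inclusive."""
    if last < first:
        raise ValueError("last must not be earlier than first")
    days = (last - first).days + 1
    return [(first + timedelta(days=i)).strftime("%Y%m%d") for i in range(days)]


# Hypothetical backfill range of three data timestamps.
partitions = expected_partitions(date(2018, 9, 11), date(2018, 9, 13))
print(partitions)
```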

Use hash functions to create partitions based on non-date fields

If a large amount of data or full data is migrated to partitions based on a non-date field for the first time, the partitions cannot be automatically created during the migration. In this case, you can map the values in a field in the source table to a corresponding partition in MaxCompute by using a hash function.
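
As a rough illustration of the idea, the following Python sketch maps the value of a non-date field to one of a fixed number of partition values by using a stable hash. The bucket count, the p prefix, and the choice of MD5 are assumptions made for this example; this topic does not prescribe a specific hash function.

```python
import hashlib


def partition_for(value: str, buckets: int = 16) -> str:
    """Map a field value to a partition name such as 'p07' by using a stable hash."""
    digest = hashlib.md5(value.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % buckets
    return f"p{bucket:02d}"


# The same input always lands in the same partition.
print(partition_for("user_0001"), partition_for("user_0001"))
```

A stable hash is used instead of the built-in hash() function of Python because the built-in function is randomized per process for strings, which would assign the same value to different partitions across runs.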

  1. Create an SQL script node. Run the following commands:

    DROP TABLE IF EXISTS ods_user_t;
    -- Create a temporary table named ods_user_t.
    CREATE TABLE ods_user_t (
        dt STRING,
        uid STRING,
        gender STRING,
        age_range STRING,
        zodiac STRING
    );
    -- Write data from Table ods_user_info_d to Table ods_user_t.
    INSERT OVERWRITE TABLE ods_user_t SELECT dt, uid, gender, age_range, zodiac FROM ods_user_info_d;
  2. Create a synchronization node named mysql_to_odps to migrate full data from ApsaraDB RDS to MaxCompute. Partitioning is not required.

  3. Execute the following SQL statements to migrate data from Table ods_user_t to Table ods_user_d based on dynamic partitioning:

    DROP TABLE IF EXISTS ods_user_d;
    -- Create a MaxCompute partitioned table named ods_user_d, which is the destination table.
    CREATE TABLE ods_user_d (
        uid STRING,
        gender STRING,
        age_range STRING,
        zodiac STRING
    )
    PARTITIONED BY (
        dt STRING
    );
    -- Create dynamic partitions for Table ods_user_d based on the dt field in Table ods_user_t. In Table ods_user_d, a partition is automatically created for each unique value in the dt field in Table ods_user_t.
    -- For example, if the value of the dt field is 20181025 in some rows of Table ods_user_t, Partition dt=20181025 is created in Table ods_user_d.
    -- The following SQL statement is used to migrate data from Table ods_user_t to Table ods_user_d based on dynamic partitioning.
    -- The dt field is listed last in the SELECT clause. MaxCompute maps the SELECT columns to the destination columns by position, so the trailing column supplies the dynamic partition value.
    INSERT OVERWRITE TABLE ods_user_d PARTITION (dt)
    SELECT uid, gender, age_range, zodiac, dt FROM ods_user_t;
    -- After data migration is complete, you can drop the temporary table to release storage space.
    DROP TABLE IF EXISTS ods_user_t;
  4. Configure the three nodes to form a workflow to run these nodes sequentially. The following figure shows an example.

  5. View the execution process. The last node represents the process of dynamic partitioning. The following figure shows an example.

  6. Verify the execution result. Execute the following statement on the MaxCompute client to check whether the data is written to MaxCompute:

    SELECT COUNT(*) FROM ods_user_d WHERE dt = '20180913';
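
To make the dynamic partitioning in step 3 more concrete, the following Python sketch groups rows by their dt value, which is how one partition ends up being created for each distinct value. It only models the behavior outside MaxCompute; the sample rows and the function name are invented for this example.

```python
from collections import defaultdict

# Invented sample rows in the column order uid, gender, age_range, zodiac, dt.
rows = [
    ("0001", "female", "20-30", "Leo", "20180913"),
    ("0002", "male", "30-40", "Aries", "20180913"),
    ("0003", "male", "20-30", "Virgo", "20181025"),
]


def dynamic_partition(rows):
    """Group rows by the trailing dt column, which plays the role of the partition key."""
    partitions = defaultdict(list)
    for *columns, dt in rows:
        partitions[f"dt={dt}"].append(tuple(columns))
    return dict(partitions)


result = dynamic_partition(rows)
for name, data in sorted(result.items()):
    print(name, len(data))
```

In this model, the row counts per key correspond to what the COUNT(*) verification query would return for each partition.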