AnalyticDB: Use data synchronization to synchronize data from ApsaraMQ for Kafka to Data Lakehouse Edition

Last Updated: Aug 19, 2024

AnalyticDB for MySQL provides the data synchronization feature that allows you to synchronize data from an ApsaraMQ for Kafka instance to an AnalyticDB for MySQL cluster in real time based on a specific offset. This feature helps meet requirements such as near-real-time data ingestion, full data archiving, and elastic analysis. This topic describes how to create an ApsaraMQ for Kafka data source, create and start a data synchronization job, analyze data, and manage the data source.

Prerequisites

  • An AnalyticDB for MySQL Data Lakehouse Edition cluster is created.

  • A job resource group is created for the AnalyticDB for MySQL cluster. For more information, see Create a resource group.

  • A database account is created for the AnalyticDB for MySQL cluster.

  • An ApsaraMQ for Kafka instance is created in the same region as the AnalyticDB for MySQL Data Lakehouse Edition cluster.

  • A Kafka topic is created in the ApsaraMQ for Kafka instance and messages are sent to the topic. For more information, see the "Quick start for ApsaraMQ for Kafka" figure in the Overview topic. A minimal example of sending a JSON test message is provided after this list.
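
For reference, the following sketch shows one way to send a JSON-formatted test message to a topic by using the open source kafka-python client. The endpoint, topic name, and payload are placeholders; replace them with the values of your ApsaraMQ for Kafka instance.

    # Illustration only: send one JSON message to a Kafka topic with kafka-python.
    # The bootstrap server address and topic name below are placeholders.
    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="your-kafka-endpoint:9092",               # placeholder endpoint
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),   # JSON-encode message values
    )
    message = {"name": "zhangle", "age": 18, "device": {"brand": "none", "version": "11.4.2"}}
    producer.send("your_topic", value=message)                      # placeholder topic name
    producer.flush()   # wait until the message is delivered
    producer.close()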

Usage notes

  • After the data of a topic expires, the system automatically deletes the data. If the topic data expires while the data synchronization job is in a failed state, the deleted data cannot be read when you restart the job, and that data may be lost. If your data synchronization job fails, we recommend that you extend the retention period of the topic data and contact technical support.

  • If the sample data obtained from a topic is larger than 8 KB in size, the ApsaraMQ for Kafka API truncates the data, and the data cannot be parsed into the JSON format. As a result, the field mapping information cannot be automatically generated.

Billing rules

If you use the data synchronization feature of AnalyticDB for MySQL to synchronize data to OSS, fees are incurred for the resources that are used, such as the OSS storage and the computing resources of the job resource group that runs the synchronization job.

Procedure

Create a data source

Note

If you want to synchronize data from an existing ApsaraMQ for Kafka data source, skip this step and create a data synchronization job. For more information, see the "Create a data synchronization job" section of this topic.

  1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition tab, find the cluster that you want to manage and click the cluster ID.

  2. In the left-side navigation pane, choose Data Ingestion > Data Sources.

  3. In the upper-right corner of the page, click Create Data Source.

  4. On the Create Data Source page, configure the following parameters.

    • Data Source Type: The type of the data source. Select Kafka.

    • Data Source Name: The name of the data source. By default, the system generates a name based on the data source type and the current time. You can modify the name based on your business requirements.

    • Data Source Description: The description of the data source. For example, you can enter the use case and business limits.

    • Deployment Mode: The deployment mode of the data source. Only Alibaba Cloud Instance is supported.

    • Kafka Instance: The ID of the ApsaraMQ for Kafka instance. Log on to the ApsaraMQ for Kafka console and go to the Instances page to view the instance ID.

    • Kafka Topic: The name of the topic that you created in the ApsaraMQ for Kafka instance. Log on to the ApsaraMQ for Kafka console and go to the Topics page of the instance to view the topic name.

    • Message Data Format: The data format of messages. Only JSON is supported.

  5. Click Create.

Create a data synchronization job

  1. In the left-side navigation pane, click Simple Log Service/Kafka Data Synchronization.

  2. In the upper-right corner of the page, click Create Synchronization Job.

  3. On the Kafka Data Source tab of the Create Synchronization Job page, configure the parameters in the Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings sections.

    • The following parameters are in the Source and Destination Settings section:

      • Job Name: The name of the data synchronization job. By default, the system generates a name based on the data source type and the current time. You can modify the name based on your business requirements.

      • Data Source: The data source. You can select an existing ApsaraMQ for Kafka data source or create a data source.

      • Destination Type: The data storage type in AnalyticDB for MySQL. Only Data Lake - OSS Storage is supported.

      • OSS Path: The OSS storage path for the AnalyticDB for MySQL cluster data.

        Important
        • All buckets that reside in the same region as the AnalyticDB for MySQL cluster are displayed. Configure this parameter based on your business requirements. After you configure this parameter, it cannot be modified.

        • We recommend that you select an empty directory that does not have nested relationships with the directories of other data synchronization jobs. This prevents historical data from being overwritten. For example, if the OSS storage paths of two data synchronization jobs are oss://adb_demo/test/sls1/ and oss://adb_demo/test/, the paths are nested and data is overwritten during synchronization.

    • The following parameters are in the Destination Database and Table Settings section:

      • Database Name: The name of the destination database in the AnalyticDB for MySQL cluster. If no database that uses the same name exists, a database is created. If a database that uses the same name already exists, data is synchronized to the existing database. For information about the naming conventions for databases, see Limits.

      • Table Name: The name of the destination table in the AnalyticDB for MySQL cluster. If no table that uses the same name exists in the database, a table is created. If a table that uses the same name already exists in the database, data fails to be synchronized. For information about the naming conventions for tables, see Limits.

      • Sample Data: The latest data that is automatically obtained from the ApsaraMQ for Kafka topic.

        Note

        Data in the ApsaraMQ for Kafka topic must be in the JSON format. Otherwise, an error is reported during data synchronization.

      • Parsed JSON Layers: The number of layers that are parsed for nested JSON fields. Valid values:

        • 0: Nested JSON fields are not parsed.

        • 1 (default): One layer is parsed.

        • 2: Two layers are parsed.

        • 3: Three layers are parsed.

        • 4: Four layers are parsed.

        For more information about how nested JSON fields are parsed, see the "Examples of schema fields parsed with different numbers of layers" section of this topic.

      • Schema Field Mapping: The information about the schema fields that are parsed from the sample data. You can modify the destination field names and types, and add or remove fields.

      • Partition Key Settings: The partition field settings for the destination table. To ensure that data is ingested and queries are executed as expected, we recommend that you configure partitions based on the log time or your business logic. If you do not configure partitions, the destination table is not partitioned.

        Valid values for the Format Processing Method parameter:

        • Formatted Time: Select a datetime field from the Source Partition Field drop-down list, set the Format Processing Method parameter to Formatted Time, and then configure the Source Field Format and Destination Partition Format parameters. AnalyticDB for MySQL parses the value of the partition field based on the specified source field format and converts the value into the specified destination partition format for partitioning. For example, if the source field gmt_created has the value 1711358834, the Source Field Format parameter is set to Timestamp Accurate to Seconds, and the Destination Partition Format parameter is set to yyyyMMdd, the data is written to the 20240325 partition. A sketch of this conversion is provided after this procedure.

        • Specified Partition Field: Set the Format Processing Method parameter to Specified Partition Field and configure the other required parameters.

    • The following parameters are in the Synchronization Settings section:

      • Starting Consumer Offset for Incremental Synchronization: The point in time from which the system consumes ApsaraMQ for Kafka data when the data synchronization job starts. Valid values:

        • Earliest Offset: The system consumes ApsaraMQ for Kafka data from the point in time at which the earliest data record was generated.

        • Latest Offset: The system consumes ApsaraMQ for Kafka data from the point in time at which the latest data record was generated.

        • Custom Offset: You can select a point in time. Then, the system consumes ApsaraMQ for Kafka data from the first data entry that is generated as of the selected point in time.

      • Job Resource Group: The job resource group that runs the data synchronization job.

      • ACUs for Incremental Synchronization: The number of AnalyticDB compute units (ACUs) that are required for the job resource group to run the data synchronization job. The value ranges from 2 to the maximum number of computing resources that are available in the job resource group. To improve the stability and performance of data ingestion, we recommend that you specify a larger number of ACUs.

        Note

        When you create a data synchronization job in a job resource group, elastic resources in the resource group are used, and the system deducts the used resources from the resource group. For example, assume that a job resource group has 48 ACUs of reserved computing resources and a synchronization job that consumes 8 ACUs is created. When you create another synchronization job in the resource group, you can select up to 40 ACUs.

      • Advanced Settings: The custom settings for the data synchronization job. If you want to configure custom settings, contact technical support.

  4. After you complete the preceding parameter configurations, click Submit.
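
The Formatted Time conversion described in the Partition Key Settings parameter can be illustrated with a short sketch. The following Python snippet is only an approximation for reference, not the code that AnalyticDB for MySQL runs, and it assumes a UTC-based interpretation of the timestamp.

    # Illustration only: convert a seconds-precision timestamp (Source Field Format:
    # Timestamp Accurate to Seconds) into a yyyyMMdd partition value.
    from datetime import datetime, timezone

    gmt_created = 1711358834  # example value of the source partition field
    partition_value = datetime.fromtimestamp(gmt_created, tz=timezone.utc).strftime("%Y%m%d")
    print(partition_value)  # 20240325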

Start the data synchronization job

  1. On the Simple Log Service/Kafka Data Synchronization page, find the data synchronization job that you created and click Start in the Actions column.

  2. In the upper-right corner of the page, click Search. If the state of the job changes to Starting, the data synchronization job is started.

Analyze data

After the data synchronization job is complete, you can use Spark JAR Development to analyze the data that is synchronized to the AnalyticDB for MySQL cluster. For more information about Spark development, see Spark editor and Overview.

  1. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  2. Enter SQL statements in the default template and click Run Now.

    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- Here are your sql statements
    show tables from lakehouse20220413156_adbTest;
  3. (Optional) On the Applications tab, find an application and click Log in the Actions column to view the Spark SQL running log of the application.

Manage the data source

On the Data Sources page, you can perform the following operations in the Actions column.

  • Create Job: Goes to the Create Synchronization Job or Create Migration Job page, where you can create a job that uses the data source.

  • View: Views the detailed configurations of the data source.

  • Edit: Modifies the data source parameters, such as the data source name and description.

  • Delete: Deletes the data source.

    Note

    If the data source is being used in a data synchronization or data migration job, you cannot delete the data source. In this case, you must first go to the Simple Log Service/Kafka Data Synchronization page, find the job, and then click Delete in the Actions column to delete the job.

Examples of schema fields parsed with different numbers of layers

Nested JSON fields can be parsed with different numbers of layers. In this example, the following JSON data is sent to ApsaraMQ for Kafka:

{
  "name": "zhangle",
  "age": 18,
  "device": {
    "os": {
      "test": "lag",
      "member": {
        "fa": "zhangsan",
        "mo": "limei"
      }
    },
    "brand": "none",
    "version": "11.4.2"
  }
}

The preceding JSON data can be parsed with different numbers of layers. In the following examples, each entry is in the format JSON field → destination field name (example value). A minimal sketch that approximates this flattening is provided at the end of this section.

No parsing

The JSON fields are not parsed, and the entire JSON message is written to a single destination field.

  • __value__ → __value__ (example: { "name": "zhangle", "age": 18, "device": { "os": { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } }, "brand": "none", "version": "11.4.2" } })

One-layer parsing

The first layer of the JSON fields is parsed.

  • name → name (example: zhangle)

  • age → age (example: 18)

  • device → device (example: { "os": { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } }, "brand": "none", "version": "11.4.2" })

Two-layer parsing

The first two layers of the JSON fields are parsed. If a field does not have nested fields, the field is directly displayed. For example, the name and age fields are directly displayed. If a field has nested fields, the child fields of the field are displayed. For example, the device field is parsed as device.os, device.brand, and device.version.

Important

In the destination field names, periods (.) are automatically changed to underscores (_).

  • name → name (example: zhangle)

  • age → age (example: 18)

  • device.os → device_os (example: { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } })

  • device.brand → device_brand (example: none)

  • device.version → device_version (example: 11.4.2)

Three-layer parsing

  • name → name (example: zhangle)

  • age → age (example: 18)

  • device.os.test → device_os_test (example: lag)

  • device.os.member → device_os_member (example: { "fa": "zhangsan", "mo": "limei" })

  • device.brand → device_brand (example: none)

  • device.version → device_version (example: 11.4.2)

Four-layer parsing

  • name → name (example: zhangle)

  • age → age (example: 18)

  • device.os.test → device_os_test (example: lag)

  • device.os.member.fa → device_os_member_fa (example: zhangsan)

  • device.os.member.mo → device_os_member_mo (example: limei)

  • device.brand → device_brand (example: none)

  • device.version → device_version (example: 11.4.2)
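
The parsing behavior shown above can be approximated with a short sketch. The following Python snippet is for illustration only and is not the implementation that AnalyticDB for MySQL uses; it flattens nested JSON fields up to a given number of layers and replaces periods with underscores in the destination field names, which matches the examples in this section.

    # Illustration only: flatten nested JSON fields up to max_layers.
    # Fields nested deeper than the limit are kept as JSON strings, and periods
    # in destination field names are replaced with underscores.
    import json

    def flatten(value, max_layers, prefix="", depth=0):
        if not isinstance(value, dict) or depth >= max_layers:
            # A scalar value, or the layer limit is reached: emit one field.
            key = prefix if prefix else "__value__"
            flat_value = json.dumps(value) if isinstance(value, dict) else value
            return {key.replace(".", "_"): flat_value}
        result = {}
        for name, child in value.items():
            full_name = f"{prefix}.{name}" if prefix else name
            result.update(flatten(child, max_layers, full_name, depth + 1))
        return result

    message = {
        "name": "zhangle",
        "age": 18,
        "device": {
            "os": {"test": "lag", "member": {"fa": "zhangsan", "mo": "limei"}},
            "brand": "none",
            "version": "11.4.2",
        },
    }

    print(flatten(message, 0))  # {'__value__': '{"name": "zhangle", ...}'}
    print(flatten(message, 2))  # keys: name, age, device_os, device_brand, device_version
    print(flatten(message, 4))  # keys include device_os_member_fa and device_os_member_mo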