Simple Log Service: Use a resource function to obtain incremental data

Last Updated: Sep 03, 2024

When Simple Log Service pulls incremental data, it pulls only new or updated data for efficiency. This topic describes how to use the res_rds_mysql function to obtain incremental data from a database that is created on an ApsaraDB RDS for MySQL instance.

Prerequisites

  • Simple Log Service

    • Data is uploaded to a source Logstore. For more information, see Data collection overview.

    • The destination Logstore is created. For more information, see Create a Logstore.

    • If you use a Resource Access Management (RAM) user, make sure that the RAM user has permissions to transform data. For more information, see Grant a RAM user the permissions to manage a data transformation job.

    • Indexes are configured for the source and destination Logstores. For more information, see Create indexes.

      Data transformation does not rely on indexes. However, you cannot perform query or analysis operations if you do not configure indexes.

  • ApsaraDB RDS

    • A database is created on an ApsaraDB RDS for MySQL instance. An account that is used to connect to the database is created. For more information, see Create accounts and databases.

    • Data is uploaded to the tables of the database.

    • A whitelist is configured for the ApsaraDB RDS for MySQL instance. For more information, see Use a database client or the CLI to connect to an ApsaraDB RDS for MySQL instance.

      Important

      If you use the res_rds_mysql function to pull data from a database that is created on an ApsaraDB RDS for MySQL instance, you must configure a whitelist for the instance and add 0.0.0.0 to the whitelist.

Background information

A technology enterprise stores customer information in a database on an ApsaraDB RDS for MySQL instance and stores customer maintenance logs in a Logstore of Simple Log Service. The two types of data are continuously updated. The enterprise wants to perform a JOIN operation on the two types of data and save the results to a new Logstore.

In this case, you can use the res_rds_mysql function that Simple Log Service provides. The function pulls data from the database so that the transformation results can be saved to the destination Logstore. When your data transformation job pulls incremental data, Simple Log Service pulls only the rows that are new or updated, based on the timestamp field of the database table. This keeps each pull efficient. The function is suited to scenarios in which the database holds a large amount of data, data is frequently updated, data is infrequently deleted, or real-time data transformation is required.

For more information about incremental pulling and other pulling modes, see res_rds_mysql.
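The idea behind incremental pulling can be sketched in plain Python. This is a minimal illustration, not the Simple Log Service implementation: the pull_incremental helper, the in-memory table list, and the second customer row are hypothetical, and the sketch assumes that a row counts as changed when its update_time is greater than the last value seen.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def pull_incremental(table: List[Row], dimension: Dict[str, Row],
                     last_update_time: int,
                     primary_key: str = "c_id",
                     update_time_key: str = "update_time") -> int:
    """Upsert rows changed after last_update_time and return the new watermark."""
    watermark = last_update_time
    for row in table:
        if row[update_time_key] > last_update_time:
            # New or updated row: insert or overwrite by primary key.
            dimension[str(row[primary_key])] = dict(row)
            watermark = max(watermark, row[update_time_key])
    return watermark


# Hypothetical stand-in for the "client" table on the RDS instance.
table = [
    {"c_id": "1", "name": "maki", "telephone": "010-123", "update_time": 1606358931},
]
dimension: Dict[str, Row] = {}

# First refresh: every row is newer than the initial watermark 0.
watermark = pull_incremental(table, dimension, 0)

# The source table changes: one row is updated and one (made-up) row is added.
table[0].update(telephone="010-456", update_time=1606359000)
table.append({"c_id": "2", "name": "example", "telephone": "010-789",
              "update_time": 1606359100})

# Second refresh: only rows with update_time above the watermark are applied.
watermark = pull_incremental(table, dimension, watermark)
print(dimension["1"]["telephone"], len(dimension), watermark)
```

Tracking a watermark means that each refresh touches only the changed rows, which is why this mode suits large tables that are updated often.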

Resources and data samples

  • Simple Log Service resources

    • Project: client-project

    • Source Logstore: client-log

      The following figure shows data samples.

      Figure: Logstore data samples

    • Destination Logstore: client-information

  • ApsaraDB RDS resources

    • Database: client-db

    • Table: client

      The following figure shows data samples.

      Figure: RDS database data samples

    • Username and password of an account that is used to connect to the database: test and test1234@@

    • Public endpoint of the database: rm-bp1k****tp8o.mysql.rds.aliyuncs.com

Procedure

  1. Log on to the Simple Log Service console.

  2. Go to the data transformation page.

    1. In the Projects section, find and click the client-project project.

    2. On the Log Storage > Logstores tab, find and click the client-log Logstore.

    3. On the query and analysis page, click Data Transformation.

  3. In the upper-right corner of the page that appears, specify the time range of the data that you want to manage.

    Make sure that data exists on the Raw Logs tab.

  4. In the editor box, enter a data transformation statement.

    For more information about the parameters, see res_rds_mysql.

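    e_table_map(
        res_rds_mysql(
            "rm-bp1k****tp8o.mysql.rds.aliyuncs.com",  # public endpoint of the database
            "test",                                    # username of the database account
            "test1234@@",                              # password of the database account
            "client-db",                               # database name
            table="client",                            # table to pull data from
            fields=["c_id", "name", "telephone", "update_time"],  # columns to pull
            refresh_interval=1,                        # pull data repeatedly at this interval
            primary_keys="c_id",                       # primary key of the table
            update_time_key="update_time",             # time field used to find new or updated rows
            deleted_flag_key=None,                     # no deletion flag is used
        ),
        "c_id",                 # log field that is matched against the table
        ["name", "telephone"],  # table fields that are added to matching logs
    )

  5. Preview data in quick mode.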

    Check whether the specified data transformation statement is valid. For more information, see Quick preview.

    1. Select Quick.

    2. On the Data Testing > Data tab, enter the following content:

      {
        "__source__": "192.0.2.0",
        "__time__": 1624956516,
        "__topic__": "log",
        "__tag__:__client_ip__": "192.0.2.2",
        "c_id": "1",
        "staff_id": "002",
        "status": "Ongoing follow-up",
        "tag": "Follow-up visit"
      }

    3. On the Data Testing > Dimension Table tab, enter the following content:

      c_id,name,telephone,update_time,delete_flag
      1,maki,010-123,1606358931,false

    4. Click Preview Data.

      View the preview results.

      Figure: quick preview results

  6. Preview data in advanced mode.

    Check whether Simple Log Service is connected to the ApsaraDB RDS for MySQL instance. For more information, see Advanced preview.

    1. Select Advanced.

    2. Click Preview Data.

    3. In the Add Preview Settings panel, configure the Authorization Method parameter and click OK.

      Figure: authorization settings

    4. View the preview results.

      Figure: advanced preview results

      If an error is reported, fix the error by following the instructions provided in How do I fix syntax errors that occur when I pull data from ApsaraDB RDS for MySQL?

  7. Create a data transformation job.

    1. Click Save as Transformation Job.

    2. In the Create Data Transformation Job panel, configure the parameters and click OK.

      For more information, see Create a data transformation job.

      Figure: transformation rule

      After the data transformation job is created, you can view the transformed data in the destination Logstore.

      Figure: transformation results
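The join that the quick preview in step 5 exercises can be approximated in plain Python. This is a rough sketch under stated assumptions, not the e_table_map implementation: load_dimension_table and table_map are hypothetical helpers, the sketch assumes that a log is enriched only when its c_id matches a table row, and the log and table rows are the test data from step 5.

```python
import csv
import io
from typing import Any, Dict, List


def load_dimension_table(csv_text: str, key: str) -> Dict[str, Dict[str, str]]:
    """Parse CSV text into a lookup table indexed by the key column."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row[key]: row for row in reader}


def table_map(event: Dict[str, Any], dimension: Dict[str, Dict[str, str]],
              key: str, output_fields: List[str]) -> Dict[str, Any]:
    """Copy output_fields from the matching row; return unmatched logs unchanged."""
    enriched = dict(event)
    row = dimension.get(event.get(key))
    if row is not None:
        for field in output_fields:
            enriched[field] = row[field]
    return enriched


dimension_csv = (
    "c_id,name,telephone,update_time,delete_flag\n"
    "1,maki,010-123,1606358931,false\n"
)
event = {
    "__source__": "192.0.2.0",
    "__time__": 1624956516,
    "__topic__": "log",
    "__tag__:__client_ip__": "192.0.2.2",
    "c_id": "1",
    "staff_id": "002",
    "status": "Ongoing follow-up",
    "tag": "Follow-up visit",
}

dimension = load_dimension_table(dimension_csv, "c_id")
enriched = table_map(event, dimension, "c_id", ["name", "telephone"])
print(enriched["name"], enriched["telephone"])
```

In this sketch only name and telephone are copied to the log, which mirrors the ["name", "telephone"] argument of the statement in step 4. The update_time and delete_flag columns stay in the table.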

FAQ

How do I use the delete feature in the incremental pulling mode?

In the incremental pulling mode, Simple Log Service pulls only new or updated data based on the primary key and time field of the database table. By default, a record that is marked for deletion in the table, for example with delete_flag=true, is still pulled and used for transformation. The deleted_flag_key parameter of the res_rds_mysql function addresses this issue. If you configure the deleted_flag_key parameter in a data transformation statement, the data transformation job obtains the updated data from the table and then deletes the rows whose delete_flag field is set to true from its in-memory dimension table. The content of the database table is not affected. Rows that are deleted from the in-memory dimension table are excluded from subsequent JOIN operations on log data.

Important

  • If the delete_flag field is set to true or 1 for a data row, the data row is marked for deletion. For more information, see res_rds_mysql.

  • If you configure the deleted_flag_key parameter in a data transformation statement, you must also configure the update_time_key parameter.

For example, the name=mia and name=tom data records are inserted into a database on an ApsaraDB RDS for MySQL instance, and the delete_flag field is set to true for the name=mia data record. This marks the record for deletion. When Simple Log Service updates the in-memory dimension table, the name=mia data record is deleted from the in-memory dimension table and is not used for transformation.

The following sample code provides an example of the data transformation statement:

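e_table_map(
    res_rds_mysql(
        "rm-bp1****l3tp.mysql.rds.aliyuncs.com",  # public endpoint of the database
        "test",                                   # username of the database account
        "test1234@@",                             # password of the database account
        "client-db",                              # database name
        table="client",                           # table to pull data from
        fields=["c_id", "name", "telephone", "update_time"],  # columns to pull
        refresh_interval=1,                       # pull data repeatedly at this interval
        primary_keys="c_id",                      # primary key of the table
        update_time_key="update_time",            # required when deleted_flag_key is configured
        deleted_flag_key="delete_flag",           # field that marks a row for deletion
    ),
    "c_id",                 # log field that is matched against the table
    ["name", "telephone"],  # table fields that are added to matching logs
)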
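The effect of the deleted_flag_key parameter on the in-memory dimension table can be illustrated with a small Python sketch. It is a simplified model, not the Simple Log Service implementation: apply_refresh is a hypothetical helper, the c_id values are made up for the mia and tom records, and the sketch assumes that a flag value of true or 1 marks a row for deletion.

```python
from typing import Dict, Iterable, Optional

Row = Dict[str, str]


def apply_refresh(dimension: Dict[str, Row], changed_rows: Iterable[Row],
                  primary_key: str = "c_id",
                  deleted_flag_key: Optional[str] = None) -> None:
    """Apply pulled rows to the in-memory table, dropping rows flagged for deletion."""
    for row in changed_rows:
        key = row[primary_key]
        flagged = (
            deleted_flag_key is not None
            and str(row.get(deleted_flag_key, "")).lower() in ("true", "1")
        )
        if flagged:
            # Remove from memory only; the database table is left untouched.
            dimension.pop(key, None)
        else:
            dimension[key] = dict(row)


# Rows pulled in one refresh (c_id values are hypothetical).
changed_rows = [
    {"c_id": "1", "name": "mia", "delete_flag": "true"},
    {"c_id": "2", "name": "tom", "delete_flag": "false"},
]

# Without deleted_flag_key, the flagged row is still kept and joined.
without_flag: Dict[str, Row] = {}
apply_refresh(without_flag, changed_rows)

# With deleted_flag_key, the flagged row is removed from the in-memory table.
with_flag: Dict[str, Row] = {}
apply_refresh(with_flag, changed_rows, deleted_flag_key="delete_flag")

print(sorted(r["name"] for r in without_flag.values()))  # ['mia', 'tom']
print(sorted(r["name"] for r in with_flag.values()))     # ['tom']
```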