
MySQL data source

Updated at: 2025-03-24 05:31

The MySQL data source provides bidirectional channels for reading data from and writing data to MySQL. This topic describes the MySQL data synchronization capabilities supported by DataWorks.

Supported MySQL versions

  • Offline read and write:

    MySQL 5.5.x, 5.6.x, 5.7.x, and 8.0.x are supported, and the data source is compatible with Amazon RDS for MySQL, Azure MySQL, and Amazon Aurora MySQL.

    Offline synchronization supports reading data from views.

  • Real-time reading:

    Data Integration reads MySQL data in real time based on a real-time subscription to MySQL. Only MySQL 5.5.x, 5.6.x, 5.7.x, and 8.0.x are supported for real-time synchronization (new features in 8.0, such as functional indexes, are excluded; only existing features are compatible). The data source is compatible with Amazon RDS for MySQL, Azure MySQL, and Amazon Aurora MySQL.

    Important

    If you need to synchronize DRDS for MySQL, do not configure it as a MySQL data source. Instead, configure it as a DRDS data source as described in Configure DRDS data source.

Limits

Real-time reading

  • Synchronizing data from MySQL read-only instances is not supported.

  • Tables that contain functional indexes are not supported.

  • XA ROLLBACK is not supported.

    For transaction data in the XA PREPARE state, real-time synchronization writes the data to the destination. If an XA ROLLBACK occurs later, real-time synchronization does not roll back the XA PREPARE data that was written. To handle XA ROLLBACK scenarios, manually remove the table involved in the XA ROLLBACK from the real-time synchronization task, add the table back, and resynchronize it.

  • Only the ROW binlog format is supported on the MySQL server.

  • Real-time synchronization does not synchronize records in associated tables that are deleted by cascade operations.

  • For Amazon Aurora MySQL databases, you must connect to the primary (writer) instance, because AWS does not allow the binlog feature to be enabled on Aurora MySQL read replicas, and real-time synchronization tasks require the binlog to perform incremental updates.

  • For online DDL changes, real-time synchronization supports only adding columns (ADD COLUMN) to MySQL tables through Data Management (DMS).

  • Reading stored procedures from MySQL is not supported.

Offline reading

  • When the MySQL Reader plugin synchronizes multiple tables, as in sharding scenarios, a single table is split only if the task concurrency is greater than the number of tables. Otherwise, the number of split tasks equals the number of tables.

  • Reading stored procedures from MySQL is not supported.

Supported field types

For the complete list of field types in each MySQL version, see the official MySQL documentation. The following summarizes support for major field types, using MySQL 8.0.x as an example.

The table covers the following field types; the four support-status columns (Offline reading with MySQL Reader, Offline writing with MySQL Writer, Real-time reading, and Real-time writing) are shown as icons on the original page:

TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL/NUMERIC, REAL, VARCHAR, JSON, TEXT, MEDIUMTEXT, LONGTEXT, VARBINARY, BINARY, TINYBLOB, MEDIUMBLOB, LONGBLOB, ENUM, SET, BOOLEAN, BIT, DATE, DATETIME, TIMESTAMP, TIME, YEAR, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION

Preparation for data synchronization: MySQL environment preparation

Before you synchronize data in DataWorks, prepare the MySQL environment as described in this section so that MySQL data synchronization tasks can be configured and run as expected. The following sections describe the environment preparation required for MySQL synchronization.

Confirm MySQL version

Data Integration has requirements on MySQL versions. See the Supported MySQL versions section above to check whether the MySQL database to be synchronized meets the requirements. You can query the current MySQL version by executing the following statement in the MySQL database.

SELECT version();

Configure account permissions

We recommend that you plan and create a dedicated MySQL account in advance for DataWorks to access the data source. The procedure is as follows.

  1. Optional: Create an account.

    For operation details, see Create MySQL account.

  2. Configure permissions.

    • Offline

      In offline synchronization scenarios:

      • When reading MySQL data offline, the account needs the read (SELECT) permission on the tables to be synchronized.

      • When writing MySQL data offline, the account needs the write (INSERT, DELETE, and UPDATE) permissions on the tables to be synchronized.

    • Real-time

      In real-time synchronization scenarios, this account needs to have SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions for the database.

    You can refer to the following statements to grant permissions to the account, or directly grant the account the SUPER permission. Replace 'sync account' in the following statements with the account created above.

    -- CREATE USER 'sync account'@'%' IDENTIFIED BY 'password'; -- Create the sync account and set a password. % allows login from any host.
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync account'@'%'; -- Grant the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions to the sync account.

    *.* grants the preceding permissions to the sync account on all tables in all databases. You can also grant the permissions on specific tables in the target database. For example, to grant the permissions on the user table in the test database, use the statement GRANT SELECT, REPLICATION CLIENT ON test.user TO 'sync account'@'%';.

    Note

    REPLICATION SLAVE is a global permission and cannot be granted on specific tables in the target database.
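
    For example, the following sketch combines the global replication permissions with a table-level SELECT grant, using the test.user table from the example above:

    GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync account'@'%'; -- Global permissions must be granted with *.*.
    GRANT SELECT ON test.user TO 'sync account'@'%'; -- Read permission can be limited to the tables to be synchronized.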

(Required only for real-time synchronization) Enable MySQL binlog

Data Integration implements real-time incremental synchronization by subscribing to the MySQL binlog in real time. Before you configure synchronization in DataWorks, you must enable the MySQL binlog service. The procedure is as follows:

Important
  • While the binlog is being consumed, the database cannot delete it. If a real-time synchronization task runs with a delay, the source binlog may be held for a long time. Configure delay alerts for the task appropriately and monitor the database's disk space.

  • Retain the binlog for at least 72 hours. Otherwise, if a task fails and the binlog has already been purged, the task cannot be reset to a position before the failure when it restarts, and data is lost; in that case, only a full offline synchronization can backfill the data.
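
  For reference, on a self-managed MySQL 8.0 server you can check and extend the binlog retention period through the binlog_expire_logs_seconds variable (on RDS, retention is configured in the console instead); a sketch:

    SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';
    SET GLOBAL binlog_expire_logs_seconds = 259200; -- 259200 seconds = 72 hours.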

  1. Check if Binlog is enabled.

    • Use the following statement to check if Binlog is enabled.

      SHOW variables LIKE "log_bin";

      When the return result is ON, it indicates that Binlog is enabled.

    • If you use a standby database to synchronize data, you can also check if Binlog is enabled using the following statement.

      SHOW variables LIKE "log_slave_updates";

      When the return result is ON, it indicates that Binlog is enabled on the standby database.

    If the returned result is not ON, the binlog is not enabled. Enable the binlog before you configure real-time synchronization.

  2. Query the Binlog format.

    Use the following statement to query the Binlog format.

    SHOW variables LIKE "binlog_format";

    Explanation of return results:

    • Returns ROW, indicating that the enabled Binlog format is ROW.

    • Returns STATEMENT, indicating that the enabled Binlog format is STATEMENT.

    • Returns MIXED, indicating that the enabled Binlog format is MIXED.

    Important

    DataWorks real-time synchronization supports only the ROW binlog format on the MySQL server. If the returned result is not ROW, modify the binlog format.
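
    On a self-managed server, the format can be switched with a statement like the following (on RDS, change it through the console parameter settings); sessions that are already connected keep the old format until they reconnect:

    SET GLOBAL binlog_format = 'ROW';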

  3. Check if the complete Binlog log is enabled.

    Use the following statement to check if the complete Binlog log is enabled.

    SHOW variables LIKE "binlog_row_image";

    Explanation of return results:

    • Returns FULL, indicating that the complete log is enabled for Binlog.

    • Returns MINIMAL, indicating that the minimal log is enabled for Binlog, and the complete log is not enabled.

    Important

    DataWorks real-time synchronization supports only MySQL servers on which the complete binlog image is enabled. If the query returns a value other than FULL, modify the binlog_row_image configuration.
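
    Similarly, on a self-managed server you can enable the complete image with the following statement (on RDS, modify the parameter through the console):

    SET GLOBAL binlog_row_image = 'FULL';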

OSS binlog read authorization configuration

When you add a MySQL data source, if Configuration Mode is set to Alibaba Cloud Instance Mode and the RDS MySQL instance resides in the same region as the DataWorks workspace, you can enable Support OSS Binlog Reading. After this feature is enabled, when the RDS binlog cannot be accessed, the task attempts to obtain the binlog from OSS to avoid interrupting real-time synchronization tasks.

If the selected OSS Binlog Access Identity is Alibaba Cloud RAM User or Alibaba Cloud RAM Role, you need to configure account authorization as follows.

  • Alibaba Cloud RAM user.

    1. Log on to the Users page of the RAM console and find the user that needs authorization.

    2. In the Actions column, click Add Permissions.

    3. Configure the following key parameters, then click Confirm.

      • Resource Scope: Account level.

      • Policy: System policy.

      • Policy Name: AliyunDataWorksAccessingRdsOSSBinlogPolicy.

  • Alibaba Cloud RAM role.

    1. Log in to the RAM Access Control-Roles console and create a RAM role. For specific operations, see Create a RAM role with a trusted entity of an Alibaba Cloud account.

      Key parameters:

      • Trusted Entity Type: Alibaba Cloud account.

      • Trusted Entity Name: Other Alibaba Cloud account. Enter the Alibaba Cloud account that owns the DataWorks workspace.

      • Role Name: Custom.

    2. Grant precise permissions to the created RAM role. For specific operations, see Grant permissions to a RAM role.

      Key parameters:

      • Policy: System policy.

      • Policy Name: AliyunDataWorksAccessingRdsOSSBinlogPolicy.

    3. Modify the trust policy of the created RAM role to the following content. For specific operations, see Modify the trust policy of a RAM role.

      {
          "Statement": [
              {
                  "Action": "sts:AssumeRole",
                  "Effect": "Allow",
                  "Principal": {
                      "Service": [
                          "@cdp.aliyuncs.com",
                          "@dataworks.aliyuncs.com"
                      ]
                  }
              }
          ],
          "Version": "1"
      }

Add a data source

Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Add and manage data sources. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.

Data synchronization task development: MySQL synchronization process guide

For information about the entry point and procedure for configuring a synchronization task, see the following configuration guides.

Single table offline synchronization task configuration guide

Single table real-time synchronization task configuration guide

For the operation process, see DataStudio real-time synchronization task configuration.

Configuration guide for database-level synchronization such as offline database, real-time full and incremental database, and real-time sharding database

For the operation process, see Data Integration synchronization task configuration.

FAQ

For more common Data Integration issues, see Common issues in Data Integration.

Appendix: MySQL script demo and parameter description

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization task by using the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

Reader script demo

This topic provides configuration examples for a single table in a single database and for sharding:

Note

The comments in the JSON examples in this topic only explain the meanings of important parameters. Remove the comments before you use the configurations.

  • Configure a single table in a single database

    {
      "type": "job",
      "version": "2.0",//Version number.
      "steps": [
        {
          "stepType": "mysql",//Plugin name.
          "parameter": {
            "column": [//Column names.
              "id"
            ],
            "connection": [
              {
                "querySql": [
                  "select a,b from join1 c join join2 d on c.id = d.id;"
                ],
                "datasource": ""//Data source name.
              }
            ],
            "where": "",//Filter condition.
            "splitPk": "",//Split key.
            "encoding": "UTF-8"//Encoding format.
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "stream",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "errorLimit": {
          "record": "0"//Number of error records.
        },
        "speed": {
          "throttle": true,//When throttle is false, the mbps parameter is ineffective, indicating no rate limiting; when throttle is true, it indicates rate limiting.
          "concurrent": 1,//Number of concurrent jobs.
          "mbps": "12"//Rate limit, here 1mbps = 1MB/s.
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • Configure sharding

    Note

    Sharding here means that multiple MySQL tables with identical schemas are selected on the MySQL Reader side and written to the same destination table. If you want database-level sharding configuration, create a task on the Data Integration page and select the database sharding capability.

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "indexes": [
              {
                "type": "unique",
                "column": [
                  "id"
                ]
              }
            ],
            "envType": 0,
            "useSpecialSecret": false,
            "column": [
              "id",
              "buyer_name",
              "seller_name",
              "item_id",
              "city",
              "zone"
            ],
            "tableComment": "Test order table",
            "connection": [
              {
                "datasource": "rds_dataservice",
                "table": [
                  "rds_table"
                ]
              },
              {
                "datasource": "rds_workshop_log",
                "table": [
                  "rds_table"
                ]
              }
            ],
            "where": "",
            "splitPk": "id",
            "encoding": "UTF-8"
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "odps",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        },
        {
          "name": "Processor",
          "stepType": null,
          "category": "processor",
          "copies": 1,
          "parameter": {
            "nodes": [],
            "edges": [],
            "groups": [],
            "version": "2.0"
          }
        }
      ],
      "setting": {
        "executeMode": null,
        "errorLimit": {
          "record": ""
        },
        "speed": {
          "concurrent": 2,
          "throttle": false
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }

Reader script parameters

Each parameter is described below, together with whether it is required and its default value.

datasource

Data source name. The code editor mode supports adding data sources. The value of this configuration item must match the name of the added data source.

Required: Yes. Default value: None.

table

The name of the table to be synchronized. A data integration task can only read data from one table.

Advanced usage examples of table for configuring ranges:

  • You can configure range reading for sharded tables, for example 'table_[0-99]' means reading from 'table_0', 'table_1', 'table_2' up to 'table_99'.

  • If your table numeric suffixes have consistent lengths, such as 'table_000', 'table_001', 'table_002' up to 'table_999', you can configure it as '"table":["table_00[0-9]","table_0[10-99]","table_[100-999]"]'.

Note

The task reads all matched tables and, from those tables, only the columns specified in the column configuration item. If a table does not exist, or a column being read does not exist, the task fails.

Required: Yes. Default value: None.
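
A minimal sketch of a connection block that uses the range syntax described above (the data source name is hypothetical):

  "connection": [
    {
      "datasource": "my_mysql_source",
      "table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]//Matches table_000 through table_999.
    }
  ]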

column

The set of columns to synchronize from the configured table, described as a JSON array. By default, all columns are used, for example, ["*"].

  • Supports column pruning: You can select specific columns for export.

  • Supports column reordering: Columns can be exported in an order different from the table schema.

  • Supports constant configuration: You need to follow MySQL SQL syntax format, for example ["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"].

    • id is a regular column name.

    • table is a column name containing reserved words.

    • 1 is an integer constant.

    • 'mingya.wmy' is a string constant (note that it needs to be enclosed in single quotes).

    • About null:

      • " " represents empty.

      • null represents null.

      • 'null' represents the string "null".

    • to_char(a+1) is a function expression.

    • 2.3 is a floating-point number.

    • true is a Boolean value.

  • column must explicitly specify the set of columns to be synchronized and cannot be empty.

Required: Yes. Default value: None.

splitPk

When MySQL Reader extracts data, if splitPk is specified, it means you want to use the field represented by splitPk for data sharding. Data synchronization will then start concurrent tasks for data synchronization, improving the efficiency of data synchronization.

  • It is recommended to use the table's primary key as splitPk, because the primary key is usually evenly distributed, making the resulting shards less likely to contain data hotspots.

  • Currently, splitPk supports splitting only integer data. Strings, floating-point numbers, dates, and other types are not supported. If you specify an unsupported type, the splitPk function is ignored and the data is synchronized over a single channel.

  • If you do not specify splitPk (the key is missing or its value is empty), the table data is synchronized over a single channel.

Required: No. Default value: None.
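
A minimal sketch of a reader parameter block that enables sharded reading (the data source, table, and column names are hypothetical; id is assumed to be an integer primary key):

  "parameter": {
    "datasource": "my_mysql_source",
    "table": ["orders"],
    "column": ["id", "gmt_create"],
    "splitPk": "id"//The table is split by this key and read concurrently.
  }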

where

Filter condition. In actual business scenarios, data of the current day is often synchronized, in which case you can set the where condition to gmt_create > $bizdate.

  • The where condition can be used for effective incremental synchronization. If the where clause is not provided (the key or its value is missing), all data is synchronized.

  • You cannot set the where condition to limit 10, because limit is not a valid part of a MySQL WHERE clause.

Required: No. Default value: None.
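
For example, a daily incremental filter can be written as follows (gmt_create is the example column used above; $bizdate is a DataWorks scheduling parameter):

  "where": "gmt_create > $bizdate"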

querySql (advanced mode; this parameter cannot be configured in codeless UI mode)

In some business scenarios, the where configuration item is not sufficient to describe the filter conditions, and you can use this configuration item to define custom filter SQL. When this item is configured, the data synchronization system ignores the table, column, where, and splitPk configuration items and uses its content directly to filter data. For example, to synchronize data after a multi-table join, use select a,b from table_a join table_b on table_a.id = table_b.id. querySql has a higher priority than the table, column, where, and splitPk options. datasource is still used to resolve information such as the username and password.

Note

querySql is case-sensitive. For example, writing it as querysql will not work.

Required: No. Default value: None.
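
A sketch of a connection block that uses querySql (the join statement is the example from the description; the data source name is hypothetical):

  "connection": [
    {
      "datasource": "my_mysql_source",//Still required: it provides the username, password, and connection information.
      "querySql": [
        "select a,b from table_a join table_b on table_a.id = table_b.id"
      ]
    }
  ]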

useSpecialSecret

Specifies whether to use each source data source's own password when multiple source data sources are configured. Valid values:

  • true

  • false

If you configured multiple source data sources and they use different usernames and passwords, set this parameter to true.

Required: No. Default value: false.

Writer script demo

{
  "type": "job",
  "version": "2.0",//Version number.
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",//Plugin name.
      "parameter": {
        "postSql": [],//Statements to execute after import.
        "datasource": "",//Data source.
        "column": [//Column names.
          "id",
          "value"
        ],
        "writeMode": "insert",//Write mode, you can set it to insert, replace, or update.
        "batchSize": 1024,//Number of records in a single batch commit.
        "table": "",//Table name.
        "nullMode": "skipNull",//NULL value handling strategy.
        "skipNullColumn": [//Columns to skip NULL values.
          "id",
          "value"
        ],
        "preSql": [
          "delete from XXX;"//Statements to execute before import.
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {//Number of error records.
      "record": "0"
    },
    "speed": {
      "throttle": true,//When throttle is false, the mbps parameter is ineffective, indicating no rate limiting; when throttle is true, it indicates rate limiting.
      "concurrent": 1,//Number of concurrent jobs.
      "mbps": "12"//Rate limit, controls the maximum rate of synchronization to prevent excessive read/write pressure on upstream/downstream databases, here 1mbps = 1MB/s.
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Writer script parameters

Each parameter is described below, together with whether it is required and its default value.

datasource

Data source name. The code editor mode supports adding data sources. The value of this configuration item must match the name of the added data source.

Required: Yes. Default value: None.

table

The name of the table to be synchronized.

Required: Yes. Default value: None.

writeMode

The import mode. The insert into, on duplicate key update, and replace into methods are supported:

  • insert into: If a primary key or unique index conflict occurs, the conflicting row is not written and is treated as dirty data.

    If you configure the task in the code editor, set writeMode to insert.

  • on duplicate key update: If no primary key or unique index conflict occurs, the behavior is the same as insert into. If a conflict occurs, the values of the specified fields in the existing row are replaced with the values of the new row, and the data is written to MySQL.

    If you configure the task in the code editor, set writeMode to update.

  • replace into: If no primary key or unique index conflict occurs, the behavior is the same as insert into. If a conflict occurs, the original row is deleted first and the new row is inserted; that is, the new row replaces all fields of the original row.

    If you configure the task in the code editor, set writeMode to replace.

Required: No. Default value: insert.
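
For example, the following writer snippet updates only selected fields when a conflict occurs (column names follow the updateColumn example below):

  "writeMode": "update",
  "column": ["id", "name", "age"],
  "updateColumn": ["name", "age"]//Only these fields are replaced on a primary key/unique index conflict.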

nullMode

NULL value handling strategy, value range:

  • writeNull: When the source field data is NULL, write NULL value to the target field.

  • skipNull: When the source field is NULL, the field is not written to the destination. If the destination column has a default value definition, the column takes the default value; if not, it is NULL. When you set this value, you must also configure the skipNullColumn parameter.

Important

When nullMode is set to skipNull, the task dynamically concatenates SQL statements when writing data in order to support destination default values. This increases the number of FLUSH operations and reduces synchronization speed; in the worst case, a FLUSH occurs for each record.

Required: No. Default value: writeNull.

skipNullColumn

When nullMode is set to skipNull, the columns configured in this parameter are not forcibly written as NULL; the default value of the corresponding destination column is used preferentially.

Configuration format: ["c1","c2",...]. c1 and c2 must be a subset of the column parameter.

Required: No. Default value: all columns configured for the task.
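
A sketch that combines nullMode and skipNullColumn (column names follow the writer demo above):

  "nullMode": "skipNull",
  "skipNullColumn": ["id", "value"]//NULL source values for these columns fall back to the destination column defaults.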

column

Fields in the target table that need to be written with data, separated by commas, for example "column":["id","name","age"]. To write all columns in sequence, use an asterisk (*), for example "column":["*"].

Required: Yes. Default value: None.

preSql

SQL statements to execute before the data synchronization task runs. The codeless UI mode currently allows only one SQL statement; the code editor mode supports multiple statements. For example, you can clear old data from the table before execution: truncate table tablename.

Note

When there are multiple SQL statements, transactions are not supported.

Required: No. Default value: None.

postSql

SQL statements to execute after the data synchronization task runs. The codeless UI mode currently allows only one SQL statement; the code editor mode supports multiple statements. For example, add a timestamp column: ALTER TABLE tablename ADD colname TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP.

Note

When there are multiple SQL statements, transactions are not supported.

Required: No. Default value: None.

batchSize

The number of records committed in a single batch. A suitable value can greatly reduce the number of network interactions between the data synchronization system and MySQL and improve overall throughput. An excessively large value may cause OOM exceptions in the data synchronization process.

Required: No. Default value: 256.

updateColumn

When writeMode is set to update, this parameter specifies the fields to update when a primary key or unique index conflict occurs. Separate fields with commas, for example, "updateColumn":["name","age"].

Required: No. Default value: None.
