DataWorks provides MySQL Reader and MySQL Writer for you to read data from and write data to MySQL data sources. This topic describes the capabilities of synchronizing data from or to MySQL data sources.
Supported MySQL versions
Batch data read and write:
MySQL 5.5.X, MySQL 5.6.X, MySQL 5.7.X, and MySQL 8.0.X. Amazon RDS for MySQL, Azure MySQL, and Amazon Aurora MySQL data sources are compatible.
Data of views can be read during batch synchronization.
Real-time data read:
Real-time synchronization of data from MySQL is performed based on real-time subscription to MySQL binary logs and is supported only in MySQL 5.5.X, MySQL 5.6.X, MySQL 5.7.X, and MySQL 8.0.X. The real-time synchronization feature is incompatible with new features in MySQL 8.0.X, such as functional indexes. Amazon RDS for MySQL, Azure MySQL, and Amazon Aurora MySQL data sources are compatible.
Important: DRDS data sources that run MySQL cannot be configured as MySQL data sources. If you want to synchronize data from a DRDS data source that runs MySQL in real time, you can refer to the Add a DRDS data source topic to add a DRDS data source and configure a real-time synchronization task for the data source.
Limits
Real-time data read
Data Integration does not support real-time synchronization of data from a read-only MySQL instance by using MySQL Reader.
Data Integration does not support real-time synchronization of data from tables that contain functional indexes.
Data Integration does not support real-time synchronization of data on which XA ROLLBACK statements are executed.
For transaction data on which XA PREPARE statements are executed, you can use the real-time synchronization feature to synchronize the data to a destination. If XA ROLLBACK statements are executed later on the data, the rollback changes cannot be synchronized to the destination. If the tables that you want to synchronize include tables on which XA ROLLBACK statements are executed, you must remove those tables from the real-time synchronization task and then add them back to synchronize their data again.
Real-time synchronization of data from MySQL supports only binary logs in the row format.
Data Integration does not support real-time synchronization of data records in associated tables on which the cascade delete operation is performed.
If you want to read data from an Amazon Aurora MySQL database, you must connect to the primary database that is used for data writing. This is because AWS does not allow you to enable the binary logging feature on read-only replicas of Amazon Aurora MySQL. The binary logging feature is required for real-time synchronization tasks to synchronize incremental data.
Among all online DDL operations on a MySQL table, only the Add Column operation that is performed by using Data Management (DMS) can be synchronized in real time.
Batch data read
When you use MySQL Reader to synchronize data from MySQL in multi-table synchronization scenarios such as sharding, tables are split only if the number of parallel threads that you specify is greater than the number of tables. If the number of parallel threads that you specify is less than or equal to the number of tables, the tables are not split, and the number of parallel threads that are actually used is the same as the number of tables to be synchronized.
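The rule above can be expressed as a small helper. This is only a sketch of the stated rule, not the logic that MySQL Reader actually runs. The function name is hypothetical, and the thread count returned for the split case (the configured value) is an assumption, because the text only states the thread count for the non-split case.

```python
from typing import Tuple


def effective_parallelism(configured_threads: int, table_count: int) -> Tuple[bool, int]:
    """Return (tables_are_split, threads_actually_used) for a multi-table read."""
    if configured_threads > table_count:
        # More threads than tables: tables can be split.
        # Assumption: the configured thread count is used as-is in this case.
        return True, configured_threads
    # Threads <= tables: no splitting, one thread per table.
    return False, table_count
```

For example, a task that reads four table shards with a concurrency of 2 would still run four threads according to this rule, so the configured concurrency is not an upper bound in that case.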
Supported data types
For information about all data types in each MySQL version, see the official MySQL documentation. The following table provides the support status of main data types in MySQL 8.0.X.
Data type | MySQL Reader for batch data read | MySQL Writer for batch data write | MySQL Reader for real-time data read | MySQL Writer for real-time data write |
TINYINT | Supported | Supported | Supported | Supported |
SMALLINT | Supported | Supported | Supported | Supported |
INTEGER | Supported | Supported | Supported | Supported |
BIGINT | Supported | Supported | Supported | Supported |
FLOAT | Supported | Supported | Supported | Supported |
DOUBLE | Supported | Supported | Supported | Supported |
DECIMAL/NUMERIC | Supported | Supported | Supported | Supported |
REAL | Not supported | Not supported | Not supported | Not supported |
VARCHAR | Supported | Supported | Supported | Supported |
JSON | Supported | Supported | Supported | Supported |
TEXT | Supported | Supported | Supported | Supported |
MEDIUMTEXT | Supported | Supported | Supported | Supported |
LONGTEXT | Supported | Supported | Supported | Supported |
VARBINARY | Supported | Supported | Supported | Supported |
BINARY | Supported | Supported | Supported | Supported |
TINYBLOB | Supported | Supported | Supported | Supported |
MEDIUMBLOB | Supported | Supported | Supported | Supported |
LONGBLOB | Supported | Supported | Supported | Supported |
ENUM | Supported | Supported | Supported | Supported |
SET | Supported | Supported | Supported | Supported |
BOOLEAN | Supported | Supported | Supported | Supported |
BIT | Supported | Supported | Supported | Supported |
DATE | Supported | Supported | Supported | Supported |
DATETIME | Supported | Supported | Supported | Supported |
TIMESTAMP | Supported | Supported | Supported | Supported |
TIME | Supported | Supported | Supported | Supported |
YEAR | Supported | Supported | Supported | Supported |
LINESTRING | Not supported | Not supported | Not supported | Not supported |
POLYGON | Not supported | Not supported | Not supported | Not supported |
MULTIPOINT | Not supported | Not supported | Not supported | Not supported |
MULTILINESTRING | Not supported | Not supported | Not supported | Not supported |
MULTIPOLYGON | Not supported | Not supported | Not supported | Not supported |
GEOMETRYCOLLECTION | Not supported | Not supported | Not supported | Not supported |
Prepare a MySQL environment before data synchronization
Before you use DataWorks to synchronize data from or to a MySQL data source, you must prepare a MySQL environment. This ensures that a data synchronization task can be configured and can synchronize data from or to the MySQL data source as expected. The following information describes how to prepare a MySQL environment for data synchronization from or to a MySQL data source.
Preparation 1: Check the version of your MySQL database
Data Integration has specific requirements for the MySQL version. You can refer to Supported MySQL versions to check whether the version of your MySQL database meets the requirements. You can execute the following statement to check the version of your MySQL database:
SELECT version();
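If you check many databases, the comparison against the supported versions can be scripted. The following sketch assumes that you have already fetched the string returned by SELECT version(). The helper name and the parsing approach are illustrative, and the check covers only the major and minor version numbers.

```python
import re

# Version series listed in "Supported MySQL versions": 5.5.X, 5.6.X, 5.7.X, 8.0.X.
SUPPORTED_SERIES = {(5, 5), (5, 6), (5, 7), (8, 0)}


def is_supported_version(version_string: str) -> bool:
    """Check the leading major.minor pair of a MySQL version string."""
    match = re.match(r"(\d+)\.(\d+)", version_string.strip())
    if match is None:
        return False
    return (int(match.group(1)), int(match.group(2))) in SUPPORTED_SERIES
```

A value such as 5.7.44-log passes the check, whereas 5.1.73 does not.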
Preparation 2: Prepare an account that has the required permissions
We recommend that you plan and create an account for DataWorks to access your MySQL database. To prepare such an account, perform the following steps:
Optional. Create an account.
For more information, see Create an account to access a MySQL database.
Grant the required permissions to the account.
Batch synchronization
Different permissions are required for batch data read and batch data write:
Batch data read: The account must have the SELECT permission.
Batch data write: The account must have the INSERT, DELETE, and UPDATE permissions.
Real-time synchronization
The account must have the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions on your MySQL database.
You can execute the following statements to grant permissions to the account. Alternatively, you can grant the SUPER permission to the account. Replace Account for data synchronization in the statements with the created account.
-- Optional: create an account that can be used for data synchronization and specify a password. The host % allows the account to access the database from any host.
-- CREATE USER 'Account for data synchronization'@'%' IDENTIFIED BY 'Password';
-- Grant the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions to the account.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'Account for data synchronization'@'%';
*.* indicates that the account is granted the preceding permissions on all tables in all databases. You can also grant the preceding permissions on specific tables in a database to the account. For example, to grant the account the preceding permissions on the user table in the test database, execute the following statement:
GRANT SELECT, REPLICATION CLIENT ON test.user TO 'Account for data synchronization'@'%';
Note: The REPLICATION SLAVE permission is a global permission. You cannot grant this permission on specific tables in a database to the account.
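The permission sets listed in this step can be kept in one place and turned into GRANT statements. The following is a minimal sketch. The constant names, the function name, and the sync_user account in the usage note are hypothetical, and the generated statement should be reviewed before you run it.

```python
# Permission sets taken from the requirements described in this step.
BATCH_READ = ("SELECT",)
BATCH_WRITE = ("INSERT", "DELETE", "UPDATE")
REALTIME_READ = ("SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT")


def grant_statement(privileges, account, host="%", scope="*.*"):
    """Build a GRANT statement for the given privileges and account."""
    # Double any single quotes so that the quoted identifiers stay well-formed.
    safe_account = account.replace("'", "''")
    safe_host = host.replace("'", "''")
    return f"GRANT {', '.join(privileges)} ON {scope} TO '{safe_account}'@'{safe_host}';"
```

For example, grant_statement(REALTIME_READ, "sync_user") produces the same statement as the real-time example in this step, with sync_user in place of the account placeholder.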
Preparation 3: Enable the binary logging feature only in real-time synchronization scenarios
Real-time synchronization of incremental data from MySQL is performed based on real-time subscription to MySQL binary logs. Before you configure a real-time synchronization task to synchronize incremental data from MySQL, you must enable the binary logging feature. To enable the binary logging feature, perform the following steps.
If a real-time synchronization task is synchronizing data from the binary logs of a MySQL data source, the binary logs cannot be deleted from the related database. If latency occurs on a real-time synchronization task for a MySQL data source, the task may require an extended period of time to read data from binary logs. You must configure an appropriate alert rule for the task to enable the system to send an alert notification when latency occurs on the task. In addition, you must pay attention to the disk space of the related database.
Binary logs must be retained for at least 72 hours. If binary logs are retained for a short period of time, the binary logs may be cleared before you rerun a synchronization task that fails. In this case, you cannot reset the offset of the binary logs to the position before the synchronization task fails. This results in data loss. If data loss occurs, you can perform only batch synchronization of full data to supplement missing data.
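The 72-hour retention requirement can be checked from the server variables. The sketch below assumes a self-managed MySQL server whose retention is controlled by binlog_expire_logs_seconds (MySQL 8.0) or expire_logs_days (earlier versions), where a value of 0 disables automatic purging. Managed services may control retention through their own console settings instead. The function names are hypothetical.

```python
MIN_RETENTION_HOURS = 72


def retention_hours(variables):
    """Return the configured binary log retention in hours, or None if logs never expire."""
    seconds = int(variables.get("binlog_expire_logs_seconds", 0) or 0)
    if seconds > 0:
        return seconds / 3600
    days = int(variables.get("expire_logs_days", 0) or 0)
    if days > 0:
        return days * 24
    # Both values are 0 or missing: automatic purging is treated as disabled.
    return None


def retention_is_sufficient(variables):
    """True if binary logs are kept for at least 72 hours or are never purged automatically."""
    hours = retention_hours(variables)
    return hours is None or hours >= MIN_RETENTION_HOURS
```

The variables argument is a plain mapping of variable names to values, for example the rows returned by a SHOW VARIABLES query converted to a dictionary.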
Check whether the binary logging feature is enabled.
Execute the following statement to check whether the binary logging feature is enabled:
SHOW variables like "log_bin";
If ON is displayed in the returned result, the binary logging feature is enabled.
If you want to synchronize incremental data from a secondary MySQL database, execute the following statement to check whether the binary logging feature is enabled for the secondary MySQL database:
SHOW variables LIKE "log_slave_updates";
If ON is displayed in the returned result, the binary logging feature is enabled for the secondary MySQL database.
If ON is not displayed in the returned result:
Open source MySQL: Follow the instructions that are described in the official MySQL documentation to enable the binary logging feature.
ApsaraDB RDS for MySQL: Follow the instructions that are described in Use the log backup feature for an ApsaraDB RDS for MySQL instance to enable the binary logging feature.
PolarDB for MySQL: Follow the instructions that are described in Enable binary logging to enable the binary logging feature.
Query the format of binary logs.
Execute the following statement to query the format of binary logs:
SHOW variables LIKE "binlog_format";
The system may return one of the following results:
ROW: The format of binary logs is row.
STATEMENT: The format of binary logs is statement.
MIXED: The format of binary logs is mixed.
Important: Real-time synchronization of data from MySQL supports only binary logs in the row format. If the format of binary logs is not row, change the format to row.
Query the setting of the binlog_row_image parameter.
Execute the following statement to query the setting of the binlog_row_image parameter.
show variables like "binlog_row_image";
The system may return one of the following results:
FULL: logs all columns in both the before image and the after image.
MINIMAL: logs only specific columns in the before image or in the after image.
Important: Real-time synchronization of data from MySQL supports only the FULL value specified for the binlog_row_image parameter. If the value of the binlog_row_image parameter is not FULL, change the value to FULL.
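The three checks in this preparation step (log_bin, binlog_format, and binlog_row_image) can be combined into one validation. The following sketch assumes that you have collected the values of these variables into a dictionary. The function name is hypothetical, and the check for log_slave_updates on secondary databases is not included.

```python
# Values required for real-time synchronization, as described in this step.
REQUIRED_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def binlog_problems(variables):
    """Return a list of human-readable problems; an empty list means all checks pass."""
    problems = []
    for name, expected in REQUIRED_SETTINGS.items():
        actual = str(variables.get(name, "")).upper()
        if actual != expected:
            problems.append(f"{name} is {actual or 'unset'}, expected {expected}")
    return problems
```

An empty result indicates that the binary logging settings match the requirements listed above. Each entry otherwise names the variable that must be changed.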
Authorization for reading binary logs from OSS
When you add a MySQL data source, you can turn on Enable Binary Log Reading from OSS if you set the Configuration Mode parameter to Alibaba Cloud Instance Mode and set the Region parameter to the region in which the current DataWorks workspace resides. After you turn on this switch, DataWorks attempts to obtain binary logs from Object Storage Service (OSS) when it cannot read binary logs from ApsaraDB RDS for MySQL. This prevents real-time synchronization tasks from being interrupted.
If you set the Identity to Access Binary Logs from OSS parameter to Alibaba Cloud RAM User or Alibaba Cloud RAM Role, you must also perform the following operations to grant permissions to the RAM user or RAM role.
Alibaba Cloud RAM User
Go to the Users page in the RAM console. Find the RAM user to which you want to grant permissions.
Click Add Permissions in the Actions column.
In the Grant Permission panel, configure parameters and click Grant permissions. Key parameters:
Resource Scope: Select Account.
Policy: Select System Policy from the drop-down list.
Enter
AliyunDataWorksAccessingRdsOSSBinlogPolicy
in the search box to find the policy and add the policy to the Selected Policy area.
Alibaba Cloud RAM Role
Go to the Roles page in the RAM console. Create a RAM role on the Roles page. For more information, see Create a RAM role for a trusted Alibaba Cloud account.
Key parameters:
Select Trusted Entity: Set this parameter to Alibaba Cloud Account.
RAM Role Name: Specify a custom role name.
Select Trusted Alibaba Cloud Account: Set this parameter to Other Alibaba Cloud Account, and enter the ID of the Alibaba Cloud account to which the DataWorks workspace belongs in the field that appears.
Grant permissions to the created RAM role. For more information, see Grant permissions to a RAM role.
Key parameters:
Policy: Select System Policy from the drop-down list.
Attach the
AliyunDataWorksAccessingRdsOSSBinlogPolicy
policy to the RAM role.
Modify the trust policy for the created RAM role. For more information, see Edit the trust policy of a RAM role.
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "<ID of the Alibaba Cloud account of a DataWorks user>@cdp.aliyuncs.com"
                ]
            }
        }
    ],
    "Version": "1"
}
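To avoid hand-editing the placeholder in the trust policy, the document can be generated from the account ID. This is a minimal sketch that reproduces the structure shown above. The function name is hypothetical, and any account ID that you pass in is your own value.

```python
def build_trust_policy(account_id: str) -> dict:
    """Build the trust policy that lets the DataWorks service principal assume the role."""
    return {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    # The service principal is the account ID followed by @cdp.aliyuncs.com.
                    "Service": [f"{account_id}@cdp.aliyuncs.com"]
                },
            }
        ],
        "Version": "1",
    }
```

Serialize the result with json.dumps(policy, indent=4) from the standard library and paste it into the trust policy editor of the RAM role.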
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a data source
Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Configure a real-time synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a real-time synchronization task in DataStudio.
Configure synchronization settings to implement batch synchronization of all data in a database, real-time synchronization of full data or incremental data in a database, and real-time synchronization of data from sharded tables in a sharded database
For more information about the configuration procedure, see Configure a synchronization task in Data Integration.
FAQ
What do I do if an error is returned during the real-time synchronization of data from MySQL?
Why does my real-time synchronization task that is used to synchronize data from MySQL slow down?
What do I do if a batch synchronization task runs for an extended period of time?
For information about other common issues in Data Integration, see FAQ about Data Integration.
Appendix: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for MySQL Reader
The following sample code provides examples on how to configure a synchronization task to read data from a table that is not sharded and how to configure a synchronization task to read data from a sharded table.
The comments in the JSON examples only explain some major parameters. Remove the comments when you configure the parameters.
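Removing the comments by hand is error-prone for long scripts. The following sketch strips // comments that appear outside string literals so that the remaining text parses as JSON. It assumes that each comment runs to the end of its line, and the function name is hypothetical.

```python
import json


def strip_line_comments(text: str) -> str:
    """Remove // comments that appear outside JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
        elif ch == '"':
            in_string = True
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Skip to the end of the line but keep the newline itself.
            newline = text.find("\n", i)
            i = len(text) if newline == -1 else newline
        else:
            out.append(ch)
            i += 1
    return "".join(out)
```

Because the scanner tracks whether it is inside a string, values that contain // (for example, a URL) are left untouched. Passing the result to json.loads is a quick way to confirm that the cleaned script is valid JSON.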
Configure a synchronization task to read data from a table that is not sharded
{
    "type": "job",
    "version": "2.0",// The version number.
    "steps": [
        {
            "stepType": "mysql",// The plug-in name.
            "parameter": {
                "column": [// The names of the columns.
                    "id"
                ],
                "connection": [
                    {
                        "querySql": [
                            "select a,b from join1 c join join2 d on c.id = d.id;"
                        ],
                        "datasource": ""// The name of the data source.
                    }
                ],
                "where": "",// The WHERE clause.
                "splitPk": "",// The shard key.
                "encoding": "UTF-8"// The encoding format.
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"// The maximum number of dirty data records allowed.
        },
        "speed": {
            "throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
            "concurrent": 1,// The maximum number of parallel threads.
            "mbps": "12"// The maximum transmission rate. Unit: MB/s.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
Configure a synchronization task to read data from a sharded table
Note: In sharding scenarios, you can read data from multiple table shards that have the same schema of a sharded MySQL table and write the data to the same destination table. If you want to synchronize all data from a database shard, you can create a data synchronization task in Data Integration and select the required synchronization method.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mysql",
            "parameter": {
                "indexes": [
                    {
                        "type": "unique",
                        "column": [
                            "id"
                        ]
                    }
                ],
                "envType": 0,
                "useSpecialSecret": false,
                "column": [
                    "id",
                    "buyer_name",
                    "seller_name",
                    "item_id",
                    "city",
                    "zone"
                ],
                "tableComment": "Test order table",
                "connection": [
                    {
                        "datasource": "rds_dataservice",
                        "table": [
                            "rds_table"
                        ]
                    },
                    {
                        "datasource": "rds_workshop_log",
                        "table": [
                            "rds_table"
                        ]
                    }
                ],
                "where": "",
                "splitPk": "id",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        },
        {
            "name": "Processor",
            "stepType": null,
            "category": "processor",
            "copies": 1,
            "parameter": {
                "nodes": [],
                "edges": [],
                "groups": [],
                "version": "2.0"
            }
        }
    ],
    "setting": {
        "executeMode": null,
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
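When many data sources and table shards are involved, the connection list in the reader configuration can be generated instead of written by hand. The sketch below builds only the fields that appear in the sample above. The function name is hypothetical, and other fields of the sample (such as indexes or envType) are deliberately left out.

```python
def sharded_reader_parameter(shards, columns, split_pk=""):
    """Build a reader "parameter" object for tables spread across several data sources.

    shards maps each data source name to the list of table names to read from it.
    """
    connection = [
        {"datasource": datasource, "table": list(tables)}
        for datasource, tables in shards.items()
    ]
    return {
        "column": list(columns),
        "connection": connection,
        "where": "",
        "splitPk": split_pk,
        "encoding": "UTF-8",
    }
```

Calling the helper with the two data sources from the sample (rds_dataservice and rds_workshop_log, each with the table rds_table) yields the same connection array as the sample configuration.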
Parameters in code for MySQL Reader
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
table | The name of the table from which you want to read data. Each synchronization task can be used to synchronize data from only one table. For a sharded table, you can use the table parameter to specify the partitions from which you want to read data. Examples:
Note MySQL Reader reads data from the columns that are specified by the column parameter in the partitions that are specified by the table parameter. If a specified partition or column does not exist, the synchronization task fails. | Yes | No default value |
column | The names of the columns from which you want to read data. Specify the names in a JSON array. The default value is [ * ], which indicates all columns in the source table. | Yes | No default value |
splitPk | The field that is used for data sharding when MySQL Reader reads data. If you specify this parameter, the source table is sharded based on the value of this parameter. Data Integration then runs parallel threads to read data. This way, data can be synchronized more efficiently. | No | No default value |
where | The WHERE clause. For example, you can set this parameter to
| No | No default value |
querySql (advanced parameter, which is available only in the code editor) | The SQL statement that is used for refined data filtering. If you configure this parameter, data is filtered based only on the value of this parameter. For example, if you want to join multiple tables for data synchronization, set this parameter to Note The name of the querySql parameter is case-sensitive. For example, querysql does not take effect. | No | No default value |
useSpecialSecret | Specifies whether to use the access password of each data source when you synchronize data from multiple data sources. Valid values:
If you have added multiple data sources and the usernames and passwords that are used to access the data sources are different, you can set this parameter to true to use the usernames and passwords separately to access corresponding data sources. | No | false |
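To illustrate what sharding on splitPk means, the following sketch divides an integer key range into contiguous sub-ranges that parallel threads could read independently. It is a conceptual illustration only, not the algorithm that Data Integration uses. It assumes an integer key whose minimum and maximum values are known, and the function names are hypothetical.

```python
def split_ranges(min_id: int, max_id: int, parts: int):
    """Split the closed interval [min_id, max_id] into at most `parts` contiguous ranges."""
    if parts < 1 or max_id < min_id:
        raise ValueError("parts must be >= 1 and max_id must be >= min_id")
    total = max_id - min_id + 1
    parts = min(parts, total)
    base, extra = divmod(total, parts)
    ranges = []
    start = min_id
    for index in range(parts):
        # The first `extra` ranges take one additional key so that sizes differ by at most 1.
        size = base + (1 if index < extra else 0)
        end = start + size - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


def range_predicates(column: str, ranges):
    """Turn ranges into WHERE-clause fragments, one per parallel read."""
    return [f"{column} >= {low} AND {column} <= {high}" for low, high in ranges]
```

Evenly sized ranges only balance the work if the key values are evenly distributed, which is why a primary key is a common choice for the shard key.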
Code for MySQL Writer
{
"type": "job",
"version": "2.0",// The version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mysql",// The plug-in name.
"parameter": {
"postSql": [],// The SQL statement that you want to execute after the synchronization task is run.
"datasource": "",// The name of the data source.
"column": [// The names of the columns.
"id",
"value"
],
"writeMode": "insert",// The write mode. Valid values: insert, replace, and update.
"batchSize": 1024,// The number of data records to write at a time.
"table": "",// The name of the table.
"nullMode": "skipNull",// The policy for processing a NULL value.
"skipNullColumn": [// Specifies the columns in which NULL values need to be skipped.
"id",
"value"
],
"preSql": [
"delete from XXX;"// The SQL statement that you want to execute before the synchronization task is run.
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// The maximum number of dirty data records allowed.
"record": "0"
},
"speed": {
"throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent": 1,// The maximum number of parallel threads.
"mbps": "12"// The maximum transmission rate. Unit: MB/s. You can specify a maximum transmission rate to prevent heavy read workloads on the source or heavy write workloads on the destination.
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Parameters in code for MySQL Writer
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
table | The name of the table to which you want to write data. | Yes | No default value |
writeMode | The write mode. The following modes are supported: insert into, on duplicate key update, and replace into. | No | insert |
nullMode | The policy for processing a NULL value. Valid values:
Important If you set this parameter to skipNull, SQL statements that are used to write data are dynamically concatenated to support a default value at the destination. This increases the number of flushes and slows down data synchronization. In the worst case, each data record is flushed once. | No | writeNull |
skipNullColumn | Specifies the columns in which NULL values need to be skipped. When you set the nullMode parameter to skipNull, the Configure this parameter in the format of | No | All columns configured for the task |
column | The names of the columns to which you want to write data. Separate the names with commas (,), such as | Yes | No default value |
preSql | The SQL statement that you want to execute before the synchronization task is run. You can execute only one SQL statement on the codeless UI and multiple SQL statements in the code editor. For example, you can execute the TRUNCATE TABLE tablename statement to delete outdated data before the synchronization task is run. Note: If you configure multiple SQL statements, the statements are not executed in the same transaction. | No | No default value |
postSql | The SQL statement that you want to execute after the synchronization task is run. You can execute only one SQL statement on the codeless UI and multiple SQL statements in the code editor. For example, you can execute the Note If you configure multiple SQL statements, the statements are not executed in the same transaction. | No | No default value |
batchSize | The number of data records to write at a time. Set this parameter to an appropriate value based on your business requirements. This greatly reduces the interactions between Data Integration and MySQL and increases throughput. If you set this parameter to an excessively large value, an out of memory (OOM) error may occur during data synchronization. | No | 256 |
updateColumn | The names of columns that are updated when a primary key conflict or unique index conflict occurs. This parameter takes effect only when the writeMode parameter is set to on duplicate key update. Separate multiple column names with commas (,). Example: | No | No default value |
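The three write modes correspond to three standard MySQL statement forms. The sketch below shows the statement shape for each mode and how updateColumn narrows the columns that are updated on a conflict. It is an illustration, not the SQL that MySQL Writer actually generates. The function name is hypothetical, and the mapping of the writeMode values insert, replace, and update to these statement forms is an assumption based on the sample code and the table above.

```python
def build_write_statement(table, columns, write_mode="insert", update_columns=None):
    """Return a parameterized SQL statement for the given write mode."""
    column_list = ", ".join(f"`{name}`" for name in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    if write_mode == "insert":
        return f"INSERT INTO `{table}` ({column_list}) VALUES ({placeholders})"
    if write_mode == "replace":
        return f"REPLACE INTO `{table}` ({column_list}) VALUES ({placeholders})"
    if write_mode == "update":
        # Assumption: when no update columns are given, all written columns are updated.
        targets = update_columns or columns
        assignments = ", ".join(f"`{name}` = VALUES(`{name}`)" for name in targets)
        return (
            f"INSERT INTO `{table}` ({column_list}) VALUES ({placeholders}) "
            f"ON DUPLICATE KEY UPDATE {assignments}"
        )
    raise ValueError(f"unsupported write mode: {write_mode}")
```

Note that REPLACE INTO deletes the conflicting row and inserts a new one, whereas ON DUPLICATE KEY UPDATE modifies the existing row in place, so columns that are not listed keep their current values.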