This topic describes how to use the AnalyticDB for MySQL V3.0 connector.
Background information
AnalyticDB for MySQL V3.0 is a cloud-native enterprise-class data warehousing service that integrates database and big data technologies. AnalyticDB for MySQL supports high-throughput real-time data addition, removal, and modification, low-latency real-time data analysis, and complex extract, transform, and load (ETL) operations. AnalyticDB for MySQL is compatible with upstream and downstream ecosystem tools and can be used to build enterprise-class report systems, data warehouses, and data service engines.
The following table describes the capabilities supported by the AnalyticDB for MySQL V3.0 connector.
Item | Description |
Table type | Dimension table and sink table |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric | N/A |
API type | SQL API |
Data update or deletion in a sink table | Supported |
Prerequisites
An AnalyticDB for MySQL cluster and an AnalyticDB for MySQL table are created. For more information, see Create a cluster and CREATE TABLE.
A whitelist is configured for the AnalyticDB for MySQL cluster. For more information, see Configure a whitelist.
Limits
The AnalyticDB for MySQL V3.0 connector can be used only for dimension tables and sink tables and cannot be used for source tables.
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.X or later supports the AnalyticDB for MySQL V3.0 connector.
Syntax
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
The primary key that is specified in the Flink DDL statement must be consistent with the primary key of the physical table in the AnalyticDB for MySQL database. The primary key must be specified in the Flink DDL statement and exist in the physical table in the AnalyticDB for MySQL database at the same time. The name of the primary key specified in the Flink DDL statement must be the same as the name of the primary key of the physical table. If the primary keys are not the same, data may be incorrect.
Parameters in the WITH clause
Common parameters
Parameter
Description
Data type
Required
Default value
Remarks
connector
The type of the sink table.
STRING
Yes
No default value
Set the value to adb3.0.
url
The Java Database Connectivity (JDBC) URL of the database.
STRING
Yes
No default value
The JDBC URL of the AnalyticDB for MySQL database. The URL is in the jdbc:mysql://<endpoint>:<port>/<databaseName> format.
endpoint and port: You can log on to the AnalyticDB for MySQL console. In the left-side navigation pane, click the name of the desired cluster in the Cluster ID/Cluster Description column. On the page that appears, obtain the information in the Network Information section.
databaseName: the name of the AnalyticDB for MySQL database.
userName
The username that is used to access the AnalyticDB for MySQL database.
STRING
Yes
No default value
N/A.
password
The password that is used to access the AnalyticDB for MySQL database.
STRING
Yes
No default value
N/A.
tableName
The name of the table in the database.
STRING
Yes
No default value
N/A.
maxRetryTimes
The maximum number of retries that are allowed if a data writing or reading attempt fails.
INTEGER
No
The default value of this parameter varies based on the VVR version of Realtime Compute for Apache Flink:
If the VVR version is 3.X or earlier, the default value is 3.
If the VVR version is 4.0.10 or later, the default value is 10.
N/A.
Parameters only for sink tables
Parameter
Description
Data type
Required
Default value
Remarks
batchSize
The number of data records that can be written at a time.
INTEGER
No
The default value of this parameter varies based on the VVR version of Realtime Compute for Apache Flink:
If the VVR version is 3.X or earlier, the default value is 100.
If the VVR version is 4.0.10 or later, the default value is 1000.
This parameter takes effect only after you specify the primary key.
bufferSize
The maximum number of data records that can be cached in the memory. Write operations are triggered if the value of the batchSize or bufferSize parameter reaches the specified threshold.
INTEGER
No
1000
This parameter takes effect only after you specify the primary key.
NoteOnly Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter.
flushIntervalMs
The interval at which the cache is cleared. This value indicates that if the number of cached data records does not reach the upper limit in a specified period of time, all cached data is written to the sink table.
INTEGER
No
The default value of this parameter varies based on the VVR version of Realtime Compute for Apache Flink:
If the VVR version is 3.X or earlier, the default value is 1000.
If the VVR version is 4.0.10 or later, the default value is 3000.
Unit: milliseconds.
ignoreDelete
Specifies whether to ignore delete operations.
BOOLEAN
No
false
Valid values:
true: The delete operations are ignored.
false: The delete operations are not ignored.
NoteOnly Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter.
replaceMode
Specified whether to use the REPLACE INTO statement to insert data into the table if a primary key is specified in the DDL statement.
BOOLEAN
No
true
The isolation.level parameter has the following valid values:
true: The
REPLACE INTO
statement is used to insert data into the table.false: The
INSERT INTO ON DUPLICATE KEY UPDATE
statement is used to insert data into the table.
NoteOnly Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter.
Only AnalyticDB for MySQL V3.1.3.5 or later supports this parameter.
This parameter takes effect only when a primary key is specified in the DDL statement. The statement that is used to insert data into the table varies based on whether a primary key is specified and the value of the replaceMode parameter.
If a primary key is specified in the DDL statement and the replaceMode parameter is set to true, the
REPLACE INTO
statement is used.If a primary key is specified in the DDL statement and the replaceMode parameter is set to false, the
INSERT INTO ON DUPLICATE KEY UPDATE
statement is used.If no primary key is specified in the DDL statement, the
INSERT INTO
statement is used.
excludeUpdateColumns
The fields that are not updated when data that has the same primary key is updated.
STRING
No
An empty string
Separate multiple fields with commas (,). Example:
excludeUpdateColumns=column1,column2
.NoteThis parameter takes effect only when the replaceMode parameter is set to false. If the replaceMode parameter is set to true, the values of the fields specified by this parameter are changed to null.
Make sure that the columns that you want to ignore are written in one line and are not wrapped.
connectionMaxActive
The maximum size of the thread pool.
INTEGER
No
40
Only Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter.
Parameters only for dimension tables
Parameter
Description
Data type
Required
Default value
Remarks
cache
The cache policy.
STRING
No
ALL
Valid values:
None: No data is cached.
LRU: Only specific data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the system does not find the record in the cache, the system searches for the data record in the physical dimension table.
ALL: All data in the dimension table is cached. This is the default value. Before a deployment runs, the system loads all data in the dimension table to the cache. This way, the cache is searched for all subsequent queries in the dimension table. If the system does not find the data record in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire.
If the amount of data in a remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. The source table and dimension table cannot be associated based on the ON clause.
NoteIf you set the cache parameter to ALL, you must monitor the memory usage of the node to prevent out of memory (OOM) errors.
If you set the cache parameter to ALL, you must increase the memory of the node for joining tables because the system asynchronously loads data from the dimension table. The increased memory size is twice the data amount in the remote table.
cacheSize
The maximum number of data records that can be cached.
INTEGER
No
100000
You must configure the cacheSize parameter when the cache parameter is set to LRU.
cacheTTLMs
The cache timeout period. Unit: milliseconds.
INTEGER
No
Long.MAX_VALUE
You must configure the cacheTTLMs parameter when the cache parameter is set to LRU or ALL.
If the cache parameter is set to LRU, the cacheTTLMs parameter specifies the cache timeout period. Default value:
Long.MAX_VALUE
. The default value indicates that cache entries do not expire.If the cache parameter is set to ALL, the cacheTTLMs parameter specifies the interval at which the system reloads the data in the physical table. Default value:
Long.MAX_VALUE
. The default value indicates that data in the physical table is not reloaded.
NoteIf the cache parameter is set to None, you do not need to configure the cacheTTLMs parameter. If the cache parameter is set to None, data is not cached. Therefore, you do not need to configure the cacheTTLMs parameter.
maxJoinRows
The maximum number of results returned after each data record in the primary table is mapped to the data in the dimension table.
INTEGER
No
1024
If you can estimate that each data record in the primary table is mapped to a maximum of n data records in the dimension table, you can configure maxJoinRows='n' to ensure efficient matching in Realtime Compute for Apache Flink.
NoteWhen you join the primary table with a dimension table, the number of results returned after an input data record in the primary table is mapped to the data records in the dimension table is limited by this parameter.
Data type mappings
Data type of AnalyticDB for MySQL V3.0 | Data type of Flink |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) or NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
Sample code
Sample code for a sink table
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO adb_sink SELECT * FROM datagen_source;
Sample code for a dimension table
CREATE TEMPORARY TABLE datagen_source( `a` INT, `b` VARCHAR, `c` STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_dim ( `a` INT, `b` VARCHAR, `c` VARCHAR ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); CREATE TEMPORARY TABLE blackhole_sink( `a` INT, `b` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;