Realtime Compute for Apache Flink allows you to read data from AnalyticDB for PostgreSQL instances. This topic describes the prerequisites, syntax, parameters in the WITH and CACHE clauses, data type mappings, and how to create and run a Flink job.
Prerequisites
A Realtime Compute for Apache Flink cluster V3.6.0 or later is created.
An AnalyticDB for PostgreSQL V6.0 instance is created. The Realtime Compute for Apache Flink cluster and the AnalyticDB for PostgreSQL instance reside within the same virtual private cloud (VPC). The CIDR block of the cluster is added to the IP address whitelist of the AnalyticDB for PostgreSQL instance.
Syntax
CREATE TABLE dim_adbpg(
id int,
username varchar,
INDEX(id)
) with(
type='custom',
tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
url='jdbc:postgresql://<Internal endpoint>/<databaseName>',
tableName='tablename',
userName='username',
password='password',
joinMaxRows='100',
maxRetryTimes='1',
connectionMaxActive='5',
retryWaitTime='100',
targetSchema='public',
caseSensitive='0',
cache='LRU',
cacheSize='1000',
cacheTTLMs='10000',
cacheReloadTimeBlackList='2017-10-24 14:00 -> 2017-10-24 15:00',
partitionedJoin='true'
);
-- When you join a dimension table with another table, you must include FOR SYSTEM_TIME AS OF PROCTIME() in the JOIN clause.
INSERT INTO print_sink
SELECT R.c1, R.a2, R.a3, R.a4, R.a5, R.a6, R.a7, R.a8, R.a9, R.a10, R.a11, R.a13, T.username
FROM s_member_cart_view AS R
LEFT JOIN
dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS T
ON R.c1 = T.id;
Parameters in the WITH clause
Parameter | Description | Remarks |
url | The JDBC URL that is used to connect to the AnalyticDB for PostgreSQL instance. | Required. Specify the URL in the jdbc:postgresql://<Internal endpoint>/databaseName format. |
type | The type of the dimension table. | Required. Set the value to custom. |
tableFactoryClass | The factory class of the custom dimension table. | Required. Set the value to com.alibaba.blink.customersink.ADBPGCustomSourceFactory. |
tableName | The name of the table in the AnalyticDB for PostgreSQL instance that serves as the dimension table. | Required. Set this parameter to the name of the AnalyticDB for PostgreSQL table that is joined with the source table. |
userName | The name of the database account used to connect to the AnalyticDB for PostgreSQL instance. | Required. |
password | The password of the database account used to connect to the AnalyticDB for PostgreSQL instance. | Required. |
joinMaxRows | The maximum number of rows in the right table that can be joined with a row in the left table. | Optional. If a row in the left table needs to be joined with multiple rows in the right table, specify this parameter. Default value: 1024. If one row is joined with a large number of rows, the performance of the streaming job may degrade. In this case, increase the cache size. Note that the cacheSize parameter limits the number of join keys cached from the left table, not the total number of cached rows. |
maxRetryTimes | The maximum number of retries allowed after a statement fails to be executed. | Optional. In actual scenarios, a statement may fail to be executed for a variety of reasons, such as network jitter, I/O latency, and timeout. If a statement fails to be executed on an AnalyticDB for PostgreSQL dimension table, the statement is automatically retried. You can use the maxRetryTimes parameter to specify the maximum number of retries. Default value: 3. |
connectionMaxActive | The maximum number of active connections that can be allocated in a connection pool at the same time. | Optional. An AnalyticDB for PostgreSQL dimension table provides built-in connection pooling. For efficiency and security purposes, we recommend that you specify this parameter. Default value: 5. |
retryWaitTime | The wait interval between retries of failed statements. | Optional. Default value: 100. Unit: milliseconds. |
targetSchema | The schema of the AnalyticDB for PostgreSQL instance. | Optional. Default value: public. |
caseSensitive | Specifies whether the dimension table name is case-sensitive. | Optional. A value of 0 indicates that the table name is case-insensitive. A value of 1 indicates that the table name is case-sensitive. Default value: 0. |
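For reference, the following snippet is a minimal sketch of a dimension table declaration that specifies only the required parameters. The table name dim_adbpg_minimal is hypothetical, the endpoint, database name, table name, and credentials are placeholders, and all optional parameters keep their default values.
-- Minimal dimension table declaration: required parameters only.
CREATE TABLE dim_adbpg_minimal(
id int,
username varchar,
INDEX(id)
) with(
type='custom',
tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
url='jdbc:postgresql://<Internal endpoint>/<databaseName>',
tableName='<tablename>',
userName='<username>',
password='<password>'
);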
Parameters in the CACHE clause
Parameter | Description | Remarks |
cache | The policy that is used to cache data. | Optional. Default value: None. Valid values: None: no data is cached. LRU: only some data in the dimension table is cached. The system searches the cache for each input record and queries the physical dimension table only if the record is not found in the cache. ALL: all data in the dimension table is loaded into the cache before the job processes data, and lookups are served from the cache. |
cacheSize | The maximum number of rows that can be cached. | Optional. You can specify this parameter only when the cache parameter is set to LRU. Default value: 10000. |
cacheTTLMs | The interval at which the system refreshes the cache. The system reloads the most recent data in the dimension table based on the value of this parameter to ensure that the source table can be joined with the most recent data of the dimension table. | Optional. Unit: milliseconds. By default, this parameter is left empty, which indicates that the most recent data in the dimension table is not reloaded. |
cacheReloadTimeBlackList | The time periods during which the cache is not refreshed. This parameter takes effect only when the cache parameter is set to ALL. The cache is not refreshed during the time periods that you specify for this parameter. This parameter is useful for large-scale online promotional events such as Double 11. | Optional. By default, this parameter is left empty. Specify this parameter in the '2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00' format. Use the following delimiters: separate multiple time periods with commas (,), and separate the start time and end time of each time period with an arrow (->). |
partitionedJoin | Specifies whether to enable the partitionedJoin feature. If the partitionedJoin feature is enabled, data is shuffled based on join keys before the source table is joined with the dimension table. This way, each concurrent subtask caches only the data that is related to its own join keys, which saves cache memory and increases the cache hit rate. | Optional. Default value: false, which indicates that the partitionedJoin feature is disabled. |
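The Syntax section shows a dimension table that uses the LRU cache policy. As a complementary sketch, the following declaration, with a hypothetical table name and placeholder connection values, caches the entire dimension table, reloads it every hour, and skips reloads during a specified busy period.
-- Cache the full dimension table; reload hourly except during the blacklisted period.
CREATE TABLE dim_adbpg_all(
id int,
username varchar,
INDEX(id)
) with(
type='custom',
tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
url='jdbc:postgresql://<Internal endpoint>/<databaseName>',
tableName='<tablename>',
userName='<username>',
password='<password>',
cache='ALL',
cacheTTLMs='3600000',
cacheReloadTimeBlackList='2017-11-10 23:30 -> 2017-11-11 08:00'
);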
Data type mappings
Data type of Realtime Compute for Apache Flink | Data type of AnalyticDB for PostgreSQL |
BOOLEAN | BOOLEAN |
TINYINT | SMALLINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DOUBLE | DOUBLE PRECISION |
VARCHAR | TEXT |
DATETIME | TIMESTAMP |
DATE | DATE |
FLOAT | REAL |
DECIMAL | DOUBLE PRECISION |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
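The following hypothetical pair of DDL statements shows how the mappings apply in practice: each column type in the Flink declaration corresponds to the AnalyticDB for PostgreSQL type listed in the preceding table. The table name dim_typed, the columns, and the connection values are placeholders.
-- Flink side: dimension table declaration.
CREATE TABLE dim_typed(
id int,            -- INT -> INT
score float,       -- FLOAT -> REAL
price double,      -- DOUBLE -> DOUBLE PRECISION
name varchar,      -- VARCHAR -> TEXT
created timestamp, -- TIMESTAMP -> TIMESTAMP
INDEX(id)
) with(
type='custom',
tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
url='jdbc:postgresql://<Internal endpoint>/<databaseName>',
tableName='dim_typed',
userName='<username>',
password='<password>'
);
-- AnalyticDB for PostgreSQL side: the matching physical table.
CREATE TABLE public.dim_typed(
id INT,
score REAL,
price DOUBLE PRECISION,
name TEXT,
created TIMESTAMP
);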
Create and run a Flink job
Log on to the Realtime Compute Development Platform. In the top navigation bar, move the pointer over the profile picture and click Project Management. On the Projects page, find the project that you want to manage and click its name.
In the left-side navigation pane, click Development. In the top navigation bar, click Create File to create a Flink SQL file in which to write the job code.
Dimension tables can be created in Realtime Compute for Apache Flink to read data from tables of an AnalyticDB for PostgreSQL instance. Before the custom dimension table feature is officially released, you must upload and reference a JAR package. Download the JAR package. In the left-side navigation pane, click the Resources tab, and then click Create Resource. In the Upload Resource dialog box, upload the JAR package from your computer and click OK. Then, choose More > Reference in the Actions column that corresponds to the JAR package.
After you write the job code, click Save, and then click Publish to publish the job.
On the Administration page, find the job and click Start in the Actions column.
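The sample job in the next section joins the dimension table on the id and username columns, so a matching table must exist in the AnalyticDB for PostgreSQL instance before the job starts. The following statements are a hypothetical sketch that creates and seeds such a table; the name tablename matches the placeholder used in the WITH clause, and username is declared as TEXT because the Flink side declares it as VARCHAR. Because the sample view computes join keys as MOD(a1, 10), only ids from 0 to 9 can match.
-- Run in the AnalyticDB for PostgreSQL database, for example by using psql.
CREATE TABLE public.tablename(
id INT,
username TEXT
);
INSERT INTO public.tablename (id, username)
SELECT i, 'user_' || i::text FROM generate_series(0, 9) AS i;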
Sample code
The following sample code demonstrates how to use a dimension table to read data from an AnalyticDB for PostgreSQL instance and how to use the Print connector to write the data to the standard output of the Realtime Compute for Apache Flink cluster logs.
--SQL
--********************************************************************--
--Author: zihua
--CreateTime: 2019-09-07 10:34:34
--********************************************************************--
CREATE TABLE s_member_cart
(
a1 int,
a2 tinyint,
a3 smallint,
a4 int,
a5 boolean,
a6 FLOAT,
a7 DECIMAL,
a8 double,
a9 date,
a10 time,
a11 timestamp,
a12 tinyint
) WITH (
type='random'
);
CREATE VIEW s_member_cart_view AS
SELECT MOD(a1, 10) c1, a2, a3, a4, a5, a6, a6 AS a7, a8, a9, a10, a11, CASE WHEN a12 > 0 THEN 'test1' ELSE 'test5' END AS b12, '{ "customer": "English56", "items": {"product": "Beer","qty": 6}}' AS a13
FROM s_member_cart;
--adbpg dim index
CREATE TABLE dim_adbpg(
id int,
username varchar,
INDEX(id)
) with(
type='custom',
tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
url='jdbc:postgresql://<Internal endpoint>/<databaseName>',
tableName='tablename',
userName='username',
password='password',
joinMaxRows='100',
maxRetryTimes='1',
connectionMaxActive='5',
retryWaitTime='100',
targetSchema='public',
caseSensitive='0',
cache='LRU',
cacheSize='1000',
cacheTTLMs='10000',
cacheReloadTimeBlackList='2017-10-24 14:00 -> 2017-10-24 15:00',
partitionedJoin='true'
);
-- ads sink.
CREATE TABLE print_sink (
B1 int,
B2 tinyint,
B3 smallint,
B4 int,
B5 boolean,
B6 FLOAT,
B7 FLOAT,
B8 double,
B9 date,
B10 time,
B11 timestamp,
B12 varchar,
B15 varchar,
PRIMARY KEY(B1)
) with (
type='print'
);
INSERT INTO print_sink
SELECT R.c1, R.a2, R.a3, R.a4, R.a5, R.a6, R.a7, R.a8, R.a9, R.a10, R.a11, R.a13, T.username
FROM s_member_cart_view AS R
LEFT JOIN
dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS T
ON R.c1 = T.id;