This topic describes how to use the Hologres connector.
Background information
Hologres is an end-to-end real-time data warehouse service that allows you to write, update, and analyze large amounts of data in real time. It is compatible with the PostgreSQL protocol and supports standard SQL syntax. Hologres supports online analytical processing (OLAP) and ad hoc queries for petabytes of data, and provides high-concurrency, low-latency online data services. Hologres is seamlessly integrated with MaxCompute, Realtime Compute for Apache Flink, and DataWorks, and provides full-stack online and offline data warehouse solutions. The following table describes the capabilities supported by the Hologres connector.
Item | Description |
Table type | Source table, dimension table, and sink table |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric |
|
API type | DataStream API and SQL API |
Data update or deletion in a sink table | Supported |
Features
Features supported for a source table
Feature | Description |
Real-time consumption of Hologres data |
For more information, see Use Realtime Compute for Apache Flink or Blink to consume Hologres binary log data in real time. |
Features supported for a sink table
Feature | Description |
Writes changelogs to a Hologres sink table. | |
Updates only data in specific fields instead of data in an entire row. | |
Data synchronization based on the CREATE TABLE AS and CREATE DATABASE AS statements | Synchronizes data from an entire database or a single table to Hologres tables in real time and synchronizes schema changes of each table to Hologres tables in real time. |
Data update in specific columns Note Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.7 or later supports this feature. | Pushes down the column names that are specified in the INSERT statement of Realtime Compute for Apache Flink to the Hologres connector to update only the specified columns. |
Prerequisites
A Hologres table is created. For more information, see Manage internal tables.
Limits
Limits on all types of Hologres tables
Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the Hologres connector.
The Hologres connector cannot be used to access Hologres foreign tables. For more information about Hologres foreign tables, see Create a foreign table mapped to MaxCompute.
For more information about the known defects and release notes of the Hologres connector, see the "Hologres connector release note" section of the Overview topic.
Limits only on Hologres source tables
By default, Realtime Compute for Apache Flink performs a full table scan only once to read all data from a Hologres source table at a time. Data consumption is complete when the data scan ends. Realtime Compute for Apache Flink does not read the data that is appended to the Hologres source table after the data consumption. Realtime Compute for Apache Flink that uses VVR 3.0.0 or later can consume Hologres data in real time. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time. In Realtime Compute for Apache Flink that uses VVR 6.0.3 or later, filter pushdown is supported when the Hologres connector reads data from Hologres. For more information, see the description of the
enable_filter_push_down
parameter that is used for source tables.Realtime Compute for Apache Flink that uses a VVR version earlier than 8.0 does not support watermarks for a Hologres source table in CDC mode. If you want to perform window aggregation on a Hologres source table, you can use a different approach to perform time-based aggregation. For more information, see the "MySQL CDC source tables and Hologres CDC source tables do not support window functions. How do I implement minute-level data aggregation on a MySQL CDC source table or Hologres CDC source table?" section of the FAQ about CDC topic.
Limits only on Hologres sink tables: None.
Limits only on Hologres dimension tables
We recommend that you specify a primary key as the JOIN condition for a Hologres dimension table when you join the Hologres dimension table with another table and use row-oriented storage to create a Hologres dimension table. Column-oriented storage incurs large performance overheads for point queries. When you use row-oriented storage to create a Hologres dimension table, you must specify the primary key as the clustering key. For more information, see CREATE TABLE.
If you cannot use the primary key as the JOIN condition and you perform a point query by using a non-primary key, we recommend that you use column-oriented storage when you create a Hologres table. If you perform a point query by using a non-primary key, a row of data is queried and multiple rows of data are returned. In this case, we recommend that you specify the distribution key and event time column (segment key) to optimize query performance. For more information, see Distribution key, Event time column (segment key), and Storage models of tables: row-oriented storage, column-oriented storage, and row-column hybrid storage.
In Realtime Compute for Apache Flink that uses VVR of a version earlier than 4.0, you can perform a point query by using the primary key of a Hologres dimension table when you join the Hologres dimension table with another table. In Realtime Compute for Apache Flink that uses VVR 4.0 or later, you can perform a point query by using a non-primary key of a Hologres dimension table when you join the Hologres dimension table with another table and a JDBC-related mode is used for the Hologres dimension table.
Precautions
If you set the sdkMode parameter to rpc for a table, take note of the following items when you upgrade the VVR version:
The SDK modes that can be configured for dimension tables and sink tables of Hologres V2.0 and later are changed. You cannot set the sdkMode parameter to rpc for dimension tables and sink tables. Only the Java Database Connectivity (JDBC)-related modes can be used for tables. The JDBC-related modes include the JDBC, jdbc_fixed, and jdbc_copy modes. If you set the sdkMode parameter to rpc for a table, Realtime Compute for Apache Flink does not deduplicate data that has the same primary key in the same batch. If you want to retain complete data, you can use a JDBC-related mode for the table and set the jdbcWriteBatchSize parameter to 1 to avoid deduplication. You can also upgrade the VVR version to 8.0.5 and set the deduplication.enabled parameter to false to avoid deduplication.
If you want to upgrade the VVR version from 4.X to 6.X or from 4.X to 8.X for a deployment in which data of Hologres V2.0 is read or written in remote procedure call (RPC) mode, you can configure the SDK mode based on the upgrade scenario.
If you upgrade the VVR version from 4.X to a 6.X minor version that ranges from 6.0.4 to 6.0.6, an error may be returned. We recommend that you set the sdkMode parameter to jdbc or jdbc_fixed for dimension tables and sink tables.
If you upgrade the VVR version from 4.X to 6.0.7 or later, no action is required. Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to JDBC-related modes.
If you set the sdkMode parameter to holohub for a binary log source table, take note of the following items when you upgrade the VVR version:
In Hologres V2.0 and later, you can set the sdkMode to holohub for Hologres binlog source tables only in specific scenarios. In Hologres V2.1 and later, you cannot set the sdkMode parameter to holohub for Hologres binlog source tables and you can only set the sdkMode parameter to jdbc for Hologres binlog source tables.
If a binary log source table is consumed in your deployment and you do not set the sdkMode parameter to jdbc, the HoloHub mode is used for the binary log source table by default. If you want to upgrade the VVR version from 4.X to 6.X or from 4.X to 8.X, take note of the following points based on the Hologres version when you configure the SDK mode.
Hologres V2.0
If you upgrade the VVR version from 4.X to a VVR version that ranges from 6.0.7 to 8.0.3, Realtime Compute for Apache Flink still reads data from a binary log source table in HoloHub mode.
If you upgrade the VVR version from 4.X to 8.0.4 or later, a permission issue may occur. You must configure permissions. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time.
Hologres V2.1
If you upgrade the VVR version from 4.X to a VVR version that ranges from 6.0.7 to 8.0.4, binary log data may not be consumed as expected. We recommend that you upgrade the VVR version to 8.0.5.
If you upgrade the VVR version from 4.X to 8.0.5 or later, no operation is required. Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC.
Syntax
CREATE TABLE hologres_table (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'hologres',
'dbname' = '<yourDBName>',
'tablename' = '<yourTableName>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint' = '<yourEndpoint>',
'sdkmode' = 'jdbc'
);
Parameters in the WITH clause
Only Realtime Compute for Apache Flink that uses VVR 4.0.11 or later supports all parameters whose names start with jdbc.
Common parameters
Parameter
Description
Data type
Required
Default value
Remarks
connector
The type of the table.
STRING
Yes
No default value
Set the value to
hologres
.dbname
The name of the database.
STRING
Yes
No default value
Hologres V2.0 introduces virtual warehouse instances as a new type of elastic and high-availability instances. Computing resources are divided into multiple virtual warehouses to implement high-availability deployments. Different virtual warehouses share the same endpoint. You can add a specific suffix to the value of the dbname parameter to specify the virtual warehouse to which you want to connect. For example, if you want to connect a dimension table to the virtual warehouse read_warehouse, you can specify
'dbname' = 'db_test@read_warehouse'
.NoteVirtual warehouses are supported only when JDBC-related modes are used for tables. For more information, see the sdkMode parameter of the source table, dimension table, and sink table in the "Parameters in the WITH clause" section of this topic.
tablename
The name of the table.
STRING
Yes
No default value
If the public schema is not used, you must set the tablename parameter to a value in the
schema.tableName
format.username
The username that is used to access the database. Enter the AccessKey ID of your Alibaba Cloud account.
STRING
Yes
No default value
For more information, see Console operations.
ImportantTo protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage variables and keys.
password
The password that is used to access the database. Enter the AccessKey secret of your Alibaba Cloud account.
STRING
Yes
No default value
For more information, see Console operations.
ImportantTo protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage variables and keys.
endpoint
The endpoint of Hologres.
STRING
Yes
No default value
For more information, see Endpoints for connecting to Hologres.
connection.ssl.mode
Specifies whether to enable SSL-encrypted transmission and specifies the SSL-encrypted transmission mode that you want to use.
STRING
No
disable
Valid values:
disable: SSL-encrypted transmission is disabled. This is the default value.
require: SSL-encrypted transmission is enabled. The client uses SSL-encrypted transmission to encrypt only the connections that are used to transmit data.
verify-ca: SSL-encrypted transmission is enabled. The client uses SSL-encrypted transmission to encrypt the connections that transmit data and uses certificate authority (CA) certificates to verify the Hologres server.
verify-full: SSL-encrypted transmission is enabled. The client uses SSL-encrypted transmission to encrypt the connections that transmit data, uses CA certificates to verify the Hologres server, and checks whether the common name (CN) or Domain Name System (DNS) specified in the CA certificates is consistent with the Hologres endpoint that is specified during connection setup.
NoteOnly Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.
You can set the connection.ssl.mode parameter to require in Hologres V1.1 and later. You can set the connection.ssl.mode parameter to verify-ca or verify-full in Hologres V2.1 and later. For more information, see Transmission encryption.
If you set this parameter to verify-ca or verify-full, you must specify the connection.ssl.root-cert.location parameter.
connection.ssl.root-cert.location
The path of the certificate if a CA certificate is used.
STRING
No
No default value
If you set the connection.ssl.mode parameter to verify-ca or verify-full, you must specify this parameter. You can click Upload Artifact on the Artifacts page in the development console of Realtime Compute for Apache Flink to upload the CA certificate file. The uploaded file is stored in the /flink/usrlib directory. For example, if the name of the CA certificate file that you want to use is certificate.crt, you must set this parameter to
'/flink/usrlib/certificate.crt'
in the WITH clause. This way, you can use the certificate after you upload the certificate file.NoteOnly Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.
For more information about how to obtain a CA certificate, see Step 2: Download the CA certificate.
jdbcRetryCount
The maximum number of retries allowed to read and write data if a connection failure occurs.
INTEGER
No
10
N/A.
jdbcRetrySleepInitMs
The fixed waiting period for each retry.
LONG
No
1000
The actual waiting period for a retry is calculated by using the following formula:
Value of the jdbcRetrySleepInitMs parameter + Number of retries up to the current retry × Value of the jdbcRetrySleepInitMs parameter
. Unit: milliseconds.jdbcRetrySleepStepMs
The accumulated waiting period for each retry.
LONG
No
5000
The actual waiting period for a retry is calculated by using the following formula:
Value of the jdbcRetrySleepInitMs parameter + Number of retries up to the current retry × Value of the jdbcRetrySleepInitMs parameter
. Unit: milliseconds.jdbcConnectionMaxIdleMs
The maximum duration for which the JDBC connection is idle.
LONG
No
60000
If a JDBC connection stays idle for a period of time that exceeds the value of this parameter, the connection is closed and released. Unit: milliseconds.
jdbcMetaCacheTTL
The maximum time for storing the TableSchema information in the cache.
LONG
No
60000
Unit: milliseconds.
jdbcMetaAutoRefreshFactor
The factor for triggering automatic cache refresh. If the remaining time for storing data in the cache is less than the time for triggering an automatic refresh of the cache, Realtime Compute for Apache Flink automatically refreshes the cache.
INTEGER
No
4
The remaining time for storing data in the cache is calculated by using the following formula: Remaining time for storing data in the cache = Maximum time that is allowed to store data in the cache - Duration for which data is cached. After the cache is automatically refreshed, the duration for which data is cached is recalculated from 0.
The time for triggering automatic cache refresh is calculated by using the following formula: Time for triggering an automatic refresh of the cache = Value of the jdbcMetaCacheTTL parameter/Value of the jdbcMetaAutoRefreshFactor parameter.
type-mapping.timestamp-converting.legacy
Specifies whether to perform time type conversions between Realtime Compute for Apache Flink and Hologres.
BOOLEAN
No
true
Valid values:
true: does not perform time type conversions between Realtime Compute for Apache Flink and Hologres. Time zone conversion is performed based on the time zone of Java Virtual Machine (JVM) in the runtime environment.
false: recommended. Performs time type conversions between Realtime Compute for Apache Flink and Hologres. Time zone conversion is performed based on the time zone of Realtime Compute for Apache Flink.
NoteOnly Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.
For more information about the time zones of Realtime Compute for Apache Flink and Hologres, see the Time zones of Realtime Compute for Apache Flink and Hologres section of this topic.
If you set the property-version parameter to 0, the default value of this parameter is true. If you set the property-version parameter to 1, the default value of this parameter is false.
property-version
The parameter version of the connector.
INTEGER
No
0
Valid values: 0 and 1. Default value: 0.
NoteOnly Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.
The set of available parameters and the default values of the parameters may be different in different parameter versions. If the set of available parameters and the default values of the parameters are different, the differences are described in the description of the parameters.
We recommend that you set this parameter to 1.
Parameters only for source tables
Parameter
Description
Data type
Required
Default value
Remarks
field_delimiter
The delimiter that is used between rows of the data that you want to export.
STRING
No
"\u0002"
N/A.
binlog
Specifies whether to consume binary log data.
BOOLEAN
No
false
Valid values:
true: Binary log data is consumed.
false: Binary log data is not consumed. This is the default value.
NoteIn Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.
If you set the property-version parameter to 0, the default value of this parameter is false.
If you set the property-version parameter to 1, the default value of this parameter is true.
sdkMode
The SDK mode.
STRING
No
holohub
Valid values:
holohub: Binary log data is consumed in HoloHub mode. This is the default value.
jdbc: Binary log data is consumed in JDBC mode.
jdbc_fixed: Binary log data is consumed in fixed JDBC mode. Compared with data consumption in JDBC mode, data consumption in fixed JDBC mode does not occupy connections and is not subject to limits on the number of connections. This mode does not allow you to consume the binary log data of databases for which the data masking feature is enabled. For more information, see Mask data.
For more information, see the Create a Hologres source table for which the binary logging feature is enabled section of this topic.
The value of this parameter varies based on the VVR version and the Hologres version:
VVR 6.0.3 and an earlier 6.X version: This parameter is not supported.
VVR 6.0.4 to VVR 6.0.6: We recommend that you set this parameter to holohub.
VVR 6.0.7 and a later 6.X version: We recommend that you set this parameter to jdbc.
VVR 8.0.4 to 8.0.5: Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC.
VVR 8.0.6 and later: Realtime Compute for Apache Flink automatically changes the SDK mode based on the Hologres version.
For Hologres V2.1.27 or later, Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to fixed JDBC.
For Hologres V2.1.0 to Hologres V2.1.26, Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC.
In Hologres V2.0, Realtime Compute for Apache Flink first attempts to use the JDBC mode. If an exception, such as insufficient permission, exists, Realtime Compute for Apache Flink automatically uses the HoloHub mode.
NoteVVR 6.0.7 and a later 6.X version:
If the Hologres instance is of a version earlier than Hologres V2.0, Realtime Compute for Apache Flink uses the SDK mode that you configure.
In Hologres V2.0 or later, the HoloHub mode is no longer supported. If you set this parameter to holohub, Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC. If you set this parameter to jdbc, Realtime Compute for Apache Flink uses the JDBC mode.
VVR 8.0.4 to VVR 8.0.5:
In Hologres V2.0, Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC. A permission issue may occur. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time.
jdbcBinlogSlotName
The slot name of the binary log source table in JDBC mode. For more information about how to create a binary log source table in JDBC mode, see the "JDBC mode used to consume binary log source tables" section of the Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time topic.
STRING
No
No default value
This parameter takes effect only when the sdkMode parameter is set to jdbc. If you do not configure this parameter, the Hologres connector creates a slot by default. For more information, see JDBC mode used to consume binary log source tables.
NoteYou do not need to specify this parameter when the version of the Hologres instance is 2.1 or later. The Hologres connector does not attempt to automatically create a slot.
binlogMaxRetryTimes
The number of retries after Realtime Compute for Apache Flink fails to read binary log data.
INTEGER
No
60
N/A.
binlogRetryIntervalMs
The interval between retires after Realtime Compute for Apache Flink fails to read binary log data.
LONG
No
2000
Unit: milliseconds.
binlogBatchReadSize
The number of rows in which binary log data is read at a time.
INTEGER
No
100
N/A.
cdcMode
Specifies whether to read binary log data in CDC mode.
BOOLEAN
No
false
Valid values:
true: Binary log data is read in CDC mode.
false: Binary log data is read in non-CDC mode. This is the default value.
NoteIn Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.
If you set the property-version parameter to 0, the default value of this parameter is false.
If you set the property-version parameter to 1, the default value of this parameter is true.
upsertSource
Specifies whether the source table reads a changelog stream that contains UPSERT messages.
BOOLEAN
No
false
This parameter takes effect only in CDC mode. Valid values:
true: The source table reads a changelog stream that contains only UPSERT messages, including INSERT, DELETE, and UPDATE_AFTER.
false: The source table reads a changelog stream that contains full change messages, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER. This is the default value.
NoteFor example, if the ROW_NUMBER() function together with an OVER clause of Flink SQL is used to perform deduplication and retract operators exist in the sink table, you must set upsertSource to true. In this case, the source table reads data from Hologres in the upsert fashion.
binlogStartupMode
The mode in which binary log data is consumed.
STRING
No
earliestOffset
Valid values:
initial: The system consumes full data and then reads binary log data to consume incremental data.
earliestOffset: The system starts data consumption from the earliest binary log data. This is the default value.
timestamp: The system consumes binary log data from the point in time that is specified by the startTime parameter.
NoteIf the startTime parameter or the start time is specified in the dialog box for starting a deployment, the timestamp mode is forcefully used for the binlogStartupMode parameter. The other two values of this parameter cannot be used. In this case, the startTime parameter has a higher priority.
NoteOnly Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.
In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.
If you set the property-version parameter to 0, the default value of this parameter is false.
If you set the property-version parameter to 1, the default value of this parameter is true.
startTime
The start time when Hologres data is consumed.
STRING
No
No default value
Format: yyyy-MM-dd hh:mm:ss. If this parameter is not specified and deployments are not resumed from the state, Realtime Compute for Apache Flink starts to consume the Hologres data from the earliest generated binary log.
jdbcScanFetchSize
The number of data records that can be processed at a time during the scan operation.
INTEGER
No
256
N/A.
jdbcScanTimeoutSeconds
The timeout period of the scan operation.
INTEGER
No
60
Unit: seconds.
jdbcScanTransactionSessionTimeoutSeconds
The timeout period for the transaction to which the scan operation belongs.
INTEGER
No
600
A value of 0 indicates that the scan operation never times out. This parameter corresponds to the Grand Unified Configuration (GUC) parameter idle_in_transaction_session_timeout of Hologres. For more information, see GUC parameters.
NoteOnly Realtime Compute for Apache Flink whose engine version is vvr-4.0.15-flink-1.13 or later supports this parameter.
enable_filter_push_down
Specifies whether to perform filter pushdown during the full data reading phase.
BOOLEAN
No
false
Valid values:
false: Filter pushdown is not performed. This is the default value.
true: The supported filter conditions are pushed down to Hologres during the full data reading phase. The filter pushdown operation is performed in the following full data reading scenarios: full data reading from a Hologres source table in which the binary logging feature is disabled, and full data reading in a binary log source table when the Hologres connector consumes full and incremental data in a source table.
ImportantIn Realtime Compute for Apache Flink whose engine version is from vvr-6.0.3-flink-1.15 to vvr-6.0.5-flink-1.15, filter pushdown is automatically performed. However, if a deployment uses a Hologres dimension table and the DML statement that is used to write data contains filter conditions for non-primary key fields in the dimension table, the filter conditions of the dimension table are incorrectly pushed down. As a result, an error may occur when the dimension table is joined with another table. To resolve this issue, we recommend that you use Realtime Compute for Apache Flink whose engine version is vvr-6.0.6-flink-1.15 or later and add this parameter to the WITH clause in the DDL statement that is used to create the source table to enable filter pushdown.
Parameters only for result tables
Parameter
Description
Data type
Required
Default value
Remarks
sdkMode
The SDK mode.
STRING
No
jdbc
Valid values:
jdbc: Data is written by using a JDBC driver. This is the default value.
jdbc_copy: Data is written in jdbc_copy mode, which refers to the copy mode of a JDBC driver.
This mode is a new feature that is supported in Hologres 1.3. Data writing in this mode provides higher throughput and lower data latency, and consumes less client memory resources than data writing in JDBC mode because data is written in streaming mode. If the jdbc_copy mode is used, data cannot be deleted, data cannot be written to a parent partitioned table, and the ignoreNullWhenUpdate parameter is not supported.
rpc: Data is written in RPC mode. The effect when you set this parameter to rpc is similar to the effect when you set the useRpcMode parameter to true. Compared with data writing in JDBC mode, data writing in RPC mode does not occupy connections. Data of the Jsonb and RoarinBitmap data type cannot be written to Hologres.
jdbc_fixed: In this mode, data writing is performed by using a fixed JDBC driver. This feature is in public preview.
Only Hologres 1.3 or later supports this feature. Compared with data writing that is performed in JDBC mode, data writing that is performed in jdbc_fixed mode do not occupy connections. Data of the Jsonb and RoarinBitmap data type cannot be written to Hologres. This mode does not allow you to write data to databases for which the data masking feature is enabled. For more information, see Mask data.
The value of this parameter varies based on the VVR version and the Hologres version:
VVR 6.0.3 and an earlier 6.X version: This parameter is not supported.
VVR 6.0.4 and a later 6.X version: We recommend that you set this parameter to jdbc.
NoteVVR 6.0.7 to VVR 8.0.1:
If the Hologres instance is of a version earlier than Hologres V2.0, Realtime Compute for Apache Flink uses the SDK mode that you configure.
In Hologres V2.0 or later, the RPC mode is no longer supported. If you set this parameter to rpc, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed. If you set this parameter to a value other than rpc, Realtime Compute for Apache Flink uses the SDK mode that you configure.
If you use the RPC mode for a table, Realtime Compute for Apache Flink does not perform deduplication on the data that has the same primary key. If you want to retain complete data, you can use a JDBC-related mode for the table and set the jdbcWriteBatchSize parameter to 1 to avoid deduplication.
VVR 8.0.3 and later:
For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the RPC mode is no longer supported for tables regardless of the versions of Hologres instances. If the RPC mode is used for tables, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed and you can set the jdbcWriteBatchSize parameter to 1 to avoid deduplication.
bulkload
Specifies whether to write data in bulkload mode.
BOOLEAN
No
false
This parameter takes effect only when the sdkMode parameter is set to jdbc_copy. You can write data in bulkload mode to tables that do not have primary keys or tables whose primary keys have the same value. Compared with the jdbc_copy mode, less Hologres resources are used when you write data in bulkload mode.
NoteOnly Realtime Compute for Apache Flink that uses VVR 8.0.5 or later and Hologres V2.1 or later support this parameter.
useRpcMode
Specifies whether to connect to the Hologres connector by using RPC.
BOOLEAN
No
false
Valid values:
true: RPC is used to connect to the Hologres connector.
The effect of this setting is the same as the effect when you set the sdkMode parameter to rpc. If RPC is used, the number of SQL connections is reduced.
false: A JDBC driver is used to connect to the Hologres connector. This is the default value.
JDBC drivers require SQL connections. This increases the number of JDBC connections.
NoteThe RPC mode is no longer supported in Hologres V2.0. We recommend that you specify the sdkMode parameter to write data in JDBC or jdbc_fixed mode.
Realtime Compute for Apache Flink that uses VVR 6.0.7 or VVR 8.0.1 automatically changes the SDK mode from RPC to jdbc_fixed for a table when it detects that the version of Hologres is V2.0 or later.
For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the system automatically changes the SDK mode from RPC to jdbc_fixed.
If you use the RPC mode for a table, Realtime Compute for Apache Flink does not perform deduplication on the data that has the same primary key. If you want to retain complete data, we recommend that you use Realtime Compute for Apache Flink that uses VVR 8.0.5 or later, set the sdkMode parameter to jdbc, and set the deduplication.enabled parameter to false to avoid deduplication.
If you set the property-version parameter to 1, this parameter is not supported.
mutatetype
The data writing mode.
STRING
No
insertorignore
For more information, see the Streaming semantics section of this topic.
NoteIn Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.
If you set the property-version parameter to 0, the default value of this parameter is insertorignore.
If you set the property-version parameter to 1, the default value of this parameter is insertorupdate.
partitionrouter
Specifies whether to write data to a partitioned table.
BOOLEAN
No
false
N/A.
createparttable
Specifies whether to automatically create a partitioned table to which data is written based on partition values.
BOOLEAN
No
false
In RPC mode, if the partition values contain hyphens (-), partitioned tables cannot be automatically created.
NoteIn Hologres V1.3.22 or later, a field of the DATE type can be used as the partition key. For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, a partitioned table can be automatically created when you use a field of the DATE type as the partition key.
Make sure that partition values do not contain dirty data. If dirty data exists, a failover occurs because an invalid partitioned table is created. Proceed with caution when you use this parameter.
If you set the sdk_mode parameter to jdbc_copy, data cannot be written to a parent partitioned table.
ignoredelete
Specifies whether to ignore retraction messages.
BOOLEAN
No
true
NoteThis parameter takes effect only when the streaming semantics is used.
For Realtime Compute for Apache Flink that uses VVR 8.0.8 or later, we recommend that you configure the
sink.delete-strategy
parameter. If you configure the ignoredelete parameter together with the sink.delete-strategy parameter, only the value of thesink.delete-strategy
parameter is used.In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.
If you set the property-version parameter to 0, the default value of this parameter is true.
If you set the property-version parameter to 1, the default value of this parameter is false.
sink.delete-strategy
The operation that is performed when a retraction message is received.
STRING
No
No default value
Valid values:
IGNORE_DELETE: ignores UPDATE BEFORE and DELETE messages. This operation applies to scenarios in which data is inserted or updated and no data deletion is required.
NON_PK_FIELD_TO_NULL: ignores UPDATE BEFORE messages and updates the non-primary key values in DELETE messages to NULL. This operation applies to scenarios in which you want to delete data without affecting other columns during a partial update operation.
DELETE_ROW_ON_PK: ignores UPDATE BEFORE messages and deletes the row that matches the primary key value when a DELETE message is received. This operation applies to scenarios in which you want to delete a row during a partial update operation. The deletion affects other columns.
CHANGELOG_STANDARD: The framework of Realtime Compute for Apache Flink runs according to the working principles of Flink SQL changelogs. Delete operations are not ignored. If an update operation is performed, the original data is deleted and then new data is inserted. This ensures data accuracy. This operation applies to scenarios in which partial updates are not involved.
NoteOnly Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.
Setting this parameter to NON_PK_FIELD_TO_NULL may sink in only the primary key value being recorded and other column values being null.
connectionSize
The size of the JDBC connection pool that is created in a Realtime Compute for Apache Flink deployment.
INTEGER
No
3
If the deployment provides poor performance, we recommend that you increase the size of the connection pool. The size of the JDBC connection pool is proportional to data throughput.
jdbcWriteBatchSize
The maximum number of rows of data that can be processed by a Hologres streaming sink node at a time when a JDBC driver is used.
INTEGER
No
256
Unit: row.
NoteYou can configure only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify all the preceding parameters, the system writes data to a Hologres sink table when one of the related conditions is met.
jdbcWriteBatchByteSize
The maximum number of bytes of data that can be processed by a Hologres streaming sink node at a time when a JDBC driver is used.
LONG
No
2097152 (2 × 1024 × 1024 bytes = 2 MB)
NoteYou can specify only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify all the preceding parameters, the system writes data to a Hologres sink table when one of the related conditions is met.
jdbcWriteFlushInterval
The maximum period of time that is required to wait for a Hologres streaming sink node to write data from multiple rows to Hologres at the same time when a JDBC driver is used.
LONG
No
10000
Unit: milliseconds.
NoteYou can specify only one of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you specify all the preceding parameters, the system writes data to a Hologres sink table when one of the related conditions is met.
ignoreNullWhenUpdate
Specifies whether to ignore the null value that is written in the data if mutatetype='insertOrUpdate' is specified.
BOOLEAN
No
false
Valid values:
false: The null value is written to a Hologres sink table. This is the default value.
true: The null value that is written in the data is ignored.
NoteOnly Realtime Compute for Apache Flink that uses VVR 4.0 or later supports this parameter.
If you set the sdk_mode parameter to jdbc_copy, this parameter is not supported.
connectionPoolName
The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool.
STRING
No
No default value
Set the value to a string other than
default
. If the same connection pool is configured for multiple tables, the value of the connectionSize parameter for the tables must be the same.NoteVVR version earlier than 4.0.12: This parameter is not supported.
VVR 4.0.12 to VVR 8.0.3: By default, tables do not share a connection pool. Each table uses its connection pool.
VVR 8.0.4 or later: By default, tables that use the same endpoint in the same deployment share a connection pool. If the number of tables in a deployment is excessively large, the connections in a connection pool may be relatively insufficient. This affects the deployment performance. In this case, we recommend that you set the connectionPoolName parameter to different values for different tables.
You can specify this parameter based on your business requirements. For example, a deployment uses the following Hologres tables: Dimension Table A, Dimension Table B, Sink Table C, Sink Table D, and Sink Table E. You can configure the connection pool pool1 for Dimension Table A and Dimension Table B and the connection pool pool2 for Sink Table C and Sink Table D. You can configure the connection pool pool3 for Sink Table E in which a large number of data is processed.
jdbcEnableDefaultForNotNullColumn
Specifies whether to allow the Hologres connector to fill a default value if a null value is written to a non-null column for which the default value is not configured in the Hologres table.
BOOLEAN
No
true
Valid values:
true: The Hologres connector fills a default value. This is the default value. If you set this parameter to true, the system converts a null value into a default value based on the following rules:
If the column is of the STRING type, the column is left empty.
If the column is of the NUMBER data type, the null value is converted into 0.
If the column is of the DATE, TIMESTAMP, or TIMESTAMPTZ type, the null value is converted into 1970-01-01 00:00:00.
false: The Hologres connector does not fill a default value. If a null value is written to a non-null column for which the default value is not configured in the Hologres table, an error is returned.
remove-u0000-in-text.enabled
Specifies whether to allow the Hologres connector to remove the invalid characters \u0000 from data of the STRING type that is written to the result table if the data contains the invalid characters \u0000.
BOOLEAN
No
false
Valid values:
false: The Hologres connector does not process data. If the data that is written to the result table contains dirty data, the
"ERROR: invalid byte sequence for encoding "UTF8": 0x00"
error message may be reported. This is the default value.In this case, you need to remove the dirty data from the source table in advance or define the dirty data processing logic in the SQL statement.
true: The Hologres connector removes the invalid characters \u0000 from the data of the STRING type to prevent the error message from being reported.
ImportantFor Realtime Compute for Apache Flink that uses VVR 8.0.1 or a later 8.X version, this parameter is supported only if you set the sdkMode parameter to
jdbc
.For Realtime Compute for Apache Flink that uses VVR 8.0.8 or later, this parameter is supported only if you set the sdkMode parameter to
jdbc_copy
orjdbc
.
partial-insert.enabled
Specifies whether to insert only the fields declared in the INSERT statement.
BOOLEAN
No
false
Valid values:
false: All fields that are declared in the DDL statement of the result table are updated regardless of which fields are declared in the INSERT statement. Fields that are not declared in the INSERT statement are updated to null values. This is the default value.
true: The fields that are declared in the INSERT statement are pushed down to a connector. In this case, only the declared fields are updated in the result table or inserted into the result table.
NoteOnly Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.
This parameter takes effect only if the mutatetype parameter is set to InsertOrUpdate.
deduplication.enabled
Specifies whether to perform deduplication when data is written in batches in jdbc or jdbc_fixed mode.
BOOLEAN
No
true
Valid values:
true: If the data that is written in batches contains data that has the same primary key, deduplication is performed. Only the last data record is retained. This is the default value. In this example, two fields are used and the first field is the primary key.
If the
INSERT (1,'a')
andINSERT (1,'b')
records are written to the result table in chronological order, only the record(1,'b')
that arrives later is retained and written to the Hologres result table after deduplication is performed.The
(1,'a')
record already exists in the Hologres result table. In this case, theDELETE (1,'a')
andINSERT (1,'b')
records are written to the result table in chronological order. Only the(1,'b')
record that arrives later is retained and written to the Hologres result table. In this case, data is directly updated rather than deleted and then inserted into the table.
false: No deduplication is performed when data is written in batches. If the primary key of the inserted data has the same value as the primary key of the data that is written in batches, the data in batches is written, and then the data that needs to be inserted is written.
NoteOnly Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.
In extreme cases, if no deduplication is performed when data is written in batches and all data has the same primary key, data is not written in batches. This may affect the performance.
Parameters only for dimension tables
Parameter
Description
Data type
Required
Default value
Remarks
sdkMode
The SDK mode.
STRING
No
jdbc
Valid values:
jdbc: Queries are performed in JDBC mode. Point queries based on a primary key and queries based on a non-primary key are supported. However, queries based on a non-primary key affect the query performance and the query speed is slow. This is the default value.
rpc: Point queries are performed in RPC mode. If you set this parameter to rpc, the effect is similar to the effect when you set the useRpcMode parameter to true. Only point queries based on a primary key are supported. When you join a Hologres dimension table with another table, you must specify all the fields in the primary keys of the dimension table in the ON clause. Compared with point queries that are performed in JDBC mode, point queries that are performed in RPC mode do not occupy connections. Data of the Jsonb and RoarinBitmap data type cannot be queried in Hologres.
jdbc_fixed: In this mode, point queries are performed by using a fixed JDBC driver. This feature is in public preview. Only Hologres 1.3 or later supports this feature. Compared with point queries that are performed in JDBC mode, point queries that are performed in jdbc_fixed mode do not occupy connections. Data of the Jsonb and RoarinBitmap data type cannot be queried in Hologres. Only point queries based on a primary key are supported. When you join a Hologres dimension table with another table, you must specify all the fields in the primary keys of the dimension table in the ON clause. This mode does not allow you to query databases for which the data masking feature is enabled. For more information, see Mask data.
The value of this parameter varies based on the VVR version and the Hologres version:
VVR 6.0.3 and an earlier 6.X version: This parameter is not supported.
VVR 6.0.4 and a later 6.X version: We recommend that you set this parameter to jdbc.
NoteFor Realtime Compute for Apache Flink that uses VVR 6.0.6, if you set this parameter to jdbc, a null pointer error may occur when data of the STRING data type is queried in JDBC mode and the query results contain null values. To resolve this issue, we recommend that you use the RPC mode.
VVR 6.0.7 to VVR 8.0.1:
If the Hologres instance is of a version earlier than Hologres V2.0, Realtime Compute for Apache Flink uses the SDK mode that you configure.
In Hologres V2.0 or later, the RPC mode is no longer supported. If you set this parameter to rpc, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed. If you set this parameter to a value other than rpc, Realtime Compute for Apache Flink uses the SDK mode that you configure.
VVR 8.0.3 and later:
For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the RPC mode is no longer supported for tables regardless of the Hologres instance versions. If the RPC mode is used for tables, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed.
useRpcMode
Specifies whether to connect to the Hologres connector by using RPC.
BOOLEAN
No
false
Valid values:
true: RPC is used to connect to the Hologres connector. The effect of this setting is equivalent to the effect when you set the sdkMode parameter to rpc. If RPC is used, the number of SQL connections is reduced.
false: A JDBC driver is used to connect to the Hologres connector. This is the default value.
JDBC drivers require SQL connections. This increases the number of JDBC connections.
NoteThe RPC mode is no longer supported in Hologres V2.0. We recommend that you set the sdkMode parameter to jdbc or jdbc_fixed.
For Realtime Compute for Apache Flink that uses VVR 6.0.7 or VVR 8.0.1 and Hologres V2.0 or later, the SDK mode is automatically changed from RPC to jdbc_fixed.
For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the system automatically changes the SDK mode from RPC to jdbc_fixed.
connectionSize
The size of the JDBC connection pool that is created in a Realtime Compute for Apache Flink deployment.
INTEGER
No
3
If the deployment provides poor performance, we recommend that you increase the size of the connection pool. The size of the JDBC connection pool is proportional to data throughput.
connectionPoolName
The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool.
STRING
No
No default value
Set the value to a string other than
default
. If the same connection pool is configured for multiple tables, the value of the connectionSize parameter for the tables must be the same.You can specify this parameter based on your business requirements. For example, a deployment uses the following Hologres tables: Dimension Table A, Dimension Table B, Sink Table C, Sink Table D, and Sink Table E. You can configure the connection pool pool1 for Dimension Table A and Dimension Table B and the connection pool pool2 for Sink Table C and Sink Table D. You can configure the connection pool pool3 for Sink Table E in which a large number of data is processed.
NoteVVR version earlier than 4.0.12: This parameter is not supported.
VVR 4.0.12 to VVR 8.0.3: By default, tables do not share a connection pool. Each table uses its connection pool.
VVR 8.0.4 or later: By default, tables that use the same endpoint in the same deployment share a connection pool. If the number of tables in a deployment is excessively large, the connections in a connection pool may be relatively insufficient. This affects the deployment performance. In this case, we recommend that you set the connectionPoolName parameter to different values for different tables.
jdbcReadBatchSize
The maximum number of data records that can be processed at the same time for a point query in a Hologres dimension table.
INTEGER
No
128
N/A.
jdbcReadBatchQueueSize
The maximum number of queued requests allowed in a thread to perform a point query in a Hologres dimension table.
INTEGER
No
256
N/A.
jdbcReadTimeoutMs
The timeout period for performing a point query in a Hologres dimension table.
LONG
No
0
Default value: 0, which indicates that the point query in a Hologres dimension table never times out. Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.15-flink-1.13, vvr-6.0.2-flink-1.15, or their later minor versions supports this parameter.
jdbcReadRetryCount
The number of retries when a point query performed in a Hologres dimension table times out.
INTEGER
No
VVR version earlier than 8.0.5: 1
VVR 8.0.5 or later: 10
Only Realtime Compute for Apache Flink that uses VVR 6.0.3 or later supports this parameter.
This parameter is different from the jdbcRetryCount parameter. The jdbcRetryCount parameter specifies the maximum number of retries allowed to read and write data if a connection failure occurs.
jdbcScanFetchSize
The number of data records that can be processed at the same time by calling the scan operation when you perform a one-to-many table join. In a one-to-many table join, no complete primary key is used.
INTEGER
No
256
N/A.
jdbcScanTimeoutSeconds
The maximum time period for which you need to wait for a scan operation to complete.
INTEGER
No
60
Unit: seconds.
cache
The cache policy.
STRING
No
For more information, see the Remarks column.
Hologres supports only None and LRU cache policies. For more information, see Background information.
NoteThe default value of this parameter varies based on the VVR version:
For Realtime Compute for Apache Flink that uses VVR 4.X or later, the default value of this parameter is None.
For Realtime Compute for Apache Flink that uses a VVR version earlier than 4.X, the default value of this parameter is LRU.
cacheSize
The maximum number of rows of data records that can be cached.
INTEGER
No
10000
If you set the cache parameter to LRU, you can specify the cacheSize parameter.
cacheTTLMs
The interval at which the system refreshes the cache.
LONG
No
For more information, see the Remarks column.
Unit: milliseconds. The default value of the cacheTTLMs parameter varies based on the value of the cache parameter:
If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.
If you set the cache parameter to None, the cacheTTLMs can be left empty. This indicates that cache entries do not expire.
cacheEmpty
Specifies whether to cache the JOIN queries whose return results are empty.
BOOLEAN
No
true
true: caches the JOIN queries whose return results are empty.
false: does not cache the JOIN queries whose return results are empty.
If the condition before AND is met but the condition after AND is not met in a JOIN statement, the JOIN queries whose return results are empty are also cached. Sample code:
LEFT JOIN latest_emergency FOR SYSTEM_TIME AS OF PROCTIME() AS t2 ON t1.alarm_id = t2.alarm_id -- If a dynamic alert exists, add the ID of the dynamic alert to JOIN queries for matching. Otherwise, you do not need to use the ID of the dynamic alert. AND CASE WHEN alarm_type = 2 THEN t1.dynamic_id = t2.dynamic_alarm_id ELSE true END
async
Specifies whether to enable data synchronization in asynchronous mode.
BOOLEAN
No
false
true: Data synchronization in asynchronous mode is enabled.
false: Data synchronization in asynchronous mode is disabled. This is the default value.
NoteData is not sorted when data is synchronized in asynchronous mode.
Data type mappings
For more information about the mappings between the data types in Realtime Compute for Apache Flink and Hologres, see Data type mappings between Realtime Compute for Apache Flink and Hologres.
Sample code
Sample code for a source table
Statement for creating a Hologres source table in which the binary logging feature is disabled
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'field_delimiter'='|' -- This parameter is optional.
'sdkmode' = 'jdbc'
);
CREATE TEMPORARY TABLE blackhole_sink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT name, age, birthday
from hologres_source;
Statement for creating a Hologres source table in which the binary logging feature is enabled
Realtime Compute for Apache Flink can consume the binary log data of Hologres in real time. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time.
Sample code for a sink table
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
Sample code for a dimension table
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
Feature description
DataStream API
If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to access Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors. You can use the Hologres DataStream connectors of different versions stored in the Maven central repository. For Realtime Compute for Apache Flink that uses VVR 6.0.7, use the dependency of 1.15-vvr-6.0.7-1. For Realtime Compute for Apache Flink that uses VVR 8.0.7, download and use the dependency file ververica-connector-hologres-1.17-vvr-8.0.7.jar. When you debug a deployment in an on-premises environment, you must use the related uber JAR package. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment. The uber JAR package that corresponds to VVR 8.0.7 is ververica-connector-hologres-1.17-vvr-8.0.7-uber.jar.
Build an implementation class to read data from a Hologres source table
VVR provides the implementation class HologresBulkreadInputFormat of RichInputFormat to read data from a Hologres source table. The following examples show how to build the implementation class HologresBulkreadInputFormat to read data from a Hologres source table.
VVR 4.0.15
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
jdbcOptions.getTable(), schema.getFieldNames());
// Build HologresBulkreadInputFormat to read data from the Hologres source table.
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
VVR 6.0.7 and VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the sink table to which you want to write data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions, schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
XML
You can use the Hologres DataStream connectors of different versions stored in the Maven central repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>
Build an implementation class to read data from a Hologres source table for which binary logging is enabled
VVR provides the implementation class HologresBinlogSource of Source to read data from a Hologres source table for which binary logging is enabled. The following examples show how to build the implementation class HologresBinlogSource to read data from a Hologres source table for which binary logging is enabled.
VVR 4.0.15
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
// Build HologresBinlogSource to read data from the Hologres source table for which binary logging is enabled.
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
schema,
config,
jdbcOptions,
recordConverter,
startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 6.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the sink table to which you want to write data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Create a slot and configure the default name of the slot.
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
&& config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Build the record converter to read data from the Hologres source table for which binary logging is enabled.
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// Build HologresBinlogSource to read data from the Hologres source table for which binary logging is enabled.
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 8.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the sink table to which you want to write data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Build HologresBinlogSource to read data from the Hologres source table for which binary logging is enabled.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet());
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
The buildRecordConverter method is not included in the dependency of the VVR connector. This method is provided in ververica-connector-demo.
For more information about the usage notes and implementation principles of Hologres binary logs, see the Create a Hologres source table for which the binary logging feature is enabled section of this topic.
Build an implementation class to write data to a Hologres sink table
VVR provides the implementation class HologresSinkFunction of OutputFormatSinkFunction to write data to a Hologres sink table. The following examples show how to build the implementation class OutputFormatSinkFunction to write data to a Hologres sink table.
VVR 4.0.15
// Initialize the schema of the sink table to which you want to write data.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Build a Hologres Writer to write data in the data structure of the RowData class.
AbstractHologresWriter<RowData> hologresWriter =
buildHologresWriter(schema, config, hologresConnectionParam);
// Build HologresSinkFunction to write data to the Hologres sink table.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
VVR 6.0.7 and VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the sink table to which you want to write data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Build a Hologres Writer to write data in the data structure of the RowData class.
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// Build HologresSinkFunction to write data to the Hologres sink table.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
The buildHologresWriter method is not included in the dependency of the VVR connector. This method is provided in ververica-connector-demo.
Time zones of Realtime Compute for Apache Flink and Hologres
Time types
Service | Time type | Description |
Realtime Compute for Apache Flink | The date and time without the time zone. Data of the TIMESTAMP type is a timestamp that represents the year, month, day, hour, minute, second, and fractional second. Data of the TIMESTAMP type can be a string, such as | |
An absolute point in time on the timeline. Data of the LONG type indicates the number of milliseconds that have elapsed since the epoch time. Data of the INT type indicates the number of nanoseconds in milliseconds. The epoch time refers to 00:00:00 UTC on January 1, 1970 in Java. Data of the TIMESTAMP_LTZ type is interpreted for calculations and visualization based on the time zone that is configured in the current session. The TIMESTAMP_LTZ type can be used for calculations across time zones. A value of the TIMESTAMP_LTZ type represents the same absolute point in time in different time zones based on the epoch time. A value of the TIMESTAMP_LTZ type may indicate different local values of the TIMESTAMP type in different time zones. For example, if a value of the TIMESTAMP_LTZ type is | ||
Hologres | TIMESTAMP | The date and time without the time zone, which is similar to the |
TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ) | The date and time with the time zone, which is similar to the For example, if the timestamp of the time zone of Beijing (UTC+8) is |
Time type mappings
If you set the
type-mapping.timestamp-converting.legacy
parameter to false in Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, you can perform conversions of all time types between Realtime Compute for Apache Flink and Hologres.Realtime Compute for Apache Flink
Hologres
Description
TIMESTAMP
TIMESTAMP
Time type conversions are performed without time zone conversions. We recommend that you use this type of time type conversion to read data from or write data to Hologres.
TIMESTAMP LTZ
TIMESTAMPTZ
TIMESTAMP
TIMESTAMPTZ
Time type conversions are performed with time zone conversions. To ensure data accuracy during conversion, you must specify the
table.local-time-zone
parameter to specify the time zone of Realtime Compute for Apache Flink. For more information about how to configure the parameters, see Console operations.For example, you specify
'table.local-time-zone': 'Asia/Shanghai'
to set the time zone of Realtime Compute for Apache Flink to the time zone of Shanghai (UTC+8). After you write the data 2022-01-01 01:01:01.123456 of the TIMESTAMP type from Realtime Compute for Apache Flink to Hologres, the data is converted to 2022-01-01 01:01:01: 01.123456+8 of the TIMESTAMPTZ type.TIMESTAMP LTZ
TIMESTAMP
In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later and you specify
type-mapping.timestamp-converting.legacy=true
or in Realtime Compute for Apache Flink that uses VVR 8.0.5 or earlier, data deviation may occur during time type conversions except for the conversions of the TIMESTAMP type.Realtime Compute for Apache Flink
Hologres
Description
TIMESTAMP
TIMESTAMP
Time type conversions are performed without time zone conversions. We recommend that you use this type of time type conversion to read data from or write data to Hologres.
TIMESTAMP LTZ
TIMESTAMPTZ
Data of the TIMESTAMP LTZ and TIMESTAMPTZ types is expressed as the time without the time zone when Realtime Compute for Apache Flink reads data from or write data to Hologres. This may cause data deviation.
For example, if data of the TIMESTAMP_LTZ type in Realtime Compute for Apache Flink is 2024-03-19T04:00:00Z, the time without the time zone in Shanghai (UTC+8) is 2024-03-19T12:00:00. However, when data is written to Hologres, 2024-03-19T04:00:00 is used as the time without the time zone and is converted to 2024-03-19T04:00:00+08 of the TIMESTAMPTZ type in Hologres. This causes an 8-hour data deviation.
TIMESTAMP
TIMESTAMPTZ
Time zone conversions are performed based on the time zone of JVM in the runtime environment instead of the time zone of Realtime Compute for Apache Flink. This is different from the time zone conversions in Realtime Compute for Apache Flink. If the time zone of Realtime Compute for Apache Flink is different from the time zone of JVM, data deviation may occur. We recommend that you read data from and write data to Hologres based on the time zone of Realtime Compute for Apache Flink.
TIMESTAMP LTZ
TIMESTAMP