This topic describes how to use the Hologres connector.
Background information
Hologres is an end-to-end real-time data warehouse service that allows you to write, update, and analyze large amounts of data in real time. It is compatible with the PostgreSQL protocol and supports standard SQL syntax. Hologres supports online analytical processing (OLAP) and ad hoc queries for petabytes of data, and provides high-concurrency, low-latency online data services. Hologres is seamlessly integrated with MaxCompute, Realtime Compute for Apache Flink, and DataWorks, and provides full-stack online and offline data warehouse solutions. The following table describes the capabilities supported by the Hologres connector.
| Item | Description |
| --- | --- |
| Supported type | Source table, dimension table, and sink table |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | |
| API type | DataStream API and SQL API |
| Data update or deletion in a sink table | Supported |
Features
Features supported for a source table
| Feature | Description |
| --- | --- |
| Real-time consumption of Hologres data | For more information, see Use Realtime Compute for Apache Flink or Blink to consume Hologres binary log data in real time. |
Features supported for a sink table
| Feature | Description |
| --- | --- |
| Writes changelogs to a Hologres sink table | |
| Updates only data in specific fields instead of data in an entire row | |
| Data synchronization based on the CREATE TABLE AS and CREATE DATABASE AS statements | Synchronizes data from an entire database or a single table to Hologres tables in real time, and synchronizes schema changes of each table to Hologres tables in real time. |
| Data update in specific columns. Note: Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.7 or later supports this feature. | Pushes down the column names that are specified in the INSERT statement of Realtime Compute for Apache Flink to the Hologres connector to update only the specified columns. See the example after this table. |
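For example, an INSERT statement that lists only some of the sink columns updates just those columns. The following sketch assumes a sink whose Hologres table has the columns id, name, and age with id as the primary key; source_table is a placeholder, and 'mutatetype' = 'insertorupdate' is the connector option that makes the connector update existing rows instead of replacing them:

CREATE TEMPORARY TABLE hologres_partial_sink (
    id BIGINT,
    name VARCHAR,
    age BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'hologres',
    'dbname' = '<yourDbname>',
    'tablename' = '<yourTablename>',
    'username' = '${secret_values.ak_id}',
    'password' = '${secret_values.ak_secret}',
    'endpoint' = '<yourEndpoint>',
    'mutatetype' = 'insertorupdate' -- Update rows that have the same primary key instead of replacing them.
);

-- Only the id and name columns are pushed down; the age column keeps its current value.
INSERT INTO hologres_partial_sink (id, name)
SELECT id, name FROM source_table;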
Prerequisites
A Hologres table is created. For more information, see Manage internal tables.
Limits
Limits on all types of Hologres tables
Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the Hologres connector.
The Hologres connector cannot be used to access Hologres foreign tables. For more information about Hologres foreign tables, see Create foreign tables mapped to MaxCompute in the HoloWeb console.
For more information about the known defects and release notes of the Hologres connector, see the "Hologres connector release note" section of the Overview topic.
Limits only on Hologres source tables
By default, Realtime Compute for Apache Flink performs a full table scan only once to read all data from a Hologres source table. Data consumption is complete when the scan ends, and data that is appended to the source table after the scan is not read. Realtime Compute for Apache Flink that uses VVR 3.0.0 or later can consume Hologres data in real time. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time.
In Realtime Compute for Apache Flink that uses VVR 6.0.3 or later, filter pushdown is supported when the Hologres connector reads data from Hologres. For more information, see the description of the enable_filter_push_down parameter that is used for source tables, and the configuration sketch after these limits.
Realtime Compute for Apache Flink that uses a VVR version earlier than 8.0 does not support watermarks for a Hologres source table in CDC mode. If you want to perform window aggregation on such a table, use a different approach for time-based aggregation. For more information, see the "MySQL CDC source tables and Hologres CDC source tables do not support window functions. How do I implement minute-level data aggregation on a MySQL CDC source table or Hologres CDC source table?" section of the FAQ about CDC topic.
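The following sketch shows how a source table might enable filter pushdown. It is a minimal example that assumes the enable_filter_push_down parameter mentioned above; replace the placeholder values with your own:

CREATE TEMPORARY TABLE hologres_filtered_source (
    name VARCHAR,
    age BIGINT
) WITH (
    'connector' = 'hologres',
    'dbname' = '<yourDbname>',
    'tablename' = '<yourTablename>',
    'username' = '${secret_values.ak_id}',
    'password' = '${secret_values.ak_secret}',
    'endpoint' = '<yourEndpoint>',
    'enable_filter_push_down' = 'true' -- Requires VVR 6.0.3 or later; filters in WHERE clauses are evaluated in Hologres.
);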
Limits only on Hologres sink tables: None.
Limits only on Hologres dimension tables
We recommend that you use the primary key as the JOIN condition when you join a Hologres dimension table with another table, and that you use row-oriented storage when you create the dimension table, because column-oriented storage incurs large performance overheads for point queries. When you use row-oriented storage to create a Hologres dimension table, you must specify the primary key as the clustering key. For more information, see CREATE TABLE. A DDL sketch follows these limits.
If you cannot use the primary key as the JOIN condition and point queries are performed on a non-primary key, we recommend that you use column-oriented storage when you create the Hologres table. A point query on a non-primary key may return multiple rows for a single lookup. In this case, we recommend that you specify the distribution key and the event time column (segment key) to optimize query performance. For more information, see Distribution key, Event time column (segment key), and Storage models of tables: row-oriented storage, column-oriented storage, and row-column hybrid storage.
In Realtime Compute for Apache Flink that uses a VVR version earlier than 4.0, you can perform point queries only by using the primary key of a Hologres dimension table when you join the dimension table with another table. In Realtime Compute for Apache Flink that uses VVR 4.0 or later, you can also perform point queries by using a non-primary key, provided that a JDBC-related mode is used for the dimension table.
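The following Hologres DDL sketch creates a row-oriented dimension table whose primary key also serves as the clustering key. The table and column names are hypothetical; in Hologres, set_table_property must be called in the same transaction as CREATE TABLE:

BEGIN;
CREATE TABLE dim_user (
    id BIGINT NOT NULL,
    name TEXT,
    PRIMARY KEY (id)
);
-- Row-oriented storage is preferred for point queries on the primary key.
CALL set_table_property('dim_user', 'orientation', 'row');
-- For a row-oriented table, specify the primary key as the clustering key.
CALL set_table_property('dim_user', 'clustering_key', 'id');
COMMIT;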
Precautions
If you set the sdkMode parameter to rpc for a table, take note of the following items when you upgrade the VVR version:
In Hologres V2.0 and later, the SDK modes that can be configured for dimension tables and sink tables are changed: you can no longer set the sdkMode parameter to rpc, and only the Java Database Connectivity (JDBC)-related modes can be used. The JDBC-related modes are the jdbc, jdbc_fixed, and jdbc_copy modes. Unlike the RPC mode, the JDBC-related modes deduplicate data that has the same primary key in the same batch. If you want to retain all such data, set the jdbcWriteBatchSize parameter to 1, or upgrade the VVR version to 8.0.5 or later and set the deduplication.enabled parameter to false. For a configuration sketch, see the example after this list.
If you want to upgrade the VVR version from 4.X to 6.X or from 4.X to 8.X for a deployment in which data of Hologres V2.0 is read or written in remote procedure call (RPC) mode, you can configure the SDK mode based on the upgrade scenario.
If you upgrade the VVR version from 4.X to a 6.X minor version that ranges from 6.0.4 to 6.0.6, an error may be returned. We recommend that you set the sdkMode parameter to jdbc or jdbc_fixed for dimension tables and sink tables.
If you upgrade the VVR version from 4.X to 6.0.7 or later, no action is required. Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to JDBC-related modes.
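The following sketch shows the two options for avoiding in-batch deduplication after you switch from the rpc mode to a JDBC-related mode. The option names jdbcWriteBatchSize and deduplication.enabled come from the precaution above; replace the placeholder values with your own:

CREATE TEMPORARY TABLE hologres_sink_no_dedup (
    name VARCHAR,
    age BIGINT,
    PRIMARY KEY (name) NOT ENFORCED
) WITH (
    'connector' = 'hologres',
    'dbname' = '<yourDbname>',
    'tablename' = '<yourTablename>',
    'username' = '${secret_values.ak_id}',
    'password' = '${secret_values.ak_secret}',
    'endpoint' = '<yourEndpoint>',
    'sdkMode' = 'jdbc',
    'jdbcWriteBatchSize' = '1' -- Each batch contains one record, so no records are deduplicated.
    -- In VVR 8.0.5 or later, you can keep a larger batch size and instead set:
    -- 'deduplication.enabled' = 'false'
);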
If you set the sdkMode parameter to holohub for a binary log source table, take note of the following items when you upgrade the VVR version:
In Hologres V2.0 and later, you can set the sdkMode parameter to holohub for Hologres binlog source tables only in specific scenarios. In Hologres V2.1 and later, the holohub value is no longer supported, and you must set the sdkMode parameter to jdbc for Hologres binlog source tables. For a source table sketch, see the example after this list.
If a binary log source table is consumed in your deployment and you do not set the sdkMode parameter to jdbc, the HoloHub mode is used for the binary log source table by default. If you want to upgrade the VVR version from 4.X to 6.X or from 4.X to 8.X, take note of the following points based on the Hologres version when you configure the SDK mode.
Hologres V2.0
If you upgrade the VVR version from 4.X to a VVR version that ranges from 6.0.7 to 8.0.3, Realtime Compute for Apache Flink still reads data from a binary log source table in HoloHub mode.
If you upgrade the VVR version from 4.X to 8.0.4 or later, a permission issue may occur. You must configure permissions. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time.
Hologres V2.1
If you upgrade the VVR version from 4.X to a VVR version that ranges from 6.0.7 to 8.0.4, binary log data may not be consumed as expected. We recommend that you upgrade the VVR version to 8.0.5.
If you upgrade the VVR version from 4.X to 8.0.5 or later, no operation is required. Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC.
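The following sketch pins a binary log source table to the JDBC mode so that its behavior does not depend on the default SDK mode during an upgrade. It is a minimal example that assumes the binlog and cdcMode source parameters described in the binary log consumption topic:

CREATE TEMPORARY TABLE hologres_binlog_source (
    name VARCHAR,
    age BIGINT,
    birthday BIGINT
) WITH (
    'connector' = 'hologres',
    'dbname' = '<yourDbname>',
    'tablename' = '<yourTablename>',
    'username' = '${secret_values.ak_id}',
    'password' = '${secret_values.ak_secret}',
    'endpoint' = '<yourEndpoint>',
    'binlog' = 'true',  -- Consume binary log data instead of performing a full table scan.
    'cdcMode' = 'true', -- Interpret binary log records as changelog events.
    'sdkMode' = 'jdbc'  -- Required in Hologres V2.1 and later.
);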
Syntax
CREATE TABLE hologres_table (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'hologres',
'dbname' = '<yourDBName>',
'tablename' = '<yourTableName>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint' = '<yourEndpoint>',
'sdkmode' = 'jdbc'
);
Parameters in the WITH clause
Data type mappings
For more information about the mappings between the data types in Realtime Compute for Apache Flink and Hologres, see Data type mappings between Realtime Compute for Apache Flink and Hologres.
Sample code
Sample code for a source table
Statement for creating a Hologres source table in which the binary logging feature is disabled
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
  'field_delimiter'='|', -- This parameter is optional.
'sdkmode' = 'jdbc'
);
CREATE TEMPORARY TABLE blackhole_sink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT name, age, birthday
from hologres_source;
Statement for creating a Hologres source table in which the binary logging feature is enabled
Realtime Compute for Apache Flink can consume the binary log data of Hologres in real time. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time.
Sample code for a sink table
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
Sample code for a dimension table
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
Feature description
DataStream API
If you want to use the DataStream API to read or write data, you must use a DataStream connector of the related type to access Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors.
The Hologres DataStream connectors of different versions are stored in the Maven central repository. For Realtime Compute for Apache Flink that uses VVR 6.0.7, use the 1.15-vvr-6.0.7-1 dependency. For Realtime Compute for Apache Flink that uses VVR 8.0.7, download and use the dependency file ververica-connector-hologres-1.17-vvr-8.0.7.jar.
When you debug a deployment in an on-premises environment, you must use the related uber JAR package. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment. The uber JAR package that corresponds to VVR 8.0.7 is ververica-connector-hologres-1.17-vvr-8.0.7-uber.jar.
Build an implementation class to read data from a Hologres source table
VVR provides the implementation class HologresBulkreadInputFormat of RichInputFormat to read data from a Hologres source table. The following examples show how to build the implementation class HologresBulkreadInputFormat to read data from a Hologres source table.
VVR 4.0.15
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
jdbcOptions.getTable(), schema.getFieldNames());
// Build HologresBulkreadInputFormat to read data from the Hologres source table.
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
VVR 6.0.7 and VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions, schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
XML
You can use the Hologres DataStream connectors of different versions stored in the Maven central repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>
Build an implementation class to read data from a Hologres source table for which binary logging is enabled
VVR provides the implementation class HologresBinlogSource of Source to read data from a Hologres source table for which binary logging is enabled. The following examples show how to build the implementation class HologresBinlogSource to read data from a Hologres source table for which binary logging is enabled.
VVR 4.0.15
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
// Build HologresBinlogSource to read data from the Hologres source table for which binary logging is enabled.
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
schema,
config,
jdbcOptions,
recordConverter,
startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 6.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Create a slot and configure the default name of the slot.
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
&& config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Build the record converter to read data from the Hologres source table for which binary logging is enabled.
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// Build HologresBinlogSource to read data from the Hologres source table for which binary logging is enabled.
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 8.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Build HologresBinlogSource to read data from the Hologres source table for which binary logging is enabled.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet());
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
The buildRecordConverter method is not included in the dependency of the VVR connector. This method is provided in ververica-connector-demo.
For more information about the usage notes and implementation principles of Hologres binary logs, see the Create a Hologres source table for which the binary logging feature is enabled section of this topic.
Build an implementation class to write data to a Hologres sink table
VVR provides the implementation class HologresSinkFunction of OutputFormatSinkFunction to write data to a Hologres sink table. The following examples show how to build the implementation class HologresSinkFunction to write data to a Hologres sink table.
VVR 4.0.15
// Initialize the schema of the sink table to which you want to write data.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Build a Hologres Writer to write data in the data structure of the RowData class.
AbstractHologresWriter<RowData> hologresWriter =
buildHologresWriter(schema, config, hologresConnectionParam);
// Build HologresSinkFunction to write data to the Hologres sink table.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
VVR 6.0.7 and VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the sink table to which you want to write data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Configure Hologres-related parameters.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Build a Hologres Writer to write data in the data structure of the RowData class.
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// Build HologresSinkFunction to write data to the Hologres sink table.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
The buildHologresWriter method is not included in the dependency of the VVR connector. This method is provided in ververica-connector-demo.
Time zones of Realtime Compute for Apache Flink and Hologres
Time types
| Service | Time type | Description |
| --- | --- | --- |
| Realtime Compute for Apache Flink | TIMESTAMP | The date and time without a time zone. A value of the TIMESTAMP type is a timestamp that represents the year, month, day, hour, minute, second, and fractional second, and can be expressed as a string. |
| Realtime Compute for Apache Flink | TIMESTAMP_LTZ | An absolute point in time on the timeline. The value is stored as a LONG that indicates the number of milliseconds that have elapsed since the epoch time (00:00:00 UTC on January 1, 1970 in Java) and an INT that indicates the number of nanoseconds within the millisecond. A value of the TIMESTAMP_LTZ type is interpreted for calculations and visualization based on the time zone that is configured in the current session, and can be used for calculations across time zones: it represents the same absolute point in time in every time zone but may correspond to different local TIMESTAMP values in different time zones. |
| Hologres | TIMESTAMP | The date and time without a time zone, which is similar to the TIMESTAMP type of Realtime Compute for Apache Flink. |
| Hologres | TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ) | The date and time with a time zone, which is similar to the TIMESTAMP_LTZ type of Realtime Compute for Apache Flink. |
Time type mappings
If you set the type-mapping.timestamp-converting.legacy parameter to false in Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, you can perform conversions of all time types between Realtime Compute for Apache Flink and Hologres.

| Realtime Compute for Apache Flink | Hologres | Description |
| --- | --- | --- |
| TIMESTAMP | TIMESTAMP | Time type conversions are performed without time zone conversions. We recommend that you use these conversions to read data from or write data to Hologres. |
| TIMESTAMP_LTZ | TIMESTAMPTZ | Same as the previous row: no time zone conversion is involved. |
| TIMESTAMP | TIMESTAMPTZ | Time type conversions are performed with time zone conversions. To ensure data accuracy during conversion, you must set the table.local-time-zone parameter to the time zone of Realtime Compute for Apache Flink. For more information about how to configure the parameter, see Console operations. For example, if you specify 'table.local-time-zone': 'Asia/Shanghai' to set the time zone of Realtime Compute for Apache Flink to the time zone of Shanghai (UTC+8), the TIMESTAMP value 2022-01-01 01:01:01.123456 that is written to Hologres is converted to the TIMESTAMPTZ value 2022-01-01 01:01:01.123456+08. |
| TIMESTAMP_LTZ | TIMESTAMP | Same as the previous row: the conversion uses the time zone that is specified by the table.local-time-zone parameter. |

If you set the type-mapping.timestamp-converting.legacy parameter to true in Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, or if you use Realtime Compute for Apache Flink that uses VVR 8.0.5 or earlier, data deviation may occur in all time type conversions except conversions between the TIMESTAMP types.

| Realtime Compute for Apache Flink | Hologres | Description |
| --- | --- | --- |
| TIMESTAMP | TIMESTAMP | Time type conversions are performed without time zone conversions. We recommend that you use these conversions to read data from or write data to Hologres. |
| TIMESTAMP_LTZ | TIMESTAMPTZ | Data of the TIMESTAMP_LTZ and TIMESTAMPTZ types is expressed as the time without the time zone when Realtime Compute for Apache Flink reads data from or writes data to Hologres. This may cause data deviation. For example, if a TIMESTAMP_LTZ value in Realtime Compute for Apache Flink is 2024-03-19T04:00:00Z, the local time in Shanghai (UTC+8) is 2024-03-19T12:00:00. However, when the data is written to Hologres, 2024-03-19T04:00:00 is used as the time without the time zone and is converted to the TIMESTAMPTZ value 2024-03-19T04:00:00+08. This causes an 8-hour deviation. |
| TIMESTAMP | TIMESTAMPTZ | Time zone conversions are performed based on the time zone of the JVM in the runtime environment instead of the time zone of Realtime Compute for Apache Flink. If the two time zones differ, data deviation may occur. We recommend that you read data from and write data to Hologres based on the time zone of Realtime Compute for Apache Flink. |
| TIMESTAMP_LTZ | TIMESTAMP | Same as the previous row: the conversion uses the JVM time zone, which may cause data deviation. |
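In the console, the time zone of Realtime Compute for Apache Flink is configured as the table.local-time-zone deployment parameter. The following sketch shows the equivalent setting in a SQL script before data is written to a hypothetical sink whose Hologres column is of the TIMESTAMPTZ type; the sink name is an assumption:

-- Set the session time zone so that TIMESTAMP-to-TIMESTAMPTZ conversions
-- use UTC+8 instead of the JVM default time zone.
SET 'table.local-time-zone' = 'Asia/Shanghai';

-- The TIMESTAMP literal below is interpreted as 2022-01-01 01:01:01.123456 in UTC+8
-- and is stored in Hologres as 2022-01-01 01:01:01.123456+08.
INSERT INTO hologres_tz_sink
VALUES (TIMESTAMP '2022-01-01 01:01:01.123456');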