
Hologres:Use Flink to import data

Last Updated:Dec 17, 2024

Alibaba Cloud Realtime Compute for Apache Flink is an enterprise-grade, high-performance platform that Alibaba Cloud developed based on Apache Flink to process big data in real time. Hologres is seamlessly integrated with Realtime Compute for Apache Flink: you can use Realtime Compute for Apache Flink to write data to Hologres and query that data in real time. This helps you build a real-time data warehouse for your enterprise.

Description

  • Realtime Compute for Apache Flink does not store data. All data that is processed by Realtime Compute for Apache Flink is from external storage systems. Realtime Compute for Apache Flink supports the following data storage types:

    • Source tables

    Source tables contain the data that is imported into Realtime Compute for Apache Flink. If you use a Hologres source table, data is read in batch mode rather than streaming mode: Hologres scans the entire table and transfers the required data to Realtime Compute for Apache Flink in a single pass for processing.

    • Dimension tables

    Dimension tables are suitable for point queries in which data is looked up by primary key. If you use a Hologres dimension table, we recommend the row-oriented storage mode for the table. All fields that make up the primary key must appear in the JOIN condition.

    • Result tables

    Result tables are used to receive and store result data that is processed by Realtime Compute for Apache Flink and provide read and write interfaces for downstream data consumption.
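The three table roles above can be sketched in a single Flink SQL deployment. This is a hypothetical example: the endpoint, credentials, and table names are placeholders, and the connector option names should be verified against the Hologres connector documentation for your VVR version.

```sql
-- A stream source (datagen stands in for any upstream source table).
CREATE TEMPORARY TABLE orders_src (
  order_id BIGINT,
  user_id  BIGINT,
  proctime AS PROCTIME()
) WITH ('connector' = 'datagen');

-- A Hologres dimension table: row-oriented storage is recommended, and
-- every primary-key field must appear in the JOIN condition.
CREATE TEMPORARY TABLE user_dim (
  user_id   BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<host>:<port>',
  'dbname'    = '<database>',
  'tablename' = 'user_dim',
  'username'  = '<access-key-id>',
  'password'  = '<access-key-secret>'
);

-- A Hologres result table that receives the processed data.
CREATE TEMPORARY TABLE orders_sink (
  order_id  BIGINT,
  user_name STRING
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<host>:<port>',
  'dbname'    = '<database>',
  'tablename' = 'orders_enriched',
  'username'  = '<access-key-id>',
  'password'  = '<access-key-secret>'
);

-- Lookup join against the dimension table, then write to the result table.
INSERT INTO orders_sink
SELECT o.order_id, u.user_name
FROM orders_src AS o
JOIN user_dim FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.user_id = u.user_id;
```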

  • Realtime Compute for Apache Flink is integrated with Hologres to provide the following enterprise-level advanced features:

    • Consumption of Hologres binary logs

    You can consume the change logs (binary logs) of Hologres tables in a variety of consumption modes, similar to consuming messages from a message queue service.

    • Flink catalog

    Hologres catalogs are supported in Realtime Compute for Apache Flink. You can read Hologres metadata in the console of fully managed Flink without the need to register Hologres tables. Hologres catalogs improve development efficiency and ensure schema accuracy.

    • Schema evolution

    Schema evolution is supported in fully managed Flink. When Realtime Compute for Apache Flink reads JSON-formatted data, it automatically parses the data type and creates columns in the corresponding table. Dynamic evolution of schemas is supported.
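The binary log consumption feature above can be sketched as a Flink SQL source table. This is a hypothetical example: endpoint, credentials, and table names are placeholders, and option names such as binlogStartUpMode are assumptions to verify against the Hologres connector documentation for your VVR version.

```sql
-- Consume the binary logs of a Hologres table as a CDC-style source.
-- The source table should use row-oriented or row-column hybrid storage.
CREATE TEMPORARY TABLE orders_binlog (
  order_id BIGINT,
  amount   NUMERIC(10, 2)
) WITH (
  'connector'         = 'hologres',
  'endpoint'          = '<host>:<port>',
  'dbname'            = '<database>',
  'tablename'         = 'orders',
  'username'          = '<access-key-id>',
  'password'          = '<access-key-secret>',
  'binlog'            = 'true',      -- enable binary log consumption
  'binlogStartUpMode' = 'initial'    -- read full data first, then incremental binlogs
);
```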

  • The following table describes the Flink service types that are supported by Hologres and the features of the service types.

    In the table, Source table, Result table, and Dimension table are data storage types; Consumption of Hologres binary logs, Flink catalog, and Schema evolution are enterprise-level advanced features.

    | Service type | Source table | Result table | Dimension table | Consumption of Hologres binary logs | Flink catalog | Schema evolution | Description |
    | --- | --- | --- | --- | --- | --- | --- | --- |
    | Semi-managed Flink | Row-oriented and column-oriented storage are supported. We recommend row-oriented or row-column hybrid storage for binary log source tables. | Row-oriented and column-oriented storage are supported. | We recommend row-oriented or row-column hybrid storage. | Supported | Supported | Supported | Uses the EMR Studio development platform. |
    | Blink in exclusive mode (discontinued) | Row-oriented and column-oriented storage are supported. We recommend row-oriented or row-column hybrid storage for binary log source tables. | Row-oriented and column-oriented storage are supported. | We recommend row-oriented or row-column hybrid storage. | Hologres V0.8 supports only row-oriented storage. Hologres V0.9 and later support both; we recommend row-oriented storage. | Not supported | Not supported | Uses the Bayes development platform. We recommend fully managed Flink of Realtime Compute for Apache Flink instead. |
    | Apache Flink V1.10 | Not supported | Row-oriented and column-oriented storage are supported. | Not supported | Not supported | Not supported | Not supported | - |
    | Apache Flink V1.11 and later | Not supported | Row-oriented and column-oriented storage are supported. | We recommend row-oriented storage. | Not supported | Not supported | Not supported | The Hologres connector code is open source in Apache Flink V1.11 and later. For more information, see alibabacloud-hologres-connectors. |
    | Apache Flink V1.13 and later | Batch source tables are supported. | Row-oriented and column-oriented storage are supported. | We recommend row-oriented storage. | Not supported | Not supported | Not supported | The Hologres connector code is open source in Apache Flink V1.11 and later. For more information, see alibabacloud-hologres-connectors. |
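The Flink catalog feature described earlier lets you reference Hologres tables without registering them with DDL statements. A minimal sketch, assuming a fully managed Flink deployment; the catalog name, schema, and connection parameters are placeholders:

```sql
-- Register a Hologres catalog so that Hologres metadata can be read directly.
CREATE CATALOG holo_catalog WITH (
  'type'     = 'hologres',
  'endpoint' = '<host>:<port>',
  'dbname'   = '<database>',
  'username' = '<access-key-id>',
  'password' = '<access-key-secret>'
);

-- Query a physical Hologres table without a CREATE TABLE statement.
SELECT * FROM holo_catalog.`<database>`.`<schema>.<table>`;
```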

Hologres connector release notes

Each entry lists the open source Flink version, the Realtime Compute for Apache Flink (VVR) version, the compatible Hologres version, the update information, and references.

Flink 1.17 | VVR 8.0.7 | Hologres 2.1.x

Dimension tables: Fixed an issue where metadata was frequently obtained and draft deployment timed out when a dimension table contained a large number of fields.

All types of tables: Fixed an issue where an error message indicating that the required permissions were not granted appeared when different users performed operations on different tables in the same connection pool.

References: Use the Hologres connector of Realtime Compute for Apache Flink to consume data from Hologres in real time

Flink 1.17 | VVR 8.0.6 | Hologres 2.1.x

Source tables

  • In Hologres V2.1 and later, the HoloHub mode is discontinued. You can no longer use Realtime Compute for Apache Flink to consume Hologres binary logs in HoloHub mode. If you use the Hologres connector of Realtime Compute for Apache Flink that uses VVR 8.0.6 and the Hologres instance version is later than V2.1, the Hologres connector automatically changes the HoloHub mode to the Java Database Connectivity (JDBC) mode. For more information, see Use Realtime Compute for Apache Flink or Blink to consume Hologres binary log data in real time.

Hologres catalogs

  • When you create a catalog, you can specify a virtual warehouse in the format of dbname@warehouse_name. If you specify a follower virtual warehouse, tables of the Hologres catalog can be used only as source tables and dimension tables but not result tables in Flink SQL deployments.

All types of tables

  • You can configure the type-mapping.timestamp-converting.legacy parameter to enable time type conversions between Realtime Compute for Apache Flink and Hologres. This way, the TIMESTAMP_LTZ data type of Realtime Compute for Apache Flink is supported by Hologres. For more information, see Time zones of Realtime Compute for Apache Flink and Hologres in the "Hologres connector" topic.

Flink 1.17 | VVR 8.0.5 | Hologres 2.0.x and 2.1.x

Source tables

  • For Hologres V2.1 and later, if you use JDBC to consume Hologres binary logs, you do not need to create slots. For more information, see Use JDBC to consume Hologres binary logs. Since Flink 1.17, if the version of your Hologres instance is later than V2.1, publications and slots are not automatically created.

Result tables

  • The deduplication.enabled parameter is added. The default value is true. If you set this parameter to false, data is not deduplicated when the data is written to and aggregated in result tables. This feature is suitable for scenarios such as full playback of upstream Change Data Capture (CDC) streaming data.

  • Tables without primary keys support batch writes. Batch writes consume fewer Hologres resources than the jdbc_copy mode.

All types of tables

  • You can enable transmission encryption by configuring the connection.ssl.mode and connection.ssl.root-cert.location parameters.

  • A timeout parameter is added for internal JDBC connections. This prevents long waiting time for client connections in scenarios such as server restarts upon failures.
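The VVR 8.0.5 result-table and encryption options above can be combined in one DDL statement. A hypothetical sketch; the endpoint, credentials, table names, and certificate path are placeholders, and the SSL mode value should be verified against the connector documentation:

```sql
-- Result table for full CDC replay: deduplication disabled, TLS enabled.
CREATE TEMPORARY TABLE cdc_sink (
  id      BIGINT,
  payload STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<host>:<port>',
  'dbname'    = '<database>',
  'tablename' = 'cdc_target',
  'username'  = '<access-key-id>',
  'password'  = '<access-key-secret>',
  'deduplication.enabled' = 'false',        -- do not deduplicate buffered writes
  'connection.ssl.mode'   = 'verify-ca',    -- enable transmission encryption
  'connection.ssl.root-cert.location' = '/flink/usrlib/certs/ca.crt'
);
```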

Flink 1.17 | VVR 8.0.4 | Hologres 2.0.x and 2.1.x

Source tables

  • A publication is automatically created when you use JDBC to consume binary logs. However, if a table is rebuilt, the existing publication is not automatically deleted. As a result, binary logs of the rebuilt table cannot be consumed. In this version, the system automatically deletes the existing publication to resolve this issue.

All types of tables

  • A default value is added to the connection pool parameter. Hologres dimension tables and result tables in the same task share a connection pool.

Flink 1.17 | VVR 8.0.3 | Hologres 2.0.x and 2.1.x

All types of tables

  • Dimension tables and result tables in all Hologres versions do not support the remote procedure call (RPC) mode. If you specify the RPC mode, the system automatically switches the RPC mode to the jdbc_fixed mode. If your Hologres instance runs in an earlier version, we recommend that you upgrade your instance.

References: Hologres connector

Flink 1.15 | VVR 6.0.7 | Hologres 1.3.x and 2.0.x

  • Source tables

    The Hologres connector is updated to adapt to Hologres V2.0 and later. If a Hologres instance of V2.0 or later is connected, the HoloHub mode that is used for binary log source tables is automatically switched to the JDBC mode.

  • Dimension tables

    The Hologres connector is updated to adapt to Hologres V2.0 and later. If a Hologres instance of V2.0 or later is connected, the RPC mode that is used for dimension tables is automatically switched to the jdbc_fixed mode.

  • Result tables

    • The Hologres connector is updated to adapt to Hologres V2.0 and later. If a Hologres instance of V2.0 or later is connected, the RPC mode that is used for result tables is automatically switched to the jdbc_fixed mode.

    • The feature for updating specific columns is added. You can insert only the fields that are declared in the INSERT statement of Realtime Compute for Apache Flink. This feature simplifies the merging of data into a wide table.

  • All types of tables

    If a record conversion exception occurs in the connector, the source data and the current conversion result are displayed to facilitate troubleshooting of dirty data.

  • Fixed issues

    • The following issue is fixed: Errors are not displayed when different instances or databases use the same connectionPoolName in a deployment.

    • The following issue is fixed: If columns of the STRING type in dimension tables contain null values in VVR 6.0.6 of Realtime Compute for Apache Flink, a NullPointerException (NPE) error is returned.

References: Hologres connector

Flink 1.15 | VVR 6.0.6 | Hologres 1.3.x

Source tables

  • If you consume Hologres binary logs in JDBC mode, the slot name parameter is no longer required. A default slot can be created automatically, which facilitates switchovers from the HoloHub mode to the JDBC mode.

  • The enable_filter_push_down parameter is added. If Realtime Compute for Apache Flink reads data from a Hologres source table in batch mode, filter pushdown is not automatically performed. If you want to enable filter pushdown, set this parameter to true.

Hologres connector

Flink 1.15 | VVR 6.0.5 | Hologres 1.3.x

  • All types of tables: When a deployment starts, all parameter information is printed in the TaskManager logs to facilitate troubleshooting.

  • CREATE TABLE AS (CTAS)/CREATE DATABASE AS (CDAS): The field type normalization mode is supported. In this mode, data type changes in the source table do not lead to a deployment failure if the data types before and after the change can be converted into the same data type based on type normalization rules.

  • Hologres catalogs: The ALTER TABLE statement can be executed to modify Hologres physical table properties. For example, you can change table names, add columns, rename columns, and modify column comments.

Flink 1.15 | VVR 6.0.3 to 6.0.4 | Hologres 1.3.x

Source tables

  • Hologres binary logs can be consumed in JDBC mode. This mode supports many data types and allows you to create custom accounts.

  • Filter pushdown is supported for batch source tables and for source tables that contain full and incremental data while they are in the full phase.

Result tables

Data can be written to result tables in fixed copy mode, which is supported in Hologres V1.3. Fixed copy writes data in a streaming manner, whereas JDBC mode processes data in batches. As a result, writes in fixed copy mode provide higher throughput, lower latency, and lower client memory consumption than writes in JDBC mode.

Hologres catalogs

  • Default table properties can be used when you create Hologres catalogs.

sdkMode parameter: Hologres provides multiple connection modes for the different types of tables. The sdkMode parameter specifies which mode a table uses.
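The sdkMode parameter can be set in the WITH clause of a table's DDL statement. A hypothetical sketch; the endpoint, credentials, and table name are placeholders, and the jdbc_fixed value should be verified against the connector documentation for your VVR version:

```sql
-- Pin the SDK mode for a result table. 'jdbc_fixed' selects the fixed
-- copy write path instead of batched JDBC statements.
CREATE TEMPORARY TABLE sink_fixed (
  id BIGINT,
  v  STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<host>:<port>',
  'dbname'    = '<database>',
  'tablename' = 'target_table',
  'username'  = '<access-key-id>',
  'password'  = '<access-key-secret>',
  'sdkMode'   = 'jdbc_fixed'
);
```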

Flink 1.13 | VVR 4.0.18 | Hologres 1.1 and later

The following issue is fixed: Write performance is negatively affected if sink table-related metrics are reported.

Flink 1.13 and 1.15 | VVR 4.0.15 and 6.0.2 | Hologres 1.1 and later

Source tables

  • Batch source tables are case-sensitive by default.

  • The timeout duration for transactions that are involved in the scan operations on batch source tables can be configured.

  • The following issue is fixed: Complex strings in batch source tables may fail to be parsed.

  • The Upsert mode is added for source tables that contain full and incremental data.

Dimension tables

Timeout duration settings for asynchronous requests are supported by Hologres dimension tables. The timeout duration is specified by the asyncTimeoutMs parameter.

Result tables

  • The PARTITION BY statement is supported. When you create a Hologres result table by using the CREATE TABLE AS statement, you can use the PARTITION BY statement to define a partitioned table.

  • The metric currentSendTime is supported.

Flink 1.13 | VVR 4.0.13 | Hologres 1.1 and later

  • Consumption of full and incremental data in a source table is supported.

  • The DataStream API is supported.

Flink 1.13 | VVR 4.0.11 | Hologres 0.10 and later

The CREATE TABLE AS and CREATE DATABASE AS statements are supported.

Flink 1.13 | VVR 4.0.8 | Hologres 0.10 and later

Hologres catalogs are supported by result tables, source tables, and dimension tables.

References: Manage Hologres catalogs

Flink 1.13 | VVR 3.0.0 | Hologres 0.9 and later

Real-time consumption of Hologres data is supported.

References: Fully managed Flink

Known issues and fixed versions of the Hologres connector

Each entry lists the severity, the issue, the affected VVR version, the VVR version that contains the fix, and the solution.

P1 | Affected: 8.0.6 | Fixed: 8.0.7

Issue: If a dimension table contains a large number of fields, draft deployment times out.

Solution: We recommend that you upgrade the VVR version of your deployment.

P0 | Affected: 8.0.5 and earlier | Fixed: 8.0.6

Issue: If the enable_filter_push_down parameter is set to true for batch source tables, filter pushdown does not take effect. As a result, data that should be filtered out is also read. Note: This issue does not affect source tables that contain full and incremental data or binary log source tables.

Solution: We recommend that you upgrade the VVR version of your deployment.

P0 | Affected: 8.0.5 and earlier | Fixed: None

Issue: If you write invalid data of the JSON or JSONB data type to Hologres in FixedFE mode, the connected FE node restarts and the FE connection is interrupted. The FixedFE mode corresponds to the jdbc_fixed mode in the Hologres connector.

Solution: If the source data of the JSON or JSONB data type may be invalid, we recommend that you use the JDBC mode to write the data to Hologres.

P1 | Affected: 6.0.7 and earlier | Fixed: 8.0.3

Issue: If a JDBC dimension table is joined with multiple tables, internal exceptions such as connection failures may not be reported. In this case, the join nodes that perform asynchronous lookups experience backpressure, and data stops flowing. This issue occurs with low probability.

Solution: We recommend that you upgrade the VVR version of your deployment. You can also restart the deployment.

P1 | Affected: 6.0.7 and earlier | Fixed: 6.0.7

Issue: When you consume Hologres binary logs in JDBC mode, memory leaks may occur. If they do, the consumption rate may be high when a job starts and then continuously decrease.

Solution: We recommend that you upgrade the VVR version of your deployment. For DataStream deployments, use the dependency of version 1.15-vvr-6.0.7-1.

P0 | Affected: 6.0.6 and earlier | Fixed: 6.0.7

Issue: Exceptions that are captured by scheduled flush operations in JDBC mode (controlled by the jdbcWriteFlushInterval parameter) are reported only when the next data record is written. If only a small amount of data is written, a checkpoint may succeed during the window in which an exception has been captured but not yet reported. If a data write failure then occurs, the deployment restarts from that checkpoint, and data may be lost.

Solution: This issue is most likely to occur when the amount of data is small. We recommend that you upgrade the VVR version of your deployment, or set the jdbcWriteFlushInterval parameter to a value greater than the checkpoint interval.

P2 | Affected: 6.0.6 | Fixed: 6.0.7

Issue: If you do not specify a slot name when you consume binary logs in JDBC mode, the system automatically creates a slot with a default name. If the table name contains special characters or includes the schema name, the name of the automatically created slot is invalid and the slot cannot be used. As a result, a syntax error is reported.

Solution: We recommend that you upgrade the VVR version of your deployment. For DataStream deployments, use the dependency of version 1.15-vvr-6.0.7-1.

P1 | Affected: 6.0.6 and earlier | Fixed: 6.0.7

Issue: If different Hologres instances or databases in a deployment use the same connectionPoolName, exceptions may occur. For example, a table may not be found.

Solution: Specify a different connectionPoolName value for each Hologres instance or database used in the deployment.

P1 | Affected: 6.0.6 | Fixed: 6.0.7

Issue: If a dimension table contains a null string, an NPE exception is reported.

Solution: We recommend that you upgrade the VVR version of your deployment.

P0 | Affected: 6.0.3 to 6.0.5 | Fixed: 6.0.6

Issue: By default, filter pushdown is enabled for Hologres source tables. If a deployment uses a Hologres dimension table and the data manipulation language (DML) statement that writes data contains filter conditions on non-primary-key fields of the dimension table, filter pushdown is incorrectly applied to the dimension table. This may lead to an invalid join of the dimension table.

Solution: We recommend that you upgrade the VVR version of your deployment.

P0 | Affected: 6.0.2 and earlier | Fixed: 6.0.3

Issue: If multiple result tables specify different values for the mutatetype parameter but share the same connectionPoolName value, the mutatetype configuration may be overwritten and not take effect.

Solution: Set mutatetype to InsertOrUpdate for all result tables, or specify different connectionPoolName values for tables whose mutatetype values differ.
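The connectionPoolName workaround can be sketched in Flink SQL. This is a hypothetical example: the endpoint, credentials, table names, and pool names are placeholders, and the mutatetype values should be verified against the connector documentation:

```sql
-- Result tables with different mutatetype values get distinct
-- connectionPoolName values so that neither configuration is overwritten.
CREATE TEMPORARY TABLE sink_ignore (
  id BIGINT, v STRING, PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<host>:<port>', 'dbname' = '<database>',
  'tablename' = 'table_a',
  'username'  = '<access-key-id>', 'password' = '<access-key-secret>',
  'mutatetype' = 'insertorignore',
  'connectionPoolName' = 'pool_a'
);

CREATE TEMPORARY TABLE sink_update (
  id BIGINT, v STRING, PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<host>:<port>', 'dbname' = '<database>',
  'tablename' = 'table_b',
  'username'  = '<access-key-id>', 'password' = '<access-key-secret>',
  'mutatetype' = 'insertorupdate',
  'connectionPoolName' = 'pool_b'
);
```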

P1 | Affected: 6.0.2 | Fixed: 6.0.3

Issue: An NPE exception is reported if the hg_binlog_timestamp_us field is declared in the data definition language (DDL) statement of a binary log source table.

Solution: Do not declare this field, or upgrade the VVR version of your deployment.

P1 | Affected: 4.0.15 to 4.0.17 | Fixed: 4.0.18

Issue: Metric reporting negatively affects the write performance of result tables. Troubleshooting shows that the thread dump of the sink node is stuck at reportWriteLatency.

Solution: Use a version that is not affected by this issue.

P2 | Affected: 4.0.14 and earlier | Fixed: 4.0.15

Issue: Strings that contain special characters fail to be parsed when you read multiple rows of data of the string or string array types from a source table at a time.

Solution: Clear the dirty data in the source table, or upgrade the VVR version of your deployment.

P2 | Affected: 4.0.13 | Fixed: 4.0.14

Issue: If you declare binary-log-specific fields such as hg_binlog in the DDL statement that is used to process full and incremental data of a source table, only part of the data can be consumed.

Solution: Do not use the full and incremental data synchronization feature, or upgrade the VVR version of your deployment.