This topic provides answers to some frequently asked questions about using Blink and Flink with Hologres.
Background information
Hologres performance
Write performance
Column-oriented table: InsertOrIgnore > InsertOrReplace > InsertOrUpdate
Row-oriented table: InsertOrReplace = InsertOrUpdate > InsertOrIgnore
| Insert mode | Description |
| --- | --- |
| InsertOrIgnore | Discards the data that you want to write if the inserted primary key value already exists in the result table. |
| InsertOrReplace | Replaces the entire row based on the primary key if the inserted primary key value already exists in the result table. If the written data does not cover all columns, null values are inserted into the columns that are not written. |
| InsertOrUpdate | Updates the row based on the primary key if the inserted primary key value already exists in the result table. If the written data does not cover all columns, the columns that are not written remain unchanged. |
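For reference, the insert mode is typically selected by using the mutatetype parameter in the WITH clause of the Hologres result table. The following is a minimal sketch for fully managed Flink; the table and column names are hypothetical:

CREATE TEMPORARY TABLE holo_sink (
    user_id BIGINT,
    user_name STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'hologres',
    'dbname' = '<yourDbname>',
    'tablename' = '<yourTablename>',
    'username' = '<yourAccessKeyId>',
    'password' = '<yourAccessKeySecret>',
    'endpoint' = '<yourEndpoint>',
    'mutatetype' = 'insertorupdate' -- Or 'insertorignore' (the default) or 'insertorreplace'.
);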
Point query performance
Row-oriented storage = Row-column hybrid storage > Column-oriented storage
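For reference, the following is a minimal sketch of creating a row-oriented Hologres table that serves primary key point queries; the table and column names are hypothetical:

BEGIN;
CREATE TABLE point_query_t (
    id BIGINT NOT NULL,
    val TEXT,
    PRIMARY KEY (id)
);
-- The orientation property must be set in the same transaction in which the table is created.
CALL set_table_property('point_query_t', 'orientation', 'row');
COMMIT;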
Support for the Blink, Realtime Compute for Apache Flink (VVP), and Apache Flink service types

| Service type | Source table | Result table | Dimension table | Binary logging | Hologres catalog | Description |
| --- | --- | --- | --- | --- | --- | --- |
| Fully managed Flink | Row-oriented and column-oriented storage are supported. | Row-oriented and column-oriented storage are supported. | Row-oriented storage is recommended. | Supported. | Supported. | None. |
| Blink in exclusive mode | Row-oriented and column-oriented storage are supported. | Row-oriented and column-oriented storage are supported. | Row-oriented storage is recommended. | Hologres V0.8 supports only row-oriented storage. Hologres V0.9 and later support both row-oriented and column-oriented storage. Row-oriented storage is recommended. | Not supported. | Blink in exclusive mode will be discontinued. We recommend that you use fully managed Flink. |
| Apache Flink V1.10 | Row-oriented and column-oriented storage are supported. | Row-oriented and column-oriented storage are supported. | None. | Not supported. | Not supported. | None. |
| Apache Flink V1.11 and later | Row-oriented and column-oriented storage are supported. | Row-oriented and column-oriented storage are supported. | Row-oriented storage is recommended. | Not supported. | Not supported. | The code of the Hologres connector is publicly available in Apache Flink V1.11 and later. For more information, see alibabacloud-hologres-connectors. |
Sample SQL statement for mapping a Blink or Flink table to a Hologres table:

create table holo_source (
    hg_binlog_lsn BIGINT HEADER,
    hg_binlog_event_type BIGINT HEADER,
    hg_binlog_timestamp_us BIGINT HEADER,
    A int,
    B int,
    C timestamp
) with (
    type = 'hologres',
    'endpoint' = 'xxx.hologres.aliyuncs.com:80', -- The endpoint of the Hologres instance.
    'userName' = '', -- The AccessKey ID of your Alibaba Cloud account.
    'password' = '', -- The AccessKey secret of your Alibaba Cloud account.
    'dbName' = 'binlog', -- The name of the database in the Hologres instance.
    'tableName' = 'test', -- The name of the table in the Hologres instance.
    'binlog' = 'true'
);
A Blink, Flink (VVP), or Flink SQL table is created in Flink and mapped to a physical table in Hologres based on the parameter settings. Blink, Flink (VVP), and Flink SQL tables cannot be mapped to foreign tables in Hologres.
Troubleshoot the issue of slow real-time writes
Verify write-related configurations.
Check the following items:
The storage mode of the destination table: row-oriented, column-oriented, or hybrid row-column storage.
The insert mode, such as InsertOrIgnore, InsertOrUpdate, and InsertOrReplace.
The table group and shard count of the destination table.
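You can query these properties from the Hologres metadata tables. A minimal sketch, assuming the destination table is named t1:

-- Check the storage mode and table group of the destination table.
SELECT property_key, property_value
FROM hologres.hg_table_properties
WHERE table_name = 't1'
AND property_key IN ('orientation', 'table_group');

-- Check the shard count of each table group.
SELECT * FROM hologres.hg_table_group_properties;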
Check the real-time write latency metric.
If the average write latency is as high as 100 milliseconds or even several seconds, the backend server has reached its write performance bottleneck. In this case, the following issues may be the cause:
If the column-oriented table uses the InsertOrUpdate mode and the write operation involves a large amount of traffic, the CPU load on the instance and the write latency are high.
Solution: Use a row-oriented table, or use a hybrid row-column storage table if your Hologres instance is V1.1 or later.
The CPU load of the instance in CloudMonitor approaches 100% even though the column-oriented table is not updated. This issue may be caused by high queries per second (QPS) or a large volume of written data.
Solution: Scale up the instance.
Continuous execution of INSERT INTO ... SELECT ... FROM statements triggers batch writes, which block real-time writes.
Solution: Convert batch writes into real-time writes, or perform batch writes during off-peak hours.
Check whether data skew exists.
Execute the following SQL statement to check whether data skew exists:
SELECT hg_shard_id, count(1) FROM t1 GROUP BY hg_shard_id ORDER BY hg_shard_id;
Solution: If data skew exists, change the distribution key to balance data distribution.
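The distribution key cannot be changed on an existing table, so you must recreate the table with a more balanced distribution key and then import the data. A minimal sketch, assuming the hypothetical names t1_new and user_id:

BEGIN;
CREATE TABLE t1_new (
    user_id BIGINT NOT NULL,
    event TEXT,
    PRIMARY KEY (user_id)
);
-- Choose a high-cardinality column as the distribution key to balance data across shards.
CALL set_table_property('t1_new', 'distribution_key', 'user_id');
COMMIT;
INSERT INTO t1_new SELECT * FROM t1;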
Check whether heavy workloads exist in backend clusters.
If no issues are identified after you perform the preceding checks, a sudden drop in the write performance is typically caused by heavy workloads in backend clusters. In this case, contact the technical support. For more information, see Obtain online support for Hologres.
Check whether backpressure exists in Blink or Flink.
If no issues are identified after you perform the preceding checks on Hologres, slow real-time writes may be caused by slow processing in Blink or Flink itself. In this case, check for backpressure on the sink node. If the job contains only one node, backpressure cannot be observed directly; you must split the operators that are chained into the sink node and then check them. For more information, contact the Flink technical support.
Troubleshoot issues with written data
Issues with written data are typically caused by out-of-order data. For example, if data that uses the same primary key is distributed in different Flink tasks, the data cannot be written in the original order. You must check the logic of Flink SQL and make sure that the written data is shuffled based on the primary key of the destination Hologres table.
Troubleshoot queries in the dimension table
JOIN operations for dimension tables and two data streams
In scenarios that involve reading data from Hologres, check whether dimension table JOIN operations are used and written correctly, and make sure that you are not performing a JOIN of two data streams by mistake. The following code provides an example of a dimension table JOIN in Hologres. If proctime AS PROCTIME() and hologres_dim FOR SYSTEM_TIME AS OF are missing, the operation becomes a JOIN of two streams.

CREATE TEMPORARY TABLE datagen_source (
    a INT,
    b BIGINT,
    c STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
    a INT,
    b VARCHAR,
    c VARCHAR
) WITH (
    'connector' = 'hologres',
    ...
);

CREATE TEMPORARY TABLE blackhole_sink (
    a INT,
    b STRING
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
Queries in the dimension table
Check the storage mode of the dimension table.
Check whether the storage mode of the dimension table is row-oriented storage, column-oriented storage, or hybrid row-column storage.
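A minimal sketch of checking the orientation property, assuming the dimension table is named dim_t:

SELECT property_value
FROM hologres.hg_table_properties
WHERE table_name = 'dim_t'
AND property_key = 'orientation';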
Check whether the latency of queries in the dimension table is high.
Backpressure on the Flink or Blink join node is a common issue in the use of dimension tables and causes low throughput.
Check the join mode of the Flink dimension table.
The JOIN operation of Hologres dimension table connectors in Flink can be performed in synchronous or asynchronous mode. The asynchronous mode delivers better performance than the synchronous mode. The following code provides an example of how to enable the asynchronous mode:

CREATE TABLE hologres_dim (
    id INT,
    len INT,
    content VARCHAR
) WITH (
    'connector' = 'hologres',
    'dbname' = '<yourDbname>', -- The name of the Hologres database.
    'tablename' = '<yourTablename>', -- The name of the Hologres dimension table.
    'username' = '<yourUsername>', -- The AccessKey ID of your Alibaba Cloud account.
    'password' = '<yourPassword>', -- The AccessKey secret of your Alibaba Cloud account.
    'endpoint' = '<yourEndpoint>', -- The Virtual Private Cloud (VPC) endpoint of the Hologres instance.
    'async' = 'true' -- Enable the asynchronous mode.
);
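In addition to the asynchronous mode, caching can reduce the number of point queries sent to Hologres. A hedged sketch of the cache-related options of the VVP dimension table connector; the values shown are examples:

CREATE TABLE hologres_dim (
    id INT,
    len INT,
    content VARCHAR
) WITH (
    'connector' = 'hologres',
    ...
    'async' = 'true',
    'cache' = 'LRU', -- Enable LRU caching for dimension table queries.
    'cacheSize' = '100000', -- The maximum number of cached rows.
    'cacheTTLMs' = '60000' -- The time-to-live of cache entries, in milliseconds.
);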
Check the query latency in the backend.
Check the real-time write latency metric.
Check whether the dimension table is a column-oriented table. Column-oriented dimension tables incur high overhead in high-QPS scenarios.
If the dimension table is a row-oriented table, high latency is typically caused by the high load of the instance. In this case, you must scale up the instance.
Check whether the join key is the primary key.
Hologres connectors of VVR 4.x (Flink 1.13) and later allow Holo Client queries based on columns other than primary key columns. Such queries usually deliver low performance and put a high load on the instance, especially when the table structure was not optimized for this access pattern at creation time. A common practice is to configure the join key as the distribution key, which enables shard pruning, as shown in the sketch below.
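A minimal sketch of creating a dimension table whose join key serves as both the primary key and the distribution key; the table and column names are hypothetical:

BEGIN;
CREATE TABLE dim_user (
    user_id BIGINT NOT NULL,
    user_name TEXT,
    PRIMARY KEY (user_id)
);
-- Configure the join key as the distribution key to enable shard pruning.
CALL set_table_property('dim_user', 'distribution_key', 'user_id');
-- Row-oriented storage is recommended for dimension tables.
CALL set_table_property('dim_user', 'orientation', 'row');
COMMIT;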
Check backpressure on the Blink node.
If no issues are identified after you perform the preceding checks on Hologres, slow queries may be caused by slow processing in Blink itself. In this case, check for backpressure on the sink node. If the job contains only one node, backpressure cannot be observed directly; you must split the operators that are chained into the sink node and then check them. You can use the same method to check for backpressure on the join node. For more information, contact the Flink technical support.
Usage notes for connections
By default, Hologres connectors work in a Java Database Connectivity (JDBC)-based mode. The JDBC_FIXED mode is also supported. This mode does not occupy connections, and binary log consumption in this mode is not subject to the limit on the maximum number of walsenders. For more information, see Hologres connector.
For Realtime Compute for Apache Flink whose engine version is VVR-8.0.5-Flink-1.17 or later, connection reuse is enabled by default by the 'connectionPoolName' = 'default' configuration. In most cases, this configuration does not affect job performance. However, if a job involves a large number of tables, performance may degrade after the Flink engine is upgraded. In this case, we recommend that you set the connectionPoolName parameter to different values for frequently accessed tables to optimize performance.

In JDBC mode, a certain number of connections are occupied. The following table describes the default number of connections that are used by different types of tables.
| Table type | Default number of connections for each pod in a Flink job |
| --- | --- |
| Binary log source table | 0 |
| Source table for batch reading | 1 |
| Dimension table | 3 (The value can be changed by using the connectionSize parameter.) |
| Result table | 3 (The value can be changed by using the connectionSize parameter.) |

Calculation of the number of connections
Default setting
By default, the maximum number of connections that a job can use is calculated by using the following formula:
Maximum number of connections = (Number of source tables × 1 + Number of dimension tables × connectionSize + Number of result tables × connectionSize) × Job parallelism
For example, a job has a source table that contains full and incremental data, two dimension tables, and three result tables, and all these tables use the default value of the connectionSize parameter. If the job parallelism is 5, the job uses 80 connections: (1 × 1 + 2 × 3 + 3 × 3) × 5 = 80.

Connection reuse
Realtime Compute for Apache Flink whose engine version is Flink 1.13-VVR 4.1.12 or later supports connection reuse. If dimension tables and result tables within the same pod of a job are configured with the same value for the connectionPoolName parameter, they share the same connection pool. In the preceding example, if the two dimension tables and three result tables are configured with the same value for the connectionPoolName parameter and connectionSize is set to 5 for these tables, the job uses 30 connections: (1 × 1 + 5) × 5 = 30.

Note: The connection reuse mode is applicable to most scenarios. However, in scenarios with a large number of dimension tables, or in scenarios where the asynchronous mode and caching are not enabled and synchronous point queries are frequently performed, multi-table connection reuse may cause slow queries. In this case, you can configure connection reuse only for result tables.
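The following is a minimal sketch of how the connection reuse configuration might look in the WITH clause of each table; the table names and pool name are hypothetical:

CREATE TEMPORARY TABLE hologres_dim_1 (
    ...
) WITH (
    'connector' = 'hologres',
    ...
    'connectionPoolName' = 'pool1', -- Tables in the same pod that use the same pool name share connections.
    'connectionSize' = '5'
);

CREATE TEMPORARY TABLE hologres_sink_1 (
    ...
) WITH (
    'connector' = 'hologres',
    ...
    'connectionPoolName' = 'pool1', -- The same pool name as the dimension table above.
    'connectionSize' = '5'
);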
Other scenarios
When a job is started, three to six connections need to be established to verify the metadata of a table. After the job runs as expected, the connections are released.
Fully managed Flink supports features such as Hologres catalogs, the CREATE TABLE AS statement (CTAS), and the CREATE DATABASE AS statement (CDAS). When you use these features, connections are used. By default, a job that uses Hologres catalogs consumes three more connections for data definition language (DDL) operations such as creating a table.
Diagnosis of connections
If a job has many tables and has a high degree of parallelism, a large number of connections are used. The number of used connections in Hologres may reach the upper limit. You can use the following methods to learn and diagnose the current connections:
Execute the following statement in HoloWeb to query the active queries on the current instance by using the pg_stat_activity view. For more information, see Query the pg_stat_activity view. In the application_name field, if the value of a query is ververica-connector-hologres, the query uses the read and write connections of Realtime Compute for Apache Flink.

SELECT application_name, COUNT(1) AS COUNT
FROM pg_stat_activity
WHERE backend_type = 'client backend'
AND application_name != 'hologres'
GROUP BY application_name;
If the parallelism of a job is set to a large value, the number of connections is large when the job is started, and then decreases after the job runs for a period of time. You can view this change on the Monitoring Information tab of the instance details page in the Hologres console. The change occurs because a large number of idle connections are closed, which indicates that the job does not require such high parallelism or so many connections. In this case, configure an appropriate number of connections: reduce the parallelism, decrease the value of the connectionSize parameter, or use the connection reuse mode.

Adjust the parallelism of Hologres nodes. By default, all operators in a Flink job use the same parallelism. In specific scenarios, operators that contain complex computing logic require a higher parallelism, but such high parallelism may be unnecessary for Hologres result tables and may occupy a large number of connections. In this case, you can use the expert mode and specify an appropriately small parallelism for the write operators based on your job resource configurations. This helps reduce the total number of connections.
FAQ
What do I do if the error message ERPC TIMEOUT or ERPC CONNECTION CLOSED is reported?
Description: The error message com.alibaba.blink.store.core.rpc.RpcException: request xx UpsertRecordBatchRequest failed on final try 4, maxAttempts=4, errorCode=3, msg=ERPC_ERROR_TIMEOUT is reported.
Cause: The issue may be caused by write failures resulting from highly concurrent write operations or overloaded clusters. You can check whether the instance has a high CPU load. The CONNECTION CLOSED error may be reported when excessive load on the backend node causes out-of-memory (OOM) errors or core dumps.
Solution: Retry the write operation. If the error persists, contact the Hologres technical support.
What do I do if the error message BackPresure Exceed Reject Limit is reported?
Cause: The write workload in the backend is heavy, and the system cannot flush data from the memory table in a timely manner. As a result, a write failure occurs.
Solution: Ignore the error if it is reported only occasionally, or add the rpcRetries = '100' configuration for the sink to increase the number of write retries, as shown in the following sketch. If the error persists, contact the Hologres technical support.
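A minimal sketch of where the configuration goes in the WITH clause of a Blink result table; the table definition and other parameters are placeholders:

CREATE TABLE holo_sink (
    ...
) WITH (
    type = 'hologres',
    ...
    'rpcRetries' = '100' -- Increase the number of write retries.
);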
What do I do if the error message The requested table name xxx mismatches the version of the table xxx from server/org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend. Caused by: java.net.SocketTimeoutException: Read timed out is reported?
Cause: An ALTER TABLE statement was executed. As a result, the schema version of the Blink table is earlier than the schema version on the server, and the maximum number of retries is reached.
Solution: Ignore the error if it occurs only occasionally. If the error persists, contact the Hologres technical support.
What do I do if the error message Failed to query table meta for table is reported?
Cause: A foreign table is read, but Hologres connectors do not support read or write operations on foreign tables. Otherwise, the error is caused by an issue with the metadata of the Hologres instance.
Solution: Contact the Hologres technical support.
What do I do if the error message Cloud authentication failed for access id is reported?
Cause: The AccessKey pair is incorrect, or no user account is added to the Hologres instance.
Solution:
Verify the AccessKey ID and AccessKey secret. In most cases, this error is reported if the AccessKey secret is incorrect or contains spaces.
Use the AccessKey pair to log on to the HoloWeb console. If the AccessKey pair is invalid, the preceding error message is reported again. If the user does not have permissions to manage the instance, the error message FATAL: role "ALIYUN$xxxx" does not exist is reported. In this case, use the administrator account to grant permissions to the user.
What do I do if no data can be queried after a dimension table is joined with another table?
Cause: The dimension table is a partitioned table. Hologres does not allow partitioned tables to serve as dimension tables.
Solution: Use standard tables as dimension tables.
What do I do if the error message Modify record by primary key is not on this table is reported?
Cause: An update mode is specified, but no primary key is configured for the Hologres result table.
Solution: Configure a primary key.
What do I do if the error message shard columns count is no match is reported?
Cause: Not all distribution key columns are written. By default, primary key columns are distribution key columns.
Solution: Write all distribution key columns.
What do I do if the error message Full row is required, but the column xxx is missing is reported?
Cause: Data in a required column is missing. This error is reported in earlier Hologres versions.
Solution: Assign values to the required columns.
What do I do if the number of JDBC connections sharply increases when VVP users read data from or write data to Hologres?
Cause: The maximum number of JDBC connections that Hologres connectors in VVP can use to read data from or write data to Hologres (except binary logs) is reached. The maximum number of JDBC connections is calculated by using the following formula: Number of Hologres tables × Number of concurrent read or write operations × connectionSize. The connectionSize parameter is a parameter of VVP tables, and its default value is 3.
Solution: Plan an appropriate number of connections, and reduce the number of concurrent read/write operations or the value of the connectionSize parameter. If the concurrency or the connectionSize value cannot be reduced, set useRpcMode to true to switch to the RPC mode, as shown in the following sketch.
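A minimal sketch of switching a VVP table to the RPC mode in its WITH clause; the table definition is a placeholder:

CREATE TEMPORARY TABLE holo_table (
    ...
) WITH (
    'connector' = 'hologres',
    ...
    'useRpcMode' = 'true' -- Switch from the JDBC mode to the RPC mode to reduce JDBC connections.
);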
What do I do if Blink and VVP users cannot connect to Hologres when they attempt to read data from or write data to Hologres?
Cause: The Blink or VVP cluster is slow or unable to access the Internet.
Solution: Make sure that the Blink or VVP cluster resides in the same region as the Hologres instance and uses a VPC endpoint.
What do I do if the error message Hologres rpc mode dimension table does not support one to many join is reported?
Cause: The Blink or VVP dimension table in RPC mode is not a row-oriented table, or the field used to join tables is not the primary key.
Solution: Use the JDBC mode and use the column-oriented or hybrid row-column storage mode for the dimension table.
What do I do if the error message DatahubClientException is reported?
Description: The error message Caused by: com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:503, requestId:null, errorCode:null, errorMessage:{"ErrorCode":"ServiceUnavailable","ErrorMessage":"Queue Full"}] is reported.
Cause: The thread pool is full because a large number of binary log consumption jobs are restarted at the same time for a specific reason.
Solution: Consume binary logs in batches.
What do I do if the error message Error occurs when reading data from datahub is reported?
Description: The error message Error occurs when reading data from datahub, msg: [httpStatus:500, requestId:xxx, errorCode:InternalServerError, errorMessage:Get binlog timeout.] is reported.
Cause: Each binary log record contains a large amount of data, which causes the size of each RPC request to exceed the upper limit.
Solution: Reduce the number of binary logs in each batch when a large number of data fields and long character strings exist in each row.
What do I do if the error message Caused by: java.lang.IllegalArgumentException: Column: created_time type does not match: flink row type: TIMESTAMP(6) WITH LOCAL TIME ZONE, hologres type: timestamp is reported?
Cause: A Flink field is of the TIMESTAMP(6) data type, which is not supported by Hologres.
Solution: Change the data type to TIMESTAMP.
What do I do if the error message Caused by: org.postgresql.util.PSQLException: FATAL: Rejected by ip white list. db = xxx, usr=xxx, ip=xx.xx.xx.xx is reported?
Cause: An IP address whitelist configured in Hologres does not contain the IP address that is used by Flink to connect to Hologres.
Solution: Add the IP address used by Flink to the IP address whitelist of Hologres. For more information, see Configure an IP address whitelist.
What do I do if the error message Caused by: java.lang.RuntimeException: shaded.hologres.com.aliyun.datahub.client.exception.DatahubClientException: [httpStatus:400, requestId:xx, errorCode:TableVersionExpired, errorMessage:The specified table has been modified, please refresh cursor and try again is reported?
Cause: DDL operations are performed on the source table, and the table version changes. As a result, data consumption fails.
Solution: Upgrade the version of Realtime Compute for Apache Flink to 4.0.16 or later. After the upgrade, data consumption retries are made in case of failures.
When a job that consumes binary logs starts, an error is reported to indicate that the shard ID does not exist. What do I do?
Cause: The number of shards in the table whose data you want to consume changes after you rename the table or perform other operations on the table. However, the shard information of the original table is used when the job recovers from the checkpoint.
Solution: After you perform operations such as table recreation, the binary log consumption offset that is stored in the checkpoint is invalid. You must start the job without the state.
What do I do if the error message ERROR,22021,"invalid byte sequence for encoding ""UTF8"": 0x00" is reported?
Cause: When you perform a point query in a dimension table, the primary key of the STRING data type contains non-UTF-8 encoded characters. As a result, the SQL statement fails to be executed.
Solution: Process the upstream dirty data.
What do I do if the error message hologres.org.postgresql.util.PSQLException: ERROR: syntax error is reported?
Cause: A slot must be specified when you consume binary logs in JDBC mode. This error may occur if the name of the created slot contains unsupported characters. The slot name can contain only lowercase letters, digits, and underscores (_).
Solution: Create a new slot with a valid name, or use the automatic slot creation feature of VVR 6.0.7.
What do I do if the error message create table hologres.hg_replication_progress failed is reported?
Cause: When binary logs are consumed in JDBC mode, a hg_replication_progress table may need to be created if it does not exist in the current database. However, the number of shards that can be created for the instance has reached the upper limit. As a result, the table creation fails.
Solution: Clean up databases that you no longer need.
What do I do if a job hangs during running and the thread dump indicates that the job is stuck in the Class.forName phase when the JDBC driver is loaded?
Cause: Some static initialization operations are performed when the JDBC driver is loaded on JDK 8, and a race condition may occur during multi-threaded loading.
Solution: Retry the operation, or use a connector of Realtime Compute for Apache Flink whose version is 6.0.7 or later.
What do I do if the error message "no table is defined in publication" or "The table xxx has no slot named xxx" is reported when I consume binary logs in JDBC mode?
Cause: When you drop a table and create another table with the same name, the publication that is bound to the old table is not dropped.
Solution: Execute the following statement in Hologres to find the publications that are not dropped:

SELECT * FROM pg_publication WHERE pubname NOT IN (SELECT pubname FROM pg_publication_tables);

Then, execute the following statement to drop each of them, and restart the job:

DROP PUBLICATION xx;
What do I do if the error message "permission denied for database" is reported when I publish a job?
Cause: The required permissions are not granted to your account when you want to consume binary logs in JDBC mode in Hologres V1.3 or Hologres V2.0.
Solution: We recommend that you upgrade your Hologres instance to V2.1 and use a connector that runs on Ververica Runtime (VVR) 8.0.5 or later. This way, you can consume binary logs by using an account that is granted the read-only permission. If the upgrade is not supported, grant the required permissions based on limits in Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time.
What do I do if the error message "table writer init failed: Fail to fetch table meta from sm" is reported?
Cause: Data is written to the table after the truncate or rename operation is performed on the table.
Solution: This issue occasionally occurs and can be ignored. This issue is automatically resolved after a job failover. In Hologres V2.1.1 to V2.1.14, the replay cache time is increased for FE nodes. If you execute a DDL statement and then a data manipulation language (DML) statement on a table, the DDL replay slows down. The probability of similar exceptions may increase. We recommend that you upgrade your Hologres instance to the latest minor version of V2.1.
What do I do if an error message similar to "java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter" is reported when I use connector dependencies to develop DataStream jobs in an on-premises environment?
Cause: Specific run classes are not available in the JAR package of the connector of Alibaba Cloud Realtime Compute for Apache Flink.
Solution: Adjust the dependencies by following the instructions in Run or debug a Flink deployment that includes a connector in an on-premises environment.
What do I do if the "Binlog Convert Failed" error message is reported or data reading from some shards stops at a specific point in time during binary log consumption in JDBC mode?
Cause: When the gateway of the Hologres instance receives a backend timeout message, the gateway sends the exception to the client. An error occurs in this process. As a result, data reading is stuck, or data parsing fails.
Solution: In most cases, this error occurs due to job backpressure. If data reading is stuck, resume the job from the latest checkpoint. To completely resolve this issue, you must upgrade your Hologres instance to V2.2.21 or later.