All Products
Search
Document Center

Hologres:Hologres sink table

Last Updated:Feb 04, 2026

Hologres is deeply integrated with Real-time Compute for Flink in exclusive mode, which lets you write data to a Hologres sink table using a connector. You can query the written data immediately. This topic describes how to write data to a Hologres sink table from Real-time Compute for Flink in exclusive mode.

Limits

  • Different versions of Real-time Compute for Flink in exclusive mode have different developer semantics. Before you start, determine the version that you are using and refer to the examples for that version.

  • To prevent connection failures, make sure that your Real-time Compute for Flink service and Hologres instance are in the same region.

  • Versions of Real-time Compute for Flink in exclusive mode earlier than 3.6 do not include a built-in Hologres connector. To write data to Hologres in real time, you must reference a JAR file. For troubleshooting, see Common upgrade preparation failure errors. You can also join the Hologres DingTalk group for feedback. For more information about how to join the group, see How do I get more online support?.

    Note

    We recommend upgrading to version 3.6 or later to run jobs.

  • Version 3.7 of Real-time Compute for Flink in exclusive mode supports automatic creation of Hologres partitioned tables. However, you must configure createparttable='true' in your job. When you use partitioned tables, take note of the following points:

    • Hologres currently supports only list partitioning.

    • When you create a partitioned table, you must explicitly specify the partition key columns. Currently, only text and int4 types are supported for partition key columns. The partition values cannot contain hyphens (-), such as 2020-09-12.

    • If a partitioned table has a primary key, the partition key columns must be part of the primary key.

    • When you create a child partition table, the partition key columns for the child table must have static field values.

    • The partition key column values of data written to a child partition table must exactly match the values that are defined when the child table is created. Otherwise, an error is reported.

    • The DEFAULT partition feature is not supported.

  • If the destination Hologres table has a primary key, the default semantics for real-time writes do not update data based on the primary key. Data with duplicate primary keys is discarded.

  • Hologres writes data asynchronously. You must add the blink.checkpoint.fail_on_checkpoint_error=true configuration to your job. This ensures that a failover is triggered if the job fails. This parameter is not required for Version 3.7.6 and later of Real-time Compute for Flink in exclusive mode.

DDL semantics

Use the following statement to create a Hologres sink table.

create table Hologres_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  type='hologres',
  dbname='<yourDbname>', --The name of the Hologres database.
  tablename='<yourTablename>', --The name of the table that receives data in Hologres.
  username='<yourUsername>', --The AccessKey ID of your Alibaba Cloud account.
  password='<yourPassword>', --The AccessKey secret of your Alibaba Cloud account.
  endpoint='<yourEndpoint>'); --The Endpoint of the VPC network for the Hologres instance.

WITH parameters

Parameter

Description

Example

type

The type of the sink table. The value is fixed to hologres.

hologres

endpoint

The VPC network address of the Hologres instance.

Go to the Hologres Management Console. On the product page of the destination instance, obtain the Endpoint from the Network Information section. The Endpoint must include the port number in the ip:port format.

demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80

username

AccessKey ID

Click AccessKey Management to obtain the AccessKey ID.

xxxxm3FMWaxxxx

password

AccessKey secret

Click AccessKey Management to obtain the AccessKey secret.

xxxxm355fffaxxxx

dbname

The name of the Hologres database.

Holodb

tablename

The name of the table in the Hologres database.

blink_test

arraydelimiter

The Hologres sink supports splitting a STRING field into an array based on the field_delimiter and importing the array into Hologres.

The default value is \u0002.

\u0002

mutatetype

The data write mode. For more information, see Hologres sink table.

The default value is insertorignore.

insertorignore

ignoredelete

Specifies whether to ignore retraction messages.

  • true: Ignores retraction messages.

  • false: Does not ignore retraction messages.

Note

This parameter takes effect only when streaming semantics are used.

The default value is false.

Typically, Flink Groupby operations generate retraction messages. When these messages are sent to the Hologres connector, they generate DELETE requests.

false

partitionrouter

Specifies whether to write data to a partitioned table.

  • true: Writes data to a partitioned table.

  • false: Does not write data to a partitioned table.

The default value is false.

false

createparttable

When writing to a partitioned table, specifies whether to automatically create a child partition table based on the partition value. This feature is supported in Blink exclusive mode V3.7 and later.

The default value is false.

Important

Use this feature with caution. Make sure that partition values do not contain dirty data, which can lead to the creation of incorrect partitioned tables.

false

Note

The arraydelimiter, mutatetype, ignoredelete, partitionrouter, and createparttable parameters are not included in the DDL example statement. To use these parameters in your application, refer to their descriptions in the preceding table.

Write data to a regular Hologres sink table in real time

  1. Create a table in Hologres.

    Create a table in Hologres to receive data. The following code provides an example of a table creation SQL statement.

     create table blink_test (a int, b text, c text, d float8, e bigint);
  2. Create a Real-time Compute for Flink job.

    1. Log on to the Real-time Compute for Flink console.

    2. Create a Real-time Compute for Flink job.

      • Real-time Compute for Flink in exclusive mode V3.6 and later supports the Hologres data source. You can call it directly. The following code provides an example SQL statement.

        create table randomSource (a int, b VARCHAR, c VARCHAR, d DOUBLE, e BIGINT) with (type = 'random');
        
        create table test (
          a int,
          b VARCHAR,
          c VARCHAR,
          PRIMARY KEY (a)
        ) with (
          type = 'hologres',
          `endpoint` = '$ip:$port', --The VPC network address and port number of the Hologres instance.
          `username` = 'The AccessKey ID of your Alibaba Cloud account',
          `password` = 'The AccessKey secret of your Alibaba Cloud account',
          `dbname` = 'The name of the Hologres database',
          `tablename` = 'blink_test'--The name of the table that receives data in Hologres.
        );
        
        insert
          into test
        select
          a,b,c
        from
          randomSource;
  3. Publish the job.

    1. After you create the job, click Syntax Check in the editor. If Successful is displayed, the syntax is correct.

    2. Click Save to save the job.

    3. Click Publish to submit the job to the production environment. Configure the job publishing settings as required.4

  4. Start the job.

    After you submit the job to the production environment, you must start it manually.

    In the upper-right corner of the menu bar on the Real-time Compute for Flink Platform for Developers page, click O&M. On the O&M page, select the job that you want to start and click Start in the upper-right corner.6

  5. Query data in Hologres in real time.

    Query the table that receives data in Hologres to retrieve the written data in real time. The following code provides an example of a query SQL statement.

    select * from blink_test;

How to use the wide table merge/partial update feature

To write data from multiple streams to a single Hologres wide table, which is a common scenario, follow these steps:

Assume that Hologres has a wide table named WIDE_TABLE with columns A, B, C, D, and E. Column A is the primary key. One Flink stream contains data for columns A, B, and C, and another stream contains data for columns A, D, and E.

  1. Use Flink SQL to declare two Hologres sink tables. One table declares only fields A, B, and C, and the other declares only fields A, D, and E. Both tables are mapped to WIDE_TABLE.

  2. Set the mutatetype property of both sink tables to insertorupdate.

  3. Set the ignoredelete property of both sink tables to true to prevent retraction messages from generating DELETE requests.

  4. Insert the data from the two streams into their respective sink tables.

The following limits apply to this scenario:

  1. The wide table must have a primary key.

  2. The data from each stream must include the complete primary key fields.

  3. In a wide table merge scenario for a column-oriented table, a high number of records per second (RPS) can lead to high CPU usage. You can disable Dictionary encoding for the fields in the table.

Write data to a partitioned Hologres sink table in real time

Hologres supports writing data directly to a parent partitioned table by calling the real-time data API. The data is automatically routed to the corresponding child partition tables. For more information about the real-time data API, see Real-time data API.

The following limits apply:

  • Hologres supports only list partitioning.

  • When you create a partitioned table, you must explicitly specify the partition key columns. Only text and int4 types are supported for partition key columns.

  • If a primary key is set, the partition key columns must be part of the primary key.

  • When you create a child partition table, the partition key columns for the child table must have static field values.

  • The partition key column values of data written to a child partition table must exactly match the values that are defined when the child table is created. Otherwise, an error is reported.

  • Hologres does not support default partitions.

  1. Create a partitioned table in Hologres.

    Create a partitioned table in Hologres to receive data and create the corresponding child partition tables. The following code provides an example of a table creation SQL statement.

    --Create the parent partition table test_message and its corresponding child partition tables.
    
    drop table if exists test_message;
    
    begin;
    create table test_message (
     "bizdate" text NOT NULL,
     "tag" text NOT NULL,
     "id" int4 NOT NULL,
     "title" text NOT NULL,
     "body" text,
    PRIMARY KEY (bizdate,tag,id)
    )
    PARTITION BY LIST (bizdate);
    commit;
    Note
    • When you execute the command, replace the ${bizdate} parameter with the actual value.

    • Version 3.7 of Real-time Compute for Flink in exclusive mode is the first version that supports automatic partition creation. If you are using a version earlier than 3.7, you must create child partition tables in Hologres in advance. Otherwise, the data import fails.

  2. Create a job in Real-time Compute for Flink in exclusive mode.

    The following code provides an example statement for creating a job in Real-time Compute for Flink in exclusive mode.

    Note

    The following example applies to Real-time Compute for Flink in exclusive mode V3.7 and later. If you are using a version earlier than 3.7, you must upgrade to V3.7 or later, or delete the configuration for automatic child partition table creation: `createparttable` = 'true'.

    create table test_message_src(
      tag VARCHAR,
      id INTEGER,
      title VARCHAR,
      body VARCHAR
    ) with (
      type = 'random',
      `interval` = '10',
      `count` = '100'
    );
    
    create table test_message_sink (
      bizdate VARCHAR,
      tag VARCHAR,
      id INTEGER,
      title VARCHAR,
      body VARCHAR
    ) with (
      type = 'hologres',
      `endpoint` = '$ip:$port', --The VPC network address of the Hologres instance.
      `username` ='<AccessID>', --The AccessKey ID of your Alibaba Cloud account.
      `password` = '<AccessKey>', --The AccessKey secret of your Alibaba Cloud account.
      `dbname` = '<DBname>', --The name of the Hologres database.
      `tablename` = '<Tablename>', --The name of the table in the Hologres database.
      `partitionrouter` = 'true', --Write data to the partitioned table in Hologres.
      `createparttable` = 'true' --Automatically create child partition tables in Hologres.
    );
    
    insert into test_message_sink select "20200327",* from test_message_src;
    insert into test_message_sink select "20200328",* from test_message_src;
  3. Publish and start the job.

    For more information, see the Publish the job and Start the job steps in the Write data to a regular Hologres sink table in real time section.

  4. Query data in Hologres in real time.

    Query the table that receives data in Hologres to retrieve the written data in real time. The following code provides an example of a query SQL statement.

    select * from test_message;
    select * from test_message where bizdate = '20200327';

Data type mapping

For more information about the data type mapping between Real-time Compute for Flink in exclusive mode and Hologres, see Data type summary.