MaxCompute: Use Realtime Compute for Apache Flink (streaming data transfer)

Last Updated: Nov 14, 2023

The built-in plug-in of Realtime Compute for Apache Flink can write data to MaxCompute through Tunnel. However, the performance of this plug-in is limited by the parallelism and the maximum number of files that Tunnel can store. MaxCompute provides a Flink plug-in that uses the Streaming Tunnel feature. This plug-in is suitable for writing data from Realtime Compute for Apache Flink to MaxCompute in scenarios that involve high concurrency and high queries per second (QPS).

Background information

Realtime Compute for Apache Flink calls an interface of the MaxCompute SDK to write data to a buffer. Then, Realtime Compute for Apache Flink uploads the data from the buffer to a MaxCompute result table at a specified interval or when the data in the buffer exceeds the specified size (1 MB by default).

Note

If the parallelism of deployments that synchronize data from Realtime Compute for Apache Flink to MaxCompute is greater than 32, or the flush interval is less than 60 seconds, we recommend that you use the Flink plug-in that is provided by MaxCompute. In other scenarios, you can choose between the built-in plug-in of Realtime Compute for Apache Flink and the Flink plug-in that is provided by MaxCompute based on your business requirements.
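The buffer size and the flush interval described above map to the flush_batch_size and flush_interval_ms parameters, which are described in detail later in this topic. The following minimal sketch, with placeholder connection values, flushes whenever either trigger fires:

create table odps_sink (
   id INT,
   content VARCHAR
) with (
   type = 'custom',
   class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
   endpoint = '<yourEndpoint>',
   project = '<yourProjectName>',
   `table` = '<yourTableName>',
   access_id = '<yourAccessKeyId>',
   access_key = '<yourAccessKeySecret>',
   `partition` = 'ds=20180905',
   flush_batch_size = '1048576',   -- Flush when the buffer reaches 1 MB (the default).
   flush_interval_ms = '60000'     -- Also flush every 60 seconds.
);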

For the mappings between the field data types of MaxCompute and Realtime Compute for Apache Flink, see the Data type mappings section of this topic.


Limits

The Flink plug-in provided by MaxCompute has the following limits:

  • This plug-in supports only Blink 3.2.1 and later.

  • MaxCompute clustered tables cannot be used as MaxCompute result tables.

Syntax

In this example, you create a deployment in the Realtime Compute for Apache Flink console and define a MaxCompute result table in the deployment.

Note

The names, order, and data types of the fields defined in the DDL statement must be consistent with those of the MaxCompute physical table. If they are different, the data that is queried in the MaxCompute physical table may be displayed as null.

Sample statement:

create table odps_output(
    id INT,
    user_name VARCHAR,
    content VARCHAR
) with (
    type = 'custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<yourEndpoint>',
    project = '<yourProjectName>',
    `table` = '<yourTableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds=2018****'
);
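For reference, this DDL statement assumes a MaxCompute physical table whose field names, order, and mapped data types are consistent with it. The following is a sketch of such a table, using placeholder names and the STRING type (which maps to VARCHAR in Realtime Compute for Apache Flink):

-- MaxCompute DDL (run in the MaxCompute client); names are placeholders.
CREATE TABLE IF NOT EXISTS <yourTableName> (
    id        INT,
    user_name STRING,
    content   STRING
)
PARTITIONED BY (ds STRING);  -- ds is the partition key column referenced by `partition`.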

Parameters in the WITH clause

  • type
    Required. The type of the result table. Set the value to custom.

  • class
    Required. The entry class of the plug-in. Set the value to com.alibaba.blink.customersink.MaxComputeStreamTunnelSink.

  • endpoint
    Required. The endpoint of MaxCompute. For more information, see Endpoints in different regions (Internet).

  • tunnel_endpoint
    Optional. The endpoint of MaxCompute Tunnel. For more information, see Endpoints in different regions (Internet).
    Note: This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC).

  • project
    Required. The name of the MaxCompute project.

  • table
    Required. The name of the MaxCompute physical table.

  • access_id
    Required. The AccessKey ID that is used to access the MaxCompute project.

  • access_key
    Required. The AccessKey secret that corresponds to the AccessKey ID.

  • partition
    Optional. The name of the partition in a partitioned table. This parameter is required only for partitioned tables.
    • Static partitions: the partition value is specified in plaintext. For example, `partition`='ds=20180905' writes data to the ds=20180905 partition.
    • Dynamic partitions: no partition value is specified, and data is written to different partitions based on the values of the partition key columns in the data. For example, `partition`='ds' writes data to partitions based on the value of the ds field. If you write to multi-level dynamic partitions, make sure that the order of the partition fields in the WITH clause and in the DDL statement of the result table is consistent with the order of the partition fields in the MaxCompute physical table, and separate multiple partition fields with commas (,). A sketch of a multi-level configuration follows this list.
      Note:
      • In the CREATE TABLE statement, you must explicitly declare the partition key columns that are used to create dynamic partitions.
      • If a dynamic partition key column is empty, for example, ds=null or ds='' exists in the source data, the data is written to the partition specified by ds=NULL.

  • enable_dynamic_partition
    Optional. Specifies whether to enable dynamic partitioning. Default value: false.

  • dynamic_partition_limit
    Optional. The maximum number of partitions to which data can be written concurrently. Default value: 100. If dynamic partitioning is enabled, the system allocates a buffer of flush_batch_size bytes for each partition. Therefore, the maximum memory that the sink can occupy in dynamic partitioning mode is calculated as: Number of partitions × Buffer size. For example, if 100 partitions exist and each buffer holds 1 MB of data, at most 100 MB of memory can be occupied. The system keeps an in-memory map between partitions and output writers. If the number of partitions in the map exceeds the value of dynamic_partition_limit, partitions to which no data is being written are removed based on the least recently used (LRU) caching algorithm. If data is being written to all partitions in the map, the error dynamic partition limit exceeded: 100 is reported.

  • flush_batch_size
    Optional. The buffer size, in bytes. Default value: 1048576 (1 MB). When the buffer is full, a flush operation is triggered to upload the data in the buffer to MaxCompute.

  • flush_interval_ms
    Optional. The interval at which the buffer is flushed, in milliseconds. Default value: -1, which indicates that no periodic flush is performed. The MaxCompute sink writes data to the buffer, uploads the buffered data to the specified MaxCompute table every flush_interval_ms milliseconds, and also flushes whenever the data in the buffer exceeds flush_batch_size.

  • flush_retry_count
    Optional. The number of retries for a failed flush operation. Default value: 10.

  • flush_retry_interval_sec
    Optional. The base interval between flush retries, in seconds. Default value: 1. This parameter is used together with flush_retry_strategy.

  • flush_retry_strategy
    Optional. The policy by which the intervals between flush retries grow. Default value: constant. Valid values:
    • constant: the interval between retries is fixed at flush_retry_interval_sec.
    • linear: the intervals between retries increase linearly. For example, if flush_retry_interval_sec is 1 and flush_retry_count is 5, the intervals between the five retries are 1s, 2s, 3s, 4s, and 5s.
    • exponential: the intervals between retries increase exponentially. For example, if flush_retry_interval_sec is 1 and flush_retry_count is 5, the intervals between the five retries are 1s, 2s, 4s, 8s, and 16s.
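As mentioned in the partition entry above, the following sketch shows a multi-level dynamic partition configuration. The two-level partition key (ds, hh) and all connection values are placeholder assumptions; the MaxCompute physical table is assumed to be partitioned by ds and hh in this order:

create table odps_sink_multi (
    id INT,
    content VARCHAR,
    ds VARCHAR,    -- First-level dynamic partition key column, declared explicitly.
    hh VARCHAR     -- Second-level dynamic partition key column, declared explicitly.
) with (
    type = 'custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<yourEndpoint>',
    project = '<yourProjectName>',
    `table` = '<yourTableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds,hh',              -- Same order as the partition fields in the physical table.
    enable_dynamic_partition = 'true'
);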

Data type mappings

Data type of MaxCompute | Data type of Realtime Compute for Apache Flink
----------------------- | -----------------------------------------------
TINYINT                 | TINYINT
SMALLINT                | SMALLINT
INT                     | INT
BIGINT                  | BIGINT
FLOAT                   | FLOAT
DOUBLE                  | DOUBLE
BOOLEAN                 | BOOLEAN
DATETIME                | TIMESTAMP
TIMESTAMP               | TIMESTAMP
VARCHAR                 | VARCHAR
STRING                  | VARCHAR
DECIMAL                 | DECIMAL
BINARY                  | VARBINARY

Sample code

The following sample code shows how to create a MaxCompute result table in a Realtime Compute for Apache Flink deployment.

  • Write data to a static partition

    create table source (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       type = 'random'
    );
    create table odps_sink (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       type = 'custom',
       class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
       endpoint = '<yourEndpoint>',
       project = '<yourProjectName>',
       `table` = '<yourTableName>',
       access_id = '<yourAccessKeyId>',
       access_key = '<yourAccessKeySecret>',
       `partition` = 'ds=20180418'
    );
    insert into odps_sink 
    select 
       id, len, content 
    from source;
  • Write data to a dynamic partition

    create table source (
       id INT,
       len INT,
       content VARCHAR,
       c TIMESTAMP 
    ) with (
       type = 'random'
    );
    create table odps_sink (
       id INT,
       len INT,
       content VARCHAR,
       ds VARCHAR                          -- The dynamic partition key column must be explicitly declared in the CREATE TABLE statement.
    ) with (
       type = 'custom',
       class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
       endpoint = '<yourEndpoint>',
       project = '<yourProjectName>',
       `table` = '<yourTableName>',
       access_id = '<yourAccessKeyId>',
       access_key = '<yourAccessKeySecret>',
       `partition` = 'ds',                 -- No partition value is specified, so data is written to partitions based on the value of the ds field.
       enable_dynamic_partition = 'true',  -- Enable dynamic partitioning.
       dynamic_partition_limit = '50',     -- Data can be written to at most 50 partitions concurrently.
       flush_batch_size = '524288',        -- The buffer size is 512 KB.
       flush_interval_ms = '60000',        -- The flush interval is 60 seconds.
       flush_retry_count = '5',            -- A failed flush is retried at most 5 times.
       flush_retry_interval_sec = '2',     -- The base retry interval is 2 seconds.
       flush_retry_strategy = 'linear'     -- The intervals between retries increase linearly.
    );
    insert into odps_sink 
    select 
       id, 
       len, 
       content,
       date_format(c, 'yyMMdd') as ds
    from source;
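After the deployment runs, you can verify the written data in MaxCompute. The following is a hedged example, assuming the placeholder table name and a partition value that date_format(c, 'yyMMdd') might produce:

-- Run in the MaxCompute client; the partition value is illustrative.
SELECT COUNT(*) FROM <yourTableName> WHERE ds = '180418';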