The built-in plug-in of Realtime Compute for Apache Flink can be used to write data to MaxCompute through Tunnel. However, the performance of this plug-in is limited by the parallelism of Tunnel and the maximum number of files that Tunnel can store. MaxCompute provides a Flink plug-in that uses the Streaming Tunnel feature. This plug-in is suitable for writing data from Realtime Compute for Apache Flink to MaxCompute in scenarios that involve high concurrency and high queries per second (QPS).
Prerequisites
Realtime Compute for Apache Flink is activated.
The Flink plug-in that uses the Streaming Tunnel feature of MaxCompute is installed. For more information, see the installation instructions for the plug-in.
Background information
Realtime Compute for Apache Flink calls an interface of the MaxCompute SDK to write data to a buffer. Then, Realtime Compute for Apache Flink uploads the data from the buffer to a MaxCompute result table at a specified interval or when the data in the buffer exceeds the specified size (1 MB by default).
If data is synchronized from Realtime Compute for Apache Flink to MaxCompute with a parallelism greater than 32, or the flush interval is less than 60 seconds, we recommend that you use the Flink plug-in that is provided by MaxCompute. In other scenarios, you can choose between the built-in plug-in of Realtime Compute for Apache Flink and the Flink plug-in that is provided by MaxCompute based on your business requirements.
For the mappings between the data types of MaxCompute and Realtime Compute for Apache Flink, see the Data type mappings section of this topic.
Limits
The Flink plug-in provided by MaxCompute has the following limits:
This plug-in supports only Blink 3.2.1 and later.
MaxCompute clustered tables cannot be used as MaxCompute result tables.
Syntax
To use the plug-in, create a deployment in the Realtime Compute for Apache Flink console and declare a MaxCompute result table in the deployment.
The names, order, and data types of the fields defined in the DDL statement must be consistent with those of the MaxCompute physical table. Otherwise, the data that is queried from the MaxCompute physical table may be displayed as /n.
Sample statement:
create table odps_output(
    id INT,
    user_name VARCHAR,
    content VARCHAR
) with (
    type = 'custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<yourEndpoint>',
    project = '<yourProjectName>',
    `table` = '<yourTableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds=2018****'
);
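The DDL statement above assumes a matching partitioned table on the MaxCompute side. The following is a minimal sketch of such a table; the table name is illustrative:
create table your_table_name (
    id INT,
    user_name STRING, -- STRING in MaxCompute maps to VARCHAR in Realtime Compute for Apache Flink.
    content STRING
) partitioned by (ds STRING);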
Parameters in the WITH clause
| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| type | The type of the result table. | Yes | Set the value to custom. |
| class | The entry class of the plug-in. | Yes | Set the value to com.alibaba.blink.customersink.MaxComputeStreamTunnelSink. |
| endpoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints in different regions (Internet). |
| tunnel_endpoint | The endpoint of MaxCompute Tunnel. | No | For more information, see Endpoints in different regions (Internet). Note: This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC). |
| project | The name of the MaxCompute project. | Yes | None. |
| table | The name of the MaxCompute physical table. | Yes | None. |
| access_id | The AccessKey ID that is used to access the MaxCompute project. | Yes | None. |
| access_key | The AccessKey secret that corresponds to the AccessKey ID. | Yes | None. |
| partition | The name of the partition in a partitioned table. | No | This parameter is required only for partitioned tables. To write data to a static partition, specify the complete partition value, for example, `partition` = 'ds=20180418'. To write data to dynamic partitions, specify only the partition key column, for example, `partition` = 'ds', and enable dynamic partitioning. |
| enable_dynamic_partition | Specifies whether to enable dynamic partitioning. | No | Default value: false. |
| dynamic_partition_limit | The maximum number of partitions to which data can be written concurrently. If dynamic partitioning is enabled, the system allocates a separate buffer of flush_batch_size bytes for each partition. Therefore, the maximum memory that can be occupied in dynamic partitioning mode is calculated by using the following formula: Number of partitions × Buffer size. For example, if 100 partitions exist and each buffer holds 1 MB of data, at most 100 MB of memory can be occupied. | No | Default value: 100. The system keeps an in-memory map that records the mapping between partitions and output writers. If the number of partitions in the map exceeds the value of dynamic_partition_limit, the system removes partitions to which no data is being written based on the least recently used (LRU) caching algorithm. If data is being written to all partitions in the map, an error is returned. |
| flush_batch_size | The buffer size. Unit: bytes. When the buffer is full, a flush operation is triggered to upload the data in the buffer to MaxCompute. | No | Default value: 1048576 (1 MB). |
| flush_interval_ms | The interval at which flush operations are triggered. Unit: milliseconds. The MaxCompute sink writes data to the buffer and uploads the buffered data to the specified MaxCompute table at the interval specified by flush_interval_ms. A flush is also triggered earlier if the amount of buffered data exceeds flush_batch_size. | No | Default value: -1, which indicates that no time-based flush is triggered. |
| flush_retry_count | The number of times a failed flush operation is retried. | No | Default value: 10. |
| flush_retry_interval_sec | The base interval between flush retries. Unit: seconds. | No | Default value: 1. |
| flush_retry_strategy | The policy that determines how the interval between consecutive flush retries grows. This parameter is used together with flush_retry_interval_sec. Valid values: constant (the interval stays constant), linear (the interval increases linearly), and exponential (the interval increases exponentially). | No | Default value: constant. |
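As an illustration of how the flush-related parameters work together, the following sketch buffers up to 2 MB per flush, also flushes every 30 seconds, and retries failed flushes with exponentially growing intervals. The endpoint, project, table, and partition values are placeholders:
create table odps_sink (
    id INT,
    content VARCHAR
) with (
    type = 'custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<yourEndpoint>',
    project = '<yourProjectName>',
    `table` = '<yourTableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds=20180418',
    flush_batch_size = '2097152',        -- Flush when 2 MB of data is buffered.
    flush_interval_ms = '30000',         -- Also flush every 30 seconds.
    flush_retry_count = '3',             -- Retry a failed flush up to 3 times.
    flush_retry_interval_sec = '2',      -- Base retry interval of 2 seconds.
    flush_retry_strategy = 'exponential' -- Retry intervals grow exponentially, for example, 2s, 4s, 8s.
);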
Data type mappings
| Data type of MaxCompute | Data type of Realtime Compute for Apache Flink |
| --- | --- |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
| DATETIME | TIMESTAMP |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| STRING | VARCHAR |
| DECIMAL | DECIMAL |
| BINARY | VARBINARY |
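For example, to write to a MaxCompute table that contains a DATETIME column, declare the corresponding field as TIMESTAMP in the Flink DDL. The following is a minimal sketch; the column name is illustrative:
create table odps_sink (
    id INT,
    gmt_create TIMESTAMP -- Corresponds to a DATETIME column in the MaxCompute table.
) with (
    type = 'custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<yourEndpoint>',
    project = '<yourProjectName>',
    `table` = '<yourTableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds=20180418'
);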
Sample code
The following sample code shows how to create a MaxCompute result table in a Realtime Compute for Apache Flink deployment.
Write data to a static partition
create table source (
    id INT,
    len INT,
    content VARCHAR
) with (
    type = 'random'
);

create table odps_sink (
    id INT,
    len INT,
    content VARCHAR
) with (
    type = 'custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<yourEndpoint>',
    project = '<yourProjectName>',
    `table` = '<yourTableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds=20180418'
);

insert into odps_sink
select id, len, content
from source;
Write data to a dynamic partition
create table source (
    id INT,
    len INT,
    content VARCHAR,
    c TIMESTAMP
) with (
    type = 'random'
);

create table odps_sink (
    id INT,
    len INT,
    content VARCHAR,
    ds VARCHAR -- The dynamic partition key column must be explicitly declared in the CREATE TABLE statement.
) with (
    type = 'custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<yourEndpoint>',
    project = '<yourProjectName>',
    `table` = '<yourTableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds',                -- Specify only the partition key column. Data is written to different partitions based on the value of the ds field.
    enable_dynamic_partition = 'true', -- Enable dynamic partitioning.
    dynamic_partition_limit = '50',    -- At most 50 partitions can be written to concurrently.
    flush_batch_size = '524288',       -- The buffer size is 512 KB.
    flush_interval_ms = '60000',       -- The flush interval is 60 seconds.
    flush_retry_count = '5',           -- A failed flush is retried up to 5 times.
    flush_retry_interval_sec = '2',    -- The base retry interval is 2 seconds.
    flush_retry_strategy = 'linear'    -- The retry intervals increase linearly.
);

insert into odps_sink
select
    id,
    len,
    content,
    DATE_FORMAT(c, 'yyMMdd') as ds
from source;
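With these settings, the sink allocates one 512 KB buffer per active partition, so dynamic partitioning can hold at most 50 × 512 KB = 25 MB of buffered data before flushes are triggered.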