MaxCompute provides a new version of the Flink connector plug-in, which writes data from Flink to MaxCompute standard tables and Delta tables. This topic describes how to use the new version of the Flink connector to write data from Flink to MaxCompute.
Background information
Write modes supported by the new version of the Flink connector
The new version of the Flink connector allows you to execute the UPSERT or INSERT statement to write data from Flink to MaxCompute. If the UPSERT statement is executed to write data, data can be grouped by one of the following items:
Primary key
Partition field
If the table to which data is written contains a large number of partitions, you can specify partition fields to group data. However, this may result in data skew.
For more information about how to write data by using the Flink connector in upsert mode and the related parameter configuration suggestions, see Ingest data into data warehouses in real time.
When you configure parameters for writing data from Flink to MaxCompute, you can configure parameters of the Flink connector to specify the write mode. For more information about the parameters of the Flink connector, see Appendix: Parameters of the Flink connector of the new version.
We recommend that you set the checkpoint interval to more than 3 minutes for a deployment that executes the UPSERT statement to write data from Flink to MaxCompute. If the interval is set to an excessively small value, the write efficiency may not meet business requirements and a large number of small files may be generated.
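The recommendation above can be turned into a small pre-flight check. The following is a minimal sketch, not part of the connector: the class and method names are hypothetical, and the 3-minute threshold is taken from the recommendation in this topic. The millisecond value is the form that `StreamExecutionEnvironment.enableCheckpointing(long)` expects in the DataStream API.

```java
import java.time.Duration;

public class CheckpointIntervalSketch {
    // Threshold taken from the recommendation in this topic (more than 3 minutes).
    static final Duration RECOMMENDED_MINIMUM = Duration.ofMinutes(3);

    // Returns true only if the interval is strictly longer than the recommended minimum.
    static boolean meetsRecommendation(Duration interval) {
        return interval.compareTo(RECOMMENDED_MINIMUM) > 0;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(5);
        System.out.println(meetsRecommendation(interval)); // true
        // Value in milliseconds, as expected by enableCheckpointing(long).
        System.out.println(interval.toMillis()); // 300000
    }
}
```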
The following table shows the mappings between field data types of MaxCompute and Realtime Compute for Apache Flink:

| Data type of Realtime Compute for Apache Flink | Data type of MaxCompute |
| --- | --- |
| CHAR(p) | CHAR(p) |
| VARCHAR(p) | VARCHAR(p) |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p, s) | DECIMAL(p, s) |
| DATE | DATE |
| TIMESTAMP(9) WITHOUT TIME ZONE and TIMESTAMP_LTZ(9) | TIMESTAMP |
| TIMESTAMP(3) WITHOUT TIME ZONE and TIMESTAMP_LTZ(3) | DATETIME |
| BYTES | BINARY |
| ARRAY<T> | ARRAY<T> |
| MAP<K, V> | MAP<K, V> |
| ROW | STRUCT |
Note: The Flink TIMESTAMP data type does not include a time zone, whereas the MaxCompute TIMESTAMP data type does include a time zone. This difference can result in an 8-hour time discrepancy. You can resolve this by using TIMESTAMP_LTZ(9) to standardize the timestamps.

-- FlinkSQL
CREATE TEMPORARY TABLE odps_source(
  id BIGINT NOT NULL COMMENT 'id',
  created_time TIMESTAMP NOT NULL COMMENT 'create time',
  updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'update time',
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  ...
);
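To see where an 8-hour gap can come from, it helps to read one wall-clock value twice: once as UTC and once in a UTC+8 zone. The following is a minimal sketch that uses only `java.time`. It assumes the relevant zone is Asia/Shanghai, which this topic does not state, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampZoneSketch {
    // Hours between reading the same wall-clock value as UTC and as a value in the given zone.
    static long offsetHours(LocalDateTime wallClock, ZoneId zone) {
        Instant readAsUtc = wallClock.toInstant(ZoneOffset.UTC);
        Instant readInZone = wallClock.atZone(zone).toInstant();
        return Duration.between(readInZone, readAsUtc).toHours();
    }

    public static void main(String[] args) {
        // A TIMESTAMP without a time zone carries only the wall-clock fields.
        LocalDateTime wallClock = LocalDateTime.of(2023, 6, 16, 1, 59, 41);
        // Assumption: the zone-aware side interprets the value in Asia/Shanghai (UTC+8).
        System.out.println(offsetHours(wallClock, ZoneId.of("Asia/Shanghai"))); // 8
    }
}
```

A zone-aware type such as TIMESTAMP_LTZ(9) avoids this ambiguity because the value identifies an instant rather than a bare wall-clock reading.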
Write data from self-managed open source Flink to MaxCompute
Create a MaxCompute table.
You must create a MaxCompute table to which you want to write Flink data. In this example, a non-partitioned Delta table and a partitioned Delta table are created in MaxCompute. For more information about the configuration of table properties, see Parameters for Delta tables.
-- Create a non-partitioned Delta table.
CREATE TABLE mf_flink_tt (
  id BIGINT not null,
  name STRING,
  age INT,
  status BOOLEAN,
  primary key (id)
)
tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12");

-- Create a partitioned Delta table.
CREATE TABLE mf_flink_tt_part (
  id BIGINT not null,
  name STRING,
  age INT,
  status BOOLEAN,
  primary key (id)
)
partitioned by (dd string, hh string)
tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12");
Build an open source Flink cluster. Open source Flink 1.13, 1.15, 1.16, and 1.17 are supported. You can download one of the following Flink connector packages based on the version of the open source Flink cluster:
Note: You can use the package of Flink connector 1.16 for Flink 1.17 clusters.
In this example, the package of Flink connector 1.13 is used. The package is decompressed after it is downloaded to the local environment.
Download the package of the Flink connector and add the package of the Flink connector to the Flink cluster package.
Download the JAR package of the Flink connector to the local environment.
Add the JAR package of the Flink connector to the lib directory of the Flink installation package that is decompressed.
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
Start the Flink service.
cd $FLINK_HOME/bin
./start-cluster.sh
Start the Flink SQL client.
cd $FLINK_HOME/bin
./sql-client.sh
Create Flink tables and configure the parameters of the Flink connector.
You can directly use the Flink SQL client to create Flink tables and configure parameters. You can also use the DataStream API of Flink to perform related operations. The following sample code provides examples of the operations.
Use the Flink SQL client
Go to the code editor of the Flink SQL client and execute the following statements to create tables and configure parameters.
-- Create a non-partitioned table that corresponds to the created MaxCompute non-partitioned table on the Flink SQL client.
CREATE TABLE mf_flink (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt',
  'sink.operation' = 'upsert',
  'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
  'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****',
  'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'='mf_mc_bj'
);

-- Create a partitioned table that corresponds to the created MaxCompute partitioned table on the Flink SQL client.
CREATE TABLE mf_flink_part (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  dd STRING,
  hh STRING,
  PRIMARY KEY(id) NOT ENFORCED
) PARTITIONED BY (`dd`,`hh`)
WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt_part',
  'sink.operation' = 'upsert',
  'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
  'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
  'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'='mf_mc_bj'
);
Write data to the Flink tables and query data in the MaxCompute tables to check whether the Flink data is written to MaxCompute.
-- Insert data into the non-partitioned Flink table mf_flink on the Flink SQL client.
INSERT INTO mf_flink VALUES (1,'Danny',27, false);

-- Query data in the MaxCompute table and view the returned results.
SELECT * FROM mf_flink_tt;
+------------+-------+------+--------+
| id         | name  | age  | status |
+------------+-------+------+--------+
| 1          | Danny | 27   | false  |
+------------+-------+------+--------+

-- Insert a row with the same primary key into the non-partitioned Flink table mf_flink. In upsert mode, the existing row is updated.
INSERT INTO mf_flink VALUES (1,'Danny',28, false);

-- Query data in the MaxCompute table and view the returned results.
SELECT * FROM mf_flink_tt;
+------------+-------+------+--------+
| id         | name  | age  | status |
+------------+-------+------+--------+
| 1          | Danny | 28   | false  |
+------------+-------+------+--------+

-- Insert data into the partitioned Flink table mf_flink_part on the Flink SQL client.
INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');

-- Query data in the MaxCompute table and view the returned results. The partition columns are strings, so the values are quoted.
SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
+------------+-------+------+--------+----+----+
| id         | name  | age  | status | dd | hh |
+------------+-------+------+--------+----+----+
| 1          | Danny | 27   | false  | 01 | 01 |
+------------+-------+------+--------+----+----+

-- Insert a row with the same primary key into the partitioned Flink table mf_flink_part. In upsert mode, the existing row is updated.
INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');

-- Query data in the MaxCompute table and view the returned results.
SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
+------------+-------+------+--------+----+----+
| id         | name  | age  | status | dd | hh |
+------------+-------+------+--------+----+----+
| 1          | Danny | 30   | false  | 01 | 01 |
+------------+-------+------+--------+----+----+
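The WITH clauses in the SQL client statements repeat the same connection options and embed the AccessKey pair in the statement text. One way to avoid hard-coding credentials is to assemble the clause from a map before the statement is submitted. The following is a minimal sketch under stated assumptions: the class, the helper methods, and the environment variable names MC_ACCESS_ID and MC_ACCESS_KEY are hypothetical and not part of the connector, while the option keys are the ones used in this topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class WithClauseSketch {
    // Renders connector options as a Flink SQL WITH clause, doubling single quotes in values.
    static String renderWithClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    // Returns the environment variable value, or a visible placeholder when it is unset.
    static String envOrPlaceholder(String name) {
        String value = System.getenv(name);
        return value == null ? "<" + name + ">" : value;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "maxcompute");
        options.put("table.name", "mf_flink_tt");
        options.put("sink.operation", "upsert");
        // Hypothetical environment variable names; use whatever your deployment provides.
        options.put("odps.access.id", envOrPlaceholder("MC_ACCESS_ID"));
        options.put("odps.access.key", envOrPlaceholder("MC_ACCESS_KEY"));
        options.put("odps.end.point", "http://service.cn-beijing.maxcompute.aliyun.com/api");
        options.put("odps.project.name", "mf_mc_bj");
        System.out.println(renderWithClause(options));
    }
}
```

The rendered text can be appended to the column definition part of a CREATE TABLE statement.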
Use the DataStream API
Before you use the DataStream API, add the following dependency.
<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>flink-connector-maxcompute</artifactId>
  <version>xxx</version>
  <scope>system</scope>
  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
</dependency>
Note: Replace xxx in the preceding code with the version of the JAR package.
Write code to create Flink tables and configure parameters. The following sample code provides an example.
package com.aliyun.odps.flink.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsPipeline;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;

public class Examples {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 120 seconds. For upsert deployments, this topic
        // recommends an interval of more than 3 minutes.
        env.enableCheckpointing(120 * 1000);

        // Read the source table and convert it to a stream of RowData records.
        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
        Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
        DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);

        // Configure the Flink connector parameters.
        Configuration config = new Configuration();
        config.set(OdpsOptions.SINK_OPERATION, "upsert");
        config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
        config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);

        // Replace the placeholders with your AccessKey pair, endpoint, project, and Tunnel endpoint.
        OdpsConf odpsConfig = new OdpsConf("accessid",
                "accesskey",
                "endpoint",
                "project",
                "tunnel endpoint");

        // Build the sink and attach it to the input stream.
        OdpsPipeline.Builder builder = OdpsPipeline.builder();
        builder.projectName("sql2_isolation_2a")
                .tableName("user_ledger_portfolio")
                .partition("")
                .configuration(config)
                .odpsConf(odpsConfig)
                .sink(input, false);
        env.execute();
    }
}
Write data from fully managed Flink to MaxCompute
Create a MaxCompute table.
You must create a MaxCompute table to which you want to write Flink data. The following sample code provides an example on how to create a Delta table.
SET odps.sql.type.system.odps2=true;

-- IF EXISTS prevents an error when the table has not been created yet.
DROP TABLE IF EXISTS mf_flink_upsert;

CREATE TABLE mf_flink_upsert (
  c1 int not null,
  c2 string,
  gt timestamp,
  primary key (c1)
)
PARTITIONED BY (ds string)
tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12");
Log on to the Realtime Compute for Apache Flink console and view the information about the Flink connector. The Flink connector is loaded to the Ververica Platform (VVP) of fully managed Flink.
Use a Flink SQL draft to create a Flink table and construct Flink real-time data. After the draft is developed, deploy the draft.
On the SQL Editor page in the console of Realtime Compute for Apache Flink, create and edit a Flink SQL draft. In the following example, a Flink source table and a temporary Flink result table are created. The faker connector generates random real-time data for the source table, and an INSERT statement writes the data from the source table to the temporary result table. For more information about how to develop an SQL draft, see Develop an SQL draft.
-- Create a Flink source table.
CREATE TEMPORARY TABLE fake_src_table (
  c1 int,
  c2 VARCHAR,
  gt AS CURRENT_TIMESTAMP
) WITH (
  'connector' = 'faker',
  'fields.c2.expression' = '#{superhero.name}',
  'rows-per-second' = '100',
  'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
);

-- Create a temporary Flink result table.
CREATE TEMPORARY TABLE test_c_d_g (
  c1 int,
  c2 VARCHAR,
  gt TIMESTAMP,
  ds varchar,
  PRIMARY KEY(c1) NOT ENFORCED
) PARTITIONED BY(ds)
WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_upsert',
  'sink.operation' = 'upsert',
  'odps.access.id'='LTAI5tRzd4W8cTyL****',
  'odps.access.key'='gJwKaF3hK9MDAQgb**********',
  'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name'='mf_mc_bj',
  'upsert.write.bucket.num'='64'
);

-- Execute the Flink computing logic.
INSERT INTO test_c_d_g
SELECT
  c1 AS c1,
  c2 AS c2,
  gt AS gt,
  date_format(gt, 'yyyyMMddHH') AS ds
FROM fake_src_table;
Parameters in the WITH clause:
odps.end.point: Use the endpoint of the cloud product interconnection network of the region.
upsert.write.bucket.num: Use a value that is the same as the value of the write.bucket.num parameter for the Delta table created in MaxCompute.
Query data in the MaxCompute table and check whether Flink data is written to MaxCompute.
-- The partition column ds is a string, so the value is quoted.
SELECT * FROM mf_flink_upsert WHERE ds='2023061517';

-- View the returned results. The actual returned results in MaxCompute may be different from the data in the following example because Flink data is randomly generated.
+------+--------------------+-------------------------+------------+
| c1   | c2                 | gt                      | ds         |
+------+--------------------+-------------------------+------------+
| 0    | Skaar              | 2023-06-16 01:59:41.116 | 2023061517 |
| 21   | Supah Century      | 2023-06-16 01:59:59.117 | 2023061517 |
| 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
| 126  | Leader             | 2023-06-16 01:59:39.116 | 2023061517 |
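The ds partition value in the result is produced by date_format(gt, 'yyyyMMddHH') in the Flink SQL draft, so each hour of data lands in its own partition. The following is a minimal sketch of the same formatting done with `java.time`. It assumes that the pattern letters behave as they do in `DateTimeFormatter`, and the class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class HourlyPartitionSketch {
    // Same pattern string as the date_format call in the Flink SQL draft.
    private static final DateTimeFormatter HOURLY = DateTimeFormatter.ofPattern("yyyyMMddHH");

    // Derives the hourly partition value for an event time.
    static String partitionValue(LocalDateTime eventTime) {
        return eventTime.format(HOURLY);
    }

    public static void main(String[] args) {
        System.out.println(partitionValue(LocalDateTime.of(2023, 6, 15, 17, 59, 41))); // 2023061517
    }
}
```

All rows whose event time falls in the same hour map to the same ds value, which is why a query can filter on a single partition such as 2023061517.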
Appendix: Parameters of the Flink connector of the new version
Basic parameters
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| connector | Yes | No default value | The type of the connector. Set the value to maxcompute. |
| odps.project.name | Yes | No default value | The name of the MaxCompute project. |
| odps.access.id | Yes | No default value | The AccessKey ID of your Alibaba Cloud account. You can go to the AccessKey Pair page to view the AccessKey pair. |
| odps.access.key | Yes | No default value | The AccessKey secret of your Alibaba Cloud account. You can go to the AccessKey Pair page to view the AccessKey pair. |
| odps.end.point | Yes | No default value | The endpoint of MaxCompute. For more information about the MaxCompute endpoints of each region, see Endpoints. |
| odps.tunnel.end.point | No | | The public endpoint of MaxCompute Tunnel. If you do not configure this parameter, traffic is automatically routed to the Tunnel endpoint that corresponds to the network in which MaxCompute resides. If you configure this parameter, traffic is routed to the specified endpoint and automatic routing is not performed. For more information about the Tunnel endpoints of different network types in each region, see Endpoints. |
| odps.tunnel.quota.name | No | No default value | The name of the Tunnel quota that is used to access MaxCompute. |
| table.name | Yes | No default value | The name of the MaxCompute table. The table name is in the [project.][schema.]table format. |
| odps.namespace.schema | No | false | Specifies whether to use the three-tier model. For more information about the three-tier model, see Schema-related operations. |
| sink.operation | Yes | insert | The write mode. Valid values: insert and upsert. Note: Only Delta tables of MaxCompute support data writing by using the UPSERT statement. |
| sink.parallelism | No | No default value | The degree of data writing parallelism. If you do not configure this parameter, the upstream data parallelism is used by default. Note: Make sure that the value of the write.bucket.num parameter is an integral multiple of the value of the sink.parallelism parameter. This helps achieve optimal write performance and reduces memory usage on the sink node. |
| sink.meta.cache.time | No | 400 | The size of the metadata that is written to the cache. |
| sink.meta.cache.expire.time | No | 1200 | The cache timeout period for the metadata. Unit: seconds. |
| sink.coordinator.enable | No | Yes | Specifies whether to enable the coordinator mode. |
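The note on sink.parallelism asks that write.bucket.num be an integral multiple of sink.parallelism. The following is a minimal sketch of how this relationship could be checked before a deployment is started. The class and method names are hypothetical and not part of the connector.

```java
public class BucketParallelismSketch {
    // Returns true if writeBucketNum is an integral multiple of sinkParallelism.
    static boolean isIntegralMultiple(int writeBucketNum, int sinkParallelism) {
        if (writeBucketNum <= 0 || sinkParallelism <= 0) {
            throw new IllegalArgumentException("Both values must be positive");
        }
        return writeBucketNum % sinkParallelism == 0;
    }

    // Returns the largest parallelism that does not exceed maxParallelism and divides writeBucketNum.
    static int largestAlignedParallelism(int writeBucketNum, int maxParallelism) {
        if (writeBucketNum <= 0 || maxParallelism <= 0) {
            throw new IllegalArgumentException("Both values must be positive");
        }
        for (int p = Math.min(writeBucketNum, maxParallelism); p > 1; p--) {
            if (writeBucketNum % p == 0) {
                return p;
            }
        }
        return 1; // 1 divides every bucket count.
    }

    public static void main(String[] args) {
        // 64 is the write.bucket.num value used by the sample Delta tables in this topic.
        System.out.println(isIntegralMultiple(64, 8));         // true
        System.out.println(isIntegralMultiple(64, 6));         // false
        System.out.println(largestAlignedParallelism(64, 10)); // 8
    }
}
```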
Partition parameters
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| sink.partition | No | No default value | The name of the partition to which data needs to be written. If you use dynamic partitioning, the value of this parameter is the name of the parent partition of a dynamic partition. |
| sink.partition.default-value | No | __DEFAULT_PARTITION__ | The name of the default partition when dynamic partitioning is used. |
| sink.dynamic-partition.limit | No | 100 | The maximum number of partitions to which data can be imported in a single checkpoint when data is written to dynamic partitions. Note: We recommend that you do not increase the value of this parameter to a large value. If data is written to a large number of partitions at the same time, an out-of-memory (OOM) error may occur on the sink node. If the number of partitions to which data is concurrently written exceeds the specified threshold, an error is reported. |
| sink.group-partition.enable | No | false | Specifies whether to group data by partition when data is written to dynamic partitions. |
| sink.partition.assigner.class | No | No default value | The PartitionAssigner implementation class. |
Parameters for data writing in file cache mode
If a large number of dynamic partitions exist, you can enable the file cache mode. The following table describes the parameters that need to be configured for data writing in file cache mode.
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| sink.file-cached.enable | No | false | Specifies whether to enable the file cache mode for data writing. Valid values: false and true. Note: If a large number of dynamic partitions exist, you can enable the file cache mode for data writing. |
| sink.file-cached.tmp.dirs | No | ./local | The default directory for storing cached files in file cache mode. |
| sink.file-cached.writer.num | No | 16 | The number of threads that are used to concurrently upload data in a task in file cache mode. Note: We recommend that you do not increase the value of this parameter to a large value. If data is written to a large number of partitions at the same time, an OOM error may occur. |
| sink.bucket.check-interval | No | 60000 | The interval at which the file size is checked in file cache mode. Unit: milliseconds. |
| sink.file-cached.rolling.max-size | No | 16 M | The maximum size of a single cached file in file cache mode. If the file size exceeds the value of this parameter, the file data is uploaded to the server. |
| sink.file-cached.memory | No | 64 M | The maximum size of off-heap memory used to write data to files in file cache mode. |
| sink.file-cached.memory.segment-size | No | 128 KB | The size of the buffer used to write data to files in file cache mode. |
| sink.file-cached.flush.always | No | true | Specifies whether the cache is used for writing data to files in file cache mode. |
| sink.file-cached.write.max-retries | No | 3 | The number of retries for uploading data in file cache mode. |
Parameters for data writing by using the INSERT or UPSERT statement
Parameters for data writing by using the UPSERT statement
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| upsert.writer.max-retries | No | 3 | The maximum number of retries for writing data to a bucket in an Upsert Writer session. |
| upsert.writer.buffer-size | No | 64 m | The buffer size of an Upsert Writer session in Flink. Note: When the total buffer size of all buckets reaches the specified threshold, the system automatically writes the buffered data to the server. Data in an Upsert Writer session can be written to multiple buckets at the same time. We recommend that you increase the value of this parameter to improve write efficiency. If data is written to a large number of partitions, an OOM error may occur. To prevent this issue, you can decrease the value of this parameter. |
| upsert.writer.bucket.buffer-size | No | 1 m | The buffer size of a single bucket in Flink. If the memory resources of the Flink server are insufficient, you can decrease the value of this parameter. |
| upsert.write.bucket.num | Yes | No default value | The number of buckets for the table to which data is written. The value of this parameter must be the same as the value of the write.bucket.num parameter that is configured for the table to which data is written. |
| upsert.write.slot-num | No | 1 | The number of Tunnel slots used in a session. |
| upsert.commit.max-retries | No | 3 | The maximum number of retries for an upsert session commit. |
| upsert.commit.thread-num | No | 16 | The degree of parallelism of upsert session commits. We recommend that you do not increase the value of this parameter to a large value. If too many upsert session commits run at the same time, resource consumption increases and performance may degrade. |
| upsert.major-compact.min-commits | No | 100 | The minimum number of times that major compaction is committed. |
| upsert.commit.timeout | No | 600 | The timeout period for an upsert session commit. Unit: seconds. |
| upsert.major-compact.enable | No | false | Specifies whether to enable major compaction. |
| upsert.flush.concurrent | No | 2 | The maximum number of buckets to which data in a partition can be written at the same time. Note: A Tunnel slot is occupied each time data in a bucket is flushed. |

Note: For more information about the suggestions on parameter settings for data writing by using the UPSERT statement, see Suggestions on parameter settings for data writing by using the UPSERT statement.
Parameters for data writing by using the INSERT statement
| Parameter | Required | Default value | Description |
| --- | --- | --- | --- |
| insert.commit.thread-num | No | 16 | The degree of parallelism of commit sessions. |
| insert.arrow-writer.enable | No | false | Specifies whether to use the Arrow format. |
| insert.arrow-writer.batch-size | No | 512 | The maximum number of rows in a batch of Arrow-formatted data. |
| insert.arrow-writer.flush-interval | No | 100000 | The interval at which a writer flushes data. Unit: milliseconds. |
| insert.writer.buffer-size | No | 64 M | The cache size for the buffered writer. |