All Products
Search
Document Center

MaxCompute:Use Flink to write data to a Delta table

Last Updated:Dec 02, 2024

MaxCompute provides a new version of the Flink connector plug-in. The Flink connector plug-in can be used to write data from Flink to MaxCompute standard tables and Delta tables. This facilitates data writing from Flink to MaxCompute. This topic describes how to use the new version of the Flink connector to write data from Flink to MaxCompute.

Background information

  • Write modes supported by the new version of the Flink connector

    The new version of the Flink connector allows you to execute the UPSERT or INSERT statement to write data from Flink to MaxCompute. If the UPSERT statement is executed to write data, data can be grouped by one of the following items:

    • Primary key

    • Partition field

      If the table to which data is written contains a large number of partitions, you can specify partition fields to group data. However, this may result in data skew.

  • For more information about how to write data by using the Flink connector in upsert mode and the related parameter configuration suggestions, see Ingest data into data warehouses in real time.

  • When you configure parameters for writing data from Flink to MaxCompute, you can configure parameters of the Flink connector to specify the write mode. For more information about the parameters of the Flink connector, see Appendix: Parameters of the Flink connector of the new version.

  • We recommend that you set the checkpoint interval to more than 3 minutes for a deployment that executes the UPSERT statement to write data from Flink to MaxCompute. If the interval is set to an excessively small value, the write efficiency may not meet business requirements and a large number of small files may be generated.

  • The following table shows the mappings between field data types of MaxCompute and Realtime Compute for Apache Flink:

    Data type of Realtime Compute for Apache Flink

    Data type of MaxCompute

    CHAR(p)

    CHAR(p)

    VARCHAR(p)

    VARCHAR(p)

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    LONG

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL(p, s)

    DECIMAL(p, s)

    DATE

    DATE

    TIMESTAMP(9) WITHOUT TIME ZONE and TIMESTAMP_LTZ(9)

    TIMESTAMP

    TIMESTAMP(3) WITHOUT TIME ZONE and TIMESTAMP_LTZ(3)

    DATETIME

    BYTES

    BINARY

    ARRAY<T>

    LIST<T>

    MAP<K, V>

    MAP<K, V>

    ROW

    STRUCT

    Note

    The Flink TIMESTAMP data type does not include a time zone, whereas the MaxCompute TIMESTAMP data type does include a time zone. This difference can result in an 8-hour time discrepancy. This can be resolved by using TIMESTAMP_LTZ(9) to standardize the timestamps.

    --FlinkSQL
    CREATE TEMPORARY TABLE odps_source(
      id BIGINT NOT NULL COMMENT 'id',
      created_time TIMESTAMP NOT NULL COMMENT 'create time',
      updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'update time',
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'maxcompute',
    ...
    );

Write data from self-managed open source Flink to MaxCompute

  1. Create a MaxCompute table.

    You must create a MaxCompute table to which you want to write Flink data. In this example, a non-partitioned Delta table and a partitioned Delta table are created in MaxCompute. For more information about the configuration of table properties, see Parameters for Delta tables.

    -- Create a non-partitioned Delta table.
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    -- Create a partitioned Delta table.
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. Build an open source Flink cluster. Open source Flink 1.13, 1.15, 1.16, and 1.17 are supported. You can download one of the following Flink connector packages based on the version of the open source Flink cluster:

    Note
    • You can use the package of Flink connector 1.16 for Flink 1.17 clusters.

    • In this example, the package of Flink connector 1.13 is used. The package is decompressed after it is downloaded to the local environment.

  3. Download the package of the Flink connector and add the package of the Flink connector to the Flink cluster package.

    1. Download the JAR package of the Flink connector to the local environment.

    2. Add the JAR package of the Flink connector to the lib directory of the Flink installation package that is decompressed.

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. Start the Flink service.

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. Start the Flink SQL client.

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. Create Flink tables and configure the parameters of the Flink connector.

    You can directly use the Flink SQL client to create Flink tables and configure parameters. You can also use the DataStream API of Flink to perform related operations. The following sample code provides examples of the operations.

    Use the Flink SQL client

    1. Go to the code editor of the Flink SQL client and execute the following statements to create tables and configure parameters.

      -- Create a non-partitioned table that corresponds to the created MaxCompute non-partitioned table on the Flink SQL client.
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
      
      -- Create a partitioned table that corresponds to the created MaxCompute partitioned table on the Flink SQL client.
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
    2. Write data to the Flink tables and query data in the MaxCompute tables to check whether the Flink data is written to MaxCompute.

      -- Insert data into the non-partitioned Flink table mf_flink on the Flink SQL client.
      INSERT INTO mf_flink VALUES (1,'Danny',27, false);
      -- Query data in the MaxCompute table and view the returned results.
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+------+------+--------+
      
      -- Insert data into the non-partitioned Flink table mf_flink on the Flink SQL client.
      INSERT INTO mf_flink VALUES (1,'Danny',28, false);
      -- Query data in the MaxCompute table and view the returned results.
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+------+------+--------+
      
      -- Insert data into the partitioned Flink table mf_flink_part on the Flink SQL client.
      INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');
      -- Query data in the MaxCompute table and view the returned results.
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+
      
      -- Insert data into the partitioned Flink table mf_flink_part on the Flink SQL client.
      INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');
      -- Query data in the MaxCompute table and view the returned results.
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+

    Use the DataStream API

    1. Before you use the DataStream API, add the following dependency.

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
                  <version>xxx</version>
                  <scope>system</scope>
                  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      Note

      Replace xxx in the preceding code with the version of the JAR package.

    2. Write code to create Flink tables and configure parameters. The following sample code provides an example.

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }

Write data from fully managed Flink to MaxCompute

  1. Create a MaxCompute table.

    You must create a MaxCompute table to which you want to write Flink data. The following sample code provides an example on how to create a Delta table.

    SET odps.sql.type.system.odps2=true;
    DROP TABLE mf_flink_upsert;
    CREATE TABLE mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      PARTITIONED BY (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. Log on to the Realtime Compute for Apache Flink console and view the information about the Flink connector. The Flink connector is loaded to the Ververica Platform (VVP) of fully managed Flink.

  3. Use a Flink SQL draft to create a Flink table and construct Flink real-time data. After the draft is developed, deploy the draft.

    On the SQL Editor page in the console of Realtime Compute for Apache Flink, create and edit a Flink SQL draft. In the following example, a Flink source table and a temporary Flink result table are created, the real-time data generation logic is automatically constructed to write data to the source table, and then the computing logic is used to write data from the source table to the temporary result table. For more information about how to develop an SQL draft, see Develop an SQL draft.

    -- Create a Flink source table. 
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt AS CURRENT_TIMESTAMP
    ) WITH (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    -- Create a temporary Flink result table.
    CREATE TEMPORARY TABLE test_c_d_g 
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
     ) PARTITIONED BY(ds)
     WITH (
        		'connector' = 'maxcompute',
        		'table.name' = 'mf_flink_upsert',
        		'sink.operation' = 'upsert',
        		'odps.access.id'='LTAI5tRzd4W8cTyL****',
        		'odps.access.key'='gJwKaF3hK9MDAQgb**********',
        		'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        		'odps.project.name'='mf_mc_bj',
        		'upsert.write.bucket.num'='64'
    );
    
    -- Execute the Flink computing logic.
    INSERT INTO test_c_d_g
    SELECT  c1 AS c1,
            c2 AS c2,
            gt AS gt,
            date_format(gt, 'yyyyMMddHH') AS ds
    FROM    fake_src_table;

    Parameters in the WITH clause:

    odps.end.point: Use the endpoint of the cloud product interconnection network of the region.

    upsert.write.bucket.num: Use a value that is the same as the value of the write.bucket.num parameter for the Delta table created in MaxCompute.

  4. Query data in the MaxCompute table and check whether Flink data is written to MaxCompute.

    SELECT * FROM mf_flink_upsert WHERE ds=2023061517;
    
    -- View the returned results. The actual returned results in MaxCompute may be different from the data in the following example because Flink data is randomly generated.
    +------+----+------+----+
    | c1   | c2 | gt   | ds |
    +------+----+------+----+
    | 0    | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
    

Appendix: Parameters of the Flink connector of the new version

  • Basic parameters

    Parameter

    Required

    Default value

    Description

    connector

    Yes

    No default value

    The type of the connector. Set the value to MaxCompute.

    odps.project.name

    Yes

    No default value

    The name of the MaxCompute project.

    odps.access.id

    Yes

    No default value

    The AccessKey ID of your Alibaba Cloud account. You can go to the AccessKey Pair page to view the AccessKey pair.

    odps.access.key

    Yes

    No default value

    The AccessKey secret of you Alibaba Cloud account. You can go to the AccessKey Pair page to view the AccessKey pair.

    odps.end.point

    Yes

    No default value

    The endpoint of MaxCompute. For more information about the MaxCompute endpoints of each region, see Endpoints.

    odps.tunnel.end.point

    No

    The public endpoint of MaxCompute Tunnel. If you do not configure this parameter, traffic is automatically routed to the Tunnel endpoint that corresponds to the network in which MaxCompute resides. If you configure this parameter, traffic is routed to the specified endpoint and automatic routing is not performed.

    For more information about the Tunnel endpoints of different network types in each region, see Endpoints.

    odps.tunnel.quota.name

    No

    No default value

    The name of the Tunnel quota that is used to access MaxCompute.

    table.name

    Yes

    No default value

    The name of the MaxCompute table. The table name is in the [project.][schema.]table format.

    odps.namespace.schema

    No

    false

    Specifies whether to use the three-tier model. For more information about the three-tier model, see Schema-related operations.

    sink.operation

    Yes

    insert

    The write mode. Valid values: insert and upsert.

    Note

    Only Delta tables of MaxCompute support data writing by using the UPSERT statement.

    sink.parallelism

    No

    No default value

    The degree of data writing parallelism. If you do not configure this parameter, the upstream data parallelism is used by default.

    Note

    Make sure that the value of the write.bucket.num parameter is an integral multiple of the value of the sink.parallelism parameter. This helps ensure the optimal write performance and efficiently saves memory of the sink node.

    sink.meta.cache.time

    No

    400

    The size of the metadata that is written to the cache.

    sink.meta.cache.expire.time

    No

    1200

    The cache timeout period for the metadata. Unit: seconds.

    sink.coordinator.enable

    No

    Yes

    Specifies whether to enable the coordinator mode.

  • Partition parameters

    Parameter

    Required

    Default value

    Description

    sink.partition

    No

    No default value

    The name of the partition to which data needs to be written.

    If you use dynamic partitioning, the value of this parameter is the name of the parent partition of a dynamic partition.

    sink.partition.default-value

    No

    __DEFAULT_PARTITION__

    The name of the default partition when dynamic partitioning is used.

    sink.dynamic-partition.limit

    No

    100

    The maximum number of partitions to which data can be imported in a single checkpoint when data is written to dynamic partitions.

    Note

    We recommend that you do not increase the value of this parameter to a large value. If data is written to a large number of partitions at the same time, an out-of-memory (OOM) error may occur on the sink node. If the number of partitions to which data is concurrently written exceeds the specified threshold, an error is reported.

    sink.group-partition.enable

    No

    false

    Specifies whether to group data by partition when data is written to dynamic partitions.

    sink.partition.assigner.class

    No

    No default value

    The PartitionAssigner implementation class.

  • Parameters for data writing in file cache mode

    If a large number of dynamic partitions exist, you can enable the file cache mode. The following table describes the parameters that need to be configured for data writing in file cache mode.

    Parameter

    Required

    Default value

    Description

    sink.file-cached.enable

    No

    false

    Specifies whether to enable the file cache mode for data writing. Valid values:

    • false

    • true

      Note

      If a large number of dynamic partitions exist, you can enable the file cache mode for data writing.

    sink.file-cached.tmp.dirs

    No

    ./local

    The default directory for storing cached files in file cache mode.

    sink.file-cached.writer.num

    No

    16

    The number of threads that are used to concurrently upload data in a task in file cache mode.

    Note

    We recommend that you do not increase the value of this parameter to a large value. If data is written to a large number of partitions at the same time, an OOM error may occur.

    sink.bucket.check-interval

    No

    60000

    The interval at which the file size is checked in file cache mode. Unit: milliseconds.

    sink.file-cached.rolling.max-size

    No

    16 M

    The maximum value of a single cached file in file cache mode.

    If the file size exceeds the value of this parameter, the file data is uploaded to the server.

    sink.file-cached.memory

    No

    64 M

    The maximum size of off-heap memory used to write data to files in file cache mode.

    sink.file-cached.memory.segment-size

    No

    128 KB

    The size of the buffer used to write data to files in file cache mode.

    sink.file-cached.flush.always

    No

    true

    Specifies whether the cache is used for writing data to files in file cache mode.

    sink.file-cached.write.max-retries

    No

    3

    The number of retries for uploading data in file cache mode.

  • Parameters for data writing by using the INSERT or UPSERT statement

    Parameters for data writing by using the UPSERT statement

    Parameter

    Required

    Default value

    Description

    upsert.writer.max-retries

    No

    3

    The maximum number of retries for writing data to a bucket in an Upsert Writer session.

    upsert.writer.buffer-size

    No

    64 m

    The buffer size of an Upsert Writer session in Flink.

    Note
    • When the total buffer size of all buckets reaches the specified threshold, the system automatically updates data to the server.

    • Data in an Upsert Writer session can be written to multiple buckets at the same time. We recommend that you increase the value of this parameter to improve write efficiency.

    • If data is written to a large number of partitions, an OOM error may occur. To prevent this issue, you can decrease the value of this parameter.

    upsert.writer.bucket.buffer-size

    No

    1 m

    The buffer size of a single bucket in Flink. If the memory resources of the Flink server are insufficient, you can decrease the value of this parameter.

    upsert.write.bucket.num

    Yes

    No default value

    The number of buckets for the table to which data is written. The value of this parameter must be the same as the value of the write.bucket.num parameter that is configured for the table to which data is written.

    upsert.write.slot-num

    No

    1

    The number of Tunnel slots used in a session.

    upsert.commit.max-retries

    No

    3

    The maximum number of retries for an upsert session commit.

    upsert.commit.thread-num

    No

    16

    The degree of parallelism of upsert session commits.

    We recommend that you do not increase the value of this parameter to a large value. If excessive upsert session commits are performed at the same time, resource consumption increases. This may cause performance issues or excessive resource consumption.

    upsert.major-compact.min-commits

    No

    100

    The minimum number of times that major compaction is committed.

    upsert.commit.timeout

    No

    600

    The timeout period for an upsert session commit. Unit: seconds.

    upsert.major-compact.enable

    No

    false

    Specifies whether to enable major compaction.

    upsert.flush.concurrent

    No

    2

    The maximum number of buckets to which data in a partition can be written at the same time.

    Note

    A Tunnel slot is occupied each time data in a bucket is refreshed.

    Note

    For more information about the suggestions on parameter settings for data writing by using the UPSERT statement, see Suggestions on parameter settings for data writing by using the UPSERT statement.

    Parameters for data writing by using the INSERT statement

    Parameter

    Required

    Default value

    Description

    insert.commit.thread-num

    No

    16

    The degree of parallelism of commit sessions.

    insert.arrow-writer.enable

    No

    false

    Specifies whether to use the Arrow format.

    insert.arrow-writer.batch-size

    No

    512

    The maximum number of rows in a batch of Arrow-formatted data.

    insert.arrow-writer.flush-interval

    No

    100000

    The interval at which a writer flushes data. Unit: milliseconds.

    insert.writer.buffer-size

    No

    64 M

    The cache size for the buffered writer.