Realtime Compute for Apache Flink: MaxCompute connector

Last Updated: Feb 05, 2025

This topic describes how to use the MaxCompute connector.

Background information

MaxCompute is a fast and fully managed computing platform for large-scale data warehousing that can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses, along with analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?

The following table describes the capabilities supported by the MaxCompute connector.

Item

Description

Supported type

Source table, dimension table, and sink table

Running mode

Streaming mode and batch mode

Data format

N/A

Metric

  • Metrics for source tables

    numRecordsIn: the total number of data records that are read by using the MaxCompute connector.

    numRecordsInPerSecond: the number of data records that are read by using the MaxCompute connector per second.

    numBytesIn: the total number of bytes of data that is read by using the MaxCompute connector after data is decompressed.

    numBytesInPerSecond: the number of bytes of data that is read by using the MaxCompute connector per second after data is decompressed.

  • Metrics for sink tables

    numRecordsOut: the total number of data records that are written by using the MaxCompute connector.

    numRecordsOutPerSecond: the number of data records that are written by using the MaxCompute connector per second.

    numBytesOut: the total number of bytes of data that is written by using the MaxCompute connector before data is compressed.

    numBytesOutPerSecond: the number of bytes of data that is written by using the MaxCompute connector per second before data is compressed.

  • Metrics for dimension tables

    dim.odps.cacheSize: the number of data records that are cached in a dimension table.

Note

For more information about the metrics, see Metrics.

API type

DataStream API and SQL API

Data update or deletion in a sink table

If MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel is used, data can only be inserted into a sink table. If MaxCompute Upsert Tunnel is used, data in a sink table can be inserted, updated, or deleted.

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.

  • The MaxCompute connector supports only the at-least-once semantics.

    Note

    At-least-once semantics prevents data loss. In specific cases, duplicate data may be written to MaxCompute, depending on the tunnel that you use. For more information about MaxCompute Tunnel, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage topic.

  • By default, a source operates in full mode and reads data only from the partition specified by the partition parameter. After all data in the partition is read, the deployment finishes and does not monitor for new partitions.

    To continuously monitor for new partitions, create an incremental source by specifying the startPartition parameter in the WITH clause.

    Note
    • Each time a dimension table is updated, the dimension table checks for the latest partition.

    • After the source table starts to run, the source table does not read the data that is newly added to a partition. We recommend that you run a deployment when the partition data is complete.

Syntax

CREATE TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);
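
As a minimal usage sketch, the source table declared above can be queried like any other Flink SQL table. The print_sink table is hypothetical and uses the print connector of Apache Flink only to keep the example self-contained. The statements assume that odps_source is declared in the same draft.

```sql
-- Hypothetical sink for illustration: prints each row to the TaskManager logs.
CREATE TEMPORARY TABLE print_sink (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'print'
);

-- Read the partition configured on odps_source and forward every row.
INSERT INTO print_sink
SELECT id, user_name, content
FROM odps_source;
```

Because the source runs in full mode here, the deployment is expected to finish after the configured partition is read.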

Parameters in the WITH clause

  • Common parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The table type.

    STRING

    Yes

    No default value

    Set the value to odps.

    endpoint

    The endpoint of MaxCompute.

    STRING

    Yes

    No default value

    For more information, see Endpoints.

    tunnelEndpoint

    The endpoint of MaxCompute Tunnel.

    STRING

    No

    No default value

    For more information, see Endpoints.

    Note

    If this parameter is not specified, MaxCompute allocates tunnel connections based on the Server Load Balancer (SLB) service.

    project

    The name of the MaxCompute project.

    STRING

    Yes

    No default value

    N/A.

    schemaName

    The name of the MaxCompute schema.

    STRING

    No

    No default value

    This parameter is required only when the MaxCompute schema feature is enabled. In this case, you must set this parameter to the name of the schema of the MaxCompute table. For more information, see Schema-related operations.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    tableName

    The name of the MaxCompute table.

    STRING

    Yes

    No default value

    N/A.

    accessId

    The AccessKey ID that is used to access MaxCompute.

    STRING

    Yes

    No default value

    For more information, see Console operations.

    Important

    In this example, the AccessKey ID and AccessKey secret are introduced as variables to prevent secret leaks. For more information, see Namespace variables.

    accessKey

    The AccessKey secret that is used to access MaxCompute.

    STRING

    Yes

    No default value

    partition

    The name of the partition in the MaxCompute table.

    STRING

    No

    No default value

    You do not need to specify this parameter for a non-partitioned MaxCompute table or an incremental source.

    Note

    For more information about how to specify the partition parameter for a partitioned MaxCompute table, see the "How do I configure the partition parameter when data is read from or written to partitions?" section of the FAQ about upstream and downstream storage topic.

    compressAlgorithm

    The compression algorithm that is used by MaxCompute Tunnel.

    STRING

    No

    • ZLIB (default for VVR 4.0.13 and later)

    • SNAPPY (default for VVR 6.0.1 and later)

    Valid values:

    • RAW (no compression)

    • ZLIB

    • SNAPPY

      Compared with ZLIB, SNAPPY can significantly improve the throughput. In test scenarios, the throughput is increased by about 50%.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.

    quotaName

    The name of the quota for the exclusive Tunnel resource groups of MaxCompute.

    STRING

    No

    No default value

    You can specify this parameter to use the exclusive Tunnel resource groups of MaxCompute.

    Important
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports this parameter.

    • If you specify this parameter, you must delete the tunnelEndpoint parameter. Otherwise, the tunnel that is specified by the tunnelEndpoint parameter is used.
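
The following sketch shows one way to combine the optional common parameters described above. The table name odps_source_quota and the placeholder <yourQuotaName> are illustrative assumptions. The tunnelEndpoint parameter is left out on purpose so that the quota specified by quotaName is used.

```sql
CREATE TEMPORARY TABLE odps_source_quota (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  -- tunnelEndpoint is intentionally omitted so that quotaName takes effect.
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****',
  -- Tunnel compression; valid values are RAW, ZLIB, and SNAPPY.
  'compressAlgorithm' = 'SNAPPY',
  -- Exclusive Tunnel resource group; requires VVR 8.0.3 or later.
  'quotaName' = '<yourQuotaName>'
);
```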

  • Parameters only for source tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    maxPartitionCount

    The maximum number of partitions from which data can be read.

    INTEGER

    No

    100

    If the number of partitions from which data is read exceeds the value of this parameter, this error message appears: "The number of matched partitions exceeds the default limit".

    Important

    If data is read from a large number of partitions of a MaxCompute table, the workload on the MaxCompute service is high. In this case, the startup speed of the deployment slows down. To prevent this issue, you need to check whether data needs to be read from a large number of partitions and specify the partition parameter based on your business requirements. If your business requires data from a large number of partitions, manually increase the value of this parameter.

    useArrow

    Specifies whether to use the Arrow format to read data.

    BOOLEAN

    No

    false

    The Arrow format can be used to call the storage API operation of MaxCompute. For more information, see the "User interfaces and openness" section of the What is MaxCompute? topic.

    Important
    • This parameter takes effect only in a batch deployment.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    splitSize

    The size of data that can be pulled at a time when the Arrow format is used to read data.

    MEMORYSIZE

    No

    256 MB

    Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    Important

    This parameter takes effect only in a batch deployment.

    compressCodec

    The compression algorithm that is used when the Arrow format is used to read data.

    STRING

    No

    ""

    Valid values:

    • "" (no compression)

    • ZSTD

    • LZ4_FRAME

    Compared with no compression, the throughput can be improved if you specify a compression algorithm.

    Important
    • This parameter takes effect only in a batch deployment.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    dynamicLoadBalance

    Specifies whether to enable dynamic allocation of shards.

    BOOLEAN

    No

    false

    Valid values:

    • true

    • false

    Dynamic allocation of shards can improve the processing performance of different operators of Realtime Compute for Apache Flink and reduce the overall time required for reading from MaxCompute. However, this may cause data skew because the total amount of data read by different operators is inconsistent.

    Important
    • This parameter takes effect only in a batch deployment.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.
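
A batch source that reads in the Arrow format might be declared as in the following sketch. The table name odps_arrow_source is hypothetical, and the literal '256mb' assumes that MEMORYSIZE values follow the usual Flink memory-size notation. Check the accepted format in your VVR version before you rely on it.

```sql
-- Intended for a batch deployment on VVR 8.0.8 or later.
CREATE TEMPORARY TABLE odps_arrow_source (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****',
  -- Raise this limit only if the partition filter must match many partitions.
  'maxPartitionCount' = '100',
  'useArrow' = 'true',
  -- Amount of data pulled per request (assumed notation for 256 MB).
  'splitSize' = '256mb',
  -- Valid values are ZSTD and LZ4_FRAME; an empty string disables compression.
  'compressCodec' = 'ZSTD',
  -- Dynamic shard allocation; may cause uneven data volume across subtasks.
  'dynamicLoadBalance' = 'true'
);
```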

  • Parameters only for incremental MaxCompute source tables

    The incremental source monitors for new partitions by intermittently polling the MaxCompute server to obtain all partition information. Before the source reads data from a new partition, data writing in the partition must be complete. For more information, see the "What do I do if an incremental MaxCompute source table detects a new partition when data is still being written to the partition?" section of the FAQ about upstream and downstream storage topic.

    You can configure the startPartition parameter to specify the start partition from which data is read. Only data in the partitions whose alphabetical order is greater than or equal to that of the partition specified by the startPartition parameter is read. Because the comparison is alphabetical rather than numeric, the partition year=2023,month=10 sorts before the partition year=2023,month=9. To keep the alphabetical order consistent with the chronological order, pad the month with a leading zero in the partition names that are declared in the code. For example, change year=2023,month=9 to year=2023,month=09.

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    startPartition

    The start partition from which incremental data is read.

    STRING

    Yes

    No default value

    • If you specify this parameter, the incremental source is used. As a result, the partition parameter is ignored.

    • If the source table is a multi-level partitioned table, you must configure the value of each partition column in descending order based on partition levels.

    Note

    For more information about how to specify the startPartition parameter, see the "How do I configure the startPartition parameter for an incremental MaxCompute source table?" section of the FAQ about upstream and downstream storage topic.

    subscribeIntervalInSec

    The interval at which MaxCompute is polled to obtain the information about partitions.

    INTEGER

    No

    30

    Unit: seconds.

    modifiedTableOperation

    The operation that is performed when data in a partition is modified during partition reading.

    Enum (NONE, SKIP)

    No

    NONE

    Download sessions are saved in checkpoints. Each time a deployment is resumed from a checkpoint, Realtime Compute for Apache Flink attempts to resume the reading progress from the saved session. If data in the partition has been modified, the session is unavailable and the deployment is repeatedly restarted. To resolve this issue, you can specify this parameter. Valid values:

    • NONE: If you set this parameter to NONE, you must change the value of the startPartition parameter to make the alphabetical order of the partition that is specified by the startPartition parameter greater than the alphabetical order of the unavailable partition and start the deployment without states.

    • SKIP: If you do not want to start the deployment without states, you can set this parameter to SKIP. In this case, Realtime Compute for Apache Flink skips the unavailable partition when it attempts to resume the session from the checkpoint.

    Important
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports this parameter.

    • If you set this parameter to NONE or SKIP, the data that is read from the partition in which data is modified is retained, and the data that is not read is ignored.
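
The incremental source parameters above can be sketched as follows. The table name odps_incremental_source and the two-level partition layout (year, month) are assumptions made for illustration. The month is zero-padded so that the alphabetical order of partition names matches their chronological order.

```sql
CREATE TEMPORARY TABLE odps_incremental_source (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Setting startPartition switches the source to incremental mode;
  -- the partition parameter is ignored and therefore omitted.
  'startPartition' = 'year=2023,month=09',
  -- Poll MaxCompute for partition information every 60 seconds (default: 30).
  'subscribeIntervalInSec' = '60',
  -- Skip a partition whose data was modified while it was being read
  -- instead of restarting without states (VVR 8.0.3 or later).
  'modifiedTableOperation' = 'SKIP'
);
```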

  • Parameters only for sink tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    useStreamTunnel

    Specifies whether to use MaxCompute Streaming Tunnel to upload data.

    BOOLEAN

    No

    false

    Valid values:

    • true: MaxCompute Streaming Tunnel is used to upload data.

    • false: MaxCompute Batch Tunnel is used to upload data.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.

    • For more information about how to select a tunnel, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage topic.

    flushIntervalMs

    The interval at which the flush operation is performed in the buffer of a writer in MaxCompute Tunnel.

    LONG

    No

    30000 (30 seconds)

    The MaxCompute sink inserts data into the buffer. Then, the MaxCompute sink writes the data in the buffer to the destination MaxCompute table at the interval that is specified by the flushIntervalMs parameter. The sink also writes the data to the destination table in MaxCompute when the size of the buffer data exceeds the specified threshold.

    If you use Streaming Tunnel, the data that is flushed to the destination MaxCompute table becomes available immediately. If you use Batch Tunnel, flushed data is not available until the checkpointing operation is complete. In this case, we recommend that you set this parameter to 0 to disable the scheduled flushing feature.

    Unit: milliseconds.

    Note

    This parameter can be used together with the batchSize parameter. The flush operation is triggered when the condition that is specified by the batchSize parameter or the flushIntervalMs parameter is met.

    batchSize

    The size of buffered data at which a flush operation is performed in the buffer of a writer in MaxCompute Tunnel.

    LONG

    No

    67108864 (64 MB)

    The MaxCompute sink inserts data into the buffer. Then, the MaxCompute sink writes the data in the buffer to the destination MaxCompute table when the size of the buffer data exceeds the value that is specified by the batchSize parameter.

    Unit: bytes.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter.

    • This parameter can be used together with the flushIntervalMs parameter. The flush operation is triggered when the condition that is specified by the batchSize parameter or the flushIntervalMs parameter is met.

    numFlushThreads

    The number of threads that are used to flush data in the buffer of a writer in MaxCompute Tunnel.

    INTEGER

    No

    1

    Each MaxCompute sink creates the number of threads that is specified by the numFlushThreads parameter to flush data. If the value of this parameter is greater than 1, the data in different partitions can be flushed at the same time. This improves the flush operation efficiency.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter.

    dynamicPartitionLimit

    The maximum number of dynamic partitions to which data can be written.

    INTEGER

    No

    100

    If the number of dynamic partitions to which data is written from the sink between two checkpoints exceeds the value of the dynamicPartitionLimit parameter, this error message appears: "Too many dynamic partitions".

    Important

    If data is written to a large number of partitions of a MaxCompute table, the workload on the MaxCompute service is high, slowing down checkpointing and flushing. To prevent this issue, you need to check whether data needs to be written to a large number of partitions. If your business requires data to be written to a large number of partitions, manually increase the value of the dynamicPartitionLimit parameter.

    retryTimes

    The maximum number of retries that can be performed for a request on the MaxCompute server.

    INTEGER

    No

    3

    The MaxCompute service may be unavailable for a short period of time when a session is created, a session is submitted, or data is flushed. If the MaxCompute service becomes unavailable, requests to the MaxCompute server are retried based on the configuration of this parameter.

    sleepMillis

    The retry interval.

    INTEGER

    No

    1000

    Unit: milliseconds.

    enableUpsert

    Specifies whether to use MaxCompute Upsert Tunnel to upload data.

    BOOLEAN

    No

    false

    Valid values:

    • true: MaxCompute Upsert Tunnel is used to process INSERT, UPDATE_AFTER, and DELETE data in Realtime Compute for Apache Flink.

    • false: MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel that is specified by the useStreamTunnel parameter is used to process INSERT and UPDATE_AFTER data in Realtime Compute for Apache Flink.

    Important
    • If an issue such as an error, a deployment failure, or a long-time processing fault occurs when the MaxCompute sink commits a session in upsert mode, we recommend that you set the Parallelism parameter of sink operators to a value that is less than or equal to 10.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    upsertAsyncCommit

    Specifies whether to use the asynchronous mode when a MaxCompute sink commits a session in upsert mode.

    BOOLEAN

    No

    false

    Valid values:

    • true: The asynchronous mode is used. If you use the asynchronous mode, the time that is consumed to commit the session is reduced but the data that is written after the session is committed cannot be immediately queried.

    • false: The synchronous mode is used by default. When the MaxCompute sink commits the session, the system waits until the server processes the session.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    upsertCommitTimeoutMs

    The timeout period for a MaxCompute sink to commit a session in upsert mode.

    INTEGER

    No

    120000 (120 seconds)

    Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    sink.operation

    The write operation mode for a Delta table.

    STRING

    No

    insert

    Valid values:

    • insert: Data is written to the table in append mode.

    • upsert: Data is updated.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    sink.parallelism

    The degree of parallelism when data is written to a Delta table.

    INTEGER

    No

    None

    • If you do not configure this parameter, the upstream data parallelism is used by default.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    Important

    Make sure that the value of the write.bucket.num parameter is an integral multiple of the value of the sink.parallelism parameter. This helps ensure the optimal write performance and efficiently saves memory of the sink node.

    sink.file-cached.enable

    Specifies whether to enable the file cache mode when data is written to dynamic partitions of a Delta table.

    BOOLEAN

    No

    false

    Valid values:

    • true: The file cache mode is enabled.

    • false: The file cache mode is disabled.

    If you enable the file cache mode, the number of small files that are written to the server is reduced. However, a higher writing latency exists. We recommend that you enable the file cache mode when the sink table has a high degree of parallelism.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    sink.file-cached.writer.num

    The number of threads that are used to concurrently upload data in a task in file cache mode.

    INTEGER

    No

    16

    • This parameter takes effect only if the sink.file-cached.enable parameter is set to true.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    • We recommend that you do not increase the value of this parameter to a large value. If data is written to a large number of partitions at the same time, an out of memory (OOM) error may occur.

    sink.bucket.check-interval

    The interval at which the file size is checked in file cache mode. Unit: milliseconds.

    INTEGER

    No

    60000

    • This parameter takes effect only if the sink.file-cached.enable parameter is set to true.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    sink.file-cached.rolling.max-size

    The maximum size of a single cached file in file cache mode.

    MEMORYSIZE

    No

    16 MB

    • This parameter takes effect only if the sink.file-cached.enable parameter is set to true.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    • If the file size exceeds the value of this parameter, the file data is uploaded to the server.

    sink.file-cached.memory

    The maximum size of off-heap memory used to write data to files in file cache mode.

    MEMORYSIZE

    No

    64 MB

    • This parameter takes effect only if the sink.file-cached.enable parameter is set to true.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    sink.file-cached.memory.segment-size

    The size of the buffer used to write data to files in file cache mode.

    MEMORYSIZE

    No

    128 KB

    • This parameter takes effect only if the sink.file-cached.enable parameter is set to true.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    sink.file-cached.flush.always

    Specifies whether the cache is used for writing data to files in file cache mode.

    BOOLEAN

    No

    true

    • This parameter takes effect only if the sink.file-cached.enable parameter is set to true.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    sink.file-cached.write.max-retries

    The number of retries for uploading data in file cache mode.

    INTEGER

    No

    3

    • This parameter takes effect only if the sink.file-cached.enable parameter is set to true.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.writer.max-retries

    The maximum number of retries for writing data to a bucket in an Upsert Writer session.

    INTEGER

    No

    3

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.writer.buffer-size

    The buffer size of an Upsert Writer session in Realtime Compute for Apache Flink.

    MEMORYSIZE

    No

    64 MB

    • When the total buffer size of all buckets reaches the specified threshold, the system automatically flushes the data to the server.

    Note

    Data in an Upsert Writer session can be written to multiple buckets at the same time. We recommend that you increase the value of this parameter to improve write efficiency.

    If data is written to a large number of partitions, an OOM error may occur. To prevent this issue, you can decrease the value of this parameter.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.writer.bucket.buffer-size

    The buffer size of a single bucket in Realtime Compute for Apache Flink.

    MEMORYSIZE

    No

    1 MB

    • If the memory resources of the Flink server are insufficient, you can decrease the value of this parameter.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.write.bucket.num

    The number of buckets for the table to which data is written.

    INTEGER

    Yes

    None

    • The value of this parameter must be the same as the value of the write.bucket.num parameter that is configured for the Delta table to which data is written.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.write.slot-num

    The number of Tunnel slots used in a session.

    INTEGER

    No

    1

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.commit.max-retries

    The maximum number of retries for an upsert session commit.

    INTEGER

    No

    3

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.commit.thread-num

    The degree of parallelism of upsert session commits.

    INTEGER

    No

    16

    • We recommend that you do not set this parameter to a large value. If excessive upsert session commits are performed at the same time, resource consumption increases, which may cause performance issues.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.commit.timeout

    The timeout period for an upsert session commit. Unit: seconds.

    INTEGER

    No

    600

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.flush.concurrent

    The maximum number of buckets to which data in a partition can be written at the same time.

    INTEGER

    No

    2

    • A Tunnel slot is occupied each time data in a bucket is flushed.

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    insert.commit.thread-num

    The degree of parallelism of commit sessions.

    INTEGER

    No

    16

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    insert.arrow-writer.enable

    Specifies whether to use the Arrow format.

    BOOLEAN

    No

    false

    Valid values:

    • true: The Arrow format is used.

    • false: The Arrow format is not used.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    insert.arrow-writer.batch-size

    The maximum number of rows in a batch of Arrow-formatted data.

    INTEGER

    No

    512

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    insert.arrow-writer.flush-interval

    The interval at which a writer flushes data. Unit: milliseconds.

    INTEGER

    No

    100000

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    insert.writer.buffer-size

    The cache size for the buffered writer.

    MEMORYSIZE

    No

    64 MB

    Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter.

    upsert.partial-column.enable

    Specifies whether to update only data in specific columns.

    BOOLEAN

    No

    false

    This parameter is used for only sink tables that are Delta tables. For more information, see Update data in specific columns. Valid values:

    • true: Only data in specific columns is updated.

    • false: Data in all columns is updated.

    Data writing varies based on whether the primary key for updating data exists in the sink table.

    • If the sink table contains data that has the same primary key, the data is updated based on the primary key. Only the non-null values of the specified columns are written to the existing record.

    • If the sink table does not contain data that has the same primary key, a data record is added based on the primary key, the value of the specified column is inserted into the sink table, and null is inserted for columns other than the specified column.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.11 or later supports this parameter.
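
The two sketches below illustrate how the sink parameters above might be combined. The table names, the placeholder values, and the choice of id as the primary key are assumptions. The second table also assumes that the destination MaxCompute table supports upsert writes.

```sql
-- Append-only sink that uploads data through MaxCompute Streaming Tunnel.
CREATE TEMPORARY TABLE odps_stream_sink (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****',
  'useStreamTunnel' = 'true',
  -- Flush when 10 seconds elapse or 32 MB is buffered, whichever comes first.
  'flushIntervalMs' = '10000',
  'batchSize' = '33554432',
  -- Allow data in different partitions to be flushed at the same time.
  'numFlushThreads' = '2'
);

-- Sink that uses MaxCompute Upsert Tunnel (VVR 8.0.6 or later) so that
-- INSERT, UPDATE_AFTER, and DELETE records are all processed.
CREATE TEMPORARY TABLE odps_upsert_sink (
  id INT,
  user_name VARCHAR,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED  -- assumed key column
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourUpsertTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'enableUpsert' = 'true',
  -- Synchronous commit: committed data can be queried once the commit returns.
  'upsertAsyncCommit' = 'false',
  'upsertCommitTimeoutMs' = '120000'
);
```

With Batch Tunnel (useStreamTunnel set to false), flushed data is not available until checkpointing completes, so a short flushIntervalMs value brings little benefit in that mode.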

  • Parameters only for dimension tables

    When a deployment starts, the dimension table pulls full data from the partition that is specified by the partition parameter. This parameter supports the max_pt() function. When the cache is reloaded after cache entries expire, the partition parameter is re-parsed and data of the latest partition is pulled. If the partition parameter is set to max_two_pt(), the dimension table can pull data from two partitions. Otherwise, data of only one partition can be pulled.

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    cache

    The cache policy.

    STRING

    Yes

    No default value

    You must set the cache parameter to ALL for a dimension table and explicitly declare the setting in the DDL statement. This policy is suitable when the amount of data in the remote table is small and a large number of missing keys exist, that is, many source records cannot be associated with the dimension table based on the ON clause.

    ALL: indicates that all data in the dimension table is cached. Before the system runs a deployment, the system loads all data in the dimension table to the cache. All subsequent queries on the dimension table are served from the cache. If a data record cannot be found in the cache, the key does not exist. The system reloads all data in the cache after cache entries expire.

    Note
    • If the cache parameter is set to ALL, you must increase the memory of the join node because the system asynchronously loads data of the dimension table. We recommend that you set the memory to at least four times the amount of data in the remote table. The required memory size also depends on the MaxCompute storage compression algorithm.

    • If a dimension table contains a large amount of data, you can use the SHUFFLE_HASH hint to evenly distribute the data to each subtask. For more information, see the "How do I use the SHUFFLE_HASH hint for a dimension table?" section of the FAQ about upstream and downstream storage topic.

    • If you use an ultra-large dimension table, frequent garbage collections (GCs) of Java virtual machine (JVM) may cause deployment exceptions. To resolve this issue, you can increase the memory of the node on which the dimension table is joined with another table. If the issue persists, we recommend that you convert the dimension table to a key-value dimension table that supports the least recently used (LRU) cache policy. For example, you can use an ApsaraDB for HBase dimension table as the key-value dimension table.

    cacheSize

    The maximum number of rows of data that can be cached.

    LONG

    No

    100000

    If the number of data records in the dimension table exceeds the value of the cacheSize parameter, the following error message appears: "Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit".

    Important

    If a dimension table contains a large number of data records, a large amount of JVM heap memory is consumed, and both deployment startup and dimension table updates slow down. Before you increase this value, check whether all of the data records need to be cached. If your business requires a large number of data records to be cached in a dimension table, manually increase the value of this parameter.

    cacheTTLMs

    The cache timeout period.

    LONG

    No

    Long.MAX_VALUE

    Unit: milliseconds.

    cacheReloadTimeBlackList

    The time periods during which the cache is not refreshed.

    STRING

    No

    No default value

    This parameter is useful during peak hours of events such as large-scale online promotions. You can specify this parameter to prevent deployments from becoming unstable when the cache is refreshed. For more information about how to specify the parameter, see the "How do I configure the CacheReloadTimeBlackList parameter?" section of the FAQ about upstream and downstream storage topic.

    maxLoadRetries

    The maximum number of retries that can be performed to refresh the cache. The initial data load when the deployment starts also counts as a cache refresh. If the number of retries exceeds the value of this parameter, the deployment fails to run.

    INTEGER

    No

    10

    N/A.
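The sizing guidance in the preceding table can be turned into a quick pre-deployment check. The following Java code is a minimal sketch, not part of the connector: the class name DimCacheSizing and its methods are hypothetical helpers. The code converts a refresh interval to the millisecond value that cacheTTLMs expects, applies the "at least four times the amount of data" memory recommendation, and checks a row count against cacheSize.

```java
import java.time.Duration;

/** Illustrative sizing helper for an ALL-cache dimension table (hypothetical, not a connector API). */
final class DimCacheSizing {

    /** Converts a refresh interval to milliseconds, the unit used by cacheTTLMs. */
    static long cacheTtlMs(Duration refreshInterval) {
        return refreshInterval.toMillis();
    }

    /** Applies the recommendation of at least four times the remote table data size. */
    static long recommendedJoinMemoryBytes(long remoteTableBytes) {
        return Math.multiplyExact(remoteTableBytes, 4L);
    }

    /** Returns true if the row count does not exceed cacheSize (default value: 100000). */
    static boolean fitsInCache(long rowCount, long cacheSize) {
        return rowCount <= cacheSize;
    }

    public static void main(String[] args) {
        // A six-hour refresh interval expressed as a WITH option.
        System.out.println("'cacheTTLMs' = '" + cacheTtlMs(Duration.ofHours(6)) + "'");
        // Minimum suggested join-node memory for a 512 MiB remote table.
        System.out.println(recommendedJoinMemoryBytes(512L * 1024 * 1024) + " bytes");
        // 250,000 rows do not fit in the default cacheSize, so cacheSize must be increased.
        System.out.println(fitsInCache(250_000L, 100_000L));
    }
}
```

The four-times factor is only the lower bound stated in the table. Because the actual memory footprint depends on the MaxCompute storage compression algorithm, treat the result as a starting point and verify it against the observed heap usage.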

Data type mappings

For more information about the data types that are supported by MaxCompute, see MaxCompute V2.0 data type edition.

| Data type of MaxCompute | Data type of Realtime Compute for Apache Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(precision, scale) | DECIMAL(precision, scale) |
| CHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| STRING | STRING |
| BINARY | BYTES |
| DATE | DATE |
| DATETIME | TIMESTAMP(3) |
| TIMESTAMP | TIMESTAMP(9) |
| ARRAY | ARRAY |
| MAP | MAP |
| STRUCT | ROW |
| JSON | STRING |

Important

If a MaxCompute physical table contains both a field of a nested composite data type (such as ARRAY, MAP, or STRUCT) and a field of the JSON type, you must specify tblproperties('columnar.nested.type'='true') when you create the MaxCompute physical table. Otherwise, Realtime Compute for Apache Flink cannot correctly read data from or write data to the physical table.
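When you write Flink DDL statements for an existing MaxCompute table, the mappings above can be applied mechanically. The following Java code is a minimal sketch of such a lookup. The class MaxComputeTypeMapping is a hypothetical helper and not a connector API. As a simplification, the code keeps the arguments of DECIMAL, CHAR, and VARCHAR, but does not translate the element types of ARRAY, MAP, or STRUCT.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Illustrative lookup that mirrors the data type mapping table (hypothetical helper). */
final class MaxComputeTypeMapping {
    private static final Map<String, String> TO_FLINK = new LinkedHashMap<>();

    static {
        // Types whose names are identical on both sides.
        for (String same : new String[] {"BOOLEAN", "TINYINT", "SMALLINT", "BIGINT", "FLOAT",
                "DOUBLE", "DECIMAL", "CHAR", "VARCHAR", "STRING", "DATE", "ARRAY", "MAP"}) {
            TO_FLINK.put(same, same);
        }
        // Types whose names differ.
        TO_FLINK.put("INT", "INTEGER");
        TO_FLINK.put("BINARY", "BYTES");
        TO_FLINK.put("DATETIME", "TIMESTAMP(3)");
        TO_FLINK.put("TIMESTAMP", "TIMESTAMP(9)");
        TO_FLINK.put("STRUCT", "ROW");
        TO_FLINK.put("JSON", "STRING");
    }

    /** Returns the Flink type name for a MaxCompute type such as "DECIMAL(10, 2)". */
    static String toFlinkType(String maxComputeType) {
        String type = maxComputeType.trim().toUpperCase(Locale.ROOT);
        int paren = type.indexOf('(');
        int angle = type.indexOf('<');
        int cut = type.length();
        if (paren >= 0) cut = Math.min(cut, paren);
        if (angle >= 0) cut = Math.min(cut, angle);
        String base = type.substring(0, cut).trim();
        String flink = TO_FLINK.get(base);
        if (flink == null) {
            throw new IllegalArgumentException("No mapping listed for: " + maxComputeType);
        }
        // Keep "(precision, scale)" or "(n)"; drop "<...>" because element types are not translated here.
        return (paren >= 0 && cut == paren) ? flink + type.substring(paren) : flink;
    }

    public static void main(String[] args) {
        System.out.println(toFlinkType("DATETIME"));
        System.out.println(toFlinkType("DECIMAL(10, 2)"));
        System.out.println(toFlinkType("STRUCT<a:INT>"));
    }
}
```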

Examples

SQL API

  • Sample code for a source table

    • Read data from a full MaxCompute source table:

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=201809*'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT
         cid,
         COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
    • Read data from an incremental MaxCompute source table:

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Incremental reading starts from the partition 20180905.
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT cid, COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
  • Sample code for a sink table

    • Write data to static partitions:

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905' -- Data is written to the static partition 20180905. 
      );
      
      INSERT INTO odps_sink
      SELECT
        id, len, content
      FROM datagen_source;
    • Write data to dynamic partitions:

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id  INT,
        len INT,
        content VARCHAR,
        ds VARCHAR -- The partition key column that you use to create dynamic partitions must be explicitly specified. 
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds' -- The partition value is not provided. This indicates that data is written to a partition specified by the ds field. 
      );
      
      INSERT INTO odps_sink
      SELECT
         id,
         len,
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM datagen_source;
  • Sample code for a dimension table

    • Join a dimension table in which each key matches at most one row (one-to-one join):

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED  -- You must declare a primary key for a one-to-one dimension table join.
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k INT,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT s.k, s.v, d.v -- Qualify k because both tables contain a column named k.
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
    • Join a dimension table in which a key can match multiple rows (one-to-many join):

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR
        -- You do not need to declare a primary key for a one-to-many dimension table join.
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k INT,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT s.k, s.v, d.v -- Qualify k because both tables contain a column named k.
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
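
The dimension table samples above rely on the ALL cache policy: all rows are loaded once, and each source record is then matched against the cache. The following Java code is a minimal sketch of that behavior in plain Java, assuming the k and v columns of the samples. The class AllCacheLookupSketch is hypothetical and does not reflect the internal implementation of the connector. The sketch shows that a key present once yields one output row, a key present several times yields several rows, and a missing key yields no rows in an INNER JOIN.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of an ALL-cache lookup join (hypothetical, not the connector's implementation). */
final class AllCacheLookupSketch {
    // Key k of the dimension table mapped to every value v stored under that key.
    private final Map<Integer, List<String>> cache = new HashMap<>();

    /** Loads every (k, v) row up front, as the ALL policy does before the deployment runs. */
    AllCacheLookupSketch(int[] keys, String[] values) {
        for (int i = 0; i < keys.length; i++) {
            cache.computeIfAbsent(keys[i], unused -> new ArrayList<>()).add(values[i]);
        }
    }

    /** INNER JOIN semantics: one output row per matching dimension row, none for a missing key. */
    List<String> join(int k, String sourceValue) {
        List<String> out = new ArrayList<>();
        for (String dimValue : cache.getOrDefault(k, Collections.emptyList())) {
            out.add(k + "," + sourceValue + "," + dimValue);
        }
        return out;
    }

    public static void main(String[] args) {
        AllCacheLookupSketch dim =
            new AllCacheLookupSketch(new int[] {1, 1, 2}, new String[] {"a", "b", "c"});
        System.out.println(dim.join(2, "y")); // Key 2 matches a single row.
        System.out.println(dim.join(1, "x")); // Key 1 matches two rows.
        System.out.println(dim.join(3, "z")); // Key 3 is missing, so no row is produced.
    }
}
```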

DataStream API

Important
  • If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type. For more information about how to configure a DataStream connector, see Settings of DataStream connectors.

  • To protect intellectual property, in Ververica Runtime (VVR) 6.0.6 or later, a deployment that uses the MaxCompute DataStream connector can be debugged locally for a maximum of 30 minutes. If local debugging takes more than 30 minutes, an error is returned and the deployment exits. For more information about how to run or debug a Realtime Compute for Apache Flink deployment that includes the MaxCompute connector in an on-premises environment, see Run or debug a Flink deployment that includes a connector in an on-premises environment.

  • If you submit a deployment in the development console of Realtime Compute for Apache Flink, an error indicating that MaxCompute-related classes cannot be found may occur. In this case, download the JAR file whose name ends with uber.jar from the Maven central repository and add the file as an additional dependency of the deployment. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment. For example, if the version of the ververica-connector-odps dependency is 1.15-vvr-6.0.6, find the ververica-connector-odps-1.15-vvr-6.0.6-uber.jar package in the corresponding directory of the Maven repository and download the package to your on-premises directory.

We recommend that you declare a MaxCompute table by using SQL statements when you use the MaxCompute DataStream connector. You can call Table API operations to access MaxCompute tables or call DataStream API operations to access data streams.

Connect to the source table

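// Imports required by this snippet (place them at the top of your class file).
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);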
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")"));
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

Connect to the sink

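// Imports required by this snippet (place them at the top of your class file).
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);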
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")"));
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();
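
Both snippets above assemble the DDL statement from string literals, which makes it easy to hard-code an AccessKey pair in source code. The following Java code is a minimal sketch of one alternative that uses only the JDK: it builds the same WITH clause from a map of options and reads the credentials from environment variables. The class OdpsDdlBuilder and the environment variable names ODPS_ACCESS_ID and ODPS_ACCESS_KEY are assumptions made for this illustration and are not defined by the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative DDL builder for the odps connector (hypothetical helper, JDK only). */
final class OdpsDdlBuilder {

    /** Builds a CREATE TEMPORARY TABLE statement from a column list and WITH options. */
    static String buildDdl(String tableName, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n");
        for (Map.Entry<String, String> e : options.entrySet()) {
            // Double single quotes so that an option value cannot end the SQL string literal early.
            with.add("  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'");
        }
        return "CREATE TEMPORARY TABLE IF NOT EXISTS " + tableName + " (\n"
            + columns + "\n) WITH (\n" + with + "\n)";
    }

    /** Reads a value from the environment and falls back to a placeholder if it is not set. */
    static String envOrPlaceholder(String name, String placeholder) {
        String value = System.getenv(name);
        return (value == null || value.isEmpty()) ? placeholder : value;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "odps");
        options.put("endpoint", "<yourEndpointName>");
        options.put("project", "<yourProjectName>");
        options.put("tableName", "<yourTableName>");
        // Hypothetical variable names; use whatever your environment provides.
        options.put("accessId", envOrPlaceholder("ODPS_ACCESS_ID", "<yourAccessId>"));
        options.put("accessKey", envOrPlaceholder("ODPS_ACCESS_KEY", "<yourAccessPassword>"));
        options.put("partition", "ds=20180905");
        System.out.println(buildDdl("odps_sink", "  cid VARCHAR,\n  rt DOUBLE", options));
    }
}
```

The returned string can be passed to tEnv.executeSql in place of the String.join call shown above.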

XML

The Maven dependencies of the MaxCompute connector contain the classes required to create the full source, incremental source, sink, and dimension table. The MaxCompute DataStream connectors of different versions are stored in the Maven central repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

References