
Realtime Compute for Apache Flink: MaxCompute connector

Last Updated: Oct 31, 2024

This topic describes the syntax and parameters in the WITH clause of the MaxCompute connector and provides examples on how to use the MaxCompute connector.

Background information

MaxCompute is a fast, fully managed computing platform for large-scale data warehousing. MaxCompute can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses and provides analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?

The following table describes the capabilities supported by the MaxCompute connector.

Table type: Source table, dimension table, and result table.

Running mode: Streaming mode and batch mode.

Data format: N/A.

Metric:

  • Metrics for source tables

    numRecordsIn: the total number of data records that are read by using the MaxCompute connector.

    numRecordsInPerSecond: the number of data records that are read by using the MaxCompute connector per second.

    numBytesIn: the total number of bytes of data that is read by using the MaxCompute connector after data is decompressed.

    numBytesInPerSecond: the number of bytes of data that is read by using the MaxCompute connector per second after data is decompressed.

  • Metrics for result tables

    numRecordsOut: the total number of data records that are written by using the MaxCompute connector.

    numRecordsOutPerSecond: the number of data records that are written by using the MaxCompute connector per second.

    numBytesOut: the total number of bytes of data that is written by using the MaxCompute connector before data is compressed.

    numBytesOutPerSecond: the number of bytes of data that is written by using the MaxCompute connector per second before data is compressed.

  • Metrics for dimension tables

    dim.odps.cacheSize: the number of data records that are cached in a dimension table.

  Note: For more information about the metrics, see Metrics.

API type: DataStream API and SQL API.

Data update or deletion in a result table: If MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel is used, data can only be inserted into a result table. If MaxCompute Upsert Tunnel is used, data can be inserted into, updated in, or deleted from a result table.

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.

  • The MaxCompute connector supports only the at-least-once semantics.

    Note

    The at-least-once semantics prevents data loss. However, in specific cases, duplicate data may be written to MaxCompute. Whether duplicate data is generated depends on the tunnel that you use. For more information about MaxCompute Tunnel, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage topic.

  • By default, a full MaxCompute source table is used. The source table reads data only from the partition that is specified by the partition parameter. After the source table reads all data from the partition, it stops running, its status changes to FINISHED, and it does not monitor whether new partitions are generated.

    If you want the source table to continuously monitor new partitions, specify the startPartition parameter in the WITH clause to use an incremental source table.

    Note
    • Each time a dimension table is updated, the dimension table checks for the latest partition.

    • After the source table starts to run, it does not read data that is newly added to a partition. Therefore, we recommend that you run a deployment only after the partition data is complete.

Syntax

CREATE TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

Parameters in the WITH clause

  • Common parameters

    connector (String, required, no default): The type of the table. Set the value of this parameter to odps.

    endpoint (String, required, no default): The endpoint of MaxCompute. For more information, see Endpoints.

    tunnelEndpoint (String, optional, no default): The endpoint of MaxCompute Tunnel. For more information, see Endpoints. Note: If this parameter is not specified, MaxCompute allocates the tunnel connections based on the Server Load Balancer (SLB) service.

    project (String, required, no default): The name of the MaxCompute project.

    schemaName (String, optional, no default): The name of the MaxCompute schema. This parameter is required only when the MaxCompute schema feature is enabled. Set this parameter to the name of the schema of the MaxCompute table. For more information, see Schema-related operations. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    tableName (String, required, no default): The name of the MaxCompute table.

    accessId (String, required, no default): The AccessKey ID that is used to access MaxCompute. For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic. Important: To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage variables.

    accessKey (String, required, no default): The AccessKey secret that is used to access MaxCompute. For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic. Important: To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage variables.

    partition (String, optional, no default): The name of the partition in a MaxCompute partitioned table. You do not need to specify this parameter for a non-partitioned MaxCompute table or an incremental MaxCompute source table. Note: For more information about how to specify this parameter for a partitioned MaxCompute table, see the "How do I configure the partition parameter when data is read from or written to partitions?" section of the FAQ about upstream and downstream storage topic.

    compressAlgorithm (String, optional; default: ZLIB in VVR 4.0.13 and later, SNAPPY in VVR 6.0.1 and later): The compression algorithm that is used by MaxCompute Tunnel. Valid values: RAW (no compression), ZLIB, and SNAPPY. Compared with ZLIB, SNAPPY can significantly improve the throughput; in test scenarios, the throughput increased by about 50%. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.

    quotaName (String, optional, no default): The name of the quota for the exclusive Tunnel resource groups of MaxCompute. You can specify this parameter to use the exclusive Tunnel resource groups of MaxCompute (see the sketch after this list). Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports this parameter. If you specify this parameter, you must delete the tunnelEndpoint parameter; otherwise, the tunnel that is specified by the tunnelEndpoint parameter is used.
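
    The following sketch combines the common parameters in one WITH clause. It is illustrative only: the endpoint, project, table, and quota names are placeholders, and enabling SNAPPY compression together with a Tunnel quota is an assumption, not a recommendation from this topic.

      CREATE TEMPORARY TABLE odps_table (
        id INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        -- tunnelEndpoint is omitted because quotaName is specified.
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20240101',
        'compressAlgorithm' = 'SNAPPY',        -- requires VVR 4.0.13 or later
        'quotaName' = '<yourTunnelQuotaName>'  -- requires VVR 8.0.3 or later
      );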

  • Parameters only for source tables

    maxPartitionCount (Integer, optional; default: 100): The maximum number of partitions from which data can be read. If the number of partitions from which data is read exceeds the value of this parameter, the "The number of matched partitions exceeds the default limit" error message appears. Important: If data is read from a large number of partitions of a partitioned MaxCompute table, the workload on the MaxCompute service is high and the startup of the deployment slows down. Check whether data needs to be read from a large number of partitions and specify the partition parameter based on your business requirements. If your business requires data from a large number of partitions, manually increase the value of the maxPartitionCount parameter.

    useArrow (Boolean, optional; default: false): Specifies whether to use the Arrow format to read data (see the sketch after this list). The Arrow format can be used to call the storage API operation of MaxCompute. For more information, see the "User interfaces and openness" section of the What is MaxCompute? topic. Important: This parameter takes effect only in batch deployments. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    splitSize (MemorySize, optional; default: 256 MB): The size of data that is pulled at a time when the Arrow format is used to read data. Important: This parameter takes effect only in batch deployments. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    compressCodec (String, optional; default: ""): The compression algorithm that is used when the Arrow format is used to read data. Valid values: "" (no compression), ZSTD, and LZ4_FRAME. Compared with no compression, specifying a compression algorithm can improve the throughput. Important: This parameter takes effect only in batch deployments. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    dynamicLoadBalance (Boolean, optional; default: false): Specifies whether to enable dynamic allocation of shards. Valid values: true and false. Dynamic allocation of shards can improve the processing performance of different operators of Realtime Compute for Apache Flink and reduce the overall time required for reading the source table. However, it may cause data skew because the total amount of data read by different operators can differ. Important: This parameter takes effect only in batch deployments. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.
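
    As referenced in the useArrow entry, the following sketch shows a batch source table that reads data in the Arrow format. The endpoint, project, and table names are placeholders, and the tuning values are illustrative assumptions rather than recommended settings.

      CREATE TEMPORARY TABLE odps_batch_source (
        id INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20240101',
        'useArrow' = 'true',           -- batch deployments on VVR 8.0.8 or later only
        'splitSize' = '512 MB',        -- pull more data per request
        'compressCodec' = 'ZSTD',      -- trade CPU for network throughput
        'dynamicLoadBalance' = 'true'  -- allocate shards dynamically
      );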

  • Parameters only for incremental MaxCompute source tables

    An incremental MaxCompute source table monitors new partitions by periodically polling the MaxCompute server for the full partition list. Before the source table reads data from a new partition, data writing to that partition must be complete. For more information, see the "What do I do if an incremental MaxCompute source table detects a new partition and some data is not written to the partition?" section of the FAQ about upstream and downstream storage topic.

    You can use the startPartition parameter to specify the partition from which incremental reading starts. Only partitions whose names are greater than or equal to the value of the startPartition parameter in alphabetical order are read. For example, the partition year=2023,month=10 is less than the partition year=2023,month=9 in alphabetical order. To keep the alphabetical order consistent with the chronological order, pad the month with a leading zero in the partition names that you declare, such as year=2023,month=09 instead of year=2023,month=9, as shown in the sketch below.
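
    The following sketch shows an incremental source table whose start partition uses zero-padded values. The endpoint, project, and table names are placeholders, and the two-level year/month partition layout is a hypothetical example.

      CREATE TEMPORARY TABLE odps_incr_source (
        id INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        -- 'year=2023,month=9' would sort after 'year=2023,month=10' alphabetically;
        -- the leading zero keeps the alphabetical order chronological.
        'startPartition' = 'year=2023,month=09'
      );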

    startPartition (String, required, no default): The start partition from which incremental data is read. If you specify this parameter, an incremental source table is used and the partition parameter is ignored. If the source table is a multi-level partitioned table, you must configure the value of each partition column in descending order based on partition levels. Note: For more information about how to specify the startPartition parameter, see the "How do I configure the startPartition parameter for an incremental MaxCompute source table?" section of the FAQ about upstream and downstream storage topic.

    subscribeIntervalInSec (Integer, optional; default: 30): The interval at which MaxCompute is polled to obtain the information about partitions. Unit: seconds.

    modifiedTableOperation (Enum (NONE, SKIP), optional; default: NONE): The operation that is performed when data in a partition is modified while the partition is being read. Download sessions are saved in checkpoints. Each time the deployment resumes from a checkpoint, Realtime Compute for Apache Flink attempts to resume the reading progress from the saved session; if data in the partition has been modified, the session becomes unavailable and the deployment restarts repeatedly. In this case, specify this parameter (see the sketch after this list). Valid values: NONE: you must change the value of the startPartition parameter so that its alphabetical order is greater than that of the unavailable partition, and then start the deployment without states. SKIP: if you do not want to start the deployment without states, set this parameter to SKIP so that Realtime Compute for Apache Flink skips the unavailable partition when it attempts to resume the session from the checkpoint. Important: With either value, the data that was already read from the modified partition is retained, and the data that was not read is ignored.
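
    As referenced in the modifiedTableOperation entry, the following sketch shows an incremental source table that polls for new partitions every 60 seconds and skips partitions whose download sessions have become unavailable. The endpoint, project, and table names are placeholders, and the parameter values are illustrative assumptions.

      CREATE TEMPORARY TABLE odps_incr_source (
        id INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'startPartition' = 'ds=20240101',
        'subscribeIntervalInSec' = '60',   -- poll the MaxCompute server every 60 seconds
        'modifiedTableOperation' = 'SKIP'  -- skip partitions whose sessions are unavailable
      );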

  • Parameters only for result tables

    useStreamTunnel (Boolean, optional; default: false): Specifies whether to use MaxCompute Streaming Tunnel to upload data. Valid values: true (MaxCompute Streaming Tunnel is used) and false (MaxCompute Batch Tunnel is used). Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. For more information about how to select a tunnel, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage topic.

    flushIntervalMs (Long, optional; default: 30000, which is 30 seconds): The interval at which the buffer of a writer in MaxCompute Tunnel is flushed. The MaxCompute sink first inserts data into its buffer and then writes the buffered data to the destination MaxCompute table at the interval specified by this parameter, or earlier when the buffer size exceeds the specified threshold. If you use Streaming Tunnel, flushed data is immediately written to the destination MaxCompute table. If you use Batch Tunnel, flushed data is written to the destination MaxCompute table only after the checkpointing operation is complete; in this case, we recommend that you set this parameter to 0 to disable scheduled flushing (see the first sketch after this list). Unit: milliseconds. Note: This parameter can be used together with the batchSize parameter; a flush is triggered when either condition is met.

    batchSize (Long, optional; default: 67108864, which is 64 MB): The buffer size at which a flush operation is performed in the buffer of a writer in MaxCompute Tunnel. The MaxCompute sink inserts data into the buffer and then writes the buffered data to the destination MaxCompute table when the size of the buffered data exceeds this value. Unit: bytes. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter. This parameter can be used together with the flushIntervalMs parameter; a flush is triggered when either condition is met.

    numFlushThreads (Integer, optional; default: 1): The number of threads that are used to flush the buffer of a writer in MaxCompute Tunnel. Each MaxCompute sink creates the number of flush threads specified by this parameter. If the value is greater than 1, data in different partitions can be flushed in parallel, which improves the efficiency of the flush operation. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter.

    dynamicPartitionLimit (Integer, optional; default: 100): The maximum number of dynamic partitions to which data can be written. If the number of dynamic partitions to which the result table writes between two checkpointing operations exceeds this value, the "Too many dynamic partitions" error message appears. Important: If data is written to a large number of partitions of a partitioned MaxCompute table, the workload on the MaxCompute service is high and the checkpointing and flush operations slow down. Check whether data needs to be written to a large number of partitions. If your business requires it, manually increase the value of the dynamicPartitionLimit parameter.

    retryTimes (Integer, optional; default: 3): The maximum number of retries that can be performed for a request to the MaxCompute server. The MaxCompute service may be unavailable for a short period of time when a session is created or committed or when data is flushed to the result table. If this happens, the request is retried based on the value of this parameter.

    sleepMillis (Integer, optional; default: 1000): The retry interval. Unit: milliseconds.

    enableUpsert (Boolean, optional; default: false): Specifies whether to use MaxCompute Upsert Tunnel to upload data (see the second sketch after this list). Valid values: true (MaxCompute Upsert Tunnel is used to process INSERT, UPDATE_AFTER, and DELETE data in Realtime Compute for Apache Flink) and false (MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel, as specified by the useStreamTunnel parameter, is used to process INSERT and UPDATE_AFTER data). Important: If an issue such as an error, a deployment failure, or a long-running commit occurs when a MaxCompute sink commits a session in upsert mode, we recommend that you set the Parallelism parameter of the sink operators to a value that is less than or equal to 10. Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    upsertAsyncCommit (Boolean, optional; default: false): Specifies whether the MaxCompute sink commits sessions in asynchronous mode in upsert mode. Valid values: true (asynchronous mode: committing a session takes less time, but the data that is written cannot be read immediately after the commit) and false (synchronous mode, the default: the sink waits until the server processes the session). Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    upsertCommitTimeoutMs (Integer, optional; default: 120000, which is 120 seconds): The timeout period for which a MaxCompute sink commits a session in upsert mode. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.
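
    The following sketches show how these parameters combine. They are illustrative only: the endpoint, project, and table names are placeholders, and the parameter values are assumptions. The first sketch tunes a Batch Tunnel sink so that data is written at checkpoints and when the buffer fills up.

      CREATE TEMPORARY TABLE odps_batch_sink (
        id INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20240101',
        'useStreamTunnel' = 'false',  -- Batch Tunnel: data becomes visible after checkpoints
        'flushIntervalMs' = '0',      -- disable scheduled flushing, as recommended for Batch Tunnel
        'batchSize' = '67108864',     -- still flush when the buffer reaches 64 MB
        'numFlushThreads' = '2'       -- flush different partitions in parallel
      );

    The second sketch writes changelog data through Upsert Tunnel. The primary key declaration is an assumption based on how keyed upsert sinks are usually declared in Flink SQL.

      CREATE TEMPORARY TABLE odps_upsert_sink (
        id INT,
        content VARCHAR,
        PRIMARY KEY (id) NOT ENFORCED   -- assumed: upsert writes are keyed by id
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'enableUpsert' = 'true',            -- requires VVR 8.0.6 or later
        'upsertAsyncCommit' = 'false',      -- wait for the server to process each commit
        'upsertCommitTimeoutMs' = '120000'  -- commit timeout in milliseconds
      );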

  • Parameters only for dimension tables

    When a deployment starts, a MaxCompute dimension table pulls full data from the partition that is specified by the partition parameter. You can set the partition parameter to max_pt(); each time the cache is reloaded after cache entries expire, the latest partition at that time is resolved and its data is loaded, as shown in the sketch below. If you set the partition parameter to max_two_pt(), the dimension table pulls data from the two latest partitions. Otherwise, data of only one partition is pulled.
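
    The following sketch shows a dimension table that always serves the latest partition. The endpoint, project, and table names are placeholders, and the one-hour cache TTL is an illustrative assumption.

      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'max_pt()',  -- resolve the latest partition on each cache reload
        'cache' = 'ALL',
        'cacheTTLMs' = '3600000'   -- reload the cache every hour
      );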

    cache (String, required, no default): The cache policy. You must set the cache parameter to ALL for a MaxCompute dimension table and explicitly declare the setting in the DDL statement. ALL indicates that all data in the dimension table is cached: before the system runs a deployment, it loads all data in the dimension table into the cache, and all subsequent lookups are served from the cache; if a key does not exist in the cache, the data record is not found. The cache is fully reloaded after cache entries expire. If the amount of data in the remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. The source table and dimension table cannot be associated based on the ON clause.

    Note
    • If the cache parameter is set to ALL, you must increase the memory of the join node because the system asynchronously loads data of the dimension table. We recommend that the memory size be at least four times the amount of data in the remote table; the exact size is related to the MaxCompute storage compression algorithm.

    • If a dimension table contains a large amount of data, you can use the SHUFFLE_HASH hint to evenly distribute the data of the dimension table to each subtask. For more information, see the "How do I use the SHUFFLE_HASH hint for a dimension table?" section of the FAQ about upstream and downstream storage topic.

    • If you use an ultra-large MaxCompute dimension table, frequent Java virtual machine (JVM) garbage collections may cause deployment exceptions. To resolve this issue, increase the memory of the node on which the dimension table is joined with another table. If the issue persists, we recommend that you convert the dimension table to a key-value dimension table that supports the least recently used (LRU) cache policy, such as an ApsaraDB for HBase dimension table.

    cacheSize (Long, optional; default: 100000): The maximum number of rows of data that can be cached. If the number of data records in the dimension table exceeds this value, the "Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit" error message appears. Important: If a dimension table contains a large number of data records, a large amount of JVM heap memory is consumed, and the startup of the deployment and the updates of the dimension table slow down. Check whether that many data records need to be cached. If your business requires it, manually increase the value of this parameter.

    cacheTTLMs (Long, optional; default: Long.MAX_VALUE): The cache timeout period. Unit: milliseconds.

    cacheReloadTimeBlackList (String, optional, no default): The time periods during which the cache is not refreshed. This parameter is applicable to large-scale online promotional events such as peak hours of activities; specify it to prevent deployments from becoming unstable when the cache is refreshed. For more information about how to specify this parameter, see the "How do I configure the CacheReloadTimeBlackList parameter?" section of the FAQ about upstream and downstream storage topic.

    maxLoadRetries (Integer, optional; default: 10): The maximum number of retries that can be performed to refresh the cache, including the first data pull when the deployment starts. If the number of retries exceeds this value, the deployment fails to run.

Data type mappings

For more information about the data types that are supported by MaxCompute, see MaxCompute V2.0 data type edition.

Data type of MaxCompute        Data type of Realtime Compute for Apache Flink
BOOLEAN                        BOOLEAN
TINYINT                        TINYINT
SMALLINT                       SMALLINT
INT                            INTEGER
BIGINT                         BIGINT
FLOAT                          FLOAT
DOUBLE                         DOUBLE
DECIMAL(precision, scale)      DECIMAL(precision, scale)
CHAR(n)                        CHAR(n)
VARCHAR(n)                     VARCHAR(n)
STRING                         STRING
BINARY                         BYTES
DATE                           DATE
DATETIME                       TIMESTAMP(3)
TIMESTAMP                      TIMESTAMP(9)
ARRAY                          ARRAY
MAP                            MAP
STRUCT                         ROW
JSON                           STRING

Important

If a MaxCompute physical table contains fields of a nested composite data type of Realtime Compute for Apache Flink, such as ARRAY, MAP, or STRUCT, or fields of the JSON type, you must specify tblproperties('columnar.nested.type'='true') when you create the MaxCompute physical table so that Realtime Compute for Apache Flink can read data from and write data to the table, as shown in the sketch below.
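
The following is a sketch of the corresponding MaxCompute DDL statement; the table and column names are hypothetical.

CREATE TABLE nested_example (
  id    BIGINT,
  tags  ARRAY<STRING>,       -- nested composite type
  attrs MAP<STRING, STRING>  -- nested composite type
)
PARTITIONED BY (ds STRING)
tblproperties('columnar.nested.type'='true');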

Examples

SQL API

  • Source table examples

    • The following sample code shows how to read data from a full MaxCompute source table:

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=201809*'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT
         cid,
         COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
    • The following sample code shows how to read data from an incremental MaxCompute source table:

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Incremental data is read from the partition 20180905.
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT cid, COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
  • Result table examples

    • The following sample code shows how to write data to static partitions:

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905' -- Data is written to the static partition 20180905. 
      );
      
      INSERT INTO odps_sink
      SELECT
        id, len, content
      FROM datagen_source;
    • The following sample code shows how to write data to dynamic partitions:

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id  INT,
        len INT,
        content VARCHAR,
        ds VARCHAR -- The partition key column that you use to create dynamic partitions must be explicitly specified. 
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds' -- The partition value is not provided. This indicates that data is written to a partition specified by the ds field. 
      );
      
      INSERT INTO odps_sink
      SELECT
         id,
         len,
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM datagen_source;
  • Dimension table examples

    • The following sample code shows how to join a dimension table and another table:

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED  -- You must declare a primary key when you join a dimension table and another table. 
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k INT,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT s.k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
    • The following sample code shows how to join a dimension table and multiple tables:

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR
        -- You do not need to declare a primary key when you join a dimension table and multiple tables. 
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k INT,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT s.k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream API

Important
  • If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors.

  • To protect intellectual property, you can perform local debugging on a deployment that uses the MaxCompute DataStream connector for a maximum of 30 minutes in Realtime Compute for Apache Flink that uses VVR 6.0.6 or later. If local debugging takes more than 30 minutes, an error is returned and the deployment exits. For more information about how to run or debug a Realtime Compute for Apache Flink deployment that includes the MaxCompute connector in an on-premises environment, see Run or debug a Flink deployment that includes a connector in an on-premises environment.

  • If you submit a deployment in the development console of Realtime Compute for Apache Flink, an issue that MaxCompute-related classes cannot be found may occur. In this case, you must download the file whose file name extension is uber.jar from the Maven central repository and add the file as an additional dependency of the deployment. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment. For example, if the version of the ververica-connector-odps dependency is 1.15-vvr-6.0.6, you can view the ververica-connector-odps-1.15-vvr-6.0.6-uber.jar package in the directory of the Maven repository and download the package to your on-premises directory.

We recommend that you declare a MaxCompute table by using SQL statements when you use the MaxCompute DataStream connector. You can call Table API operations to access MaxCompute tables or call DataStream API operations to access data streams.

The following sample code shows how to access a source table by using the DataStream connector:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

The following sample code shows how to access a result table by using the DataStream connector:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

XML

The Maven dependencies of the MaxCompute connector contain the classes required to create full MaxCompute source tables, incremental MaxCompute source tables, MaxCompute result tables, and MaxCompute dimension tables. The MaxCompute DataStream connectors of different versions are stored in the Maven central repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

FAQ