This topic describes the syntax and parameters in the WITH clause of the MaxCompute connector and provides examples on how to use the MaxCompute connector.
Background information
MaxCompute is a fast and fully managed computing platform for large-scale data warehousing that can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses, together with analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?
The following table describes the capabilities supported by the MaxCompute connector.
Item | Description |
Table type | Source table, dimension table, and result table. |
Running mode | Streaming mode and batch mode. |
Data format | N/A. |
Metric | Note: For more information about the metrics, see Metrics. |
API type | DataStream API and SQL API. |
Data update or deletion in a result table | If MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel is used, data can only be inserted into a result table. If MaxCompute Upsert Tunnel is used, data in a result table can be updated or deleted and data can be inserted into a result table. |
Prerequisites
A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.
The MaxCompute connector supports only the at-least-once semantics.
Note: At-least-once semantics prevents data loss, but in specific cases duplicate data may be written to MaxCompute. Whether duplicate data is generated depends on the tunnel that you use. For more information about MaxCompute Tunnel, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage topic.
By default, a full MaxCompute source table is used. The source table reads data only from the partition that is specified by the partition parameter. After the source table reads all data from the partition, it stops running and its status changes to FINISHED. It does not monitor whether new partitions are generated.
If you want the source table to continuously monitor whether new partitions are generated, specify the startPartition parameter in the WITH clause to use an incremental source table.
Note: Each time a dimension table is updated, the dimension table checks for the latest partition.
After the source table starts to run, the source table does not read the data that is newly added to a partition. We recommend that you run a deployment when the partition data is complete.
Syntax
CREATE TABLE odps_source (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | String | Yes | None | Set the value to odps. |
endpoint | The endpoint of MaxCompute. | String | Yes | None | For more information, see Endpoints. |
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | String | No | None | For more information, see Endpoints. Note: If this parameter is not specified, MaxCompute allocates the tunnel connections based on the Server Load Balancer (SLB) service. |
project | The name of the MaxCompute project. | String | Yes | None | N/A |
schemaName | The name of the MaxCompute schema. | String | No | None | This parameter is required only when the MaxCompute schema feature is enabled. Set this parameter to the name of the schema of the MaxCompute table. For more information, see Schema-related operations. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
tableName | The name of the MaxCompute table. | String | Yes | None | N/A |
accessId | The AccessKey ID that is used to access MaxCompute. | String | Yes | None | For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic. Important: To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage variables. |
accessKey | The AccessKey secret that is used to access MaxCompute. | String | Yes | None | For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic. Important: To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage variables. |
partition | The name of the partition in a MaxCompute partitioned table. | String | No | None | You do not need to specify this parameter for a non-partitioned MaxCompute table or an incremental MaxCompute source table. Note: For more information about how to specify the partition parameter for a partitioned MaxCompute table, see the "How do I configure the partition parameter when data is read from or written to partitions?" section of the FAQ about upstream and downstream storage topic. |
compressAlgorithm | The compression algorithm that is used by MaxCompute Tunnel. | String | No | VVR 4.0.13 and later: ZLIB. VVR 6.0.1 and later: SNAPPY. | Valid values: RAW (no compression), ZLIB, and SNAPPY. Compared with ZLIB, SNAPPY can significantly improve the throughput. In test scenarios, the throughput is increased by about 50%. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. |
quotaName | The name of the quota for the exclusive Tunnel resource groups of MaxCompute. | String | No | None | You can specify this parameter to use the exclusive Tunnel resource groups of MaxCompute. Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports this parameter. If you specify this parameter, you must delete the tunnelEndpoint parameter. Otherwise, the tunnel that is specified by the tunnelEndpoint parameter is used. |
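As a rough illustration of how the common parameters combine, the following sketch declares a table in a schema-enabled project and uses an exclusive Tunnel resource group. The table name odps_quota_source, the column list, and the <yourQuotaName> placeholder are assumptions made for illustration. The tunnelEndpoint parameter is deliberately left out because quotaName is specified.

```sql
-- Sketch only: table name, columns, and placeholder values are hypothetical.
CREATE TEMPORARY TABLE odps_quota_source (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',      -- supported in VVR 8.0.6 or later
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'compressAlgorithm' = 'SNAPPY',         -- one of RAW, ZLIB, SNAPPY
  'quotaName' = '<yourQuotaName>'         -- supported in VVR 8.0.3 or later; do not also set tunnelEndpoint
);
```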
Parameters only for source tables
Parameter | Description | Data type | Required | Default value | Remarks |
maxPartitionCount | The maximum number of partitions from which data can be read. | Integer | No | 100 | If the number of partitions from which data is read exceeds the value of this parameter, the "The number of matched partitions exceeds the default limit" error message appears. Important: If data is read from a large number of partitions of a partitioned MaxCompute table, the workload on the MaxCompute service is high and the deployment starts more slowly. To prevent this issue, check whether data really needs to be read from a large number of partitions and specify the partition parameter based on your business requirements. If your business requires data from a large number of partitions, manually increase the value of the maxPartitionCount parameter. |
useArrow | Specifies whether to use the Arrow format to read data. | Boolean | No | false | The Arrow format can be used to call the storage API operation of MaxCompute. For more information, see the "User interfaces and openness" section of the What is MaxCompute? topic. Important: This parameter takes effect only in batch deployments. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. |
splitSize | The size of data that can be pulled at a time when the Arrow format is used to read data. | MemorySize | No | 256 MB | Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. Important: This parameter takes effect only in batch deployments. |
compressCodec | The compression algorithm that is used when the Arrow format is used to read data. | String | No | "" | Valid values: "" (no compression), ZSTD, and LZ4_FRAME. Compared with no compression, the throughput can be improved if you specify a compression algorithm. Important: This parameter takes effect only in batch deployments. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. |
dynamicLoadBalance | Specifies whether to enable dynamic allocation of shards. | Boolean | No | false | Valid values: true and false. Dynamic allocation of shards can improve the processing performance of different operators of Realtime Compute for Apache Flink and reduce the overall time required for reading the source table. However, dynamic allocation of shards may cause data skew because the total amount of data read by different operators is inconsistent. Important: This parameter takes effect only in batch deployments. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. |
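The source-only parameters above can be combined in a batch deployment roughly as follows. This is a minimal sketch that assumes VVR 8.0.8 or later. The table name odps_arrow_source and the chosen values (a limit of 200 partitions, ZSTD compression) are illustrative assumptions, not recommended settings.

```sql
-- Sketch only: intended for a batch deployment on VVR 8.0.8 or later.
CREATE TEMPORARY TABLE odps_arrow_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*',        -- may match many partitions
  'maxPartitionCount' = '200',       -- raised from the default of 100 (illustrative value)
  'useArrow' = 'true',               -- read data in the Arrow format
  'compressCodec' = 'ZSTD',          -- one of "" (no compression), ZSTD, LZ4_FRAME
  'dynamicLoadBalance' = 'true'      -- may shorten the read time but can cause data skew
);
```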
Parameters only for incremental MaxCompute source tables
The incremental source table monitors new partitions by intermittently polling the MaxCompute server to obtain all partition information. Before the source table reads data from new partitions, data writing in the partitions must be complete. For more information, see the "What do I do if an incremental MaxCompute source table detects a new partition and some data is not written to the partition?" section of the FAQ about upstream and downstream storage topic. You can specify the startPartition parameter to specify the start partition from which data is read. Only data in the partitions whose alphabetical order is greater than or equal to the alphabetical order of the partition that is specified by the startPartition parameter is read. For example, the alphabetical order of the partition year=2023,month=10 is less than the alphabetical order of the partition year=2023,month=9. To keep the alphabetical order valid, add a leading zero to the month in the partition name that is declared in the code, which changes year=2023,month=9 to year=2023,month=09.
Parameter | Description | Data type | Required | Default value | Remarks |
startPartition | The start partition from which incremental data is read. | String | Yes | None | If you specify this parameter, an incremental source table is used and the partition parameter is ignored. If the source table is a multi-level partitioned table, you must configure the value of each partition column in descending order based on partition levels. Note: For more information about how to specify the startPartition parameter, see the "How do I configure the startPartition parameter for an incremental MaxCompute source table?" section of the FAQ about upstream and downstream storage topic. |
subscribeIntervalInSec | The interval at which MaxCompute is polled to obtain the information about partitions. | Integer | No | 30 | Unit: seconds. |
modifiedTableOperation | The operation that is performed when data in a partition is modified during partition reading. | Enum (NONE, SKIP) | No | NONE | Download sessions are saved in checkpoints. Each time a deployment resumes from a checkpoint, Realtime Compute for Apache Flink attempts to resume the reading progress from the session. If data in the partition is modified, the session becomes unavailable and the deployment is repeatedly restarted. To resolve this issue, you can specify this parameter. Valid values: NONE: You must change the value of the startPartition parameter so that the alphabetical order of the specified partition is greater than the alphabetical order of the unavailable partition, and then start the deployment without states. SKIP: If you do not want to start the deployment without states, set this parameter to SKIP. In this case, Realtime Compute for Apache Flink skips the unavailable partition when it attempts to resume the session from the checkpoint. Important: If you set this parameter to NONE or SKIP, the data that is already read from the partition in which data is modified is retained, and the data that is not read is ignored. |
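The ordering rule for startPartition can be checked with a plain string comparison, which is used here as a stand-in for the partition ordering described above (an assumption: the connector's internal comparison is not shown in this topic). The sketch then declares an incremental source table with a zero-padded start partition. The table name, the 60-second polling interval, and the choice of SKIP are illustrative.

```sql
-- Both expressions are expected to evaluate to TRUE under ordinary string comparison:
-- '1' sorts before '9', so the unpadded month=10 sorts before month=9,
-- whereas the zero-padded month=09 sorts before month=10 as intended.
SELECT
  'year=2023,month=10' < 'year=2023,month=9'  AS unpadded_10_before_9,
  'year=2023,month=09' < 'year=2023,month=10' AS padded_09_before_10;

-- Sketch only: table name and parameter values are hypothetical.
CREATE TEMPORARY TABLE odps_incremental_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'year=2023,month=09',  -- zero-padded so that later months sort after it
  'subscribeIntervalInSec' = '60',          -- poll for new partitions every 60 seconds
  'modifiedTableOperation' = 'SKIP'         -- skip a partition whose download session became unavailable
);
```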
Parameters only for result tables
Parameter | Description | Data type | Required | Default value | Remarks |
useStreamTunnel | Specifies whether to use MaxCompute Streaming Tunnel to upload data. | Boolean | No | false | Valid values: true: MaxCompute Streaming Tunnel is used to upload data. false: MaxCompute Batch Tunnel is used to upload data. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. For more information about how to select a tunnel, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage topic. |
flushIntervalMs | The interval at which the flush operation is performed in the buffer of a writer in MaxCompute Tunnel. | Long | No | 30000 (30 seconds) | The MaxCompute sink inserts data into the buffer and then writes the data in the buffer to the destination MaxCompute table at the interval that is specified by the flushIntervalMs parameter. The sink also writes the data to the destination table when the size of the buffer data exceeds the specified threshold. If you use Streaming Tunnel, the data that is flushed is immediately written to the destination MaxCompute table. If you use Batch Tunnel, the data that is flushed is written to the destination MaxCompute table only after the checkpointing operation is complete. In that case, we recommend that you set this parameter to 0 to disable the scheduled flushing feature. Unit: milliseconds. Note: This parameter can be used together with the batchSize parameter. The flush operation is triggered when the condition that is specified by the batchSize parameter or the flushIntervalMs parameter is met. |
batchSize | The size of the buffer data that triggers a flush operation in the buffer of a writer in MaxCompute Tunnel. | Long | No | 67108864 (64 MB) | The MaxCompute sink inserts data into the buffer and then writes the data in the buffer to the destination MaxCompute table when the size of the buffer data exceeds the value that is specified by the batchSize parameter. Unit: bytes. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter. This parameter can be used together with the flushIntervalMs parameter. The flush operation is triggered when the condition that is specified by the batchSize parameter or the flushIntervalMs parameter is met. |
numFlushThreads | The number of threads that are used to flush data in the buffer of a writer in MaxCompute Tunnel. | Integer | No | 1 | Each MaxCompute sink creates the number of threads that is specified by the numFlushThreads parameter to flush data. If the value of this parameter is greater than 1, the data in different partitions can be flushed at the same time, which improves the efficiency of the flush operation. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter. |
dynamicPartitionLimit | The maximum number of dynamic partitions to which data can be written. | Integer | No | 100 | If the number of dynamic partitions to which data is written in the result table between two checkpointing operations exceeds the value of the dynamicPartitionLimit parameter, the "Too many dynamic partitions" error message appears. Important: If data is written to a large number of partitions of a partitioned MaxCompute table, the workload on the MaxCompute service is high. In this case, checkpointing and data flushing in the result table slow down. To prevent this issue, check whether data really needs to be written to a large number of partitions. If your business requires data to be written to a large number of partitions, manually increase the value of the dynamicPartitionLimit parameter. |
retryTimes | The maximum number of retries that can be performed for a request on the MaxCompute server. | Integer | No | 3 | The MaxCompute service may be unavailable for a short period of time when a session is created, a session is submitted, or data is flushed in the result table. If the MaxCompute service becomes unavailable, the request to the MaxCompute server is retried based on the configuration of this parameter. |
sleepMillis | The retry interval. | Integer | No | 1000 | Unit: milliseconds. |
enableUpsert | Specifies whether to use MaxCompute Upsert Tunnel to upload data. | Boolean | No | false | Valid values: true: MaxCompute Upsert Tunnel is used to process INSERT, UPDATE_AFTER, and DELETE data in Realtime Compute for Apache Flink. false: MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel that is specified by the useStreamTunnel parameter is used to process INSERT and UPDATE_AFTER data in Realtime Compute for Apache Flink. Important: If an issue such as an error, a deployment failure, or a long-time processing fault occurs when a MaxCompute sink commits a session in upsert mode, we recommend that you set the Parallelism parameter of sink operators to a value that is less than or equal to 10. Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
upsertAsyncCommit | Specifies whether to use the asynchronous mode when a MaxCompute sink commits a session in upsert mode. | Boolean | No | false | Valid values: true: The asynchronous mode is used. The time that is consumed to commit the session is reduced, but the data that is written cannot be read immediately after the session is committed. false: The synchronous mode is used. This is the default value. When the MaxCompute sink commits the session, the system waits until the server processes the session. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
upsertCommitTimeoutMs | The timeout period for a MaxCompute sink to commit a session in upsert mode. | Integer | No | 120000 (120 seconds) | Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
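The following sketch shows two ways the result-table parameters might be combined: one sink that uses Streaming Tunnel with tuned flushing, and one that uses Upsert Tunnel. The table names, columns, and numeric values are illustrative. Declaring a primary key on the upsert sink is an assumption made here, because this topic does not state the DDL requirements for upsert mode.

```sql
-- Sketch 1: Streaming Tunnel sink. A flush is triggered by whichever of
-- batchSize or flushIntervalMs is reached first.
CREATE TEMPORARY TABLE odps_stream_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'useStreamTunnel' = 'true',      -- VVR 4.0.13 or later
  'flushIntervalMs' = '10000',     -- flush at least every 10 seconds (illustrative)
  'batchSize' = '33554432',        -- or when the buffer exceeds 32 MB (illustrative)
  'numFlushThreads' = '2'          -- lets different partitions flush concurrently
);

-- Sketch 2: Upsert Tunnel sink on VVR 8.0.6 or later.
CREATE TEMPORARY TABLE odps_upsert_sink (
  id INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED    -- assumption: a key is declared so that updates and deletes can be matched
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'enableUpsert' = 'true',
  'upsertAsyncCommit' = 'false',          -- wait for the server to process each commit
  'upsertCommitTimeoutMs' = '120000'      -- default value, shown explicitly
);
```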
Parameters only for dimension tables
When a deployment starts, a MaxCompute dimension table pulls full data from the partition that is specified by the partition parameter. You can set the partition parameter to max_pt(). When the cache is reloaded after cache entries expire, the partition parameter is re-parsed and data of the latest partition is pulled. If the partition parameter is set to max_two_pt(), the dimension table can pull data from two partitions. Otherwise, data of only one partition can be pulled.
Parameter | Description | Data type | Required | Default value | Remarks |
cache | The cache policy. | String | Yes | None | You must set the cache parameter to ALL for a MaxCompute dimension table and explicitly declare the setting in the DDL statement. We recommend ALL if the amount of data in the remote table is small and a large number of missing keys exist, that is, keys for which the source table and the dimension table cannot be associated based on the ON clause. ALL: All data in the dimension table is cached. Before the system runs a deployment, the system loads all data in the dimension table to the cache, and all subsequent lookups on the dimension table are served from the cache. If a key does not exist in the cache, the data record cannot be found. The system reloads all data in the cache after cache entries expire. Note: If the cache parameter is set to ALL, you must increase the memory of the join node because the system asynchronously loads data of the dimension table. We recommend that you increase the memory to at least four times the amount of data in the remote table. The required memory is related to the MaxCompute storage compression algorithm. If a dimension table contains a large amount of data, you can use the SHUFFLE_HASH hint to evenly distribute the data of the dimension table to each subtask. For more information, see the "How do I use the SHUFFLE_HASH hint for a dimension table?" section of the FAQ about upstream and downstream storage topic. If you use an ultra-large MaxCompute dimension table, frequent garbage collections (GCs) of the Java virtual machine (JVM) may cause deployment exceptions. To resolve this issue, you can increase the memory of the node on which the dimension table is joined with another table. If the issue persists, we recommend that you convert the dimension table to a key-value dimension table that supports the least recently used (LRU) cache policy. For example, you can use an ApsaraDB for HBase dimension table as the key-value dimension table. |
cacheSize | The maximum number of rows of data that can be cached. | Long | No | 100000 | If the number of data records in the dimension table exceeds the value of the cacheSize parameter, the "Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit" error message appears. Important: If a large number of data records exist in a dimension table, a large amount of JVM heap memory is consumed. In this case, deployments start more slowly and updates of the dimension table slow down. To prevent this issue, check whether a large number of data records really need to be cached. If your business requires a large number of data records to be cached in a dimension table, manually increase the value of this parameter. |
cacheTTLMs | The cache timeout period. | Long | No | Long.MAX_VALUE | Unit: milliseconds. |
cacheReloadTimeBlackList | The periods of time during which the cache is not refreshed. | String | No | None | This parameter is applicable to scenarios such as the peak hours of large-scale online promotional events. You can specify this parameter to prevent deployments from becoming unstable when the cache is refreshed. For more information about how to specify the parameter, see the "How do I configure the CacheReloadTimeBlackList parameter?" section of the FAQ about upstream and downstream storage topic. |
maxLoadRetries | The maximum number of retries that can be performed to refresh the cache. The first data pull when the deployment is started also counts as a cache refresh. If the number of retries exceeds the value of this parameter, the deployment fails to run. | Integer | No | 10 | N/A |
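As a sketch of the behavior described in this section, the following dimension table uses max_pt() so that each cache reload re-parses the partition parameter. The table name odps_latest_dim, the one-hour cache timeout, and the raised row limit are illustrative assumptions.

```sql
-- Sketch only: table name and parameter values are hypothetical.
CREATE TEMPORARY TABLE odps_latest_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'max_pt()',     -- re-parsed each time the cache is reloaded
  'cache' = 'ALL',              -- required for a MaxCompute dimension table
  'cacheTTLMs' = '3600000',     -- reload the cache every hour (illustrative)
  'cacheSize' = '500000',       -- raised from the default of 100000 rows (illustrative)
  'maxLoadRetries' = '10'       -- default value, shown explicitly
);
```

A shorter cacheTTLMs picks up new partitions sooner at the cost of more frequent full reloads, so the value is a trade-off that depends on how often the MaxCompute table is refreshed.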
Data type mappings
For more information about the data types that are supported by MaxCompute, see MaxCompute V2.0 data type edition.
Data type of MaxCompute | Data type of Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
If a MaxCompute physical table contains fields of a nested composite data type of Realtime Compute for Apache Flink, such as ARRAY, MAP, or STRUCT, and contains fields of JSON type, you must specify tblproperties('columnar.nested.type'='true') when you create the MaxCompute physical table to allow Realtime Compute for Apache Flink to read data from and write data to the physical table.
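To make the mapping table concrete, the following sketch declares the Flink-side columns for a hypothetical MaxCompute table whose columns are id BIGINT, amount DECIMAL(10, 2), name STRING, created DATETIME, updated TIMESTAMP, and payload BINARY. The table and column names are assumptions, and only the type pairs are taken from the table above.

```sql
-- Sketch only: each comment shows the assumed MaxCompute column type.
CREATE TEMPORARY TABLE odps_typed_source (
  id BIGINT,                 -- MaxCompute BIGINT
  amount DECIMAL(10, 2),     -- MaxCompute DECIMAL(10, 2)
  name STRING,               -- MaxCompute STRING
  created TIMESTAMP(3),      -- MaxCompute DATETIME
  updated TIMESTAMP(9),      -- MaxCompute TIMESTAMP
  payload BYTES              -- MaxCompute BINARY
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905'
);
```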
Examples
SQL API
Source table examples
The following sample code shows how to read data from a full MaxCompute source table:
CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  cid,
  COUNT(*) AS invoke_count
FROM odps_source
GROUP BY cid;
The following sample code shows how to read data from an incremental MaxCompute source table:
CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Incremental data is read from the partition 20180905.
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  cid,
  COUNT(*) AS invoke_count
FROM odps_source
GROUP BY cid;
Result table examples
The following sample code shows how to write data to static partitions:
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- Data is written to the static partition 20180905.
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;
The following sample code shows how to write data to dynamic partitions:
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- The partition key column that you use to create dynamic partitions must be explicitly specified.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' -- The partition value is not provided. This indicates that data is written to a partition specified by the ds field.
);

INSERT INTO odps_sink
SELECT
  id,
  len,
  content,
  DATE_FORMAT(c, 'yyMMdd') AS ds
FROM datagen_source;
Dimension table examples
The following sample code shows how to join a dimension table and another table:
CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED -- You must declare a primary key when you join a dimension table and another table.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k INT,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  s.k, -- Qualified with the table alias because both tables contain a column named k.
  s.v,
  d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON s.k = d.k;
The following sample code shows how to join a dimension table and multiple tables:
CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR -- You do not need to declare a primary key when you join a dimension table and multiple tables.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k INT,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  s.k, -- Qualified with the table alias because both tables contain a column named k.
  s.v,
  d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON s.k = d.k;
DataStream API
If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors.
To protect intellectual property, you can perform local debugging on a deployment that uses the MaxCompute DataStream connector for a maximum of 30 minutes in Realtime Compute for Apache Flink that uses VVR 6.0.6 or later. If local debugging takes more than 30 minutes, an error is returned and the deployment exits. For more information about how to run or debug a Realtime Compute for Apache Flink deployment that includes the MaxCompute connector in an on-premises environment, see Run or debug a Flink deployment that includes a connector in an on-premises environment.
If you submit a deployment in the development console of Realtime Compute for Apache Flink, an error indicating that MaxCompute-related classes cannot be found may occur. In this case, download the file whose file name ends with uber.jar from the Maven central repository and add the file as an additional dependency of the deployment. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment. For example, if the version of the ververica-connector-odps dependency of MaxCompute is 1.15-vvr-6.0.6, you can view the ververica-connector-odps-1.15-vvr-6.0.6-uber.jar package in the directory of the Maven repository and download the package to your on-premises directory.
We recommend that you declare a MaxCompute table by using SQL statements when you use the MaxCompute DataStream connector. You can call Table API operations to access MaxCompute tables or call DataStream API operations to access data streams.
The following sample code shows how to access a source table by using the DataStream connector:
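import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class OdpsSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Declare the MaxCompute source table by using a SQL statement.
        tEnv.executeSql(String.join(
            "\n",
            "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
            "  cid VARCHAR,",
            "  rt DOUBLE",
            ") WITH (",
            "  'connector' = 'odps',",
            "  'endpoint' = '<yourEndpointName>',",
            "  'project' = '<yourProjectName>',",
            "  'tableName' = '<yourTableName>',",
            "  'accessId' = '<yourAccessId>',",
            "  'accessKey' = '<yourAccessPassword>',",
            "  'partition' = 'ds=201809*'",
            ")"));
        // Convert the table to a data stream and print each row.
        DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
        source.print();
        env.execute("odps source");
    }
}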
The following sample code shows how to access a result table by using the DataStream connector:
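import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class OdpsSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Declare the MaxCompute result table by using a SQL statement.
        tEnv.executeSql(String.join(
            "\n",
            "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
            "  cid VARCHAR,",
            "  rt DOUBLE",
            ") WITH (",
            "  'connector' = 'odps',",
            "  'endpoint' = '<yourEndpointName>',",
            "  'project' = '<yourProjectName>',",
            "  'tableName' = '<yourTableName>',",
            "  'accessId' = '<yourAccessId>',",
            "  'accessKey' = '<yourAccessPassword>',",
            "  'partition' = 'ds=20180905'",
            ")"));
        // Build a small data stream and write it to the result table.
        DataStream<Row> data = env.fromElements(
            Row.of("id0", 3.),
            Row.of("id1", 4.));
        tEnv.fromDataStream(data).insertInto("odps_sink").execute();
    }
}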
XML
The Maven dependencies of the MaxCompute connector contain the classes required to create full MaxCompute source tables, incremental MaxCompute source tables, MaxCompute result tables, and MaxCompute dimension tables. The MaxCompute DataStream connectors of different versions are stored in the Maven central repository.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>
FAQ
How do full and incremental MaxCompute source tables read data from MaxCompute?
How do I configure the startPartition parameter for an incremental MaxCompute source table?
How do I configure the partition parameter when data is read from or written to partitions?
What do I do if duplicate data is written to a MaxCompute sink table?
What is the difference between max_pt() and max_pt_with_done()?