This topic describes how to use the MaxCompute connector.
Background information
MaxCompute is a fast, fully managed computing platform for large-scale data warehousing that can process exabytes of data. It provides solutions for storing and computing massive amounts of structured data in data warehouses, as well as analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?
The following table describes the capabilities supported by the MaxCompute connector.
Item | Description |
Supported type | Source table, dimension table, and sink table |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric | For more information about the metrics, see Metrics. |
API type | DataStream API and SQL API |
Data update or deletion in a sink table | If MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel is used, data can only be inserted into a sink table. If MaxCompute Upsert Tunnel is used, data in a sink table can be updated or deleted and data can be inserted into a sink table. |
Prerequisites
A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the MaxCompute connector.
The MaxCompute connector supports only at-least-once semantics.
Note: At-least-once semantics prevents data loss. In specific cases, duplicate data may be written to MaxCompute. Whether duplicate data is generated depends on the tunnel that you use. For more information about MaxCompute Tunnel, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage topic.
By default, a source operates in full mode and reads data only from the partition specified by the partition parameter. After all data in the partition is read, the deployment finishes and does not monitor for new partitions.
To continuously monitor for new partitions, create an incremental source by specifying the startPartition parameter in the WITH clause.
Note: Each time a dimension table is updated, the dimension table checks for the latest partition.
After a source table starts to run, it does not read data that is newly added to a partition. We recommend that you run a deployment only after the partition data is complete.
Syntax
CREATE TABLE odps_source (
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The table type. | STRING | Yes | No default value | Set the value to odps. |
endpoint | The endpoint of MaxCompute. | STRING | Yes | No default value | For more information, see Endpoints. |
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | STRING | No | No default value | For more information, see Endpoints. Note: If this parameter is not specified, MaxCompute allocates tunnel connections based on the Server Load Balancer (SLB) service. |
project | The name of the MaxCompute project. | STRING | Yes | No default value | N/A |
schemaName | The name of the MaxCompute schema. | STRING | No | No default value | This parameter is required only when the MaxCompute schema feature is enabled. In this case, set this parameter to the name of the schema of the MaxCompute table. For more information, see Schema-related operations. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
tableName | The name of the MaxCompute table. | STRING | Yes | No default value | N/A |
accessId | The AccessKey ID that is used to access MaxCompute. | STRING | Yes | No default value | For more information, see Console operations. Important: In this example, the AccessKey ID and AccessKey secret are specified as variables to reduce the risk of secret leaks. For more information, see Namespace variables. |
accessKey | The AccessKey secret that is used to access MaxCompute. | STRING | Yes | No default value | See the remarks for the accessId parameter. |
partition | The name of the partition in the MaxCompute table. | STRING | No | No default value | You do not need to specify this parameter for a non-partitioned MaxCompute table or an incremental source. Note: For more information about how to specify the partition parameter for a partitioned MaxCompute table, see the "How do I configure the partition parameter when data is read from or written to partitions?" section of the FAQ about upstream and downstream storage topic. |
compressAlgorithm | The compression algorithm that is used by MaxCompute Tunnel. | STRING | No | ZLIB (for VVR 4.0.13 and later); SNAPPY (for VVR 6.0.1 and later) | Valid values: RAW (no compression), ZLIB, and SNAPPY. Compared with ZLIB, SNAPPY can significantly improve the throughput. In test scenarios, the throughput increased by about 50%. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. |
quotaName | The name of the quota for the exclusive Tunnel resource groups of MaxCompute. | STRING | No | No default value | You can specify this parameter to use the exclusive Tunnel resource groups of MaxCompute. Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports this parameter. If you specify this parameter, you must delete the tunnelEndpoint parameter. Otherwise, the tunnel that is specified by the tunnelEndpoint parameter is used. |
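As an illustration of the common parameters, the following sketch declares a table that uses an exclusive Tunnel resource group together with SNAPPY compression. The table name, the columns, and the <yourQuotaName> placeholder are hypothetical. The tunnelEndpoint parameter is deliberately left out because, as described above, it takes precedence over quotaName when both are present.

```sql
-- Hypothetical table definition; replace the placeholders with your own values.
CREATE TEMPORARY TABLE odps_quota_example (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  -- Requires VVR 8.0.3 or later. Do not specify tunnelEndpoint together with quotaName.
  'quotaName' = '<yourQuotaName>',
  -- Requires VVR 4.0.13 or later. Valid values: RAW, ZLIB, and SNAPPY.
  'compressAlgorithm' = 'SNAPPY'
);
```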
Parameters only for source tables
Parameter | Description | Data type | Required | Default value | Remarks |
maxPartitionCount | The maximum number of partitions from which data can be read. | INTEGER | No | 100 | If the number of partitions from which data is read exceeds the value of this parameter, the following error message appears: "The number of matched partitions exceeds the default limit". Important: Reading data from a large number of partitions of a MaxCompute table places a heavy workload on the MaxCompute service and slows down the startup of the deployment. To prevent this issue, check whether data needs to be read from a large number of partitions and specify the partition parameter based on your business requirements. If your business requires data from a large number of partitions, manually increase the value of this parameter. |
useArrow | Specifies whether to use the Arrow format to read data. | BOOLEAN | No | false | The Arrow format can be used to call the storage API operation of MaxCompute. For more information, see the "User interfaces and openness" section of the What is MaxCompute? topic. Important: This parameter takes effect only in a batch deployment. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. |
splitSize | The size of data that can be pulled at a time when the Arrow format is used to read data. | MEMORYSIZE | No | 256 MB | Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. Important: This parameter takes effect only in a batch deployment. |
compressCodec | The compression algorithm that is used when the Arrow format is used to read data. | STRING | No | "" | Valid values: "" (no compression), ZSTD, and LZ4_FRAME. Specifying a compression algorithm can improve the throughput compared with no compression. Important: This parameter takes effect only in a batch deployment. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. |
dynamicLoadBalance | Specifies whether to enable dynamic allocation of shards. | BOOLEAN | No | false | Valid values: true and false. Dynamic allocation of shards can improve the processing performance of different operators of Realtime Compute for Apache Flink and reduce the overall time required for reading from MaxCompute. However, this may cause data skew because the total amount of data read by different operators is inconsistent. Important: This parameter takes effect only in a batch deployment. Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter. |
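To show how the source-only parameters fit together, here is a minimal sketch of a source table intended for a batch deployment that reads data in the Arrow format. The table name and the chosen values are illustrative only, and the string form used for splitSize ('128mb') is an assumption about how MEMORYSIZE values are written.

```sql
-- Hypothetical batch source; the Arrow-related parameters require VVR 8.0.8 or later
-- and take effect only in a batch deployment.
CREATE TEMPORARY TABLE odps_arrow_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*',
  -- Raise the limit only if the job really needs more than 100 partitions.
  'maxPartitionCount' = '200',
  'useArrow' = 'true',
  -- Assumed MEMORYSIZE notation; the default is 256 MB.
  'splitSize' = '128mb',
  -- Valid values: "" (no compression), ZSTD, and LZ4_FRAME.
  'compressCodec' = 'ZSTD'
);
```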
Parameters only for incremental MaxCompute source tables
The incremental source monitors for new partitions by periodically polling the MaxCompute server to obtain all partition information. Before the source reads data from a new partition, data writing to the partition must be complete. For more information, see the "What do I do if an incremental MaxCompute source table detects a new partition when data is still being written to the partition?" section of the FAQ about upstream and downstream storage topic. You can configure the startPartition parameter to specify the partition from which reading starts. Only partitions whose alphabetical order is greater than or equal to that of the partition specified by the startPartition parameter are read. For example, the alphabetical order of the partition year=2023,month=10 is less than that of the partition year=2023,month=9. In this case, you can add a leading zero to the month in the partition name that is declared in the code to ensure that the alphabetical order of the partitions is valid. For example, change year=2023,month=9 to year=2023,month=09.
Parameter | Description | Data type | Required | Default value | Remarks |
startPartition | The start partition from which incremental data is read. | STRING | Yes | No default value | If you specify this parameter, the incremental source is used. As a result, the partition parameter is ignored. If the source table is a multi-level partitioned table, you must configure the value of each partition column in descending order based on partition levels. Note: For more information about how to specify the startPartition parameter, see the "How do I configure the startPartition parameter for an incremental MaxCompute source table?" section of the FAQ about upstream and downstream storage topic. |
subscribeIntervalInSec | The interval at which MaxCompute is polled to obtain the information about partitions. | INTEGER | No | 30 | Unit: seconds. |
modifiedTableOperation | The operation that is performed when data in a partition is modified during partition reading. | Enum (NONE, SKIP) | No | NONE | Download sessions are saved in checkpoints. Each time a deployment resumes from a checkpoint, Realtime Compute for Apache Flink attempts to resume the reading progress from the saved session. If data in the partition has been modified, the session is unavailable and the deployment is repeatedly restarted. To resolve this issue, you can specify this parameter. Valid values: NONE: You must change the value of the startPartition parameter so that the alphabetical order of the specified partition is greater than that of the unavailable partition, and then start the deployment without states. SKIP: If you do not want to start the deployment without states, set this parameter to SKIP. In this case, Realtime Compute for Apache Flink skips the unavailable partition when it attempts to resume the session from the checkpoint. Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports this parameter. Regardless of whether you set this parameter to NONE or SKIP, the data that has already been read from the modified partition is retained, and the data that has not been read is ignored. |
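The incremental parameters above can be combined as in the following sketch. The table name, the partition columns (year and month), and the chosen values are hypothetical. The month is zero-padded so that month=09 sorts before month=10 under the alphabetical comparison described earlier.

```sql
-- Hypothetical incremental source over a two-level partitioned table.
CREATE TEMPORARY TABLE odps_incremental_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Specifying startPartition selects the incremental source; the partition parameter is ignored.
  -- Partition columns are listed level by level, with a zero-padded month.
  'startPartition' = 'year=2023,month=09',
  -- Poll MaxCompute for new partitions every 60 seconds (default: 30).
  'subscribeIntervalInSec' = '60',
  -- Requires VVR 8.0.3 or later. Skip a partition whose download session became unavailable.
  'modifiedTableOperation' = 'SKIP'
);
```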
Parameters only for sink tables
Parameter | Description | Data type | Required | Default value | Remarks |
useStreamTunnel | Specifies whether to use MaxCompute Streaming Tunnel to upload data. | BOOLEAN | No | false | Valid values: true (MaxCompute Streaming Tunnel is used to upload data) and false (MaxCompute Batch Tunnel is used to upload data). Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. For more information about how to select a tunnel, see the "How do I select a data tunnel?" section of the FAQ about upstream and downstream storage topic. |
flushIntervalMs | The interval at which the flush operation is performed in the buffer of a writer in MaxCompute Tunnel. | LONG | No | 30000 (30 seconds) | The MaxCompute sink inserts data into the buffer and then writes the buffered data to the destination MaxCompute table at the interval that is specified by the flushIntervalMs parameter. The sink also writes the data to the destination table when the size of the buffered data exceeds the specified threshold. If you use Streaming Tunnel, flushed data becomes available immediately. If you use Batch Tunnel, flushed data is not available until the checkpointing operation is complete. In this case, we recommend that you set this parameter to 0 to disable the scheduled flushing feature. Unit: milliseconds. Note: This parameter can be used together with the batchSize parameter. The flush operation is triggered when the condition that is specified by either parameter is met. |
batchSize | The size of buffered data at which a flush operation is triggered in the buffer of a writer in MaxCompute Tunnel. | LONG | No | 67108864 (64 MB) | The MaxCompute sink inserts data into the buffer and then writes the buffered data to the destination MaxCompute table when the size of the buffered data exceeds the value of the batchSize parameter. Unit: bytes. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter. This parameter can be used together with the flushIntervalMs parameter. The flush operation is triggered when the condition that is specified by either parameter is met. |
numFlushThreads | The number of threads that are used to flush data in the buffer of a writer in MaxCompute Tunnel. | INTEGER | No | 1 | Each MaxCompute sink creates the number of threads that is specified by the numFlushThreads parameter to flush data. If the value of this parameter is greater than 1, the data in different partitions can be flushed at the same time, which improves the efficiency of the flush operation. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later supports this parameter. |
dynamicPartitionLimit | The maximum number of dynamic partitions to which data can be written. | INTEGER | No | 100 | If the number of dynamic partitions to which data is written from the sink between two checkpoints exceeds the value of the dynamicPartitionLimit parameter, the following error message appears: "Too many dynamic partitions". Important: Writing data to a large number of partitions of a MaxCompute table places a heavy workload on the MaxCompute service and slows down checkpointing and flushing. To prevent this issue, check whether data needs to be written to a large number of partitions. If your business requires data to be written to a large number of partitions, manually increase the value of the dynamicPartitionLimit parameter. |
retryTimes | The maximum number of retries that can be performed for a request on the MaxCompute server. | INTEGER | No | 3 | The MaxCompute service may be unavailable for a short period of time when a session is created, a session is submitted, or data is flushed. If the MaxCompute service becomes unavailable, requests to the MaxCompute server are retried based on the configuration of this parameter. |
sleepMillis | The retry interval. | INTEGER | No | 1000 | Unit: milliseconds. |
enableUpsert | Specifies whether to use MaxCompute Upsert Tunnel to upload data. | BOOLEAN | No | false | Valid values: true (MaxCompute Upsert Tunnel is used to process INSERT, UPDATE_AFTER, and DELETE data in Realtime Compute for Apache Flink) and false (MaxCompute Batch Tunnel or MaxCompute Streaming Tunnel, as specified by the useStreamTunnel parameter, is used to process INSERT and UPDATE_AFTER data in Realtime Compute for Apache Flink). Important: If an issue such as an error, a deployment failure, or a long processing delay occurs when the MaxCompute sink commits a session in upsert mode, we recommend that you set the Parallelism parameter of sink operators to a value that is less than or equal to 10. Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
upsertAsyncCommit | Specifies whether to use the asynchronous mode when a MaxCompute sink commits a session in upsert mode. | BOOLEAN | No | false | Valid values: true (the asynchronous mode is used; the time that is consumed to commit the session is reduced, but the data that is written cannot be queried immediately after the session is committed) and false (the synchronous mode is used; when the MaxCompute sink commits the session, the system waits until the server processes the session). Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
upsertCommitTimeoutMs | The timeout period for a MaxCompute sink to commit a session in upsert mode. | INTEGER | No | 120000 (120 seconds) | Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter. |
sink.operation | The write operation mode for a Delta table. | STRING | No | insert | Valid values: insert (data is written to the table in append mode) and upsert (data is updated). Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
sink.parallelism | The degree of parallelism when data is written to a Delta table. | INTEGER | No | None | If you do not configure this parameter, the upstream data parallelism is used by default. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. Important: Make sure that the value of the write.bucket.num parameter is an integral multiple of the value of the sink.parallelism parameter. This helps ensure optimal write performance and saves memory on the sink node. |
sink.file-cached.enable | Specifies whether to enable the file cache mode when data is written to dynamic partitions of a Delta table. | BOOLEAN | No | false | Valid values: true (the file cache mode is enabled) and false (the file cache mode is disabled). If you enable the file cache mode, the number of small files that are written to the server is reduced, but the write latency is higher. We recommend that you enable the file cache mode when the sink table has a high degree of parallelism. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
sink.file-cached.writer.num | The number of threads that are used to concurrently upload data in a task in file cache mode. | INTEGER | No | 16 | This parameter takes effect only if the sink.file-cached.enable parameter is set to true. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. We recommend that you do not set this parameter to a large value. If data is written to a large number of partitions at the same time, an out of memory (OOM) error may occur. |
sink.bucket.check-interval | The interval at which the file size is checked in file cache mode. Unit: milliseconds. | INTEGER | No | 60000 | This parameter takes effect only if the sink.file-cached.enable parameter is set to true. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
sink.file-cached.rolling.max-size | The maximum size of a single cached file in file cache mode. | MEMORYSIZE | No | 16 MB | This parameter takes effect only if the sink.file-cached.enable parameter is set to true. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. If the file size exceeds the value of this parameter, the file data is uploaded to the server. |
sink.file-cached.memory | The maximum size of off-heap memory used to write data to files in file cache mode. | MEMORYSIZE | No | 64 MB | This parameter takes effect only if the sink.file-cached.enable parameter is set to true. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
sink.file-cached.memory.segment-size | The size of the buffer used to write data to files in file cache mode. | MEMORYSIZE | No | 128 KB | This parameter takes effect only if the sink.file-cached.enable parameter is set to true. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
sink.file-cached.flush.always | Specifies whether the cache is used for writing data to files in file cache mode. | BOOLEAN | No | true | This parameter takes effect only if the sink.file-cached.enable parameter is set to true. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
sink.file-cached.write.max-retries | The number of retries for uploading data in file cache mode. | INTEGER | No | 3 | This parameter takes effect only if the sink.file-cached.enable parameter is set to true. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.writer.max-retries | The maximum number of retries for writing data to a bucket in an Upsert Writer session. | INTEGER | No | 3 | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.writer.buffer-size | The buffer size of an Upsert Writer session in Realtime Compute for Apache Flink. | MEMORYSIZE | No | 64 MB | When the total buffer size of all buckets reaches the specified threshold, the system automatically flushes the data to the server. Note: Data in an Upsert Writer session can be written to multiple buckets at the same time. We recommend that you increase the value of this parameter to improve write efficiency. If data is written to a large number of partitions, an OOM error may occur. To prevent this issue, you can decrease the value of this parameter. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.writer.bucket.buffer-size | The buffer size of a single bucket in Realtime Compute for Apache Flink. | MEMORYSIZE | No | 1 MB | If the memory resources of the Flink server are insufficient, you can decrease the value of this parameter. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.write.bucket.num | The number of buckets for the table to which data is written. | INTEGER | Yes | None | The value of this parameter must be the same as the value of the write.bucket.num parameter that is configured for the Delta table to which data is written. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.write.slot-num | The number of Tunnel slots used in a session. | INTEGER | No | 1 | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.commit.max-retries | The maximum number of retries for an upsert session commit. | INTEGER | No | 3 | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.commit.thread-num | The degree of parallelism of upsert session commits. | INTEGER | No | 16 | We recommend that you do not set this parameter to a large value. If an excessive number of upsert session commits are performed at the same time, resource consumption increases, which may cause performance issues. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.commit.timeout | The timeout period for an upsert session commit. Unit: seconds. | INTEGER | No | 600 | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.flush.concurrent | The maximum number of buckets to which data in a partition can be written at the same time. | INTEGER | No | 2 | A Tunnel slot is occupied each time data in a bucket is flushed. Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
insert.commit.thread-num | The degree of parallelism of commit sessions. | INTEGER | No | 16 | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
insert.arrow-writer.enable | Specifies whether to use the Arrow format. | BOOLEAN | No | false | Valid values: true (the Arrow format is used) and false (the Arrow format is not used). Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
insert.arrow-writer.batch-size | The maximum number of rows in a batch of Arrow-formatted data. | INTEGER | No | 512 | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
insert.arrow-writer.flush-interval | The interval at which a writer flushes data. Unit: milliseconds. | INTEGER | No | 100000 | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
insert.writer.buffer-size | The cache size for the buffered writer. | MEMORYSIZE | No | 64 MB | Only Realtime Compute for Apache Flink that uses VVR 8.0.10 or later supports this parameter. |
upsert.partial-column.enable | Specifies whether to update only data in specific columns. | BOOLEAN | No | false | This parameter applies only to sink tables that are Delta tables. For more information, see Update data in specific columns. Valid values: true (only data in specific columns is updated) and false (data in all columns is updated). How data is written depends on whether a record with the same primary key already exists in the sink table. If such a record exists, it is updated based on the primary key by using the non-null values of the specified columns. If no such record exists, a record is added based on the primary key: the values of the specified columns are inserted, and null is inserted for all other columns. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.11 or later supports this parameter. |
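As a rough sketch of how the Delta table parameters relate to each other, the following sink writes in upsert mode. It assumes that the destination Delta table was created with write.bucket.num set to 16, and that a primary key is declared on the Flink side; both are assumptions for illustration, as are the table and column names. The value of sink.parallelism is chosen so that the bucket number (16) is an integral multiple of it.

```sql
-- Hypothetical upsert sink for a MaxCompute Delta table (VVR 8.0.10 or later).
CREATE TEMPORARY TABLE odps_delta_sink (
  id INT,
  content VARCHAR,
  -- Assumption: the primary key mirrors the primary key of the Delta table.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- insert appends data; upsert updates data.
  'sink.operation' = 'upsert',
  -- Must equal write.bucket.num of the destination Delta table (assumed to be 16 here).
  'upsert.write.bucket.num' = '16',
  -- 16 is an integral multiple of 4.
  'sink.parallelism' = '4'
);
```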
Parameters only for dimension tables
When a deployment starts, the dimension table pulls full data from a partition that is specified by the partition parameter. This parameter supports the max_pt() function. If the cache is reloaded after the cache entries expire, data of the latest partition specified by the partition parameter is re-parsed. If the partition parameter is set to max_two_pt(), the dimension table can pull data from two partitions. If the partition parameter is not set to max_two_pt(), data of only one partition can be pulled.
Parameter | Description | Data type | Required | Default value | Remarks |
cache | The cache policy. | STRING | Yes | No default value | You must set the cache parameter to ALL for a dimension table and explicitly declare the setting in the DDL statement. This policy is recommended when the amount of data in the remote table is small and a large number of missing keys exist, that is, keys on which the source table and the dimension table cannot be joined based on the ON clause. ALL: All data in the dimension table is cached. Before the system runs a deployment, it loads all data in the dimension table to the cache. All subsequent lookups in the dimension table are served from the cache. If a key does not exist in the cache, the data record is not found. The system reloads all data into the cache after cache entries expire. Note: If the cache parameter is set to ALL, you must increase the memory of the join node because the system asynchronously loads data of the dimension table. We recommend that you increase the memory to at least four times the amount of data in the remote table. The required memory is related to the MaxCompute storage compression algorithm. If a dimension table contains a large amount of data, you can use the SHUFFLE_HASH hint to evenly distribute the data to each subtask. For more information, see the "How do I use the SHUFFLE_HASH hint for a dimension table?" section of the FAQ about upstream and downstream storage topic. If you use an ultra-large dimension table, frequent garbage collections (GCs) of the Java virtual machine (JVM) may cause deployment exceptions. To resolve this issue, you can increase the memory of the node on which the dimension table is joined with another table. If the issue persists, we recommend that you convert the dimension table to a key-value dimension table that supports the least recently used (LRU) cache policy. For example, you can use an ApsaraDB for HBase dimension table as the key-value dimension table. |
cacheSize | The maximum number of rows of data that can be cached. | LONG | No | 100000 | If the number of data records in the dimension table exceeds the value of the cacheSize parameter, the following error message appears: "Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit". Important: If a dimension table contains a large number of data records, a large amount of JVM heap memory is consumed, and both the startup of deployments and the updates of the dimension table slow down. To prevent this issue, check whether a large number of data records need to be cached. If your business requires a large number of data records to be cached in a dimension table, manually increase the value of this parameter. |
cacheTTLMs | The cache timeout period. | LONG | No | Long.MAX_VALUE | Unit: milliseconds. |
cacheReloadTimeBlackList | The periods of time during which the cache is not refreshed. | STRING | No | No default value | This parameter is useful during peak hours, such as large-scale online promotional events. You can specify this parameter to prevent deployments from becoming unstable when the cache is refreshed. For more information about how to specify the parameter, see the "How do I configure the CacheReloadTimeBlackList parameter?" section of the FAQ about upstream and downstream storage topic. |
maxLoadRetries | The maximum number of retries that can be performed to refresh the cache. The first data pull when the deployment starts also counts as a cache refresh. If the number of retries exceeds the value of this parameter, the deployment fails to run. | INTEGER | No | 10 | N/A |
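The dimension table parameters can be combined as in the following sketch, which always looks up the latest partition by using the max_pt() function mentioned above. The table name, columns, and numeric values are illustrative only.

```sql
-- Hypothetical dimension table that caches the latest partition in full.
CREATE TEMPORARY TABLE odps_dim_latest (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Resolve the latest partition each time the cache is loaded.
  'partition' = 'max_pt()',
  -- ALL is the only supported cache policy and must be declared explicitly.
  'cache' = 'ALL',
  -- Allow up to 500,000 cached rows (default: 100000).
  'cacheSize' = '500000',
  -- Reload the cache every hour (value in milliseconds).
  'cacheTTLMs' = '3600000'
);
```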
Data type mappings
For more information about the data types that are supported by MaxCompute, see MaxCompute V2.0 data type edition.
Data type of MaxCompute | Data type of Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
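To make the mappings concrete, the following sketch shows how a Flink table might be declared for a MaxCompute table with a mix of column types. The table and column names are hypothetical, and the MaxCompute type of each column is given in a comment.

```sql
-- Hypothetical Flink DDL; comments show the assumed MaxCompute column types.
CREATE TEMPORARY TABLE odps_typed_source (
  id BIGINT,                          -- MaxCompute BIGINT
  price DECIMAL(10, 2),               -- MaxCompute DECIMAL(10, 2)
  payload BYTES,                      -- MaxCompute BINARY
  created_at TIMESTAMP(3),            -- MaxCompute DATETIME
  updated_at TIMESTAMP(9),            -- MaxCompute TIMESTAMP
  tags ARRAY<STRING>,                 -- MaxCompute ARRAY<STRING>
  info ROW<city STRING, zip INT>      -- MaxCompute STRUCT<city:STRING, zip:INT>
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905'
);
```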
If a MaxCompute physical table contains a field of a nested composite data type (such as ARRAY, MAP, or STRUCT) and a field of the JSON type, you must specify tblproperties('columnar.nested.type'='true') when you create the MaxCompute physical table to allow Realtime Compute for Apache Flink to read data from and write data to the physical table correctly.
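For illustration, a MaxCompute table with nested columns could be created roughly as follows. This statement is meant to be run in MaxCompute, not in Realtime Compute for Apache Flink. The table and column names are hypothetical, and the sketch covers only nested composite types, not a JSON column.

```sql
-- Hypothetical MaxCompute DDL; run it in MaxCompute before the Flink deployment starts.
CREATE TABLE IF NOT EXISTS nested_demo (
  id BIGINT,
  tags ARRAY<STRING>,
  attrs MAP<STRING, STRING>,
  info STRUCT<city:STRING, zip:INT>
)
TBLPROPERTIES ('columnar.nested.type' = 'true');
```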
Examples
SQL API
Sample code for a source table
Read data from a full MaxCompute source table:
CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  cid,
  COUNT(*) AS invoke_count
FROM odps_source
GROUP BY cid;
Read data from an incremental MaxCompute source table:
CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Data is read starting from the partition 20180905.
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  cid,
  COUNT(*) AS invoke_count
FROM odps_source
GROUP BY cid;
Sample code for a sink table
Write data to static partitions:
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- Data is written to the static partition 20180905.
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;
Write data to dynamic partitions:
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR,
  ds VARCHAR -- The partition key column that you use to create dynamic partitions must be explicitly specified.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' -- The partition value is not provided. This indicates that data is written to a partition specified by the ds field.
);

INSERT INTO odps_sink
SELECT
  id,
  len,
  content,
  DATE_FORMAT(c, 'yyMMdd') AS ds
FROM datagen_source;
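In the dynamic partition example, the partition value of each row comes from DATE_FORMAT(c, 'yyMMdd'). The following Java sketch previews the strings that such a pattern produces. It assumes that java.time.format.DateTimeFormatter interprets the pattern letters y, M, and d in the same way as the SQL function. The class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class DynamicPartitionValueSketch {

    // Formats a timestamp with the given pattern, as a stand-in for
    // DATE_FORMAT(c, pattern) in the SQL statement.
    static String partitionValue(LocalDateTime timestamp, String pattern) {
        return timestamp.format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        LocalDateTime c = LocalDateTime.of(2018, 9, 5, 10, 30);
        // Two-digit year, as in the SQL example.
        System.out.println(partitionValue(c, "yyMMdd"));
        // Four-digit year, matching partition names such as ds=20180905.
        System.out.println(partitionValue(c, "yyyyMMdd"));
    }
}
```

With the pattern yyMMdd, a timestamp on September 5, 2018 is written to the partition ds=180905. If the partitions of your table are named in the ds=20180905 style used elsewhere in this topic, the pattern yyyyMMdd is probably the one you want.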
Sample code for a dimension table
Join a dimension table and another table:
CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED -- You must declare a primary key when you join a dimension table and another table.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k INT,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT s.k, s.v, d.v -- Qualify k because both tables contain a column named k.
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON s.k = d.k;
Join a dimension table and multiple tables:
CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR
  -- You do not need to declare a primary key when you join a dimension table and multiple tables.
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k INT,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT s.k, s.v, d.v -- Qualify k because both tables contain a column named k.
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON s.k = d.k;
DataStream API
To read or write data by using the DataStream API, you must use the DataStream connector of the corresponding type. For more information about how to configure a DataStream connector, see Settings of DataStream connectors.
To protect intellectual property, local debugging of a deployment that uses the MaxCompute DataStream connector is limited to 30 minutes in Realtime Compute for Apache Flink that uses VVR 6.0.6 or later. If local debugging runs for more than 30 minutes, an error is returned and the deployment exits. For more information about how to run or debug a Realtime Compute for Apache Flink deployment that includes the MaxCompute connector in an on-premises environment, see Run or debug a Flink deployment that includes a connector in an on-premises environment.
If you submit a deployment in the development console of Realtime Compute for Apache Flink, an error may indicate that MaxCompute-related classes cannot be found. In this case, download the file whose name ends with uber.jar from the Maven central repository and add the file as an additional dependency of the deployment. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment. For example, if the version of the ververica-connector-odps dependency is 1.15-vvr-6.0.6, find the ververica-connector-odps-1.15-vvr-6.0.6-uber.jar package in the corresponding directory of the Maven repository and download the package to your on-premises directory.
When you use the MaxCompute DataStream connector, we recommend that you declare the MaxCompute table by using SQL statements. You can then call Table API operations to access the MaxCompute table, or convert the table to a data stream and call DataStream API operations.
Connect to the source table
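import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Declare the MaxCompute source table by using an SQL statement.
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")"));
// Convert the table to a data stream and print each row.
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source");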
Connect to the sink
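import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Declare the MaxCompute sink table by using an SQL statement.
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")"));
// Build a small in-memory stream and write it to the sink table.
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.0),
    Row.of("id1", 4.0));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();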
Maven dependency
The Maven dependencies of the MaxCompute connector contain the classes required to create the full source, incremental source, sink, and dimension table. The MaxCompute DataStream connectors of different versions are stored in the Maven central repository.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>
References
How do full and incremental MaxCompute source tables read data from MaxCompute?
How do I configure the startPartition parameter for an incremental MaxCompute source table?
How do I configure the partition parameter when data is read from or written to partitions?
What do I do if duplicate data is written to a MaxCompute sink table?
What is the difference between max_pt() and max_pt_with_done()?