This topic describes the syntax, WITH parameters, and usage examples of the MaxCompute connector.
Background information
MaxCompute (formerly ODPS) is a fast, fully managed data warehouse solution for exabyte-scale data. It stores and computes large volumes of structured data and provides data warehouse solutions, analysis, and modeling services.
The MaxCompute connector supports the following features.
Category | Details |
Supported types | Source table, dimension table, and sink table |
Run modes | Stream and batch modes |
Data format | Not supported |
Specific monitoring metrics | |
API types | DataStream and SQL |
Supports updating or deleting data in sink tables | Batch Tunnel and Stream Tunnel modes support only data insertion. Upsert Tunnel mode supports data insertion, update, and deletion. |
Prerequisites
You have created a MaxCompute table. For more information, see Create table.
Limits
The MaxCompute connector supports only at-least-once semantics.
Note At-least-once semantics ensure that no data is lost. However, in rare cases, duplicate data may be written to MaxCompute. The likelihood of this depends on the MaxCompute Tunnel type. For more information about MaxCompute Tunnels, see Which data tunnel should I choose?.
By default, a source table is in full data mode. It reads only the partitions specified in the `partition` parameter. After all data is read, the job finishes. The connector does not monitor for new partitions.
To continuously monitor for new partitions, you can use the incremental source table mode by specifying the `startPartition` parameter in the WITH clause.
Note Dimension tables check for the latest partition during each update and are not subject to this limit.
After a source table job starts, the job does not read new data that is added to a partition. You should run the job only after the partition data is complete.
SQL
You can use the MaxCompute connector in SQL jobs as a source table, dimension table, or sink table.
Syntax
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);
WITH parameters
General
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The table type. | String | Yes | None | The value must be odps. |
endpoint | The endpoint of the MaxCompute service. | String | Yes | None | For more information, see Endpoints. |
tunnelEndpoint | The endpoint of the MaxCompute Tunnel service. | String | No | None | For more information, see Endpoints. Note If you do not specify this parameter, MaxCompute assigns a tunnel connection based on its internal load balancing service. |
project | The name of the MaxCompute project. | String | Yes | None | None. |
schemaName | The name of the MaxCompute schema. | String | No | None | This parameter is required only if the schema feature is enabled for the MaxCompute project. Set this parameter to the name of the schema to which the MaxCompute table belongs. For more information, see Schema operations. Note This parameter is supported only in VVR 8.0.6 and later. |
tableName | The name of the MaxCompute table. | String | Yes | None | None. |
accessId | The AccessKey ID for MaxCompute. | String | Yes | None | For more information, see How do I view the AccessKey ID and AccessKey secret? Important To prevent your AccessKey information from being leaked, use variables to specify the AccessKey values. For more information, see Project variables. |
accessKey | The AccessKey secret for MaxCompute. | String | Yes | None | |
partition | The name of the MaxCompute partition. | String | No | None | This parameter is not required for non-partitioned tables and incremental source tables. Note For more information about partitioned tables, see How do I specify the partition parameter when reading from or writing to a partition?. |
compressAlgorithm | The compression algorithm used by MaxCompute Tunnel. | String | No | SNAPPY | Valid values:
|
quotaName | The quota name of the dedicated resource for MaxCompute Data Transmission Service. | String | No | None | Set this parameter to use the dedicated MaxCompute Data Transmission Service. Important
|
Source table parameters
Parameter | Description | Data type | Required | Default value | Remarks |
maxPartitionCount | The maximum number of partitions that can be read. | Integer | No | 100 | If the number of partitions to read exceeds this value, an error is reported. Important Reading too many partitions at once increases the load on MaxCompute and slows down job startup. Check whether the `partition` parameter is misconfigured. If you do need to read that many partitions, manually increase the value of `maxPartitionCount`. |
useArrow | Specifies whether to use the Arrow format to read data. | Boolean | No | false | Using the Arrow format lets you call the MaxCompute Storage API. Important
|
splitSize | The size of data to pull at a time when reading data in the Arrow format. | MemorySize | No | 256 MB | This parameter is supported only in VVR 8.0.8 and later. Important This parameter takes effect only in batch jobs. |
compressCodec | The compression algorithm to use when reading data in the Arrow format. | String | No | "" | Valid values:
Specifying a compression algorithm can improve throughput compared to no compression. Important
|
dynamicLoadBalance | Specifies whether to allow dynamic sharding. | Boolean | No | false | Valid values:
Allowing dynamic sharding lets you leverage the processing power of different Flink nodes and reduces the overall read time for the source table. However, it can lead to inconsistent data volumes read by different nodes, causing data skew. Important
|
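For reference, the following is a minimal sketch of a batch-mode source table that combines several of the parameters above. The endpoint, project, and table names are placeholders, and the exact values depend on your environment and VVR version.
-- Sketch only: a batch source that reads many partitions in the Arrow format.
-- Requires VVR 8.0.8 or later; useArrow and splitSize take effect only in batch jobs.
CREATE TEMPORARY TABLE odps_batch_source (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2023*',        -- matches many partitions
  'maxPartitionCount' = '400',     -- raise the default limit of 100
  'useArrow' = 'true',             -- read through the MaxCompute Storage API
  'splitSize' = '256 MB'
);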
Incremental source table parameters
An incremental source table discovers new partitions by periodically polling the MaxCompute server for all partition information. When the connector reads a new partition, the data in that partition must be complete. For more information, see What do I do if an incremental MaxCompute source table detects a new partition that is still being written to?. You can use `startPartition` to specify a start offset. Note that only partitions that are lexicographically greater than or equal to the start offset are read. For example, the partition year=2023,month=10 is lexicographically smaller than the partition year=2023,month=9. For this type of partition declaration, you can pad the value with a zero to ensure the correct lexicographical order, such as year=2023,month=09.
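For example, a hedged sketch of an incremental source table over a table partitioned by year and month (with zero-padded month values) might look as follows; the parameters used here are described in the table below.
-- Sketch only: incremental reading over zero-padded partitions, so that
-- year=2023,month=09 is followed by month=10, month=11, and so on in lexicographic order.
CREATE TEMPORARY TABLE odps_incremental_source (
  id INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'year=2023,month=09',   -- read this partition and all later ones
  'subscribeIntervalInSec' = '60'            -- poll for new partitions every 60 seconds
);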
Parameter | Description | Data type | Required | Default value | Remarks |
startPartition | The starting MaxCompute partition (inclusive) for incremental reading. | String | Yes | None |
Note For more information about the `startPartition` parameter, see How do I configure the startPartition parameter for an incremental MaxCompute source table?. |
subscribeIntervalInSec | The interval at which to poll MaxCompute for the partition list. | Integer | No | 30 | The unit is seconds. |
modifiedTableOperation | The action to take when partition data is modified during reading. | Enum (NONE, SKIP) | No | NONE | The download session is saved in a checkpoint. Each time the job recovers from a checkpoint, it tries to resume reading from that session. If the session is unavailable because the partition data was modified, the Flink job enters a restart loop. You can set this parameter to handle this situation:
Important
|
Sink table parameters
Parameter | Description | Data type | Required | Default value | Remarks |
useStreamTunnel | Specifies whether to use MaxCompute Stream Tunnel to upload data. | Boolean | No | false | Valid values:
Note For more information about choosing a data tunnel, see Which data tunnel should I choose?. |
flushIntervalMs | The flush interval for the MaxCompute Tunnel Writer buffer. | Long | No | 30000 (30 seconds) | Data is first written to a buffer. When the buffer is full or the `flushIntervalMs` interval is reached, the data is written to the destination table in batches.
The unit is milliseconds. Note This parameter can be used with `batchSize`. Data is flushed when either condition is met. |
batchSize | The flush size of the MaxCompute Tunnel Writer buffer. | Long | No | 67108864 (64 MB) | The unit is bytes. When a record is written, the data is first stored in the MaxCompute buffer. When the buffer reaches a certain size (`batchSize`), the data in the buffer is written to the destination MaxCompute table. Note This parameter can be used with `flushIntervalMs`. Data is flushed when either condition is met. |
numFlushThreads | The number of threads for flushing the MaxCompute Tunnel Writer buffer. | Integer | No | 1 | Each MaxCompute sink concurrency creates `numFlushThreads` threads to flush data. If this value is greater than 1, data from different partitions can be flushed concurrently, improving flush efficiency. |
slotNum | The number of slots used by the MaxCompute Tunnel Writer. | Integer | No | 0 | For more information about the limits on the number of slots, see Data Transmission Service overview. |
dynamicPartitionLimit | The maximum number of dynamic partitions to write to. | Integer | No | 100 | If the number of dynamic partitions written to the sink table between two checkpoints exceeds `dynamicPartitionLimit`, an error is reported. Important Writing to too many partitions at once can put pressure on the MaxCompute service and slow down sink table flushes and job checkpoints. When this error occurs, confirm whether you need to write to that many partitions. If you do, manually increase the value of `dynamicPartitionLimit`. |
retryTimes | The maximum number of retries for requests to the MaxCompute server. | Integer | No | 3 | When creating a session, committing a session, or flushing data, the MaxCompute service may be temporarily unavailable. The system retries based on this configuration. |
sleepMillis | The retry interval. | Integer | No | 1000 | The unit is milliseconds. |
enableUpsert | Specifies whether to use MaxCompute Upsert Tunnel to upload data. | Boolean | No | false | Valid values:
Important
|
upsertAsyncCommit | Specifies whether to use asynchronous mode when committing a session in Upsert mode. | Boolean | No | false | Valid values:
Note This parameter is supported only in VVR 8.0.6 and later. |
upsertCommitTimeoutMs | The timeout period for committing a session in Upsert mode. | Integer | No | 120000 (120 seconds) | The unit is milliseconds. Note This parameter is supported only in VVR 8.0.6 and later. |
sink.operation | The write mode when writing to a Delta Lake table. | String | No | insert | Valid values:
Note This parameter is supported only in VVR 8.0.10 and later. |
sink.parallelism | The degree of parallelism for writing to a Delta Lake table. | Integer | No | None |
Important Ensure that the `write.bucket.num` property of the Delta Lake table is an integer multiple of this configuration value. This provides the best write performance and saves the most memory on the sink node. |
sink.file-cached.enable | Specifies whether to use file cache mode when writing to dynamic partitions of a Delta Lake table. | Boolean | No | false | Valid values:
Using file cache mode reduces the number of small files written to the server-side, but increases the latency of data output. When the degree of parallelism for the sink table is high, use file cache mode. Note This parameter is supported only in VVR 8.0.10 and later. |
sink.file-cached.writer.num | The number of concurrent data uploads for a single task in file cache mode. | Integer | No | 16 |
Note This parameter is supported only in VVR 8.0.10 and later. |
sink.bucket.check-interval | The interval for checking the file size in file cache mode. The unit is milliseconds (ms). | Integer | No | 60000 | This parameter takes effect only when file cache mode is enabled. Note This parameter is supported only in VVR 8.0.10 and later. |
sink.file-cached.rolling.max-size | The maximum size of a single cache file in file cache mode. | MemorySize | No | 16 M |
Note This parameter is supported only in VVR 8.0.10 and later. |
sink.file-cached.memory | The maximum off-heap memory size used for writing files in file cache mode. | MemorySize | No | 64 MB | This parameter takes effect only when file cache mode is enabled. Note This parameter is supported only in VVR 8.0.10 and later. |
sink.file-cached.memory.segment-size | The buffer size used for writing files in file cache mode. | MemorySize | No | 128 KB | This parameter takes effect only when file cache mode is enabled. Note This parameter is supported only in VVR 8.0.10 and later. |
sink.file-cached.flush.always | Specifies whether to use a cache when writing files in file cache mode. | Boolean | No | true | This parameter takes effect only when file cache mode is enabled. Note This parameter is supported only in VVR 8.0.10 and later. |
sink.file-cached.write.max-retries | The number of retries for uploading data in file cache mode. | Integer | No | 3 | This parameter takes effect only when file cache mode is enabled. Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.writer.max-retries | The number of retries after an Upsert Writer fails to write to a bucket. | Integer | No | 3 | Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.writer.buffer-size | The cache size for a single Upsert Writer in Flink. | MemorySize | No | 64 m |
Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.writer.bucket.buffer-size | The cache size for a single bucket in Flink. | MemorySize | No | 1 m | When cluster memory resources are tight, you can reduce this parameter value. Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.write.bucket.num | The number of buckets in the table to be written to. | Integer | Yes | None | This value must be the same as the value of the `write.bucket.num` property of the MaxCompute table to be written to. Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.write.slot-num | The number of Tunnel slots used by a single session. | Integer | No | 1 | Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.commit.max-retries | The number of retries for an Upsert Session Commit. | Integer | No | 3 | Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.commit.thread-num | The degree of parallelism for an Upsert Session Commit. | Integer | No | 16 | Do not set this parameter to a very large value. A higher number of concurrent commits consumes more resources, which may lead to performance issues or excessive resource consumption. Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.commit.timeout | The timeout period for an Upsert Session Commit. The unit is seconds (s). | Integer | No | 600 | Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.flush.concurrent | The maximum number of buckets that can be written to simultaneously for a single partition. | Integer | No | 2 | Each time data in a bucket is flushed, it consumes a Tunnel Slot resource. Note This parameter is supported only in VVR 8.0.10 and later. |
insert.commit.thread-num | The degree of parallelism for a Commit Session. | Integer | No | 16 | Note This parameter is supported only in VVR 8.0.10 and later. |
insert.arrow-writer.enable | Specifies whether to use the Arrow format. | Boolean | No | false | Valid values:
Note This parameter is supported only in VVR 8.0.10 and later. |
insert.arrow-writer.batch-size | The maximum number of rows in an Arrow Batch. | Integer | No | 512 | Note This parameter is supported only in VVR 8.0.10 and later. |
insert.arrow-writer.flush-interval | The Writer flush interval. The unit is milliseconds (ms). | Integer | No | 100000 | Note This parameter is supported only in VVR 8.0.10 and later. |
insert.writer.buffer-size | The cache size of the Buffered Writer. | MemorySize | No | 64 MB | Note This parameter is supported only in VVR 8.0.10 and later. |
upsert.partial-column.enable | Specifies whether to update only some columns. | Boolean | No | false | This parameter takes effect only when the sink table is a Delta Lake table. For more information, see Partial column update. Valid values:
Data is written in different ways depending on whether a primary key for the update data exists in the sink table:
Note This parameter is supported only in VVR 8.0.11 and later. |
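As a hedged sketch (not a verified end-to-end configuration), the following shows how an upsert sink to a MaxCompute Delta Lake table might combine the parameters above. The connection values and table name are placeholders, and a primary key is declared so that update and delete messages can be applied.
-- Sketch only: an upsert sink to a MaxCompute Delta Lake (transactional) table.
-- upsertAsyncCommit and upsertCommitTimeoutMs require VVR 8.0.6 or later.
CREATE TEMPORARY TABLE odps_upsert_sink (
  id BIGINT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourDeltaTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'enableUpsert' = 'true',             -- use the Upsert Tunnel for insert, update, and delete
  'upsertAsyncCommit' = 'false',
  'upsertCommitTimeoutMs' = '120000'
);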
Dimension table parameters
When a job starts, a MaxCompute dimension table pulls full data from the specified partition. The `partition` parameter supports functions such as `max_pt()`. When the cache expires and reloads, the `partition` parameter is re-parsed to pull data from the latest partition. If you use `max_two_pt()`, the dimension table can pull data from two partitions. Otherwise, you can specify only a single partition.
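For example, a minimal sketch of a dimension table that always serves the latest partition might look as follows; connection values are placeholders, and the cache parameters are described in the table below.
-- Sketch only: a dimension table that reloads the newest partition on every cache refresh.
CREATE TEMPORARY TABLE odps_latest_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'max_pt()',        -- re-parsed when the cache reloads
  'cache' = 'ALL',
  'cacheTTLMs' = '3600000'         -- reload the full cache every hour
);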
Parameter | Description | Data type | Required | Default value | Remarks |
cache | The cache policy. | String | Yes | None | MaxCompute dimension tables currently support only the ALL policy. ALL: caches all data in the dimension table. Before the job runs, the system loads all data from the dimension table into the cache, and all subsequent dimension table queries are served from the cache. If a key is not found in the cache, the key does not exist in the dimension table. The full cache is reloaded after it expires. Note
|
cacheSize | The maximum number of data records to cache. | Long | No | 100000 | If the dimension table data volume exceeds `cacheSize`, an error is reported. Important Because a large dimension table consumes a lot of JVM heap memory and slows down job startup and dimension table updates, confirm whether you need to cache this much data. If you do, manually increase this parameter. |
cacheTTLMs | The cache timeout period, which is the cache update interval. | Long | No | Long.MAX_VALUE (never updates) | The unit is milliseconds. |
cacheReloadTimeBlackList | The disallowed cache refresh time. The cache is not updated during the time period specified by this parameter. | String | No | None | Prevents the cache from being updated during critical periods, such as traffic peaks during a promotion, which could cause job instability. For more information about how to configure this parameter, see How do I configure the cacheReloadTimeBlackList parameter?. |
maxLoadRetries | The maximum number of attempts to update the cache, including the initial data pull at job startup. If this number is exceeded, the job fails. | Integer | No | 10 | None. |
Data type mapping
For more information about the data types supported by MaxCompute, see Data types (2.0).
MaxCompute type | Flink type |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
TIMESTAMP_NTZ | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
If a MaxCompute physical table contains both nested complex type fields (ARRAY, MAP, or STRUCT) and JSON type fields, you must specify tblproperties('columnar.nested.type'='true') when you create the MaxCompute physical table to ensure that Flink can correctly read and write these fields.
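For example, a hedged sketch of such a MaxCompute DDL statement (with illustrative column names) might look as follows:
-- Sketch only: a MaxCompute table that mixes nested and JSON columns.
-- columnar.nested.type must be set to true so that Flink can read and write these fields.
CREATE TABLE IF NOT EXISTS nested_demo (
  id     BIGINT,
  tags   ARRAY<STRING>,
  attrs  MAP<STRING, STRING>,
  extra  JSON
)
PARTITIONED BY (ds STRING)
tblproperties ('columnar.nested.type' = 'true');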
Data ingestion (Public preview)
You can use the MaxCompute connector as a sink to write data in YAML-based data ingestion jobs.
Limits
This feature is supported only in VVR 11.1 and later versions.
Syntax
source:
  type: xxx
sink:
  type: maxcompute
  name: MaxComputeSink
  access-id: ${your_accessId}
  access-key: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}
  buckets-num: 8
Configuration items
Configuration item | Required | Default value | Type | Description |
type | Yes | None | String | Specifies the connector to use. Set this parameter to |
name | No | None | String | The name of the sink. |
access-id | Yes | None | String | The AccessKey ID of an Alibaba Cloud account or a RAM user. You can obtain the AccessKey ID on the AccessKey Management page. |
access-key | Yes | None | String | The AccessKey secret that corresponds to the AccessKey ID. |
endpoint | Yes | None | String | The endpoint of the MaxCompute service. You must configure the endpoint based on the region and network connection type you selected when creating the MaxCompute project. For more information about the endpoints for each region and network type, see Endpoints. |
project | Yes | None | String | The name of the MaxCompute project. You can log on to the MaxCompute console and obtain the project name on the Workspaces > Project Management page. |
tunnel.endpoint | No | None | String | The endpoint of the MaxCompute Tunnel service. This configuration is usually routed automatically based on the region of the specified project. Use this configuration only in special network environments, such as when using a proxy. |
quota.name | No | None | String | The name of the dedicated resource group for MaxCompute data transmission. If you do not specify this configuration, a shared resource group is used. For more information, see Purchase and use a dedicated resource group for Data Transmission Service. |
sts-token | No | None | String | Specify this parameter when using a short-lived Security Token Service (STS) token issued by a RAM role for authentication. |
buckets-num | No | 16 | Integer | The number of buckets to use when automatically creating a MaxCompute Delta Lake table. For more information about how to use this feature, see Near real-time data warehouse overview. |
compress.algorithm | No | zlib | String | The data compression algorithm used when writing to MaxCompute. Supported values are |
total.buffer-size | No | 64MB | String | The amount of data to buffer in memory, at the partition level (or table level for non-partitioned tables). The buffers for different partitions (or tables) are independent. When the threshold is reached, the data is written to MaxCompute. |
bucket.buffer-size | No | 4MB | String | The amount of data to buffer in memory, at the bucket level. This parameter takes effect only when writing to a Delta Lake table. The buffers for different data buckets are independent. When the threshold is reached, the data in the bucket is written to MaxCompute. |
commit.thread-num | No | 16 | Integer | The number of partitions (or tables) that can be processed simultaneously during the checkpoint phase. |
flush.concurrent-num | No | 4 | Integer | The number of buckets that can be written to simultaneously when writing data to MaxCompute. This parameter takes effect only when writing to a Delta Lake table. |
Table location mapping
When the connector automatically creates a table, it maps the location information from the source table to the MaxCompute table as follows.
If the MaxCompute project does not support the Schema model, each sync task can sync only one database. For example, if the upstream data source is MySQL, a sync task can sync only one MySQL database. This also applies to other data sources. The connector ignores the `tableId.namespace` information.
Object in data ingestion job | MaxCompute location | MySQL location |
Project parameter in configuration | Project | none |
TableId.namespace | Schema (This configuration is ignored if the MaxCompute project does not support the Schema model.) | Database |
TableId.tableName | Table | Table |
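Putting the mapping together, a hedged sketch of a data ingestion job that syncs a single MySQL database into MaxCompute might look as follows. The MySQL source options shown here are illustrative placeholders; check the MySQL connector documentation for its exact parameters.
# Sketch only: sync one MySQL database (app_db) into a MaxCompute project.
source:
  type: mysql
  hostname: ${your_mysql_host}
  port: 3306
  username: ${your_mysql_user}
  password: ${your_mysql_password}
  tables: app_db.\.*          # one sync task syncs a single database
sink:
  type: maxcompute
  name: MaxComputeSink
  access-id: ${your_accessId}
  access-key: ${your_accessKey}
  endpoint: ${your_maxcompute_endpoint}
  project: ${your_project}    # maps to the MaxCompute project
  buckets-num: 8              # bucket count for automatically created Delta Lake tables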
Type mapping
CDC type | MaxCompute type |
CHAR | STRING |
VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
Examples
SQL
Source table example
Read full data
By default, a source table is in full data mode and reads the partitions specified in the partition parameter.
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
Read incremental data
Incrementally reads data starting from the specified startPartition.
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- Reads data starting from the partition for 20180905.
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
Sink table examples
Write to a static partition
Specify a static value for the `partition` parameter to write data to a fixed partition.
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- Writes data to the static partition ds=20180905.
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;
Write to a dynamic partition
Specify the `partition` parameter based on the partition key column of the table to write data to dynamic partitions.
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR -- The dynamic partition column must be explicitly declared.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' -- If no partition value is specified, data is written to different partitions based on the value of the ds field.
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;
Dimension table examples
One-to-one dimension table
A primary key must be declared for a one-to-one dimension table.
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- A primary key must be declared for a one-to-one dimension table.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT CAST(s.k AS VARCHAR) AS k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
One-to-many dimension table
A primary key is not required for a one-to-many dimension table.
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- A primary key is not required for a one-to-many dimension table.
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT CAST(s.k AS VARCHAR) AS k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
DataStream
To read and write data using DataStream, you must use the corresponding DataStream connector to connect to Flink. For more information about how to set up the DataStream connector, see How to use the DataStream connector.
To protect intellectual property, starting from VVR 6.0.6, this connector limits a single local debugging job to run for 30 minutes. After 30 minutes, the job reports an error and exits. For more information about how to run and debug jobs that contain the MaxCompute connector locally, see Run and debug jobs that contain connectors locally.
The connector does not support reading from Delta Lake tables, which are tables created with a primary key and `transactional`=`true`. For more information, see Basic concepts.
To use the MaxCompute connector in a DataStream job, you must declare the MaxCompute table using SQL and then convert between a Table and a DataStream to connect the MaxCompute table and the data stream.
Connect to a source table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); Connect to a sink table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();
XML
The Maven dependency for the MaxCompute connector includes the classes required to build full source tables, incremental source tables, sink tables, and dimension tables. The MaxCompute DataStream connector is available in the Maven central repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>
FAQ
What are endpoint and tunnelEndpoint? What happens if they are configured incorrectly?
How do full and incremental MaxCompute source tables read data from MaxCompute?
MaxCompute connector runtime error: Authorization Failed [4019]: Insufficient privileges
How do I configure the startPartition parameter for an incremental MaxCompute source table?
How do I specify the partition parameter when reading from or writing to a partition?
What do I do if duplicate data is written to a MaxCompute sink table?
What is the difference between max_pt() and max_pt_with_done()?