This topic describes how to use the Object Storage Service (OSS) connector.
Alibaba Cloud OSS is a secure and cost-effective object storage service that offers a data durability of 99.9999999999% (twelve 9's) and a data availability of 99.995%. OSS provides multiple storage classes to help you manage and reduce storage costs. The following table describes the capabilities supported by the OSS connector.
Item | Description |
Table type | Source table and result table. |
Running mode | Batch mode and streaming mode. |
Data format | ORC, PARQUET, AVRO, CSV, JSON, and RAW. Note: Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.7 or later can read data in the Parquet format. |
Metric | None. |
API type | DataStream API and SQL API. |
Data update or deletion in a result table | Data in a result table cannot be updated or deleted. Data can only be inserted into a result table. |
Limits
General limits
Only Realtime Compute for Apache Flink that uses VVR 4.0.14 or later can read data from or write data to OSS.
Realtime Compute for Apache Flink that uses VVR 8.0.6 or earlier can read data from or write data to only OSS buckets within your Alibaba Cloud account.
Note: If you want to read data from or write data to OSS buckets within other Alibaba Cloud accounts, use Realtime Compute for Apache Flink that uses VVR 8.0.6 or later and configure the authentication information of the OSS buckets. For more information, see the Configure authentication information of OSS buckets section of this topic.
Limits only on result tables
Data in row-oriented storage formats, such as AVRO, CSV, JSON, and RAW, cannot be written to OSS. For more information, see FLINK-30635.
Only Realtime Compute for Apache Flink that uses VVR 6.0.6 or later can write data to OSS-HDFS. For more information, see the Write data to OSS-HDFS section of this topic.
Syntax
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = '...'
);
Metadata columns
You can specify a metadata column in an OSS source table to obtain the related metadata of OSS. For example, if you specify a metadata column named file.path
in an OSS source table, the value of the column is the path of the file in which each row of data is stored. Sample code:
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
);
The following table describes the metadata columns supported by OSS source tables.
Key | Data type | Description |
file.path | STRING NOT NULL | The path of the file in which each row of data is stored. |
file.name | STRING NOT NULL | The name of the file in which each row of data is stored. The file name is the farthest element from the root path of the file. |
file.size | BIGINT NOT NULL | The number of bytes in the file in which each row of data is stored. |
file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | The time when the file in which each row of data is stored was modified. |
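For reference, the following sketch declares a source table that exposes all four supported metadata columns and reads them together with regular columns. The table name MyUserTableWithMetadata and the regular columns are illustrative only; the metadata keys and data types are taken from the preceding table.
CREATE TEMPORARY TABLE MyUserTableWithMetadata (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
);

-- Query regular columns together with the per-file metadata of each row.
SELECT column_name1, `file.path`, `file.size` FROM MyUserTableWithMetadata;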
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | String | Yes | No default value | Set the value to filesystem. |
path | The path of the file system. | String | Yes | No default value | The path is in the Uniform Resource Identifier (URI) format. Example: oss://my_bucket/my_path. Note: In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, after you configure this parameter, you must also configure the authentication information of OSS buckets before you can read data from or write data to the specified path. For more information, see the Configure authentication information of OSS buckets section of this topic. |
format | The format of the file. | String | Yes | No default value | Valid values: csv, json, avro, parquet, orc, and raw. |
Parameters only for source tables
Parameter | Description | Data type | Required | Default value | Remarks |
source.monitor-interval | The interval at which the source table monitors the generation of new files. The value of this parameter must be greater than 0. | Duration | No | No default value | If you do not configure this parameter, the specified path is scanned only once and the data source is bounded. Each file is uniquely identified by its path, and each newly detected file is processed once. The processed files are stored in state during the lifecycle of the data source. Therefore, the state of the data source is saved when checkpoints and savepoints are generated. A small value allows new files to be detected quickly but causes the file system or OSS to be traversed frequently. |
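The following sketch shows how the source.monitor-interval parameter can turn a one-time scan into a continuous read. The table name, columns, and the 30-second interval are illustrative assumptions, not values from this topic.
CREATE TEMPORARY TABLE fs_streaming_source (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json',
  -- Scan the path for new files every 30 seconds so that the source is unbounded.
  'source.monitor-interval' = '30s'
);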
Parameters only for result tables
Parameter | Description | Data type | Required | Default value | Remarks |
partition.default-name | The name of the partition when the value of the partition field is null or an empty string. | String | No | __DEFAULT_PARTITION__ | N/A |
sink.rolling-policy.file-size | The maximum size that a file can reach before the file is rolled. | MemorySize | No | 128 MB | The data that is written to the specified directory is split into PART files. Each subtask of the sink operator that receives data of a partition generates at least one PART file for that partition. Files are rolled based on the configured rolling policy: when the in-progress PART file is closed, a new PART file is generated. A PART file is rolled based on its size and on the maximum period of time for which the file can be open. Note: If column-oriented storage is used, a file is also rolled when checkpointing is performed, even if it does not yet meet the rolling policy. In this case, a file is rolled when it meets the rolling policy or when checkpointing is performed. If row-oriented storage is used, a file is rolled only when it meets the rolling policy. |
sink.rolling-policy.rollover-interval | The maximum period of time for which a PART file can stay open before the file is rolled. | Duration | No | 30 minutes | The check frequency is specified by the sink.rolling-policy.check-interval parameter. |
sink.rolling-policy.check-interval | The interval at which Realtime Compute for Apache Flink checks whether a file needs to be rolled based on the rolling policy. | Duration | No | 1 minute | This parameter controls how often the system checks whether a file must be rolled based on the value of the sink.rolling-policy.rollover-interval parameter. |
auto-compaction | Specifies whether to enable automatic file merging for streaming result tables. If this feature is enabled, data is first written to temporary files. After a checkpoint is complete, the temporary files generated during that checkpoint are merged. The temporary files are not visible before they are merged. | Boolean | No | false | If automatic file merging is enabled, multiple small files are merged into a large file based on the specified size of the destination file. Take note of the following points when you use this feature in the production environment: Only files generated during the same checkpoint are merged, so the number of generated files is greater than or equal to the number of checkpoints. The temporary files are not visible before they are merged; they become visible only after the checkpointing interval and the merge duration elapse. If the merge duration is excessively long, backpressure may occur, which prolongs the time required for checkpointing. |
compaction.file-size | The size of the destination file into which temporary files are merged. | MemorySize | No | 128 MB | The default value is the same as the value of the sink.rolling-policy.file-size parameter. |
sink.partition-commit.trigger | The type of the partition commit trigger. | String | No | process-time | Realtime Compute for Apache Flink provides the following types of partition commit triggers for writing data to a partitioned table: process-time: This trigger is based on the partition creation time and the current system time and requires neither a partition time extractor nor a watermark generator. A partition is committed as soon as the current system time passes the sum of the partition creation time and the delay specified by the sink.partition-commit.delay parameter. This trigger is suitable for general purposes but is not accurate. For example, if data is delayed or a failure occurs, a partition may be committed earlier than expected. partition-time: This trigger is based on the extracted partition creation time and requires a watermark generator. Use this trigger only if your deployment generates watermarks and data is partitioned by time, such as by hour or by day. A partition is committed as soon as the watermark passes the sum of the partition creation time and the delay specified by the sink.partition-commit.delay parameter. |
sink.partition-commit.delay | The delay that must elapse before a partition can be committed. The partition is not committed until the delay time is reached. | Duration | No | 0 second | If data is partitioned by day, set this parameter to 1 d. If data is partitioned by hour, set this parameter to 1 h. |
sink.partition-commit.watermark-time-zone | The time zone that is used when a watermark of the LONG data type is converted into a watermark of the TIMESTAMP data type. After the conversion, Realtime Compute for Apache Flink compares the watermark of the TIMESTAMP data type with the partition creation time to determine whether the partition needs to be committed. | String | No | UTC | This parameter takes effect only when the sink.partition-commit.trigger parameter is set to partition-time. The value of this parameter has an impact on partition commitment. For example, if you define a watermark on a TIMESTAMP_LTZ column in the source table and do not configure this parameter, the partition may be committed several hours later than expected. The default value UTC indicates that the watermark is defined on a TIMESTAMP column or that no watermark is defined. If the watermark is defined on a TIMESTAMP_LTZ column, the value of this parameter must be the session time zone. Valid values are a full time zone name such as 'America/Los_Angeles' or a custom time zone such as 'GMT-08:00'. |
partition.time-extractor.kind | The time extractor that extracts time from a partition field. | String | No | default | Valid values: default: Configure the timestamp pattern or formatter. This is the default value. custom: Specify the extractor class. |
partition.time-extractor.class | The extractor class that implements the PartitionTimeExtractor interface. | String | No | No default value | N/A |
partition.time-extractor.timestamp-pattern | The pattern that allows you to use partition fields to construct a valid timestamp. | String | No | No default value | By default, the first field is extracted in the yyyy-MM-dd hh:mm:ss format. If you want to extract the timestamp of a partition from the dt partition field, set this parameter to $dt. If you want to extract the timestamp of a partition from multiple partition fields, such as year, month, day, and hour, set this parameter to $year-$month-$day $hour:00:00. If you want to extract the timestamp of a partition from the dt and hour partition fields, set this parameter to $dt $hour:00:00. |
partition.time-extractor.timestamp-formatter | The formatter that converts the partition timestamp string, which is specified by the partition.time-extractor.timestamp-pattern parameter, into a timestamp. | String | No | yyyy-MM-dd HH:mm:ss | For example, if you want to extract the timestamp of a partition from the year, month, and day partition fields, you can set the partition.time-extractor.timestamp-pattern parameter to $year$month$day and this parameter to yyyyMMdd. The timestamp formatter is compatible with DateTimeFormatter of Java. |
sink.partition-commit.policy.kind | The type of the partition commit policy. | String | No | No default value | The partition commit policy notifies a downstream application that data writing to a partition is complete and that data can be read from the partition. Valid values: success-file: The _SUCCESS file is added to the specified directory. custom: A commit policy is created based on the specified class. You can specify multiple commit policies at the same time. |
sink.partition-commit.policy.class | The partition commit policy class that implements the PartitionCommitPolicy interface. | String | No | No default value | This class takes effect only when the sink.partition-commit.policy.kind parameter is set to custom. |
sink.partition-commit.success-file.name | The name of the file that is written when the sink.partition-commit.policy.kind parameter is set to success-file. | String | No | _SUCCESS | N/A |
sink.parallelism | The degree of parallelism for writing files to the file system. | Integer | No | No default value | By default, the value of this parameter is the same as the degree of parallelism of the upstream chained operators. If the value is different from the degree of parallelism of the upstream chained operators, the operator that writes files uses the value of this parameter. If file merging is enabled, the operator that merges files also uses the value of this parameter. Note: The value must be greater than 0. Otherwise, an error occurs. |
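As a minimal sketch of how the rolling policy, automatic file merging, and parallelism parameters fit together, the following DDL creates a result table with illustrative values. The table name, columns, and parameter values are assumptions chosen for this example only.
CREATE TEMPORARY TABLE fs_compacted_sink (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  -- Roll a PART file when it reaches 64 MB or has been open for 10 minutes.
  'sink.rolling-policy.file-size' = '64MB',
  'sink.rolling-policy.rollover-interval' = '10 min',
  'sink.rolling-policy.check-interval' = '1 min',
  -- Merge small files generated during the same checkpoint into 128 MB files.
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB',
  'sink.parallelism' = '2'
);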
Configure authentication information of OSS buckets
Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later allows you to configure authentication information of OSS buckets.
After you specify the path of a file system, you must configure authentication information of OSS buckets. This way, you can read data from and write data to the specified path of the file system. To configure authentication information of OSS buckets, perform the following steps: Log on to the development console of Realtime Compute for Apache Flink. On the Configuration tab of the Deployments page, click Edit in the upper-right corner of the Parameters section and add the following configurations to the Other Configuration field:
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx
The following table describes the parameters in the preceding configurations.
Parameter | Description |
fs.oss.bucket.<bucketName>.accessKeyId | The AccessKey ID that is used to access the OSS bucket. bucketName specifies the name of the OSS bucket. |
fs.oss.bucket.<bucketName>.accessKeySecret | The AccessKey secret that is used to access the OSS bucket. bucketName specifies the name of the OSS bucket. |
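As an illustrative sketch only, the following DDL assumes a bucket named other-account-bucket that belongs to another Alibaba Cloud account. Reading from this table also requires the fs.oss.bucket.other-account-bucket.accessKeyId and fs.oss.bucket.other-account-bucket.accessKeySecret settings described above to be added to the Other Configuration field.
-- Hypothetical source table that reads from a bucket in another Alibaba Cloud account.
CREATE TEMPORARY TABLE cross_account_source (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://other-account-bucket/path',
  'format' = 'csv'
);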
Write data to OSS-HDFS
Log on to the development console of Realtime Compute for Apache Flink. On the Configuration tab of the Deployments page, click Edit in the upper-right corner of the Parameters section and add the following configurations to the Other Configuration field:
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx
The following table describes the parameters in the preceding configurations.
Parameter | Description |
fs.oss.jindo.buckets | The name of the bucket of the OSS-HDFS service to which data is written. You can specify multiple bucket names. Separate the bucket names with semicolons (;). When Flink writes data to an OSS path, the data is written to the OSS-HDFS service if the related bucket name is contained in the value of the fs.oss.jindo.buckets parameter. |
fs.oss.jindo.accessKeyId | The AccessKey ID of your Alibaba Cloud account. For more information about how to obtain the AccessKey ID of the Alibaba Cloud account, see View the information about AccessKey pairs of a RAM user. |
fs.oss.jindo.accessKeySecret | The AccessKey secret of your Alibaba Cloud account. For more information about how to obtain the AccessKey secret of the Alibaba Cloud account, see View the information about AccessKey pairs of a RAM user. |
You must also configure the endpoint of the OSS-HDFS service. You can use one of the following methods to configure the endpoint of the OSS-HDFS service:
On the Configuration tab in the development console of Realtime Compute for Apache Flink, click Edit in the upper-right corner of the Parameters section and add the following configuration to the Other Configuration field:
fs.oss.jindo.endpoint: xxx
Configure the endpoint of the OSS-HDFS service in the OSS path.
Sample code:
oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>
In the OSS path, user-defined-oss-hdfs-bucket specifies the name of the related bucket and oss-hdfs-endpoint specifies the endpoint of the OSS-HDFS service. The value of the fs.oss.jindo.buckets parameter must contain <user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>.
For example, if the bucket name is jindo-test and the endpoint of the OSS-HDFS service is
cn-beijing.oss-dls.aliyuncs.com, the OSS path must be oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>, and the value of the fs.oss.jindo.buckets parameter must contain jindo-test.cn-beijing.oss-dls.aliyuncs.com.
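To make this path convention concrete, the following sketch reuses the jindo-test example above in a result table DDL. The table name and columns are illustrative, and the write goes to OSS-HDFS only if the value of the fs.oss.jindo.buckets parameter contains jindo-test.cn-beijing.oss-dls.aliyuncs.com.
-- Hypothetical result table that writes to OSS-HDFS by embedding the endpoint in the path.
CREATE TEMPORARY TABLE oss_hdfs_sink (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>',
  'format' = 'parquet'
);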
Sample code
Sample code for a source table
CREATE TEMPORARY TABLE fs_table_source (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet'
);

CREATE TEMPORARY TABLE blackhole_sink (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT * FROM fs_table_source;
Sample code for a result table
Write data to a partitioned table
CREATE TABLE datagen_source (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, -- Use the time in milliseconds.
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Specify a watermark on a TIMESTAMP_LTZ column.
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE fs_table_sink (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet',
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai', -- The time zone is 'Asia/Shanghai'.
  'sink.partition-commit.policy.kind' = 'success-file'
);

-- Execute the following streaming SQL statement to insert data into the file system table.
INSERT INTO fs_table_sink
SELECT
  user_id,
  order_amount,
  DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
  DATE_FORMAT(ts_ltz, 'HH')
FROM datagen_source;
Write data to a non-partitioned table
CREATE TABLE datagen_source (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE fs_table_sink (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'parquet'
);

INSERT INTO fs_table_sink SELECT * FROM datagen_source;
DataStream API
If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors.
The following sample code shows how to use the DataStream API to write data to OSS or OSS-HDFS.
import java.io.PrintStream;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.types.Row;

String outputPath = "oss://<bucket>/path";
// Write each Row as a line of text and roll the file when a checkpoint is triggered.
final StreamingFileSink<Row> sink =
        StreamingFileSink.forRowFormat(
                new Path(outputPath),
                (Encoder<Row>) (element, stream) -> {
                    PrintStream out = new PrintStream(stream);
                    out.println(element.toString());
                })
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();

// outputStream is the upstream DataStream<Row> whose data is written to OSS.
outputStream.addSink(sink);
If you want to write data to OSS-HDFS, run the preceding code and perform the following steps: Log on to the development console of Realtime Compute for Apache Flink. On the Configuration tab of the Deployments page, click Edit in the upper-right corner of the Parameters section and add the configurations related to OSS-HDFS to the Other Configuration field. For more information, see the Write data to OSS-HDFS section of this topic.
References
For more information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.
For more information about how to use the Tablestore connector, see Tablestore connector.
For more information about how to use the Apache Paimon connector, see Apache Paimon connector.