Open source Apache Flink cannot write data to OSS-HDFS in streaming mode or write data to storage media with exactly-once semantics. To write data to OSS-HDFS in streaming mode with exactly-once semantics, you must use JindoSDK together with Apache Flink.
If you do not want to deploy JindoSDK before you write data to OSS-HDFS in streaming mode by using Apache Flink, you can use Realtime Compute for Apache Flink to write data to OSS-HDFS. For more information, see Use Realtime Compute for Apache Flink to read data from or write data to OSS or OSS-HDFS.
Prerequisites
An Elastic Compute Service (ECS) instance is created. For more information, see Create an instance.
OSS-HDFS is enabled for a bucket and access permissions on OSS-HDFS are granted. For more information, see Enable OSS-HDFS and grant access permissions.
Apache Flink 1.10.1 or later is downloaded and installed. Compatibility with Flink 1.16.0 and later is not verified. For more information about the installation package and version descriptions of Apache Flink, visit Apache Flink.
Configure JindoSDK
Log on to the ECS instance. For more information, see Connect to an instance.
Download and decompress the latest version of the JindoSDK JAR package. For more information, visit GitHub.
Move the jindo-flink-${version}-full.jar file in the plugins/flink/ directory of JindoSDK to the lib directory under the Apache Flink root directory.
mv plugins/flink/jindo-flink-${version}-full.jar lib/
If Apache Flink has a built-in Object Storage Service (OSS) connector, you must remove it by deleting the flink-oss-fs-hadoop-${flink-version}.jar file from the lib subdirectory or the plugins/oss-fs-hadoop path.
After you configure JindoSDK, you can write data to OSS-HDFS in streaming mode by using Flink jobs. Use the oss:// prefix to write data to OSS-HDFS or OSS. JindoSDK automatically recognizes the write destination based on the path.
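For example, you can verify that the oss:// scheme resolves correctly before you run a streaming job. The following sketch is a hypothetical smoke test that assumes the Hadoop client and JindoSDK are on the classpath; the bucket and endpoint are placeholders:
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical smoke test: list the bucket root through the oss:// scheme.
// JindoSDK resolves the oss:// prefix after its JAR is placed in the lib directory.
FileSystem fs = FileSystem.get(
        URI.create("oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/"),
        new Configuration());
for (FileStatus status : fs.listStatus(new Path("/"))) {
    System.out.println(status.getPath());
}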
Examples
Configure general settings.
To write data to OSS-HDFS by using exactly-once semantics, perform the following operations:
Enable the checkpoint mechanism of Apache Flink.
Sample code:
Create a StreamExecutionEnvironment object:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Enable the checkpoint mechanism with exactly-once semantics:
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
Use a data source that supports data retransmission, such as Kafka.
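The following sketch combines these steps. It assumes that the flink-connector-kafka dependency is available; the checkpoint interval, broker address, topic, and consumer group are placeholders:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 seconds with exactly-once semantics.
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

// Kafka supports data retransmission, so records can be re-read after a failure.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "<kafka-broker>:9092");
props.setProperty("group.id", "<consumer-group>");

DataStream<String> outputStream = env.addSource(
        new FlinkKafkaConsumer<>("<topic>", new SimpleStringSchema(), props));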
Configure quick settings to use Apache Flink.
Specify a path that starts with oss:// and use the bucket and endpoint of OSS-HDFS. This method does not require additional dependencies.
Add a sink.
The following sample code provides an example on how to write a DataStream<String> object named outputStream to OSS-HDFS by using a StreamingFileSink:
String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>";
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
        new Path(outputPath),
        new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);
Important: In the preceding example, the .<oss-hdfs-endpoint> part of the path is optional. If you omit it, make sure that you specify the correct OSS-HDFS endpoint in the Flink or Hadoop component.
Use env.execute() to execute the Flink job.
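For example, the following sketch runs the pipeline assembled in the preceding snippets; the job name is an arbitrary placeholder:
// Run the pipeline defined above. In attached mode, execute() submits the
// job and blocks until the job terminates.
env.execute("<user-defined-job-name>");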
(Optional) Configure custom settings
When you submit Flink jobs, you can configure custom parameters to enable or manage specific features.
The following sample code provides an example on how to use -yD to configure settings when you submit Flink jobs to a YARN cluster:
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
You can enable entropy injection. The entropy injection feature replaces a specific string in the destination path with a random string. This way, written data is distributed across different partitions, which improves write performance.
To write data to OSS-HDFS, you must complete the following configurations:
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>
When you write data to an object, the <user-defined-key> string in the write path is replaced with a random string whose length equals <user-defined-length>. The value of <user-defined-length> must be greater than 0.
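For example, the following sketch shows how the configuration affects a write path. The key string, length, bucket name, and directory are placeholder assumptions:
// Hypothetical example: the job was submitted with
//   -yD oss.entropy.key=<entropy> -yD oss.entropy.length=4
String outputPath = "oss://<user-defined-oss-hdfs-bucket>/<entropy>/output";
// At write time, JindoSDK replaces the "<entropy>" substring with a random
// 4-character string, for example: oss://<user-defined-oss-hdfs-bucket>/a1b2/output
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
        new Path(outputPath),
        new SimpleStringEncoder<String>("UTF-8")
).build();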