The resumable writing feature allows data to be written to storage media with exactly-once semantics. This topic describes how to use Apache Flink on an E-MapReduce (EMR) cluster to write data to OSS-HDFS in a resumable manner.
Prerequisites
- A cluster of EMR V3.42.0 or later, or EMR V5.8.0 or later is created. For more information, see Create a cluster.
- OSS-HDFS is enabled for a bucket and permissions are granted to access OSS-HDFS. For more information about how to enable OSS-HDFS, see Enable OSS-HDFS and grant access permissions.
Use Flink jobs to write data to OSS-HDFS
- Configure general settings.
To write data to OSS-HDFS by using exactly-once semantics, you must perform the following operations:
- Enable the checkpoint mechanism of Apache Flink.
Use the following code to create a StreamExecutionEnvironment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- Use the following code to enable the checkpoint mechanism:
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
- Use a data source that supports data retransmission, such as Kafka (see the sketch after this list).
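The following minimal sketch shows how these general settings fit together, assuming the flink-connector-kafka dependency is available on the classpath; the broker address, topic, consumer group, and checkpoint interval are placeholders:
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 60 seconds with exactly-once guarantees.
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

// Kafka supports data retransmission, so the stream can be replayed after a failure.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "<kafka-broker>:9092"); // placeholder
props.setProperty("group.id", "<consumer-group>");             // placeholder
DataStream<String> outputStream = env.addSource(
        new FlinkKafkaConsumer<>("<topic>", new SimpleStringSchema(), props));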
- Configure quick settings to use Flink.
You can write data to OSS-HDFS by specifying a path that starts with the oss:// prefix and contains the bucket and endpoint of OSS-HDFS. This method does not require additional dependencies.
- Add a sink.
The following sample code provides an example on how to write a DataStream<String> object named outputStream to OSS-HDFS by using a StreamingFileSink:
String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>";
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
        new Path(outputPath),
        new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);
Important: In the preceding example, the <oss-hdfs-endpoint> element is optional. If you omit this element, make sure that you specify the correct endpoint of OSS-HDFS in the Flink component or Hadoop component.
- Use env.execute() to execute the Flink job. A complete sketch that combines the preceding steps follows this list.
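Putting the preceding steps together, a complete job might look like the following minimal sketch; the bucket, endpoint, directory, and job name are placeholders, and the bounded example source stands in for a replayable source such as Kafka:
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

// Replace this bounded source with a replayable source such as Kafka in production.
DataStream<String> outputStream = env.fromElements("record-1", "record-2");

String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>";
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
        new Path(outputPath),
        new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);

env.execute("<job-name>"); // placeholder job name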
(Optional) Configure custom settings
When you submit Flink jobs, you can configure custom parameters to enable or manage specific features.
The following sample code provides an example on how to use the -yD option to configure settings when you submit a Flink job to a YARN cluster:
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
For example, you can enable entropy injection. The entropy injection feature replaces a specific string in the destination path with a random string. This way, written data is distributed across different partitions, which improves write performance.
To use entropy injection when you write data to OSS-HDFS, complete the following configurations:
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>
When you write data to an object, the <user-defined-key> string in the write path is replaced with a random string whose length is equal to the value of <user-defined-length>. The value of <user-defined-length> must be greater than 0.
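For example, assuming a hypothetical placeholder key of __entropy__ and a length of 4, you can pass the configurations with -yD when you submit the job:
<flink_home>/bin/flink run -m yarn-cluster -yD oss.entropy.key=__entropy__ -yD oss.entropy.length=4 ...
With these settings, a sink path such as oss://<bucket>/__entropy__/<dir> is rewritten at write time so that __entropy__ is replaced with a random 4-character string.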