開源Flink不支援流式寫入OSS-HDFS服務,也不支援以EXACTLY_ONCE語義寫入儲存介質。當您希望開源Flink以EXACTLY_ONCE語義流式寫入OSS-HDFS服務,需要結合JindoSDK。
如果您不希望通過Flink流式寫入OSS-HDFS服務前部署JindoSDK,您可以選擇阿里雲Realtime ComputeFlink完成OSS-HDFS服務讀寫需求。更多資訊,請參見Realtime ComputeFlink讀寫OSS或者OSS-HDFS。
前提條件
已建立ECS執行個體。具體步驟,請參見選購ECS執行個體。
已開通並授權訪問OSS-HDFS服務。具體操作,請參見開通並授權訪問OSS-HDFS服務。
已自行下載並安裝開源版本Flink,且版本不低於1.10.1。Flink 1.16.0及更高版本的可用性尚未得到驗證。關於Apache Flink的安裝包及版本說明,請參見Apache Flink。
配置JindoSDK
登入已建立的ECS執行個體。具體操作,請參見串連ECS執行個體。
下載並解壓最新版本JindoSDK JAR包。下載地址,請參見GitHub。。
將JindoSDK解壓縮後的plugins/flink/目錄下的jindo-flink-${version}-full.jar檔案移動至Flink所在根目錄下的lib檔案夾。
mv plugins/flink/jindo-flink-${version}-full.jar lib/
如果存在Apache Flink內建的Flink OSS Connector,需將其移除,即從Flink的
lib
目錄或者plugins/oss-fs-hadoop
路徑下移除flink-oss-fs-hadoop-${flink-version}.jar
。JindoSDK配置完成後,無需額外配置即支援以常規Flink流式作業的方法進行使用。寫入OSS-HDFS服務以及OSS服務須使用相同的首碼
oss://
,JindoSDK會自動識別寫入的內容。
樣本
通用配置
為了支援EXACTLY_ONCE語義寫入OSS-HDFS,您需要執行如下配置:
開啟Flink的檢查點(Checkpoint)。
樣本如下。
通過如下方式建立的StreamExecutionEnvironment。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
執行如下命令,啟動Checkpoint。
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
使用可以重發的資料來源,例如Kafka。
便捷使用
您無需額外引入依賴,只需攜帶oss://首碼的路徑,並使用OSS-HDFS服務的Bucket及Endpoint,即可啟用Flink。
添加Sink。
以將DataStream<String>的對象OutputStream寫入OSS-HDFS為例。
String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>" StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);
重要在OSS-HDFS服務的Bucket中帶有
.<oss-hdfs-endpoint>
的欄位為可選項。如果您希望省略該欄位,請確保已在Flink或Hadoop組件中正確配置了OSS-HDFS服務的Endpoint。使用
env.execute()
執行Flink作業。
(可選)自訂配置
您在提交Flink作業時,可以自訂參數,以開啟或控制特定功能。
例如,通過-yD
配置以yarn-cluster模式提交Flink作業時,樣本如下:
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
您可以開啟熵注入(Entropy Injection)功能。熵注入可以匹配寫入路徑的一段特定字串,用一段隨機的字串進行替換,以削弱所謂片區效應,提高寫入效率。
當寫入情境為OSS-HDFS時,需要完成下列配置。
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>
寫入新檔案時,路徑中與<user-defined-key>
相同的字串會被替換為一個隨機字串,隨機串的長度為<user-defined-length>
,且<user-defined-length>
必須大於零。