Flink DataStream提供了更靈活的編程模型和API,可以自訂各種資料轉換、操作和運算元,適用於複雜的商務邏輯和資料處理需求。本文為您介紹Flink JAR作業的開發方法。
支援開源Apache Flink
目前Realtime ComputeFlink支援的DataStream API完全相容開源的Flink版本,詳情請參見Apache Flink介紹和Flink DataStream API開發指南。
開發環境要求
已安裝IntelliJ IDEA等開發工具。
已安裝3.6.3及以上版本的Maven。
作業開發僅支援JDK 8和JDK 11版本。
JAR作業需要您線上下完成開發,再在Realtime Compute管理主控台上部署並運行。
開發準備
本範例涉及關於資料來源連接器如何使用,請準備好相關資料來源。
Realtime ComputeFlink版預設不具備訪問公網的能力,所以本案例採用的資料來源為阿里雲訊息佇列Kafka(2.6.2)和阿里雲RDS MySQL(8.0)。
如您已經購買了上述產品,請確保其與您購買的Realtime ComputeFlink版處於同一VPC中,如果不是,請參考如何訪問跨VPC的其他服務?
如您有自建的資料來源需要使用,請確認Realtime ComputeFlink版能訪問該資料來源,請參考如何訪問公網?和如何設定白名單?
如您還沒有雲訊息佇列Kafka資料來源,請購買雲訊息佇列Kafka並部署執行個體,詳情請參見步驟二:購買和部署執行個體。部署執行個體時,請確認部署至與Realtime ComputeFlink同一VPC下。
如您還沒有RDS MySQL資料來源,請購買RDS MySQL並部署執行個體,詳情請參見第一步:快捷建立RDS MySQL執行個體與設定資料庫。部署執行個體時,請確認部署至與Realtime ComputeFlink同一VPC下。
作業開發
配置Flink環境依賴
為了避免JAR包依賴衝突,請您注意以下幾點:
${flink.version}
為作業運行對應的Flink版本。請使用與作業部署頁面選擇的VVR引擎所使用的Flink版本一致。例如您在部署頁面選擇的引擎為vvr-8.0.9-flink-1.17
,其對應的Flink版本為1.17.2
,查看VVR引擎版本詳情請參見如何查看當前作業的Flink版本?。Flink相關依賴,範圍請使用provided,即在依賴中添加
<scope>provided</scope>
。主要包含org.apache.flink
組下以flink-
開頭的非Connector依賴。Flink原始碼中只有明確標註了@Public或者@PublicEvolving的才是公開供使用者調用的方法,阿里雲Realtime ComputeFlink版只對這些方法的相容性做出產品保證。
如果是Flink服務內建的Connector支援的DataStream API,建議使用其內建的依賴。
下面是Flink的一些基本相關依賴,您可能還需要補充一些記錄檔相關的依賴,完整的依賴參考請參見文末的完整範例程式碼。
flink相關依賴
連接器依賴和使用
通過DataStream的方式讀寫資料,需要使用對應的DataStream連接器串連Flink全託管。Maven中央倉庫已經放置了VVR DataStream連接器,以供您在作業開發時直接使用。
請使用我們在支援的連接器中指明提供DataStream API的連接器。如果某個連接器未註明提供給DataStream API,請勿自行使用,因為未來介面和參數可能會被修改。
您可以選擇以下任意一種方式來使用連接器:
(推薦)上傳連接器Uber JAR包作為附加依賴檔案引入
在作業的Maven POM檔案中添加您需要的連接器作為專案依賴,其範圍為provided。完整的依賴檔案請參考文末的完整範例程式碼。
說明${vvr.version}
是作業運行環境引擎版本,如您的作業運行在vvr-8.0.9-flink-1.17
版本引擎上,其對應的Flink版本為1.17.2
。建議您使用最新的引擎,具體版本詳見引擎。由於將連接器的Uber JAR包作為附加依賴檔案引入,則無需將該依賴打入JAR包中,所以需要聲明範圍為
provided
。
<!-- Kafka 連接器依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency> <!-- MySQL 連接器依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency>
如果您有開發新連接器或者拓展現有連接器功能的需求,專案還需要依賴連接器公用包
flink-connector-base
或ververica-connector-common
。<!-- Flink 連接器公用介面基礎依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- 阿里雲連接器公用介面基礎依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>
DataStream串連配置資訊和程式碼範例需要查看對應的DataStream連接器文檔。
支援作為DataStream類型的連接器列表,請參見支援的連接器。
部署作業並在附加依賴檔案項中添加相應的連接器Uber JAR包,詳情請參見部署JAR作業。您可以上傳您自己開發的連接器,也可以上傳Realtime ComputeFlink版提供的連接器(下載地址請參見Connector列表)。如圖所示。
直接將連接器作為專案依賴打進作業JAR包
在作業的Maven POM檔案中添加您需要的連接器作為專案依賴。例如引入Kafka連接器和MySQL連接器。
說明${vvr.version}
是作業運行環境引擎版本,如您的作業運行在vvr-8.0.9-flink-1.17
版本引擎上,其對應的Flink版本為1.17.2
。建議您使用最新的引擎,具體版本詳見引擎。由於將連接器作為專案依賴直接打入JAR包,它們必須在預設範圍(compile)中。
<!-- Kafka 連接器依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> </dependency> <!-- MySQL 連接器依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> </dependency>
如果您有開發新連接器或者拓展現有連接器功能的需求,專案還需要依賴連接器公用包
flink-connector-base
或ververica-connector-common
。<!-- Flink 連接器公用介面基礎依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- 阿里雲連接器公用介面基礎依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>
DataStream串連配置資訊和程式碼範例需要查看對應的DataStream連接器文檔。
支援作為DataStream類型的連接器列表,請參見支援的連接器。
OSS附加依賴檔案讀取
因為Flink JAR作業不支援在Main函數中讀取本地配置,您可以將設定檔上傳到Flink工作空間下的OSS Bucket,在部署JAR作業時,通過添加附加設定檔的方式進行讀取。樣本如下。
建立設定檔config.properties,避免在代碼中出現明文代碼。
# Kafka bootstrapServers=host1:9092,host2:9092,host3:9092 inputTopic=topic groupId=groupId # MySQL database.url=jdbc:mysql://localhost:3306/my_database database.username=username database.password=password
在JAR作業中使用代碼讀取儲存在OSS Bucket上的設定檔config.properties。
方式一:讀取工作空間綁定的OSS Bucket
Realtime Compute開發控制台左側導覽列資源管理頁面,上傳該檔案。
在作業運行時,部署作業所添加附加依賴檔案將會載入到作業所運行Pod的/flink/usrlib目錄下。
讀取該設定檔程式碼範例如下。
Properties properties = new Properties(); Map<String,String> configMap = new HashMap<>(); try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) { // 載入屬性檔案 properties.load(input); // 擷取屬性值 configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ; configMap.put("inputTopic",properties.getProperty("inputTopic")); configMap.put("groupId",properties.getProperty("groupId")); configMap.put("url",properties.getProperty("database.url")) ; configMap.put("username",properties.getProperty("database.username")); configMap.put("password",properties.getProperty("database.password")); } catch (IOException ex) { ex.printStackTrace(); }
方式二:讀取工作空間有許可權訪問的OSS Bucket
將設定檔上傳目標OSS Bucket。
通過OSSClient直接讀取OSS上的隱藏檔詳情,請參見串流和管理訪問憑據。程式碼範例如下。
OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret"); try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties"); BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) { // read file and process ... } finally { if (ossClient != null) { ossClient.shutdown(); } }
業務代碼編寫
將外部資料源整合到Flink資料流程式。
Watermark
是Flink一種基於時間語義的計算策略,往往伴隨著時間戳記一起使用,所以本樣本不使用浮水印策略。詳情請參考浮水印策略。// 將外部資料源整合到flink資料流程式 // WatermarkStrategy.noWatermarks() 指沒有使用浮水印策略 DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
運算元轉換處理。樣本中將
DataStream<String>
轉換成DataStream<Student>
,更多複雜的運算元轉化和處理方式請參考Flink運算元。// 轉換資料結構為student的運算元 DataStream<student> source = stream .map(new MapFunction<String, student>() { @Override public student map(String s) throws Exception { // 資料由逗號分隔 String[] data = s.split(","); return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2])); } }).filter(student -> student.score >=60); // 篩選出分數大於60分的資料
作業打包
通過maven-shade-plugin外掛程式打包。
如果選擇作為附加依賴檔案引入使用連接器,打包作業時,確認連接器相關依賴的範圍為
provided
。如果選擇連接器作為依賴一起打包,範圍預設(compile)即可。
作業測試及部署
由於Realtime ComputeFlink版預設不具備訪問公網的能力,可能您的代碼無法在本地進行直接測試。建議您分開進行單元測試,詳情請參見本地運行和調試包含連接器的作業。
JAR作業部署請參見部署JAR作業。
說明部署時,如果選擇方式一使用連接器打包的作業,切記需要上傳添加連接器相關的Uber JAR包。
如果需要讀取設定檔,也需要在附加依賴檔案中上傳添加。
完整範例程式碼
本範例程式碼中,將Kafka資料來源的資料進行處理後寫入MySQL。此樣本僅供參考,更多的代碼風格和品質指南請參見代碼風格和品質指南。
FlinkDemo.java
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkDemo {
// 定義資料結構
public static class Student {
public int id;
public String name;
public int score;
public Student(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
public static void main(String[] args) throws Exception {
// 建立Flink執行環境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
Map<String,String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
// 載入屬性檔案
properties.load(input);
// 擷取屬性值
configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
configMap.put("inputTopic",properties.getProperty("inputTopic"));
configMap.put("groupId",properties.getProperty("groupId"));
configMap.put("url",properties.getProperty("database.url")) ;
configMap.put("username",properties.getProperty("database.username"));
configMap.put("password",properties.getProperty("database.password"));
} catch (IOException ex) {
ex.printStackTrace();
}
// Build Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(configMap.get("bootstrapServers"))
.setTopics(configMap.get("inputTopic"))
.setStartingOffsets(OffsetsInitializer.latest())
.setGroupId(configMap.get("groupId"))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// 將外部資料源整合到flink資料流程式
// WatermarkStrategy.noWatermarks() 指沒有使用浮水印策略
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
// 篩選出分數大於60分的資料
DataStream<Student> source = stream
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
String[] data = s.split(",");
return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
}
}).filter(Student -> Student.score >=60);
source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Student>() {
public void accept(PreparedStatement ps, Student data) {
try {
ps.setInt(1, data.id);
ps.setString(2, data.name);
ps.setInt(3, data.score);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(5) // 每次批量寫入的記錄數
.withBatchIntervalMs(2000) // 重試時的最大延遲時間(毫秒)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(configMap.get("url"))
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername(configMap.get("username"))
.withPassword(configMap.get("password"))
.build()
)).name("Sink MySQL");
env.execute("Flink Demo");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>FlinkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>FlinkDemo</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink 依賴項 -->
<!-- 之所以提供這些依賴項,是因為它們不應該打包到JAR檔案中。 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- 在這裡添加連接器依賴項。它們必須在預設範圍(compile)中。 -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<!-- 添加日誌架構,以便在運行時產生控制台輸出 -->
<!-- 預設情況下,這些依賴項從應用程式JAR中排除 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java 編譯器-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- 我們使用maven-shade-plugin建立一個包含所有必須依賴的 fat jar -->
<!-- 修改<mainClass>的值.如果您的程式進入點發生了改變 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- 去掉一些不必要的依賴性 -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- 不要複製META-INF檔案夾中的簽名。否則,這可能會在使用JAR檔案時導致安全異常 -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
相關文檔
支援作為DataStream類型的連接器列表,請參見支援的連接器。
Flink JAR作業的完整開發流程樣本,請參見Flink JAR作業快速入門。
Flink全託管還支援運行SQL和Python作業,開發方法請參見SQL作業開發和Python作業開發。