Flink DataStream提供了更灵活的编程模型和API,可以自定义各种数据转换、操作和算子,适用于复杂的业务逻辑和数据处理需求。本文为您介绍Flink JAR作业的开发方法。
支持开源Apache Flink
目前实时计算Flink支持的DataStream API完全兼容开源的Flink版本,详情请参见Apache Flink介绍和Flink DataStream API开发指南。
开发环境要求
已安装IntelliJ IDEA等开发工具。
已安装3.6.3及以上版本的Maven。
作业开发仅支持JDK 8和JDK 11版本。
JAR作业需要您在线下完成开发,再在实时计算管理控制台上部署并运行。
开发准备
本样例涉及关于数据源连接器如何使用,请准备好相关数据源。
实时计算Flink版默认不具备访问公网的能力,所以本案例采用的数据源为阿里云消息队列Kafka(2.6.2)和阿里云RDS MySQL(8.0)。
如您已经购买了上述产品,请确保其与您购买的实时计算Flink版处于同一VPC中,如果不是,请参考如何访问跨VPC的其他服务?
如您有自建的数据源需要使用,请确认实时计算Flink版能访问该数据源,请参考实时计算Flink版如何访问公网?和如何设置白名单?
如您还没有云消息队列Kafka数据源,请购买云消息队列Kafka并部署实例,详情请参见步骤二:购买和部署实例。部署实例时,请确认部署至与实时计算Flink同一VPC下。
如您还没有RDS MySQL数据源,请购买RDS MySQL并部署实例,详情请参见第一步:创建RDS MySQL实例与配置数据库。部署实例时,请确认部署至与实时计算Flink同一VPC下。
作业开发
配置Flink环境依赖
为了避免JAR包依赖冲突,请您注意以下几点:
${flink.version}
为作业运行对应的Flink版本。请使用与作业部署页面选择的VVR引擎所使用的Flink版本一致。例如您在部署页面选择的引擎为vvr-8.0.9-flink-1.17
,其对应的Flink版本为1.17.2
,查看VVR引擎版本详情请参见操作指导。Flink相关依赖,作用域请使用provided,即在依赖中添加
<scope>provided</scope>
。主要包含org.apache.flink
组下以flink-
开头的非Connector依赖。Flink源代码中只有明确标注了@Public或者@PublicEvolving的才是公开供用户调用的方法,阿里云实时计算Flink版只对这些方法的兼容性做出产品保证。
如果是Flink服务内置的Connector支持的DataStream API,建议使用其内置的依赖。
下面是Flink的一些基本相关依赖,您可能还需要补充一些日志文件相关的依赖,完整的依赖参考请参见文末的完整示例代码。
flink相关依赖
连接器依赖和使用
通过DataStream的方式读写数据,需要使用对应的DataStream连接器连接实时计算Flink版。Maven中央仓库已经放置了VVR DataStream连接器,以供您在作业开发时直接使用。
请使用我们在支持的连接器中指明提供DataStream API的连接器。如果某个连接器未注明提供给DataStream API,请勿自行使用,因为未来接口和参数可能会被修改。
您可以选择以下任意一种方式来使用连接器:
(推荐)上传连接器Uber JAR包作为附加依赖文件引入
在作业的Maven POM文件中添加您需要的连接器作为项目依赖,其作用域为provided。完整的依赖文件请参考文末的完整示例代码。
说明${vvr.version}
是作业运行环境引擎版本,如您的作业运行在vvr-8.0.9-flink-1.17
版本引擎上,其对应的Flink版本为1.17.2
。建议您使用最新的引擎,具体版本详见引擎。由于将连接器的Uber JAR包作为附加依赖文件引入,则无需将该依赖打入JAR包中,所以需要声明作用域为
provided
。
<!-- Kafka 连接器依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency> <!-- MySQL 连接器依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency>
如果您有开发新连接器或者拓展现有连接器功能的需求,项目还需要依赖连接器公共包
flink-connector-base
或ververica-connector-common
。<!-- Flink 连接器公共接口基础依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- 阿里云连接器公共接口基础依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>
DataStream连接配置信息和代码示例需要查看对应的DataStream连接器文档。
支持作为DataStream类型的连接器列表,请参见支持的连接器。
部署作业并在附加依赖文件项中添加相应的连接器Uber JAR包,详情请参见部署JAR作业。您可以上传您自己开发的连接器,也可以上传实时计算Flink版提供的连接器(下载地址请参见Connector列表)。如图所示。
直接将连接器作为项目依赖打进作业JAR包
在作业的Maven POM文件中添加您需要的连接器作为项目依赖。例如引入Kafka连接器和MySQL连接器。
说明${vvr.version}
是作业运行环境引擎版本,如您的作业运行在vvr-8.0.9-flink-1.17
版本引擎上,其对应的Flink版本为1.17.2
。建议您使用最新的引擎,具体版本详见引擎。由于将连接器作为项目依赖直接打入JAR包,它们必须在默认作用域(compile)中。
<!-- Kafka 连接器依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> </dependency> <!-- MySQL 连接器依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> </dependency>
如果您有开发新连接器或者拓展现有连接器功能的需求,项目还需要依赖连接器公共包
flink-connector-base
或ververica-connector-common
。<!-- Flink 连接器公共接口基础依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- 阿里云连接器公共接口基础依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>
DataStream连接配置信息和代码示例需要查看对应的DataStream连接器文档。
支持作为DataStream类型的连接器列表,请参见支持的连接器。
OSS附加依赖文件读取
因为Flink JAR作业不支持在Main函数中读取本地配置,您可以将配置文件上传到Flink工作空间下的OSS Bucket,在部署JAR作业时,通过添加附加配置文件的方式进行读取。示例如下。
创建配置文件config.properties,避免在代码中出现明文代码。
# Kafka bootstrapServers=host1:9092,host2:9092,host3:9092 inputTopic=topic groupId=groupId # MySQL database.url=jdbc:mysql://localhost:3306/my_database database.username=username database.password=password
在JAR作业中使用代码读取存储在OSS Bucket上的配置文件config.properties。
方式一:读取工作空间绑定的OSS Bucket
实时计算开发控制台左侧导航栏资源管理页面,上传该文件。
在作业运行时,部署作业所添加附加依赖文件将会加载到作业所运行Pod的/flink/usrlib目录下。
读取该配置文件代码示例如下。
Properties properties = new Properties(); Map<String,String> configMap = new HashMap<>(); try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) { // 加载属性文件 properties.load(input); // 获取属性值 configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ; configMap.put("inputTopic",properties.getProperty("inputTopic")); configMap.put("groupId",properties.getProperty("groupId")); configMap.put("url",properties.getProperty("database.url")) ; configMap.put("username",properties.getProperty("database.username")); configMap.put("password",properties.getProperty("database.password")); } catch (IOException ex) { ex.printStackTrace(); }
方式二:读取工作空间有权限访问的OSS Bucket
将配置文件上传目标OSS Bucket。
通过OSSClient直接读取OSS上的存储文件详情,请参见流式传输和管理访问凭据。代码示例如下。
OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret"); try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties"); BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) { // read file and process ... } finally { if (ossClient != null) { ossClient.shutdown(); } }
业务代码编写
将外部数据源集成到Flink数据流程序。
Watermark
是Flink一种基于时间语义的计算策略,往往伴随着时间戳一起使用,所以本示例不使用水印策略。详情请参考水印策略。// 将外部数据源集成到flink数据流程序 // WatermarkStrategy.noWatermarks() 指没有使用水印策略 DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
算子转换处理。示例中将
DataStream<String>
转换成DataStream<Student>
,更多复杂的算子转化和处理方式请参考Flink算子。// 转换数据结构为student的算子 DataStream<student> source = stream .map(new MapFunction<String, student>() { @Override public student map(String s) throws Exception { // 数据由逗号分隔 String[] data = s.split(","); return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2])); } }).filter(student -> student.score >=60); // 筛选出分数大于60分的数据
作业打包
通过maven-shade-plugin插件打包。
如果选择作为附加依赖文件引入使用连接器,打包作业时,确认连接器相关依赖的作用域为
provided
。如果选择连接器作为依赖一起打包,作用域默认(compile)即可。
作业测试及部署
由于实时计算Flink版默认不具备访问公网的能力,可能您的代码无法在本地进行直接测试。建议您分开进行单元测试,详情请参见本地运行和调试包含连接器的作业。
JAR作业部署请参见部署JAR作业。
说明部署时,如果选择方式一使用连接器打包的作业,切记需要上传添加连接器相关的Uber JAR包。
如果需要读取配置文件,也需要在附加依赖文件中上传添加。
完整示例代码
本示例代码中,将Kafka数据源的数据进行处理后写入MySQL。此示例仅供参考,更多的代码风格和质量指南请参见代码风格和质量指南。
FlinkDemo.java
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkDemo {
// 定义数据结构
public static class Student {
public int id;
public String name;
public int score;
public Student(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
public static void main(String[] args) throws Exception {
// 创建Flink执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
Map<String,String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
// 加载属性文件
properties.load(input);
// 获取属性值
configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
configMap.put("inputTopic",properties.getProperty("inputTopic"));
configMap.put("groupId",properties.getProperty("groupId"));
configMap.put("url",properties.getProperty("database.url")) ;
configMap.put("username",properties.getProperty("database.username"));
configMap.put("password",properties.getProperty("database.password"));
} catch (IOException ex) {
ex.printStackTrace();
}
// Build Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(configMap.get("bootstrapServers"))
.setTopics(configMap.get("inputTopic"))
.setStartingOffsets(OffsetsInitializer.latest())
.setGroupId(configMap.get("groupId"))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// 将外部数据源集成到flink数据流程序
// WatermarkStrategy.noWatermarks() 指没有使用水印策略
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
// 筛选出分数大于60分的数据
DataStream<Student> source = stream
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
String[] data = s.split(",");
return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
}
}).filter(Student -> Student.score >=60);
source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Student>() {
public void accept(PreparedStatement ps, Student data) {
try {
ps.setInt(1, data.id);
ps.setString(2, data.name);
ps.setInt(3, data.score);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(5) // 每次批量写入的记录数
.withBatchIntervalMs(2000) // 重试时的最大延迟时间(毫秒)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(configMap.get("url"))
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername(configMap.get("username"))
.withPassword(configMap.get("password"))
.build()
)).name("Sink MySQL");
env.execute("Flink Demo");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>FlinkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>FlinkDemo</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink 依赖项 -->
<!-- 之所以提供这些依赖项,是因为它们不应该打包到JAR文件中。 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- 在这里添加连接器依赖项。它们必须在默认作用域(compile)中。 -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<!-- 添加日志框架,以便在运行时生成控制台输出 -->
<!-- 默认情况下,这些依赖项从应用程序JAR中排除 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java 编译器-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- 我们使用maven-shade-plugin创建一个包含所有必须依赖的 fat jar -->
<!-- 修改<mainClass>的值.如果您的程序入口点发生了改变 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- 去掉一些不必要的依赖性 -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- 不要复制META-INF文件夹中的签名。否则,这可能会在使用JAR文件时导致安全异常 -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
相关文档
支持作为DataStream类型的连接器列表,请参见支持的连接器。
Flink JAR作业的完整开发流程示例,请参见Flink JAR作业快速入门。
实时计算Flink版还支持运行SQL和Python作业,开发方法请参见SQL作业开发和Python作业开发。