Realtime Compute for Apache Flink DataStream provides flexible programming models and APIs that you can use to define data conversions, operations, and operators to meet the requirements of complex business logic and data processing. This topic describes how to develop a JAR draft in Realtime Compute for Apache Flink.
Support for Apache Flink
The DataStream APIs supported by Realtime Compute for Apache Flink are fully compatible with Apache Flink. For more information, see What is Apache Flink? and Flink DataStream API Programming Guide.
Environment requirements
A development tool, such as IntelliJ IDEA, is installed.
Maven 3.6.3 or later is installed.
Java Development Kit (JDK) 8 or JDK 11 is installed.
You must develop drafts in your on-premises environment before you deploy the drafts for running in the development console of Realtime Compute for Apache Flink.
Preparations
This topic provides an example that shows how to use connectors. You must prepare data sources for the connectors.
By default, Realtime Compute for Apache Flink cannot access the Internet. Therefore, an ApsaraMQ for Kafka instance whose major version is 2.6.2 and an ApsaraDB RDS for MySQL instance that runs MySQL 8.0 are used in this topic.
If you have already created the preceding instances, make sure that the ApsaraDB RDS for MySQL instance and the ApsaraMQ for Kafka instance reside in the same virtual private cloud (VPC) as the workspace of Realtime Compute for Apache Flink. If they do not reside in the same VPC, see How does Realtime Compute for Apache Flink access a service across VPCs?
If you want to use a self-managed data source, make sure that Realtime Compute for Apache Flink can access the data source. For more information, see How does Realtime Compute for Apache Flink access the Internet? and How do I configure a whitelist?
If you do not have an ApsaraMQ for Kafka instance, you can refer to Step 2: Purchase and deploy an instance to purchase and deploy an ApsaraMQ for Kafka instance. Make sure that the instance is deployed in the same VPC as the workspace of Realtime Compute for Apache Flink.
If you do not have an ApsaraDB RDS for MySQL instance, you can refer to Step 1: Create an ApsaraDB RDS for MySQL instance and configure databases to purchase and deploy an ApsaraDB RDS for MySQL instance. Make sure that the instance is deployed in the same VPC as the workspace of Realtime Compute for Apache Flink.
Develop a draft
Configure environment dependencies for Apache Flink
To prevent conflicts between JAR package dependencies, take note of the following points:
Use ${flink.version} to specify the Flink version that corresponds to the runtime environment of the draft. The specified Flink version must be the same as the Flink version that corresponds to the Ververica Runtime (VVR) version of your deployment, which is displayed on the Deployments page. For example, if the engine version that you select for your draft on the Deployments page is vvr-8.0.9-flink-1.17, the Flink version is 1.17.2. For more information about how to view the engine version that is used by a deployment, see How do I view the engine version of Realtime Compute for Apache Flink that is used by a deployment?
Specify <scope>provided</scope> for Apache Flink dependencies. This mainly applies to the non-connector dependencies whose names start with flink- in the org.apache.flink group.
Call only methods that are explicitly marked with @Public or @PublicEvolving in the source code of Apache Flink. Alibaba Cloud Realtime Compute for Apache Flink only ensures compatibility with these methods.
Use the built-in dependencies of Realtime Compute for Apache Flink if the DataStream APIs supported by the built-in connectors of Realtime Compute for Apache Flink are used.
The following sample code provides an example of Flink-related dependencies. You may also need to add some dependencies related to log files. For more information about the complete dependencies, see the "Complete sample code" section of this topic.
Flink-related dependencies
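The following is a minimal example that mirrors the pom.xml in the "Complete sample code" section of this topic:

<!-- Apache Flink dependencies -->
<!-- Specify <scope>provided</scope> so that these dependencies are not packaged into the JAR file of your draft. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>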
Use connector dependencies
If you want to read and write data in DataStream mode, you must use the related DataStream connectors. You can use the VVR DataStream connectors that are stored in the Maven central repository to develop drafts.
Interfaces and parameters of connectors may change in the future. We recommend that you use only the connectors that are documented as providing DataStream APIs in Supported connectors.
You can use connectors in one of the following ways:
(Recommended) Upload the uber JAR files of the connectors and reference them as additional dependency files
Add the required connectors as project dependencies to the pom.xml file of the Maven project for your draft and specify <scope>provided</scope> for the dependencies. For more information about the complete dependencies, see the "Complete sample code" section of this topic.
Note: ${vvr.version} indicates the engine version that corresponds to the runtime environment of the draft. For example, if the engine version of your draft is vvr-8.0.9-flink-1.17, the corresponding Flink version is 1.17.2. We recommend that you use the latest version of VVR. For more information, see Engine updates.
In this topic, the uber JAR files of the connectors are referenced as additional dependency files, so you do not need to package the connectors into the JAR file of your draft. Therefore, you must specify <scope>provided</scope> for the connector dependencies.
<!-- Kafka connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr.version}</version>
    <scope>provided</scope>
</dependency>
<!-- MySQL connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
    <scope>provided</scope>
</dependency>
Add the public package flink-connector-base or ververica-connector-common of connectors as project dependencies if you want to develop new connectors or use extended features of the existing connectors.

<!-- Basic dependency of the public interface of Flink connectors -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Basic dependency of the public interface of Alibaba Cloud connectors -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-common</artifactId>
    <version>${vvr.version}</version>
</dependency>
View the configuration information and sample code of DataStream connectors. For more information, see the topics in the DataStream connector documentation.
For more information about DataStream connectors, see Supported connectors.
Deploy the draft and add the uber JAR files to the Additional Dependencies field in the Create Jar Deployment dialog box. For more information, see Create a JAR deployment. You can upload the JAR file of a connector that you developed or a connector provided by Realtime Compute for Apache Flink. The download links of the official JAR files of the connectors provided by Realtime Compute for Apache Flink are available in Connectors.
Package the connectors as project dependencies into the JAR file of your draft
Add the required connectors as project dependencies to the pom.xml file of the Maven project for your draft. For example, you can add the Kafka and MySQL connectors as project dependencies to the pom.xml file of the Maven project for your draft.
Note: ${vvr.version} indicates the engine version that corresponds to the runtime environment of the draft. For example, if the engine version of your draft is vvr-8.0.9-flink-1.17, the corresponding Flink version is 1.17.2. We recommend that you use the latest version of VVR. For more information, see Engine updates.
In this topic, the connectors are packaged as project dependencies into the JAR file of your draft. In this case, the default scope (compile) must be used for the connector dependencies.
<!-- Kafka connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr.version}</version>
</dependency>
<!-- MySQL connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>
Add the public package flink-connector-base or ververica-connector-common of connectors as project dependencies if you want to develop new connectors or use extended features of the existing connectors.

<!-- Basic dependency of the public interface of Flink connectors -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Basic dependency of the public interface of Alibaba Cloud connectors -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-common</artifactId>
    <version>${vvr.version}</version>
</dependency>
View the configuration information and sample code of DataStream connectors. For more information, see the topics in the DataStream connector documentation.
For more information about DataStream connectors, see Supported connectors.
Read data from a dependency file stored in OSS
You cannot read data from a local configuration file in the main method of a JAR deployment for Realtime Compute for Apache Flink. Instead, upload the configuration file to an Object Storage Service (OSS) bucket and add it to the Additional Dependencies field in the Create Jar Deployment dialog box when you create the JAR deployment. Example:
Create a configuration file named config.properties so that connection information does not appear in plaintext in your code.
# Kafka
bootstrapServers=host1:9092,host2:9092,host3:9092
inputTopic=topic
groupId=groupId
# MySQL
database.url=jdbc:mysql://localhost:3306/my_database
database.username=username
database.password=password
Run code in a JAR draft to read data from the configuration file config.properties that is stored in an OSS bucket.
Method 1: Read data from the configuration file in the OSS bucket that is associated with your workspace
In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, click Artifacts. On the Artifacts page, upload the file.
When the deployment is running, the additional dependency file that is added to the Additional Dependencies field is loaded to the /flink/usrlib directory of the pod in which the deployment is running.
Run the following code in the JAR draft to read data from the configuration file:
Properties properties = new Properties();
Map<String, String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
    // Load the property file.
    properties.load(input);
    // Obtain the property values.
    configMap.put("bootstrapServers", properties.getProperty("bootstrapServers"));
    configMap.put("inputTopic", properties.getProperty("inputTopic"));
    configMap.put("groupId", properties.getProperty("groupId"));
    configMap.put("url", properties.getProperty("database.url"));
    configMap.put("username", properties.getProperty("database.username"));
    configMap.put("password", properties.getProperty("database.password"));
} catch (IOException ex) {
    ex.printStackTrace();
}
Method 2: Read data from the configuration file in the OSS bucket on which the workspace of Realtime Compute for Apache Flink has access permissions
Upload the configuration file config.properties to the OSS bucket.
Use OSSClient in a JAR draft to read data from the configuration file in the OSS bucket. For more information, see Streaming download and Manage access credentials. Sample code:
OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
     BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
    // read file and process ...
} finally {
    if (ossClient != null) {
        ossClient.shutdown();
    }
}
Write business code
Integrate external data sources into data streaming programs of Realtime Compute for Apache Flink. Watermarks are computing strategies based on time semantics in Realtime Compute for Apache Flink. In most cases, watermarks are used together with timestamps. In this example, watermark strategies are not used. For more information, see Generating Watermarks.

// Integrate external data sources into data streaming programs of Realtime Compute for Apache Flink.
// Specify WatermarkStrategy.noWatermarks(), which indicates that no watermark strategy is used.
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
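The kafkaSource variable used in the preceding snippet is created with the KafkaSource builder. The following sketch is taken from the "Complete sample code" section of this topic; it assumes that the connection values (bootstrapServers, inputTopic, and groupId) have already been read into configMap from config.properties.

// Build the Kafka source from the values that are read from config.properties.
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(configMap.get("bootstrapServers"))
        .setTopics(configMap.get("inputTopic"))
        .setStartingOffsets(OffsetsInitializer.latest())
        .setGroupId(configMap.get("groupId"))
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .build();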
Convert operators. In this example, the DataStream<String> stream is converted into DataStream<Student>. For more information about how to convert and process more complex operators, see Operators.

// Convert the data into the Student data structure.
DataStream<Student> source = stream
    .map(new MapFunction<String, Student>() {
        @Override
        public Student map(String s) throws Exception {
            // Separate the fields with commas (,).
            String[] data = s.split(",");
            return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
        }
    }).filter(student -> student.score >= 60); // Keep the records whose score is greater than or equal to 60.
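Finally, write the converted records to the ApsaraDB RDS for MySQL database. The following sketch is based on the JdbcSink call in the "Complete sample code" section of this topic; the student table and the connection values in configMap are assumptions made in that sample.

// Write the filtered records to MySQL by using the JDBC sink.
source.addSink(JdbcSink.sink(
        "INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
        new JdbcStatementBuilder<Student>() {
            @Override
            public void accept(PreparedStatement ps, Student data) throws SQLException {
                ps.setInt(1, data.id);
                ps.setString(2, data.name);
                ps.setInt(3, data.score);
            }
        },
        new JdbcExecutionOptions.Builder()
                .withBatchSize(5)          // The number of records that are written in each batch.
                .withBatchIntervalMs(2000) // The maximum interval between two flush operations. Unit: milliseconds.
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(configMap.get("url"))
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername(configMap.get("username"))
                .withPassword(configMap.get("password"))
                .build()
)).name("Sink MySQL");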
Package the JAR file of the draft
Package the JAR file of the draft by using maven-shade-plugin, as shown in the configuration excerpt after the following notes.
If you upload the uber JAR files of the connectors to the development console of Realtime Compute for Apache Flink and reference them as additional dependency files in a DataStream draft, you must specify <scope>provided</scope> for the connector dependencies when you package the JAR file of the draft.
If you package the connectors as project dependencies into the JAR file of your draft, you must specify <scope>compile</scope> for the connector dependencies when you package the JAR file of the draft.
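For reference, the following abridged excerpt shows the maven-shade-plugin configuration from the pom.xml in the "Complete sample code" section of this topic. It builds a fat JAR file during the package phase and sets the program entry point through <mainClass>.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <!-- Modify this value if your program entry point changes. -->
                        <mainClass>com.aliyun.FlinkDemo</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>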
Test and deploy a draft
By default, Realtime Compute for Apache Flink cannot access the Internet. As a result, you may be unable to directly test your code on an on-premises machine. We recommend that you perform unit tests separately. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment.
For more information about how to create a JAR deployment for the draft, see Create a JAR deployment.
Note: If you reference the uber JAR files of the connectors as additional dependency files in a DataStream draft, you must upload the uber JAR files to the development console of Realtime Compute for Apache Flink when you deploy the draft.
If you want to read the configuration file, you must add the configuration file to the Additional Dependencies field in the Create Jar Deployment dialog box.
Complete sample code
In this example, data of the Kafka data source is processed and written to the MySQL database. This example is for reference only. For more information about the code style and quality guidelines, see Apache Flink Code Style and Quality Guide.
FlinkDemo.java
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkDemo {
// Define the data structure.
public static class Student {
public int id;
public String name;
public int score;
public Student(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
public static void main(String[] args) throws Exception {
// Create a Flink execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
Map<String,String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
// Load the property file.
properties.load(input);
// Obtain the property values.
configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
configMap.put("inputTopic",properties.getProperty("inputTopic"));
configMap.put("groupId",properties.getProperty("groupId"));
configMap.put("url",properties.getProperty("database.url")) ;
configMap.put("username",properties.getProperty("database.username"));
configMap.put("password",properties.getProperty("database.password"));
} catch (IOException ex) {
ex.printStackTrace();
}
// Build Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(configMap.get("bootstrapServers"))
.setTopics(configMap.get("inputTopic"))
.setStartingOffsets(OffsetsInitializer.latest())
.setGroupId(configMap.get("groupId"))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// Integrate external data sources into Flink data streaming programs.
// Specify WatermarkStrategy.noWatermarks(), which indicates that no watermark strategy is used.
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
// Keep the data records whose score is greater than or equal to 60.
DataStream<Student> source = stream
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
String[] data = s.split(",");
return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
}
}).filter(student -> student.score >= 60);
source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Student>() {
public void accept(PreparedStatement ps, Student data) {
try {
ps.setInt(1, data.id);
ps.setString(2, data.name);
ps.setInt(3, data.score);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(5) // The number of records to be written in each batch.
.withBatchIntervalMs(2000) // The maximum interval between two flush operations. Unit: milliseconds.
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(configMap.get("url"))
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername(configMap.get("username"))
.withPassword(configMap.get("password"))
.build()
)).name("Sink MySQL");
env.execute("Flink Demo");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>FlinkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>FlinkDemo</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- Specify the provided scope so that these dependencies are not packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add the connector dependencies and specify <scope>compile</scope> for the dependencies. -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<!-- Add a logging framework to generate output data in the development console of Realtime Compute for Apache Flink when you run the deployment. -->
<!-- By default, these dependencies are excluded from the JAR file in the program. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- Use maven-shade-plugin to create a fat JAR file that contains all the necessary dependencies. -->
<!-- Modify the value of <mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- Remove unnecessary dependencies. -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signature in the META-INF folder. Otherwise, a security error may occur when you use the JAR file. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
References
For more information about DataStream connectors, see Supported connectors.
For more information about how to create a JAR deployment, see Getting started with a JAR deployment of Realtime Compute for Apache Flink.
For more information about how to develop an SQL draft or a Python API draft, see Develop an SQL draft and Develop a Python API draft.