The Flink DataStream API provides a flexible programming model that lets you define custom data transformations, operations, and operators. This flexibility is ideal for complex business logic and data processing requirements. This topic describes how to develop Flink JAR jobs.
Support for open source Apache Flink
The DataStream API in Realtime Compute for Apache Flink is fully compatible with the open source Apache Flink version. For more information, see Introduction to Apache Flink and Flink DataStream API Developer Guide.
Development environment requirements
An integrated development environment (IDE) such as IntelliJ IDEA is installed.
Maven 3.6.3 or later is installed.
Job development supports only JDK 8 and JDK 11.
You can develop JAR jobs locally and then deploy and run them in the Realtime Compute for Apache Flink console.
Prerequisites
This example demonstrates how to use data source connectors. You must prepare the required data sources in advance.
This example uses Alibaba Cloud Message Queue for Kafka 2.6.2 and ApsaraDB RDS for MySQL 8.0 as data sources.
For information about how to access a self-managed data source over the internet or across VPCs, see Select a network connection method.
If you do not have a Message Queue for Kafka data source, you must purchase and deploy a Kafka instance. For more information, see Step 2: Purchase and deploy an instance. When you deploy the instance, make sure that it is in the same VPC as your Realtime Compute for Apache Flink workspace.
If you do not have an ApsaraDB RDS for MySQL data source, you must purchase an RDS for MySQL instance. For more information, see Step 1: Create an ApsaraDB RDS for MySQL instance and configure a database. When you purchase the instance, make sure that it is in the same region and VPC as your Realtime Compute for Apache Flink workspace.
Develop a job
Configure Flink environment dependencies
To avoid JAR package dependency conflicts, note the following:
`${flink.version}` is the Flink version that corresponds to the job's runtime. Use the same Flink version as the Ververica Runtime (VVR) engine that you select on the deployment page. For example, if you select the vvr-8.0.9-flink-1.17 engine on the deployment page, the corresponding Flink version is 1.17.2. To view VVR engine version details, see How do I view the Flink version of the current job?
For Flink-related dependencies, set the scope to `provided` by adding <scope>provided</scope> to the dependency. This primarily applies to non-connector dependencies under the org.apache.flink group whose artifact IDs start with flink-.
In the Flink source code, only methods explicitly annotated with @Public or @PublicEvolving are considered public. Realtime Compute for Apache Flink guarantees compatibility only for these methods.
If a built-in connector in Realtime Compute for Apache Flink supports the DataStream API, you must use its built-in dependencies.
The following code shows some basic Flink dependencies. You may also need to add dependencies for log files. For a complete list of dependencies, see Complete sample code at the end of this topic.
Flink-related dependencies
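The following snippet, taken from the dependency section of the pom.xml in the complete sample at the end of this topic, shows these core dependencies with the provided scope:

<!-- Apache Flink dependencies. Set the scope to provided so that they are not packaged into the job JAR file. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>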
Connector dependencies and usage
To read and write data using DataStream, you must use the corresponding DataStream connector. The Maven central repository contains VVR DataStream connectors that you can use directly during job development.
You must use the connectors that are specified as supporting the DataStream API in Supported connectors. Do not use other connectors, because their interfaces and parameters may be modified in the future.
You can use one of the following methods to use a connector:
(Recommended) Upload the connector JAR package as an additional dependency file
In the Maven POM file of the job, add the required connector as a project dependency and set its scope to `provided`. For the complete dependency file, see Complete sample code at the end of this topic.
Note: `${vvr.version}` is the version of the job's runtime engine. For example, if your job runs on the vvr-8.0.9-flink-1.17 engine, the corresponding Flink version is 1.17.2. We recommend that you use the latest engine version. For more information about engine versions, see Engines.
Because the connector JAR package is imported as an additional dependency, you do not need to package it into the job JAR file. Therefore, you must set the scope to provided.
<!-- Kafka connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr.version}</version>
    <scope>provided</scope>
</dependency>
<!-- MySQL connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
    <scope>provided</scope>
</dependency>

To develop a new connector or extend the features of an existing connector, the project also requires the flink-connector-base or ververica-connector-common public package.

<!-- Basic dependency for the Flink connector common interface -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Basic dependency for the Alibaba Cloud connector common interface -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-common</artifactId>
    <version>${vvr.version}</version>
</dependency>

For DataStream connection configuration information and code samples, see the documentation for the corresponding DataStream connector.
For a list of connectors that can be used as DataStream types, see Supported connectors.
When you deploy the job, add the corresponding connector JAR package in the Additional Dependencies section. For more information, see Deploy a JAR job. You can upload a connector that you developed or a connector provided by Realtime Compute for Apache Flink. For download links, see Connector list.
Package the connector as a project dependency into the job JAR file
In the Maven POM file of the job, add the required connector as a project dependency. For example, you can import the Kafka connector and the MySQL connector.
Note: `${vvr.version}` is the version of the job's runtime engine. For example, if your job runs on the vvr-8.0.9-flink-1.17 engine, the corresponding Flink version is 1.17.2. We recommend that you use the latest engine version. For more information about engine versions, see Engines.
Because the connectors are packaged directly into the JAR file as project dependencies, you must use the default `compile` scope.
<!-- Kafka connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr.version}</version>
</dependency>
<!-- MySQL connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

To develop a new connector or extend the features of an existing connector, the project also requires the flink-connector-base or ververica-connector-common public package.

<!-- Basic dependency for the Flink connector common interface -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Basic dependency for the Alibaba Cloud connector common interface -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-common</artifactId>
    <version>${vvr.version}</version>
</dependency>

For DataStream connection configuration information and code samples, see the documentation for the corresponding DataStream connector.
For a list of connectors that can be used as DataStream types, see Supported connectors.
Read additional dependency files from OSS
Flink JAR jobs do not support reading local configuration files from within the `main` function. Instead, you can upload the configuration file to an OSS bucket associated with your Flink workspace. When you deploy the job, you can then read the file by adding it as an additional dependency. The following sections provide examples.
Create a configuration file named `config.properties` to avoid using plaintext passwords in your code.
# Kafka
bootstrapServers=host1:9092,host2:9092,host3:9092
inputTopic=topic
groupId=groupId
# MySQL
database.url=jdbc:mysql://localhost:3306/my_database
database.username=username
database.password=password

Use code in the JAR job to read the `config.properties` file stored in the OSS bucket.
Method 1: Read from the OSS Bucket that is attached to the workspace
In the Realtime Compute for Apache Flink development console, open the Resource Management page from the left-side navigation pane and upload the file.
When the job runs, the additional dependency file that you added during deployment is loaded into the /flink/usrlib directory of the pod where the job runs.
The following code shows an example of how to read the configuration file.
Properties properties = new Properties();
Map<String, String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
    // Load the properties file.
    properties.load(input);
    // Get property values.
    configMap.put("bootstrapServers", properties.getProperty("bootstrapServers"));
    configMap.put("inputTopic", properties.getProperty("inputTopic"));
    configMap.put("groupId", properties.getProperty("groupId"));
    configMap.put("url", properties.getProperty("database.url"));
    configMap.put("username", properties.getProperty("database.username"));
    configMap.put("password", properties.getProperty("database.password"));
} catch (IOException ex) {
    ex.printStackTrace();
}
Method 2: Read from an OSS Bucket that the workspace has permission to access
Upload the configuration file to the destination OSS bucket.
You can use `OSSClient` to directly read a file stored in OSS. For more information, see Stream and Manage access credentials. The following is sample code.
OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
     BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
    // Read the file and process its content.
    ...
} finally {
    if (ossClient != null) {
        ossClient.shutdown();
    }
}
Write business code
The following code shows how to integrate an external data source into the Flink data stream program. A watermark is a mechanism in Flink that measures progress in event time and is often used with timestamps. This example does not use a watermark policy. For more information, see Watermark Strategies.

// Integrate an external data source into the Flink data stream program.
// WatermarkStrategy.noWatermarks() indicates that no watermark policy is used.
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

The following code shows an operator transformation. The example converts DataStream<String> to DataStream<Student>. For more information about complex operator transformations and processing methods, see Flink Operators.

// An operator that transforms the data structure to Student.
DataStream<Student> source = stream
    .map(new MapFunction<String, Student>() {
        @Override
        public Student map(String s) throws Exception {
            // The fields are separated by commas.
            String[] data = s.split(",");
            return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
        }
    })
    .filter(student -> student.score >= 60); // Keep only scores greater than or equal to 60.
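The transformed stream can then be written to MySQL with a JDBC sink. The following is a condensed sketch that reuses the configMap values read from config.properties; the fully expanded version, including the execution and connection options, is in the complete sample at the end of this topic.

// Write the Student records to MySQL. This is a condensed form of the sink in the complete sample.
source.addSink(JdbcSink.sink(
    "INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
    (PreparedStatement ps, Student data) -> {
        ps.setInt(1, data.id);
        ps.setString(2, data.name);
        ps.setInt(3, data.score);
    },
    JdbcExecutionOptions.builder().withBatchSize(5).withBatchIntervalMs(2000).build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl(configMap.get("url"))
        .withDriverName("com.mysql.cj.jdbc.Driver")
        .withUsername(configMap.get("username"))
        .withPassword(configMap.get("password"))
        .build()
)).name("Sink MySQL");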
Package the job
You can package the job using the `maven-shade-plugin` plug-in.
If you choose to import the connector as an additional dependency file, make sure that the scope of the connector-related dependency is set to provided when you package the job.
If you choose to package the connector as a project dependency, you can use the default `compile` scope.
Test and deploy the job
Realtime Compute for Apache Flink does not have internet access by default. Therefore, you may not be able to test your code locally. We recommend that you perform unit tests separately. For more information, see Run and debug a job that contains connectors locally.
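As a lightweight alternative, you can verify the parsing and filtering logic locally with an in-memory source instead of the Kafka connector. The following sketch is only an illustration; the sample records and job name are placeholders.

// A local, connector-free check of the transformation logic in this topic.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Student> students = env
    .fromElements("1,Alice,80", "2,Bob,45") // In-memory test records in the same CSV format.
    .map(new MapFunction<String, Student>() {
        @Override
        public Student map(String s) throws Exception {
            String[] data = s.split(",");
            return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
        }
    })
    .filter(student -> student.score >= 60);
students.print(); // Only the record with a score of at least 60 is printed.
env.execute("Local transformation test");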
For information about how to deploy a JAR job, see Deploy a JAR job.
Note: When you deploy the job, if you chose to upload the connector as an additional dependency file, you must upload the related connector JAR package.
To read a configuration file, you must also upload it as an additional dependency file.

Complete sample code
This sample code shows how data from a Kafka data source is processed and then written to MySQL. This example is for reference only. For more information about code style and quality guidelines, see Code Style and Quality Guide.
The example does not include configurations for runtime parameters such as checkpoints, Time to Live (TTL), and restart policies. Configurations set in the code take precedence over those set on the Deployment Details page, so we recommend that you customize these configurations on the page after the job is deployed. This approach simplifies future modifications and reuse. For more information, see Configure job deployment information.
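If you do choose to set such parameters in code, a minimal sketch looks like the following; the interval and retry values are examples only.

// Requires org.apache.flink.api.common.restartstrategy.RestartStrategies,
// org.apache.flink.api.common.time.Time, and java.util.concurrent.TimeUnit.
// Values set here take precedence over the console configuration.
env.enableCheckpointing(60000); // Trigger a checkpoint every 60 seconds.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));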
FlinkDemo.java
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkDemo {
// Define the data structure.
public static class Student {
public int id;
public String name;
public int score;
public Student(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
public static void main(String[] args) throws Exception {
// Create a Flink execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
Map<String,String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
// Load the properties file.
properties.load(input);
// Get property values.
configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
configMap.put("inputTopic",properties.getProperty("inputTopic"));
configMap.put("groupId",properties.getProperty("groupId"));
configMap.put("url",properties.getProperty("database.url")) ;
configMap.put("username",properties.getProperty("database.username"));
configMap.put("password",properties.getProperty("database.password"));
} catch (IOException ex) {
ex.printStackTrace();
}
// Build Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(configMap.get("bootstrapServers"))
.setTopics(configMap.get("inputTopic"))
.setStartingOffsets(OffsetsInitializer.latest())
.setGroupId(configMap.get("groupId"))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// Integrate an external data source into the Flink data stream program.
// WatermarkStrategy.noWatermarks() indicates that no watermark policy is used.
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
// Transform each record into a Student object and keep only scores greater than or equal to 60.
DataStream<Student> source = stream
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
// The fields are separated by commas.
String[] data = s.split(",");
return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
}
}).filter(student -> student.score >= 60);
source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Student>() {
public void accept(PreparedStatement ps, Student data) {
try {
ps.setInt(1, data.id);
ps.setString(2, data.name);
ps.setInt(3, data.score);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(5) // The maximum number of records in each batch.
.withBatchIntervalMs(2000) // Flush a batch at least every 2,000 milliseconds, even if it is not full.
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(configMap.get("url"))
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername(configMap.get("username"))
.withPassword(configMap.get("password"))
.build()
)).name("Sink MySQL");
env.execute("Flink Demo");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>FlinkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>FlinkDemo</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink Dependencies -->
<!-- These dependencies are provided because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<!-- Add a logging framework to generate console output at runtime. -->
<!-- By default, these dependencies are excluded from the application JAR. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade-plugin to create a fat jar that contains all required dependencies. -->
<!-- Modify the value of <mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- Remove some unnecessary dependencies. -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise, this may cause a security exception when you use the JAR file. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
References
For a list of connectors that can be used as DataStream types, see Supported connectors.
For a complete example of the Flink JAR job development process, see Quick Start for Flink JAR jobs.
Realtime Compute for Apache Flink also supports SQL and Python jobs. For information about how to develop these jobs, see Job development map and Develop Python jobs.