全部產品
Search
文件中心

Realtime Compute for Apache Flink:JAR作業開發

更新時間:Sep 04, 2024

Flink DataStream提供了更靈活的編程模型和API,可以自訂各種資料轉換、操作和運算元,適用於複雜的商務邏輯和資料處理需求。本文為您介紹Flink JAR作業的開發方法。

支援開源Apache Flink

目前Realtime ComputeFlink支援的DataStream API完全相容開源的Flink版本,詳情請參見Apache Flink介紹Flink DataStream API開發指南

開發環境要求

  • 已安裝IntelliJ IDEA等開發工具。

  • 已安裝3.6.3及以上版本的Maven。

  • 作業開發需要使用JDK 1.8版本。

  • JAR作業需要您線上下完成開發,再在Flink全託管控制台上部署並運行。

開發準備

本範例涉及關於資料來源連接器如何使用,請準備好相關資料來源。

說明

作業開發

配置Flink環境依賴

說明

為了避免JAR包依賴衝突,請您注意以下幾點:

  • ${flink.version}為作業運行對應的Flink版本。請使用與作業部署頁面選擇的VVR引擎所使用的Flink版本一致。例如您在部署頁面選擇的引擎為1.17-vvr-8.0.4-1,其對應的Flink版本為1.17.0

  • Flink相關依賴,範圍請使用provided,即在依賴中添加<scope>provided</scope>。主要包含org.apache.flink組下以flink-開頭的非Connector依賴。

  • Flink原始碼中只有明確標註了@Public或者@PublicEvolving的才是公開供使用者調用的方法,阿里雲Realtime ComputeFlink版只對這些方法的相容性做出產品保證。

  • 如果是Flink服務內建的Connector支援的DataStream API,建議使用其內建的依賴。

下面是Flink的一些基本相關依賴,您可能還需要補充一些記錄檔相關的依賴,完整的依賴參考請參見文末的完整範例程式碼

flink相關依賴
         <!-- Apache Flink 依賴項 -->
        <!-- 之所以提供這些依賴項,是因為它們不應該打包到JAR檔案中。 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

連接器依賴和使用

通過DataStream的方式讀寫資料,需要使用對應的DataStream連接器串連Flink全託管。Maven中央倉庫已經放置了VVR DataStream連接器,以供您在作業開發時直接使用。

重要

請使用我們在支援的連接器中指明提供DataStream API的連接器。如果某個連接器未註明提供給DataStream API,請勿自行使用,因為未來介面和參數可能會被修改。

您可以選擇以下任意一種方式來使用連接器:

(推薦)上傳連接器Uber JAR包到Flink開發控制台,部署作業時作為附加依賴檔案引入

  1. 在作業的Maven POM檔案中添加您需要的連接器作為專案依賴,其範圍為provided。完整的依賴檔案請參考文末的完整範例程式碼

    說明
    • ${vvr.version}是作業運行環境引擎版本,如您的作業運行在1.17-vvr-8.0.4-1版本引擎上,其對應的Flink版本為1.17.0。建議您使用最新的引擎,具體版本詳見引擎

    • 由於將連接器的Uber JAR包作為附加依賴檔案引入,則無需將該依賴打入JAR包中,所以需要聲明範圍為provided

            <!-- Kafka 連接器依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- MySQL 連接器依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
  2. 如果您有開發新連接器或者拓展現有連接器功能的需求,專案還需要依賴連接器公用包flink-connector-baseververica-connector-common

            <!-- Flink 連接器公用介面基礎依賴 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 阿里雲連接器公用介面基礎依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. DataStream串連配置資訊和程式碼範例需要查看對應的DataStream連接器文檔。

    支援作為DataStream類型的連接器列表,請參見支援的連接器

  4. 部署作業並在附加依賴檔案項中添加相應的連接器Uber JAR包,詳情請參見部署JAR作業。您可以上傳您自己開發的連接器,也可以上傳Realtime ComputeFlink版提供的連接器(下載地址請參見Connector列表)。如圖所示。

    image

直接將連接器作為專案依賴打進作業JAR包

  1. 在作業的Maven POM檔案中添加您需要的連接器作為專案依賴。例如引入Kafka連接器和MySQL連接器。

    說明
    • ${vvr.version}是作業運行環境引擎版本,如您的作業運行在1.17-vvr-8.0.4-1版本引擎上,其對應的Flink版本為1.17.0。建議您使用最新的引擎,具體版本詳見引擎

    • 由於將連接器作為專案依賴直接打入JAR包,它們必須在預設範圍(compile)中。

            <!-- Kafka 連接器依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- MySQL 連接器依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
  2. 如果您有開發新連接器或者拓展現有連接器功能的需求,專案還需要依賴連接器公用包flink-connector-baseververica-connector-common

            <!-- Flink 連接器公用介面基礎依賴 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 阿里雲連接器公用介面基礎依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. DataStream串連配置資訊和程式碼範例需要查看對應的DataStream連接器文檔。

    支援作為DataStream類型的連接器列表,請參見支援的連接器

OSS附加依賴檔案讀取

因為Flink JAR作業不支援在Main函數中讀取本地配置,您可以將設定檔上傳到Flink工作空間下的OSS Bucket,在部署JAR作業時,通過添加附加設定檔的方式進行讀取。樣本如下。

  1. 建立設定檔config.properties,避免在代碼中出現明文代碼。

    # Kafka 
    bootstrapServers=host1:9092,host2:9092,host3:9092
    inputTopic=topic
    groupId=groupId
    # MySQL
    database.url=jdbc:mysql://localhost:3306/my_database
    database.username=username
    database.password=password
  2. 在JAR作業中使用代碼讀取儲存在OSS Bucket上的設定檔config.properties。

    方式一:讀取工作空間綁定的OSS Bucket

    1. Realtime Compute開發控制台左側導覽列資源管理頁面,上傳該檔案。

    2. 在作業運行時,部署作業所添加附加依賴檔案將會載入到作業所運行Pod的/flink/usrlib目錄下。

    3. 讀取該設定檔程式碼範例如下。

                  Properties properties = new Properties();
                  Map<String,String> configMap = new HashMap<>();
      
                  try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
                      // 載入屬性檔案
                      properties.load(input);
                      // 擷取屬性值
                      configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
                      configMap.put("inputTopic",properties.getProperty("inputTopic"));
                      configMap.put("groupId",properties.getProperty("groupId"));
                      configMap.put("url",properties.getProperty("database.url")) ;
                      configMap.put("username",properties.getProperty("database.username"));
                      configMap.put("password",properties.getProperty("database.password"));
                  } catch (IOException ex) {
                      ex.printStackTrace();
                  }

    方式二:讀取工作空間有許可權訪問的OSS Bucket

    1. 將設定檔上傳目標OSS Bucket。

    2. 通過OSSClient直接讀取OSS上的隱藏檔詳情,請參見串流管理訪問憑據。程式碼範例如下。

      OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
      try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
           BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
          // read file and process ...
      } finally {
          if (ossClient != null) {
              ossClient.shutdown();
          }
      }

業務代碼編寫

  1. 將外部資料源整合到Flink資料流程式。Watermark是Flink一種基於時間語義的計算策略,往往伴隨著時間戳記一起使用,所以本樣本不使用浮水印策略。詳情請參考浮水印策略

             // 將外部資料源整合到flink資料流程式
            // WatermarkStrategy.noWatermarks() 指沒有使用浮水印策略
            DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
  2. 運算元轉換處理。樣本中將DataStream<Srting>轉換成DataStream<Student>,更多複雜的運算元轉化和處理方式請參考Flink運算元

              // 轉換資料結構為student的運算元
              DataStream<student> source = stream
                    .map(new MapFunction<String, student>() {
                        @Override
                        public student map(String s) throws Exception {
                            // 資料由逗號分隔
                            String[] data = s.split(",");
                            return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                        }
                    }).filter(student -> student.score >=60); // 篩選出分數大於60分的資料

作業打包

通過maven-shade-plugin外掛程式打包。

重要
  • 如果選擇作為附加依賴檔案引入使用連接器,打包作業時,確認連接器相關依賴的範圍為provided

  • 如果選擇連接器作為依賴一起打包,範圍預設(compile)即可。

maven-shade-plugin外掛程式依賴參考

<build>
        <plugins>
            <!-- Java 編譯器-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- 我們使用maven-shade 建立一個包含所有必須依賴的 fat jar -->
            <!-- 修改<mainClass>的值.如果您的程式進入點發生了改變 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- 去掉一些不必要的依賴性 -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- 不要複製META-INF檔案夾中的簽名。否則,這可能會在使用JAR檔案時導致安全異常 -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

作業測試及部署

  • 由於Realtime ComputeFlink版預設不具備訪問公網的能力,可能您的代碼無法在本地進行直接測試。建議您分開進行單元測試,詳情請參見本地運行和調試包含連接器的作業

  • JAR作業部署請參見部署JAR作業

    說明
    • 部署時,如果選擇方式一使用連接器打包的作業,切記需要上傳添加連接器相關的Uber JAR包。

    • 如果需要讀取設定檔,也需要在附加依賴檔案中上傳添加。

    image

完整範例程式碼

本範例程式碼中,將Kafka資料來源的資料進行處理後寫入MySQL。此樣本僅供參考,更多的代碼風格和品質指南請參見代碼風格和品質指南

FlinkDemo.java

package com.aliyun;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkDemo {
    // 定義資料結構
    public static class Student {
        public int id;
        public String name;
        public int score;

        public Student(int id, String name, int score) {
            this.id = id;
            this.name = name;
            this.score = score;
        }
    }

    public static void main(String[] args) throws Exception {
        // 建立Flink執行環境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        Map<String,String> configMap = new HashMap<>();

        try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
            // 載入屬性檔案
            properties.load(input);
            // 擷取屬性值
            configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
            configMap.put("inputTopic",properties.getProperty("inputTopic"));
            configMap.put("groupId",properties.getProperty("groupId"));
            configMap.put("url",properties.getProperty("database.url")) ;
            configMap.put("username",properties.getProperty("database.username"));
            configMap.put("password",properties.getProperty("database.password"));
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        // Build Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(configMap.get("bootstrapServers"))
                        .setTopics(configMap.get("inputTopic"))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(configMap.get("groupId"))
                        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                        .build();

        // 將外部資料源整合到flink資料流程式
        // WatermarkStrategy.noWatermarks() 指沒有使用浮水印策略
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

        // 篩選出分數大於60分的資料
        DataStream<Student> source = stream
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String s) throws Exception {
                        String[] data = s.split(",");
                        return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                    }
                }).filter(Student -> Student.score >=60);

        source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
                new JdbcStatementBuilder<Student>() {
                    public void accept(PreparedStatement ps, Student data) {
                        try {
                            ps.setInt(1, data.id);
                            ps.setString(2, data.name);
                            ps.setInt(3, data.score);
                        } catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5) // 每次批量寫入的記錄數
                        .withBatchIntervalMs(2000) // 重試時的最大延遲時間(毫秒)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(configMap.get("url"))
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(configMap.get("username"))
                        .withPassword(configMap.get("password"))
                        .build()
        )).name("Sink MySQL");

        env.execute("Flink Demo");
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>FlinkDemo</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.14.1</log4j.version>
    </properties>
    <dependencies>
        <!-- Apache Flink 依賴項 -->
        <!-- 之所以提供這些依賴項,是因為它們不應該打包到JAR檔案中。 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- 在這裡添加連接器依賴項。它們必須在預設範圍(compile)中。 -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <!-- 添加日誌架構,以便在IDE中運行時產生控制台輸出 -->
        <!-- 預設情況下,這些依賴項從應用程式JAR中排除 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java 編譯器-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- 我們使用maven-shade 建立一個包含所有必須依賴的 fat jar -->
            <!-- 修改<mainClass>的值.如果您的程式進入點發生了改變 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- 去掉一些不必要的依賴性 -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- 不要複製META-INF檔案夾中的簽名。否則,這可能會在使用JAR檔案時導致安全異常 -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

相關文檔