全部產品
Search
文件中心

Realtime Compute for Apache Flink:DataStream連接器設定方法

更新時間:Sep 13, 2024

本文為您介紹如何使用DataStream連接器,即通過DataStream的方式讀寫資料。

背景資訊

如果您通過DataStream的方式讀寫資料,則需要使用對應的DataStream連接器串連Flink全託管。Maven中央庫中已經放置了VVR DataStream連接器,以供您在作業開發時直接使用。您可以通過以下任何一種方式來使用連接器:

(推薦)上傳連接器JAR包到Realtime Compute控制台後,填寫配置資訊

  1. 登入Realtime Compute控制台

  2. 單擊目標工作空間操作列下的控制台

  3. 在左側導覽列,單擊檔案管理

  4. 單擊上傳資源,選擇您要上傳的目標連結器的JAR包。

    您可以上傳您自己開發的連接器,也可以上傳Flink全託管產品提供的連接器。Flink全託管產品提供的連接器官方JAR包的下載地址,請參見Connector列表

  5. 在目標作業開發頁面附加依賴檔案項,選擇目標連接器的JAR包。

直接將連接器作為專案依賴打進作業JAR包

步驟一:準備DataStream作業開發環境

  1. 在Maven專案的pom.xml檔案中添加以下配置以引用SNAPSHOT倉庫。

    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>

  2. 檢查您的settings.xml設定檔中是否存在<mirrorOf>*</mirrorOf>配置。

    <mirrorOf>中包含*,表示當前mirror已經包含了所有倉庫,maven不會從上述指定的兩個 SNAPSHOT倉庫中下載,這會導致Maven工程無法下載這兩個倉庫中的SNAPSHOT依賴。因此如果 settings.xml檔案<mirrorOf>*</mirrorOf>配置中包含*,您可以根據如下情況進行相應的修改:

    • 存在<mirrorOf>*</mirrorOf>配置,請將配置改為<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    • 存在<mirrorOf>external:*</mirrorOf>配置,請將配置改為<mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    • 存在<mirrorOf>external:http:*</mirrorOf>配置,請將配置改為<mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

  3. 在作業的Maven POM檔案中添加您需要的連接器作為專案依賴。

    每個Connector版本對應的Connector類型可能不同,建議您使用最新版本。完整的依賴資訊請參見MaxCompute-DemoDataHub-DemoKafka-DemoRocketMQ-Demo樣本中的pom.xml檔案。MaxCompute增量源表的專案依賴程式碼範例如下。

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-continuous-odps</artifactId>
        <version>${connector.version}</version>
    </dependency>

    除connector外,專案還需要依賴connector的公用包flink-connector-base

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>

    其中,${flink.version}是作業運行環境對應的Flink版本,如您的作業運行在1.15-vvr-6.0.7版本引擎上,其對應的Flink版本為1.15.0

    重要
    • 您需要在SNAPSHOT倉庫(oss.sonatype.org)尋找帶SNAPSHOT的Connector版本,在Maven中央庫(search.maven.org)上會尋找不到。

    • 在使用多個Connector時,請注意META-INF目錄需要Merge,即在pom.xml檔案中添加如下代碼。

      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>

步驟二:開發DataStream作業

DataStream串連配置資訊和程式碼範例需要去查看對應的DataStream連接器文檔,詳情請參見:

步驟三:打包並提交DataStream作業

使用Maven工具打包工程專案,並將產生的JAR包上傳和提交到Flink全託管平台上,詳細請參見部署JAR作業