本文為您介紹如何使用DataStream連接器,即通過DataStream的方式讀寫資料。
背景資訊
如果您通過DataStream的方式讀寫資料,則需要使用對應的DataStream連接器串連Flink全託管。Maven中央庫中已經放置了VVR DataStream連接器,以供您在作業開發時直接使用。您可以通過以下任何一種方式來使用連接器:
- 重要
請使用我們在支援的連接器中指明提供DataStream API的連接器。如果某個連接器未註明提供DataStream API,請勿自行使用,因為未來介面和參數可能會被修改。
DataStream連接器均添加了商業化加密保護,直接運行會報錯。如需本地調試和運行,請參見本地運行和調試包含連接器的作業。
(推薦)上傳連接器JAR包到Realtime Compute控制台後,填寫配置資訊
單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊檔案管理。
單擊上傳資源,選擇您要上傳的目標連結器的JAR包。
您可以上傳您自己開發的連接器,也可以上傳Flink全託管產品提供的連接器。Flink全託管產品提供的連接器官方JAR包的下載地址,請參見Connector列表。
在目標作業開發頁面附加依賴檔案項,選擇目標連接器的JAR包。
直接將連接器作為專案依賴打進作業JAR包
步驟一:準備DataStream作業開發環境
在Maven專案的pom.xml檔案中添加以下配置以引用SNAPSHOT倉庫。
<repositories> <repository> <id>oss.sonatype.org-snapshot</id> <name>OSS Sonatype Snapshot Repository</name> <url>http://oss.sonatype.org/content/repositories/snapshots</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories>
檢查您的settings.xml設定檔中是否存在
<mirrorOf>*</mirrorOf>
配置。<mirrorOf>
中包含*,表示當前mirror已經包含了所有倉庫,maven不會從上述指定的兩個 SNAPSHOT倉庫中下載,這會導致Maven工程無法下載這兩個倉庫中的SNAPSHOT依賴。因此如果 settings.xml檔案<mirrorOf>*</mirrorOf>
配置中包含*,您可以根據如下情況進行相應的修改:存在
<mirrorOf>*</mirrorOf>
配置,請將配置改為<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。存在
<mirrorOf>external:*</mirrorOf>
配置,請將配置改為<mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。存在
<mirrorOf>external:http:*</mirrorOf>
配置,請將配置改為<mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。
在作業的Maven POM檔案中添加您需要的連接器作為專案依賴。
每個Connector版本對應的Connector類型可能不同,建議您使用最新版本。完整的依賴資訊請參見MaxCompute-Demo、DataHub-Demo、Kafka-Demo或RocketMQ-Demo樣本中的pom.xml檔案。MaxCompute增量源表的專案依賴程式碼範例如下。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-continuous-odps</artifactId> <version>${connector.version}</version> </dependency>
除connector外,專案還需要依賴connector的公用包
flink-connector-base
:<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency>
其中,
${flink.version}
是作業運行環境對應的Flink版本,如您的作業運行在1.15-vvr-6.0.7
版本引擎上,其對應的Flink版本為1.15.0
。重要您需要在SNAPSHOT倉庫(oss.sonatype.org)尋找帶SNAPSHOT的Connector版本,在Maven中央庫(search.maven.org)上會尋找不到。
在使用多個Connector時,請注意META-INF目錄需要Merge,即在pom.xml檔案中添加如下代碼。
<transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers>
步驟二:開發DataStream作業
DataStream串連配置資訊和程式碼範例需要去查看對應的DataStream連接器文檔,詳情請參見:
步驟三:打包並提交DataStream作業
使用Maven工具打包工程專案,並將產生的JAR包上傳和提交到Flink全託管平台上,詳細請參見部署JAR作業。