完成資料訂閱通道的配置後,您可以使用flink-dts-connector檔案消費通道中的資料,用於Flink用戶端消費。本文介紹如何flink-dts-connector檔案的使用說明。
注意事項
僅支援Flink用戶端使用DataStream API、Table API和SQL。
如您的Flink用戶端使用Table API和SQL,則單次配置時僅支援消費單張表的資料,如需消費多張表的資料,您需進行多次配置獨立的任務。
操作步驟
本文以IntelliJ IDEA軟體(Community Edition 2020.1 Windows版本)為例,介紹如何使用flink-dts-connector檔案來消費訂閱通道中的資料。
建立新版資料訂閱通道,詳情請參見建立RDS MySQL資料訂閱通道、建立PolarDB MySQL版資料訂閱通道或建立Oracle資料訂閱通道。
建立一個或多個消費組,詳情請參見新增消費組。
下載flink-dts-connector檔案並解壓。
運行IntelliJ IDEA工具,然後單擊Open or Import。
在彈出的對話方塊中,定位至flink-dts-connector檔案所在目錄,依次展開檔案夾,找到專案物件模型檔案:pom.xml。
在彈出對話方塊中,選擇Open as Project。
在pom.xml檔案中添加如下依賴:
<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency>
在IntelliJ IDEA軟體介面,依次展開檔案夾,並根據您所使用的Flink Connector的程式類型,選擇對應的Java檔案。
如Flink用戶端類型為DataStream API,您需雙擊開啟flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java檔案,並執行如下操作:
在IntelliJ IDEA軟體介面的頂部,單擊如下表徵圖。
在彈跳框中單擊
。在彈跳框的Program arguments中,按如下樣本輸入參數及對應的值,並單擊下方的Run,啟動flink-dts-connector。
說明具體參數說明及查詢方式,請參見參數說明。
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
運行結果如下圖所示,該用戶端可正常訂閱到源庫的資料變更資訊。
說明如需查詢資料變更的具體記錄,您可登入Flink用戶端的Task Manager介面進行查看。
如Flink用戶端類型為Table API和SQL,您需雙擊開啟flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java檔案,並執行如下操作:
說明單個
DtsTableISelectTCaseTest.java
檔案,僅支援配置並消費單張表的訂閱資料。如需配置並消費多張表中的資料,您需要重複配置,並運行多個獨立任務。在如下位置添加前置字元
//
,注釋該行代碼資訊。設定所需消費的單張表的資訊,支援使用SQL語句。
設定訂閱通道參數,具體參數說明及查詢方式,請參見參數說明。
在IntelliJ IDEA軟體介面的頂部,單擊Run'DtsTableISelectTCaseTest',啟動flink-dts-connector。
運行結果如下圖所示,該用戶端可正常訂閱到源庫的資料變更資訊。
說明如需查詢資料變更的具體記錄,您可登入Flink用戶端的Task Manager介面進行查看。
參數說明
DstExample檔案中的參數 | DtsTableISelectTCaseTest檔案中的參數 | 說明 | 查詢方式 |
|
| 資料訂閱通道的網路地址及連接埠號碼資訊。 說明
| 在DTS控制台單擊目標訂閱執行個體ID,在訂閱配置頁面,您可以擷取到訂閱Topic、網路地址及連接埠號碼資訊。 |
|
| 資料訂閱通道的訂閱Topic。 | |
|
| 消費組ID。 | 在DTS控制台單擊目標訂閱執行個體ID,然後單擊資料消費,您可以擷取到消費組ID和消費組的帳號資訊。 說明 消費組帳號的密碼已在您建立消費組時指定。 |
|
| 消費組的帳號。 警告 如您未使用本文提供的flink-dts-connector檔案,請按照 | |
|
| 該帳號的密碼。 | |
|
| 消費位點,即flink-dts-connector消費第一條資料的時間戳記,格式為Unix時間戳記,例如1624440043。 說明 消費位點資訊可用於:
| 消費位點必須在訂閱執行個體的資料範圍(如圖所示)之內,並需轉化為Unix時間戳記。 說明 Unix時間戳記轉換工具可用搜尋引擎擷取。 |
無 |
| 訂閱對象。僅支援傳入單張表,且格式要求如下:
| 在DTS控制台單擊目標訂閱執行個體ID,在訂閱配置頁面,單擊右上方的查看訂閱對象,查詢訂閱對象所屬資料庫和表。 |
常見問題
報錯提示 | 可能的原因 | 解決方式 |
| DTS用於讀取增量資料的模組DStore發生切換,導致Flink用戶端的消費位點丟失。 | 您無需重啟用戶端,僅需查詢用戶端的消費位點,並在DtsExample.java和DtsTableISelectTCaseTest.java檔案中重新傳入消費位點 |