全部產品
Search
文件中心

ApsaraMQ for Kafka:建立AnalyticDB Sink Connector

更新時間:Dec 27, 2024

本文介紹如何通過建立AnalyticDB Sink Connector,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic通過Function Compute服務匯出至AnalyticDB for MySQL或雲原生資料倉儲AnalyticDB PostgreSQL版。

前提條件

在匯出資料前,請確保您已完成以下操作:

注意事項

  • 僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute,再由Function Compute匯出至AnalyticDB for MySQL或雲原生資料倉儲AnalyticDB PostgreSQL版。關於Connector的限制說明,請參見使用限制

  • 該功能基於Function Compute服務提供。Function Compute為您提供了一定的免費額度,超額部分將產生費用,請以Function Compute的計費規則為準。計費詳情,請參見計費概述

  • Function Compute的函數調用支援日誌查詢,以便您迅速排查問題。具體操作步驟,請參見配置日誌

  • 訊息轉儲時,雲訊息佇列 Kafka 版中訊息用UTF-8 String序列化,暫不支援二進位的資料格式。

  • 如果AnalyticDB Sink Connector存取點是私網存取點,Function Compute運行環境預設無法訪問,為確保網路暢通,需在Function Compute控制台為函數服務配置與雲原生資料倉儲一致的VPC和vSwitch資訊。更多資訊,請參見更新服務

  • 建立Connector時,雲訊息佇列 Kafka 版會為您自動建立服務關聯角色。

    • 如果未建立服務關聯角色,雲訊息佇列 Kafka 版會為您自動建立一個服務關聯角色,以便您使用雲訊息佇列 Kafka 版匯出資料至Table Store的功能。

    • 如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。

    關於服務關聯角色的更多資訊,請參見服務關聯角色

操作流程

使用AnalyticDB Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至雲原生資料倉儲操作流程如下:

  1. 可選:建立AnalyticDB Sink Connector依賴的Topic和Group

    如果您不需要自訂Topic和Group,您可以直接跳過該步驟,在下一步驟選擇自動建立。

    重要

    部分AnalyticDB Sink Connector依賴的Topic的儲存引擎必須為Local儲存,大版本為0.10.2的雲訊息佇列 Kafka 版執行個體不支援手動建立Local儲存的Topic,只支援自動建立。

    1. 建立AnalyticDB Sink Connector依賴的Topic

    2. 建立AnalyticDB Sink Connector依賴的

  2. 建立並部署AnalyticDB Sink Connector

  3. 服務配置

    1. 配置函數服務

    2. 配置雲原生資料庫

  4. 結果驗證

    1. 發送測試訊息

    2. 驗證結果

建立AnalyticDB Sink Connector依賴的Topic

您可以在雲訊息佇列 Kafka 版控制台手動建立AnalyticDB Sink Connector依賴的5個Topic,包括:任務位點Topic、任務配置Topic、任務狀態Topic、無效信件佇列Topic以及異常資料Topic。每個Topic所需要滿足的分區數與儲存引擎會有差異,具體資訊,請參見配置源服務參數列表

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

    重要

    Topic需要在應用程式所在的地區(即所部署的ECS的所在地區)進行建立。Topic不能跨地區使用。例如Topic建立在華北2(北京)這個地區,那麼訊息生產端和消費端也必須運行在華北2(北京)的ECS。

  3. 实例列表頁面,單擊目標執行個體名稱。

  4. 在左側導覽列,單擊Topic 管理

  5. Topic 管理頁面,單擊创建 Topic

  6. 创建 Topic面板,設定Topic屬性,然後單擊確定

    參數

    說明

    樣本

    名称

    Topic名稱。

    demo

    描述

    Topic的簡單描述。

    demo test

    分区数

    Topic的分區數量。

    12

    存储引擎

    說明

    當前僅專業版執行個體支援選擇儲存引擎類型,標準版暫不支援,預設選擇為雲端儲存類型。

    Topic訊息的儲存引擎。

    雲訊息佇列 Kafka 版支援以下兩種儲存引擎。

    • 云存储:底層接入阿里雲雲端硬碟,具有低時延、高效能、持久性、高可靠等特點,採用分布式3副本機制。執行個體的规格类型标准版(高写版)時,儲存引擎只能為云存储

    • Local 存储:使用原生Kafka的ISR複製演算法,採用分布式3副本機制。

    云存储

    消息类型

    Topic訊息的類型。

    • 普通消息:預設情況下,保證相同Key的訊息分布在同一個分區中,且分區內訊息按照發送順序儲存。叢集中出現機器宕機時,可能會造成訊息亂序。當存储引擎選擇云存储時,預設選擇普通消息

    • 分区顺序消息:預設情況下,保證相同Key的訊息分布在同一個分區中,且分區內訊息按照發送順序儲存。叢集中出現機器宕機時,仍然保證分區內按照發送順序儲存。但是會出現部分分區發送訊息失敗,等到分區恢複後即可恢複正常。當存储引擎選擇Local 存储時,預設選擇分区顺序消息

    普通消息

    日志清理策略

    Topic日誌的清理策略。

    存储引擎選擇Local 存储(當前僅專業版執行個體支援選擇儲存引擎類型為Local儲存,標準版暫不支援)時,需要配置日志清理策略

    雲訊息佇列 Kafka 版支援以下兩種日誌清理策略。

    • Delete:預設的訊息清理策略。在磁碟容量充足的情況下,保留在最長保留時間範圍內的訊息;在磁碟容量不足時(一般磁碟使用率超過85%視為不足),將提前刪除舊訊息,以保證服務可用性。

    • Compact:使用Kafka Log Compaction日誌清理策略。Log Compaction清理策略保證相同Key的訊息,最新的value值一定會被保留。主要適用於系統宕機後恢複狀態,系統重啟後重新載入緩衝等情境。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic儲存系統狀態資訊或配置資訊。

      重要

      Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的訊息收發請勿為Topic設定該屬性。具體資訊,請參見雲訊息佇列 Kafka 版Demo庫

    Compact

    标签

    Topic的標籤。

    demo

    建立完成後,在Topic 管理頁面的列表中顯示已建立的Topic。

建立AnalyticDB Sink Connector依賴的Group

您可以在雲訊息佇列 Kafka 版控制台手動建立AnalyticDB Sink Connector資料同步任務使用的Group。該Group的名稱必須為connect-任務名稱,具體資訊,請參見配置源服務參數列表

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 实例列表頁面,單擊目標執行個體名稱。

  4. 在左側導覽列,單擊Group 管理

  5. Group 管理頁面,單擊创建 Group

  6. 创建 Group面板的Group ID文字框輸入Group的名稱,在描述文字框簡要描述Group,並給Group添加標籤,單擊確定

    建立完成後,在Group 管理頁面的列表中顯示已建立的Group。

建立並部署AnalyticDB Sink Connector

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊Connector 任务列表

  4. Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector

  5. 创建 Connector設定精靈頁面,完成以下操作。

    1. 配置基本信息頁簽,按需配置以下參數,然後單擊下一步

      參數

      描述

      樣本值

      名称

      Connector的名稱。命名規則:

      • 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。

      • 同一個雲訊息佇列 Kafka 版執行個體內保持唯一。

      Connector的資料同步任務必須使用名稱為connect-任務名稱Group。如果您未手動建立該Group,系統將為您自動建立。

      kafka-adb-sink

      实例

      預設配置為執行個體的名稱與執行個體ID。

      demo alikafka_post-cn-st21p8vj****

    2. 配置源服务頁簽,選擇資料來源訊息佇列Kafka版,並配置以下參數,然後單擊下一步

      表 1. 配置源服務參數列表

      參數

      描述

      樣本值

      数据源 Topic

      需要同步資料的Topic。

      adb-test-input

      消费线程并发数

      資料來源Topic的消費線程並發數。預設值為6。取值說明如下:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      消费初始位置

      開始消費的位置。取值說明如下:

      • 最早位点:從最初位點開始消費。

      • 最近位点:從最新位點開始消費。

      最早位点

      VPC ID

      資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。

      vpc-bp1xpdnd3l***

      vSwitch ID

      資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。

      vsw-bp1d2jgg81***

      失败处理

      訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下:

      • 继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。

      • 停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。

      說明

      继续订阅

      创建资源方式

      選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。

      • 自动创建

      • 手动创建

      自动创建

      Connector 消费组

      Connector的資料同步任務使用的Group。單擊配置运行环境顯示該參數。該Group的名稱必須為connect-任務名稱

      connect-kafka-adb-sink

      任务位点 Topic

      用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。

      • Topic:建議以connect-offset開頭。

      • 分區數:Topic的分區數量必須大於1。

      • 儲存引擎:Topic的儲存引擎必須為Local儲存。

      • cleanup.policy:Topic的日誌清理策略必須為compact。

      connect-offset-kafka-adb-sink

      任务配置 Topic

      用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。

      • Topic:建議以connect-config開頭。

      • 分區數:Topic的分區數量必須為1。

      • 儲存引擎:Topic的儲存引擎必須為Local儲存。

      • cleanup.policy:Topic的日誌清理策略必須為compact。

      connect-config-kafka-adb-sink

      任务状态 Topic

      用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。

      • Topic:建議以connect-status開頭。

      • 分區數:Topic的分區數量建議為6。

      • 儲存引擎:Topic的儲存引擎必須為Local儲存。

      • cleanup.policy:Topic的日誌清理策略必須為compact。

      connect-status-kafka-adb-sink

      死信队列 Topic

      用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。

      • Topic:建議以connect-error開頭。

      • 分區數:Topic的分區數量建議為6。

      • 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。

      connect-error-kafka-adb-sink

      异常数据 Topic

      用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。

      • Topic:建議以connect-error開頭。

      • 分區數:Topic的分區數量建議為6。

      • 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。

      connect-error-kafka-adb-sink

    3. 配置目标服务頁簽,選擇目標服務雲原生資料倉儲,並配置以下參數,然後單擊创建

      參數

      描述

      樣本值

      实例类型

      雲原生資料倉儲執行個體類型。支援MySQL版PostgreSQL版

      MySQL版

      AnalyticDB 实例 ID

      阿里雲AnalyticDB for MySQL或雲原生資料倉儲AnalyticDB PostgreSQL版執行個體ID。

      am-bp139yqk8u1ik****

      数据库名

      阿里雲雲原生資料倉儲執行個體的資料庫名稱。

      adb_demo

      表名

      雲原生資料倉儲中儲存訊息表名稱。

      user

      数据库用户名

      串連雲原生資料倉儲執行個體資料庫匯入資料的資料庫使用者名稱。

      adbmysql

      数据库密码

      串連雲原生資料倉儲執行個體資料庫匯入資料的資料庫密碼。使用者的密碼在建立執行個體時設定,如果忘記可重設。

      ********

      說明

      使用者名稱和使用者密碼是雲訊息佇列 Kafka 版建立任務時作為環境變數傳遞至Function Compute的函數,任務建立成功後,雲訊息佇列 Kafka 版不儲存相關資訊。

      建立完成後,在Connector 任务列表頁面,查看建立的Connector 。

  6. 建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署

配置函數服務

您在雲訊息佇列 Kafka 版控制台成功建立並部署AnalyticDB Sink Connector後,Function Compute會自動為您建立給該Connector使用的函數服務和函數,服務命名格式為kafka-service-<connector_name>-<隨機String>,函數命名格式為fc-adb-<隨機String>

  1. Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊函数配置

    頁面跳轉至Function Compute控制台。

  2. Function Compute控制台,找到自動建立的函數服務,並配置其VPC和交換器資訊。配置的具體步驟,請參見更新服務

配置雲原生資料庫

您在配置完Function Compute服務後,需要在雲原生資料倉儲控制台將Function Compute服務所屬的網段加入白名單。所屬網段可以在專用網路管理主控台交換器頁面,Function Compute服務對應的VPC和交換器所在行查看。

發送測試訊息

您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被匯出至阿里雲雲原生資料倉儲AnalyticDB MySQL版或雲原生資料倉儲AnalyticDB PostgreSQL版。

說明

訊息內容(Value)格式需為JSON格式,Value將通過JSON解析為K-V形式,其中K對應雲原生資料倉儲的資料庫中的欄位名,V對應該欄位插入的資料,因此雲訊息佇列 Kafka 版發送的訊息內容中的每個K在資料庫中需要有對應的相同名稱的欄位名。欄位名可以在AnalyticDB for MySQL控制台雲原生資料倉儲AnalyticDB PostgreSQL版控制台串連資料庫,在資料庫表中查看。

  1. Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试

  2. 发送消息面板,發送測試訊息。

    • 发送方式選擇控制台

      1. 消息 Key文字框中輸入訊息的Key值,例如demo。

      2. 消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。

      3. 設定发送到指定分区,選擇是否指定分區。

        • 單擊,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態

        • 單擊,不指定分區。

    • 发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。

    • 发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。

驗證結果

雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,登入AnalyticDB for MySQL控制台雲原生資料倉儲AnalyticDB PostgreSQL版控制台,串連資料庫,進入Data Management 5.0SQL視窗介面,找到對應執行個體的表,驗證資料匯出結果。

雲訊息佇列 Kafka 版資料匯出至AnalyticDB for MySQL樣本如下:ADB-Connector-Result