本文介紹如何通過建立AnalyticDB Sink Connector,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic通過Function Compute服務匯出至AnalyticDB for MySQL或雲原生資料倉儲AnalyticDB PostgreSQL版。
前提條件
在匯出資料前,請確保您已完成以下操作:
雲訊息佇列 Kafka 版
為雲訊息佇列 Kafka 版執行個體開啟Connector。更多資訊,請參見開啟Connector。
為雲訊息佇列 Kafka 版執行個體建立資料來源Topic。更多資訊,請參見步驟一:建立Topic。
Function Compute
AnalyticDB for MySQL和雲原生資料倉儲AnalyticDB PostgreSQL版
AnalyticDB for MySQL:在AnalyticDB for MySQL控制台建立叢集、資料庫帳號,串連叢集並建立資料庫。更多資訊,請參見建立叢集、建立資料庫帳號、串連叢集和建立資料庫。
雲原生資料倉儲AnalyticDB PostgreSQL版:在雲原生資料倉儲AnalyticDB PostgreSQL版控制台建立執行個體、資料庫帳號和登入資料庫。更多資訊,請參見建立執行個體、建立資料庫帳號和用戶端串連。
注意事項
僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute,再由Function Compute匯出至AnalyticDB for MySQL或雲原生資料倉儲AnalyticDB PostgreSQL版。關於Connector的限制說明,請參見使用限制。
該功能基於Function Compute服務提供。Function Compute為您提供了一定的免費額度,超額部分將產生費用,請以Function Compute的計費規則為準。計費詳情,請參見計費概述。
Function Compute的函數調用支援日誌查詢,以便您迅速排查問題。具體操作步驟,請參見配置日誌。
訊息轉儲時,雲訊息佇列 Kafka 版中訊息用UTF-8 String序列化,暫不支援二進位的資料格式。
如果AnalyticDB Sink Connector存取點是私網存取點,Function Compute運行環境預設無法訪問,為確保網路暢通,需在Function Compute控制台為函數服務配置與雲原生資料倉儲一致的VPC和vSwitch資訊。更多資訊,請參見更新服務。
建立Connector時,雲訊息佇列 Kafka 版會為您自動建立服務關聯角色。
如果未建立服務關聯角色,雲訊息佇列 Kafka 版會為您自動建立一個服務關聯角色,以便您使用雲訊息佇列 Kafka 版匯出資料至Table Store的功能。
如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。
關於服務關聯角色的更多資訊,請參見服務關聯角色。
操作流程
使用AnalyticDB Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至雲原生資料倉儲操作流程如下:
可選:建立AnalyticDB Sink Connector依賴的Topic和Group
如果您不需要自訂Topic和Group,您可以直接跳過該步驟,在下一步驟選擇自動建立。
重要部分AnalyticDB Sink Connector依賴的Topic的儲存引擎必須為Local儲存,大版本為0.10.2的雲訊息佇列 Kafka 版執行個體不支援手動建立Local儲存的Topic,只支援自動建立。
服務配置
結果驗證
建立AnalyticDB Sink Connector依賴的Topic
您可以在雲訊息佇列 Kafka 版控制台手動建立AnalyticDB Sink Connector依賴的5個Topic,包括:任務位點Topic、任務配置Topic、任務狀態Topic、無效信件佇列Topic以及異常資料Topic。每個Topic所需要滿足的分區數與儲存引擎會有差異,具體資訊,請參見配置源服務參數列表。
在概览頁面的资源分布地區,選擇地區。
重要Topic需要在應用程式所在的地區(即所部署的ECS的所在地區)進行建立。Topic不能跨地區使用。例如Topic建立在華北2(北京)這個地區,那麼訊息生產端和消費端也必須運行在華北2(北京)的ECS。
在实例列表頁面,單擊目標執行個體名稱。
在左側導覽列,單擊Topic 管理。
在Topic 管理頁面,單擊创建 Topic。
在创建 Topic面板,設定Topic屬性,然後單擊確定。
參數
說明
樣本
名称
Topic名稱。
demo
描述
Topic的簡單描述。
demo test
分区数
Topic的分區數量。
12
存储引擎
說明當前僅專業版執行個體支援選擇儲存引擎類型,標準版暫不支援,預設選擇為雲端儲存類型。
Topic訊息的儲存引擎。
雲訊息佇列 Kafka 版支援以下兩種儲存引擎。
云存储:底層接入阿里雲雲端硬碟,具有低時延、高效能、持久性、高可靠等特點,採用分布式3副本機制。執行個體的规格类型為标准版(高写版)時,儲存引擎只能為云存储。
Local 存储:使用原生Kafka的ISR複製演算法,採用分布式3副本機制。
云存储
消息类型
Topic訊息的類型。
普通消息:預設情況下,保證相同Key的訊息分布在同一個分區中,且分區內訊息按照發送順序儲存。叢集中出現機器宕機時,可能會造成訊息亂序。當存储引擎選擇云存储時,預設選擇普通消息。
分区顺序消息:預設情況下,保證相同Key的訊息分布在同一個分區中,且分區內訊息按照發送順序儲存。叢集中出現機器宕機時,仍然保證分區內按照發送順序儲存。但是會出現部分分區發送訊息失敗,等到分區恢複後即可恢複正常。當存储引擎選擇Local 存储時,預設選擇分区顺序消息。
普通消息
日志清理策略
Topic日誌的清理策略。
當存储引擎選擇Local 存储(當前僅專業版執行個體支援選擇儲存引擎類型為Local儲存,標準版暫不支援)時,需要配置日志清理策略。
雲訊息佇列 Kafka 版支援以下兩種日誌清理策略。
Delete:預設的訊息清理策略。在磁碟容量充足的情況下,保留在最長保留時間範圍內的訊息;在磁碟容量不足時(一般磁碟使用率超過85%視為不足),將提前刪除舊訊息,以保證服務可用性。
Compact:使用Kafka Log Compaction日誌清理策略。Log Compaction清理策略保證相同Key的訊息,最新的value值一定會被保留。主要適用於系統宕機後恢複狀態,系統重啟後重新載入緩衝等情境。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic儲存系統狀態資訊或配置資訊。
重要Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的訊息收發請勿為Topic設定該屬性。具體資訊,請參見雲訊息佇列 Kafka 版Demo庫。
Compact
标签
Topic的標籤。
demo
建立完成後,在Topic 管理頁面的列表中顯示已建立的Topic。
建立AnalyticDB Sink Connector依賴的Group
您可以在雲訊息佇列 Kafka 版控制台手動建立AnalyticDB Sink Connector資料同步任務使用的Group。該Group的名稱必須為connect-任務名稱,具體資訊,請參見配置源服務參數列表。
在概览頁面的资源分布地區,選擇地區。
在实例列表頁面,單擊目標執行個體名稱。
在左側導覽列,單擊Group 管理。
在Group 管理頁面,單擊创建 Group。
在创建 Group面板的Group ID文字框輸入Group的名稱,在描述文字框簡要描述Group,並給Group添加標籤,單擊確定。
建立完成後,在Group 管理頁面的列表中顯示已建立的Group。
建立並部署AnalyticDB Sink Connector
在概览頁面的资源分布地區,選擇地區。
在左側導覽列,單擊Connector 任务列表。
在Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector。
在创建 Connector設定精靈頁面,完成以下操作。
在配置基本信息頁簽,按需配置以下參數,然後單擊下一步。
參數
描述
樣本值
名称
Connector的名稱。命名規則:
可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。
同一個雲訊息佇列 Kafka 版執行個體內保持唯一。
Connector的資料同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動建立該Group,系統將為您自動建立。
kafka-adb-sink
实例
預設配置為執行個體的名稱與執行個體ID。
demo alikafka_post-cn-st21p8vj****
在配置源服务頁簽,選擇資料來源為訊息佇列Kafka版,並配置以下參數,然後單擊下一步。
表 1. 配置源服務參數列表
參數
描述
樣本值
数据源 Topic
需要同步資料的Topic。
adb-test-input
消费线程并发数
資料來源Topic的消費線程並發數。預設值為6。取值說明如下:
1
2
3
6
12
6
消费初始位置
開始消費的位置。取值說明如下:
最早位点:從最初位點開始消費。
最近位点:從最新位點開始消費。
最早位点
VPC ID
資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。
vpc-bp1xpdnd3l***
vSwitch ID
資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。
vsw-bp1d2jgg81***
失败处理
訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下:
继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
說明如何查看日誌,請參見Connector相關操作。
如何根據錯誤碼尋找解決方案,請參見錯誤碼。
继续订阅
创建资源方式
選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。
自动创建
手动创建
自动创建
Connector 消费组
Connector的資料同步任務使用的Group。單擊配置运行环境顯示該參數。該Group的名稱必須為connect-任務名稱。
connect-kafka-adb-sink
任务位点 Topic
用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-offset開頭。
分區數:Topic的分區數量必須大於1。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-offset-kafka-adb-sink
任务配置 Topic
用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-config開頭。
分區數:Topic的分區數量必須為1。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-config-kafka-adb-sink
任务状态 Topic
用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-status開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-status-kafka-adb-sink
死信队列 Topic
用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
connect-error-kafka-adb-sink
异常数据 Topic
用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
connect-error-kafka-adb-sink
在配置目标服务頁簽,選擇目標服務為雲原生資料倉儲,並配置以下參數,然後單擊创建。
參數
描述
樣本值
实例类型
雲原生資料倉儲執行個體類型。支援MySQL版和PostgreSQL版。
MySQL版
AnalyticDB 实例 ID
阿里雲AnalyticDB for MySQL或雲原生資料倉儲AnalyticDB PostgreSQL版執行個體ID。
am-bp139yqk8u1ik****
数据库名
阿里雲雲原生資料倉儲執行個體的資料庫名稱。
adb_demo
表名
雲原生資料倉儲中儲存訊息表名稱。
user
数据库用户名
串連雲原生資料倉儲執行個體資料庫匯入資料的資料庫使用者名稱。
adbmysql
数据库密码
串連雲原生資料倉儲執行個體資料庫匯入資料的資料庫密碼。使用者的密碼在建立執行個體時設定,如果忘記可重設。
AnalyticDB for MySQL:關於重設密碼的具體操作,請參見重設高許可權帳號密碼。
雲原生資料倉儲AnalyticDB PostgreSQL版:登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台,單擊執行個體名稱,在左側導覽列單擊帳號管理,找到需要重設密碼的帳號,在操作列單擊重設密碼。
********
說明使用者名稱和使用者密碼是雲訊息佇列 Kafka 版建立任務時作為環境變數傳遞至Function Compute的函數,任務建立成功後,雲訊息佇列 Kafka 版不儲存相關資訊。
建立完成後,在Connector 任务列表頁面,查看建立的Connector 。
建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署。
配置函數服務
您在雲訊息佇列 Kafka 版控制台成功建立並部署AnalyticDB Sink Connector後,Function Compute會自動為您建立給該Connector使用的函數服務和函數,服務命名格式為kafka-service-<connector_name>-<隨機String>,函數命名格式為fc-adb-<隨機String>。
在Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊函数配置。
頁面跳轉至Function Compute控制台。
在Function Compute控制台,找到自動建立的函數服務,並配置其VPC和交換器資訊。配置的具體步驟,請參見更新服務。
配置雲原生資料庫
您在配置完Function Compute服務後,需要在雲原生資料倉儲控制台將Function Compute服務所屬的網段加入白名單。所屬網段可以在專用網路管理主控台的交換器頁面,Function Compute服務對應的VPC和交換器所在行查看。
AnalyticDB for MySQL:登入AnalyticDB for MySQL控制台,配置白名單。具體操作,請參見設定白名單。
雲原生資料倉儲AnalyticDB PostgreSQL版:登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台,配置白名單。具體操作,請參見設定白名單。
發送測試訊息
您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被匯出至阿里雲雲原生資料倉儲AnalyticDB MySQL版或雲原生資料倉儲AnalyticDB PostgreSQL版。
訊息內容(Value)格式需為JSON格式,Value將通過JSON解析為K-V形式,其中K對應雲原生資料倉儲的資料庫中的欄位名,V對應該欄位插入的資料,因此雲訊息佇列 Kafka 版發送的訊息內容中的每個K在資料庫中需要有對應的相同名稱的欄位名。欄位名可以在AnalyticDB for MySQL控制台或雲原生資料倉儲AnalyticDB PostgreSQL版控制台串連資料庫,在資料庫表中查看。
在Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试。
在发送消息面板,發送測試訊息。
发送方式選擇控制台。
在消息 Key文字框中輸入訊息的Key值,例如demo。
在消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。
設定发送到指定分区,選擇是否指定分區。
單擊是,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。
发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。
驗證結果
向雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,登入AnalyticDB for MySQL控制台或雲原生資料倉儲AnalyticDB PostgreSQL版控制台,串連資料庫,進入Data Management 5.0的SQL視窗介面,找到對應執行個體的表,驗證資料匯出結果。
雲訊息佇列 Kafka 版資料匯出至AnalyticDB for MySQL樣本如下: