本文說明如何建立MaxCompute Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至MaxCompute的表。
前提條件
在建立MaxCompute Sink Connector前,請確保您已完成以下操作:
雲訊息佇列 Kafka 版
為雲訊息佇列 Kafka 版執行個體開啟Connector。更多資訊,請參見開啟Connector。
為雲訊息佇列 Kafka 版執行個體建立資料來源Topic。更多資訊,請參見步驟一:建立Topic。
本文以名稱為maxcompute-test-input的Topic為例。
MaxCompute(MaxCompute)
通過MaxCompute用戶端建立表。更多資訊,請參見建立表。
本文以名稱為connector_test的專案下名稱為test_kafka的表為例。該表的建表語句如下:
CREATE TABLE IF NOT EXISTS test_kafka(topic STRING,partition BIGINT,offset BIGINT,key STRING,value STRING) PARTITIONED by (pt STRING);
可選:事件匯流排EventBridge
說明僅在您建立的Connector任務所屬執行個體的地區為華東1(杭州)或西南1(成都)時,需要完成該操作。
注意事項
僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至MaxCompute。Connector的限制說明,請參見使用限制。
如果Connector所屬執行個體的地區為華東1(杭州)或西南1(成都),該功能會部署至事件匯流排EventBridge。
事件匯流排EventBridge目前免費供您使用。更多資訊,請參見計費說明。
建立Connector時,事件匯流排EventBridge會為您自動建立服務關聯角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC。
如果未建立服務關聯角色,事件匯流排EventBridge會為您自動建立對應的服務關聯角色,以便允許事件匯流排EventBridge使用此角色訪問雲訊息佇列 Kafka 版和Virtual Private Cloud。
如果已建立服務關聯角色,事件匯流排EventBridge不會重複建立。
關於服務關聯角色的更多資訊,請參見服務關聯角色。
部署到事件匯流排EventBridge的任務暫時不支援查看任務作業記錄。Connector任務執行完成後,您可以在訂閱資料來源Topic的Group中,通過消費情況查看任務進度。具體操作,請參見查看消費狀態。
操作流程
使用MaxCompute Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至MaxCompute的表操作流程如下:
授予雲訊息佇列 Kafka 版訪問MaxCompute的許可權。
可選:建立MaxCompute Sink Connector依賴的Topic和Group
如果您不需要自訂Topic和Group,您可以直接跳過該步驟,在下一步驟選擇自動建立。
重要部分MaxCompute Sink Connector依賴的Topic的儲存引擎必須為Local儲存,大版本為0.10.2的雲訊息佇列 Kafka 版執行個體不支援手動建立Local儲存的Topic,只支援自動建立。
結果驗證
建立RAM角色
由於RAM角色不支援直接選擇雲訊息佇列 Kafka 版作為受信服務,您在建立RAM角色時,需要選擇任意支援的服務作為受信服務。RAM角色建立後,手動修改信任策略。
登入存取控制控制台。
在左側導覽列,選擇 。
在角色頁面,單擊建立角色。
在建立角色面板,執行以下操作。
選擇可信實體類型為阿里雲服務,然後單擊下一步。
在角色類型地區,選擇普通服務角色,在角色名稱文字框,輸入AliyunKafkaMaxComputeUser1,從選擇受信服務列表,選擇MaxCompute,然後單擊完成。
在角色頁面,找到AliyunKafkaMaxComputeUser1,單擊AliyunKafkaMaxComputeUser1。
在AliyunKafkaMaxComputeUser1頁面,單擊信任策略管理頁簽,單擊修改信任策略。
在修改信任策略面板,將指令碼中odps替換為alikafka,單擊確定。
替換後的策略如下所示。
添加許可權
為使Connector將訊息同步到MaxCompute表,您需要為建立的RAM角色至少授予以下許可權:
客體 | 操作 | 描述 |
Project | CreateInstance | 在專案中建立執行個體。 |
Table | Describe | 讀取表的元資訊。 |
Table | Alter | 修改表的元資訊或添加刪除分區。 |
Table | Update | 覆蓋或添加表的資料。 |
關於以上許可權的詳細說明以及授權操作,請參見MaxCompute許可權。
為本文建立的AliyunKafkaMaxComputeUser1添加許可權的樣本步驟如下:
登入MaxCompute用戶端。
執行以下命令添加RAM角色為使用者。
add user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
說明將<accountid>替換為您自己的阿里雲帳號ID。
為RAM使用者授予訪問MaxCompute所需的最小許可權。
執行以下命令為RAM使用者授予專案相關許可權。
grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
說明將<accountid>替換為您自己的阿里雲帳號ID。
執行以下命令為RAM使用者授予表相關許可權。
grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
說明將<accountid>替換為您自己的阿里雲帳號ID。
建立MaxCompute Sink Connector依賴的Topic
您可以在雲訊息佇列 Kafka 版控制台手動建立MaxCompute Sink Connector依賴的5個Topic,包括:任務位點Topic、任務配置Topic、任務狀態Topic、無效信件佇列Topic以及異常資料Topic。每個Topic所需要滿足的分區數與儲存引擎會有差異,具體資訊,請參見配置源服務參數列表。
在概览頁面的资源分布地區,選擇地區。
重要Topic需要在應用程式所在的地區(即所部署的ECS的所在地區)進行建立。Topic不能跨地區使用。例如Topic建立在華北2(北京)這個地區,那麼訊息生產端和消費端也必須運行在華北2(北京)的ECS。
在实例列表頁面,單擊目標執行個體名稱。
在左側導覽列,單擊Topic 管理。
在Topic 管理頁面,單擊创建 Topic。
在创建 Topic面板,設定Topic屬性,然後單擊確定。
參數
說明
樣本
名称
Topic名稱。
demo
描述
Topic的簡單描述。
demo test
分区数
Topic的分區數量。
12
存储引擎
說明當前僅專業版執行個體支援選擇儲存引擎類型,標準版暫不支援,預設選擇為雲端儲存類型。
Topic訊息的儲存引擎。
雲訊息佇列 Kafka 版支援以下兩種儲存引擎。
云存储:底層接入阿里雲雲端硬碟,具有低時延、高效能、持久性、高可靠等特點,採用分布式3副本機制。執行個體的规格类型為标准版(高写版)時,儲存引擎只能為云存储。
Local 存储:使用原生Kafka的ISR複製演算法,採用分布式3副本機制。
云存储
消息类型
Topic訊息的類型。
普通消息:預設情況下,保證相同Key的訊息分布在同一個分區中,且分區內訊息按照發送順序儲存。叢集中出現機器宕機時,可能會造成訊息亂序。當存储引擎選擇云存储時,預設選擇普通消息。
分区顺序消息:預設情況下,保證相同Key的訊息分布在同一個分區中,且分區內訊息按照發送順序儲存。叢集中出現機器宕機時,仍然保證分區內按照發送順序儲存。但是會出現部分分區發送訊息失敗,等到分區恢複後即可恢複正常。當存储引擎選擇Local 存储時,預設選擇分区顺序消息。
普通消息
日志清理策略
Topic日誌的清理策略。
當存储引擎選擇Local 存储(當前僅專業版執行個體支援選擇儲存引擎類型為Local儲存,標準版暫不支援)時,需要配置日志清理策略。
雲訊息佇列 Kafka 版支援以下兩種日誌清理策略。
Delete:預設的訊息清理策略。在磁碟容量充足的情況下,保留在最長保留時間範圍內的訊息;在磁碟容量不足時(一般磁碟使用率超過85%視為不足),將提前刪除舊訊息,以保證服務可用性。
Compact:使用Kafka Log Compaction日誌清理策略。Log Compaction清理策略保證相同Key的訊息,最新的value值一定會被保留。主要適用於系統宕機後恢複狀態,系統重啟後重新載入緩衝等情境。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic儲存系統狀態資訊或配置資訊。
重要Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的訊息收發請勿為Topic設定該屬性。具體資訊,請參見雲訊息佇列 Kafka 版Demo庫。
Compact
标签
Topic的標籤。
demo
建立完成後,在Topic 管理頁面的列表中顯示已建立的Topic。
建立MaxCompute Sink Connector依賴的Group
您可以在雲訊息佇列 Kafka 版控制台手動建立MaxCompute Sink Connector資料同步任務使用的Group。該Group的名稱必須為connect-任務名稱,具體資訊,請參見配置源服務參數列表。
在概览頁面的资源分布地區,選擇地區。
在实例列表頁面,單擊目標執行個體名稱。
在左側導覽列,單擊Group 管理。
在Group 管理頁面,單擊创建 Group。
在创建 Group面板的Group ID文字框輸入Group的名稱,在描述文字框簡要描述Group,並給Group添加標籤,單擊確定。
建立完成後,在Group 管理頁面的列表中顯示已建立的Group。
建立並部署MaxCompute Sink Connector
建立並部署用於將資料從雲訊息佇列 Kafka 版同步至MaxCompute的MaxCompute Sink Connector。
在概览頁面的资源分布地區,選擇地區。
在左側導覽列,單擊Connector 任务列表。
在Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector。
在创建 Connector設定精靈面頁面,完成以下操作。
在配置基本信息頁簽,按需配置以下參數,然後單擊下一步。
參數
描述
樣本值
名称
Connector的名稱。命名規則:
可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。
同一個雲訊息佇列 Kafka 版執行個體內保持唯一。
Connector的資料同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動建立該Group,系統將為您自動建立。
kafka-maxcompute-sink
实例
預設配置為執行個體的名稱與執行個體ID。
demo alikafka_post-cn-st21p8vj****
在配置源服务頁簽,選擇資料來源為訊息佇列Kafka版,並配置以下參數,然後單擊下一步。
說明如果您已建立好Topic和Group,那麼請選擇手動建立資源,並填寫已建立的資源資訊。否則,請選擇自動建立資源。
表 1. 配置源服務參數列表
參數
描述
樣本值
数据源 Topic
需要同步資料的Topic。
maxcompute-test-input
消费线程并发数
資料來源Topic的消費線程並發數。預設值為6。取值說明如下:
1
2
3
6
12
6
消费初始位置
開始消費的位置。取值說明如下:
最早位点:從最初位點開始消費。
最近位点:從最新位點開始消費。
最早位点
VPC ID
資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。
vpc-bp1xpdnd3l***
vSwitch ID
資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。
vsw-bp1d2jgg81***
失败处理
訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下。
继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔
說明如何查看日誌,請參見Connector相關操作。
如何根據錯誤碼尋找解決方案,請參見錯誤碼。
继续订阅
创建资源方式
選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。
自动创建
手动创建
自动创建
Connector 消费组
Connector的資料同步任務使用的Group。單擊配置运行环境顯示該參數。該Groupp的名稱必須為connect-任務名稱。
connect-kafka-maxcompute-sink
任务位点 Topic
用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。
Topic名稱:建議以connect-offset開頭。
分區數:Topic的分區數量必須大於1。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-offset-kafka-maxcompute-sink
任务配置 Topic
用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。
Topic名稱:建議以connect-config開頭。
分區數:Topic的分區數量必須為1。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-config-kafka-maxcompute-sink
任务状态 Topic
用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。
Topic名稱:建議以connect-status開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-status-kafka-maxcompute-sink
死信队列 Topic
用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。
Topic名稱:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
connect-error-kafka-maxcompute-sink
异常数据 Topic
用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。
Topic名稱:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
connect-error-kafka-maxcompute-sink
在配置目标服务頁簽,選擇目標服務為MaxCompute,並配置以下參數,然後單擊创建。
說明如果Connector所屬執行個體的地區為華東1(杭州)或西南1(成都),選擇目標服務為MaxCompute時, 會分別彈出建立服務關聯角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC的服務授權對話方塊,在彈出的服務授權對話方塊中單擊確認,然後再配置以下參數並單擊创建。如果服務關聯角色已建立,則不再重複建立,即不會再彈出服務授權對話方塊。
參數
描述
樣本值
连接地址
MaxCompute的服務存取點。更多資訊,請參見Endpoint。
VPC網路Endpoint:低延遲,推薦。適用於雲訊息佇列 Kafka 版執行個體和MaxCompute處於同一地區情境。
外網Endpoint:高延遲,不推薦。適用於雲訊息佇列 Kafka 版執行個體和MaxCompute處於不同地區的情境。如需使用公網Endpoint,您需要為Connector開啟公網訪問。更多資訊,請參見為Connector開啟公網訪問。
http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
工作空间
MaxCompute的工作空間。
connector_test
表
MaxCompute的表。
test_kafka
表地域
MaxCompute表所在地區。
華東1(杭州)
服务账号
MaxCompute的阿里雲帳號ID。
188***
授权角色名
雲訊息佇列 Kafka 版的RAM角色的名稱。更多資訊,請參見建立RAM角色。
AliyunKafkaMaxComputeUser1
模式
訊息同步到Connector的模式。預設為DEFAULT。取值說明如下:
KEY:只保留訊息的Key,並將Key寫入MaxCompute表的key列。
VALUE:只保留訊息的Value,並將Value寫入MaxCompute表的value列。
DEFAULT:同時保留訊息的Key和Value,並將Key和Value分別寫入MaxCompute表的key列和value列。
重要DEFAULT模式下,不支援選擇CSV格式,只支援TEXT格式和BINARY格式。
DEFAULT
格式
訊息同步到Connector的格式。預設為TEXT。取值說明如下:
TEXT:訊息的格式為字串。
BINARY:訊息的格式為位元組數組。
CSV:訊息的格式為逗號(,)分隔的字串。
重要CSV格式下,不支援DEFAULT模式,只支援KEY模式和VALUE模式:
KEY模式:只保留訊息的Key,根據逗號(,)分隔Key字串,並將分隔後的字串按照索引順序寫入表。
VALUE模式:只保留訊息的Value,根據逗號(,)分隔Value字串,並將分隔後的字串按照索引順序寫入表。
TEXT
分区
分區的粒度。預設為HOUR。取值說明如下:
DAY:每天將資料寫入一個新分區。
HOUR:每小時將資料寫入一個新分區。
MINUTE:每分鐘將資料寫入一個新分區。
HOUR
时区
向Connector的資料來源Topic發送訊息的雲訊息佇列 Kafka 版生產者用戶端所在時區。預設為GMT+08:00。
GMT 08:00
建立完成後,在Connector 任务列表頁面,查看建立的Connector 。
建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署。
發送測試訊息
部署MaxCompute Sink Connector後,您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被同步至MaxCompute。
在Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试。
在发送消息面板,發送測試訊息。
发送方式選擇控制台。
在消息 Key文字框中輸入訊息的Key值,例如demo。
在消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。
設定发送到指定分区,選擇是否指定分區。
單擊是,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。
发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。
查看錶資料
向雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,在MaxCompute用戶端查看錶資料,驗證是否收到訊息。
查看本文寫入的test_kafka的樣本步驟如下:
登入MaxCompute用戶端。
執行以下命令查看錶的資料分區。
show partitions test_kafka;
返回結果樣本如下:
pt=11-17-2020 15 OK
執行以下命令查看分區的資料。
select * from test_kafka where pt ="11-17-2020 14";
返回結果樣本如下:
+----------------------+------------+------------+-----+-------+---------------+ | topic | partition | offset | key | value | pt | +----------------------+------------+------------+-----+-------+---------------+ | maxcompute-test-input| 0 | 0 | 1 | 1 | 11-17-2020 14 | +----------------------+------------+------------+-----+-------+---------------+