本文說明如何建立FC Sink Connector,您可以通過FC Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute的函數。
前提條件
在建立FC Sink Connector前,請確保您已完成以下操作:
雲訊息佇列 Kafka 版
為雲訊息佇列 Kafka 版執行個體開啟Connector。更多資訊,請參見開啟Connector。
為雲訊息佇列 Kafka 版執行個體建立資料來源Topic。更多資訊,請參見步驟一:建立Topic。
本文以名稱為fc-test-input的Topic為例。
Function Compute
在Function Compute建立函數。更多資訊,請參見快速建立函數。
重要函數類型必須為事件函數。
本文以服務名稱為guide-hello_world、函數名稱為hello_world、運行環境為Python的事件函數為例。該樣本函數的代碼如下:
# -*- coding: utf-8 -*- import logging # To enable the initializer feature # please implement the initializer function as below: # def initializer(context): # logger = logging.getLogger() # logger.info('initializing') def handler(event, context): logger = logging.getLogger() logger.info('hello world:' + bytes.decode(event)) return 'hello world:' + bytes.decode(event)
可選:事件匯流排EventBridge
說明僅在您建立的Connector任務所屬執行個體的地區為華東1(杭州)或西南1(成都)時,需要完成該操作。
注意事項
僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute。Connector的限制說明,請參見使用限制。
如果Connector所屬執行個體的地區為華東1(杭州)或西南1(成都),該功能會部署至事件匯流排EventBridge。
事件匯流排EventBridge目前免費供您使用。更多資訊,請參見計費說明。
建立Connector時,事件匯流排EventBridge會為您自動建立服務關聯角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC。
如果未建立服務關聯角色,事件匯流排EventBridge會為您自動建立對應的服務關聯角色,以便允許事件匯流排EventBridge使用此角色訪問雲訊息佇列 Kafka 版和Virtual Private Cloud。
如果已建立服務關聯角色,事件匯流排EventBridge不會重複建立。
關於服務關聯角色的更多資訊,請參見服務關聯角色。
部署到事件匯流排EventBridge的任務暫時不支援查看任務作業記錄。Connector任務執行完成後,您可以在訂閱資料來源Topic的Group中,通過消費情況查看任務進度。具體操作,請參見查看消費狀態。
操作流程
使用FC Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute的函數的操作流程如下:
可選:使FC Sink Connector跨地區訪問Function Compute
重要如果您不需要使FC Sink Connector跨地區訪問Function Compute,您可以直接跳過該步驟。
可選:使FC Sink Connector跨帳號訪問Function Compute
重要如果您不需要使FC Sink Connector跨帳號訪問Function Compute,您可以直接跳過該步驟。
可選:建立FC Sink Connector依賴的Topic和Group
重要如果您不需要自訂Topic和Group的名稱,您可以直接跳過該步驟。
部分FC Sink Connector依賴的Topic的儲存引擎必須為Local儲存,大版本為0.10.2版本的雲訊息佇列 Kafka 版執行個體不支援手動建立Local儲存的Topic,只支援自動建立。
結果驗證
為FC Sink Connector開啟公網訪問
如需使FC Sink Connector跨地區訪問其他阿里雲服務,您需要為FC Sink Connector開啟公網訪問。具體操作,請參見為Connector開啟公網訪問。
建立自訂權限原則
在目標帳號下建立訪問Function Compute的自訂權限原則。
登入存取控制控制台。
在左側導覽列,選擇 。
在權限原則頁面,單擊建立權限原則。
在建立自訂權限原則頁面,建立自訂權限原則。
在指令碼編輯頁簽,策略指令碼輸入框中自訂權限原則指令碼,單擊下一步。
訪問Function Compute的自訂權限原則指令碼樣本如下:
{ "Version": "1", "Statement": [ { "Action": [ "fc:InvokeFunction", "fc:GetFunction" ], "Resource": "*", "Effect": "Allow" } ] }
在基本資料地區名稱文字框,輸入KafkaConnectorFcAccess。
單擊確定。
建立RAM角色
在目標帳號下建立RAM角色。由於RAM角色不支援直接選擇雲訊息佇列 Kafka 版作為受信服務,您在建立RAM角色時,需要選擇任意支援的服務作為受信服務。RAM角色建立後,手動修改信任策略。
在左側導覽列,選擇 。
在角色頁面,單擊建立角色。
在建立角色面板,建立RAM角色。
選擇可信實體類型為阿里雲服務,單擊下一步。
在角色類型地區,選擇普通服務角色,在角色名稱文字框,輸入AliyunKafkaConnectorRole,從選擇受信服務列表,選擇Function Compute,然後單擊完成。
在角色頁面,找到AliyunKafkaConnectorRole,單擊AliyunKafkaConnectorRole。
在AliyunKafkaConnectorRole頁面,單擊信任策略管理頁簽,單擊修改信任策略。
在修改信任策略面板,將指令碼中fc替換為alikafka,單擊確定。
添加許可權
在目標帳號下為建立的RAM角色授予訪問Function Compute的許可權。
在左側導覽列,選擇 。
在角色頁面,找到AliyunKafkaConnectorRole,在其右側操作列,單擊添加許可權。
在添加許可權面板,添加KafkaConnectorFcAccess許可權。
在選擇許可權地區,選擇自訂策略。
在權限原則名稱列表,找到KafkaConnectorFcAccess,單擊KafkaConnectorFcAccess。
單擊確定。
單擊完成。
建立FC Sink Connector依賴的Topic
您可以在雲訊息佇列 Kafka 版控制台手動建立FC Sink Connector依賴的5個Topic,包括:任務位點Topic、任務配置Topic、任務狀態Topic、無效信件佇列Topic以及異常資料Topic。每個Topic所需要滿足的分區數與儲存引擎會有差異,具體資訊,請參見配置源服務參數列表。
在概览頁面的资源分布地區,選擇地區。
重要Topic需要在應用程式所在的地區(即所部署的ECS的所在地區)進行建立。Topic不能跨地區使用。例如Topic建立在華北2(北京)這個地區,那麼訊息生產端和消費端也必須運行在華北2(北京)的ECS。
在实例列表頁面,單擊目標執行個體名稱。
在左側導覽列,單擊Topic 管理。
在Topic 管理頁面,單擊创建 Topic。
在创建 Topic面板,設定Topic屬性,然後單擊確定。
參數
說明
樣本
名称
Topic名稱。
demo
描述
Topic的簡單描述。
demo test
分区数
Topic的分區數量。
12
存储引擎
說明當前僅專業版執行個體支援選擇儲存引擎類型,標準版暫不支援,預設選擇為雲端儲存類型。
Topic訊息的儲存引擎。
雲訊息佇列 Kafka 版支援以下兩種儲存引擎。
云存储:底層接入阿里雲雲端硬碟,具有低時延、高效能、持久性、高可靠等特點,採用分布式3副本機制。執行個體的规格类型為标准版(高写版)時,儲存引擎只能為云存储。
Local 存储:使用原生Kafka的ISR複製演算法,採用分布式3副本機制。
云存储
消息类型
Topic訊息的類型。
普通消息:預設情況下,保證相同Key的訊息分布在同一個分區中,且分區內訊息按照發送順序儲存。叢集中出現機器宕機時,可能會造成訊息亂序。當存储引擎選擇云存储時,預設選擇普通消息。
分区顺序消息:預設情況下,保證相同Key的訊息分布在同一個分區中,且分區內訊息按照發送順序儲存。叢集中出現機器宕機時,仍然保證分區內按照發送順序儲存。但是會出現部分分區發送訊息失敗,等到分區恢複後即可恢複正常。當存储引擎選擇Local 存储時,預設選擇分区顺序消息。
普通消息
日志清理策略
Topic日誌的清理策略。
當存储引擎選擇Local 存储(當前僅專業版執行個體支援選擇儲存引擎類型為Local儲存,標準版暫不支援)時,需要配置日志清理策略。
雲訊息佇列 Kafka 版支援以下兩種日誌清理策略。
Delete:預設的訊息清理策略。在磁碟容量充足的情況下,保留在最長保留時間範圍內的訊息;在磁碟容量不足時(一般磁碟使用率超過85%視為不足),將提前刪除舊訊息,以保證服務可用性。
Compact:使用Kafka Log Compaction日誌清理策略。Log Compaction清理策略保證相同Key的訊息,最新的value值一定會被保留。主要適用於系統宕機後恢複狀態,系統重啟後重新載入緩衝等情境。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic儲存系統狀態資訊或配置資訊。
重要Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的訊息收發請勿為Topic設定該屬性。具體資訊,請參見雲訊息佇列 Kafka 版Demo庫。
Compact
标签
Topic的標籤。
demo
建立完成後,在Topic 管理頁面的列表中顯示已建立的Topic。
建立FC Sink Connector依賴的Group
您可以在雲訊息佇列 Kafka 版控制台手動建立FC Sink Connector資料同步任務使用的Group。該Group的名稱必須為connect-任務名稱,具體資訊,請參見配置源服務參數列表。
在概览頁面的资源分布地區,選擇地區。
在实例列表頁面,單擊目標執行個體名稱。
在左側導覽列,單擊Group 管理。
在Group 管理頁面,單擊创建 Group。
在创建 Group面板的Group ID文字框輸入Group的名稱,在描述文字框簡要描述Group,並給Group添加標籤,單擊確定。
建立完成後,在Group 管理頁面的列表中顯示已建立的Group。
建立並部署FC Sink Connector
建立並部署用於將資料從雲訊息佇列 Kafka 版同步至Function Compute的FC Sink Connector。
在概览頁面的资源分布地區,選擇地區。
在左側導覽列,單擊Connector 任务列表。
在Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector。
在创建 Connector設定精靈頁面,完成以下操作。
在配置基本信息頁簽,按需配置以下參數,然後單擊下一步。
參數
描述
樣本值
名称
Connector的名稱。命名規則:
可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。
同一個雲訊息佇列 Kafka 版執行個體內保持唯一。
Connector的資料同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動建立該Group,系統將為您自動建立。
kafka-fc-sink
实例
預設配置為執行個體的名稱與執行個體ID。
demo alikafka_post-cn-st21p8vj****
在配置源服务頁簽,選擇資料來源為訊息佇列Kafka版,並配置以下參數,然後單擊下一步。
說明如果您已建立好Topic和Group,那麼請選擇手動建立資源,並填寫已建立的資源資訊。否則,請選擇自動建立資源。
表 1. 配置源服務參數列表
參數
描述
樣本值
数据源 Topic
需要同步資料的Topic。
fc-test-input
消费线程并发数
資料來源Topic的消費線程並發數。預設值為6。取值說明如下:
1
2
3
6
12
6
消费初始位置
開始消費的位置。取值說明如下:
最早位点:從最初位點開始消費。
最近位点:從最新位點開始消費。
最早位点
VPC ID
資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。
vpc-bp1xpdnd3l***
vSwitch ID
資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。
vsw-bp1d2jgg81***
失败处理
訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下。
继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔
說明如何查看日誌,請參見Connector相關操作。
如何根據錯誤碼尋找解決方案,請參見錯誤碼。
继续订阅
创建资源方式
選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。
自动创建
手动创建
自动创建
Connector 消费组
Connector的資料同步任務使用的Group。單擊配置运行环境顯示該參數。該Group的名稱必須為connect-任務名稱。
connect-kafka-fc-sink
任务位点 Topic
用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-offset開頭。
分區數:Topic的分區數量必須大於1。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-offset-kafka-fc-sink
任务配置 Topic
用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-config開頭。
分區數:Topic的分區數量必須為1。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-config-kafka-fc-sink
任务状态 Topic
用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-status開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎必須為Local儲存。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-status-kafka-fc-sink
死信队列 Topic
用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
connect-error-kafka-fc-sink
异常数据 Topic
用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
connect-error-kafka-fc-sink
在配置目标服务頁簽,選擇目標服務為Function Compute,並配置以下參數,然後單擊创建。
說明如果Connector所屬執行個體的地區為華東1(杭州)或西南1(成都),選擇目標服務為Function Compute時, 會分別彈出建立服務關聯角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC的服務授權對話方塊,在彈出的服務授權對話方塊中單擊確認,然後再配置以下參數並單擊创建。如果服務關聯角色已建立,則不再重複建立,即不會再彈出服務授權對話方塊。
參數
描述
樣本值
是否跨账号/地域
FC Sink Connector是否跨帳號/地區向Function Compute服務同步資料。預設為否。取值:
否:同地區同帳號模式。
是:跨地區同帳號模式、同地區跨帳號模式或跨地區跨帳號模式。
否
服务地域
Function Compute服務的地區。預設為FC Sink Connector所在地區。如需跨地區,您需要為Connector開啟公網訪問,然後選擇目標地區。更多資訊,請參見為FC Sink Connector開啟公網訪問。
重要是否跨账号/地域為是時,顯示服务地域。
cn-hangzhou
服务接入点
Function Compute服務的存取點。在Function Compute控制台的概覽頁的常用資訊地區擷取。
內網Endpoint:低延遲,推薦。適用於雲訊息佇列 Kafka 版執行個體和Function Compute處於同一地區情境。
公網Endpoint:高延遲,不推薦。適用於雲訊息佇列 Kafka 版執行個體和Function Compute處於不同地區的情境。如需使用公網Endpoint,您需要為Connector開啟公網訪問。更多資訊,請參見為FC Sink Connector開啟公網訪問。
重要是否跨账号/地域general.no為是時,顯示服务接入点。
http://188***.cn-hangzhou.fc.aliyuncs.com
服务账号
Function Compute服務的阿里雲帳號ID。在Function Compute控制台的概覽頁的常用資訊地區擷取。
重要是否跨账号/地域為是時,顯示服务账号。
188***
授权角色名
雲訊息佇列 Kafka 版訪問Function Compute服務的RAM角色。
如不需跨帳號,您需要在本帳號下建立RAM角色並為其授權,然後輸入該授權角色名稱。操作步驟,請參見建立自訂權限原則、建立RAM角色和添加許可權。
如需跨帳號,您需要在目標帳號下建立RAM角色並為其授權,然後輸入該授權角色名稱。操作步驟,請參見建立自訂權限原則、建立RAM角色和添加許可權。
重要是否跨账号/地域為是時,顯示授权角色名。
AliyunKafkaConnectorRole
服务名
Function Compute服務的名稱。
guide-hello_world
函数名
Function Compute服務的函數名稱。
hello_world
版本或别名
Function Compute服務的版本或別名。
重要是否跨账号/地域為否時,您需選擇指定版本還是指定别名。
是否跨账号/地域為是時,您需手動輸入Function Compute服務的版本或別名。
LATEST
服务版本
Function Compute的服務版本。
重要是否跨账号/地域為否且版本或别名選擇指定版本時,顯示服务版本。
LATEST
服务别名
Function Compute的服務別名。
重要是否跨账号/地域為否且版本或别名選擇指定别名時,顯示服务别名。
jy
发送模式
訊息發送模式。取值說明如下:
异步:推薦。
同步:不推薦。同步發送模式下,如果Function Compute的處理時間較長,雲訊息佇列 Kafka 版的處理時間也會較長。當同一批次訊息的處理時間超過5分鐘時,會觸發雲訊息佇列 Kafka 版用戶端Rebalance。
异步
发送批大小
批量發送的訊息條數。預設為20。Connector根據發送批次大小和請求大小限制(同步請求大小限制為6 MB,非同步請求大小限制為128 KB)將多條訊息彙總後發送。例如,發送模式為非同步,發送批次大小為20,如果要發送18條訊息,其中有17條訊息的總大小為127 KB,有1條訊息的大小為200 KB,Connector會將總大小不超過128 KB的17條訊息彙總後發送,將大小超過128 KB的1條訊息單獨發送。
說明如果您在發送訊息時將key設定為null,則請求中不包含key。如果將value設定為null,則請求中不包含value。
如果批量發送的多條訊息的大小不超過請求大小限制,則請求中包含訊息內容。請求樣本如下:
[ { "key":"this is the message's key2", "offset":8, "overflowFlag":false, "partition":4, "timestamp":1603785325438, "topic":"Test", "value":"this is the message's value2", "valueSize":28 }, { "key":"this is the message's key9", "offset":9, "overflowFlag":false, "partition":4, "timestamp":1603785325440, "topic":"Test", "value":"this is the message's value9", "valueSize":28 }, { "key":"this is the message's key12", "offset":10, "overflowFlag":false, "partition":4, "timestamp":1603785325442, "topic":"Test", "value":"this is the message's value12", "valueSize":29 }, { "key":"this is the message's key38", "offset":11, "overflowFlag":false, "partition":4, "timestamp":1603785325464, "topic":"Test", "value":"this is the message's value38", "valueSize":29 } ]
如果發送的單條訊息的大小超過請求大小限制,則請求中不包含訊息內容。請求樣本如下:
[ { "key":"123", "offset":4, "overflowFlag":true, "partition":0, "timestamp":1603779578478, "topic":"Test", "value":"1", "valueSize":272687 } ]
說明如需擷取訊息內容,您需要根據位點主動拉取訊息。
50
重试次数
訊息發送失敗後的重試次數。預設為2。取值範圍為1~3。部分導致訊息發送失敗的錯誤不支援重試。錯誤碼與是否支援重試的對應關係如下:
4XX:除429支援重試外,其餘錯誤碼不支援重試。
5XX:支援重試。
說明Connector調用InvokeFunction向Function Compute發送訊息。
重試次數超出取值範圍之後,訊息會進入無效信件佇列Topic。無效信件佇列Topic的訊息不會再次觸發Function ComputeConnector任務,建議您給無效信件佇列Topic資源配置監控警示,即時監控資源狀態,及時發現並處理異常。
2
建立完成後,在Connector 任务列表頁面,查看建立的Connector 。
建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署。
如需配置Function Compute資源,單擊其操作列的
,跳轉至Function Compute控制台完成操作。
發送測試訊息
部署FC Sink Connector後,您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被同步至Function Compute。
在Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试。
在发送消息面板,發送測試訊息。
发送方式選擇控制台。
在消息 Key文字框中輸入訊息的Key值,例如demo。
在消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。
設定发送到指定分区,選擇是否指定分區。
單擊是,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。
发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。
查看函數日誌
向雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,查看函數日誌,驗證是否收到訊息。更多資訊,請參見配置日誌。
日誌中顯示發送的測試訊息。