本文介紹如何建立OSS Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Object Storage Service。
前提條件
- 為雲訊息佇列 Kafka 版執行個體開啟Connector。更多資訊,請參見開啟Connector。
- 為雲訊息佇列 Kafka 版執行個體建立資料來源Topic。更多資訊,請參見步驟一:建立Topic。
- 在OSS管理主控台建立儲存空間。更多資訊,請參見控制台建立儲存空間。
- 開通Function Compute服務。更多資訊,請參見開通Function Compute服務。
注意事項
- 僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute,再由Function Compute匯出至Object Storage Service。Connector的限制說明,請參見使用限制。
- 該功能基於Function Compute服務提供。Function Compute為您提供了一定的免費額度,超額部分將產生費用,請以Function Compute的計費規則為準。計費詳情,請參見計費概述。
- Function Compute的函數調用支援日誌查詢。具體操作步驟,請參見配置日誌。
- 訊息轉儲時,雲訊息佇列 Kafka 版中訊息用UTF-8 String序列化,暫不支援二進位的資料格式。
建立並部署OSS Sink Connector
在概览頁面的资源分布地區,選擇地區。
在左側導覽列,單擊Connector 任务列表。
在Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector。
- 在创建 Connector設定精靈面頁面,完成以下操作。
- 在配置基本信息頁簽,按需配置以下參數,然後單擊下一步。重要 雲訊息佇列 Kafka 版會為您自動選中授权创建服务关联角色。
- 如果未建立服務關聯角色,雲訊息佇列 Kafka 版會為您自動建立一個服務關聯角色,以便您使用雲訊息佇列 Kafka 版匯出資料至Object Storage Service的功能。
- 如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。
參數 描述 樣本值 名称 Connector的名稱。命名規則: - 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。
- 同一個雲訊息佇列 Kafka 版執行個體內保持唯一。
Connector的資料同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動建立該Group,系統將為您自動建立。
kafka-oss-sink 实例 預設配置為執行個體的名稱與執行個體ID。 demo alikafka_post-cn-st21p8vj**** - 在配置源服务頁簽,選擇資料來源為訊息佇列Kafka版,並配置以下參數,然後單擊下一步。
參數 描述 樣本值 数据源 Topic 需要同步資料的Topic。 oss-test-input 消费线程并发数 資料來源Topic的消費線程並發數。預設值為6。取值說明如下: - 1
- 2
- 3
- 6
- 12
6 消费初始位置 開始消費的位置。取值說明如下: - 最早位点:從最初位點開始消費。
- 最近位点:從最新位點開始消費。
最早位点 VPC ID 資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。 vpc-bp1xpdnd3l*** vSwitch ID 資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。 vsw-bp1d2jgg81*** 失败处理 訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下。 - 继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
- 停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔
說明- 如何查看日誌,請參見Connector相關操作。
- 如何根據錯誤碼尋找解決方案,請參見錯誤碼。
继续订阅 创建资源方式 選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。 - 自动创建
- 手动创建
自动创建 Connector 消费组 Connector使用的Group。單擊配置运行环境顯示該參數。該Group的名稱建議以connect-cluster開頭。 connect-cluster-kafka-oss-sink 任务位点 Topic 用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。 - Topic:建議以connect-offset開頭。
- 分區數:Topic的分區數量必須大於1。
- 儲存引擎:Topic的儲存引擎必須為Local儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
- cleanup.policy:Topic的日誌清理策略必須為compact。
connect-offset-kafka-oss-sink 任务配置 Topic 用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。 - Topic:建議以connect-config開頭。
- 分區數:Topic的分區數量必須為1。
- 儲存引擎:Topic的儲存引擎必須為Local儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
- cleanup.policy:Topic的日誌清理策略必須為compact。
connect-config-kafka-oss-sink 任务状态 Topic 用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。 - Topic:建議以connect-status開頭。
- 分區數:Topic的分區數量建議為6。
- 儲存引擎:Topic的儲存引擎必須為Local儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
- cleanup.policy:Topic的日誌清理策略必須為compact。
connect-status-kafka-oss-sink 死信队列 Topic 用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。 - Topic:建議以connect-error開頭。
- 分區數:Topic的分區數量建議為6。
- 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
connect-error-kafka-oss-sink 异常数据 Topic 用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。 - Topic:建議以connect-error開頭。
- 分區數:Topic的分區數量建議為6。
- 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
connect-error-kafka-oss-sink - 在配置目标服务頁簽,選擇目標服務為Object Storage Service,並配置以下參數,然後單擊创建。
參數 描述 樣本值 Bucket 名称 Object Storage ServiceBucket的名稱。 bucket_test Access Key 阿里雲帳號的AccessKey ID。 LTAI4GG2RGAjppjK******** Secret Key 阿里雲帳號的AccessKey Secret。 WbGPVb5rrecVw3SQvEPw6R******** 請確保您使用的AccessKey ID所對應的帳號已被授予以下最小許可權:
{ "Version": "1", "Statement": [ { "Action": [ "oss:GetObject", "oss:PutObject" ], "Resource": "*", "Effect": "Allow" } ] }
說明AccessKey ID和AccessKey Secret是雲訊息佇列 Kafka 版建立任務時作為環境變數傳遞至Object Storage Service的資料,任務建立成功後,雲訊息佇列 Kafka 版不儲存AccessKey ID和AccessKey Secret資訊。
建立完成後,在Connector 任务列表頁面,查看建立的Connector 。
- 在配置基本信息頁簽,按需配置以下參數,然後單擊下一步。
- 建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署。
發送測試訊息
您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被匯出至Object Storage Service。
在Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试。
在发送消息面板,發送測試訊息。
发送方式選擇控制台。
在消息 Key文字框中輸入訊息的Key值,例如demo。
在消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。
設定发送到指定分区,選擇是否指定分區。
單擊是,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。
发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。
驗證結果
向雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,查看OSS檔案管理,驗證資料匯出結果。更多資訊,請參見檔案概覽。
檔案管理中顯示新匯出的檔案。

[
{
"key":"123",
"offset":4,
"overflowFlag":true,
"partition":0,
"timestamp":1603779578478,
"topic":"Test",
"value":"1",
"valueSize":272687
}
]
更多操作
您可以按需對該Connector所依賴的Function Compute資源進行配置。