全部產品
Search
文件中心

ApsaraMQ for Kafka:建立OSS Sink Connector

更新時間:Dec 27, 2024

本文介紹如何建立OSS Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Object Storage Service。

前提條件

在匯出資料前,請確保您已完成以下操作:

注意事項

  • 僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute,再由Function Compute匯出至Object Storage Service。Connector的限制說明,請參見使用限制
  • 該功能基於Function Compute服務提供。Function Compute為您提供了一定的免費額度,超額部分將產生費用,請以Function Compute的計費規則為準。計費詳情,請參見計費概述
  • Function Compute的函數調用支援日誌查詢。具體操作步驟,請參見配置日誌
  • 訊息轉儲時,雲訊息佇列 Kafka 版中訊息用UTF-8 String序列化,暫不支援二進位的資料格式。

建立並部署OSS Sink Connector

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊Connector 任务列表

  4. Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector

  5. 创建 Connector設定精靈面頁面,完成以下操作。
    1. 配置基本信息頁簽,按需配置以下參數,然後單擊下一步
      重要 雲訊息佇列 Kafka 版會為您自動選中授权创建服务关联角色
      • 如果未建立服務關聯角色,雲訊息佇列 Kafka 版會為您自動建立一個服務關聯角色,以便您使用雲訊息佇列 Kafka 版匯出資料至Object Storage Service的功能。
      • 如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。
      關於該服務關聯角色的更多資訊,請參見服務關聯角色
      參數描述樣本值
      名称Connector的名稱。命名規則:
      • 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。
      • 同一個雲訊息佇列 Kafka 版執行個體內保持唯一。

      Connector的資料同步任務必須使用名稱為connect-任務名稱Group。如果您未手動建立該Group,系統將為您自動建立。

      kafka-oss-sink
      实例預設配置為執行個體的名稱與執行個體ID。demo alikafka_post-cn-st21p8vj****
    2. 配置源服务頁簽,選擇資料來源訊息佇列Kafka版,並配置以下參數,然後單擊下一步
      參數描述樣本值
      数据源 Topic需要同步資料的Topic。oss-test-input
      消费线程并发数資料來源Topic的消費線程並發數。預設值為6。取值說明如下:
      • 1
      • 2
      • 3
      • 6
      • 12
      6
      消费初始位置開始消費的位置。取值說明如下:
      • 最早位点:從最初位點開始消費。
      • 最近位点:從最新位點開始消費。
      最早位点
      VPC ID資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。vpc-bp1xpdnd3l***
      vSwitch ID資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。vsw-bp1d2jgg81***
      失败处理訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下。
      • 继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
      • 停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔
      說明
      继续订阅
      创建资源方式選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。
      • 自动创建
      • 手动创建
      自动创建
      Connector 消费组Connector使用的Group。單擊配置运行环境顯示該參數。該Group的名稱建議以connect-cluster開頭。connect-cluster-kafka-oss-sink
      任务位点 Topic用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。
      • Topic:建議以connect-offset開頭。
      • 分區數:Topic的分區數量必須大於1。
      • 儲存引擎:Topic的儲存引擎必須為Local儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
      • cleanup.policy:Topic的日誌清理策略必須為compact。
      connect-offset-kafka-oss-sink
      任务配置 Topic用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。
      • Topic:建議以connect-config開頭。
      • 分區數:Topic的分區數量必須為1。
      • 儲存引擎:Topic的儲存引擎必須為Local儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
      • cleanup.policy:Topic的日誌清理策略必須為compact。
      connect-config-kafka-oss-sink
      任务状态 Topic用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。
      • Topic:建議以connect-status開頭。
      • 分區數:Topic的分區數量建議為6。
      • 儲存引擎:Topic的儲存引擎必須為Local儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
      • cleanup.policy:Topic的日誌清理策略必須為compact。
      connect-status-kafka-oss-sink
      死信队列 Topic用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。
      • Topic:建議以connect-error開頭。
      • 分區數:Topic的分區數量建議為6。
      • 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
      connect-error-kafka-oss-sink
      异常数据 Topic用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。
      • Topic:建議以connect-error開頭。
      • 分區數:Topic的分區數量建議為6。
      • 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
        說明 僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
      connect-error-kafka-oss-sink
    3. 配置目标服务頁簽,選擇目標服務Object Storage Service,並配置以下參數,然後單擊创建
      參數描述樣本值
      Bucket 名称Object Storage ServiceBucket的名稱。bucket_test
      Access Key阿里雲帳號的AccessKey ID。LTAI4GG2RGAjppjK********
      Secret Key阿里雲帳號的AccessKey Secret。WbGPVb5rrecVw3SQvEPw6R********

      請確保您使用的AccessKey ID所對應的帳號已被授予以下最小許可權:

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "oss:GetObject",
                      "oss:PutObject"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
      說明

      AccessKey ID和AccessKey Secret是雲訊息佇列 Kafka 版建立任務時作為環境變數傳遞至Object Storage Service的資料,任務建立成功後,雲訊息佇列 Kafka 版不儲存AccessKey ID和AccessKey Secret資訊。

      建立完成後,在Connector 任务列表頁面,查看建立的Connector 。
  6. 建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署

發送測試訊息

您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被匯出至Object Storage Service。

  1. Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试

  2. 发送消息面板,發送測試訊息。

    • 发送方式選擇控制台

      1. 消息 Key文字框中輸入訊息的Key值,例如demo。

      2. 消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。

      3. 設定发送到指定分区,選擇是否指定分區。

        • 單擊,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態

        • 單擊,不指定分區。

    • 发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。

    • 发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。

驗證結果

雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,查看OSS檔案管理,驗證資料匯出結果。更多資訊,請參見檔案概覽

檔案管理中顯示新匯出的檔案。

files
雲訊息佇列 Kafka 版資料匯出至Object Storage Service的格式樣本如下:
[
    {
        "key":"123",
        "offset":4,
        "overflowFlag":true,
        "partition":0,
        "timestamp":1603779578478,
        "topic":"Test",
        "value":"1",
        "valueSize":272687
    }
]

更多操作

您可以按需對該Connector所依賴的Function Compute資源進行配置。

Connector 任务列表頁面,找到建立的Connector,單擊其操作列的更多 > 配置函数
頁面跳轉至Function Compute控制台,您可以按需配置函數資源。