釋放 AI 的強大力量

100 萬免費代幣

88% 價格優惠

立即啟用
本文由簡體中文內容自動轉碼而成。阿里雲不保證此自動轉碼的準確性、完整性及時效性。本文内容請以簡體中文版本為準。

使用MirrorMaker 2(on Connect)跨叢集同步資料

更新時間:2024-06-30 19:50

本文通過樣本為您介紹如何使用MirrorMaker 2(簡稱MM2)on Kafka Connect跨叢集同步資料。

背景資訊

使用情境

Kafka MM2適用於下列情境:
  • 遠端資料同步:通過MM2,Kafka資料可以在不同地區的叢集進行傳輸複製。
  • 災備情境:通過MM2,可以構建不同資料中心的主備兩個叢集容災架構,MM2即時同步兩個叢集的資料。當其中一個叢集不可用時,可以將上面的應用程式切換到另一個叢集,從而實現異地容災功能。
  • 資料移轉情境:在業務上雲、混合雲、叢集升級等情境,存在資料從舊叢集遷移到新叢集的需求。此時,您可以使用MM2實現新舊資料的遷移,保證業務的連續性。
  • 彙總資料中心情境:通過MM2,可以將多個Kafka子叢集的資料同步到一個中心Kafka叢集,實現資料的匯聚。

功能

Kafka MM2作為資料複製工具,具有以下功能:
  • 複製topics資料以及配置資訊。
  • 複製consumer groups及其消費topic的offset資訊。
  • 複製ACLs。
  • 自動檢測新的topic以及partition。
  • 提供MM2的metrics。
  • 高可用以及可水平擴充的架構。

任務執行方式

MM2任務有以下執行方式:
  • Distributed Connect叢集的connector方式(推薦):在已有Connect叢集執行MM2 connector任務的方式。您可以參照本文使用Connect叢集服務的功能來管理MM2任務。
  • Dedicated MirrorMaker叢集方式:不需要使用Connect叢集執行MM2 connector任務,而是直接通過Driver程式管理MM2的所有任務。具體操作,請參見使用MirrorMaker 2(Dedicated)跨叢集同步資料
  • Standalone Connect的worker方式:執行單個MirrorSourceConnector任務,適合在測試情境下使用。
說明
推薦在Distributed Connect叢集上啟動MM2 connector任務,可以藉助Connect叢集的Rest服務管理MM2任務。

MM2的詳細資料,請參見Apache Kafka

前提條件

已建立兩個Kafka叢集,一個為源叢集emrsource,一個為目的地組群emrdest,並選擇了Kafka服務,建立DataFlow叢集的具體操作,請參見建立叢集
說明
本文樣本的源叢集和目的地組群都以EMR-3.42.0版本,且在同一VPC下的DataFlow叢集為例。

使用限制

目的地組群的Kafka軟體版本為2.12_2.4.1及以上。

操作流程

  1. 步驟一:在目的地組群建立Kafka Connect叢集
  2. 步驟二:使用MirrorMaker2 connector

步驟一:在目的地組群建立Kafka Connect叢集

  1. 新增EMR Task機器組。
    在EMR控制台目的地組群emrdest的節點管理頁面,建立Task機器組。
    1. 單擊新增機器組
    2. 新增機器組面板,配置以下參數,其餘參數請根據實際情況配置。
      參數說明
      參數說明
      節點群組類型選擇TASK(任務執行個體組)
      節點群組名稱本文樣本為emr-task。
      儲存配置選擇一塊資料盤。
  2. 擴容Task機器組。
    1. 節點管理頁面,單擊新增的emr-task節點群組操作列的擴容
    2. 在彈出的對話方塊中,選擇待增加的數量,勾選服務合約
      本樣本增加的執行個體數量為1台。您可以根據實際需要擴容Task執行個體的數量,如果需要高可用Connect叢集,則建議擴容兩個以上執行個體。
    3. 單擊確定
  3. 查看KafkaConnect服務狀態,確保Kafka Connect叢集已經啟動。
    1. 單擊上方的叢集服務
    2. 單擊Kafka服務地區的狀態
    3. 組件列表地區,查看KafkaConnect的組件狀態,確保組件在運行中。
      KafkaConnect
  4. 使用SSH方式登入目的地組群emrdest,詳情請參見登入叢集
  5. 執行以下命令,檢查Kafka Connect Rest服務狀態。
    curl -X GET http://task-1-1:8083| jq .
    返回以下類似資訊。
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
    {
      "version": "2.4.1",
      "commit": "42ce056344c5625a",
      "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
    }

步驟二:使用MirrorMaker2 connector

  1. 準備MM2 connector設定檔。
    您需要準備以下檔案:
    • 準備MirrorSourceConnector設定檔
      本文樣本MirrorSourceConnector設定檔命名為mm2-source-connector.json。按照如下樣本並根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
      {
        "name": "mm2-source-connector",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "clusters": "emrsource,emrdest",
        "source.cluster.alias": "emrsource",
        "target.cluster.alias": "emrdest",
        "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
        "source.cluster.bootstrap.servers": "10.0.**.**:9092",
        "topics": "^foo.*",
        "tasks.max": "4",
        "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.factor": "3",
        "offset-syncs.topic.replication.factor": "3",
        "sync.topic.acls.interval.seconds": "20",
        "sync.topic.configs.interval.seconds": "20",
        "refresh.topics.interval.seconds": "20",
        "refresh.groups.interval.seconds": "20",
        "consumer.group.id": "mm2-mirror-source-consumer-group",
        "producer.enable.idempotence":"true",
        "source.cluster.security.protocol": "PLAINTEXT",
        "target.cluster.security.protocol": "PLAINTEXT"
      }
      說明
      本文範例程式碼中參數:
      • source.cluster.bootstrap.servers:該參數值的IP地址,需要替換為您實際環境源叢集emrsource中Kafka服務的訪問地址,並且需要確保源Kafka叢集和Kafka Connect叢集的聯通性。
      • topics:該參數值表示會複製您源叢集中以foo開頭的Topic。
    • 準備MirrorCheckpointConnector設定檔
      本文樣本MirrorCheckpointConnector設定檔命名為mm2-checkpoint-connector.json。按照如下樣本並根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
      {
          "name": "mm2-checkpoint-connector",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
          "clusters": "emrsource,emrdest",
          "source.cluster.alias": "emrsource",
          "target.cluster.alias": "emrdest",
          "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
          "source.cluster.bootstrap.servers": "10.0.**.**:9092",
          "tasks.max": "1",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "replication.factor": "3",
          "checkpoints.topic.replication.factor": "3",
          "emit.checkpoints.interval.seconds": "20",
          "source.cluster.security.protocol": "PLAINTEXT",
          "target.cluster.security.protocol": "PLAINTEXT"
        }
    • 準備MirrorHeartbeatConnector設定檔
      本文樣本MirrorHeartbeatConnector設定檔命名為mm2-heartbeat-connector.json。按照如下樣本並根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
      {
          "name": "mm2-heartbeat-connector",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
          "clusters": "emrsource,emrdest",
          "source.cluster.alias": "emrsource",
          "target.cluster.alias": "emrdest",
          "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
          "source.cluster.bootstrap.servers": "10.0.**.**:9092",
          "tasks.max": "1",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "replication.factor": "3",
          "heartbeats.topic.replication.factor": "3",
          "emit.heartbeats.interval.seconds": "20",
          "source.cluster.security.protocol": "PLAINTEXT",
          "target.cluster.security.protocol": "PLAINTEXT"
        }
  2. 使用MirrorSourceConnector。
    1. 通過Connect rest服務,使用mm2-source-connector.json檔案,建立MirrorSourceConnector任務。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-source-connector.json http://task-1-1:8083/connectors/mm2-source-connector/config
    2. 執行以下命令,查看mm2-source-connector狀態。
      curl -s task-1-1:8083/connectors/mm2-source-connector/status | jq .
  3. 使用MirrorCheckpointConnector。
    1. 通過Connect rest服務,使用mm2-checkpoint-connector.json檔案,建立MirrorCheckpointConnector任務。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-checkpoint-connector.json http://task-1-1:8083/connectors/mm2-checkpoint-connector/config
    2. 執行以下命令,查看mm2-checkpoint-connector狀態。
      curl -s task-1-1:8083/connectors/mm2-checkpoint-connector/status | jq .
  4. 使用MirrorHeartbeatConnector。
    1. 通過Connect rest服務,使用mm2-heartbeat-connector.json檔案,建立MirrorHeartbeatConnector任務。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-heartbeat-connector.json http://task-1-1:8083/connectors/mm2-heartbeat-connector/config
    2. 執行以下命令,查看mm2-heartbeat-connector狀態。
      curl -s task-1-1:8083/connectors/mm2-heartbeat-connector/status | jq .
  5. 在目的地組群執行以下命令,查看MM2相關topic。
    kafka-topics.sh --list --bootstrap-server core-1-1:9092
    此時,在目的地組群中,您可以看到以下topic已經建立:
    • emrsource.foo開頭的topic:由MirrorSourceConnector建立。

      foo開頭的topic是您源叢集上已有的,需要複製的topic。

    • emrsource.checkpoints.internal:由MirrorCheckpointConnector建立,用於儲存offset等資訊。
    • heartbeats:由MirrorHeartbeatConnector建立。
  • 本頁導讀 (1, M)
  • 背景資訊
  • 使用情境
  • 功能
  • 任務執行方式
  • 前提條件
  • 使用限制
  • 操作流程
  • 步驟一:在目的地組群建立Kafka Connect叢集
  • 步驟二:使用MirrorMaker2 connector
文檔反饋
phone 聯絡我們

立即和Alibaba Cloud在線服務人員進行交談,獲取您想了解的產品信息以及最新折扣。

alicare alicarealicarealicare