ApsaraMQ for Kafka: Cluster Linking

Last Updated: Jan 06, 2026

Cluster Linking is a Confluent Platform feature that connects multiple Kafka clusters and mirrors data between them. A cluster link is created on the destination cluster and pulls data from the source cluster into the destination cluster. This topic describes how to use Cluster Linking in ApsaraMQ for Confluent, including how to create a cluster link remotely by using the Confluent command-line interface (CLI) and how to perform basic management operations on cluster links.

Prerequisites

  • A source cluster and a destination cluster are ready.

  • A machine that can connect to both the source and destination clusters is available. This topic uses an Elastic Compute Service (ECS) instance as an example. For more information about how to create and use an instance, see Create and manage an ECS instance in the console (express version).

    • Confluent Platform 7.0.0 or later is installed. For more information, see Confluent.

    • Java 8 or Java 11 is installed. For more information, see Install JDK. You can verify both installations as shown below.
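
Optionally, you can verify the toolchain on the ECS instance before you continue. This sketch assumes that the Confluent Platform bin directory is on your PATH.

    # check the installed Java version
    java -version

    # check the Kafka version bundled with Confluent Platform
    kafka-topics --version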

Configuration files

Create configuration files on the ECS instance to connect the source and destination clusters. Replace <username>, <password>, and <source-cluster-address:port> in the sample code with your local configuration.

  1. Create a configuration file named /tmp/source.config to connect to the source cluster. In the file, enable automatic mirror topic creation, consumer offset synchronization, and access control list (ACL) synchronization.

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    bootstrap.servers=<source-cluster-address:port>
    auto.create.mirror.topics.enable=true
    consumer.offset.sync.enable=true
    acl.sync.enable=true
  2. Create a configuration file named /tmp/destination.config to connect to the destination cluster.

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
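
To confirm that a configuration file can reach its cluster, you can optionally run a simple metadata query. The following sketch checks the destination cluster. Replace <destination-cluster-address:port> with the address of your destination cluster.

    # optional connectivity check against the destination cluster
    kafka-topics --list --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config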

Prepare test data

Run the following commands using the Confluent Platform CLI to prepare test data on the source cluster. Replace <source-cluster-address:port> in the sample code with your local configuration.

  1. Run the following command to create a topic with a single partition on the source cluster. This topic will be mirrored. A single partition makes it easier to observe the message replication order.

    kafka-topics --create --topic test-topic --partitions 1 \
    --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config

    You can run the following commands to list topics and describe the topic details.

    #list topic
    kafka-topics --list --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config
    
    #describe topic
    kafka-topics --describe --topic test-topic \
    --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config
  2. Run the following command to send messages to the test-topic topic on the source cluster.

    seq 1 5 | kafka-console-producer --topic test-topic \
    --bootstrap-server <source-cluster-address:port> \
    --producer.config /tmp/source.config
  3. Run the following command to consume data from the test-topic topic on the source cluster and specify a consumer group.

    # consume
    kafka-console-consumer --topic test-topic --from-beginning \
    --bootstrap-server <source-cluster-address:port> --group test-group \
    --consumer.config /tmp/source.config
    
    # list consumer groups
    kafka-consumer-groups --bootstrap-server <source-cluster-address:port> --list \
    --command-config /tmp/source.config
    
    # describe offsets of consumer groups
    kafka-consumer-groups --bootstrap-server <source-cluster-address:port> \
    --group test-group --describe --offsets \
    --command-config /tmp/source.config

    If the messages are consumed successfully, the output is:

    1
    2
    3
    4
    5

  4. Run the following commands to add an ACL for the User:test-user principal and grant it the read permission on the test-topic topic.

    # add user and grant read permission
    kafka-acls --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config  --add --allow-principal User:test-user \
    --operation READ --topic test-topic
      
    # list
    kafka-acls --list --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config

Data synchronization

This example assumes that you use SASL_SSL to log on to the source and destination clusters. It also assumes that a certificate is used for domain name verification when connecting to the clusters. Replace <source-cluster-address:port> and <destination-cluster-address:port> in the sample code with your local configuration.

  1. Create a configuration file named /tmp/topic_filter.json to select the topics to migrate.

    { 
      "topicFilters": [ 
        {
          "name": "test-topic",  
          "patternType": "LITERAL",  
          "filterType": "INCLUDE"
        } 
      ]
    }
  2. Create a configuration file named /tmp/group.json to select the consumer groups to migrate.

    {
      "groupFilters": [
        {
          "name": "test-group",
          "patternType": "LITERAL",
          "filterType": "INCLUDE"
        }
      ]
    }
  3. Create a configuration file named /tmp/acl.json to select the ACL permissions to migrate.

    {
      "aclFilters": [
        {
          "resourceFilter": {
            "resourceType": "any",
            "patternType": "any"
          },
          "accessFilter": {
            "operation": "any",
            "permissionType": "any"
          }
        }
      ]
    }
  4. Run the following command to create a cluster link to replicate the topics, consumer groups, and ACL user permissions.

    kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config --create --link test-cluster-link \
    --config-file /tmp/source.config \
    --topic-filters-json-file /tmp/topic_filter.json \
    --consumer-group-filters-json-file /tmp/group.json \
    --acl-filters-json-file /tmp/acl.json
  5. After the data synchronization is complete, run the following command to promote the mirror topic. A promoted mirror topic becomes readable and writable and no longer synchronizes messages from the source topic. You can check the mirroring status before you promote, as shown after this list.

    kafka-mirrors --promote --topics test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
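
To monitor the mirroring status of the topic before or after you promote it, you can describe the mirror topic. The following is a sketch that assumes the kafka-mirrors CLI in your Confluent Platform version supports the --describe and --topics options; adding --pending-stopped-only limits the output to mirrors that are being stopped.

    # describe the status of the mirror topic on the destination cluster
    kafka-mirrors --describe --topics test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config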

Migration test

After the data is synchronized, perform the following steps to verify that the migration was successful.

  1. Run the following command to check whether the topics, consumer groups, and ACL user permissions were synchronized to the destination cluster.

    # list topic
    kafka-topics --list --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
    
    # list consumer group
    kafka-consumer-groups --bootstrap-server <destination-cluster-address:port> \
    --list --command-config /tmp/destination.config
    
    # list acl
    kafka-acls --list --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
  2. Run the following commands to verify that topic production and consumption work correctly on the destination cluster.

    # produce
    kafka-console-producer --topic test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --producer.config /tmp/destination.config
    
    # consume
    kafka-console-consumer --topic test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --consumer.config /tmp/destination.config

Manage cluster links

This section describes how to manage existing cluster links. Replace <destination-cluster-address:port> in the sample code with your local configuration.

  1. Run the following command to view the list of cluster links.

    kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
     --list --command-config /tmp/destination.config 
  2. Run the following command to view the details of a cluster link.

    kafka-configs --describe --cluster-link test-cluster-link \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
  3. Run the following command to convert a mirror topic into a regular topic.

    kafka-mirrors --promote --topics test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config

    Expected output:

    Calculating max offset and ms lag for mirror topics: [test-topic]
    Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
    Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
  4. Run the following command to delete a cluster link.

    kafka-cluster-links --delete --link test-cluster-link \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config

    Expected output:

    Cluster link 'test-cluster-link' deletion successfully completed.
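
If you need to adjust a link setting after the link is created, kafka-configs can also alter the configuration of a cluster link. The following is a sketch that assumes your Confluent Platform version supports --alter for the --cluster-link entity and that the consumer.offset.sync.ms setting applies to your link; the value shown is only an example.

    # example only: change how often consumer offsets are synchronized over the link
    kafka-configs --alter --cluster-link test-cluster-link \
    --add-config consumer.offset.sync.ms=60000 \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config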
