Cluster Linking is a feature of the Confluent Platform that connects multiple Kafka clusters. This feature mirrors and replicates data between different Kafka clusters. A cluster link is initiated on the destination cluster and copies data from the source cluster to the destination cluster. This topic describes how to use Cluster Linking in ApsaraMQ for Confluent. It covers how to remotely create a cluster link using the Confluent command-line interface (CLI) and how to perform basic management of cluster links.
Prerequisites
A source cluster and a destination cluster are ready.
A machine is ready to connect the source and destination clusters. This topic uses an Elastic Compute Service (ECS) instance as an example. For more information about how to create and use an instance, see Create and manage an ECS instance in the console (express version).
Confluent Platform 7.0.0 or a later version is installed. For more information, see Confluent.
Java 8 or 11 is installed. For more information, see Install JDK.
Configuration files
Create configuration files on the ECS instance to connect the source and destination clusters. Replace <username>, <password>, and <source-cluster-address:port> in the sample code with your local configuration.
Create a configuration file named /tmp/source.config to connect to the source cluster. In the file, enable automatic mirror topic creation, consumer offset synchronization, and access control list (ACL) synchronization.
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
bootstrap.servers=<source-cluster-address:port>
auto.create.mirror.topics.enable=true
consumer.offset.sync.enable=true
acl.sync.enable=true
Create a configuration file named /tmp/destination.config to connect to the destination cluster.
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
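Because both files contain a password in plain text, it may be worth creating them with owner-only permissions. The following is a minimal sketch, not part of the product tooling: the function name write_client_config is hypothetical, and it assumes the username and password contain no double quotes.

```shell
# Hypothetical helper: write a SASL_SSL/PLAIN client configuration to a file
# that only the current user can read.
# Usage: write_client_config <file> <username> <password>
write_client_config() {
  config_file=$1
  sasl_username=$2
  sasl_password=$3
  # Create (or truncate) the file and restrict it before writing the secret.
  : > "$config_file" || return 1
  chmod 600 "$config_file" || return 1
  cat >> "$config_file" <<EOF
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$sasl_username" password="$sasl_password";
EOF
}

# Example call (the values are placeholders):
# write_client_config /tmp/destination.config '<username>' '<password>'
```

For the source cluster, the bootstrap.servers line and the three synchronization settings shown above would still need to be appended to the generated file.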
Prepare test data
Run the following commands using the Confluent Platform CLI to prepare test data on the source cluster. Replace <source-cluster-address:port> in the sample code with your local configuration.
Run the following command to create a topic with a single partition on the source cluster. This topic will be mirrored. A single partition makes it easier to observe the message replication order.
kafka-topics --create --topic test-topic --partitions 1 \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config
You can use the --list and --describe options to view the topic details.
# List topics
kafka-topics --list --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config
# Describe the topic
kafka-topics --describe --topic test-topic \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config
Run the following command to send messages to the test-topic topic on the source cluster.
seq 1 5 | kafka-console-producer --topic test-topic \
  --bootstrap-server <source-cluster-address:port> \
  --producer.config /tmp/source.config
Run the following commands to consume data from the test-topic topic on the source cluster with a specified consumer group, and then inspect the consumer group.
# Consume from the beginning of the topic
kafka-console-consumer --topic test-topic --from-beginning \
  --bootstrap-server <source-cluster-address:port> --group test-group \
  --consumer.config /tmp/source.config
# List consumer groups
kafka-consumer-groups --bootstrap-server <source-cluster-address:port> --list \
  --command-config /tmp/source.config
# Describe the offsets of the consumer group
kafka-consumer-groups --bootstrap-server <source-cluster-address:port> \
  --group test-group --describe --offsets \
  --command-config /tmp/source.config
If the messages are consumed successfully, the consumer outputs the following:
1
2
3
4
5
Run the following commands to add an ACL that grants the user test-user the READ permission on test-topic, and then list the ACLs.
# Add the user and grant the READ permission
kafka-acls --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config --add --allow-principal User:test-user \
  --operation READ --topic test-topic
# List ACLs
kafka-acls --list --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config
Data synchronization
This example assumes that you use SASL_SSL to log on to the source and destination clusters. It also assumes that a certificate is used for domain name verification when connecting to the clusters. Replace <source-cluster-address:port> and <destination-cluster-address:port> in the sample code with your local configuration.
Create a configuration file named /tmp/topic_filter.json to select the topics to migrate.
{
  "topicFilters": [
    {
      "name": "test-topic",
      "patternType": "LITERAL",
      "filterType": "INCLUDE"
    }
  ]
}
Create a configuration file named /tmp/group.json to select the consumer groups to migrate.
{
  "groupFilters": [
    {
      "name": "test-group",
      "patternType": "LITERAL",
      "filterType": "INCLUDE"
    }
  ]
}
Create a configuration file named /tmp/acl.json to select the ACL permissions to migrate.
{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}
Run the following command to create a cluster link to replicate the topics, consumer groups, and ACL user permissions.
kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config --create --link test-cluster-link \
  --config-file /tmp/source.config \
  --topic-filters-json-file /tmp/topic_filter.json \
  --consumer-group-filters-json-file /tmp/group.json \
  --acl-filters-json-file /tmp/acl.json
After the data synchronization is complete, run the following command to promote the mirror topic. Promotion converts the mirror topic into a regular topic that is readable and writable, and the topic no longer synchronizes messages from the source topic.
kafka-mirrors --promote --topics test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
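The topic and consumer group filter files in this section share one shape: a top-level key (topicFilters or groupFilters) that holds a list of LITERAL, INCLUDE entries. When several topics or groups need to be migrated, the files can be generated instead of written by hand. The following is a minimal sketch; the function name make_include_filters is hypothetical, and it assumes the names contain only characters that are legal in Kafka topic names, so no JSON escaping is performed.

```shell
# Hypothetical helper: print a filter document with one LITERAL/INCLUDE
# entry per name.
# Usage: make_include_filters <topicFilters|groupFilters> <name>...
make_include_filters() {
  [ "$#" -ge 1 ] || return 1
  filter_key=$1
  shift
  printf '{ "%s": [' "$filter_key"
  separator=''
  for filter_name in "$@"; do
    # Each entry goes on its own line; a comma precedes every entry but the first.
    printf '%s\n  { "name": "%s", "patternType": "LITERAL", "filterType": "INCLUDE" }' \
      "$separator" "$filter_name"
    separator=','
  done
  printf '\n] }\n'
}

# Example calls:
# make_include_filters topicFilters test-topic > /tmp/topic_filter.json
# make_include_filters groupFilters test-group > /tmp/group.json
```

The ACL filter file has a different structure (resourceFilter and accessFilter objects), so this sketch does not cover it.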
Migration test
After the data is synchronized, perform the following steps to verify that the migration was successful.
Run the following command to check whether the topics, consumer groups, and ACL user permissions were synchronized to the destination cluster.
# List topics
kafka-topics --list --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
# List consumer groups
kafka-consumer-groups --bootstrap-server <destination-cluster-address:port> \
  --list --command-config /tmp/destination.config
# List ACLs
kafka-acls --list --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
Run the following commands to verify that messages can be produced to and consumed from the topic on the destination cluster.
# Produce
kafka-console-producer --topic test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --producer.config /tmp/destination.config
# Consume
kafka-console-consumer --topic test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --consumer.config /tmp/destination.config
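When more than a few topics or consumer groups are migrated, comparing the list output by eye becomes error-prone. The following is a minimal sketch that reports names expected on the destination cluster but absent from it. The function name missing_on_destination is hypothetical, and it assumes that each input file holds one name per line, for example the saved output of the list commands above.

```shell
# Hypothetical helper: print names that appear in the first file but not
# in the second. Both files contain one name per line.
# Usage: missing_on_destination <expected-names-file> <destination-names-file>
missing_on_destination() {
  expected_list=$1
  destination_list=$2
  sorted_expected=$(mktemp) || return 1
  sorted_destination=$(mktemp) || return 1
  # comm requires sorted input; -u also removes duplicate names.
  sort -u "$expected_list" > "$sorted_expected"
  sort -u "$destination_list" > "$sorted_destination"
  # -23 suppresses lines unique to the second file and lines common to both,
  # which leaves only the names missing from the destination.
  comm -23 "$sorted_expected" "$sorted_destination"
  rm -f "$sorted_expected" "$sorted_destination"
}
```

Empty output means that every expected name was found. The first file should list only the names selected by the filters, because topics that were intentionally not mirrored would otherwise be reported as missing.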
Manage cluster links
This section describes how to manage existing cluster links. Replace <destination-cluster-address:port> in the sample code with your local configuration.
Run the following command to view the list of cluster links.
kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
  --list --command-config /tmp/destination.config
Run the following command to view the details of a cluster link.
kafka-configs --describe --cluster-link test-cluster-link \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
Run the following command to convert a mirror topic into a regular topic.
kafka-mirrors --promote --topics test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
Expected output:
Calculating max offset and ms lag for mirror topics: [test-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
Run the following command to delete a cluster link.
kafka-cluster-links --delete --link test-cluster-link \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
Expected output:
Cluster link 'test-cluster-link' deletion successfully completed.
References
For more information about Cluster Linking, see Cluster Linking for Confluent Platform.
To determine whether your cluster can use Cluster Linking, see Supported cluster types.