
ApsaraMQ for Kafka:Manage Schema Registry

Last Updated:May 30, 2025

ApsaraMQ for Confluent uses Schema Registry to manage schemas. This topic describes the basic operations on Schema Registry on a Linux system.

Before you start

  • Purchase an ApsaraMQ for Confluent instance. For more information, see Purchase and deploy instances.

  • Obtain the permissions to access the Kafka and Schema Registry clusters. For more information, see RBAC authorization.

  • Install Java 8 or 11. For information about supported Java versions, see Java.

  • Install Maven 3.8 or later. For more information, see Downloads.

Step 1: Prepare sample code

  1. Run the following commands to clone the sample code and switch to the 7.9.0-post branch:

    git clone https://github.com/confluentinc/examples.git
    
    cd examples/clients/avro
    
    git checkout 7.9.0-post
  2. Create a configuration file named java.config in the $HOME/.confluent/ directory, where $HOME is your home directory. Configure the following items in the configuration file:

    # Required connection configs for Kafka producer, consumer, and admin
    bootstrap.servers={{ BROKER_ENDPOINT }}
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
    sasl.mechanism=PLAIN
    
    # Required for correctness in Apache Kafka clients prior to 2.6
    client.dns.lookup=use_all_dns_ips
    
    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms=45000
    
    # Best practice for Kafka producer to prevent data loss
    acks=all
    
    # Required connection configs for Confluent Cloud Schema Registry
    schema.registry.url=https://{{ SR_ENDPOINT }}
    basic.auth.credentials.source=USER_INFO
    basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

    The following describes the parameters:

    BROKER_ENDPOINT
      The endpoint of the Kafka service. You can obtain the endpoint on the Access Links and Ports page in the ApsaraMQ for Confluent console. If you want to use the public endpoint, you must enable Internet access. For more information, see Enable the Internet access feature. For information about security settings, see Configure network access and security settings.
      Example: pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092

    CLUSTER_API_KEY and CLUSTER_API_SECRET
      The username and password of an LDAP user that is authorized to access the Kafka cluster. You can obtain the username and password on the Users page in the ApsaraMQ for Confluent console. During testing, you can use the root account and its password. If you want to use another LDAP user, you must create the user in the ApsaraMQ for Confluent console and grant the user the required permissions on the Kafka cluster. For more information, see Manage users and grant permissions to them.
      Example: root and ******

    SR_ENDPOINT
      The endpoint of the Schema Registry service. You can obtain the endpoint on the Access Links and Ports page in the ApsaraMQ for Confluent console. If you want to use the public endpoint, you must enable Internet access. For more information, see Enable the Internet access feature. For information about security settings, see Configure network access and security settings.
      Example: pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443

    SR_API_KEY and SR_API_SECRET
      The username and password of an LDAP user that is authorized to access the Schema Registry cluster. You can obtain the username and password on the Users page in the ApsaraMQ for Confluent console. During testing, you can use the root account and its password. If you want to use another LDAP user, you must create the user in the ApsaraMQ for Confluent console and grant the user the required permissions on Schema Registry. For more information, see Manage users and grant permissions to them.
      Example: root and ******
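Before you run the samples, you can sanity-check that the configuration file parses and contains the keys that the clients need. The following minimal sketch uses only the Java standard library; the ConfigCheck class name and the inline sample content are illustrative, and in practice you would load $HOME/.confluent/java.config with a FileInputStream instead of a StringReader:

```java
import java.io.StringReader;
import java.util.Properties;

class ConfigCheck {
    // Keys that the producer, consumer, and Schema Registry serializers
    // in this walkthrough expect to find in java.config.
    static final String[] REQUIRED_KEYS = {
        "bootstrap.servers", "security.protocol", "sasl.jaas.config",
        "sasl.mechanism", "schema.registry.url",
        "basic.auth.credentials.source", "basic.auth.user.info"
    };

    // Returns the first missing required key, or null if all are present.
    static String firstMissingKey(Properties props) {
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                return key;
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // In practice: props.load(new FileInputStream(configFile));
        Properties props = new Properties();
        props.load(new StringReader(
            "bootstrap.servers=pub-kafka-xxxxxxxxxxx.csp.aliyuncs.com:9092\n"
          + "security.protocol=SASL_SSL\n"
          + "sasl.mechanism=PLAIN\n"
          + "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='root' password='***';\n"
          + "schema.registry.url=https://pub-schemaregistry-xxxxxxxxxxx.csp.aliyuncs.com:443\n"
          + "basic.auth.credentials.source=USER_INFO\n"
          + "basic.auth.user.info=root:***\n"));
        System.out.println("missing key: " + firstMissingKey(props)); // prints "missing key: null"
    }
}
```

If firstMissingKey returns a key name, the corresponding line is missing or misspelled in java.config.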

Step 2: Create a topic

Note

In the sample code, the Topic parameter is set to transactions. During testing, you can create a topic named transactions. If you want to use another topic, replace the value of the Topic parameter with your actual topic name.

  1. Log on to Control Center. On the Home page, click the controlcenter.cluster card to go to the Cluster overview page.


  2. In the left-side navigation pane, click Topics. Then, in the upper-right corner of the Topics page, click + Add topic.


  3. On the New topic page, specify the topic name and the number of partitions, and then click Create with defaults.


  4. After you create the topic, go to the topic details page to view the topic details.


Step 3: Enable schema validation

  1. On the topic details page, click the Configuration tab. Then, click Edit settings.


  2. Click Switch to expert mode.


  3. Set the confluent_value_schema_validation parameter to true and click Save changes to enable schema validation for message bodies. After schema validation is enabled, the data format is validated during message sending and consumption.

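For reference, the expert-mode parameter shown above is the underscored form of the Confluent Server topic-level configuration key confluent.value.schema.validation (an assumption based on standard Confluent Server topic configurations). After you enable validation, the topic configuration contains an entry such as the following:

```
confluent.value.schema.validation=true
```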

Step 4: Create a schema

  1. Go to the examples/clients/avro directory of the project and run the following command to view the content of the Payment.avsc file:

    cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

    Returned result:

    {
     "namespace": "io.confluent.examples.clients.basicavro",
     "type": "record",
     "name": "Payment",
     "fields": [
         {"name": "id", "type": "string"},
         {"name": "amount", "type": "double"}
     ]
    }
    
  2. On the topic details page in Control Center, click Schema. Then, click Set a schema.

  3. On the Schema tab, click Avro, enter the content of the Payment.avsc file in the code editor, and then click Create.

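When you compile the project in the next step, the Maven build generates a Payment class from Payment.avsc by using the Avro Maven plugin. For orientation, the generated class behaves roughly like the following hand-written sketch (a simplified, hypothetical illustration; the real generated class extends Avro's SpecificRecordBase and has many more members):

```java
// Simplified, hand-written sketch of the Payment class that the Maven build
// generates from Payment.avsc. Assumption: the real generated class extends
// org.apache.avro.specific.SpecificRecordBase; only the members used by the
// samples in this topic are sketched here.
class Payment {
    private String id;       // maps to the "id" field of the schema
    private double amount;   // maps to the "amount" field of the schema

    Payment(String id, double amount) {
        this.id = id;
        this.amount = amount;
    }

    String getId() { return id; }
    double getAmount() { return amount; }

    @Override
    public String toString() {
        return "{\"id\": \"" + id + "\", \"amount\": " + amount + "}";
    }
}
```

This is why the sample code in the next step can call new Payment(orderId, 1000.00d) and payment.getId() without a hand-written model class, and why consumed values print in the {"id": ..., "amount": ...} form shown later.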

Step 5: Send and consume messages

Send messages

If you specify Avro as the format when you create the schema, you must use the KafkaAvroSerializer class to serialize message values and use the Payment class as the message value type when you send messages.

Sample code:

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ProducerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
          // Backwards compatibility, assume localhost
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
          // Load properties from a local configuration file
          // Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
          // to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
          // Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
          configFile = args[0];
          if (!Files.exists(Paths.get(configFile))) {
            throw new IOException(configFile + " not found.");
          } else {
            try (InputStream inputStream = new FileInputStream(configFile)) {
              props.load(inputStream);
            }
          }
        }

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {

            for (long i = 0; i < 10; i++) {
                final String orderId = "id" + Long.toString(i);
                final Payment payment = new Payment(orderId, 1000.00d);
                final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
                producer.send(record);
                Thread.sleep(1000L);
            }

            producer.flush();
            System.out.printf("Successfully produced 10 messages to a topic called %s%n", TOPIC);

        } catch (final SerializationException e) {
            e.printStackTrace();
        } catch (final InterruptedException e) {
            e.printStackTrace();
        }

    }

}

To send messages, perform the following steps:

  1. Go to the examples/clients/avro directory of the project and run the following command to compile the project:

    mvn clean compile package
  2. Run the following code to send messages:

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
      -Dexec.args="$HOME/.confluent/java.config"

    If output similar to the following is returned, the messages are sent:

    ...
    Successfully produced 10 messages to a topic called transactions
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...
  3. View the sent messages in Control Center.


Consume messages

If you specify Avro as the format when you create the schema, you must use the KafkaAvroDeserializer class to deserialize message values and use the Payment class as the message value type when you consume messages.

Sample code:

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.InputStream;

public class ConsumerExample {

    private static final String TOPIC = "transactions";
    private static final Properties props = new Properties();
    private static String configFile;

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) throws IOException {

        if (args.length < 1) {
          // Backwards compatibility, assume localhost
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        } else {
          // Load properties from a local configuration file
          // Create the configuration file (e.g. at '$HOME/.confluent/java.config') with configuration parameters
          // to connect to your Kafka cluster, which can be on your local host, Confluent Cloud, or any other cluster.
          // Documentation at https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java.html
          configFile = args[0];
          if (!Files.exists(Paths.get(configFile))) {
            throw new IOException(configFile + " not found.");
          } else {
            try (InputStream inputStream = new FileInputStream(configFile)) {
              props.load(inputStream);
            }
          }
        }

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-payments");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 

        try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            while (true) {
                final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, Payment> record : records) {
                    final String key = record.key();
                    final Payment value = record.value();
                    System.out.printf("key = %s, value = %s%n", key, value);
                }
            }

        }
    }

}

To consume messages, perform the following steps:

  1. Go to the examples/clients/avro directory of the project and run the following command to compile the project:

    mvn clean compile package
  2. Run the following code to consume messages:

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
      -Dexec.args="$HOME/.confluent/java.config"

    If output similar to the following is returned, the messages are consumed:

    ...
    key = id0, value = {"id": "id0", "amount": 1000.0}
    key = id1, value = {"id": "id1", "amount": 1000.0}
    key = id2, value = {"id": "id2", "amount": 1000.0}
    key = id3, value = {"id": "id3", "amount": 1000.0}
    key = id4, value = {"id": "id4", "amount": 1000.0}
    key = id5, value = {"id": "id5", "amount": 1000.0}
    key = id6, value = {"id": "id6", "amount": 1000.0}
    key = id7, value = {"id": "id7", "amount": 1000.0}
    key = id8, value = {"id": "id8", "amount": 1000.0}
    key = id9, value = {"id": "id9", "amount": 1000.0}
    ...

References

For more information about Schema Registry, see Schema Registry for Confluent Platform.