ApsaraMQ for Kafka: Connect to ApsaraMQ for Kafka over SSL with PLAIN authentication

Last Updated: Mar 11, 2026

To produce and consume messages on an ApsaraMQ for Kafka instance over the Internet, connect to the SSL endpoint with SASL/PLAIN authentication. The PLAIN mechanism transmits credentials in cleartext, so always pair it with SSL encryption (SASL_SSL) to protect credentials in transit.

All examples in this guide use the Java SDK.

Placeholders

Gather the following values before you start. Replace these placeholders in all configuration and code samples.

| Placeholder | Description | Where to find it |
| --- | --- | --- |
| <bootstrap-servers> | SSL endpoint of your ApsaraMQ for Kafka instance | Instance Details page in the ApsaraMQ for Kafka console |
| <topic-name> | Topic name | Topics page in the ApsaraMQ for Kafka console |
| <group-id> | Consumer group ID | Groups page in the ApsaraMQ for Kafka console |
| <truststore-path> | Absolute path to the SSL root certificate on your machine | See Step 2: Download the SSL root certificate |
| <jaas-conf-path> | Absolute path to the JAAS configuration file | See Step 3: Create the JAAS configuration file |
| <username> | SASL username | Instance Details page (ACL disabled) or your SASL user credentials (ACL enabled) |
| <password> | SASL password | Same as <username> |

Prerequisites

Add Java dependencies

Add the following dependencies to your pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
Note

Match the kafka-clients major version to your ApsaraMQ for Kafka instance version. Find the instance version on the Instance Details page in the ApsaraMQ for Kafka console.

Prepare configuration files

Create the following files before writing producer and consumer code.

| File | Purpose |
| --- | --- |
| log4j.properties | Log output configuration |
| SSL root certificate (.jks) | TLS trust anchor for the broker connection |
| kafka_client_jaas.conf | SASL/PLAIN credentials |
| kafka.properties | Broker endpoint, topic, consumer group, and file paths |
| JavaKafkaConfigurer.java | Helper class that loads the properties and sets the JAAS path |

Step 1: Create the Log4j configuration file

Create a file named log4j.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, STDOUT

log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n

Step 2: Download the SSL root certificate

Download the SSL root certificate and save it to a location accessible by your application. Record the absolute path -- you will use it as <truststore-path>.

Warning

In production environments, use the full absolute path to the truststore file. Do not package it inside a JAR.
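Before wiring the truststore into a client, it can help to confirm that the file is readable from the path you recorded. The following is a minimal sketch that uses only the JDK; the class name TruststoreCheck is hypothetical, and it assumes the downloaded certificate is a JKS file protected by the KafkaOnsClient password used later in this guide.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;

public class TruststoreCheck {

    /**
     * Returns true if the file exists and can be opened as a JKS keystore
     * with the given password.
     */
    public static boolean isLoadable(Path truststorePath, char[] password) {
        if (!Files.isRegularFile(truststorePath)) {
            return false;
        }
        try (InputStream in = Files.newInputStream(truststorePath)) {
            KeyStore keyStore = KeyStore.getInstance("JKS");
            keyStore.load(in, password);
            return true;
        } catch (Exception e) {
            // Wrong password, corrupt file, or unsupported format
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the absolute truststore path as the first argument
        Path path = Paths.get(args[0]);
        System.out.println("Truststore loadable: "
            + isLoadable(path, "KafkaOnsClient".toCharArray()));
    }
}
```

Run it with your <truststore-path> as the only argument. A result of false usually points to a wrong path or a file that was altered during download.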

Step 3: Create the JAAS configuration file

Create a file named kafka_client_jaas.conf:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<username>"
  password="<password>";
};
Note
  • If the access control list (ACL) feature is disabled for the instance, find the default Simple Authentication and Security Layer (SASL) user credentials on the Instance Details page in the ApsaraMQ for Kafka console.

  • If ACL is enabled, make sure the SASL user is of the PLAIN type and has permissions to produce and consume messages. For details, see Grant permissions to SASL users.
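As an alternative to a separate JAAS file, Apache Kafka clients from version 0.10.2 onward also accept the same login-module entry inline through the sasl.jaas.config client property. The sketch below only builds that string with the JDK; the class name JaasConfigBuilder is hypothetical, and the escaping of quotes and backslashes is an assumption about how the JAAS parser reads quoted values. This guide itself uses the file-based approach.

```java
public class JaasConfigBuilder {

    /** Escapes backslashes and double quotes so the value stays inside its quotes. */
    public static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds a value suitable for the sasl.jaas.config client property. */
    public static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + escape(username) + "\" "
            + "password=\"" + escape(password) + "\";";
    }

    public static void main(String[] args) {
        // Placeholder credentials; avoid printing real passwords
        System.out.println(plainJaasConfig("<username>", "<password>"));
    }
}
```

If you take this route, pass the result with props.put("sasl.jaas.config", ...) alongside the other client properties instead of calling JavaKafkaConfigurer.configureSasl().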

Step 4: Create the Kafka properties file

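Create kafka.properties on the classpath (for example, in src/main/resources of a Maven project), because the configuration loader in Step 5 reads it as a classpath resource: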

## SSL endpoint (from the ApsaraMQ for Kafka console)
bootstrap.servers=<bootstrap-servers>
## Topic name (created in the ApsaraMQ for Kafka console)
topic=<topic-name>
## Consumer group ID (created in the ApsaraMQ for Kafka console)
group.id=<group-id>
## Absolute path to the SSL root certificate
ssl.truststore.location=<truststore-path>
## Absolute path to the JAAS configuration file
java.security.auth.login.config=<jaas-conf-path>
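A common mistake is to leave one of the placeholders unreplaced. The following sketch, which uses only the JDK, reports keys that are missing, blank, or still set to a <placeholder> value. The class name KafkaPropertiesCheck, the host name, the port, and the file paths in the sample are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaPropertiesCheck {

    // Keys used by kafka.properties in this guide
    static final String[] REQUIRED_KEYS = {
        "bootstrap.servers", "topic", "group.id",
        "ssl.truststore.location", "java.security.auth.login.config"
    };

    /** Returns the keys that are missing, blank, or still hold a <placeholder>. */
    public static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<String>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            String trimmed = value == null ? "" : value.trim();
            if (trimmed.isEmpty() || (trimmed.startsWith("<") && trimmed.endsWith(">"))) {
                problems.add(key);
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Inline sample with one unreplaced placeholder; a real check would load the file
        props.load(new StringReader(
            "bootstrap.servers=broker.example.com:9093\n"
            + "topic=demo-topic\n"
            + "group.id=<group-id>\n"
            + "ssl.truststore.location=/opt/kafka/truststore.jks\n"
            + "java.security.auth.login.config=/opt/kafka/kafka_client_jaas.conf\n"));
        System.out.println("Keys needing attention: " + findProblems(props));
    }
}
```

With the inline sample, the program prints `Keys needing attention: [group.id]`.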

Step 5: Create the configuration loader

Create JavaKafkaConfigurer.java to load the properties file and set the JAAS configuration path.

import java.util.Properties;

public class JavaKafkaConfigurer {

    private static Properties properties;

    public static void configureSasl() {
        // Skip if the JAAS config path is already set via -D flag or another method
        if (null == System.getProperty("java.security.auth.login.config")) {
            System.setProperty("java.security.auth.login.config",
                getKafkaProperties().getProperty("java.security.auth.login.config"));
        }
    }

    public synchronized static Properties getKafkaProperties() {
        if (null != properties) {
            return properties;
        }
        Properties kafkaProperties = new Properties();
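        try {
            // Load kafka.properties from the classpath. Reference this class rather than
            // KafkaProducerDemo so the loader compiles on its own.
            kafkaProperties.load(
                JavaKafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties"));
        } catch (Exception e) {
            // A missing kafka.properties surfaces here as a NullPointerException
            e.printStackTrace();
        }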
        properties = kafkaProperties;
        return kafkaProperties;
    }
}

SSL and SASL properties reference

The following properties are shared by both the producer and consumer examples. They configure the SSL transport and SASL/PLAIN authentication.

| Property | Value | Description |
| --- | --- | --- |
| security.protocol | SASL_SSL | Encrypt traffic with TLS and authenticate with SASL |
| sasl.mechanism | PLAIN | Use the PLAIN authentication mechanism |
| ssl.truststore.location | <truststore-path> | Path to the SSL root certificate (.jks file) |
| ssl.truststore.password | KafkaOnsClient | Default truststore password |
| ssl.endpoint.identification.algorithm | (empty string) | Disable hostname verification |
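Because these five settings are identical for every client, one option is to build them once and copy them into each producer or consumer configuration. The sketch below uses plain string keys and only the JDK; the class name SecurityProps and the sample path are hypothetical, and the demo classes in this guide set the same values inline instead.

```java
import java.util.Properties;

public class SecurityProps {

    /** Builds the SSL and SASL/PLAIN settings shared by producers and consumers. */
    public static Properties build(String truststorePath) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", "KafkaOnsClient");
        // An empty string disables hostname verification
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        Properties shared = build("/opt/kafka/truststore.jks"); // hypothetical path
        // A producer or consumer would copy these in with props.putAll(shared)
        System.out.println(shared.getProperty("security.protocol"));
    }
}
```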

Produce messages

Create KafkaProducerDemo.java:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class KafkaProducerDemo {

    public static void main(String args[]) {
        // Load JAAS configuration
        JavaKafkaConfigurer.configureSasl();

        // Load kafka.properties
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();

        // --- SSL + SASL/PLAIN authentication ---
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Disable hostname verification
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // --- Producer settings ---
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);

        // Create a thread-safe producer (one per process is usually sufficient;
        // for higher throughput, create up to 5)
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Topic to send messages to
        String topic = kafkaProperties.getProperty("topic");
        String value = "this is the message's value";

        try {
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> kafkaMessage =
                    new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);
            }
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            System.out.println("error occurred");
            e.printStackTrace();
        } finally {
            // Send any remaining buffered records and release network resources
            producer.close();
        }
    }
}

Compile and run KafkaProducerDemo.java to send messages.

Consume messages

Single consumer

Create KafkaConsumerDemo.java:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class KafkaConsumerDemo {

    public static void main(String args[]) {
        // Load JAAS configuration
        JavaKafkaConfigurer.configureSasl();

        // Load kafka.properties
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();

        // --- SSL + SASL/PLAIN authentication ---
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Disable hostname verification
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // --- Consumer settings ---
        // Session timeout, set to 30 s here. If the broker receives no heartbeat within
        // this interval, it removes the consumer from the group and triggers rebalancing.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Maximum bytes fetched per partition and per request.
        // Tune these values for Internet connections to control bandwidth.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        // Maximum number of records returned per poll.
        // Keep this low enough to process all records within max.poll.interval.ms;
        // otherwise the consumer is removed from the group and rebalancing is triggered.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
            kafkaProperties.getProperty("group.id"));

        // Create a consumer instance
        KafkaConsumer<String, String> consumer =
            new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);

        // Subscribe to one or more topics
        List<String> subscribedTopics = new ArrayList<String>();
        subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);

        // Poll loop
        while (true) {
            try {
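                // poll(long) is deprecated since kafka-clients 2.0; use the Duration overload
                ConsumerRecords<String, String> records =
                    consumer.poll(java.time.Duration.ofMillis(1000));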
                // Process all records before the next poll.
                // For better throughput, offload processing to a separate thread pool.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                        String.format("Consume partition:%d offset:%d",
                            record.partition(), record.offset()));
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }
}

Compile and run KafkaConsumerDemo.java to consume messages.

Multiple consumers

To increase throughput, run multiple consumer threads within the same process. Keep the total number of consumers in the group, across all processes, at or below the number of partitions in the subscribed topic. Any consumers beyond that count are assigned no partitions and sit idle.
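The partition limit comes down to simple arithmetic. As a minimal sketch, assuming all consumers belong to the same group and subscribe to one topic, the following hypothetical helper computes how many consumer threads a process can usefully start.

```java
public class ConsumerCountPlanner {

    /**
     * Returns how many consumer threads this process can usefully start,
     * given the topic's partition count and the consumers already running elsewhere.
     */
    public static int usableConsumers(int partitions, int consumersElsewhere, int requested) {
        int remaining = Math.max(0, partitions - consumersElsewhere);
        return Math.min(requested, remaining);
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 12 partitions, 10 consumers in other processes, 4 requested here
        System.out.println(usableConsumers(12, 10, 4)); // prints 2
    }
}
```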

Create KafkaMultiConsumerDemo.java:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaMultiConsumerDemo {

    public static void main(String args[]) throws InterruptedException {
        // Load JAAS configuration
        JavaKafkaConfigurer.configureSasl();

        // Load kafka.properties
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();

        // --- SSL + SASL/PLAIN authentication ---
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Disable hostname verification
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // --- Consumer settings ---
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
            kafkaProperties.getProperty("group.id"));

        // Start two consumer threads
        int consumerNum = 2;
        Thread[] consumerThreads = new Thread[consumerNum];
        for (int i = 0; i < consumerNum; i++) {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            List<String> subscribedTopics = new ArrayList<String>();
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);

            KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
            consumerThreads[i] = new Thread(kafkaConsumerRunner);
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].start();
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].join();
        }
    }

    static class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;

        KafkaConsumerRunner(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    try {
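                        // poll(long) is deprecated since kafka-clients 2.0; use the Duration overload
                        ConsumerRecords<String, String> records =
                            consumer.poll(java.time.Duration.ofMillis(1000));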
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(
                                String.format("Thread:%s Consume partition:%d offset:%d",
                                    Thread.currentThread().getName(),
                                    record.partition(), record.offset()));
                        }
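                    } catch (WakeupException e) {
                        // Rethrow so the outer handler treats it as a shutdown signal
                        // instead of logging it as an error
                        throw e;
                    } catch (Exception e) {
                        try {
                            Thread.sleep(1000);
                        } catch (Throwable ignore) {
                        }
                        e.printStackTrace();
                    }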
                }
            } catch (WakeupException e) {
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}

Compile and run KafkaMultiConsumerDemo.java to consume messages with multiple threads.
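The demo defines shutdown() on KafkaConsumerRunner but never calls it, so the threads run until the process is killed. One way to stop them cleanly is a JVM shutdown hook. The sketch below shows the pattern with a stand-in worker that needs no Kafka dependency: Thread.sleep stands in for consumer.poll, and an interrupt stands in for consumer.wakeup(). All class names here are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulShutdownSketch {

    /** Stand-in for KafkaConsumerRunner: loops until shutdown() is called. */
    public static class Worker implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private volatile Thread runner;

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                while (!closed.get()) {
                    Thread.sleep(100); // stand-in for consumer.poll(...)
                }
            } catch (InterruptedException e) {
                // Stand-in for WakeupException: expected during shutdown
            } finally {
                System.out.println("worker closed"); // stand-in for consumer.close()
            }
        }

        public void shutdown() {
            closed.set(true);
            Thread t = runner;
            if (t != null) {
                t.interrupt(); // stand-in for consumer.wakeup()
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final Worker worker = new Worker();
        final Thread thread = new Thread(worker, "consumer-0");
        thread.start();

        // On Ctrl+C or SIGTERM, signal the worker and wait for it to finish
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                worker.shutdown();
                try {
                    thread.join(5000);
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }));

        Thread.sleep(300);  // let the worker run briefly
        worker.shutdown();  // demo only: stop without waiting for a signal
        thread.join();
    }
}
```

In KafkaMultiConsumerDemo, the equivalent change would be to keep a reference to each KafkaConsumerRunner and call its shutdown() from the hook before joining the consumer threads.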

Verify the result

After you run the producer and consumer, check the console output.

Producer -- successful output looks like:

Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@0
Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@1
...

Consumer -- successful output looks like:

Consume partition:0 offset:0
Consume partition:0 offset:1
...
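Each producer line above ends with the record metadata in the form topic-partition@offset. As a small illustration of how to read it, the following JDK-only sketch splits such a string into its three parts. The class name ProduceLogLine and the sample value are hypothetical, and the format is inferred from the sample output shown in this section.

```java
public class ProduceLogLine {

    /** Parses "<topic>-<partition>@<offset>" into {topic, partition, offset}. */
    public static String[] parse(String metadata) {
        int at = metadata.lastIndexOf('@');
        // Topic names may contain hyphens, so search backward from the '@'
        int dash = metadata.lastIndexOf('-', at);
        if (at < 0 || dash < 0) {
            throw new IllegalArgumentException("Unexpected format: " + metadata);
        }
        return new String[] {
            metadata.substring(0, dash),
            metadata.substring(dash + 1, at),
            metadata.substring(at + 1)
        };
    }

    public static void main(String[] args) {
        String[] parts = parse("demo-topic-0@17"); // hypothetical sample
        System.out.println("topic=" + parts[0] + " partition=" + parts[1] + " offset=" + parts[2]);
    }
}
```

For the sample value, the program prints `topic=demo-topic partition=0 offset=17`.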

If either program throws an exception, see Troubleshoot ApsaraMQ for Kafka client errors.