To produce and consume messages on an ApsaraMQ for Kafka instance over the Internet, connect to the SSL endpoint with SASL/PLAIN authentication. The PLAIN mechanism transmits credentials in cleartext, so always pair it with SSL encryption (SASL_SSL) to protect credentials in transit.
All examples in this guide use the Java SDK.
Placeholders
Gather the following values before you start. Replace these placeholders in all configuration and code samples.
| Placeholder | Description | Where to find it |
|---|---|---|
| <bootstrap-servers> | SSL endpoint of your ApsaraMQ for Kafka instance | Instance Details page in the ApsaraMQ for Kafka console |
| <topic-name> | Topic name | Topics page in the ApsaraMQ for Kafka console |
| <group-id> | Consumer group ID | Groups page in the ApsaraMQ for Kafka console |
| <truststore-path> | Absolute path to the SSL root certificate on your machine | See Step 2: Download the SSL root certificate |
| <jaas-conf-path> | Absolute path to the JAAS configuration file | See Step 3: Create the JAAS configuration file |
| <username> | SASL username | Instance Details page (ACL disabled) or your SASL user credentials (ACL enabled) |
| <password> | SASL password | Same as username |
Prerequisites
An ApsaraMQ for Kafka instance, topic, and consumer group. For details, see Step 3: Create resources.
Add Java dependencies
Add the following dependencies to your pom.xml file:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
Match the kafka-clients major version to your ApsaraMQ for Kafka instance version. Find the instance version on the Instance Details page in the ApsaraMQ for Kafka console.
Prepare configuration files
Create the following files before writing producer and consumer code.
| File | Purpose |
|---|---|
| log4j.properties | Log output configuration |
| SSL root certificate (.jks) | TLS trust anchor for the broker connection |
| kafka_client_jaas.conf | SASL/PLAIN credentials |
| kafka.properties | Broker endpoint, topic, consumer group, and file paths |
| JavaKafkaConfigurer.java | Helper class that loads the properties and sets the JAAS path |
Step 1: Create the Log4j configuration file
Create a file named log4j.properties:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, STDOUT
log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n
Step 2: Download the SSL root certificate
Download the SSL root certificate and save it to a location accessible by your application. Record the absolute path -- you will use it as <truststore-path>.
In production environments, use the full absolute path to the truststore file. Do not package it inside a JAR.
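Before wiring the certificate into the client, you can confirm that the downloaded file is a readable JKS store and that the default truststore password KafkaOnsClient opens it, using only the JDK's KeyStore API. This is an optional sanity check; the class name and the temp-store fallback (used so the sketch runs even without a real certificate on disk) are illustrative only.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.KeyStore;

public class TruststoreCheck {
    public static void main(String[] args) throws Exception {
        char[] password = "KafkaOnsClient".toCharArray(); // default truststore password
        // Pass your <truststore-path> as the first argument; with no argument,
        // the sketch creates an empty JKS store in a temp file so it still runs.
        File file = args.length > 0 ? new File(args[0]) : createEmptyStore(password);
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (InputStream in = new FileInputStream(file)) {
            // Throws if the file is not a JKS store or the password is wrong
            trustStore.load(in, password);
        }
        System.out.println("Truststore OK, entries: " + trustStore.size());
    }

    private static File createEmptyStore(char[] password) throws Exception {
        File f = File.createTempFile("demo-truststore", ".jks");
        KeyStore ks = KeyStore.getInstance("JKS");
        ks.load(null, password); // initialize an empty store
        try (OutputStream out = new FileOutputStream(f)) {
            ks.store(out, password);
        }
        return f;
    }
}
```

If loading throws an exception here, the same file will also fail inside the Kafka client, so this isolates certificate problems from SASL problems.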
Step 3: Create the JAAS configuration file
Create a file named kafka_client_jaas.conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<username>"
password="<password>";
};
If the access control list (ACL) feature is disabled for the instance, find the default Simple Authentication and Security Layer (SASL) user credentials on the Instance Details page in the ApsaraMQ for Kafka console.
If ACL is enabled, make sure the SASL user is of the PLAIN type and has permissions to produce and consume messages. For details, see Grant permissions to SASL users.
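As an alternative to a separate JAAS file, kafka-clients releases since 0.10.2 also accept the same login module configuration directly through the sasl.jaas.config client property, which removes the need for the file and the System.setProperty step. A minimal sketch of building that property value (the class and method names are illustrative; <username> and <password> remain placeholders):

```java
import java.util.Properties;

public class InlineJaasConfig {
    // Builds a sasl.jaas.config value equivalent to kafka_client_jaas.conf.
    static String plainJaas(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // With this property set per client, no JAAS file or system property is needed.
        props.put("sasl.jaas.config", plainJaas("<username>", "<password>"));
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

Setting credentials per client instance also makes it easier to run producers and consumers with different SASL users in one JVM.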
Step 4: Create the Kafka properties file
Create kafka.properties:
## SSL endpoint (from the ApsaraMQ for Kafka console)
bootstrap.servers=<bootstrap-servers>
## Topic name (created in the ApsaraMQ for Kafka console)
topic=<topic-name>
## Consumer group ID (created in the ApsaraMQ for Kafka console)
group.id=<group-id>
## Absolute path to the SSL root certificate
ssl.truststore.location=<truststore-path>
## Absolute path to the JAAS configuration file
java.security.auth.login.config=<jaas-conf-path>
Step 5: Create the configuration loader
Create JavaKafkaConfigurer.java to load the properties file and set the JAAS configuration path.
import java.util.Properties;
public class JavaKafkaConfigurer {
private static Properties properties;
public static void configureSasl() {
// Skip if the JAAS config path is already set via -D flag or another method
if (null == System.getProperty("java.security.auth.login.config")) {
System.setProperty("java.security.auth.login.config",
getKafkaProperties().getProperty("java.security.auth.login.config"));
}
}
public synchronized static Properties getKafkaProperties() {
if (null != properties) {
return properties;
}
Properties kafkaProperties = new Properties();
try {
kafkaProperties.load(
JavaKafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties"));
} catch (Exception e) {
e.printStackTrace();
}
properties = kafkaProperties;
return kafkaProperties;
}
}
SSL and SASL properties reference
The following properties are shared by both the producer and consumer examples. They configure the SSL transport and SASL/PLAIN authentication.
| Property | Value | Description |
|---|---|---|
| security.protocol | SASL_SSL | Encrypt traffic with TLS and authenticate with SASL |
| sasl.mechanism | PLAIN | Use the PLAIN authentication mechanism |
| ssl.truststore.location | <truststore-path> | Path to the SSL root certificate (.jks file) |
| ssl.truststore.password | KafkaOnsClient | Default truststore password |
| ssl.endpoint.identification.algorithm | (empty string) | Disable hostname verification |
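If you want to keep these shared settings in one place, they can be assembled by a single helper and merged into both the producer and consumer Properties. The sketch below uses the literal property-name strings; the ProducerConfig, SslConfigs, and SaslConfigs constants in the demos resolve to the same names. The class and method names are illustrative.

```java
import java.util.Properties;

public class SaslSslProps {
    // Returns the SSL + SASL/PLAIN settings shared by producer and consumer.
    static Properties baseProps(String bootstrapServers, String truststorePath) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", "KafkaOnsClient"); // default password
        // Empty string disables hostname verification
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseProps("<bootstrap-servers>", "<truststore-path>");
        System.out.println(props.getProperty("security.protocol") + "/"
                + props.getProperty("sasl.mechanism"));
    }
}
```

Centralizing the shared block avoids the two demo classes drifting apart when you later change the endpoint or certificate path.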
Produce messages
Create KafkaProducerDemo.java:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaProducerDemo {
public static void main(String args[]) {
// Load JAAS configuration
JavaKafkaConfigurer.configureSasl();
// Load kafka.properties
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- SSL + SASL/PLAIN authentication ---
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// Disable hostname verification
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- Producer settings ---
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
// Create a thread-safe producer (one per process is usually sufficient;
// for higher throughput, create up to 5)
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// Topic to send messages to
String topic = kafkaProperties.getProperty("topic");
String value = "this is the message's value";
try {
List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> kafkaMessage =
new ProducerRecord<String, String>(topic, value + ": " + i);
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
futures.add(metadataFuture);
}
producer.flush();
for (Future<RecordMetadata> future : futures) {
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Exception e) {
System.out.println("error occurred");
e.printStackTrace();
}
}
}
Compile and run KafkaProducerDemo.java to send messages.
Consume messages
Single consumer
Create KafkaConsumerDemo.java:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaConsumerDemo {
public static void main(String args[]) {
// Load JAAS configuration
JavaKafkaConfigurer.configureSasl();
// Load kafka.properties
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- SSL + SASL/PLAIN authentication ---
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// Disable hostname verification
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- Consumer settings ---
// Session timeout: if the broker receives no heartbeat from the consumer
// within this interval, it removes the consumer from the group and triggers rebalancing.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Maximum bytes fetched per partition and per request.
// Tune these values for Internet connections to control bandwidth.
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
// Maximum number of records returned per poll.
// Keep this low enough to process all records before the next poll;
// otherwise the broker triggers rebalancing.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaProperties.getProperty("group.id"));
// Create a consumer instance
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// Subscribe to one or more topics
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
// Poll loop
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// Process all records before the next poll.
// For better throughput, offload processing to a separate thread pool.
for (ConsumerRecord<String, String> record : records) {
System.out.println(
String.format("Consume partition:%d offset:%d",
record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
}
Compile and run KafkaConsumerDemo.java to consume messages.
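The guidance on max.poll.records can be made concrete. The broker treats a consumer as failed if it does not call poll() again within max.poll.interval.ms (300000 ms by default in kafka-clients 2.x), so the average time you can spend processing each record is roughly that interval divided by max.poll.records. A quick sketch of the arithmetic, using the value from the example above:

```java
public class PollBudget {
    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // max.poll.interval.ms default in kafka-clients 2.x
        int maxPollRecords = 30;          // max.poll.records from the consumer example
        long budgetPerRecordMs = maxPollIntervalMs / maxPollRecords;
        System.out.println("Per-record budget: " + budgetPerRecordMs + " ms");
        // If handling one record can exceed this budget, lower max.poll.records
        // or offload processing to a separate thread pool.
    }
}
```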
Multiple consumers
To increase throughput, run multiple consumer threads within the same process. Keep the total number of consumers across all processes at or below the number of partitions in the subscribed topic; any consumer beyond that number receives no partitions and sits idle.
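To see why extra consumers beyond the partition count add nothing, note that each partition is assigned to exactly one consumer in the group: with P partitions and C consumers, some consumers receive ceil(P/C) partitions, the rest floor(P/C), and any consumer past the P-th receives none. A small sketch of that distribution (the 12-partition and 2-partition figures are assumed examples, not values from this guide):

```java
import java.util.Arrays;

public class PartitionSpread {
    // Number of partitions each of the C consumers receives:
    // the first (P mod C) consumers get one extra partition.
    static int[] spread(int partitions, int consumers) {
        int[] counts = new int[consumers];
        for (int i = 0; i < consumers; i++) {
            counts[i] = partitions / consumers + (i < partitions % consumers ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(spread(12, 5))); // [3, 3, 2, 2, 2]
        System.out.println(Arrays.toString(spread(2, 4)));  // [1, 1, 0, 0] -- two consumers idle
    }
}
```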
Create KafkaMultiConsumerDemo.java:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.WakeupException;
public class KafkaMultiConsumerDemo {
public static void main(String args[]) throws InterruptedException {
// Load JAAS configuration
JavaKafkaConfigurer.configureSasl();
// Load kafka.properties
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- SSL + SASL/PLAIN authentication ---
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// Disable hostname verification
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- Consumer settings ---
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaProperties.getProperty("group.id"));
// Start two consumer threads
int consumerNum = 2;
Thread[] consumerThreads = new Thread[consumerNum];
for (int i = 0; i < consumerNum; i++) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
consumerThreads[i] = new Thread(kafkaConsumerRunner);
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].start();
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].join();
}
}
static class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer<String, String> consumer;
KafkaConsumerRunner(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
while (!closed.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(
String.format("Thread:%s Consume partition:%d offset:%d",
Thread.currentThread().getName(),
record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
} catch (WakeupException e) {
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}
Compile and run KafkaMultiConsumerDemo.java to consume messages with multiple threads.
Verify the result
After you run the producer and consumer, check the console output.
Producer -- successful output looks like:
Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@0
Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@1
...
Consumer -- successful output looks like:
Consume partition:0 offset:0
Consume partition:0 offset:1
...
If either program throws an exception, see Troubleshoot ApsaraMQ for Kafka client errors.