當使用應用程式與Kafka叢集建立串連時,可通過執行個體存取點接入雲訊息佇列 Kafka 版並收發訊息。Kafka執行個體提供預設存取點、SSL存取點和SASL存取點,以滿足您不同的串連和安全需求。預設存取點適用於在保密性較高的VPC環境收發訊息;SASL存取點適用於無需對傳輸鏈路加密但需對訊息收發鑒權;同時需要鏈路加密和訊息鑒權,建議使用SSL存取點。
環境準備
安裝1.8或以上版本JDK。具體操作。請參見安裝JDK。
安裝2.5或以上版本Maven。具體操作,請參見安裝Maven。
安裝編譯工具。
本文以IntelliJ IDEA Ultimate為例。
購買並部署雲訊息佇列 Kafka 版執行個體,如果執行個體類型為VPC執行個體,則僅顯示預設存取點。如果執行個體類型為公網/VPC執行個體,則同時顯示預設存取點和SSL存取點。執行個體預設不開啟SASL存取點,因此預設不顯示SASL存取點。如需使用SASL存取點,您需要手動開啟。具體操作,請參見SASL使用者授權。
預設存取點:適用於在VPC環境收發訊息,但不支援SASL校正。
SASL存取點:適用於在VPC環境收發訊息,且支援SASL校正。
SSL存取點:適用於在公網環境收發訊息,且支援SASL校正。
安裝Java依賴庫
使用Java SDK接入Kafka執行個體的依賴如下所示。此依賴已在下文提到的kafka-java-demo檔案夾中的pom.xml內建,無需手動添加。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
建議您保持服務端和用戶端版本一致,即保持用戶端庫版本和雲訊息佇列 Kafka 版執行個體的大版本一致。您可以在雲訊息佇列 Kafka 版控制台的实例详情頁面擷取雲訊息佇列 Kafka 版執行個體的大版本。
準備配置
可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。
訪問Aliware-kafka-demos,單擊,下載Demo工程到本地並解壓。
在解壓的Demo工程中,找到kafka-java-demo檔案夾,將此檔案夾匯入IntelliJ IDEA。
可選:如果採用SSL存取點或者SASL存取點接入執行個體,需修改設定檔kafka_client_jaas.conf。關於執行個體不同存取點的資訊,請參見存取點對比。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx"; };
當執行個體類型為VPC執行個體時,只有在同一VPC內的資源才能訪問Kafka執行個體,資料轉送具有較高的安全性和私密性。在安全要求更高的情境下,可以開啟ACL功能,訊息需結合SASL身分識別驗證鑒權之後再在安全通道傳輸。您可以根據訊息傳輸的安全要求層級選擇PLAIN機制或SCRAM機制進行身份認證。更多資訊,請參見開啟ACL。
當執行個體類型為公網/VPC執行個體時,公網環境必須對訊息進行鑒權與加密,SASL的PLAIN機制必須與SSL一起用作傳輸層,採用SASL_SSL協議,確保訊息在沒有加密的情況下不會線上路上傳輸明文。
樣本中的username和password為執行個體的SASL使用者名稱和密碼。
如果執行個體未開啟ACL,對於已開啟公網的執行個體,您可以在雲訊息佇列 Kafka 版控制台的執行個體詳情頁面配置信息地區擷取預設使用者的使用者名稱和密碼。
如果執行個體已開啟ACL,請確保要使用的SASL使用者為PLAIN類型且已授權收發訊息的許可權,詳情請參見SASL使用者授權。
修改設定檔kafka.properties。
##==============================通用配置參數============================== bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx topic=xxx group.id=xxx ##=======================以下內容請根據實際情況配置======================== ##SSL存取點配置 ssl.truststore.location=/xxxx/kafka.client.truststore.jks java.security.auth.login.config=/xxxx/kafka_client_jaas.conf ##SASL存取點PLAIN機制配置 java.security.auth.login.config.plain=/xxxx/kafka_client_jaas_plain.conf ##SASL存取點SCRAM機制配置 java.security.auth.login.config.scram=/xxxx/kafka_client_jaas_scram.conf
參數
描述
bootstrap.servers
存取點資訊。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。
topic
執行個體的Topic名稱。您可在雲訊息佇列 Kafka 版控制台的Topic 管理頁面擷取。
group.id
執行個體的Group。您可在雲訊息佇列 Kafka 版控制台的Group 管理頁面擷取。
說明如果應用運行producer.go發送訊息,該參數可以不配置;如果應用運行consumer.go訂閱訊息,該參數必須配置。
ssl.truststore.location
SSL根憑證的路徑。請將上文準備配置中第一步下載的SSL認證檔案儲存至本地路徑,用本地路徑替換範例程式碼中的xxxx。例如:/home/ssl/kafka.client.truststore.jks。
重要如果是預設存取點執行個體或者SASL存取點執行個體,該參數不需要配置;如果是SSL存取點執行個體,該參數必須配置。
java.security.auth.login.config
JAAS設定檔所在路徑,將Demo工程中的kafka_client_jaas.conf檔案另存至本地路徑,用本地路徑替換範例程式碼中的xxxx。例如:/home/ssl/kafka_client_jaas.conf。
重要如果是預設存取點執行個體,該參數不需要配置;如果是SSL存取點執行個體或者SASL存取點執行個體,該參數必須配置。
發送訊息
編譯並運行KafkaProducerDemo.java發送訊息。
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
//如果是SSL存取點執行個體或者SASL存取點執行個體,請注釋以下第一行代碼。
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
*如果是SSL存取點執行個體或者SASL存取點執行個體,請取消以下兩行代碼的注釋。
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
public class KafkaProducerDemo {
public static void main(String args[]) {
/*
* 如果是SSL存取點執行個體,請取消以下一行代碼的注釋。
設定JAAS設定檔的路徑。
JavaKafkaConfigurer.configureSasl();
*/
/*
* 如果是SASL存取點PLAIN機制執行個體,請取消以下一行代碼的注釋。
設定JAAS設定檔的路徑。
JavaKafkaConfigurer.configureSaslPlain();
*/
/*
* 如果是SASL存取點SCRAM機制執行個體,請取消以下一行代碼的注釋。
設定JAAS設定檔的路徑。
JavaKafkaConfigurer.configureSaslScram();
*/
//載入kafka.properties。
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
//設定存取點,請通過控制台擷取對應Topic的存取點。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
/*
* 如果是SSL存取點執行個體,請取消以下四行代碼的注釋。
* 與sasl路徑類似,該檔案也不能被打包到jar中。
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
* 根憑證store的密碼,保持不變。
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
* 接入協議,目前支援使用SASL_SSL協議接入。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
* SASL鑒權方式,保持不變。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
*/
/*
* 如果是SASL存取點PLAIN機制執行個體,請取消以下兩行代碼的注釋。
* 接入協議。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* Plain方式。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
*/
/*
* 如果是SASL存取點SCRAM機制執行個體,請取消以下兩行代碼的注釋。
* 接入協議。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* SCRAM方式。
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
*/
//雲訊息佇列 Kafka 版訊息的序列化方式。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//請求的最長等待時間。
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
//設定用戶端內部重試次數。
props.put(ProducerConfig.RETRIES_CONFIG, 5);
//設定用戶端內部稍候再試。
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
/*
* 如果是SSL存取點執行個體或,請取消以下一行代碼的注釋。
* Hostname校正改成空。
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
*/
//構造Producer對象,注意,該對象是安全執行緒的,一般來說,一個進程內一個Producer對象即可。
//如果想提高效能,可以多構造幾個對象,但不要太多,最好不要超過5個。
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//構造一個雲訊息佇列 Kafka 版訊息。
String topic = kafkaProperties.getProperty("topic"); //訊息所屬的Topic,請在控制台申請之後,填寫在這裡。
String value = "this is the message's value"; //訊息的內容。
try {
//批量擷取Future對象可以加快速度,但注意,批量不要太大。
List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
for (int i =0; i < 100; i++) {
//發送訊息,並獲得一個Future對象。
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value + ": " + i);
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
futures.add(metadataFuture);
}
producer.flush();
for (Future<RecordMetadata> future: futures) {
//同步獲得Future對象的結果。
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Exception e) {
//用戶端內部重試之後,仍然發送失敗,業務要應對此類錯誤。
System.out.println("error occurred");
e.printStackTrace();
}
}
}
訂閱訊息
選擇以下任意一種方式訂閱訊息。
單Consumer訂閱訊息
編譯並運行KafkaConsumerDemo.java發送訊息。
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* 如果是SSL存取點執行個體,請取消以下三行代碼的注釋;如果是SASL存取點,請取消以下前兩行代碼的注釋。
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
public class KafkaConsumerDemo {
public static void main(String args[]) {
//設定JAAS設定檔的路徑。
/*
* 如果是SSL存取點執行個體,請取消以下一行代碼的注釋。
JavaKafkaConfigurer.configureSasl();
*/
/*
* 如果是SASL存取點PLAIN機制執行個體,請取消以下一行代碼的注釋。
JavaKafkaConfigurer.configureSaslPlain();
*/
/*
* 如果是SASL存取點SCRAM機制執行個體,請取消以下一行代碼的注釋。
JavaKafkaConfigurer.configureSaslScram();
*/
//載入kafka.properties
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
//設定存取點,請通過控制台擷取對應Topic的存取點。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//如果是SSL存取點執行個體,請注釋以下第一行代碼。
//可更加實際拉去資料和客戶的版本等設定此值,預設30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
/*
* 如果是SSL存取點執行個體,請取消以下六行代碼的注釋。
* 設定SSL根憑證的路徑,請記得將XXX修改為自己的路徑。
* 與SASL路徑類似,該檔案也不能被打包到jar中。
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
* 根憑證儲存的密碼,保持不變。
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
* 接入協議,目前支援使用SASL_SSL協議接入。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
* SASL鑒權方式,保持不變。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
* 兩次Poll之間的最大允許間隔。
* 消費者超過該值沒有返回心跳,服務端判斷消費者處於非存活狀態,服務端將消費者從Group移除並觸發Rebalance,預設30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
* 設定單次拉取的量,走公網訪問時,該參數會有較大影響。
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
*/
//如果是SASL存取點PLAIN機制執行個體,請注釋以下一行代碼。
//可更加實際拉去資料和客戶的版本等設定此值,預設30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
/*
* 如果是SASL存取點PLAIN機制執行個體,請取消以下三行代碼的注釋。
* 接入協議。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* Plain方式。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
* 兩次Poll之間的最大允許間隔。
* 消費者超過該值沒有返回心跳,服務端判斷消費者處於非存活狀態,服務端將消費者從Group移除並觸發Rebalance,預設30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
*/
//如果是SASL存取點SCRAM機制執行個體,請注釋以下一行代碼。
//可更加實際拉去資料和客戶的版本等設定此值,預設30s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
/*
* 如果是SASL存取點SCRAM機制執行個體,請取消以下四行代碼的注釋。
* 接入協議。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* SCRAM方式。
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
* 兩次Poll之間的最大允許間隔。
* 消費者超過該值沒有返回心跳,服務端判斷消費者處於非存活狀態,服務端將消費者從Group移除並觸發Rebalance,預設30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
*/
//每次poll的最大數量。
//注意該值不要改得太大,如果poll太多資料,而不能在下次poll之前消費完,則會觸發一次負載平衡,產生卡頓。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//訊息的還原序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//當前消費執行個體所屬的消費組,請在控制台申請之後填寫。
//屬於同一個組的消費執行個體,會負載消費訊息。
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
//如果是SSL存取點執行個體,請取消以下一行代碼的注釋。
//Hostname校正改成空。
//props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
//構造訊息對象,也即產生一個消費執行個體。
KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
//設定消費組訂閱的Topic,可以訂閱多個。
//如果GROUP_ID_CONFIG是一樣,則訂閱的Topic也建議設定成一樣。
List<String> subscribedTopics = new ArrayList<String>();
//如果是SSL存取點執行個體,請注釋以下前五行代碼,取消第六行代碼的注釋。
//如果需要訂閱多個Topic,則在這裡add進去即可。
//每個Topic需要先在控制台進行建立。
String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
for (String topic: topics) {
subscribedTopics.add(topic.trim());
}
//subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
//迴圈消費訊息。
while (true){
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
//必須在下次poll之前消費完這些資料, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG。
//建議開一個單獨的線程池來消費訊息,然後非同步返回結果。
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
}
多Consumer訂閱訊息
編譯並運行KafkaMultiConsumerDemo.java消費訊息。
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
//如果是SSL存取點執行個體或者SASL存取點執行個體,請取消以下第一行代碼的注釋。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* 如果是SSL存取點執行個體,請取消以下前三行代碼的注釋;如果是SASL存取點執行個體,請取消以下前兩行代碼的注釋。
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
import org.apache.kafka.common.errors.WakeupException;
/**
* 本教程示範如何在一個進程內開啟多個Consumer同時消費Topic。
* 注意全域Consumer數量不要超過訂閱的Topic總分區數。
*/
public class KafkaMultiConsumerDemo {
public static void main(String args[]) throws InterruptedException {
//設定JAAS設定檔的路徑。
/*
* 如果是SSL存取點執行個體,請取消以下一行代碼的注釋。
JavaKafkaConfigurer.configureSasl();
*/
/*
* 如果是SASL存取點PLAIN機制執行個體,請取消以下一行代碼的注釋。
JavaKafkaConfigurer.configureSaslPlain();
*/
/*
* 如果是SASL存取點SCRAM機制執行個體,請取消以下一行代碼的注釋。
JavaKafkaConfigurer.configureSaslScram();
*/
//載入kafka.properties。
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
//設定存取點,請通過控制台擷取對應Topic的存取點。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
/*
* 如果是SSL存取點執行個體,請取消以下四行代碼的注釋。
* 與SASL路徑類似,該檔案也不能被打包到JAR中。
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
* 根憑證儲存的密碼,保持不變。
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
* 接入協議,目前支援使用SASL_SSL協議接入。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
* SASL鑒權方式,保持不變。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
*/
/*
* 如果是SASL存取點PLAIN機制執行個體,請取消以下兩行代碼的注釋。
* 接入協議。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* Plain方式。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
*/
/*
* 如果是SASL存取點SCRAM機制執行個體,請取消以下兩行代碼的注釋。
* 接入協議。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* SCRAM方式。
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
*/
//兩次Poll之間的最大允許間隔。
//消費者超過該值沒有返回心跳,服務端判斷消費者處於非存活狀態,服務端將消費者從Group移除並觸發Rebalance,預設30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//每次Poll的最大數量。
//注意該值不要改得太大,如果Poll太多資料,而不能在下次Poll之前消費完,則會觸發一次負載平衡,產生卡頓。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//訊息的還原序列化方式。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//當前消費執行個體所屬的消費組,請在控制台申請之後填寫。
//屬於同一個組的消費執行個體,會負載消費訊息。
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
/*
* 如果是SSL存取點執行個體,請取消以下一行代碼的注釋。
* 構造消費對象,也即產生一個消費執行個體。
* Hostname校正改成空。
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
*/
int consumerNum = 2;
Thread[] consumerThreads = new Thread[consumerNum];
for (int i = 0; i < consumerNum; i++) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
consumerThreads[i] = new Thread(kafkaConsumerRunner);
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].start();
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].join();
}
}
static class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
KafkaConsumerRunner(KafkaConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
while (!closed.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
//必須在下次Poll之前消費完這些資料, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG。
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("Thread:%s Consume partition:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
} catch (WakeupException e) {
//如果關閉則忽略異常。
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
}
//可以被另一個線程調用的關閉Hook。
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}
常見問題
雲訊息佇列 Kafka 版的SASL_SSL認證怎麼配置?
訪問上文準備配置的第一步中的地址,下載SSL認證到本地路徑,配置Demo工程中kafka.properties設定檔的ssl.truststore.location
參數,即將SSL認證本地路徑配置在此參數中。
使用Java SDK通過執行個體存取點收發訊息時,可以綁定自己的SSL認證嗎?
不支援。建議您使用ApsaraMQ for Kafka提供的SSL認證。
相關文檔
除了使用執行個體存取點接入執行個體外,還可使用Spring Cloud架構接入執行個體並收發訊息,請參見使用Spring Cloud架構收發訊息。
若使用Java SDK不能成功收發訊息,請檢查執行個體的運行狀態是否健康,請參見執行個體運行健康自檢指南。