This topic describes how to use the SDK for Ruby to connect to an endpoint of an ApsaraMQ for Kafka instance and send and subscribe to messages.
Environment requirements
Ruby is installed. For more information, see Download Ruby.
Install the Ruby dependency library
gem install ruby-kafka -v 0.6.8
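After installation, it can help to confirm that the pinned version is the one Ruby will load. The following is a minimal sketch that uses only the RubyGems API bundled with Ruby; the helper name pinned_version? is hypothetical and not part of the demo project.

```ruby
require "rubygems"

# The version pinned by the install command above.
PINNED = Gem::Requirement.new("= 0.6.8")

# Hypothetical helper: true when the given version string satisfies the pin.
def pinned_version?(version_string)
  PINNED.satisfied_by?(Gem::Version.new(version_string))
end

# Look up the newest installed ruby-kafka gem, if any.
installed = Gem::Specification.find_all_by_name("ruby-kafka").max_by(&:version)

if installed.nil?
  puts "ruby-kafka is not installed"
elsif pinned_version?(installed.version.to_s)
  puts "ruby-kafka #{installed.version} matches the pinned version"
else
  puts "ruby-kafka #{installed.version} is installed, but 0.6.8 is expected"
end
```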
Prepare configuration files
Optional: Download the SSL root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must download this certificate.
Go to the Aliware-kafka-demos page, click the icon to download the demo project to your on-premises machine, and then decompress the demo project.
In the decompressed package, go to the kafka-ruby-demo folder. Then, open the corresponding folder based on the endpoint that you want to use, and configure the producer.ruby file and the consumer.ruby file in the folder.
Table 1. Parameters
Parameter
Description
brokers
The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
topic
The name of the topic. You can obtain the name of the topic on the Topics page in the ApsaraMQ for Kafka console.
username
The username of the Simple Authentication and Security Layer (SASL) user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, omit this parameter.
Note:
- If the ACL feature is not enabled for your ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
- If the ACL feature is enabled for your ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
password
The password of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, omit this parameter.
consumerGroup
The ID of the consumer group. You can obtain the ID of the consumer group on the Groups page in the ApsaraMQ for Kafka console.
After the required parameters are configured, upload all files in the folder in which the configuration file is located to the Ruby installation directory on your server. The folder that corresponds to the SSL endpoint contains the SSL root certificate file.
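As an alternative to hardcoding the parameters in each file, the values in Table 1 could be read from the environment and turned into the keyword arguments that the sample code passes to Kafka.new. The following is a minimal sketch under stated assumptions: the environment variable names (KAFKA_BROKERS, KAFKA_TOPIC, KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_CA_CERT) and both helper functions are hypothetical, and the broker addresses are placeholders. The sketch does not load the ruby-kafka gem; it only builds the options hash.

```ruby
# Hypothetical helper: collects the Table 1 parameters from an environment-like hash.
# The variable names are assumptions, not names used by the demo project.
def load_settings(env = ENV)
  {
    brokers: env.fetch("KAFKA_BROKERS").split(",").map(&:strip),
    topic: env.fetch("KAFKA_TOPIC"),
    username: env["KAFKA_USERNAME"],    # omitted for the default endpoint
    password: env["KAFKA_PASSWORD"],    # omitted for the default endpoint
    ca_cert_path: env["KAFKA_CA_CERT"]  # only needed for the SSL endpoint
  }
end

# Hypothetical helper: builds keyword arguments in the shape used by the sample code.
# SASL and SSL keys are added only when credentials are present.
def client_options(settings, client_id:)
  options = { seed_brokers: settings[:brokers], client_id: client_id }
  if settings[:username] && settings[:password]
    options[:sasl_plain_username] = settings[:username]
    options[:sasl_plain_password] = settings[:password]
    if settings[:ca_cert_path]
      options[:ssl_ca_cert] = File.read(settings[:ca_cert_path])
    end
  end
  options
end

# Default endpoint: no username, password, or certificate is supplied.
demo_env = { "KAFKA_BROKERS" => "broker-1:9092, broker-2:9092", "KAFKA_TOPIC" => "demo-topic" }
settings = load_settings(demo_env)
p client_options(settings, client_id: "simple-producer")
```

With this approach, the resulting hash could be passed as Kafka.new(**options, logger: logger), so the same script serves both endpoint types without deleting lines by hand.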
Send messages
Run the following command to run producer.ruby to send messages:
ruby producer.ruby
For information about the parameters in the code, see Parameters.
The sample code uses the SSL endpoint. If you use a different endpoint to connect to the ApsaraMQ for Kafka instance, delete or modify the endpoint-specific parameters, and modify the other code based on the comments in the code.
The following sample code provides an example of producer.ruby:
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "logger"
require "kafka"

logger = Logger.new($stdout)
# logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "sasl-producer", # If you use the default endpoint, change the value to simple-producer.
  logger: logger,
  # Place cert.pem in a location that this script can read.
  # If you use the default endpoint, delete the following three lines:
  ssl_ca_cert: File.read('./cert.pem'),
  sasl_plain_username: username,
  sasl_plain_password: password,
)

producer = kafka.producer

begin
  # Send each line read from standard input as one message.
  $stdin.each_line do |line|
    producer.produce(line, topic: topic)
    producer.deliver_messages
  end
ensure
  # Deliver any messages that are still buffered, then shut down the producer.
  producer.deliver_messages
  producer.shutdown
end
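Because the sample producer reads lines from standard input, each message value includes the trailing newline, and an empty line is sent as a message as well. If that is not wanted, the input could be cleaned up before it is handed to producer.produce. The following is a minimal sketch; each_payload is a hypothetical helper, and StringIO stands in for $stdin so that the sketch runs without a Kafka connection.

```ruby
require "stringio"

# Hypothetical helper: yields each non-empty input line without its line ending.
# Returns an Enumerator when no block is given.
def each_payload(io)
  return enum_for(:each_payload, io) unless block_given?

  io.each_line do |line|
    payload = line.chomp
    next if payload.empty? # skip blank lines instead of sending empty messages
    yield payload
  end
end

# StringIO stands in for $stdin in this sketch.
input = StringIO.new("hello\n\nworld\n")
each_payload(input) { |payload| puts "would send: #{payload}" }
```

In producer.ruby, the loop body would then call producer.produce(payload, topic: topic) in place of the puts call.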
Subscribe to messages
Run the following command to run consumer.ruby to consume messages:
ruby consumer.ruby
For information about the parameters in the code, see Parameters.
The sample code uses the SSL endpoint. If you use a different endpoint to connect to the ApsaraMQ for Kafka instance, delete or modify the endpoint-specific parameters, and modify the other code based on the comments in the code.
The following sample code provides an example of consumer.ruby:
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "logger"
require "kafka"

logger = Logger.new($stdout)
# logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
consumerGroup = "xxx"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "sasl-consumer", # If you use the default endpoint, change the value to test.
  socket_timeout: 20,
  logger: logger,
  # Place cert.pem in a location that this script can read.
  # If you use the default endpoint, delete the following three lines:
  ssl_ca_cert: File.read('./cert.pem'),
  sasl_plain_username: username,
  sasl_plain_password: password,
)

consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)

# Stop the consumer loop when the process receives TERM or INT.
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
  consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Get message: #{message.value}")
  end
rescue Kafka::ProcessingError => e
  # Pause the failing partition for 20 seconds, then resume consuming.
  warn "Got error: #{e.cause}"
  consumer.pause(e.topic, e.partition, timeout: 20)
  retry
end
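The rescue, pause, and retry flow at the end of consumer.ruby can be traced without a broker. The following is a minimal sketch that uses stand-in classes: FakeProcessingError and FakeConsumer are hypothetical and only mimic the fields and methods that the sample code touches. The stand-in delivers a failed message again on the next call, which is an assumption of this sketch; check the ruby-kafka documentation for the redelivery behavior of the real consumer.

```ruby
# Hypothetical stand-in for Kafka::ProcessingError; it exposes the two fields the sample reads.
class FakeProcessingError < StandardError
  attr_reader :topic, :partition

  def initialize(topic, partition)
    super("processing failed")
    @topic = topic
    @partition = partition
  end
end

# Hypothetical stand-in consumer that remembers its position between calls.
class FakeConsumer
  attr_reader :paused

  def initialize(messages)
    @messages = messages
    @position = 0
    @paused = []
  end

  # Yields messages from the current position; a failure in the block is
  # wrapped in FakeProcessingError and the position is not advanced.
  def each_message
    while @position < @messages.size
      begin
        yield @messages[@position]
      rescue StandardError
        raise FakeProcessingError.new("demo-topic", 0)
      end
      @position += 1
    end
  end

  # Records the pause request instead of actually pausing anything.
  def pause(topic, partition, timeout:)
    @paused << [topic, partition, timeout]
  end
end

consumer = FakeConsumer.new(%w[a b c])
processed = []
failed_once = false

begin
  consumer.each_message do |value|
    if value == "b" && !failed_once
      failed_once = true
      raise "transient failure" # simulated handler error on the first attempt
    end
    processed << value
  end
rescue FakeProcessingError => e
  warn "Got error: #{e.cause}"
  consumer.pause(e.topic, e.partition, timeout: 20)
  retry
end

p processed       # ["a", "b", "c"]
p consumer.paused # [["demo-topic", 0, 20]]
```

The trace shows that retry re-enters the begin block, so a handler that fails permanently on the same message would loop indefinitely in this pattern.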