本文介紹如何使用Ruby SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。
環境準備
您已安裝Ruby。更多資訊,請參見安裝Ruby。
安裝Ruby依賴庫
執行以下命令安裝Ruby依賴庫。
gem install ruby-kafka -v 0.6.8
準備配置
- 可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。
訪問Aliware-kafka-demos,單擊,下載Demo工程到本地並解壓。
- 在解壓的Demo工程找到kafka-ruby-demo檔案夾,根據存取點類型開啟對應的檔案夾,配置producer.ruby檔案和consumer.ruby檔案。
表 1. 配置項說明 參數 描述 brokers SSL存取點。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。 topic Topic名稱。您可在雲訊息佇列 Kafka 版控制台的Topic 管理頁面擷取。 username SASL使用者名稱。如果是預設存取點,則無此配置項。 說明- 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的实例详情頁面的配置信息地區擷取預設的用户名和密码。
- 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權。
password SASL使用者名稱密碼。如果是預設存取點,則無此配置項。 consumerGroup Group名稱。您可在雲訊息佇列 Kafka 版控制台的Group 管理頁面擷取。 - 配置完成後,將設定檔所在檔案夾下的全部檔案(如果是SSL存取點執行個體,包含認證SSL根憑證檔案),上傳至伺服器Ruby安裝目錄下。
發送訊息
執行以下命令發送訊息。
ruby producer.ruby
關於代碼中配置項說明,請參見配置項說明。
訊息程式producer.ruby程式碼範例如下:
說明 範例程式碼為SSL存取點的代碼。您需要根據實際存取點類型,刪除或者修改配置項,其餘代碼請根據加粗代碼注釋修改。
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new($stdout)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO
brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "sasl-producer", #如果是預設存取點,取值需修改為“simple-producer”。
logger: logger,
# put "./cert.pem" to anywhere this can read
#如果是預設存取點,刪除以下三行代碼。
ssl_ca_cert: File.read('./cert.pem'),
sasl_plain_username: username,
sasl_plain_password: password,
)
producer = kafka.producer
begin
$stdin.each_with_index do |line, index|
producer.produce(line, topic: topic)
producer.deliver_messages
end
ensure
producer.deliver_messages
producer.shutdown
end
訂閱訊息
執行以下命令消費訊息。
ruby consumer.ruby
訊息程式consumer.ruby範例程式碼如下:
關於代碼中配置項說明,請參見配置項說明。
說明 範例程式碼為SSL存取點的代碼。您需要根據實際存取點類型,刪除或者修改配置項,其餘代碼請根據加粗代碼注釋修改。
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO
brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
consumerGroup = "xxx"
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "sasl-consumer", #如果是預設存取點,取值需修改為“test”
socket_timeout: 20,
logger: logger,
# put "./cert.pem" to anywhere this can read
#如果是預設存取點,刪除以下三行代碼。
ssl_ca_cert: File.read('./cert.pem'),
sasl_plain_username: username,
sasl_plain_password: password,
)
consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }
begin
consumer.each_message(max_bytes: 64 * 1024) do |message|
logger.info("Get message: #{message.value}")
end
rescue Kafka::ProcessingError => e
warn "Got error: #{e.cause}"
consumer.pause(e.topic, e.partition, timeout: 20)
retry
end