本文介紹如何使用Go SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。
環境準備
您已安裝Go。更多資訊,請參見安裝Go。
該kafka-confluent-go-demo不支援Windows系統。
準備配置
可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。
訪問aliware-kafka-demos,單擊表徵圖,然後在下拉框選擇Download ZIP,下載Demo工程並解壓。
在解壓的Demo工程中,找到kafka-confluent-go-demo檔案夾,將此檔案夾上傳在Linux系統的/home路徑下。
登入Linux系統,進入/home/kafka-confluent-go-demo路徑,修改設定檔conf/kafka.json。
{ "topic": "XXX", "group.id": "XXX", "bootstrap.servers" : "XXX:XX,XXX:XX,XXX:XX", "security.protocol" : "plaintext", "sasl.mechanism" : "XXX", "sasl.username" : "XXX", "sasl.password" : "XXX" }
參數
描述
topic
執行個體的Topic名稱。您可在雲訊息佇列 Kafka 版控制台的Topic 管理頁面擷取。
group.id
執行個體的Group。您可在雲訊息佇列 Kafka 版控制台的Group 管理頁面擷取。
說明如果應用運行producer.go發送訊息,該參數可以不配置;如果應用運行consumer.go訂閱訊息,該參數必須配置。
bootstrap.servers
SSL存取點的IP地址以及連接埠。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。
security.protocol
SASL使用者認證協議,預設為plaintext。各類型存取點對應取值如下:
預設存取點:plaintext。
SSL存取點:sasl_ssl。
SASL存取點:sasl_plaintext。
sasl.mechanism
訊息收發的機制。各類型存取點對應取值如下:
預設存取點:不涉及,無需配置。
SSL存取點:PLAIN。
SASL存取點:PLAIN機制需配置為PLAIN;SCRAM機制需配置為SCRAM-SHA-256。
sasl.username
SASL使用者名稱。如果是SSL存取點或SASL存取點,需配置該參數。
說明- 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的实例详情頁面的配置信息地區擷取預設的用户名和密码。
- 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權。
sasl.password
SASL使用者密碼。如果是SSL存取點或SASL存取點,需配置該參數。
發送訊息
執行以下命令運行producer.go發送訊息。
go run -mod=vendor producer/producer.go
訊息程式producer.go範例程式碼如下:
package main
import (
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"path/filepath"
)
type KafkaConfig struct {
Topic string `json:"topic"`
GroupId string `json:"group.id"`
BootstrapServers string `json:"bootstrap.servers"`
SecurityProtocol string `json:"security.protocol"`
SslCaLocation string `json:"ssl.ca.location"`
SaslMechanism string `json:"sasl.mechanism"`
SaslUsername string `json:"sasl.username"`
SaslPassword string `json:"sasl.password"`
}
// config should be a pointer to structure, if not, panic
func loadJsonConfig() *KafkaConfig {
workPath, err := os.Getwd()
if err != nil {
panic(err)
}
configPath := filepath.Join(workPath, "conf")
fullPath := filepath.Join(configPath, "kafka.json")
file, err := os.Open(fullPath);
if (err != nil) {
msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)
panic(msg)
}
defer file.Close()
decoder := json.NewDecoder(file)
var config = &KafkaConfig{}
err = decoder.Decode(config);
if (err != nil) {
msg := fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err)
panic(msg)
}
json.Marshal(config)
return config
}
func doInitProducer(cfg *KafkaConfig) *kafka.Producer {
fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")
//common arguments
var kafkaconf = &kafka.ConfigMap{
"api.version.request": "true",
"message.max.bytes": 1000000,
"linger.ms": 10,
"retries": 30,
"retry.backoff.ms": 1000,
"acks": "1"}
kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers)
switch cfg.SecurityProtocol {
case "plaintext" :
kafkaconf.SetKey("security.protocol", "plaintext");
case "sasl_ssl":
kafkaconf.SetKey("security.protocol", "sasl_ssl");
kafkaconf.SetKey("ssl.ca.location", "conf/ca-cert.pem");
kafkaconf.SetKey("sasl.username", cfg.SaslUsername);
kafkaconf.SetKey("sasl.password", cfg.SaslPassword);
kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism);
kafkaconf.SetKey("enable.ssl.certificate.verification", "false");
kafkaconf.SetKey("ssl.endpoint.identification.algorithm", "None")
case "sasl_plaintext":
kafkaconf.SetKey("sasl.mechanism", "PLAIN")
kafkaconf.SetKey("security.protocol", "sasl_plaintext");
kafkaconf.SetKey("sasl.username", cfg.SaslUsername);
kafkaconf.SetKey("sasl.password", cfg.SaslPassword);
kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
default:
panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
}
producer, err := kafka.NewProducer(kafkaconf)
if err != nil {
panic(err)
}
fmt.Print("init kafka producer success\n")
return producer;
}
func main() {
// Choose the correct protocol
// 9092 for PLAINTEXT
// 9093 for SASL_SSL, need to provide sasl.username and sasl.password
// 9094 for SASL_PLAINTEXT, need to provide sasl.username and sasl.password
cfg := loadJsonConfig();
producer := doInitProducer(cfg)
defer producer.Close()
// Delivery report handler for produced messages
go func() {
for e := range producer.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := cfg.Topic
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// Wait for message deliveries before shutting down
producer.Flush(15 * 1000)
}
訂閱訊息
執行以下命令運行consumer.go訂閱訊息。
go run -mod=vendor consumer/consumer.go
訊息程式consumer.go範例程式碼如下:
package main
import (
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"path/filepath"
)
type KafkaConfig struct {
Topic string `json:"topic"`
GroupId string `json:"group.id"`
BootstrapServers string `json:"bootstrap.servers"`
SecurityProtocol string `json:"security.protocol"`
SaslMechanism string `json:"sasl.mechanism"`
SaslUsername string `json:"sasl.username"`
SaslPassword string `json:"sasl.password"`
}
// config should be a pointer to structure, if not, panic
func loadJsonConfig() *KafkaConfig {
workPath, err := os.Getwd()
if err != nil {
panic(err)
}
configPath := filepath.Join(workPath, "conf")
fullPath := filepath.Join(configPath, "kafka.json")
file, err := os.Open(fullPath);
if (err != nil) {
msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)
panic(msg)
}
defer file.Close()
decoder := json.NewDecoder(file)
var config = &KafkaConfig{}
err = decoder.Decode(config);
if (err != nil) {
msg := fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err)
panic(msg)
}
json.Marshal(config)
return config
}
func doInitConsumer(cfg *KafkaConfig) *kafka.Consumer {
fmt.Print("init kafka consumer, it may take a few seconds to init the connection\n")
//common arguments
var kafkaconf = &kafka.ConfigMap{
"api.version.request": "true",
"auto.offset.reset": "latest",
"heartbeat.interval.ms": 3000,
"session.timeout.ms": 30000,
"max.poll.interval.ms": 120000,
"fetch.max.bytes": 1024000,
"max.partition.fetch.bytes": 256000}
kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers);
kafkaconf.SetKey("group.id", cfg.GroupId)
switch cfg.SecurityProtocol {
case "plaintext" :
kafkaconf.SetKey("security.protocol", "plaintext");
case "sasl_ssl":
kafkaconf.SetKey("security.protocol", "sasl_ssl");
kafkaconf.SetKey("ssl.ca.location", "./conf/ca-cert.pem");
kafkaconf.SetKey("sasl.username", cfg.SaslUsername);
kafkaconf.SetKey("sasl.password", cfg.SaslPassword);
kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism);
kafkaconf.SetKey("ssl.endpoint.identification.algorithm", "None");
kafkaconf.SetKey("enable.ssl.certificate.verification", "false")
case "sasl_plaintext":
kafkaconf.SetKey("security.protocol", "sasl_plaintext");
kafkaconf.SetKey("sasl.username", cfg.SaslUsername);
kafkaconf.SetKey("sasl.password", cfg.SaslPassword);
kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
default:
panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
}
consumer, err := kafka.NewConsumer(kafkaconf)
if err != nil {
panic(err)
}
fmt.Print("init kafka consumer success\n")
return consumer;
}
func main() {
// Choose the correct protocol
// 9092 for PLAINTEXT
// 9093 for SASL_SSL, need to provide sasl.username and sasl.password
// 9094 for SASL_PLAINTEXT, need to provide sasl.username and sasl.password
cfg := loadJsonConfig();
consumer := doInitConsumer(cfg)
consumer.SubscribeTopics([]string{cfg.Topic}, nil)
for {
msg, err := consumer.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
consumer.Close()
}