全部產品
Search
文件中心

ApsaraMQ for Kafka:C# SDK收發訊息

更新時間:Dec 27, 2024

本文介紹如何使用C# SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。

環境配置

您已安裝.NET。更多資訊,請參見安裝.NET

安裝C#依賴庫

執行以下命令安裝C#依賴庫。

dotnet add package -v 1.5.2 Confluent.Kafka

準備配置

  1. 可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。

  2. 配置producer.csconsumer.cs檔案。

    表 1. 配置項說明

    參數

    描述

    BootstrapServers

    SSL存取點。您可在雲訊息佇列 Kafka 版控制台实例详情頁面的接入点信息地區擷取。

    SslCaLocation

    下載的SSL根憑證的路徑。僅SSL存取點需要此配置項。

    SaslMechanism

    收發訊息使用的安全機制。

    • SSL存取點:取值為SaslMechanism.Plain

    • SASL存取點:PLAIN機制時,取值為SaslMechanism.Plain;SCRAM機制時,取值為SaslMechanism.ScramSha256

    SecurityProtocol

    收發訊息使用的安全性通訊協定。

    • SSL存取點:取值為SecurityProtocol.SaslSsl

    • SASL存取點:PLAIN機制時,取值為SecurityProtocol.SaslPlaintext;SCRAM機制時,取值為SecurityProtocol.SaslPlaintext

    SaslUsername

    SASL使用者名稱。如果是預設存取點,則無此配置項。

    說明
    • 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台实例详情頁面的配置信息地區擷取預設的用户名密码
    • 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權

    SaslPassword

    SASL使用者名稱密碼。如果是預設存取點,則無此配置項。

    topic

    Topic名稱。您可在雲訊息佇列 Kafka 版控制台Topic 管理頁面擷取。

    GroupId

    Group名稱。您可在雲訊息佇列 Kafka 版控制台Group 管理頁面擷取。

發送訊息

執行以下命令發送訊息。

dotnet run producer.cs

訊息程式producer.cs範例程式碼如下:

關於代碼中配置項說明,請參見配置項說明

重要

範例程式碼為SSL存取點的代碼。您需要根據實際存取點類型,刪除或者修改配置項代碼。

using System;
using Confluent.Kafka;

class Producer
{
    public static void Main(string[] args)
    {
        var conf = new ProducerConfig {
            BootstrapServers = "XXX,XXX,XXX",
            SslCaLocation = "XXX/only-4096-ca-cert.pem",
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            SaslUsername = "XXX",
            SaslPassword = "XXX",
            };

        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");

        string topic ="XXX";

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            for (int i=0; i<100; ++i)
            {
                p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
            }
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}

訂閱訊息

執行以下命令消費訊息。

dotnet run consumer.cs

訊息程式consumer.cs範例程式碼如下:

關於代碼中配置項說明,請參見配置項說明

重要

範例程式碼為SSL存取點的代碼。您需要根據實際存取點類型,刪除或者修改配置項代碼。

using System;
using System.Threading;
using Confluent.Kafka;

class Consumer
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig {
            GroupId = "XXX",
            BootstrapServers = "XXX,XXX,XXX",
            SslCaLocation = "XXX/only-4096-ca-cert.pem",
            SaslMechanism = SaslMechanism.Plain,
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = "XXX",
            SaslPassword = "XXX",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        string topic = "XXX";

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                c.Close();
            }
        }
    }
}