This topic describes how to use the SDK for C# to connect to ApsaraMQ for Kafka to send and receive messages.
Environment requirements
.NET is installed. For more information, visit the download page of .NET.
Install the C# library
Run the following command to install the C# library:
dotnet add package -v 1.5.2 Confluent.Kafka
Create configuration files
(Optional) Download the Secure Sockets Layer (SSL) root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.
Configure the producer.cs and consumer.cs files.
Table 1. Parameters Parameter
Description
BootstrapServers
The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
SslCaLocation
The path of the SSL root certificate that you downloaded. This parameter is required only if you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance.
SaslMechanism
The security mechanism that you want to use to send and receive messages.
If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to SaslMechanism.Plain.
If you use the Simple Authentication and Security Layer (SASL) endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to SaslMechanism.Plain to specify the PLAIN mechanism or set this parameter to SaslMechanism.ScramSha256 to specify the Salted Challenge Response Authentication Mechanism (SCRAM).
SecurityProtocol
The security protocol that you want to use to send and receive messages.
If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to SecurityProtocol.SaslSsl.
If you use the SASL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to SecurityProtocol.SaslPlaintext if the PLAIN mechanism is used, or set this parameter to SecurityProtocol.SaslPlaintext if the SCRAM mechanism is used.
SaslUsername
The username of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not available.
NoteIf the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users.
SaslPassword
The password of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not available.
topic
The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.
GroupId
The group ID. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.
Send messages
Run the following command to run producer.cs to send messages:
dotnet run producer.cs
The following sample code provides an example of producer.cs.
For information about the parameters in the sample code, see Parameters.
In the sample code, the SSL endpoint is used. Delete or modify the code related to the parameters based on the endpoint that you use to connect to the ApsaraMQ for Kafka instance.
using System;
using Confluent.Kafka;
class Producer
{
public static void Main(string[] args)
{
var conf = new ProducerConfig {
BootstrapServers = "XXX,XXX,XXX",
SslCaLocation = "XXX/only-4096-ca-cert.pem",
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
SaslUsername = "XXX",
SaslPassword = "XXX",
};
Action<DeliveryReport<Null, string>> handler = r =>
Console.WriteLine(!r.Error.IsError
? $"Delivered message to {r.TopicPartitionOffset}"
: $"Delivery Error: {r.Error.Reason}");
string topic ="XXX";
using (var p = new ProducerBuilder<Null, string>(conf).Build())
{
for (int i=0; i<100; ++i)
{
p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
}
p.Flush(TimeSpan.FromSeconds(10));
}
}
}
Receive messages
Run the following command to run consumer.cs to receive messages:
dotnet run consumer.cs
The following sample code provides an example of consumer.cs.
For information about the parameters in the sample code, see Parameters.
In the sample code, the SSL endpoint is used. Delete or modify the code related to the parameters based on the endpoint that you use to connect to the ApsaraMQ for Kafka instance.
using System;
using System.Threading;
using Confluent.Kafka;
class Consumer
{
public static void Main(string[] args)
{
var conf = new ConsumerConfig {
GroupId = "XXX",
BootstrapServers = "XXX,XXX,XXX",
SslCaLocation = "XXX/only-4096-ca-cert.pem",
SaslMechanism = SaslMechanism.Plain,
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslUsername = "XXX",
SaslPassword = "XXX",
AutoOffsetReset = AutoOffsetReset.Earliest
};
string topic = "XXX";
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
}