すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:メッセージの購読

最終更新日:Jul 09, 2024

このトピックでは、TCPクライアントSDK forを使用してメッセージをサブスクライブする方法について説明します。ApsaraMQ for RocketMQが提供するNET。

説明

同じグループIDで識別されるすべてのコンシューマインスタンスに対して、一貫したサブスクリプションを維持する必要があります。 詳細については、「サブスクリプションの一貫性」をご参照ください。

ApsaraMQ for RocketMQは、次のサブスクリプションモードをサポートしています。

  • サブスクリプションのクラスター化

    同じグループIDによって識別されるすべてのコンシューマは、等しい数のメッセージを消費する。 たとえば、トピックには9つのメッセージが含まれ、消費者グループには3つの消費者が含まれます。 クラスタリング消費モードでは、各コンシューマは3つのメッセージを消費します。

        // Configure clustering subscription, which is the default mode.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);              
  • 放送サブスクリプション

    同じグループIDによって識別される消費者の各々は、すべてのメッセージを1回消費する。 たとえば、トピックには9つのメッセージが含まれ、消費者グループには3つの消費者が含まれます。 ブロードキャスト消費モードでは、各消費者は9つのメッセージすべてを消費する。

        // Configure broadcasting subscription.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);                

サンプルコード:

using System;
using System.Threading;
using System.Text;
using ons;

// The callback function that is executed when a message is pulled from the ApsaraMQ for RocketMQ broker. 
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	      // The AccessKey secret that is used for authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // Clustering consumption. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // Broadcasting consumption. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create the consumer instance. 
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // Subscribe to the topic. 
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // Start the consumer instance. 
        consumer.start();

        // This setting is used only in this demo. In actual production environments, you cannot exit the process. 
        Thread.Sleep(300000);

        // Before you exit the process, terminate the consumer instance. 
        consumer.shutdown();
    }
}