このトピックでは、ApsaraMQ for RocketMQが提供するTCPクライアントSDK for Javaを使用してメッセージをサブスクライブする方法について説明します。
サブスクリプションモード
ApsaraMQ for RocketMQは、次のサブスクリプションモードをサポートしています。
サブスクリプションのクラスター化
同じグループIDによって識別されるすべてのコンシューマは、等しい数のメッセージを消費する。 たとえば、トピックには9つのメッセージが含まれ、消費者グループには3つの消費者が含まれます。 クラスタリング消費モードでは、各コンシューマは3つのメッセージを消費します。 次のコードは、クラスター化サブスクリプションモードを設定する方法を示しています。
// Configure clustering subscription, which is the default mode. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
放送サブスクリプション
同じグループIDによって識別される消費者の各々は、すべてのメッセージを1回消費する。 たとえば、トピックには9つのメッセージが含まれ、消費者グループには3つの消費者が含まれます。 ブロードキャスト消費モードでは、各消費者は9つのメッセージを消費する。 次のコードは、ブロードキャストサブスクリプションモードを設定する方法を示しています。
// Configure broadcasting subscription. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
同じグループIDで識別されるすべてのコンシューマインスタンスに対して、一貫したサブスクリプションを維持する必要があります。 詳細については、「サブスクリプションの一貫性」をご参照ください。
前述のサブスクリプションモードにはさまざまな制限が課されます。 たとえば、ブロードキャストサブスクリプションモードでは、順序付きメッセージの送受信、消費の進行状況の維持、消費者オフセットのリセットはできません。 詳細については、「クラスタリング消費とブロードキャスト消費」をご参照ください。
メッセージを取得するモード
ApsaraMQ for RocketMQでは、次のいずれかのモードを使用してメッセージを取得できます。
プッシュ: メッセージはApsaraMQ for RocketMQからコンシューマにプッシュされます。 プッシュモードでは、ApsaraMQ for RocketMQはバッチ消費機能をサポートします。 この機能を使用すると、コンシューマーにメッセージを一括で送信できます。 詳細については、「バッチ消費」をご参照ください。
プル: メッセージは、コンシューマによってApsaraMQ for RocketMQからプルされます。
プッシュモードと比較して、プルモードはメッセージ受信でより多くのオプションを提供し、メッセージプルでより多くの自由を可能にします。 詳細については、「メソッドとパラメーター」をご参照ください。
プルコンシューマを使用するには、ApsaraMQ for RocketMQインスタンスがEnterprise Platinum Editionであることを確認します。
プルコンシューマは、仮想プライベートクラウド (VPC) でのみApsaraMQ for RocketMQインスタンスにアクセスできます。
サンプルコード
サンプルコードの詳細については、「ApsaraMQ For RocketMQコードリポジトリ」をご参照ください。 次の項目では、プッシュモードとプルモードを使用してメッセージをサブスクライブする方法のサンプルコードを示します。
プッシュモード
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.GROUP_ID, "XXX"); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); // The clustering subscription mode, which is the default mode. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); // The broadcasting consumption mode. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING); Consumer consumer = ONSFactory.createConsumer(properties); // Subscribe to multiple tags. consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); // Subscribe to another topic. To unsubscribe from a topic, delete the code for subscription and restart the consumer. // Subscribe to all tags. consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
プッシュモード (バッチ消費)
重要ApsaraMQ for RocketMQが提供するバッチ消費機能を使用するには、TCPクライアントSDK for Javaをバージョン1.8.7.3以降にアップグレードします。 詳細については、「リリースノート」をご参照ください。
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.batch.BatchConsumer; import com.aliyun.openservices.ons.api.batch.BatchMessageListener; import java.util.List; import java.util.Properties; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.tcp.example.MqConfig; public class SimpleBatchConsumer { public static void main(String[] args) { Properties consumerProperties = new Properties(); // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR); // The maximum number of messages to be consumed at a time. In this example, the value is specified as 128. If the number of messages cached in the specified topic reaches this value, the SDK immediately calls the callback method for the consumer to consume the messages. Valid values: 1 to 1024. Default value: 32. consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128)); // The maximum wait time between two consecutive batches. In this example, the value is specified as 10 seconds. If the specified wait time is reached, the SDK immediately calls the callback method for the consumer to consume messages. Valid values: 0 to 450. Default value: 0. Unit: seconds. consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10)); BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties); batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() { @Override public Action consume(final List<Message> messages, ConsumeContext context) { System.out.printf("Batch-size: %d\n", messages.size()); // Process messages in batches. return Action.CommitMessage; } }); // Start the consumer for batch consumption. batchConsumer.start(); System.out.println("Consumer start success."); // Wait for a specific period of time to prevent the process from exiting. try { Thread.sleep(200000); } catch (InterruptedException e) { e.printStackTrace(); } } }
プルモード
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.PullConsumer; import com.aliyun.openservices.ons.api.TopicPartition; import java.util.List; import java.util.Properties; import java.util.Set; public class PullConsumerClient { public static void main(String[] args){ Properties properties = new Properties(); // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx"); // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. // The AccessKey ID that is used for authentication. properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // The AccessKey secret that is used for authentication. properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx"); PullConsumer consumer = ONSFactory.createPullConsumer(properties); // Start the consumer. consumer.start(); // Query all partitions in topic-xxx. Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx"); // Specify the partition from which you want to pull messages. consumer.assign(topicPartitions); while (true) { // Pull messages. Specify the timeout period as 3,000 milliseconds. List<Message> messages = consumer.poll(3000); System.out.printf("Received message: %s %n", messages); } } }
パーティションとオフセットについては、「Terms」をご参照ください。
追加情報
ApsaraMQ For RocketMQでのコンシューマースロットリングのベストプラクティスについては、「RocketMQクライアントトラフィック制御デザイン」をご参照ください。