ApsaraMQ for RocketMQ は、さまざまな種類のメッセージを送受信するための複数のプログラミング言語用 SDK を提供しています。このトピックでは、Java 用 SDK を使用して ApsaraMQ for RocketMQ ブローカーに接続し、通常のメッセージを送受信する方法について説明します。
前提条件
ApsaraMQ for RocketMQ に必要なリソースが作成されていること。詳細については、「ステップ 2: リソースを作成する」をご参照ください。
IntelliJ IDEA がインストールされていること。詳細については、「IntelliJ IDEA」をご参照ください。
IntelliJ IDEA または Eclipse を使用できます。このトピックの例では、IntelliJ IDEA Ultimate を使用しています。
JDK 1.8 以降がインストールされていること。詳細については、「Java ダウンロード」をご参照ください。
Maven 2.5 以降がインストールされていること。詳細については、「Apache Maven のダウンロード」をご参照ください。
Java 依存関係ライブラリをインストールする
IntelliJ IDEA で Java プロジェクトを作成します。
pom.xml ファイルに次の依存関係を追加して、Java 依存関係ライブラリをインポートします。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.7</version> </dependency>
メッセージを生成する
作成した Java プロジェクトで、通常のメッセージを送信するプログラムを作成し、実行します。サンプルコード:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* インスタンスのエンドポイント。エンドポイントは、ApsaraMQ for RocketMQ コンソールの [インスタンスの詳細] ページの [エンドポイント] タブで確認できます。
* ApsaraMQ for RocketMQ インスタンスのクライアントが Elastic Compute Service (ECS) インスタンスにデプロイされていて、内部ネットワークの ApsaraMQ for RocketMQ インスタンスにアクセスする場合、VPC エンドポイントを指定することをお勧めします。
* インターネットまたはデータセンターからインスタンスにアクセスする場合は、パブリックエンドポイントを指定できます。インターネット経由でインスタンスにアクセスする場合は、インスタンスのインターネットアクセス機能を有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// メッセージの送信先のトピックの名前。メッセージを受信するためにトピックを使用する前に、ApsaraMQ for RocketMQ コンソールでトピックを作成する必要があります。そうしないと、エラーが返されます。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* ApsaraMQ for RocketMQ インスタンスがサーバーレスインスタンスで、パブリックエンドポイントを使用してインスタンスにアクセスする場合は、インスタンス ID を指定する必要があります。
*/
//.setNamespace("InstanceId")
/**
* パブリックエンドポイントを使用して ApsaraMQ for RocketMQ インスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを指定する必要があります。ユーザー名とパスワードは、ApsaraMQ for RocketMQ コンソールのインスタンスに対応する [アクセス制御] ページの [インテリジェント認証] タブで取得できます。
* ApsaraMQ for RocketMQ インスタンスのクライアントが ECS インスタンスにデプロイされていて、内部ネットワークの ApsaraMQ for RocketMQ インスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。ブローカーは、VPC 情報に基づいてユーザー名とパスワードを自動的に取得します。
* インスタンスがサーバーレス ApsaraMQ for RocketMQ インスタンスの場合、インターネット経由でインスタンスにアクセスするには、ユーザー名とパスワードを指定する必要があります。サーバーレスインスタンスの VPC で認証なし機能を有効にして、VPC 内のインスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
/**
* プロデューサーを初期化するときに、使用するトピックを指定して、トピック設定が有効かどうかを確認し、無効なトピックが開始されないようにすることができます。
* 非トランザクションメッセージのトピックを指定する必要はありません。ブローカーは、トピックが有効かどうかを動的にチェックします。
* 注: 呼び出される API 操作でトランザクションメッセージのクエリが失敗しないようにするには、トランザクションメッセージのトピックを事前に指定する必要があります。
*/
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(clientConfiguration)
.build();
// 通常のメッセージを送信します。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// メッセージキー。キーを使用してメッセージを検索できます。
.setKeys("messageKey")
// メッセージタグ。コンシューマーはタグを使用してメッセージをフィルタリングできます。
.setTag("messageTag")
// メッセージ本文。
.setBody("messageBody".getBytes())
.build();
try {
// メッセージを送信します。結果をメモし、失敗などの例外をキャッチします。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}メッセージを使用する
作成した Java プロジェクトで、通常のメッセージをサブスクライブするプログラムを作成し、実行します。ApsaraMQ for RocketMQ では、シンプルモードとプッシュモードでメッセージを使用できます。詳細については、「SimpleConsumer」および「PushConsumer」をご参照ください。いずれかのモードを選択してメッセージをサブスクライブできます。次の表に、シンプルコンシューマーとプッシュコンシューマーの違いを示します。
項目 | PushConsumer | SimpleConsumer |
API 操作呼び出し | メッセージリスナーを使用してコールバック操作が呼び出され、使用結果が返されます。コンシューマーは、メッセージリスナーの範囲内でのみ使用ロジックを処理できます。 | ビジネスアプリケーションはメッセージ処理を実装し、対応する操作を呼び出して使用結果を返します。 |
同時実行使用管理 | ApsaraMQ for RocketMQ SDK を使用して、メッセージ使用の同時スレッド数を管理します。 | メッセージ使用に使用される同時スレッド数は、個々のビジネスアプリケーションの使用ロジックに基づいています。 |
API の柔軟性 | API 操作はカプセル化されており、柔軟性が低いです。 | アトミック操作は高い柔軟性を提供します。 |
シナリオ | このコンシューマータイプは、カスタムプロセスを必要としない開発シナリオに適しています。 | このコンシューマータイプは、カスタムプロセスを必要とする開発シナリオに適しています。 |
PushConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* インスタンスのエンドポイント。エンドポイントは、ApsaraMQ for RocketMQ コンソールの [インスタンスの詳細] ページの [エンドポイント] タブで確認できます。
* ApsaraMQ for RocketMQ インスタンスのクライアントが ECS インスタンスにデプロイされていて、内部ネットワークの ApsaraMQ for RocketMQ インスタンスにアクセスする場合、VPC エンドポイントを指定することをお勧めします。
* インターネットまたはデータセンターからインスタンスにアクセスする場合は、パブリックエンドポイントを指定できます。インターネット経由でインスタンスにアクセスする場合は、インスタンスのインターネットアクセス機能を有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// サブスクライブするトピック。トピックを指定する前に、ApsaraMQ for RocketMQ コンソールで事前にトピックを作成しておく必要があります。そうしないと、エラーが返されます。
String topic = "Your Topic";
// コンシューマーが属するコンシューマーグループ。コンシューマーグループを指定する前に、ApsaraMQ for RocketMQ コンソールで事前にコンシューマーグループを作成しておく必要があります。そうしないと、エラーが返されます。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* ApsaraMQ for RocketMQ インスタンスがサーバーレスインスタンスで、パブリックエンドポイントを使用してインスタンスにアクセスする場合は、インスタンス ID を指定する必要があります。
*/
//.setNamespace("InstanceId")
/**
* パブリックエンドポイントを使用して ApsaraMQ for RocketMQ インスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを指定する必要があります。ユーザー名とパスワードは、ApsaraMQ for RocketMQ コンソールのインスタンスに対応する [アクセス制御] ページの [インテリジェント認証] タブで取得できます。
* ApsaraMQ for RocketMQ インスタンスのクライアントが ECS インスタンスにデプロイされていて、内部ネットワークの ApsaraMQ for RocketMQ インスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。ブローカーは、VPC 情報に基づいてユーザー名とパスワードを自動的に取得します。
* インスタンスがサーバーレス ApsaraMQ for RocketMQ インスタンスの場合、インターネット経由でインスタンスにアクセスするには、ユーザー名とパスワードを指定する必要があります。サーバーレスインスタンスの VPC で認証なし機能を有効にして、VPC 内のインスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
// メッセージをフィルタリングするために使用されるルール。次の例では、トピック内のすべてのメッセージがサブスクライブされます。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// プッシュコンシューマーを初期化します。プッシュコンシューマーを初期化するときは、コンシューマーのコンシューマーグループ、通信パラメーター、およびサブスクリプションを指定する必要があります。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// コンシューマーグループ。
.setConsumerGroup(consumerGroup)
// サブスクリプション。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// メッセージリスナー。
.setMessageListener(messageView -> {
// メッセージを使用し、使用結果を返します。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// プッシュコンシューマーが不要になった場合は、プロセスをシャットダウンします。
//pushConsumer.close();
}
} SimpleConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* インスタンスのエンドポイント。エンドポイントは、ApsaraMQ for RocketMQ コンソールの [インスタンスの詳細] ページの [エンドポイント] タブで確認できます。
* ApsaraMQ for RocketMQ インスタンスのクライアントが ECS インスタンスにデプロイされていて、内部ネットワークの ApsaraMQ for RocketMQ インスタンスにアクセスする場合、VPC エンドポイントを指定することをお勧めします。
* インターネットまたはデータセンターからインスタンスにアクセスする場合は、パブリックエンドポイントを指定できます。インターネット経由でインスタンスにアクセスする場合は、インスタンスのインターネットアクセス機能を有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// サブスクライブするトピック。トピックを指定する前に、ApsaraMQ for RocketMQ コンソールで事前にトピックを作成しておく必要があります。そうしないと、エラーが返されます。
String topic = "Your Topic";
// コンシューマーが属するコンシューマーグループ。コンシューマーグループを指定する前に、ApsaraMQ for RocketMQ コンソールで事前にコンシューマーグループを作成しておく必要があります。そうしないと、エラーが返されます。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* ApsaraMQ for RocketMQ インスタンスがサーバーレスインスタンスで、パブリックエンドポイントを使用してインスタンスにアクセスする場合は、インスタンス ID を指定する必要があります。
*/
//.setNamespace("InstanceId")
/**
* パブリックエンドポイントを使用して ApsaraMQ for RocketMQ インスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを指定する必要があります。ユーザー名とパスワードは、ApsaraMQ for RocketMQ コンソールのインスタンスに対応する [アクセス制御] ページの [インテリジェント認証] タブで取得できます。
* ApsaraMQ for RocketMQ インスタンスのクライアントが ECS インスタンスにデプロイされていて、内部ネットワークの ApsaraMQ for RocketMQ インスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。ブローカーは、VPC 情報に基づいてユーザー名とパスワードを自動的に取得します。
* インスタンスがサーバーレス ApsaraMQ for RocketMQ インスタンスの場合、インターネット経由でインスタンスにアクセスするには、ユーザー名とパスワードを指定する必要があります。サーバーレスインスタンスの VPC で認証なし機能を有効にして、VPC 内のインスタンスにアクセスする場合、ユーザー名またはパスワードを指定する必要はありません。
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
Duration awaitDuration = Duration.ofSeconds(10);
// メッセージをフィルタリングするために使用されるルール。次の例では、トピック内のすべてのメッセージがサブスクライブされます。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// シンプルコンシューマーを初期化します。シンプルコンシューマーを初期化するときは、コンシューマーのコンシューマーグループ、通信パラメーター、およびサブスクリプションを指定する必要があります。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// コンシューマーグループ。
.setConsumerGroup(consumerGroup)
// ロングポーリングリクエストのタイムアウト期間。
.setAwaitDuration(awaitDuration)
// サブスクリプション。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// プルされるメッセージの最大数。
int maxMessageNum = 16;
// メッセージの非表示時間。
Duration invisibleDuration = Duration.ofSeconds(10);
// シンプルコンシューマーを使用してメッセージを使用する場合、クライアントはループ内でメッセージを取得して使用する必要があります。
// メッセージをリアルタイムで使用するには、複数のスレッドを使用してメッセージを同時にプルすることをお勧めします。
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Received message: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
// 使用が完了したら、コンシューマーは ACK メソッドを呼び出して、使用結果をブローカーにコミットする必要があります。
consumer.ack(message);
System.out.println("Message is acknowledged successfully, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// シンプルコンシューマーが不要になった場合は、プロセスをシャットダウンします。
// consumer.close();
}
} インターネット経由でサーバーレスインスタンスにアクセスするためのバージョン説明
特定のバージョンの SDK のみ、インターネット経由でサーバーレス ApsaraMQ for RocketMQ インスタンスにアクセスできます。
Java 5.x 用 SDK
インターネット経由でサーバーレス ApsaraMQ for RocketMQ インスタンスにアクセスしてメッセージを送受信する場合は、Java 用 SDK のバージョンが次の条件を満たしていることを確認し、コードに次の情報を追加する必要があります。
InstanceId を ApsaraMQ for RocketMQ インスタンスの ID に置き換えます。
rocketmq-client: バージョン 5.2.0 以降
メッセージを送信するときに、コードに次の情報を追加します:
producer.setNamespaceV2("InstanceId");メッセージを受信するときに、コードに次の情報を追加します:
consumer.setNamespaceV2("InstanceId");rocketmq-client-java: バージョン 5.0.6 以降
メッセージを送受信するときに、コードに次の情報を追加します。
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();
Java 1.x 用 TCP クライアント SDK
インターネット経由でサーバーレス ApsaraMQ for RocketMQ インスタンスにアクセスしてメッセージを送受信する場合は、Java 用 RocketMQ 1.x TCP クライアント SDK のバージョンが 1.9.0.Final 以降であることを確認し、コードに次の情報を追加する必要があります。
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
InstanceId を ApsaraMQ for RocketMQ インスタンスの ID に置き換えます。
SDK パラメーター
パラメーター | 例 | 説明 |
endpoints | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | ApsaraMQ for RocketMQ インスタンスのエンドポイント。エンドポイントの取得方法については、「インスタンスのエンドポイントを取得する」をご参照ください。
|
InstanceId | rmq-cn-xxx | ApsaraMQ for RocketMQ インスタンスの ID。 |
topic | normal_test | ApsaraMQ for RocketMQ インスタンスでメッセージを送信する、またはメッセージを使用するトピック。 事前に ApsaraMQ for RocketMQ インスタンスでトピックを作成する必要があります。詳細については、「トピックを作成する」をご参照ください。 |
group | GID_test | コンシューマーが ApsaraMQ for RocketMQ インスタンスでメッセージを使用するために使用するコンシューマーグループ。 事前に ApsaraMQ for RocketMQ インスタンスでコンシューマーグループを作成する必要があります。詳細については、「コンシューマーグループを作成する」をご参照ください。 |
Instance UserName | 1XVg0hzgKm****** | ApsaraMQ for RocketMQ インスタンスのユーザー名。インターネット経由でインスタンスにアクセスする場合は、ユーザー名を指定する必要があります。VPC 内でインスタンスにアクセスする場合は、インスタンスがサーバーレスインスタンスで、インスタンスの VPC で認証なし機能が無効になっている場合にのみ、ユーザー名を指定する必要があります。 ユーザー名の取得方法については、「インスタンスのユーザー名とパスワードを取得する」をご参照ください。 |
Instance Password | ijSt8rEc45****** | ApsaraMQ for RocketMQ インスタンスのパスワード。インターネット経由でインスタンスにアクセスする場合は、パスワードを指定する必要があります。VPC 内でインスタンスにアクセスする場合は、インスタンスがサーバーレスインスタンスで、インスタンスの VPC で認証なし機能が無効になっている場合にのみ、パスワードを指定する必要があります。 パスワードの取得方法については、「インスタンスのユーザー名とパスワードを取得する」をご参照ください。 |
メッセージの使用状況を確認する
メッセージを使用した後、ApsaraMQ for RocketMQ コンソールでメッセージの使用状況を確認できます。
ApsaraMQ for RocketMQ コンソール にログインします。インスタンス数 ページで、管理するインスタンスの名前をクリックします。
表示されるページの左側のナビゲーションウィンドウで、メッセージトレース をクリックします。
SDK リファレンス
他のプログラミング言語用 SDK を使用して他の種類のメッセージを送受信する方法については、「概要」をご参照ください。