Prerequisites
The required resources are created in ApsaraMQ for RocketMQ. For more information, see Step 2: Create resources.
An IDE is installed. You can use IntelliJ IDEA or Eclipse. The examples in this topic use IntelliJ IDEA Ultimate. For more information, see IntelliJ IDEA.
JDK 1.8 or later is installed. For more information, see Java Downloads.
Maven 2.5 or later is installed. For more information, see Downloading Apache Maven.
Install the Java dependency library
Create a Java project in IntelliJ IDEA.
Add the following dependency to the pom.xml file to import the Java dependency library:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.7</version>
</dependency>
Produce messages
In the created Java project, create a program that sends normal messages and run the program. Sample code:
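import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ProducerExample {
    public static void main(String[] args) throws ClientException, IOException {
        // The endpoint of the instance. See the endpoints parameter in SDK parameters.
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to which messages are sent. Create the topic in advance.
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
        // Declare the topic up front so that the producer can prefetch its route.
        Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(clientConfiguration)
            .build();
        // Build a normal message with a key and a tag.
        Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setKeys("messageKey")
            .setTag("messageTag")
            // Use an explicit charset so that the body does not depend on the platform default.
            .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
            .build();
        try {
            // Send the message and print the ID that the server returns.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        } finally {
            // Release the resources that the producer holds when it is no longer needed.
            producer.close();
        }
    }
}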
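The endpoints value in the sample contains a {regionId} placeholder that must be replaced before the program runs. The following minimal sketch assembles an endpoint string from its parts. The pattern (instance ID, region ID, the rmq.aliyuncs.com suffix, and port 8080) is only inferred from the sample value in this topic, and the EndpointSketch class, its buildEndpoint method, and the cn-hangzhou region ID are hypothetical names used for illustration. The endpoint displayed in the console remains the authoritative value.

```java
public class EndpointSketch {
    // Assumption: suffix taken from the sample endpoint in this topic, not from an SDK constant.
    private static final String DOMAIN_SUFFIX = "rmq.aliyuncs.com";

    /** Joins the parts into "<instanceId>.<regionId>.rmq.aliyuncs.com:<port>". */
    static String buildEndpoint(String instanceId, String regionId, int port) {
        if (instanceId == null || instanceId.trim().isEmpty()) {
            throw new IllegalArgumentException("instanceId must not be empty");
        }
        if (regionId == null || regionId.trim().isEmpty()) {
            throw new IllegalArgumentException("regionId must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port must be between 1 and 65535");
        }
        return instanceId.trim() + "." + regionId.trim() + "." + DOMAIN_SUFFIX + ":" + port;
    }

    public static void main(String[] args) {
        // Illustrative values only. Replace them with the values of your own instance.
        System.out.println(buildEndpoint("rmq-cn-xxx", "cn-hangzhou", 8080));
    }
}
```

The returned string can be passed to setEndpoints in place of the hard-coded placeholder value.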
Consume messages
In the created Java project, create a program to subscribe to normal messages and run the program. ApsaraMQ for RocketMQ allows you to consume messages in simple mode and push mode. For more information, see SimpleConsumer and PushConsumer. You can select one of the modes to subscribe to messages. The following table describes the differences between simple consumers and push consumers.
Item | PushConsumer | SimpleConsumer |
API operation call | The callback operation is called to return the consumption result by using a message listener. Consumers can process the consumption logic only within the scope of the message listener. | Business applications implement message processing and call the corresponding operation to return the consumption result. |
Consumption concurrency management | ApsaraMQ for RocketMQ SDKs are used to manage the number of concurrent threads for message consumption. | The number of concurrent threads that are used for message consumption is based on the consumption logic of individual business applications. |
API flexibility | The API operations are encapsulated and provide poor flexibility. | The atomic operations provide great flexibility. |
Scenarios | This consumer type is suitable for development scenarios that do not require a custom process. | This consumer type is suitable for development scenarios that require custom processes. |
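The difference between the two consumer types in the preceding table can be sketched with plain Java and no SDK. In this illustration, which is not the ApsaraMQ for RocketMQ API, InMemoryTopic is a hypothetical stand-in for a topic, pushStyle plays the role of an SDK that owns the loop and acknowledges messages based on the return value of a listener, and simpleStyle shows an application that receives batches and acknowledges each message itself. All names are assumptions made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

public class ConsumptionModelsSketch {
    /** Stand-in for the consumption result that a listener returns. */
    enum Result { SUCCESS, FAILURE }

    /** Hypothetical in-memory topic. It is not part of any SDK. */
    static final class InMemoryTopic {
        private final Deque<String> pending = new ArrayDeque<>();
        private final List<String> acked = new ArrayList<>();

        void publish(String body) { pending.addLast(body); }

        /** Hands out up to max pending messages. */
        List<String> receive(int max) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < max && !pending.isEmpty()) {
                batch.add(pending.pollFirst());
            }
            return batch;
        }

        void ack(String body) { acked.add(body); }

        List<String> acked() { return acked; }
    }

    /** Push style: the framework owns the loop and acks when the listener reports success. */
    static void pushStyle(InMemoryTopic topic, Function<String, Result> listener) {
        List<String> batch;
        while (!(batch = topic.receive(16)).isEmpty()) {
            for (String body : batch) {
                // This sketch drops failed messages. It does not model redelivery.
                if (listener.apply(body) == Result.SUCCESS) {
                    topic.ack(body);
                }
            }
        }
    }

    /** Simple style: the application owns the loop, processes each message, and acks explicitly. */
    static List<String> simpleStyle(InMemoryTopic topic, int batchSize) {
        List<String> processed = new ArrayList<>();
        List<String> batch;
        while (!(batch = topic.receive(batchSize)).isEmpty()) {
            for (String body : batch) {
                processed.add(body);
                topic.ack(body);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryTopic pushTopic = new InMemoryTopic();
        pushTopic.publish("a");
        pushTopic.publish("b");
        pushStyle(pushTopic, body -> Result.SUCCESS);
        System.out.println("push style acked: " + pushTopic.acked());

        InMemoryTopic simpleTopic = new InMemoryTopic();
        simpleTopic.publish("x");
        simpleTopic.publish("y");
        System.out.println("simple style processed: " + simpleStyle(simpleTopic, 1));
    }
}
```

In the push-style method the application supplies only the listener, whereas in the simple-style method the batch size, processing order, and acknowledgment timing are all decided by application code.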
PushConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        // The endpoint of the instance. See the endpoints parameter in SDK parameters.
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to subscribe to and the consumer group to use. Create both in advance.
        String topic = "Your Topic";
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
        // Subscribe to all tags of the topic.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(consumerGroup)
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // The listener processes each message and returns the consumption result.
            .setMessageListener(messageView -> {
                System.out.println("Consume Message: " + messageView);
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Keep the main thread alive so that the consumer continues to receive messages.
        Thread.sleep(Long.MAX_VALUE);
    }
}
SimpleConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        // The endpoint of the instance. See the endpoints parameter in SDK parameters.
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to subscribe to and the consumer group to use. Create both in advance.
        String topic = "Your Topic";
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
        // The maximum time that a receive call waits for messages.
        Duration awaitDuration = Duration.ofSeconds(10);
        // Subscribe to all tags of the topic.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(consumerGroup)
            .setAwaitDuration(awaitDuration)
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .build();
        // The maximum number of messages that one receive call returns.
        int maxMessageNum = 16;
        // The period during which a received message is invisible to other consumers.
        Duration invisibleDuration = Duration.ofSeconds(10);
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                System.out.println("Received message: " + messageView);
            });
            // Acknowledge each message after it is processed.
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }
    }
}
Version description for accessing serverless instances over the Internet
Only SDKs of specific versions can access serverless ApsaraMQ for RocketMQ instances over the Internet.
SDK for Java 5.x
If you access a serverless ApsaraMQ for RocketMQ instance over the Internet to send and receive messages, you must make sure that the version of the SDK for Java meets the following conditions and add the following information in the code:
Note
Replace InstanceId with the ID of your ApsaraMQ for RocketMQ instance.
TCP client SDK for Java 1.x
If you access a serverless ApsaraMQ for RocketMQ instance over the Internet to send and receive messages, you must make sure that the version of RocketMQ 1.x TCP client SDK for Java is 1.9.0.Final or later and add the following information in the code:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
Note
Replace InstanceId with the ID of your ApsaraMQ for RocketMQ instance.
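A version requirement such as "1.9.0.Final or later" can be checked in code before a client is started. The following sketch compares only the first three numeric components of a version string and ignores a trailing qualifier such as Final. The SdkVersionCheck class and its methods are hypothetical helpers, not part of any SDK, and the way the actual SDK version is obtained (for example, from the pom.xml file) is left to the application.

```java
public class SdkVersionCheck {
    /** Parses the leading numeric components, for example "1.9.0.Final" becomes [1, 9, 0]. */
    static int[] numericParts(String version) {
        String[] tokens = version.trim().split("\\.");
        int[] parts = new int[3];
        for (int i = 0; i < parts.length && i < tokens.length; i++) {
            if (!tokens[i].matches("\\d+")) {
                break; // Stop at the first non-numeric token, such as "Final".
            }
            parts[i] = Integer.parseInt(tokens[i]);
        }
        return parts;
    }

    /** Returns true if actual is the same as or newer than minimum. */
    static boolean isAtLeast(String actual, String minimum) {
        int[] a = numericParts(actual);
        int[] m = numericParts(minimum);
        for (int i = 0; i < a.length; i++) {
            if (a[i] != m[i]) {
                return a[i] > m[i];
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The version strings below are illustrative inputs only.
        System.out.println(isAtLeast("1.9.0.Final", "1.9.0.Final"));
        System.out.println(isAtLeast("1.8.4.Final", "1.9.0.Final"));
    }
}
```

Comparing numeric components rather than raw strings avoids ordering mistakes such as treating 1.10.0 as older than 1.9.0.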
SDK parameters
Parameter | Example | Description |
endpoints | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | The endpoint of the ApsaraMQ for RocketMQ instance. For information about how to obtain the endpoint, see Obtain the endpoint of an instance. If you access the instance over the Internet, specify the public endpoint. If you access the instance in a VPC, specify the VPC endpoint. |
InstanceId | rmq-cn-xxx | The ID of the ApsaraMQ for RocketMQ instance. |
topic | normal_test | The topic to which messages are sent or from which messages are consumed in the ApsaraMQ for RocketMQ instance. You must create the topic on the ApsaraMQ for RocketMQ instance in advance. For more information, see Create a topic. |
group | GID_test | The consumer group that is used by consumers to consume messages in the ApsaraMQ for RocketMQ instance. You must create the consumer group on the ApsaraMQ for RocketMQ instance in advance. For more information, see Create a consumer group. |
Instance UserName | 1XVg0hzgKm****** | The username of the ApsaraMQ for RocketMQ instance. If you access the instance over the Internet, you must specify the username. If you access the instance in a VPC, you need to specify the username only if the instance is a serverless instance and the authentication-free in VPCs feature is disabled for the instance. For information about how to obtain the username, see Obtain the username and password of an instance. |
Instance Password | ijSt8rEc45****** | The password of the ApsaraMQ for RocketMQ instance. If you access the instance over the Internet, you must specify the password. If you access the instance in a VPC, you need to specify the password only if the instance is a serverless instance and the authentication-free in VPCs feature is disabled for the instance. For information about how to obtain the password, see Obtain the username and password of an instance. |
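The rule in the preceding table for when the instance username and password must be specified can be expressed as a small check. The following sketch encodes only what the table states: credentials are always required for Internet access, and for VPC access they are required only when the instance is serverless and the authentication-free in VPCs feature is disabled. The CredentialRequirement class, the Network enum, and the method names are hypothetical and exist only for this illustration.

```java
public class CredentialRequirement {
    /** How the client reaches the instance. */
    enum Network { INTERNET, VPC }

    /** Returns true if the username and password must be specified. */
    static boolean credentialsRequired(Network network, boolean serverless, boolean authFreeInVpcEnabled) {
        if (network == Network.INTERNET) {
            return true; // Internet access always requires credentials.
        }
        // VPC access: required only for serverless instances with the authentication-free feature disabled.
        return serverless && !authFreeInVpcEnabled;
    }

    /** Fails fast if credentials are required but missing. */
    static void validate(Network network, boolean serverless, boolean authFreeInVpcEnabled,
                         String username, String password) {
        boolean missing = username == null || username.isEmpty()
            || password == null || password.isEmpty();
        if (missing && credentialsRequired(network, serverless, authFreeInVpcEnabled)) {
            throw new IllegalStateException("The instance username and password must be specified");
        }
    }

    public static void main(String[] args) {
        System.out.println(credentialsRequired(Network.INTERNET, false, true));
        System.out.println(credentialsRequired(Network.VPC, true, false));
        System.out.println(credentialsRequired(Network.VPC, false, false));
    }
}
```

A check of this kind can run before the client is built so that a missing username or password is reported as a configuration error instead of an authentication failure at send or receive time.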
Verify message consumption
After you consume a message, you can check the consumption status of the message in the ApsaraMQ for RocketMQ console.
Log on to the ApsaraMQ for RocketMQ console. On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane of the page that appears, click Message Tracing.
SDK references
For information about how to use SDKs for other programming languages to send and receive messages of other types, see Overview.