ApsaraMQ for RocketMQ: Sample code

Last updated: Nov 11, 2024

This topic provides sample code for sending and receiving messages by using the Apache RocketMQ SDK for Java.

Sample code

Important

If you use a serverless ApsaraMQ for RocketMQ instance, check the version of the SDK for Java before you access the instance over the Internet. Only specific versions of the SDK for Java can access serverless ApsaraMQ for RocketMQ instances over the Internet. For more information, see "Version description for accessing Serverless instances over the Internet".

rocketmq-client-java

Message type

Sample code for sending messages

Sample code for receiving messages

PushConsumer

SimpleConsumer

Normal messages

PushConsumerExample.java

Ordered messages

ProducerFifoMessageExample.java

Scheduled and delayed messages

ProducerDelayMessageExample.java

Transactional messages

ProducerTransactionMessageExample.java

rocketmq-client

Normal messages

Send normal messages in synchronous transmission mode

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the virtual private cloud (VPC) information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The group ID. Specify the ID of the group that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");

        // Set AccessChannel to CLOUD to enable the message trace feature. If you do not want to use the message trace feature, do not set this parameter.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
        producer.setEnableTrace(true);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // Add the logic to resend or persist the message here if the message fails to be sent.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Shut down the producer before the application exits.
        // Note: Shutting down the producer releases its resources. If the application sends messages frequently, keep the producer running and reuse it instead of shutting it down after each send.
        producer.shutdown();
    }
}
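
The catch block in the sample above only marks where resend or persist logic belongs. The following is a minimal sketch of one possible resend policy, a bounded retry with a linearly growing pause. SendRetrySketch and callWithRetry are hypothetical names that are not part of the RocketMQ SDK, and the lambda in main is a stand-in for producer.send(msg) so that the sketch runs without a broker.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration only; it is not part of the RocketMQ SDK. */
final class SendRetrySketch {
    /**
     * Calls the action up to maxAttempts times. After a failed attempt it sleeps
     * backoffMillis * attempt before trying again. If every attempt fails, the
     * exception from the last attempt is rethrown so the caller can persist the message.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; restore the flag and propagate.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for producer.send(msg): fails twice, then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "SEND_OK";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the producer sample, the call would wrap the send, for example callWithRetry(() -> producer.send(msg), 3, 200L), with the catch block persisting the message if the final attempt still fails. The attempt count and pause are placeholder values.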

Send normal messages in asynchronous transmission mode

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class RocketMQAsyncProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The group ID. Specify the ID of the group that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");

        // Set AccessChannel to CLOUD to enable the message trace feature. If you do not want to use the message trace feature, do not set this parameter.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
        producer.setEnableTrace(true);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        // The message is sent. 
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }
        // Block the current thread for 3 seconds and wait for the result to return. 
        TimeUnit.SECONDS.sleep(3);

        // Shut down the producer before the application exits.
        // Note: Shutting down the producer releases its resources. If the application sends messages frequently, keep the producer running and reuse it instead of shutting it down after each send.
        producer.shutdown();
    }
}
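
The asynchronous sample above waits a fixed 3 seconds before it shuts down the producer, so slower callbacks could be cut off and faster ones still pay the full wait. The following is a minimal sketch of an alternative that waits until every callback has fired, with the timeout only as an upper bound. AsyncAwaitSketch and its Callback interface are hypothetical stand-ins that mirror the shape of SendCallback, and a thread pool simulates the asynchronous sends so that the sketch runs without a broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration; the thread pool stands in for the client's asynchronous send. */
final class AsyncAwaitSketch {
    /** Stand-in that mirrors the two methods of SendCallback. */
    interface Callback {
        void onSuccess(String msgId);
        void onException(Throwable throwable);
    }

    /**
     * Issues `total` simulated asynchronous sends and blocks until every callback
     * has run or the timeout elapses. Returns the number of successful sends.
     */
    static int sendAllAndAwait(int total, long timeoutSeconds) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch pending = new CountDownLatch(total);
        AtomicInteger succeeded = new AtomicInteger();
        try {
            for (int i = 0; i < total; i++) {
                final int id = i;
                Callback callback = new Callback() {
                    @Override
                    public void onSuccess(String msgId) {
                        succeeded.incrementAndGet();
                        pending.countDown();
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // Count failures too, otherwise the latch never reaches zero.
                        pending.countDown();
                    }
                };
                // Simulated send: the callback runs later on another thread.
                pool.submit(() -> callback.onSuccess("msg-" + id));
            }
            if (!pending.await(timeoutSeconds, TimeUnit.SECONDS)) {
                System.out.println("Timed out with " + pending.getCount() + " callbacks outstanding");
            }
        } finally {
            pool.shutdown();
        }
        return succeeded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int ok = sendAllAndAwait(128, 3);
        System.out.println(ok + " of 128 sends confirmed");
    }
}
```

Applied to the sample, the latch would be created before the loop with a count of 128, both onSuccess and onException would call countDown(), and pending.await(...) would replace TimeUnit.SECONDS.sleep(3).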

Send normal messages in one-way transmission mode

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQOnewayProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The group ID. Specify the ID of the group that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");

        // Set AccessChannel to CLOUD to enable the message trace feature. If you do not want to use the message trace feature, do not set this parameter.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
        producer.setEnableTrace(true);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Shut down the producer before the application exits.
        // Note: Shutting down the producer releases its resources. If the application sends messages frequently, keep the producer running and reuse it instead of shutting it down after each send.
        producer.shutdown();
    }
}
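
The comments in the samples above ask for the endpoint exactly as the console shows it: a domain name and port, with no http:// or https:// prefix and no resolved IP address. The following is a minimal sketch of a check that could run before the value is passed to setNamesrvAddr. EndpointCheckSketch and requireHostAndPort are hypothetical names, and the sketch assumes a single host:port value and only detects IPv4 literals.

```java
import java.util.Locale;

/** Hypothetical pre-flight check for illustration; it is not part of the RocketMQ SDK. */
final class EndpointCheckSketch {
    /**
     * Returns the trimmed endpoint if it looks like "domain:port".
     * Throws IllegalArgumentException for a URL scheme prefix, a missing or
     * out-of-range port, or a dotted IPv4 address in place of the domain name.
     */
    static String requireHostAndPort(String endpoint) {
        if (endpoint == null) {
            throw new IllegalArgumentException("endpoint is null");
        }
        String value = endpoint.trim();
        String lower = value.toLowerCase(Locale.ROOT);
        if (lower.startsWith("http://") || lower.startsWith("https://")) {
            throw new IllegalArgumentException("remove the http:// or https:// prefix: " + value);
        }
        int colon = value.lastIndexOf(':');
        if (colon <= 0 || colon == value.length() - 1) {
            throw new IllegalArgumentException("expected domain:port but got: " + value);
        }
        String host = value.substring(0, colon);
        int port;
        try {
            port = Integer.parseInt(value.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port is not a number: " + value);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        // The samples ask for the console's domain name, so reject IPv4 literals.
        if (host.matches("\\d{1,3}(\\.\\d{1,3}){3}")) {
            throw new IllegalArgumentException("use the domain name, not a resolved IP address: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(requireHostAndPort("rmq-cn-XXXX.rmq.aliyuncs.com:8080"));
        try {
            requireHostAndPort("http://rmq-cn-XXXX.rmq.aliyuncs.com:8080");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```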

Subscribe to normal messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQPushConsumer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        consumer.setConsumerGroup("YOUR GROUP ID");

        // Set AccessChannel to CLOUD to enable the message trace feature. If you do not want to use the message trace feature, do not set this parameter.
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
        consumer.setEnableTrace(true);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

Ordered messages

Send ordered messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class RocketMQOrderProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The group ID. Specify the ID of the group that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");

        // Set AccessChannel to CLOUD to enable the message trace feature. If you do not want to use the message trace feature, do not set this parameter.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
        producer.setEnableTrace(true);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                        "YOUR MESSAGE TAG",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Note: The sharding key is required. Ordered messages can be evenly distributed across queues only after the sharding key is set.
                // If you use an ApsaraMQ for RocketMQ 5.x instance, you can replace the following line with msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                msg.putUserProperty("__SHARDINGKEY", orderId + "");
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // Use a queue selection algorithm that suits your business. The algorithm must return the same queue for the same argument.
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
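
The selector in the sample above casts the argument to Integer and takes id % mqs.size(), which works because orderId is a non-negative int. The following is a minimal sketch of the same idea for arbitrary key types, assuming that hashing the key is acceptable for your business. QueueSelectSketch and selectIndex are hypothetical names, and the queue count of 4 is a placeholder for mqs.size().

```java
/** Hypothetical illustration of a stable key-to-queue mapping; not part of the RocketMQ SDK. */
final class QueueSelectSketch {
    /**
     * Maps a sharding key to a queue index in [0, queueCount).
     * Math.floorMod keeps the index non-negative even when hashCode() is negative,
     * which a plain % would not.
     */
    static int selectIndex(Object shardingKey, int queueCount) {
        if (shardingKey == null) {
            throw new IllegalArgumentException("shardingKey is null");
        }
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // placeholder for mqs.size()
        for (int i = 0; i < 12; i++) {
            int orderId = i % 10;
            // The same orderId always yields the same index, so its messages share a queue.
            System.out.println("orderId=" + orderId + " -> queue " + selectIndex(orderId, queueCount));
        }
    }
}
```

Inside the MessageQueueSelector, the body would become return mqs.get(selectIndex(arg, mqs.size()));. The mapping stays stable only while the number of queues does not change.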

Subscribe to ordered messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQOrderConsumer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        consumer.setConsumerGroup("YOUR GROUP ID");

        // Set AccessChannel to CLOUD to enable the message trace feature. If you do not want to use the message trace feature, do not set this parameter.
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
        consumer.setEnableTrace(true);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        consumer.subscribe("YOUR ORDER TOPIC", "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // If consumption fails, return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead so that the queue is suspended for a moment and the messages are retried.
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Scheduled and delayed messages

Send scheduled or delayed messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQDelayProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The group ID. Specify the ID of the group that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");

        // Set AccessChannel to CLOUD to enable the message trace feature. If you do not want to use the message trace feature, do not set this parameter.
        producer.setAccessChannel(AccessChannel.CLOUD);
        // If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
        producer.setEnableTrace(true);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                // The topic that you created in the ApsaraMQ for RocketMQ console. 
                Message msg = new Message("YOUR TOPIC",
                        // The message tag. 
                        "YOUR MESSAGE TAG",
                        // The message content. 
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Delayed message: the message is delivered after the specified delay. The property value is an absolute timestamp in milliseconds, calculated as the current time plus the delay. For example, with a delay of 3000, the message is delivered 3 seconds after it is sent. 
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // Scheduled message: the message is delivered at the specified point in time. For example, if you specify 2021-08-10 18:45:00, the message is delivered at 18:45:00 on August 10, 2021. 
                // Specify the time in the yyyy-MM-dd HH:mm:ss format. If you specify a time that is earlier than the current time, the message is immediately delivered to the consumer. 
                // The following lines require java.text.SimpleDateFormat to be imported. 
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }
}

The sample code for subscribing to scheduled or delayed messages is the same as the sample code for subscribing to normal messages.
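Both the delayed and the scheduled variants above come down to computing an absolute timestamp in milliseconds for the __STARTDELIVERTIME property. The following is a minimal sketch of that calculation in plain Java, not part of the SDK. The class name DeliverTimeExample, its method names, and the explicit time zone argument are assumptions made for illustration; the original sample uses SimpleDateFormat with the JVM default time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper (not part of the SDK): computes values for the __STARTDELIVERTIME property. */
final class DeliverTimeExample {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Delayed message: absolute delivery time = given current time + delay, both in milliseconds. */
    static long delayedDeliverTime(long nowMillis, long delayMillis) {
        return nowMillis + delayMillis;
    }

    /** Scheduled message: converts a wall-clock time in the given zone to epoch milliseconds. */
    static long scheduledDeliverTime(String dateTime, ZoneId zone) {
        return LocalDateTime.parse(dateTime, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long delayed = delayedDeliverTime(System.currentTimeMillis(), 3000L);
        // The zone is an assumption for this example; use the zone in which the schedule is expressed.
        long scheduled = scheduledDeliverTime("2021-08-10 18:45:00", ZoneId.of("Asia/Shanghai"));
        // Either value would be passed to msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(value)).
        System.out.println("delayed=" + delayed + ", scheduled=" + scheduled);
    }
}
```

Passing the time zone explicitly keeps the result independent of the machine on which the producer runs, which may matter when producers are deployed in several regions.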

Transactional messages

Send transactional messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQTransactionProducer {

    private static RPCHook getAclRPCHook() {
        /**
         * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. It is used here as the producer group. Note: Transactional messages cannot share a group with messages of other types. 
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");

        // Set the AccessChannel parameter to AccessChannel.CLOUD, which indicates Alibaba Cloud. This setting is required if you want to use the message trace feature. If you do not want to use the message trace feature, omit this setting. 
        transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
        // If you want to enable the message trace feature for the SDK for Java of version 5.3.0 or later, you must also configure the EnableTrace parameter in addition to the AccessChannel parameter.
        transactionMQProducer.setEnableTrace(true);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
        transactionMQProducer.setTransactionListener(new TransactionListener() {

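            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // Execute the local transaction here. This demo returns UNKNOW, so the broker later calls checkLocalTransaction to determine the final state. 
                System.out.println("Start executing the local transaction: " + msg);
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // Called when the broker checks the status of an unresolved transaction. This demo always commits. In production, return the actual result of the local transaction. 
                System.out.println("The request to check the transaction status of the message is received. MsgId: " + msg.getMsgId());
                return LocalTransactionState.COMMIT_MESSAGE;
            }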
        });
        transactionMQProducer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message message = new Message("YOUR TRANSACTION TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
                assert sendResult != null;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

The sample code for subscribing to transactional messages is the same as the sample code for subscribing to normal messages.
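The listener in the sending sample always returns UNKNOW from executeLocalTransaction and COMMIT_MESSAGE from checkLocalTransaction, which is enough for a demo but does not reflect a real outcome. The following is a minimal sketch, in plain Java without the SDK, of one way the two callbacks could share a record of local transaction results. The class TransactionCheckSketch, the TxState enum (a stand-in for LocalTransactionState), and the business ID key are all hypothetical, and the in-memory map is an assumption; a real application would more likely query a durable store such as its database.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch (not part of the SDK): tracks local transaction results for status checks. */
final class TransactionCheckSketch {
    /** Stand-in for RocketMQ's LocalTransactionState, so that the sketch compiles without the SDK. */
    enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Result of each local transaction, keyed by a business ID (for example, an order ID).
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();

    /** Runs the local transaction and records whether it succeeded. */
    TxState executeLocalTransaction(String businessId, Runnable localTransaction) {
        try {
            localTransaction.run();
            outcomes.put(businessId, Boolean.TRUE);
            return TxState.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            outcomes.put(businessId, Boolean.FALSE);
            return TxState.ROLLBACK_MESSAGE;
        }
    }

    /** Answers a status check from the recorded result; UNKNOW means the result is not known yet. */
    TxState checkLocalTransaction(String businessId) {
        Boolean succeeded = outcomes.get(businessId);
        if (succeeded == null) {
            return TxState.UNKNOW;
        }
        return succeeded ? TxState.COMMIT_MESSAGE : TxState.ROLLBACK_MESSAGE;
    }

    public static void main(String[] args) {
        TransactionCheckSketch sketch = new TransactionCheckSketch();
        System.out.println(sketch.checkLocalTransaction("order-1"));
        System.out.println(sketch.executeLocalTransaction("order-1", () -> { }));
        System.out.println(sketch.checkLocalTransaction("order-1"));
    }
}
```

In a real TransactionListener, the business ID could be carried in a message key or user property so that checkLocalTransaction can look up the same record from the MessageExt it receives.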

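Version requirements for accessing serverless instances over the Internet

If you access a serverless ApsaraMQ for RocketMQ instance over the Internet to send and receive messages, make sure that the version of the SDK for Java meets the following requirements, and add the following code to your application.

Note

Replace InstanceId with the ID of your ApsaraMQ for RocketMQ instance.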

  • rocketmq-client: version 5.2.0 or later

    When you send messages, add the following code: producer.setNamespaceV2("InstanceId");

    When you receive messages, add the following code: consumer.setNamespaceV2("InstanceId");

  • rocketmq-client-java: version 5.0.6 or later

    When you send or receive messages, add the following code:

    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .setNamespace("InstanceId")
        .setCredentialProvider(sessionCredentialsProvider)
        .build();