ApsaraMQ for RocketMQ: Sample code

Last updated: Aug 16, 2024

ApsaraMQ for RocketMQ 5.x instances are compatible with clients that use RocketMQ 3.x or 4.x SDKs. You can use these SDKs to connect to ApsaraMQ for RocketMQ 5.x instances and to send and receive messages. This topic provides sample code for sending and receiving messages by using the RocketMQ 3.x or 4.x SDK for Java.

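Important
  • We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provide more features and enhanced capabilities. For more information, see "Release notes".

  • Alibaba Cloud only maintains the RocketMQ 3.x, 4.x, and TCP client SDKs. We recommend that you use them only for existing businesses.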

Send and receive normal messages

Send normal messages in synchronous transmission mode

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the virtual private cloud (VPC) information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // The logic to resend or persist a message if the message fails to be sent and needs to be re-sent. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }
}
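
The comments in the sample above ask for an endpoint in domain:port form, with no http:// or https:// prefix and no resolved IP address. As a minimal sketch of those rules, the following helper checks a string before it is passed to setNamesrvAddr. The class name EndpointCheck and the method looksLikeEndpoint are hypothetical and are not part of the RocketMQ SDK. The check covers only the rules stated in the comments and rejects only dotted IPv4 literals.

```java
public class EndpointCheck {
    /**
     * Returns true if the value looks like "domain:port" with no scheme prefix
     * and no dotted IPv4 address. Hypothetical helper, not an SDK API.
     */
    public static boolean looksLikeEndpoint(String endpoint) {
        if (endpoint == null) {
            return false;
        }
        String value = endpoint.trim();
        // The endpoint must not carry an http:// or https:// prefix.
        if (value.startsWith("http://") || value.startsWith("https://")) {
            return false;
        }
        // Split into host and port at the last colon; both parts must be present.
        int colon = value.lastIndexOf(':');
        if (colon <= 0 || colon == value.length() - 1) {
            return false;
        }
        String host = value.substring(0, colon);
        String port = value.substring(colon + 1);
        if (!port.matches("\\d{1,5}")) {
            return false;
        }
        // Reject a resolved IPv4 address such as 192.168.0.1.
        if (host.matches("\\d{1,3}(\\.\\d{1,3}){3}")) {
            return false;
        }
        // Accept only characters that can appear in a domain name.
        return host.matches("[A-Za-z0-9.-]+");
    }

    public static void main(String[] args) {
        System.out.println(looksLikeEndpoint("rmq-cn-XXXX.rmq.aliyuncs.com:8080"));
        System.out.println(looksLikeEndpoint("http://rmq-cn-XXXX.rmq.aliyuncs.com:8080"));
        System.out.println(looksLikeEndpoint("192.168.0.1:8080"));
    }
}
```

A check like this can run once at startup so that a malformed endpoint fails fast instead of surfacing later as a connection error.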

Send normal messages in asynchronous transmission mode

import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQAsyncProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console.
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        // The message is sent. 
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // The logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // The logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }
        // Block the current thread for 3 seconds and wait for the result to return. 
        TimeUnit.SECONDS.sleep(3);

        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }
}
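
The sample above blocks the main thread for a fixed 3 seconds before it shuts down the producer. One possible alternative, sketched here with plain java.util.concurrent, is to count callbacks down with a CountDownLatch and wait until all of them have fired or a timeout elapses. The Callback interface and the thread pool below are hypothetical stand-ins for SendCallback and producer.send(msg, callback), so the sketch runs without the SDK. It is not the documented approach.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncWaitSketch {
    /** Hypothetical stand-in for the SDK's SendCallback; not part of RocketMQ. */
    interface Callback {
        void onSuccess(String msgId);

        void onException(Throwable error);
    }

    /** Simulates count asynchronous sends and returns how many succeeded before the timeout. */
    public static int sendAllAndWait(int count, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            final String msgId = "msg-" + i;
            Callback callback = new Callback() {
                @Override
                public void onSuccess(String id) {
                    succeeded.incrementAndGet();
                    pending.countDown();
                }

                @Override
                public void onException(Throwable error) {
                    // Count failures down as well so that the wait can finish.
                    pending.countDown();
                }
            };
            // Stand-in for producer.send(msg, callback): complete the callback on another thread.
            pool.execute(() -> callback.onSuccess(msgId));
        }
        try {
            // Wait until every callback has fired, or give up after the timeout.
            pending.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return succeeded.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks succeeded: " + sendAllAndWait(128, 3));
    }
}
```

With the real SDK, the same latch would be counted down inside onSuccess and onException of the SendCallback, and producer.shutdown() would follow the await call.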

Send normal messages in one-way transmission mode

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOnewayProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // The logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }
}

Subscribe to normal messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQPushConsumer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        consumer.setConsumerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

Send and receive ordered messages

Send ordered messages

import java.util.List;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOrderProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                        "YOUR MESSAGE TAG",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Note: This parameter is required. Ordered messages can be evenly distributed to each queue only after this parameter is configured. 
                // If you use an ApsaraMQ for RocketMQ 5.x instance, you can replace the following line with msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                msg.putUserProperty("__SHARDINGKEY", orderId + "");
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // Select a partition selection algorithm that is suitable for your business. The algorithm can be used to ensure that the results of the same parameter are the same. 
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
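
The selector in the sample above maps the same orderId to the same queue by taking the ID modulo the number of queues. The following sketch isolates that mapping so that it can be tried without a broker. The class QueueSelectSketch and its select method are hypothetical, and plain strings stand in for MessageQueue objects. Unlike the sample, the sketch uses Math.floorMod on the key's hash code, an assumption made here so that string keys and negative hash values also yield a valid index.

```java
import java.util.Arrays;
import java.util.List;

public class QueueSelectSketch {
    /**
     * Picks a queue for a sharding key. The same key always maps to the same
     * queue as long as the queue list does not change. Hypothetical helper.
     */
    public static <T> T select(List<T> queues, Object shardingKey) {
        // floorMod never returns a negative index, even for negative hash codes.
        int index = Math.floorMod(shardingKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        // Strings stand in for MessageQueue instances.
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        for (int i = 0; i < 12; i++) {
            int orderId = i % 10;
            System.out.println("orderId " + orderId + " -> " + select(queues, orderId));
        }
    }
}
```

Messages that share an orderId land in the same queue, which is what lets the consumer process them in order. The mapping shifts if the number of queues changes.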

Subscribe to ordered messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        /**
         * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        consumer.setConsumerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        consumer.subscribe("YOUR ORDER TOPIC", "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // If consumption fails, the request is suspended and retried. The following value is returned: ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT. 
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Send and receive scheduled or delayed messages

Send scheduled or delayed messages

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQDelayProducer {
    /**
     * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
     * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
     * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
     * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
        // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
        // DefaultMQProducer producer = new DefaultMQProducer();

        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        producer.setProducerGroup("YOUR GROUP ID");

        // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
        producer.setAccessChannel(AccessChannel.CLOUD);

        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
        // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                // The topic that you created in the ApsaraMQ for RocketMQ console. 
                Message msg = new Message("YOUR TOPIC",
                        // The message tag. 
                        "YOUR MESSAGE TAG",
                        // The message content. 
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // The delay before the delayed message is delivered. Unit: milliseconds. The __STARTDELIVERTIME property takes an absolute timestamp, so add the delay to the current time. In this example, the delay is 3000, so the message is delivered 3 seconds after it is sent. 
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // The scheduled time for sending the scheduled message. After you configure this parameter, the message is sent at the specified point in time. For example, if you set this parameter to 2021-08-10 18:45:00, the message is sent at 18:45:00 on August 10, 2021. 
                // Specify the time in the yyyy-MM-dd HH:mm:ss format. If you specify a time that is earlier than the current time, the message is immediately sent to the consumer. 
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // The logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Before you exit the application, shut down the producer. 
        // Note: If you shut down the producer, memory can be saved. If you need to frequently send messages, do not shut down the producer. 
        producer.shutdown();
    }
}
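
The delayed and scheduled variants above differ only in how the value of `__STARTDELIVERTIME` is computed. The following is a minimal sketch of both calculations, not part of the SDK: the class and method names (`DeliveryTimeSketch`, `delayedDeliveryTime`, `scheduledDeliveryTime`) are hypothetical, and it uses `java.time` with an explicit time zone instead of the `SimpleDateFormat` call shown in the sample, which parses in the JVM default time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only. It does not call the RocketMQ SDK.
final class DeliveryTimeSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Delayed message: the delivery timestamp is the send time plus the delay, in milliseconds.
    static long delayedDeliveryTime(long nowMillis, long delayMillis) {
        return nowMillis + delayMillis;
    }

    // Scheduled message: interpret the wall-clock time in the given zone
    // and convert it to a millisecond timestamp.
    static long scheduledDeliveryTime(String dateTime, ZoneId zone) {
        return LocalDateTime.parse(dateTime, FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long delayed = delayedDeliveryTime(System.currentTimeMillis(), 3000L);
        long scheduled = scheduledDeliveryTime("2021-08-10 18:45:00", ZoneId.systemDefault());
        // Either value would be passed as String.valueOf(...) to msg.putUserProperty("__STARTDELIVERTIME", ...).
        System.out.println("delayed=" + delayed);
        System.out.println("scheduled=" + scheduled);
    }
}
```

Passing the zone explicitly makes the scheduled time independent of the host configuration, which may matter when producers run in different regions.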

Subscribe to scheduled or delayed messages

The sample code for subscribing to scheduled or delayed messages is the same as the sample code for subscribing to normal messages. For more information, see "Subscribe to normal messages".

Send and receive transactional messages

Send transactional messages

  1. The following sample code shows how to send a half message and execute the local transaction.

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
             * If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook and enter the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
             * Note: Do not enter the AccessKey pair of your Alibaba Cloud account. 
             * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password of the instance. The ApsaraMQ for RocketMQ broker automatically obtains the username and password based on the VPC information. 
             * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // If you use the public endpoint to access the ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
            // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. Note: Transactional messages cannot share a group with messages of other types. 
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
            // If you use the VPC endpoint to access the ApsaraMQ for RocketMQ instance, you do not need to configure RPCHook. 
            // If the instance is a serverless ApsaraMQ for RocketMQ instance, you must configure RPCHook. 
            // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
    
            // Set the AccessChannel parameter to Alibaba Cloud. If you want to enable the message trace feature, you must configure this parameter. If you want to disable the message trace feature, leave this parameter empty. 
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
    
            // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The format of the endpoint is similar to rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
            // Note: Enter the domain name and port number that are displayed in the ApsaraMQ for RocketMQ console. Do not include the http:// or https:// prefix or use a resolved IP address. 
            transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override
                        public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                            System.out.println("Start executing the local transaction: " + msg);
                            return LocalTransactionState.UNKNOW;
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  2. The following sample code shows how to commit the status of a transactional message.

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    // The class that checks the status of the local transaction that is associated with an ApsaraMQ for RocketMQ transactional message. 
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        // The local transaction checker. For more information, see Transactional messages. 
        @Override
        public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("The request to check the transaction status of the message is received. MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }
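
The checker above always returns `COMMIT_MESSAGE`. A real checker would usually look up the actual outcome of the local transaction before it answers. The following is a minimal sketch of that decision logic under stated assumptions, not the SDK's API: `State` is a stand-in enum that mirrors the constant names of `LocalTransactionState` so that the example compiles without the RocketMQ client, and `TransactionCheckSketch`, `record`, `check`, and the in-memory map are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, for illustration only. It does not call the RocketMQ SDK.
final class TransactionCheckSketch {
    // Stand-in for org.apache.rocketmq.client.producer.LocalTransactionState.
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Assumed local record of finished transactions: true = committed, false = rolled back.
    // A real application would more likely query its own database.
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();

    // Called by the business logic once the local transaction has finished.
    void record(String transactionKey, boolean committed) {
        outcomes.put(transactionKey, committed);
    }

    // Decision that a checker could return for a half message.
    State check(String transactionKey) {
        Boolean committed = outcomes.get(transactionKey);
        if (committed == null) {
            // Outcome not known yet: report UNKNOW so that the status can be checked again later.
            return State.UNKNOW;
        }
        return committed ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE;
    }

    public static void main(String[] args) {
        TransactionCheckSketch sketch = new TransactionCheckSketch();
        sketch.record("order-1", true);
        System.out.println(sketch.check("order-1"));
        System.out.println(sketch.check("order-2"));
    }
}
```

In a real checker, the key would be derived from the message passed to `checkLocalTransactionState`, for example a business ID carried in the message, and the result would be mapped to the corresponding `LocalTransactionState` constant.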

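Subscribe to transactional messages

The sample code for subscribing to transactional messages is the same as the sample code for subscribing to normal messages. For more information, see "Send and receive normal messages".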