All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive transactional messages

Last Updated:Dec 13, 2023

This topic provides sample code based on which you can use the TCP client SDK for Java of the Community Edition to send and receive transactional messages.

Interaction process

The following figure shows the interaction process of transactional messages.

process

For more information, see Transactional messages.

Prerequisites

Before you start, make sure that the following operations are performed:

Send transactional messages

The following steps are required to send a transactional message:

  1. Send a half message and execute the corresponding local transaction. Sample code:

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
            * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
            * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
            */       
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
             /**
             * Create a producer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
             * If you do not want to enable the message trace feature, you can use the following method to create a producer:
             * TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
             */
             TransactionMQProducer transactionMQProducer = new TransactionMQProducer(null, "YOUR TRANSACTION GROUP ID", getAclRPCHook(), true, null);
             /**
           * The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
           */
         transactionMQProducer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
            // Set the AccessChannel parameter to CLOUD. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                            System.out.println("Start executing the local transaction: " + msg);
                            return LocalTransactionState.UNKNOW;
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  2. Commit the status of the transactional message.

    After the local transaction is executed, the ApsaraMQ for RocketMQ broker must be notified of the transaction status of the current message, regardless of whether the execution is successful. The ApsaraMQ for RocketMQ broker can be notified by using one of the following methods:

    • Commit the status after the local transaction is executed.

    • Wait until the ApsaraMQ for RocketMQ broker sends a request to check the transaction status of the message.

    A transaction can be in one of the following states:

    • LocalTransactionState.COMMIT_MESSAGE: The transaction is committed. The message can be consumed by consumers.

    • LocalTransactionState.ROLLBACK_MESSAGE: The transaction is rolled back. The message is discarded and cannot be consumed.

    • LocalTransactionState.UNKNOW: The status of the transaction is unknown, and the system is waiting for the ApsaraMQ for RocketMQ broker to query the status of the local transaction that corresponds to the message.

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    /**
     * The class that is used to check the status of local transactions implemented by sending ApsaraMQ for RocketMQ transactional messages. 
     */
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        /**
         * The local transaction checker. For more information, see Transactional messages. 
         */
        @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("The request to check the transaction status of the message is received. MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }

Mechanism of transaction status check

  • Why must the mechanism of transaction status check be implemented when transactional messages are sent?

    If the half message is sent in Step 1 but TransactionStatus.Unknow is returned or no status is committed for the local transaction due to application exit, the status of the half message is unknown to the ApsaraMQ for RocketMQ broker. In this case, the broker periodically requests the sender to check and report the status of the half message.

  • What does the business logic do when the check method is called back?

    The check method for transactional messages must contain the logic that is used to check transaction consistency. After a transactional message is sent, ApsaraMQ for RocketMQ must call the LocalTransactionChecker API operation to respond to the status check request from the broker for the local transaction. Therefore, the method that is used to check transactional messages must achieve the following objectives:

    1. Check the status (committed or rollback) of the local transaction that corresponds to the half message.

    2. Commit the status of the local transaction that corresponds to the half message to the broker.

Subscribe to transactional messages

The method that is used to subscribe to transactional messages is the same as the method that is used to subscribe to normal messages. Sample code:

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * Create a consumer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
         * If you do not want to enable the message trace feature, you can use the following method to create a producer:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // The endpoint of the ApsaraMQ for RocketMQ instance. 
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Set the AccessChannel parameter to CLOUD. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}