ApsaraMQ for RocketMQ: Transactional messages

Last Updated: Aug 15, 2024

Transactional messages are a type of featured message provided by ApsaraMQ for RocketMQ. This topic describes the scenarios, working mechanism, limits, usage, and usage notes of transactional messages.

Scenarios

Distributed transactions

When the logic of a core business system is executed in a distributed system, multiple downstream systems are invoked to process the logic at the same time. Therefore, the main problem that distributed transactions must solve is how to keep the execution results of the core business system and the downstream business systems consistent.

[Figure: Transactional message requirements]

In an e-commerce scenario, when a user places an order, changes in the downstream systems are also triggered. For example, shipment is initiated in the logistics system, user credit points are updated in the points system, and the items are cleared in the shopping cart system. In this case, the following transaction branches are involved:

  • Order system: The order status is changed from unpaid to paid.

  • Logistics system: A to-be-shipped record is added, and a shipment record is created.

  • Points system: The user credit points are updated.

  • Shopping cart system: Items are cleared, and user records are updated.

Traditional XA-based transaction solution: poor performance

The typical solution that is used to ensure result consistency among the preceding transaction branches is to use a distributed transaction system based on the eXtended Architecture (XA) protocol. The system encapsulates the changes into a large transaction that consists of four independent transaction branches. The XA-based transaction solution can ensure result consistency. However, a large number of resources must be locked during processing, which causes low system concurrency and poor performance. The number of locked resources increases with the number of transaction branches.

Normal message-based solution: poor result consistency

A simpler alternative to the XA-based transaction solution treats the change in the order system as a local transaction and the changes in the downstream systems as downstream tasks that are triggered by normal messages. The transaction branches are reduced to normal messages plus an order table transaction. This solution processes messages asynchronously, which shortens the processing time and improves system concurrency.

[Figure: Normal message-based solution]

However, this solution may deliver inconsistent results between the core transaction and transaction branches. Examples:

  • The message is sent, but the order is not completed. In this case, the whole transaction must be rolled back.

  • The order is completed, but the message is not sent. In this case, the message must be sent for consumption again.

  • Timeout errors cannot be reliably detected, which makes it difficult to determine whether the order needs to be rolled back or an order change needs to be committed.

ApsaraMQ for RocketMQ transactional message solution: result consistency

The normal message-based solution cannot ensure result consistency because normal messages do not have the commit, rollback, and unified coordination capabilities of standalone database transactions.

Built on the normal message-based solution, the transactional message solution provided by ApsaraMQ for RocketMQ supports two-phase commit. The solution combines two-phase commit with local transactions to ensure global consistency of commit results.

[Figure: Transactional messages]

The transactional message solution provided by ApsaraMQ for RocketMQ features high performance, high scalability, and easy business development. For information about the working mechanism and processing workflow of transactional messages, see Working mechanism.

Working mechanism

What is a transactional message?

Transactional messages are a type of featured message provided by ApsaraMQ for RocketMQ to ensure eventual consistency between message production and the local transaction.

Processing workflow

The following figure shows how transactional messages are processed.

[Figure: Transactional messages]

  1. The producer sends a message to an ApsaraMQ for RocketMQ broker.

  2. The ApsaraMQ for RocketMQ broker persists the message and returns an acknowledgment (ACK) for the message to the producer. At this point, the message is marked as "not ready for delivery". A message in this state is called a half message.

  3. The producer executes the local transaction.

  4. The producer commits the execution result of the local transaction to the broker. The execution result can be Commit or Rollback. The following items describe the processing logic after the broker obtains the execution result:

    • If the execution result is Commit, the broker marks the half message as "ready for delivery" and delivers the message to the consumer.

    • If the execution result is Rollback, the broker rolls back the transaction and does not deliver the half message to the consumer.

  5. If the broker does not receive the execution result or the status of the half message is Unknown due to a network disconnection or producer restart, the broker waits a period of time and sends a request to a producer in the producer cluster to query the status of the half message.

    Note

    For information about the interval between two status queries and the maximum number of queries, see Limits on parameters.

  6. After the producer receives the request to query the status of the half message, the producer checks the execution result of the local transaction that corresponds to the half message.

  7. The producer commits the execution result to the broker based on the queried status of the local transaction. Then, the broker performs the operations in Step 4 to process the half message.
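
The steps above can be sketched as a small in-memory model. This is only an illustration of the control flow, not the broker implementation: the class `HalfMessageWorkflowSketch`, its `State` and `Resolution` enums, and all method names are hypothetical, and persistence, networking, and timing are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy in-memory model of the workflow; it is not the real broker. */
class HalfMessageWorkflowSketch {
    /** Result that the producer reports for its local transaction. */
    enum Resolution { COMMIT, ROLLBACK, UNKNOWN }

    /** Broker-side state of a message in this sketch. */
    enum State { HALF, DELIVERABLE, ROLLED_BACK }

    private final Map<String, State> messages = new LinkedHashMap<>();

    /** Steps 1-2: store the message as a half message (the ACK is implied). */
    void sendHalf(String messageId) {
        messages.put(messageId, State.HALF);
    }

    /** Step 4: apply the execution result reported by the producer. */
    void resolve(String messageId, Resolution resolution) {
        if (messages.get(messageId) != State.HALF) {
            return; // Unknown ID or already resolved.
        }
        if (resolution == Resolution.COMMIT) {
            messages.put(messageId, State.DELIVERABLE);
        } else if (resolution == Resolution.ROLLBACK) {
            messages.put(messageId, State.ROLLED_BACK);
        }
        // UNKNOWN: the message stays a half message until a later check.
    }

    /** Steps 5-7: query the producer-side checker for every unresolved half message. */
    void checkPending(Function<String, Resolution> checker) {
        for (String id : new ArrayList<>(messages.keySet())) {
            if (messages.get(id) == State.HALF) {
                resolve(id, checker.apply(id));
            }
        }
    }

    State stateOf(String messageId) {
        return messages.get(messageId);
    }

    public static void main(String[] args) {
        HalfMessageWorkflowSketch broker = new HalfMessageWorkflowSketch();
        broker.sendHalf("order-1");
        broker.resolve("order-1", Resolution.COMMIT);   // Normal path: result reported.
        broker.sendHalf("order-2");                      // Result is never reported.
        broker.checkPending(id -> Resolution.ROLLBACK);  // Status check resolves it.
        System.out.println(broker.stateOf("order-1") + " " + broker.stateOf("order-2"));
    }
}
```

In this model, a half message becomes visible to consumers only after it reaches the `DELIVERABLE` state, which mirrors the "ready for delivery" mark in Step 4.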

Lifecycle of a transactional message

[Figure: Transactional messages]

  • Initialization

    A half message is produced and initialized by the producer and is ready to be sent to a broker.

  • Transaction to be committed

    The half message is sent to the broker. Unlike a normal message, the half message is not written directly to the regular message store. Instead, the half message is stored separately in the transaction storage system and is not committed until the execution result of the local transaction is returned. In this phase, the message is invisible to downstream consumers.

  • Message rollback

    If the execution result of the local transaction is Rollback, the broker rolls back the half message and terminates the workflow.

  • Committed for consumption

    If the execution result of the local transaction is Commit, the broker stores the half message in the storage system. The message becomes visible and is ready to be consumed by downstream consumers.

  • Being consumed

    The message is obtained by the consumer and processed based on the local consumption logic defined by the consumer.

    In this process, the broker waits for the consumer to return the consumption result. If no response is received from the consumer in a specific period of time, ApsaraMQ for RocketMQ performs retries on the message. For more information, see Consumption retry.

  • Consumption result commit

    The consumer completes consumption and commits the consumption result to the broker. The broker marks whether the message is consumed.

    By default, ApsaraMQ for RocketMQ retains all messages. When the consumption result is committed, the message is marked as consumed instead of being immediately deleted. Messages are deleted only if the retention period expires or the system has insufficient storage space. Before a message is deleted, the consumer can re-consume the message.

  • Message deletion

    If the retention period of messages expires or the storage space is insufficient, ApsaraMQ for RocketMQ deletes the earliest stored messages from the physical file in a rolling manner. For more information, see Message storage and cleanup.
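
The lifecycle described above can be read as a small state machine. The following sketch encodes only the transitions that this section states. The class `TransactionalMessageLifecycleSketch` and its names are hypothetical, and consumption retries within the Being consumed phase are not modeled.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the lifecycle phases listed in this section. */
class TransactionalMessageLifecycleSketch {
    enum Phase {
        INITIALIZATION,
        TRANSACTION_TO_BE_COMMITTED,
        MESSAGE_ROLLBACK,
        COMMITTED_FOR_CONSUMPTION,
        BEING_CONSUMED,
        CONSUMPTION_RESULT_COMMIT,
        MESSAGE_DELETION
    }

    /** Returns the phases that can directly follow the given phase. */
    static Set<Phase> nextPhases(Phase phase) {
        switch (phase) {
            case INITIALIZATION:
                return EnumSet.of(Phase.TRANSACTION_TO_BE_COMMITTED);
            case TRANSACTION_TO_BE_COMMITTED:
                // The local transaction result decides between rollback and commit.
                return EnumSet.of(Phase.MESSAGE_ROLLBACK, Phase.COMMITTED_FOR_CONSUMPTION);
            case COMMITTED_FOR_CONSUMPTION:
                return EnumSet.of(Phase.BEING_CONSUMED);
            case BEING_CONSUMED:
                return EnumSet.of(Phase.CONSUMPTION_RESULT_COMMIT);
            case CONSUMPTION_RESULT_COMMIT:
                // A consumed message can be re-consumed until it is deleted.
                return EnumSet.of(Phase.BEING_CONSUMED, Phase.MESSAGE_DELETION);
            default:
                // MESSAGE_ROLLBACK and MESSAGE_DELETION end the workflow.
                return EnumSet.noneOf(Phase.class);
        }
    }

    static boolean canMove(Phase from, Phase to) {
        return nextPhases(from).contains(to);
    }

    public static void main(String[] args) {
        for (Phase phase : Phase.values()) {
            System.out.println(phase + " -> " + nextPhases(phase));
        }
    }
}
```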

Limits

Message type consistency

Transactional messages can only be used in topics whose MessageType is set to Transaction.

Transaction-centered consumption

Transactional messages provided by ApsaraMQ for RocketMQ ensure only the result consistency between the local core transaction and downstream transaction branches. Downstream business systems must ensure that messages are correctly processed. We recommend that consumers properly perform consumption retries to ensure the successful processing of messages. For more information, see Consumption retry.

Intermediate status visibility

Transactional messages provided by ApsaraMQ for RocketMQ ensure only result consistency. Status inconsistency between downstream transaction branches and upstream transactions exists before a message is delivered to a consumer. Therefore, transactional messages are suitable for only transaction scenarios in which asynchronous execution can be used.

Transaction timeout mechanism

A timeout mechanism is used in the lifecycle of transactional messages provided by ApsaraMQ for RocketMQ. After the broker receives a half message, the message is rolled back by default if the broker cannot determine the execution result of the transaction within a specified period of time. For more information, see Limits on parameters.
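
As a rough illustration of this default, the following sketch queries a checker a bounded number of times and falls back to Rollback if every answer is Unknown. The class `TransactionTimeoutSketch`, the method `resolveWithTimeout`, and the `maxChecks` parameter are hypothetical. The actual check interval and maximum number of checks are described in Limits on parameters and are not modeled here, so the sketch does not wait between checks.

```java
import java.util.function.Supplier;

/** Hypothetical model of the rollback-by-default timeout behavior. */
class TransactionTimeoutSketch {
    enum Resolution { COMMIT, ROLLBACK, UNKNOWN }

    /**
     * Queries the checker up to maxChecks times. Returns the first definite
     * result, or ROLLBACK if the result is still UNKNOWN after the last check.
     */
    static Resolution resolveWithTimeout(Supplier<Resolution> checker, int maxChecks) {
        for (int attempt = 0; attempt < maxChecks; attempt++) {
            Resolution result = checker.get();
            if (result != Resolution.UNKNOWN) {
                return result;
            }
            // A real broker waits for the configured interval here.
        }
        return Resolution.ROLLBACK; // Default outcome when no result is determined.
    }

    public static void main(String[] args) {
        // maxChecks = 3 is an arbitrary value chosen for this example.
        Resolution result = resolveWithTimeout(() -> Resolution.UNKNOWN, 3);
        System.out.println(result);
    }
}
```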

Multiple SendReceipts not supported

Only one SendReceipt is allowed for transactional messages in a transaction.

Sample code

Sending transactional messages is different from sending normal messages in the following aspects:

  • Before you send transactional messages, you must enable the transaction checker and associate it with local transaction execution.

  • To ensure transaction consistency, you must configure the transaction checker and bind the topics to which you want to send messages when you build the producer. If exceptions occur in the bound topics, the built-in transaction checker can be used to restore the status.

The following sample code provides an example on how to use transactional messages in Java.

For information about the complete sample code for messaging, see Apache RocketMQ 5.x SDKs (recommended).

import java.time.Duration;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.shaded.com.google.common.base.Strings;

public class ProducerTransactionMessageExample {
    /**
     * Simulates a query against the order table to check whether the order transaction was committed.
     */
    private static boolean checkOrderById(String orderId) {
        return true;
    }

    /**
     * Simulates the execution result of a local transaction.
     */
    private static boolean doLocalTransaction() {
        return true;
    }

    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "xxx-hangzhou.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "topic1";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless instance, you must specify the username and password of the instance, regardless of whether you access the instance over the Internet or in a VPC. 
         */
        builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        builder.setRequestTimeout(Duration.ofMillis(5000));
        ClientConfiguration configuration = builder.build();
        
        MessageBuilder messageBuilder = new MessageBuilderImpl();

        // Build the transaction producer. The producer must build a transaction checker to check the intermediate status of an exceptional half message. 
        Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * The transaction checker checks whether the local transaction is correctly committed or rolled back based on the business ID. In this example, an order ID is used. 
                 * If this order is found in the order table, the order insertion action is correctly committed by the local transaction. If no order is found in the order table, the local transaction is rolled back. 
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // An error occurs in the message. Rollback is returned. 
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            }).setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        // Create a transaction branch. 
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            // If the transaction branch fails to be created, the transaction is terminated. 
            return;
        }
        Message message = messageBuilder.setTopic(topic)
            // The message key. You can use a keyword to accurately find the message. 
            .setKeys("messageKey1")
            // The message tag. The consumer can use the tag to filter messages. 
            .setTag("messageTag")
            // The unique ID that is associated with the local transaction. The transaction checker uses this ID to look up the local transaction status.
            .addProperty("OrderId", "xxx")
            // The message body. 
            .setBody("messageBody".getBytes(java.nio.charset.StandardCharsets.UTF_8))
            .build();
        // Send the half message.
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            // If the half message fails to be sent, the transaction is terminated and the message is rolled back.
            e.printStackTrace();
            return;
        }
        /**
         * Execute the local transaction and determine the execution result. 
         * 1. If the result is Commit, deliver the message. 
         * 2. If the result is Rollback, roll back the message. 
         * 3. If an unknown exception occurs, no action is performed until a response is obtained from the query of the local transaction status. 
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // Decide whether to retry the commit based on your business requirements. If you do not retry, you can rely on the transaction status check to commit the transaction status.
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // We recommend that you record the exception information. If the rollback fails, you do not need to retry. You can rely on the transaction status check to resolve the transaction status.
                e.printStackTrace();
            }
        }
    }
}

Usage notes

Prevent timeout caused by transactions with unknown results

To ensure transactional consistency, ApsaraMQ for RocketMQ queries the local transaction status if an exception occurs in the transaction commit phase. However, producers should avoid returning unknown results for local transactions whenever possible. A large number of transaction checks can degrade system performance and delay transaction processing.

Properly handle ongoing transactions

During the query of the local transaction status, do not return Rollback or Commit for an ongoing transaction. Instead, keep the Unknown status for the transaction.

In most cases, a transaction is ongoing because the transaction execution is slow and the query of the local transaction status is too soon. The following solutions are provided:

  • Increase the delay before the first query of the local transaction status. Note that this can significantly delay the commit of transactions that depend on the query result.

  • Make the program correctly identify ongoing transactions.
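
One way to make the program identify ongoing transactions is to record an explicit status for each local transaction and return Unknown while that status is still in progress. The following is a minimal sketch under that assumption. The class `OngoingTransactionCheckerSketch`, the `LocalStatus` enum, and the in-memory status table are hypothetical. The local `Resolution` enum only mirrors the Commit, Rollback, and Unknown outcomes so that the example is self-contained. Treating a missing record as Unknown is a design choice of this sketch that relies on the timeout rollback described in Limits, and it differs from the preceding sample code, which rolls back when no order is found.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical checker logic that keeps ongoing transactions unresolved. */
class OngoingTransactionCheckerSketch {
    enum Resolution { COMMIT, ROLLBACK, UNKNOWN }
    enum LocalStatus { IN_PROGRESS, SUCCEEDED, FAILED }

    // Stands in for a status column in the business database.
    private final Map<String, LocalStatus> statusByOrderId = new ConcurrentHashMap<>();

    /** Called by the business code as the local transaction progresses. */
    void record(String orderId, LocalStatus status) {
        statusByOrderId.put(orderId, status);
    }

    /** Called when the broker queries the status of a half message. */
    Resolution check(String orderId) {
        if (orderId == null || orderId.isEmpty()) {
            return Resolution.ROLLBACK; // Malformed message: nothing to commit.
        }
        LocalStatus status = statusByOrderId.get(orderId);
        if (status == null || status == LocalStatus.IN_PROGRESS) {
            // Not finished yet: do not guess, let the broker ask again later.
            return Resolution.UNKNOWN;
        }
        return status == LocalStatus.SUCCEEDED ? Resolution.COMMIT : Resolution.ROLLBACK;
    }

    public static void main(String[] args) {
        OngoingTransactionCheckerSketch checker = new OngoingTransactionCheckerSketch();
        checker.record("order-1", LocalStatus.IN_PROGRESS);
        System.out.println(checker.check("order-1")); // Still running.
        checker.record("order-1", LocalStatus.SUCCEEDED);
        System.out.println(checker.check("order-1")); // Finished successfully.
    }
}
```

In the preceding sample code, logic of this kind would sit inside the lambda passed to `setTransactionChecker`, with the order ID read from the message properties.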