全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:訊息等冪

更新時間:Jun 30, 2024

如果訊息重複消費會影響您的業務處理,請對訊息做等冪處理。本文介紹訊息等冪的概念、適用情境以及處理方法。

什麼是訊息等冪

在數學與電腦學中,等冪操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。在訊息領域,等冪是指Consumer重複消費某條訊息時,重複消費的結果與消費一次的結果是相同的,並且多次消費並未對業務系統產生任何負面影響。

例如,在支付情境下,Consumer消費扣款訊息,對一筆訂單執行扣款操作,扣款金額為100美元。如果因網路不穩定等原因導致扣款訊息重複投遞,Consumer重複消費了該扣款訊息,但最終的業務結果是只扣款一次,計費100美元,且使用者的扣款記錄中對應的訂單只有一條扣款流水,不會多次扣除費用。那麼這次扣款操作是符合要求的,整個消費過程實現了訊息等冪。

適用情境

在互連網應用中,尤其在網路不穩定的情況下,雲訊息佇列 RabbitMQ 版的訊息有可能會出現重複。如果訊息重複消費會影響您的業務處理,請對訊息做等冪處理。訊息重複的可能原因如下:

  • 發送時訊息重複

    當一條訊息已被成功發送到服務端並完成持久化,此時出現了網路閃斷或者用戶端宕機,導致服務端對用戶端應答失敗。 如果此時Producer意識到訊息發送失敗並嘗試再次發送訊息,Consumer後續會收到兩條內容相同並且Message ID也相同的訊息。

  • 投遞時訊息重複

    訊息消費的情境下,訊息已投遞到Consumer並完成業務處理,當用戶端給服務端反饋應答的時候網路閃斷。為了保證訊息至少被消費一次,雲訊息佇列 RabbitMQ 版的服務端將在網路恢複後再次嘗試投遞之前已被處理過的訊息,Consumer後續會收到兩條內容相同並且Message ID也相同的訊息。

  • 負載平衡時訊息重複(包括但不限於網路抖動、服務端重啟以及Consumer應用重啟)

    雲訊息佇列 RabbitMQ 版的服務端或用戶端重啟、擴容或縮容時,會觸發Rebalance,此時Consumer可能會收到重複訊息。

處理方法

以Message ID為等冪鍵對訊息進行等冪處理的步驟如下:

  1. 在資料庫中建立一張unique key索引為唯一Message ID的表。

  2. 在Producer用戶端為每條訊息設定唯一Message ID。

    設定唯一Message ID的範例程式碼如下:

    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
    channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("訊息發送Body" + i).getBytes(StandardCharsets.UTF_8));

    瞭解更多Message ID相關資訊,請參見如何設定Message ID

  3. 在Consumer用戶端根據唯一Message ID對訊息進行等冪處理。

    根據唯一Message ID進行等冪處理的範例程式碼如下:

    channel.basicConsume(Producer.QueueName, false, "YourConsumerTag",
        new DefaultConsumer(channel) {
        @Override public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 1. 擷取業務唯一性索引資料。
            try{
                String messageId = properties.getMessageId();
                // Message ID或者其他作為unique key的資訊。
                // 2. 開啟資料庫事務。
                idempTable.insert(messageId);
                // 3. 對接收到的訊息,進行商務邏輯處理。
                // 4. 提交或復原事務。// 處理成功,則進行ACK,否則不要進行ACK。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
            catch (資料庫主鍵衝突異常 e){
                // 重複訊息,直接確認掉。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    }
    );