全部產品
Search
文件中心

ApsaraMQ for RocketMQ:消費者分類

更新時間:Jun 30, 2024

雲訊息佇列 RocketMQ 版支援PushConsumer和SimpleConsumer這兩種類型的消費者,本文分別從使用方式、實現原理、可靠性重試和適用情境等方面為您介紹這兩種類型的消費者。

背景資訊

雲訊息佇列 RocketMQ 版面向不同的業務情境提供了不同消費者類型,每種消費者類型的整合方式和控制方式都不一樣。瞭解如下問題,可以協助您選擇更匹配業務情境的消費者類型。

  • 如何?並發消費:消費者如何使用並發的多線程機制處理訊息,以此提高訊息處理效率?

  • 如何?同步、非同步訊息處理:對於不同的整合情境,消費者擷取訊息後可能會將訊息非同步分發到商務邏輯中處理,此時,訊息非同步化處理如何??

  • 如何?訊息可靠處理:消費者處理訊息時如何返迴響應結果?如何在訊息異常情況進行重試,保證訊息的可靠處理?

以上問題的具體答案,請參考下文的PushConsumerSimpleConsumer

功能概述

訊息消費流程

如上圖所示,雲訊息佇列 RocketMQ 版的消費者處理訊息時主要經過以下階段:訊息擷取—>訊息處理—>消費狀態提交。

針對以上幾個階段,雲訊息佇列 RocketMQ 版提供了不同的消費者類型:PushConsumer和SimpleConsumer。這兩種類型的消費者通過不同的實現方式和介面可滿足您在不同業務情境下的消費需求。具體差異如下:

說明

若您的業務情境發生變更,或您當前使用的消費者類型不適合當前業務,您可以選擇變更消費者類型。變更消費者類型不影響當前雲訊息佇列 RocketMQ 版資源的使用和業務處理。

對比項

PushConsumer

SimpleConsumer

介面方式

使用監聽器回調介面返回消費結果,消費者僅允許在監聽器範圍內處理消費邏輯。

業務方自行實現訊息處理,並主動調用介面返回消費結果。

消費並發度管理

由SDK管理消費並發度。

由業務方消費邏輯自行管理消費線程。

介面靈活度

高度封裝,不夠靈活。

原子介面,可靈活自訂。

適用情境

適用於無自訂流程的開發情境。

適用於需要高度自訂商務程序的開發情境。

PushConsumer

PushConsumers是一種高度封裝的消費者類型,消費訊息僅通過消費監聽器處理業務並返回消費結果。訊息的擷取、消費狀態提交以及消費重試都通過雲訊息佇列 RocketMQ 版的用戶端SDK完成。

使用方式

PushConsumer的使用方式比較固定,在消費者初始化時註冊一個消費監聽器,並在消費監聽器內部實現訊息處理邏輯。由雲訊息佇列 RocketMQ 版的SDK在後台完成訊息擷取、觸發監聽器調用以及進行訊息重試處理。

範例程式碼如下:

//消費樣本:使用PushConsumer消費普通訊息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
        String topic = "Your Topic";
        FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                //設定消費者分組。
                .setConsumerGroup("Your ConsumerGroup")
                //設定存取點。
                .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
                //設定預綁定的訂閱關係。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //設定消費監聽器。
                .setMessageListener(new MessageListener() {
                    @Override
                    public ConsumeResult consume(MessageView messageView) {
                        //消費訊息並返回處理結果。
                        return ConsumeResult.SUCCESS;
                    }
                })
                .build();
                

PushConsumer的消費監聽器執行結果分為以下三種情況:

  • 返回消費成功:以Java SDK為例,返回ConsumeResult.SUCCESS,表示該訊息處理成功,服務端按照消費結果更新消費進度。

  • 返回消費失敗:以Java SDK為例,返回ConsumeResult.FAILURE,表示該訊息處理失敗,需要根據消費重試邏輯判斷是否進行重試消費。

  • 出現非預期失敗:例如拋異常等行為,該結果按照消費失敗處理,需要根據消費重試邏輯判斷是否進行重試消費。

PushConsumer消費訊息時,若訊息處理邏輯出現預期之外的阻塞導致訊息處理一直無法執行成功,SDK會按照消費逾時處理強制提交消費失敗結果,並按照消費重試邏輯進行處理。訊息逾時,請參見PushConsumer消費重試策略

說明

出現消費逾時情況時,SDK雖然提交消費失敗結果,但是當前消費線程可能仍然無法響應中斷,還會繼續處理訊息。

內部原理

在PushConsumer類型中,訊息的即時處理能力是基於SDK內部的典型Reactor執行緒模式實現的。如下圖所示,SDK內建了一個長輪詢線程,先將訊息非同步拉取到SDK內建的緩衝隊列中,再分別提交到消費線程中,觸發監聽器執行本地消費邏輯。

PushConsumer原理

可靠性重試

PushConsumer消費者類型中,用戶端SDK和消費邏輯的唯一邊界是消費監聽器介面。用戶端SDK嚴格按照監聽器的返回結果判斷訊息是否消費成功,並做可靠性重試。所有訊息必須以同步方式進行消費處理,並在監聽器介面結束時返回調用結果,不允許再做非同步化分發。訊息重試具體資訊,請參見PushConsumer消費重試策略

使用PushConsumer消費者消費時,不允許使用以下方式處理訊息,否則雲訊息佇列 RocketMQ 版無法保證訊息的可靠性。

  • 錯誤方式一:訊息還未處理完成,就提前返回消費成功結果。此時如果訊息消費失敗,雲訊息佇列 RocketMQ 版服務端是無法感知的,因此不會進行消費重試。

  • 錯誤方式二:在消費監聽器內將訊息再次分發到自訂的其他線程,消費監聽器提前返回消費結果。此時如果訊息消費失敗,雲訊息佇列 RocketMQ 版服務端同樣無法感知,因此也不會進行消費重試。

順序性保障

基於雲訊息佇列 RocketMQ 版順序訊息的定義,如果消費者分組設定了順序消費模式,則PushConsumer在觸發消費監聽器時,嚴格遵循訊息的先後順序。業務處理邏輯無感知即可保證訊息的消費順序。

說明

訊息消費按照順序處理的前提是遵循同步提交原則,如果商務邏輯自訂實現了非同步分發,則雲訊息佇列 RocketMQ 版無法保證訊息的順序性。

適用情境

PushConsumer嚴格限制了訊息同步處理及每條訊息的處理逾時時間,適用於以下情境:

  • 訊息處理時間可預估:如果不確定訊息處理耗時,經常有預期之外的長時間耗時的訊息,PushConsumer的可靠性保證會頻繁觸發訊息重試機製造成大量重複訊息。

  • 無非同步化、進階定製情境:PushConsumer限制了消費邏輯的執行緒模式,由用戶端SDK內部按最大輸送量觸發訊息處理。該模型開發邏輯簡單,但是不允許使用非同步化和自訂處理流程。

SimpleConsumer

SimpleConsumer是一種介面原子型的消費者類型,訊息的擷取、消費狀態提交以及消費重試都是通過消費者商務邏輯主動發起調用完成。

使用方式

SimpleConsumer的使用涉及多個介面調用,由商務邏輯按需調用介面擷取訊息,然後分發給業務線程處理訊息,最後按照處理的結果調用提交介面,返回服務端當前訊息的處理結果。樣本如下:

 //消費樣本:使用SimpleConsumer消費普通訊息,主動擷取訊息處理並提交。
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        String topic = "Your Topic";
        FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);

        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                //設定消費者分組。
                .setConsumerGroup("Your ConsumerGroup")
                //設定存取點。
                .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
                //設定預綁定的訂閱關係。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        List<MessageView> messageViewList = null;
        try {
            //SimpleConsumer需要主動擷取訊息,並處理。
            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                //消費處理完成後,需要主動調用ACK提交消費結果。
                try {
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
                    e.printStackTrace();
                }
            });
        } catch (ClientException e) {
            //如果遇到系統流控等原因造成拉取失敗,需要重新發起擷取訊息請求。
            e.printStackTrace();
        }

SimpleConsumer主要涉及以下幾個介面行為:

介面名稱

主要作用

可修改參數

ReceiveMessage

消費者主動調用該介面從服務端擷取訊息。

說明

由於服務端儲存為分布式,可能會出現服務端實際有訊息,但是返回為空白的現象。

一般可通過重新發起ReceiveMessage調用或提高ReceiveMessage的並發度解決。

  • 批量拉取訊息數:SimpleConsumer可以一次性批量擷取多條訊息實現批量消費,該介面可修改批量擷取的訊息數量。

  • 消費不可見時間:訊息的最長處理耗時,該參數用於控制消費失敗時的訊息稍候再試。具體資訊,請參見SimpleConsumer消費重試策略。消費者調用ReceiveMessage介面時需要指定消費不可見時間。

AckMessage

消費者成功消費訊息後,主動調用該介面向服務端返回消費成功響應。

ChangeInvisibleDuration

消費重試情境下,消費者可通過該介面修改訊息處理時間長度,即控制訊息的稍候再試。

消費不可見時間:調用本介面可修改ReceiveMessage介面預設的消費不可見時間的參數值。一般用於需要延長訊息處理時間長度的情境。

可靠性重試

SimpleConsumer消費者類型中,用戶端SDK和服務端通過ReceiveMessageAckMessage介面通訊。用戶端SDK如果處理訊息成功則調用AckMessage介面;如果處理失敗只需要不回複ACK響應,即可在定義的消費不可見時間到達後觸發消費重試流程。更多資訊,請參見SimpleConsumer消費重試策略

順序性保障

基於雲訊息佇列 RocketMQ 版順序訊息的定義,SimpleConsumer在處理順序訊息時,會按照訊息儲存的先後順序擷取訊息。即需要保持順序的一組訊息中,如果前面的訊息未處理完成,則無法擷取到後面的訊息。

適用情境

SimpleConsumer提供原子介面,用於訊息擷取和提交消費結果,相對於PushConsumer方式更加靈活。SimpleConsumer適用於以下情境:

  • 訊息處理時間長度不可控:如果訊息處理時間長度無法預估,經常有長時間耗時的訊息處理情況。建議使用SimpleConsumer消費類型,可以在消費時自訂訊息的預估處理時間長度,若實際業務中預估的訊息處理時間長度不符合預期,也可以通過介面提前修改。

  • 需要非同步化、批量消費等進階定製情境:SimpleConsumer在SDK內部沒有複雜的線程封裝,完全由商務邏輯自由定製,可以實現非同步分發、批量消費等進階定製情境。

  • 需要自訂消費速率:SimpleConsumer是由商務邏輯主動調用介面擷取訊息,因此可以自由調整擷取訊息的頻率,自訂控制消費速率。

使用建議

PushConsumer合理控制消費耗時,避免無限阻塞

對於PushConsumer消費類型,需要嚴格控制訊息的消費耗時,盡量避免出現訊息處理逾時導致訊息重複。如果業務經常會出現一些預期外的長時間耗時的訊息,建議使用SimpleConsumer,並設定好消費不可見時間。