雲訊息佇列 RocketMQ 版提供Java SDK實現訊息發送與訂閱,訂閱者可通過Push或Pull的方式從雲訊息佇列 RocketMQ 版擷取訊息。本文介紹訊息發送和訂閱的介面和參數說明。
背景資訊
雲訊息佇列 RocketMQ 版支援以下兩種訊息擷取方式:
Push:訊息由雲訊息佇列 RocketMQ 版推送至Consumer。Push方式下,雲訊息佇列 RocketMQ 版還支援批量消費功能,可以將訊息統一批量推送至Consumer進行消費。更多資訊,請參見批量消費。
Pull:訊息由Consumer主動從雲訊息佇列 RocketMQ 版拉取。
Pull Consumer提供了更多接收訊息的選擇。相比於Push Consumer,您可以使用Pull Consumer更加自由地控制訊息拉取。
如需使用Pull Consumer,請確保您的雲訊息佇列 RocketMQ 版執行個體為企業鉑金版。
通用參數
參數名 | 參數說明 |
NAMESRV_ADDR | 設定TCP協議存取點,從雲訊息佇列 RocketMQ 版控制台的執行個體詳情頁面擷取。 |
AccessKey | AccessKey ID,阿里雲身分識別驗證標識。擷取方式,請參見建立AccessKey。 |
SecretKey | AccessKey Secret,阿里雲身分識別驗證密鑰。擷取方式,請參見建立AccessKey。 |
OnsChannel | 使用者渠道,預設值為:ALIYUN,聚石塔使用者為:CLOUD。 |
訊息發送介面
訊息發送參數
參數名 | 參數說明 |
SendMsgTimeoutMillis | 設定訊息發送的逾時時間,單位:毫秒。 |
CheckImmunityTimeInSeconds(事務訊息) | 設定事務訊息第一次回查的最快時間,單位:秒。 |
shardingKey(順序訊息) | 順序訊息中用來計算不同分區的值。 |
訊息訂閱介面
Pull方式的介面說明如下所示。
public interface PullConsumer extends Admin {
/**
* 擷取某個Topic下的分區資訊,返回結果是該Topic下的所有分區。注意該介面必須在Pull Consumer Start以後才能被調用。
*/
Set<TopicPartition> topicPartitions(String topic);
/**
* 指定需要拉取訊息的分區,此介面不會參與Rebalance,您需要自己保證所有分區得到消費。此外,若多次調用此介面會替換掉原來訂閱的分區,並不會增量增加訂閱分區的數量。
*/
void assign(Collection<TopicPartition> topicPartitions);
/**
* 拉取訊息,單次最多拉取maxBatchMessageCount個訊息,逾時時間您可以自訂,單位為毫秒。
*/
List<Message> poll(long timeout);
/**
* 將指定的分區的消費位點重設到某個指定位置。該位置必須在分區的最小位點和最大位點之間。注意該介面必須在Pull Consumer Start以後才能被調用,並且指定的分區必須為訂閱的分區。
*/
void seek(TopicPartition topicPartition, long offset);
/**
* 將指定分區的消費位點重設到該分區的最小位點。注意該介面必須在Pull Consumer Start以後才能被調用,並且指定的分區必須為訂閱的分區。
*/
void seekToBeginning(TopicPartition topicPartition);
/**
* 將指定分區的消費位點重設到該分區的最大位點。注意該介面必須在Pull Consumer Start以後才能被調用,並且指定的分區必須為訂閱的分區。
*/
void seekToEnd(TopicPartition topicPartition);
/**
* 暫停指定分區的消費。
*/
void pause(Collection<TopicPartition> topicPartitions);
/**
* 恢複指定分區的消費。
*/
void resume(Collection<TopicPartition> topicPartitions);
/**
* 尋找指定分區對應時間戳記的位點。該時間戳記為訊息儲存到伺服器的時間戳記。該位點是指定分區中第一個儲存時間戳記大於或等於該時間戳記訊息對應的位點。
*/
Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);
/**
* 擷取指定分區的最新消費位點。
*/
Long committed(TopicPartition topicPartition);
/**
* 手動提交消費位點,該消費位點首先同步到本地,再由非同步線程提交到伺服器。
*/
void commitSync();
interface TopicPartitionChangeListener {
/**
* 該方法在某個Topic下分區發生變化時被調用,例如在服務端擴縮容時造成Topic的分區數量發生變化時回調該方法。
*/
void onChanged(Set<TopicPartition> topicPartitions);
}
/**
* 註冊監聽某個Topic分區變化的TopicPartitionChangeListener。例如註冊後在服務端擴縮容時造成Topic的分區數量發生變化時回調listener的onChanged方法,預設最多5秒左右延遲。
*/
void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}
訊息訂閱參數
參數 | 說明 |
GROUP_ID | 您在雲訊息佇列 RocketMQ 版控制台上建立的Group ID,更多資訊,請參見基本概念。 |
MessageModel | 設定Consumer執行個體的消費模式,取值說明如下:
|
ConsumeThreadNums | 設定Consumer執行個體的消費線程數,預設值:20。 |
MaxReconsumeTimes | 設定訊息消費失敗的最大重試次數,預設值:16。 |
ConsumeTimeout | 設定每條訊息消費的最大逾時時間,超過設定時間則被視為消費失敗,等下次重新投遞再次消費。每個業務需要設定一個合理的值,預設值:15,單位:分鐘。 |
suspendTimeMillis(順序訊息) | 只適用於順序訊息,設定訊息消費失敗的稍候再試時間。 |
maxCachedMessageAmount | 消費者用戶端本地的最大緩衝訊息數量,取值範圍為[100,50000],預設值:5000,單位:條。 該參數在用戶端層級生效,限定額會平均分配到訂閱的Topic上。例如最大緩衝數為1000條,某消費者客訂閱了2個Topic,則每個Topic將限制緩衝500條訊息。 考慮到消費者用戶端批量拉取訊息的情境,實際最大緩衝量會略大於限制值。 建議取值:若某個消費者用戶端每秒能處理N條訊息,則該參數建議設定為2 * N條。 重要 請合理取值,設定過大可能會引起用戶端OOM。 |
maxCachedMessageSizeInMiB | 用戶端本地的最大緩衝訊息大小,取值範圍:16 MB~2048 MB,預設值:512 MB。 |
參數 | 說明 |
ConsumeMessageBatchMaxSize | 批量消費的最大訊息數量,緩衝的訊息數量達到設定的參數值,雲訊息佇列 RocketMQ 版會將緩衝的訊息統一推送給消費者進行批量消費。預設值:32,取值範圍:1~1024。 |
BatchConsumeMaxAwaitDurationInSeconds | 批量消費的等待時間長度,等待時間長度達到參數設定的值,雲訊息佇列 RocketMQ 版會將緩衝的訊息統一推送給消費者進行批量消費。預設為:0,取值範圍:0~450,單位:秒。 |
參數 | 說明 |
maxCachedMessageSizeInMiB | Consumer單個分區允許在用戶端中緩衝的最大訊息容量,預設值:100 MiB,取值範圍:16 MiB~2048 MiB。 重要 請合理取值,設定過大可能會引起用戶端OOM。 |
autoCommit | 是否允許消費位點自動認可,預設為true。 |
autoCommitIntervalMillis | 消費位點自動認可間隔,預設值:5,單位:秒。 |
pollTimeoutMillis | 每次拉取訊息逾時時間,預設值:5,單位:秒。 |
分區和位點的詳細說明,請參見基本概念。
更多資訊
訊息收發範例程式碼的更多資訊,請參見以下文檔: