ApsaraMQ for RocketMQ SDK for Java is used to send and subscribe to messages. In ApsaraMQ for RocketMQ, subscribers can obtain messages in push or pull mode. This topic describes the methods and parameters for sending and subscribing to messages.
Background information
ApsaraMQ for RocketMQ supports the following modes to obtain messages:
Push: ApsaraMQ for RocketMQ pushes messages to consumers. In push mode, ApsaraMQ for RocketMQ can push multiple messages to consumers at a time. For more information, see Batch consumption.
Pull: Consumers pull messages from ApsaraMQ for RocketMQ.
Pull consumers provide more options to receive messages and provide you more control over message pulls than push consumers.
To use pull consumers, make sure that your ApsaraMQ for RocketMQ instance is an Enterprise Platinum Edition instance.
Common parameters
Parameter | Description |
NAMESRV_ADDR | The TCP endpoint. You can click Instance Details in the ApsaraMQ for RocketMQ console to obtain the TCP endpoint. |
AccessKey | An AccessKey ID is used as the unique identifier for authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair. |
SecretKey | An AccessKey secret is used as the password for authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair. |
OnsChannel | The user channel. Default value: ALIYUN. If you are a CloudTmall user, set the value to CLOUD. |
Methods for sending messages
Parameters for sending messages
Parameter | Description |
SendMsgTimeoutMillis | The timeout period of sending messages. Unit: milliseconds. |
CheckImmunityTimeInSeconds | The shortest time for which the system waits before checking the status of a transactional message for the first time. Unit: seconds. |
shardingKey | The partition key that is used to determine the partition to which an ordered message is distributed. |
Methods for subscribing to messages
The following content describes the methods that can be called in pull mode:
public interface PullConsumer extends Admin {
/**
* Query the partition information of a topic. This method returns all partitions of the topic. You can call this method only after your pull consumers start to run.
*/
Set<TopicPartition> topicPartitions(String topic);
/**
* Specify a partition from which you want to pull messages. This method does not implement rebalancing. You must make sure that messages in all partitions can be consumed. If this method is called multiple times, the system replaces the partitions to which subscribers have subscribed instead of increasing the number of partitions to which subscribers subscribe.
*/
void assign(Collection<TopicPartition> topicPartitions);
/**
* Pull messages. The maxBatchMessageCount parameter specifies the maximum number of messages that can be pulled at a time. You can specify a timeout period in milliseconds.
*/
List<Message> poll(long timeout);
/**
* Reset the consumer offset of a specified partition to a specified position. The specified position must be between the minimum offset and the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seek(TopicPartition topicPartition, long offset);
/**
* Reset the consumer offset of a specified partition to the minimum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seekToBeginning(TopicPartition topicPartition);
/**
* Reset the consumer offset of a specified partition to the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seekToEnd(TopicPartition topicPartition);
/**
* Suspend message consumption in a specified partition.
*/
void pause(Collection<TopicPartition> topicPartitions);
/**
* Resume message consumption in a specified partition.
*/
void resume(Collection<TopicPartition> topicPartitions);
/**
* Query an offset based on a timestamp in a specified partition. The timestamp indicates when a message is stored to a broker. The offset corresponds to the first timestamp that is greater than or equal to the specified timestamp.
*/
Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);
/**
* Query the latest consumer offset of a specified partition.
*/
Long committed(TopicPartition topicPartition);
/**
* Manually commit a consumer offset. The consumer offset is synchronized to your on-premises client and then committed to your broker by using a thread in an asynchronous environment.
*/
void commitSync();
interface TopicPartitionChangeListener {
/**
* This method is called when the partitions of a topic change, for example, when the number of partitions of a topic changes due to broker scaling.
*/
void onChanged(Set<TopicPartition> topicPartitions);
}
/**
* Register TopicPartitionChangeListener that listens for changes in the partitions of a topic. The onChanged method can be called back for the registered listener when the number of partitions of a topic changes due to broker scaling. By default, the maximum delay for a callback is five seconds.
*/
void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}
Parameters
Parameter | Description |
GROUP_ID | The group ID that you have created in the ApsaraMQ for RocketMQ console. For more information, see Terms. |
MessageModel | The consumption mode of a consumer instance. Valid values:
|
ConsumeThreadNums | The number of consumption threads for a consumer instance. Default value: 20. |
MaxReconsumeTimes | The maximum number of retries upon a consumption failure. Default value: 16. |
ConsumeTimeout | The timeout period for consuming each message. If the time for consuming a message exceeds the specified timeout period, the message fails to be consumed and is redelivered after a retry interval. Configure an appropriate value for each business. Default value: 15. Unit: minutes. |
suspendTimeMillis | The retry interval for an ordered message that fails to be consumed. |
maxCachedMessageAmount | The maximum number of messages that can be cached at your on-premises consumer client. Valid values: 100 to 50000. Default value: 5000. This parameter takes effect on a client. The quota is evenly allocated to topics to which subscribers have subscribed. For example, if you set the parameter value to 1000 for a consumer client that subscribes to 2 topics, a maximum of 500 messages can be cached for each topic. If a consumer client pulls multiple messages at a time, the actual number of messages that need to be cached can be greater than the value that you set for maxCachedMessageAmount. We recommend that you set the parameter value to twice the number of messages that your client consumer can consume per second. Important Specify a proper value. An excessively large value may cause an Out-of-Memory (OOM) error on your client. |
maxCachedMessageSizeInMiB | The maximum size of messages that can be cached at your on-premises client. Valid values: 16 MB to 2048 MB. Default value: 512 MB. |
Parameter | Description |
ConsumeMessageBatchMaxSize | The maximum number of cached messages that can be pushed to consumers at a time. If the number of the cached messages reaches the parameter value that you specify, ApsaraMQ for RocketMQ pushes the cached messages to consumers at a time. Default value: 32. Valid values: 1 to 1024. |
BatchConsumeMaxAwaitDurationInSeconds | The wait time before multiple cached messages are pushed to consumers at a time. ApsaraMQ for RocketMQ pushes multiple cached messages to consumers at a time after the time specified by this parameter. Default value: 0. Valid values: 0 to 450. Unit: seconds. |
Parameter | Description |
maxCachedMessageSizeInMiB | The maximum size of messages that a consumer can cache on the client for a single partition. Default value: 100 MiB. Valid values: 16 MiB to 2048 MiB. Important Specify a proper value. An excessively large value may cause an OOM error on your client. |
autoCommit | Specifies whether to automatically commit a consumer offset. Default value: true. |
autoCommitIntervalMillis | The interval between the operations of automatically committing a consumer offset. Default value: 5. Unit: seconds. |
pollTimeoutMillis | The timeout period for pulling messages each time. Default value: 5. Unit: seconds. |
For more information about partitions and offsets, see Terms.
References
For more information about the sample code for sending and subscribing to messages, see the following topics: