This topic provides sample code on how to send and receive scheduled messages and delayed messages by using the HTTP client SDK for Python.
Background information
Delayed messages are messages that are delivered by ApsaraMQ for RocketMQ brokers to consumers after a specific period of time.
Scheduled messages are messages that are delivered by ApsaraMQ for RocketMQ brokers to consumers at a specific point in time.
The code configurations of scheduled messages are the same as the code configurations of delayed messages over HTTP. Both types of messages are delivered to consumers after a specific period of time based on the attributes of messages.
For more information, see Scheduled messages and delayed messages.
Prerequisites
Before you start, make sure that the following operations are performed:
Install the SDK for Python. For more information, see Prepare the environment.
Create the resources that you want to specify in the code in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.
Obtain the AccessKey pair of your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Send scheduled messages and delayed messages
The following sample code provides an example on how to send scheduled messages or delayed messages by using the HTTP client SDK for Python:
import sys
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time
mq_client = MQClient(
"${HTTP_ENDPOINT}",
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
topic_name = "${TOPIC}"
instance_id = "${INSTANCE_ID}"
producer = mq_client.get_producer(instance_id, topic_name)
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))
try:
for i in range(msg_count):
msg = TopicMessage(
"I am test message %s.hello" % i,
"tag1"
)
msg.put_property("a", "i")
msg.set_message_key("MessageKey")
msg.set_start_deliver_time(int(round(time.time() * 1000)) + 10 * 1000)
re_msg = producer.publish_message(msg)
print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
except MQExceptionBase as e:
if e.type == "TopicNotExist":
print("Topic not exist, please create it.")
sys.exit(1)
print("Publish Message Fail. Exception:%s" % e)
Subscribe to scheduled messages and delayed messages
The following sample code provides an example on how to subscribe to scheduled messages and delayed messages by using the HTTP client SDK for Python:
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
mq_client = MQClient(
"${HTTP_ENDPOINT}",
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
topic_name = "${TOPIC}"
group_id = "${GROUP_ID}"
instance_id = "${INSTANCE_ID}"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
wait_seconds = 3
batch = 3
print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
% (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)))
while True:
try:
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print(("Receive, MessageId: %s\nMessageBodyMD5: %s \
\nMessageTag: %s\nConsumedTimes: %s \
\nPublishTime: %s\nBody: %s \
\nNextConsumeTime: %s \
\nReceiptHandle: %s \
\nProperties: %s\n" % \
(msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.next_consume_time, msg.receipt_handle, msg.properties)))
except MQExceptionBase as e:
if e.type == "MessageNotExist":
print(("No new message! RequestId: %s" % e.req_id))
continue
print(("Consume Message Fail! Exception:%s\n" % e))
time.sleep(2)
continue
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
except MQExceptionBase as e:
print(("\nAk Message Fail! Exception:%s" % e))
if e.sub_errors:
for sub_error in e.sub_errors:
print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
(sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))