このトピックでは、Python用のHTTPクライアントSDKを使用して、スケジュールされたメッセージと遅延されたメッセージを送受信する方法に関するサンプルコードを提供します。
の背景情報
遅延メッセージは、特定の期間後にApsaraMQ for RocketMQブローカーによってコンシューマに配信されるメッセージです。
スケジュールされたメッセージは、ApsaraMQ for RocketMQブローカーが特定の時点でコンシューマに配信するメッセージです。
スケジュールされたメッセージのコード構成は、HTTP上の遅延メッセージのコード構成と同じです。 どちらのタイプのメッセージも、メッセージの属性に基づいて特定の期間後にコンシューマに配信されます。
詳細については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。
の前提条件
開始する前に、次の操作が実行されていることを確認してください。
SDK for Pythonをインストールします。 詳細については、「環境の準備」をご参照ください。
ApsaraMQ for RocketMQコンソールのコードで指定するリソースを作成します。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。
Alibaba CloudアカウントのAccessKeyペアを取得します。 詳細については、「AccessKey の作成」をご参照ください。
スケジュールされたメッセージと遅延メッセージの送信
次のサンプルコードは、Python用のHTTPクライアントSDKを使用して、スケジュールされたメッセージまたは遅延されたメッセージを送信する方法の例を示します。
import sys
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time
# Initialize the producer client.
mq_client = MQClient(
# The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
# Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
# The AccessKey ID that is used for authentication.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# The AccessKey secret that is used for authentication.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console.
topic_name = "${TOPIC}"
# The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
# If the instance has a namespace, the instance ID must be specified. If the instance does not have a namespace, set the instanceID parameter to an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
instance_id = "${INSTANCE_ID}"
producer = mq_client.get_producer(instance_id, topic_name)
# Cyclically send four messages.
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))
try:
for i in range(msg_count):
msg = TopicMessage(
# The message content.
"I am test message %s.hello" % i,
# The message tag.
"tag1"
)
# The message attributes.
msg.put_property("a", "i")
# The message key.
msg.set_message_key("MessageKey")
# The period of time after which the broker delivers the message to the consumer. In this example, the broker delivers the message to the consumer after a delay of 10 seconds. Set this parameter to a timestamp in milliseconds.
# If the producer sends a scheduled message, set the parameter to the time difference between the scheduled point in time and the current point in time.
msg.set_start_deliver_time(int(round(time.time() * 1000)) + 10 * 1000)
re_msg = producer.publish_message(msg)
print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
except MQExceptionBase as e:
if e.type == "TopicNotExist":
print("Topic not exist, please create it.")
sys.exit(1)
print("Publish Message Fail. Exception:%s" % e)
定期メッセージと遅延メッセージの購読
次のサンプルコードは、Python用のHTTPクライアントSDKを使用して、スケジュールされたメッセージと遅延されたメッセージをサブスクライブする方法の例を示しています。
mq_http_sdk.mq_exception import MQExceptionBaseからの
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Initialize the consumer client.
mq_client = MQClient(
# The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
# Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
# The AccessKey ID that is used for authentication.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# The AccessKey secret that is used for authentication.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console.
topic_name = "${TOPIC}"
# The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
group_id = "${GROUP_ID}"
# The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
# If the instance has a namespace, the instance ID must be specified. If the instance does not have a namespace, set the instanceID parameter to an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
instance_id = "${INSTANCE_ID}"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# In long polling mode, if no message in the topic is available for consumption, the request is suspended on the broker for the specified period of time. If a message becomes available for consumption within the specified period of time, a response is immediately sent to the consumer. In this example, the value is specified as 3 seconds.
# The long polling period. Unit: seconds. In this example, the value is specified as 3. The maximum value that you can specify is 30.
wait_seconds = 3
# The maximum number of messages that can be consumed at a time. In this example, the value is specified as 3. The maximum value that you can specify is 16.
batch = 3
print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
% (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)))
while True:
try:
# Consume messages in long polling mode.
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print(("Receive, MessageId: %s\nMessageBodyMD5: %s \
\nMessageTag: %s\nConsumedTimes: %s \
\nPublishTime: %s\nBody: %s \
\nNextConsumeTime: %s \
\nReceiptHandle: %s \
\nProperties: %s\n" % \
(msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.next_consume_time, msg.receipt_handle, msg.properties)))
except MQExceptionBase as e:
# No message in the topic is available for consumption.
if e.type == "MessageNotExist":
print(("No new message! RequestId: %s" % e.req_id))
continue
print(("Consume Message Fail! Exception:%s\n" % e))
time.sleep(2)
continue
# If the broker fails to receive an acknowledgement (ACK) for a message from the consumer before the period of time specified by the msg.next_consume_time parameter elapses, the broker delivers the message for consumption again.
# A unique timestamp is specified for the handle of a message each time the message is consumed.
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
except MQExceptionBase as e:
print(("\nAk Message Fail! Exception:%s" % e))
# If the handle of a message times out, the broker cannot receive an ACK for the message from the consumer.
if e.sub_errors:
for sub_error in e.sub_errors:
print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
(sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))