すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:スケジュールされたメッセージと遅延メッセージの送受信

最終更新日:Jul 09, 2024

このトピックでは、Python用のHTTPクライアントSDKを使用して、スケジュールされたメッセージと遅延されたメッセージを送受信する方法に関するサンプルコードを提供します。

の背景情報

  • 遅延メッセージは、特定の期間後にApsaraMQ for RocketMQブローカーによってコンシューマに配信されるメッセージです。

  • スケジュールされたメッセージは、ApsaraMQ for RocketMQブローカーが特定の時点でコンシューマに配信するメッセージです。

スケジュールされたメッセージのコード構成は、HTTP上の遅延メッセージのコード構成と同じです。 どちらのタイプのメッセージも、メッセージの属性に基づいて特定の期間後にコンシューマに配信されます。

詳細については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。

の前提条件

開始する前に、次の操作が実行されていることを確認してください。

  • SDK for Pythonをインストールします。 詳細については、「環境の準備」をご参照ください。

  • ApsaraMQ for RocketMQコンソールのコードで指定するリソースを作成します。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。

  • Alibaba CloudアカウントのAccessKeyペアを取得します。 詳細については、「AccessKey の作成」をご参照ください。

スケジュールされたメッセージと遅延メッセージの送信

次のサンプルコードは、Python用のHTTPクライアントSDKを使用して、スケジュールされたメッセージまたは遅延されたメッセージを送信する方法の例を示します。

import sys

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time

# Initialize the producer client. 
mq_client = MQClient(
    # The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
     "${HTTP_ENDPOINT}",
    # Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    # The AccessKey ID that is used for authentication. 
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # The AccessKey secret that is used for authentication. 
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
# The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console. 
topic_name = "${TOPIC}"
# The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console. 
# If the instance has a namespace, the instance ID must be specified. If the instance does not have a namespace, set the instanceID parameter to an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console. 
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# Cyclically send four messages. 
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
            msg = TopicMessage(
            # The message content. 
            "I am test message %s.hello" % i,
            # The message tag. 
            "tag1"
            )
            # The message attributes. 
            msg.put_property("a", "i")
            # The message key. 
            msg.set_message_key("MessageKey")

            # The period of time after which the broker delivers the message to the consumer. In this example, the broker delivers the message to the consumer after a delay of 10 seconds. Set this parameter to a timestamp in milliseconds. 
            # If the producer sends a scheduled message, set the parameter to the time difference between the scheduled point in time and the current point in time. 
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 10 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)

定期メッセージと遅延メッセージの購読

次のサンプルコードは、Python用のHTTPクライアントSDKを使用して、スケジュールされたメッセージと遅延されたメッセージをサブスクライブする方法の例を示しています。

mq_http_sdk.mq_exception import MQExceptionBaseからの

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Initialize the consumer client. 
mq_client = MQClient(
    # The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
    "${HTTP_ENDPOINT}",
    # Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    # The AccessKey ID that is used for authentication. 
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # The AccessKey secret that is used for authentication. 
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
     )
# The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console. 
topic_name = "${TOPIC}"
# The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
group_id = "${GROUP_ID}"
# The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console. 
# If the instance has a namespace, the instance ID must be specified. If the instance does not have a namespace, set the instanceID parameter to an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console. 
instance_id = "${INSTANCE_ID}"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# In long polling mode, if no message in the topic is available for consumption, the request is suspended on the broker for the specified period of time. If a message becomes available for consumption within the specified period of time, a response is immediately sent to the consumer. In this example, the value is specified as 3 seconds. 
# The long polling period. Unit: seconds. In this example, the value is specified as 3. The maximum value that you can specify is 30. 
wait_seconds = 3
# The maximum number of messages that can be consumed at a time. In this example, the value is specified as 3. The maximum value that you can specify is 16. 
batch = 3
print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
        % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)))
while True:
    try:
        # Consume messages in long polling mode. 
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print(("Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s \
                              \nProperties: %s\n" % \
                  (msg.message_id, msg.message_body_md5,
                   msg.message_tag, msg.consumed_times,
                   msg.publish_time, msg.message_body,
                   msg.next_consume_time, msg.receipt_handle, msg.properties)))
    except MQExceptionBase as e:
        # No message in the topic is available for consumption. 
        if e.type == "MessageNotExist":
            print(("No new message! RequestId: %s" % e.req_id))
            continue

        print(("Consume Message Fail! Exception:%s\n" % e))
        time.sleep(2)
        continue

    # If the broker fails to receive an acknowledgement (ACK) for a message from the consumer before the period of time specified by the msg.next_consume_time parameter elapses, the broker delivers the message for consumption again. 
    # A unique timestamp is specified for the handle of a message each time the message is consumed. 
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
    except MQExceptionBase as e:
        print(("\nAk Message Fail! Exception:%s" % e))
        # If the handle of a message times out, the broker cannot receive an ACK for the message from the consumer. 
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
                      (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))