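Normal messages are messages that have no special features in ApsaraMQ for RocketMQ. They differ from featured messages, which include scheduled messages, delayed messages, ordered messages, and transactional messages. This topic provides sample code that shows how to send and receive normal messages by using the HTTP client SDK for Python.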
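Prerequisites
Before you begin, make sure that the following operations are performed:
Install the SDK for Python. For more information, see "Prepare the environment".
Create the resources that you want to specify in the code in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see "Create resources".
Obtain the AccessKey pair of your Alibaba Cloud account. For more information, see "Create an AccessKey pair".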
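The sample code in this topic reads the AccessKey pair from the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET. The following is a minimal sketch of a check that could run before the client is created. The helper name missing_credentials is hypothetical and is not part of the SDK.

```python
import os

# Names of the environment variables that the sample code in this topic reads.
REQUIRED_ENV_VARS = (
    "ALIBABA_CLOUD_ACCESS_KEY_ID",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET",
)


def missing_credentials(environ=None):
    """Return the required variable names that are unset or empty.

    Hypothetical helper for illustration; pass a dict to check something
    other than the real process environment.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


# Example with an explicit mapping instead of the real environment.
print(missing_credentials({"ALIBABA_CLOUD_ACCESS_KEY_ID": "example-id"}))
# prints ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
```

Calling missing_credentials() without arguments checks the real environment. If the returned list is not empty, set the listed variables before you run the samples.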
Send normal messages
The following sample code provides an example of how to send normal messages by using the HTTP client SDK for Python:
import os
import sys
import time
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
# Initialize the producer client.
mq_client = MQClient(
    # The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
    "${HTTP_ENDPOINT}",
    # Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
    # The AccessKey ID that is used for authentication.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # The AccessKey secret that is used for authentication.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console.
topic_name = "${TOPIC}"
# The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
# If the instance has a namespace, the instance ID must be specified. If the instance does not have a namespace, set the instanceID parameter to an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
instance_id = "${INSTANCE_ID}"
producer = mq_client.get_producer(instance_id, topic_name)
# Cyclically send four messages.
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))
try:
    for i in range(msg_count):
        msg = TopicMessage(
            # The message content.
            "I am test message %s.hello" % i,
            # The message tag.
            "tag %s" % i
        )
        # The custom attributes of the message.
        msg.put_property("a", i)
        # The message key.
        msg.set_message_key("MessageKey")
        re_msg = producer.publish_message(msg)
        print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)
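The sample above stops at the first failed publish. Where transient failures are expected, one option is to retry with a growing delay. The following is a minimal sketch under stated assumptions: publish_with_retry is a hypothetical helper, not an SDK function, and it accepts any callable, so it makes no claim about which SDK errors are safe to retry.

```python
import time


def publish_with_retry(publish, message, attempts=3, base_delay=0.5,
                       retry_on=Exception, sleep=time.sleep):
    """Call publish(message) and retry with exponential backoff.

    Hypothetical helper for illustration. `publish` is any callable, for
    example producer.publish_message from the sample above. `retry_on` is
    the exception class (or tuple of classes) that triggers a retry.
    """
    for attempt in range(1, attempts + 1):
        try:
            return publish(message)
        except retry_on:
            if attempt == attempts:
                # Out of attempts: let the caller handle the last error.
                raise
            # Delay doubles after each failed attempt: 0.5 s, 1 s, 2 s, ...
            sleep(base_delay * 2 ** (attempt - 1))
```

With the producer from the sample, a call could look like publish_with_retry(producer.publish_message, msg, retry_on=MQExceptionBase). Whether a given error type is worth retrying is a design decision for your application; an error such as TopicNotExist does not go away on retry.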
Subscribe to normal messages
The following sample code provides an example of how to subscribe to normal messages by using the HTTP client SDK for Python:
import os
import time
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Initialize the consumer client.
mq_client = MQClient(
    # The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
    "${HTTP_ENDPOINT}",
    # Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
    # The AccessKey ID that is used for authentication.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # The AccessKey secret that is used for authentication.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# The topic from which messages are consumed. You must create the topic in the ApsaraMQ for RocketMQ console.
topic_name = "${TOPIC}"
# The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
group_id = "${GROUP_ID}"
# The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
# If the instance has a namespace, the instance ID must be specified. If the instance does not have a namespace, set the instanceID parameter to an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
instance_id = "${INSTANCE_ID}"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# In long polling mode, if no message in the topic is available for consumption, the request is suspended on the broker for the specified period of time. If a message becomes available for consumption within the specified period of time, a response is immediately sent to the consumer. In this example, the value is specified as 3 seconds.
# The long polling period. Unit: seconds. In this example, the value is specified as 3. The maximum value that you can specify is 30.
wait_seconds = 3
# The maximum number of messages that can be consumed at a time. In this example, the value is specified as 3. The maximum value that you can specify is 16.
batch = 3
print("%sConsume And Ack Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
      % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))
while True:
    try:
        # Consume messages in long polling mode.
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\nMessageBodyMD5: %s"
                  "\nMessageTag: %s\nConsumedTimes: %s"
                  "\nPublishTime: %s\nBody: %s"
                  "\nNextConsumeTime: %s"
                  "\nReceiptHandle: %s"
                  "\nProperties: %s\n"
                  % (msg.message_id, msg.message_body_md5,
                     msg.message_tag, msg.consumed_times,
                     msg.publish_time, msg.message_body,
                     msg.next_consume_time, msg.receipt_handle, msg.properties))
    except MQExceptionBase as e:
        # No message in the topic is available for consumption.
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue
        print("Consume Message Fail! Exception:%s\n" % e)
        time.sleep(2)
        continue
    # If the broker fails to receive an acknowledgement (ACK) for a message from the consumer before the period of time specified by the msg.next_consume_time parameter elapses, the broker delivers the message for consumption again.
    # A unique timestamp is specified for the handle of a message each time the message is consumed.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        # If the handle of a message times out, the broker cannot receive an ACK for the message from the consumer.
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s"
                      % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"]))
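When an ACK request fails, the sample above prints each entry of e.sub_errors. The following minimal sketch shows one way to separate the handles that failed from the rest of the batch. It assumes that each entry is a dictionary with the keys ReceiptHandle, ErrorCode, and ErrorMessage, as the sample uses them, and that handles not listed in sub_errors were acknowledged. Both function names are hypothetical and are not part of the SDK.

```python
def failed_receipt_handles(sub_errors):
    """Map each failed receipt handle to its (error code, error message).

    Hypothetical helper. `sub_errors` is a list of dicts shaped like the
    entries of e.sub_errors in the sample above, or None.
    """
    return {
        err["ReceiptHandle"]: (err["ErrorCode"], err["ErrorMessage"])
        for err in (sub_errors or [])
    }


def split_acked(receipt_handles, sub_errors):
    """Return (acked, failed) lists in the original order of the batch.

    Assumption: a handle that does not appear in sub_errors was
    acknowledged successfully.
    """
    failed = failed_receipt_handles(sub_errors)
    acked = [h for h in receipt_handles if h not in failed]
    not_acked = [h for h in receipt_handles if h in failed]
    return acked, not_acked


# Example with made-up handles and a made-up error entry.
errors = [{"ReceiptHandle": "h2", "ErrorCode": "ExampleCode",
           "ErrorMessage": "example message"}]
print(split_acked(["h1", "h2", "h3"], errors))
# prints (['h1', 'h3'], ['h2'])
```

Messages whose handles end up in the failed list are delivered again after the time specified by msg.next_consume_time, so the processing logic for them should tolerate repeated delivery.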