通過消費組(ConsumerGroup)消費日誌資料有顯著優點,您無需關注Log Service的實現細節和消費者之間的負載平衡、Failover等,只需關注商務邏輯。本文通過程式碼範例介紹如何建立、修改、查詢、刪除消費組等。
前提條件
已開通Log Service。更多資訊,請參見開通Log Service。
已建立RAM使用者並完成授權。具體操作,請參見建立RAM使用者並完成授權。
已配置環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變數。
重要阿里雲帳號的AccessKey擁有所有API的存取權限,建議您使用RAM使用者的AccessKey進行API訪問或日常營運。
強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
已安裝Log ServicePython SDK。具體操作,請參見安裝Python SDK。
已建立Log ServiceProject、Logstore並完成日誌採集。具體操作,請參見建立專案Project、建立Logstore和資料擷取概述。
注意事項
本樣本以華東1(杭州)的公網Endpoint為例,其公網Endpoint為https://cn-hangzhou.log.aliyuncs.com
。如果您通過與Project同地區的其他阿里雲產品訪問Log Service,請使用內網Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com
。關於Log Service支援的地區與Endpoint的對應關係,請參見服務入口。
建立消費組範例程式碼
以下代碼用於建立名為ali-test-consumergroup的消費組。
from aliyun.log import LogClient
import os
# 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 建立Log ServiceClient。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名稱。
project_name = "ali-test-project"
# Logstore名稱。
logstore_name = "ali-test-logstore"
# 消費組名稱。
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to create consumergroup")
res = client.create_consumer_group(project_name, logstore_name, consumergroup_name, 300, in_order=False)
print("create consumergroup success ")
res2 = client.list_consumer_group(project_name, logstore_name)
for r in res2.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
預期結果如下:
ready to create consumergroup
create consumergroup success
The consumergroup name is:ali-test-consumergroup
修改消費組範例程式碼
以下代碼用於修改名為ali-test-consumergroup的消費組資訊。
from aliyun.log import LogClient
import os
# 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 建立Log ServiceClient。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名稱。
project_name = "ali-test-project"
# Logstore名稱。
logstore_name = "ali-test-logstore"
# 消費組名稱。
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to update consumergroup")
# 修改消費組逾時時間為350秒。
res = client.update_consumer_group(project_name, logstore_name, consumergroup_name, 350, in_order=False)
print("update consumergroup success ")
res2 = client.list_consumer_group(project_name, logstore_name)
for r in res2.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
print("The consumergroup timeout is:%s" % r.get_timeout())
預期結果如下:
ready to update consumergroup
update consumergroup success
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
查詢所有消費組範例程式碼
以下代碼用於查詢指定Logstore的所有消費組。
from aliyun.log import LogClient
import os
# 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 建立Log ServiceClient。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名稱。
project_name = "ali-test-project"
# Logstore名稱。
logstore_name = "ali-test-logstore"
if __name__ == '__main__':
print("ready to list consumergroup")
# 查詢指定Logstore的所有消費組。
res = client.list_consumer_group(project_name, logstore_name)
for r in res.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
print("The consumergroup timeout is:%s" % r.get_timeout())
print("The consumergroup order is:%s" % r.is_in_order())
print("list consumergroup success ")
預期結果如下:
ready to list consumergroup
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
The consumergroup order is:False
list consumergroup success
刪除消費組範例程式碼
以下代碼用於刪除目標Project下的消費組。
from aliyun.log import LogClient
import os
# 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 建立Log ServiceClient。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名稱。
project_name = "ali-test-project"
# Logstore名稱。
logstore_name = "ali-test-logstore"
# 消費組名稱。
consumergroup_name = "ali-test-consumergroup2"
if __name__ == '__main__':
print("ready to delete consumergroup")
# 刪除指定消費組。
res = client.delete_consumer_group(project_name, logstore_name, consumergroup_name)
print("delete consumergroup success ")
預期結果如下:
ready to delete consumergroup
delete consumergroup success
擷取消費組CheckPoint範例程式碼
以下代碼用於擷取指定消費組的CheckPoint。
from aliyun.log import LogClient, ListConsumerGroupResponse
import os
# 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 建立Log ServiceClient。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名稱。
project_name = "ali-test-project"
# Logstore名稱。
logstore_name = "ali-test-logstore"
# 消費組名稱。
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to get CheckPoint")
# 擷取指定消費組中Shard的CheckPoint。
res = client.get_check_point(project_name, logstore_name, consumergroup_name, shard=0)
print("The consumergroup checkpoints info is:%s" % res.get_consumer_group_check_points())
print("list CheckPoint success in shard_0")
預期結果如下:
ready to get CheckPoint
The consumergroup checkpoints info is:[{'shard': 0, 'checkpoint': 'MTY3MDk5OTY3NzEzMzQzODg2NQ==', 'updateTime': 1671607210514072, 'consumer': 'consumer_1'}]
list CheckPoint success in shard_0
相關文檔
阿里雲OpenAPI開發人員門戶提供調試、SDK、樣本和配套文檔。通過OpenAPI,您無需手動封裝請求和簽名操作,就可以快速對Log ServiceAPI進行調試。更多資訊,請參見OpenAPI開發人員門戶。
為滿足越來越多的自動化Log Service配置需求,Log Service提供命令列工具CLI(Command Line Interface)。更多資訊,請參見Log Service命令列工具CLI。
關於消費組API介面說明,請參見如下:
更多範例程式碼,請參見Aliyun Log Python SDK on GitHub。