通过消费组(ConsumerGroup)消费日志数据有显著优点,您无需关注日志服务的实现细节和消费者之间的负载均衡、Failover等,只需关注业务逻辑。本文通过代码示例介绍如何创建、修改、查询、删除消费组等。
前提条件
已开通日志服务。更多信息,请参见开通日志服务。
已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量。
重要阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
已安装日志服务Python SDK。具体操作,请参见安装Python SDK。
已创建日志服务Project、Logstore并完成日志采集。具体操作,请参见创建项目Project、创建Logstore和数据采集概述。
注意事项
本示例以华东1(杭州)的公网Endpoint为例,其公网Endpoint为https://cn-hangzhou.log.aliyuncs.com
。如果您通过与Project同地域的其他阿里云产品访问日志服务,请使用内网Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com
。关于日志服务支持的地域与Endpoint的对应关系,请参见服务入口。
创建消费组示例代码
以下代码用于创建名为ali-test-consumergroup的消费组。
from aliyun.log import LogClient
import os
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名称。
project_name = "ali-test-project"
# Logstore名称。
logstore_name = "ali-test-logstore"
# 消费组名称。
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to create consumergroup")
res = client.create_consumer_group(project_name, logstore_name, consumergroup_name, 300, in_order=False)
print("create consumergroup success ")
res2 = client.list_consumer_group(project_name, logstore_name)
for r in res2.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
预期结果如下:
ready to create consumergroup
create consumergroup success
The consumergroup name is:ali-test-consumergroup
修改消费组示例代码
以下代码用于修改名为ali-test-consumergroup的消费组信息。
from aliyun.log import LogClient
import os
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名称。
project_name = "ali-test-project"
# Logstore名称。
logstore_name = "ali-test-logstore"
# 消费组名称。
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to update consumergroup")
# 修改消费组超时时间为350秒。
res = client.update_consumer_group(project_name, logstore_name, consumergroup_name, 350, in_order=False)
print("update consumergroup success ")
res2 = client.list_consumer_group(project_name, logstore_name)
for r in res2.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
print("The consumergroup timeout is:%s" % r.get_timeout())
预期结果如下:
ready to update consumergroup
update consumergroup success
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
查询所有消费组示例代码
以下代码用于查询指定Logstore的所有消费组。
from aliyun.log import LogClient
import os
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名称。
project_name = "ali-test-project"
# Logstore名称。
logstore_name = "ali-test-logstore"
if __name__ == '__main__':
print("ready to list consumergroup")
# 查询指定Logstore的所有消费组。
res = client.list_consumer_group(project_name, logstore_name)
for r in res.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
print("The consumergroup timeout is:%s" % r.get_timeout())
print("The consumergroup order is:%s" % r.is_in_order())
print("list consumergroup success ")
预期结果如下:
ready to list consumergroup
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
The consumergroup order is:False
list consumergroup success
删除消费组示例代码
以下代码用于删除目标Project下的消费组。
from aliyun.log import LogClient
import os
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名称。
project_name = "ali-test-project"
# Logstore名称。
logstore_name = "ali-test-logstore"
# 消费组名称。
consumergroup_name = "ali-test-consumergroup2"
if __name__ == '__main__':
print("ready to delete consumergroup")
# 删除指定消费组。
res = client.delete_consumer_group(project_name, logstore_name, consumergroup_name)
print("delete consumergroup success ")
预期结果如下:
ready to delete consumergroup
delete consumergroup success
获取消费组CheckPoint示例代码
以下代码用于获取指定消费组的CheckPoint。
from aliyun.log import LogClient, ListConsumerGroupResponse
import os
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)
# Project名称。
project_name = "ali-test-project"
# Logstore名称。
logstore_name = "ali-test-logstore"
# 消费组名称。
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to get CheckPoint")
# 获取指定消费组中Shard的CheckPoint。
res = client.get_check_point(project_name, logstore_name, consumergroup_name, shard=0)
print("The consumergroup checkpoints info is:%s" % res.get_consumer_group_check_points())
print("list CheckPoint success in shard_0")
预期结果如下:
ready to get CheckPoint
The consumergroup checkpoints info is:[{'shard': 0, 'checkpoint': 'MTY3MDk5OTY3NzEzMzQzODg2NQ==', 'updateTime': 1671607210514072, 'consumer': 'consumer_1'}]
list CheckPoint success in shard_0
相关文档
阿里云OpenAPI开发者门户提供调试、SDK、示例和配套文档。通过OpenAPI,您无需手动封装请求和签名操作,就可以快速对日志服务API进行调试。更多信息,请参见OpenAPI开发者门户。
为满足越来越多的自动化日志服务配置需求,日志服务提供命令行工具CLI(Command Line Interface)。更多信息,请参见日志服务命令行工具CLI。
关于消费组API接口说明,请参见如下:
更多示例代码,请参见Aliyun Log Python SDK on GitHub。