If you use consumer groups to consume log data, you can focus on your business logic instead of the Simple Log Service implementation details, load balancing among consumers, or failover handling. This topic describes how to create, modify, query, and delete a consumer group and how to obtain a consumer group checkpoint, and provides sample code.
Prerequisites
A Resource Access Management (RAM) user is created, and the required permissions are granted to the RAM user. For more information, see Create a RAM user and grant permissions to the RAM user.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
Important
The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.
We recommend that you do not save the AccessKey ID or AccessKey secret in your project code. Otherwise, the AccessKey pair may be leaked, and the security of all resources within your account may be compromised.
Simple Log Service SDK for Python is installed. For more information, see Install Simple Log Service SDK for Python.
Logs are written to a Logstore. For more information, see Data collection overview.
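The samples in this topic read the AccessKey pair from environment variables and fall back to an empty string when a variable is missing. As an optional safeguard, the following minimal sketch fails early with a readable message instead. The helper name load_access_key is hypothetical and is not part of Simple Log Service SDK for Python.

```python
import os


def load_access_key(env=os.environ):
    """Return (AccessKey ID, AccessKey secret) read from the environment.

    Hypothetical helper, not part of the SDK. Raises RuntimeError if
    either variable is unset or empty.
    """
    names = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[names[0]], env[names[1]]
```

The returned tuple can be passed to LogClient in place of the two os.environ.get calls used in the samples.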
Usage notes
In this example, the public Simple Log Service endpoint for the China (Hangzhou) region is used, which is https://cn-hangzhou.log.aliyuncs.com. If you want to access Simple Log Service by using other Alibaba Cloud services that reside in the same region as your project, you can use the internal Simple Log Service endpoint, which is https://cn-hangzhou-intranet.log.aliyuncs.com. For more information about the supported regions and endpoints of Simple Log Service, see Endpoints.
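The two endpoints above differ only by the -intranet suffix after the region ID. The following sketch builds either form from a region ID. The function name build_endpoint is hypothetical, and applying the same pattern to regions other than cn-hangzhou is an assumption, so confirm the result against the Endpoints topic before you use it.

```python
def build_endpoint(region_id, internal=False):
    """Build a Simple Log Service endpoint URL for a region.

    Hypothetical helper. The URL pattern is taken from the China (Hangzhou)
    example in this topic; other regions are assumed to follow it.
    """
    suffix = "-intranet" if internal else ""
    return "https://{}{}.log.aliyuncs.com".format(region_id, suffix)


public_endpoint = build_endpoint("cn-hangzhou")
internal_endpoint = build_endpoint("cn-hangzhou", internal=True)
```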
Sample code that is used to create a consumer group
The following sample code shows how to create a consumer group named ali-test-consumergroup:
from aliyun.log import LogClient
import os
# Read the AccessKey pair from environment variables instead of hard-coding it.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the China (Hangzhou) region is used.
endpoint = "cn-hangzhou.log.aliyuncs.com"
client = LogClient(endpoint, accessKeyId, accessKey)
project_name = "ali-test-project"
logstore_name = "ali-test-logstore"
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
    print("ready to create consumergroup")
    # 300 is the consumer group timeout in seconds. in_order=False disables in-order consumption.
    res = client.create_consumer_group(project_name, logstore_name, consumergroup_name, 300, in_order=False)
    print("create consumergroup success ")
    # List the consumer groups of the Logstore to confirm that the new group exists.
    res2 = client.list_consumer_group(project_name, logstore_name)
    for r in res2.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
Expected results:
ready to create consumergroup
create consumergroup success
The consumergroup name is:ali-test-consumergroup
Sample code that is used to modify a consumer group
The following sample code shows how to modify the ali-test-consumergroup consumer group by changing its timeout to 350 seconds:
from aliyun.log import LogClient
import os
# Read the AccessKey pair from environment variables instead of hard-coding it.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the China (Hangzhou) region is used.
endpoint = "cn-hangzhou.log.aliyuncs.com"
client = LogClient(endpoint, accessKeyId, accessKey)
project_name = "ali-test-project"
logstore_name = "ali-test-logstore"
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
    print("ready to update consumergroup")
    # Change the timeout of the consumer group to 350 seconds.
    res = client.update_consumer_group(project_name, logstore_name, consumergroup_name, 350, in_order=False)
    print("update consumergroup success ")
    # List the consumer groups of the Logstore to confirm the new timeout.
    res2 = client.list_consumer_group(project_name, logstore_name)
    for r in res2.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
        print("The consumergroup timeout is:%s" % r.get_timeout())
Expected results:
ready to update consumergroup
update consumergroup success
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
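The create and modify samples above can be combined so that a script can be rerun safely. The following sketch tries to create the consumer group and falls back to an update if the group already exists. The helper name ensure_consumer_group is hypothetical. The error code ConsumerGroupAlreadyExist and the get_error_code() accessor on the raised exception are assumptions about how the SDK reports this case, so verify both against the SDK version that you use.

```python
def ensure_consumer_group(client, project, logstore, group, timeout=300, in_order=False):
    """Create the consumer group, or update it if it already exists.

    Hypothetical helper. `client` is expected to be a LogClient (or any
    object with the same create/update methods). Returns "created" or
    "updated".
    """
    try:
        client.create_consumer_group(project, logstore, group, timeout, in_order=in_order)
        return "created"
    except Exception as exc:
        # Assumption: the SDK exception exposes get_error_code() and uses
        # "ConsumerGroupAlreadyExist" when the group is already present.
        get_code = getattr(exc, "get_error_code", None)
        if get_code is None or get_code() != "ConsumerGroupAlreadyExist":
            raise
        client.update_consumer_group(project, logstore, group, timeout, in_order=in_order)
        return "updated"
```

Any other error, such as a missing project or insufficient permissions, is raised again unchanged so that it is not silently ignored.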
Sample code that is used to query all consumer groups
The following sample code shows how to query all consumer groups of a specified Logstore:
from aliyun.log import LogClient
import os
# Read the AccessKey pair from environment variables instead of hard-coding it.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the China (Hangzhou) region is used.
endpoint = "cn-hangzhou.log.aliyuncs.com"
client = LogClient(endpoint, accessKeyId, accessKey)
project_name = "ali-test-project"
logstore_name = "ali-test-logstore"
if __name__ == '__main__':
    print("ready to list consumergroup")
    res = client.list_consumer_group(project_name, logstore_name)
    # Print the name, timeout, and in-order setting of each consumer group.
    for r in res.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
        print("The consumergroup timeout is:%s" % r.get_timeout())
        print("The consumergroup order is:%s" % r.is_in_order())
    print("list consumergroup success ")
Expected results:
ready to list consumergroup
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
The consumergroup order is:False
list consumergroup success
Sample code that is used to delete a consumer group
The following sample code shows how to delete a consumer group named ali-test-consumergroup2 from a specified Logstore:
from aliyun.log import LogClient
import os
# Read the AccessKey pair from environment variables instead of hard-coding it.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the China (Hangzhou) region is used.
endpoint = "cn-hangzhou.log.aliyuncs.com"
client = LogClient(endpoint, accessKeyId, accessKey)
project_name = "ali-test-project"
logstore_name = "ali-test-logstore"
# The name of the consumer group to delete.
consumergroup_name = "ali-test-consumergroup2"
if __name__ == '__main__':
    print("ready to delete consumergroup")
    res = client.delete_consumer_group(project_name, logstore_name, consumergroup_name)
    print("delete consumergroup success ")
Expected results:
ready to delete consumergroup
delete consumergroup success
Sample code that is used to obtain a checkpoint of a consumer group
The following sample code shows how to obtain the checkpoint of a specified consumer group for shard 0:
from aliyun.log import LogClient
import os
# Read the AccessKey pair from environment variables instead of hard-coding it.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the China (Hangzhou) region is used.
endpoint = "cn-hangzhou.log.aliyuncs.com"
client = LogClient(endpoint, accessKeyId, accessKey)
project_name = "ali-test-project"
logstore_name = "ali-test-logstore"
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
    print("ready to get CheckPoint")
    # shard=0 limits the query to the checkpoint of shard 0.
    res = client.get_check_point(project_name, logstore_name, consumergroup_name, shard=0)
    print("The consumergroup checkpoints info is:%s" % res.get_consumer_group_check_points())
    print("list CheckPoint success in shard_0")
Expected results:
ready to get CheckPoint
The consumergroup checkpoints info is:[{'shard': 0, 'checkpoint': 'MTY3MDk5OTY3NzEzMzQzODg2NQ==', 'updateTime': 1671607210514072, 'consumer': 'consumer_1'}]
list CheckPoint success in shard_0
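The checkpoint value in the expected results is a Base64-encoded string. The following sketch decodes one entry of that list for inspection by using only the Python standard library. The helper name describe_checkpoint is hypothetical. Treating updateTime as microseconds since the Unix epoch is an assumption based on the 16-digit sample value, and the decoded cursor should still be treated as an opaque value.

```python
import base64
from datetime import datetime, timedelta, timezone


def describe_checkpoint(entry):
    """Return a readable view of one checkpoint entry (hypothetical helper)."""
    # Decode the Base64 checkpoint into its underlying cursor text.
    cursor = base64.b64decode(entry["checkpoint"]).decode("utf-8")
    # Assumption: updateTime is in microseconds since the Unix epoch.
    micros = entry["updateTime"]
    updated_at = datetime.fromtimestamp(micros // 1_000_000, tz=timezone.utc)
    updated_at += timedelta(microseconds=micros % 1_000_000)
    return {
        "shard": entry["shard"],
        "consumer": entry["consumer"],
        "cursor": cursor,
        "updated_at": updated_at,
    }


# Sample entry copied from the expected results above.
sample = {
    "shard": 0,
    "checkpoint": "MTY3MDk5OTY3NzEzMzQzODg2NQ==",
    "updateTime": 1671607210514072,
    "consumer": "consumer_1",
}
info = describe_checkpoint(sample)
print(info["cursor"])  # prints 1670999677133438865
```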