If you use consumer groups to consume log data, you do not need to consider factors such as Simple Log Service implementation, load balancing among consumers, or failovers that may occur. You can focus on business logic during log data consumption. This topic describes how to create, modify, query, and delete a consumer group and provides sample code.
Prerequisites
A Resource Access Management (RAM) user is created, and the required permissions are granted to the RAM user. For more information, see Create a RAM user and grant permissions to the RAM user.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
ImportantThe AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.
We recommend that you do not save the AccessKey ID or AccessKey secret in your project code. Otherwise, the AccessKey pair may be leaked, and the security of all resources within your account may be compromised.
Simple Log Service SDK for Python is installed. For more information, see Install Simple Log Service SDK for Python.
Logs are written to a Logstore. For more information, see Data collection overview.
Usage notes
In this example, the public Simple Log Service endpoint for the China (Hangzhou) region is used, which is https://cn-hangzhou.log.aliyuncs.com
. If you want to access Simple Log Service by using other Alibaba Cloud services that reside in the same region as your project, you can use the internal Simple Log Service endpoint, which is https://cn-hangzhou-intranet.log.aliyuncs.com
. For more information about the supported regions and endpoints of Simple Log Service, see Endpoints.
Sample code that is used to create a consumer group
The following sample code provides an example on how to create a consumer group named ali-test-consumergroup:
from aliyun.log import LogClient
import os
# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)
# The name of the project.
project_name = "ali-test-project"
# The name of the Logstore.
logstore_name = "ali-test-logstore"
# The name of the consumer group.
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to create consumergroup")
res = client.create_consumer_group(project_name, logstore_name, consumergroup_name, 300, in_order=False)
print("create consumergroup success ")
res2 = client.list_consumer_group(project_name, logstore_name)
for r in res2.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
Expected results:
ready to create consumergroup
create consumergroup success
The consumergroup name is:ali-test-consumergroup
Sample code that is used to modify a consumer group
The following sample code provides an example on how to modify the ali-test-consumergroup consumer group:
from aliyun.log import LogClient
import os
# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)
# The name of the project.
project_name = "ali-test-project"
# The name of the Logstore.
logstore_name = "ali-test-logstore"
# The name of the consumer group.
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to update consumergroup")
# Change the timeout period of the consumer group to 350 seconds.
res = client.update_consumer_group(project_name, logstore_name, consumergroup_name, 350, in_order=False)
print("update consumergroup success ")
res2 = client.list_consumer_group(project_name, logstore_name)
for r in res2.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
print("The consumergroup timeout is:%s" % r.get_timeout())
Expected results:
ready to update consumergroup
update consumergroup success
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
Sample code that is used to query all consumer groups
The following sample code provides an example on how to query all consumer groups of a specified Logstore:
from aliyun.log import LogClient
import os
# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)
# The name of the project.
project_name = "ali-test-project"
# The name of the Logstore.
logstore_name = "ali-test-logstore"
if __name__ == '__main__':
print("ready to list consumergroup")
# Query all consumer groups of the Logstore.
res = client.list_consumer_group(project_name, logstore_name)
for r in res.get_consumer_groups():
print("The consumergroup name is:" + r.get_consumer_group_name())
print("The consumergroup timeout is:%s" % r.get_timeout())
print("The consumergroup order is:%s" % r.is_in_order())
print("list consumergroup success ")
Expected results:
ready to list consumergroup
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
The consumergroup order is:False
list consumergroup success
Sample code that is used to delete a consumer group
The following sample code provides an example on how to delete a consumer group in a specified project:
from aliyun.log import LogClient
import os
# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)
# The name of the project.
project_name = "ali-test-project"
# The name of the Logstore.
logstore_name = "ali-test-logstore"
# The name of the consumer group.
consumergroup_name = "ali-test-consumergroup2"
if __name__ == '__main__':
print("ready to delete consumergroup")
# Delete the consumer group.
res = client.delete_consumer_group(project_name, logstore_name, consumergroup_name)
print("delete consumergroup success ")
Expected results:
ready to delete consumergroup
delete consumergroup success
Sample code that is used to obtain a checkpoint of a consumer group
The following sample code provides an example on how to obtain a checkpoint of a specified consumer group:
from aliyun.log import LogClient, ListConsumerGroupResponse
import os
# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)
# The name of the project.
project_name = "ali-test-project"
# The name of the Logstore.
logstore_name = "ali-test-logstore"
# The name of the consumer group.
consumergroup_name = "ali-test-consumergroup"
if __name__ == '__main__':
print("ready to get CheckPoint")
# Obtain a checkpoint of the consumer group for a shard.
res = client.get_check_point(project_name, logstore_name, consumergroup_name, shard=0)
print("The consumergroup checkpoints info is:%s" % res.get_consumer_group_check_points())
print("list CheckPoint success in shard_0")
Expected results:
ready to get CheckPoint
The consumergroup checkpoints info is:[{'shard': 0, 'checkpoint': 'MTY3MDk5OTY3NzEzMzQzODg2NQ==', 'updateTime': 1671607210514072, 'consumer': 'consumer_1'}]
list CheckPoint success in shard_0
References
- Alibaba Cloud OpenAPI Explorer provides debugging capabilities, SDKs, examples, and related documents. You can use OpenAPI Explorer to debug Log Service API operations without the need to manually encapsulate or sign requests. For more information, visit OpenAPI Portal.
- Log Service provides the command-line interface (CLI) to meet the requirements for automated configurations in Log Service. For more information, see Log Service CLI.
For more information about consumer group-related API operations, see the following topics:
- For more information about sample code, see Alibaba Cloud Log Service SDK for Python on GitHub.