Simple Log Service: Use consumer groups to consume logs

Last Updated: Sep 18, 2024

When you consume logs through a consumer group, you can focus on business logic instead of implementation details such as Simple Log Service operations, load balancing across consumers, and failover between consumers. This topic describes how to use Simple Log Service SDK for Python to create a consumer group and consume logs with it, and provides sample code.

Prerequisites

  • A Resource Access Management (RAM) user is created, and the required permissions are granted to the RAM user. For more information, see Create a RAM user and grant permissions to the RAM user.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.

    Important
    • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.

    • We recommend that you do not save the AccessKey ID or AccessKey secret in your project code. Otherwise, the AccessKey pair may be leaked, and the security of all resources within your account may be compromised.

  • Simple Log Service SDK for Python is installed. For more information, see Install Simple Log Service SDK for Python.

  • A project is created. For more information, see Sample code that is used to create a project.
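
The credential prerequisite above can be checked before any SDK call is made. The following is a minimal sketch, not part of the SDK: `load_credentials` is a hypothetical helper name, and it only assumes the two environment variable names listed in the prerequisites.

```python
import os


def load_credentials(env=None):
    """Return (access_key_id, access_key_secret) read from environment variables.

    Hypothetical helper for illustration; it is not provided by the SDK.
    """
    # Use the real process environment unless a mapping is passed in (useful for tests).
    env = os.environ if env is None else env
    access_key_id = env.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "")
    access_key_secret = env.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "")

    # Collect the names of variables that are unset or empty.
    missing = [
        name
        for name, value in (
            ("ALIBABA_CLOUD_ACCESS_KEY_ID", access_key_id),
            ("ALIBABA_CLOUD_ACCESS_KEY_SECRET", access_key_secret),
        )
        if not value
    ]
    if missing:
        raise ValueError("missing environment variables: " + ", ".join(missing))
    return access_key_id, access_key_secret
```

Failing early with the name of the missing variable is usually easier to diagnose than an authentication error returned later by the service.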

Usage notes

This example uses the public Simple Log Service endpoint for the China (Hangzhou) region, which is https://cn-hangzhou.log.aliyuncs.com. To access Simple Log Service from other Alibaba Cloud services that reside in the same region as your project, you can use the internal Simple Log Service endpoint, which is https://cn-hangzhou-intranet.log.aliyuncs.com. For more information about the supported regions and endpoints of Simple Log Service, see Endpoints.
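
The two endpoints above differ only by the -intranet suffix after the region ID. The following sketch builds either form from a region ID. `sls_endpoint` is a hypothetical helper, and applying the same pattern to regions other than China (Hangzhou) is an assumption; check the Endpoints topic for the actual value in your region.

```python
def sls_endpoint(region_id, internal=False):
    """Build a Simple Log Service endpoint URL for a region ID.

    Hypothetical helper. The URL pattern is taken from the two
    China (Hangzhou) endpoints in this topic; other regions are
    assumed to follow the same pattern.
    """
    suffix = "-intranet" if internal else ""
    return "https://{}{}.log.aliyuncs.com".format(region_id, suffix)


print(sls_endpoint("cn-hangzhou"))                 # https://cn-hangzhou.log.aliyuncs.com
print(sls_endpoint("cn-hangzhou", internal=True))  # https://cn-hangzhou-intranet.log.aliyuncs.com
```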

Sample code

The following sample code shows how to create a Logstore, write logs to the Logstore, create a consumer group, and use the consumer group to consume logs:

import os
import time

from aliyun.log.consumer import *
from aliyun.log import *
from threading import RLock


class SampleConsumer(ConsumerProcessorBase):
    shard_id = -1
    last_check_time = 0
    log_results = []
    lock = RLock()

    def initialize(self, shard):
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        for log_group in log_groups.LogGroups:
            items = []
            for log in log_group.Logs:
                item = dict()
                item['time'] = log.Time
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = dict()
            log_items['topic'] = log_group.Topic
            log_items['source'] = log_group.Source
            log_items['logs'] = items

            with SampleConsumer.lock:
                SampleConsumer.log_results.append(log_items)
                print(log_items)

        # Persist the checkpoint to the server at most once every 3 seconds.
        # Otherwise, only update the checkpoint that is held in memory.
        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                check_point_tracker.save_check_point(False)
            except Exception:
                import traceback
                traceback.print_exc()

        # Returning None indicates that the logs were processed successfully.
        # If you need to roll back to the previous checkpoint, return check_point_tracker.get_check_point().
        return None

    def shutdown(self, check_point_tracker):
        try:
            check_point_tracker.save_check_point(True)
        except Exception:
            import traceback
            traceback.print_exc()


test_item_count = 20

# Write logs to the Logstore. 
def _prepare_data(client, project, logstore):
    topic = 'python-ide-test'
    source = ''

    for i in range(0, test_item_count):
        logitemList = []  # LogItem list

        contents = [
            ('user', 'magic_user_' + str(i)),
            ('avg', 'magic_age_' + str(i))
        ]
        logItem = LogItem()
        logItem.set_time(int(time.time()))
        logItem.set_contents(contents)

        logitemList.append(logItem)
        # Write logs to the Logstore. 
        request = PutLogsRequest(project, logstore, topic, source, logitemList)

        response = client.put_logs(request)
        print("successfully put logs in logstore")


def sleep_until(seconds, exit_condition=None, expect_error=False):
    if not exit_condition:
        time.sleep(seconds)
        return

    s = time.time()
    while time.time() - s < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if expect_error:
                continue
        time.sleep(1)


def sample_consumer_group():
    # The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')

    # Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
    accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')

    # The name of the project. In this example, the project is not created by using the SDK. Before you can run the code, you must create a project. 
    project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', 'ali-test-project-python')

    # The name of the Logstore. In this example, the Logstore is automatically created by using the SDK. You do not need to create a Logstore in advance. 
    logstore = 'ali-test-logstore'

    # The name of the consumer group. When the SDK runs, a consumer group is automatically created. You do not need to create a consumer group in advance. 
    consumer_group = 'consumer-group-1'
    consumer_name1 = "consumer-group-1-A"
    consumer_name2 = "consumer-group-1-B"
    token = ""

    if not logstore:
        logstore = 'consumer_group_test_' + str(time.time()).replace('.', '_')

    # Fail early if a required setting is missing.
    if not (endpoint and accessKeyId and accessKey and project):
        raise ValueError("endpoint/access_id/key and project cannot be empty")

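    # Create the Logstore with a data retention period of 2 days and 4 shards.
    client = LogClient(endpoint, accessKeyId, accessKey, token)
    ret = client.create_logstore(project, logstore, 2, 4)
    print("successfully create logstore")

    # Wait for the new Logstore to become available before writing logs.
    time.sleep(60)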
    SampleConsumer.log_results = []

    try:
        # Write logs to the Logstore. 
        _prepare_data(client, project, logstore)

        # Create two consumers in the consumer group to consume data. 
        option1 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name1, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)
        option2 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name2, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)

        print("*** start to consume data...")
        client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
        client_worker1.start()
        client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
        client_worker2.start()

        sleep_until(300, lambda: len(SampleConsumer.log_results) >= test_item_count)

        print("*** consumer group status ***")
        ret = client.list_consumer_group(project, logstore)
        print("successfully list consumergroup")

        for c in ret.get_consumer_groups():
            ret = client.get_check_point_fixed(project, logstore, c.get_consumer_group_name())
            print("successfully get checkpoint fixed")

        print("*** stopping workers")
        client_worker1.shutdown()
        client_worker2.shutdown()

    finally:
        # clean-up
        # ret = client.delete_logstore(project, logstore)
        ret = client.list_logstore(project, logstore)
        print("successfully list logstore")

    # validate
    ret = str(SampleConsumer.log_results)
    print("*** get content:")
    print(ret)

    assert 'magic_user_0' in ret and 'magic_age_0' in ret \
           and 'magic_user_' + str(test_item_count-1) in ret \
           and 'magic_age_' + str(test_item_count-1) in ret


if __name__ == '__main__':
    sample_consumer_group()

Expected results:

successfully create logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
......
*** start to consume data...
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}
......
*** consumer group status ***
successfully list consumergroup
successfully get checkpoint fixed
*** stopping workers
successfully list logstore
*** get content:
[{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_8', 'avg': 'magic_age_8'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_9', 'avg': 'magic_age_9'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_2', 'avg': 'magic_age_2'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_11', 'avg': 'magic_age_11'}]}, ......]
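
In the expected results, the consumed entries do not appear in write order (for example, magic_user_9 appears before magic_user_2), and the final assert in the sample code checks only the first and last items. The following sketch shows one way to check that every written item was consumed, regardless of order. `missing_users` is a hypothetical helper; it assumes only the structure of the dictionaries that the sample code appends to SampleConsumer.log_results.

```python
def missing_users(log_results, expected_count):
    """Return the sorted 'user' values that were written but not consumed.

    Hypothetical helper. log_results is a list of dictionaries shaped like
    the ones built in SampleConsumer.process: each has a 'logs' list whose
    items carry a 'user' field.
    """
    seen = set()
    for log_group in log_results:
        for log in log_group.get("logs", []):
            user = log.get("user")
            if user is not None:
                seen.add(user)

    # _prepare_data writes magic_user_0 .. magic_user_<expected_count - 1>.
    expected = {"magic_user_" + str(i) for i in range(expected_count)}
    return sorted(expected - seen)


# Usage with hand-written sample data in which magic_user_1 is absent.
sample = [
    {"topic": "python-ide-test", "source": "",
     "logs": [{"time": 1672123453, "user": "magic_user_2", "avg": "magic_age_2"}]},
    {"topic": "python-ide-test", "source": "",
     "logs": [{"time": 1672123452, "user": "magic_user_0", "avg": "magic_age_0"}]},
]
print(missing_users(sample, 3))  # ['magic_user_1']
```

In the sample code, a call such as missing_users(SampleConsumer.log_results, test_item_count) could replace the final assert; an empty list means that all items were consumed.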

References

  • Alibaba Cloud OpenAPI Explorer provides debugging capabilities, SDKs, examples, and related documents. You can use OpenAPI Explorer to debug Simple Log Service API operations without manually encapsulating or signing requests. For more information, visit OpenAPI Portal.
  • Simple Log Service provides a command-line interface (CLI) for automated configuration. For more information, see Log Service CLI.
  • For more information about sample code, see Alibaba Cloud Log Service SDK for Python on GitHub.