すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:コンシューマグループを使用したログの消費

最終更新日:Sep 05, 2024

コンシューマーグループを使用してログを消費する場合、Simple Log Serviceの操作や、ログデータの消費中のコンシューマー間の負荷分散やフェイルオーバーなど、特定の実装の詳細について心配する必要なく、ビジネスロジックに集中できます。 このトピックでは、Simple Log Service SDK for Pythonを使用してコンシューマーグループを作成し、そのコンシューマーグループを使用してログを消費する方法と、サンプルコードについて説明します。

前提条件

  • RAM (Resource Access Management) ユーザーが作成され、必要な権限がRAMユーザーに付与されます。 詳細については、「RAMユーザーの作成とRAMユーザーへの権限付与」をご参照ください。

  • ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数が設定されています。 詳細については、「環境変数の設定」をご参照ください。

    重要
    • Alibaba CloudアカウントのAccessKeyペアには、すべてのAPI操作に対する権限があります。 RAMユーザーのAccessKeyペアを使用して、API操作を呼び出したり、ルーチンのO&Mを実行したりすることを推奨します。

    • プロジェクトコードにAccessKey IDまたはAccessKey secretを保存しないことを推奨します。 そうしないと、AccessKeyペアが漏洩し、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。

  • Python用のSimple Log Service SDKがインストールされています。 詳細については、「Simple Log Service SDK For Pythonのインストール」をご参照ください。

  • プロジェクトが作成されます。 詳細については、「プロジェクトの作成に使用されるサンプルコード」をご参照ください。

使用上の注意

この例では、中国 (杭州) リージョンのパブリックSimple Log Serviceエンドポイントが使用されています。これは https://cn-hangzhou.log.aliyuncs.com です。 プロジェクトと同じリージョンにある他のAlibaba Cloudサービスを使用してSimple Log Serviceにアクセスする場合は、内部のSimple Log Serviceエンドポイント ( https://cn-hangzhou-intranet.log.aliyuncs.com ) を使用できます。 Simple Log Serviceのサポートされているリージョンとエンドポイントの詳細については、「エンドポイント」をご参照ください。

サンプルコード

次のサンプルコードは、Logstoreの作成、Logstoreへのログの書き込み、コンシューマグループの作成、およびコンシューマグループを使用してログを消費する方法の例を示しています。

import os
import time

from aliyun.log.consumer import *
from aliyun.log import *
from threading import RLock


class SampleConsumer(ConsumerProcessorBase):
    shard_id = -1
    last_check_time = 0
    log_results = []
    lock = RLock()

    def initialize(self, shard):
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        for log_group in log_groups.LogGroups:
            items = []
            for log in log_group.Logs:
                item = dict()
                item['time'] = log.Time
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = dict()
            log_items['topic'] = log_group.Topic
            log_items['source'] = log_group.Source
            log_items['logs'] = items

            with SampleConsumer.lock:
                SampleConsumer.log_results.append(log_items)
                print(log_items)

        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                check_point_tracker.save_check_point(False)
            except Exception:
                import traceback
                traceback.print_exc()

        # None means succesful process
        # If you need to roll back to the previous checkpoint, return check_point_tracker.get_check_point().
        return None

    def shutdown(self, check_point_tracker):
        try:
            check_point_tracker.save_check_point(True)
        except Exception:
            import traceback
            traceback.print_exc()


test_item_count = 20

# Write logs to the Logstore. 
def _prepare_data(client, project, logstore):
    topic = 'python-ide-test'
    source = ''

    for i in range(0, test_item_count):
        logitemList = []  # LogItem list

        contents = [
            ('user', 'magic_user_' + str(i)),
            ('avg', 'magic_age_' + str(i))
        ]
        logItem = LogItem()
        logItem.set_time(int(time.time()))
        logItem.set_contents(contents)

        logitemList.append(logItem)
        # Write logs to the Logstore. 
        request = PutLogsRequest(project, logstore, topic, source, logitemList)

        response = client.put_logs(request)
        print("successfully put logs in logstore")


def sleep_until(seconds, exit_condition=None, expect_error=False):
    if not exit_condition:
        time.sleep(seconds)
        return

    s = time.time()
    while time.time() - s < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if expect_error:
                continue
        time.sleep(1)


def sample_consumer_group():
    # The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')

    # Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
    accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')

    # The name of the project. In this example, the project is not created by using the SDK. Before you can run the code, you must create a project. 
    project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', 'ali-test-project-python')

    # The name of the Logstore. In this example, the Logstore is automatically created by using the SDK. You do not need to create a Logstore in advance. 
    logstore = 'ali-test-logstore'

    # The name of the consumer group. When the SDK runs, a consumer group is automatically created. You do not need to create a consumer group in advance. 
    consumer_group = 'consumer-group-1'
    consumer_name1 = "consumer-group-1-A"
    consumer_name2 = "consumer-group-1-B"
    token = ""

    if not logstore:
        logstore = 'consumer_group_test_' + str(time.time()).replace('.', '_')

    assert endpoint and accessKeyId and accessKey and project, ValueError("endpoint/access_id/key and "
                                                                          "project cannot be empty")

    # Create the Logstore. 
    client = LogClient(endpoint, accessKeyId, accessKey, token)
    ret = client.create_logstore(project, logstore, 2, 4)
    print("successfully create logstore")

    time.sleep(60)
    SampleConsumer.log_results = []

    try:
        # Write logs to the Logstore. 
        _prepare_data(client, project, logstore)

        # Create two consumers in the consumer group to consume data. 
        option1 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name1, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)
        option2 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name2, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)

        print("*** start to consume data...")
        client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
        client_worker1.start()
        client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
        client_worker2.start()

        sleep_until(300, lambda: len(SampleConsumer.log_results) >= test_item_count)

        print("*** consumer group status ***")
        ret = client.list_consumer_group(project, logstore)
        print("successfully list consumergroup")

        for c in ret.get_consumer_groups():
            ret = client.get_check_point_fixed(project, logstore, c.get_consumer_group_name())
            print("successfully get checkpoint fixed")

        print("*** stopping workers")
        client_worker1.shutdown()
        client_worker2.shutdown()

    finally:
        # clean-up
        # ret = client.delete_logstore(project, logstore)
        ret = client.list_logstore(project, logstore)
        print("successfully list logstore")

    # validate
    ret = str(SampleConsumer.log_results)
    print("*** get content:")
    print(ret)

    assert 'magic_user_0' in ret and 'magic_age_0' in ret \
           and 'magic_user_' + str(test_item_count-1) in ret \
           and 'magic_age_' + str(test_item_count-1) in ret


if __name__ == '__main__':
    sample_consumer_group()

期待される結果

successfully create logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
......
*** start to consume data...
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}
......
*** consumer group status ***
successfully list consumergroup
successfully get checkpoint fixed
*** stopping workers
successfully list logstore
*** get content:
[{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_8', 'avg': 'magic_age_8'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_9', 'avg': 'magic_age_9'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_2', 'avg': 'magic_age_2'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_11', 'avg': 'magic_age_11'}]}, ......}]

、...

関連ドキュメント

  • Alibaba Cloud OpenAPI Explorerは、デバッグ機能、SDK、サンプル、および関連ドキュメントを提供します。 OpenAPI Explorerを使用して、リクエストを手動でカプセル化したり署名したりすることなく、Log Service API操作をデバッグできます。 詳細については、をご覧ください。 OpenAPIポータル

  • Log Serviceは、Log Serviceの自動設定の要件を満たすコマンドラインインターフェイス (CLI) を提供します。 詳細については、「Log Service CLI」をご参照ください。

  • サンプルコードの詳細については、GitHubの「Alibaba Cloud Log Service SDK For Python」をご参照ください。