すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Simple Log Service SDK for Pythonを使用したコンシューマーグループの管理

最終更新日:Sep 05, 2024

コンシューマーグループを使用してログデータを消費する場合、Simple log Serviceの実装、コンシューマー間の負荷分散、発生する可能性のあるフェイルオーバーなどの要因を考慮する必要はありません。 ログデータの消費中にビジネスロジックに集中できます。 このトピックでは、コンシューマーグループを作成、変更、クエリ、および削除する方法について説明し、サンプルコードを提供します。

前提条件

  • RAM (Resource Access Management) ユーザーが作成され、必要な権限がRAMユーザーに付与されます。 詳細については、「RAMユーザーの作成とRAMユーザーへの権限付与」をご参照ください。

  • ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数が設定されています。 詳細については、「環境変数の設定」をご参照ください。

    重要
    • Alibaba CloudアカウントのAccessKeyペアには、すべてのAPI操作に対する権限があります。 RAMユーザーのAccessKeyペアを使用して、API操作を呼び出したり、ルーチンのO&Mを実行したりすることを推奨します。

    • プロジェクトコードにAccessKey IDまたはAccessKey secretを保存しないことを推奨します。 そうしないと、AccessKeyペアが漏洩し、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。

  • Python用のSimple Log Service SDKがインストールされています。 詳細については、「Simple Log Service SDK For Pythonのインストール」をご参照ください。

  • ログはLogstoreに書き込まれます。 詳細については、「データ収集の概要」をご参照ください。

使用上の注意

この例では、中国 (杭州) リージョンのパブリックSimple Log Serviceエンドポイントが使用されています。これは https://cn-hangzhou.log.aliyuncs.com です。 プロジェクトと同じリージョンにある他のAlibaba Cloudサービスを使用してSimple Log Serviceにアクセスする場合は、内部のSimple Log Serviceエンドポイント ( https://cn-hangzhou-intranet.log.aliyuncs.com ) を使用できます。 Simple Log Serviceのサポートされているリージョンとエンドポイントの詳細については、「エンドポイント」をご参照ください。

コンシューマーグループの作成に使用されるサンプルコード

次のサンプルコードは、ali-test-consumergroupという名前のコンシューマグループを作成する方法の例を示しています。

from aliyun.log import LogClient
import os

# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client. 
client = LogClient(endpoint, accessKeyId, accessKey)

# The name of the project. 
project_name = "ali-test-project"

# The name of the Logstore. 
logstore_name = "ali-test-logstore"

# The name of the consumer group. 
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to create consumergroup")
    res = client.create_consumer_group(project_name, logstore_name, consumergroup_name, 300, in_order=False)
    print("create consumergroup success ")

    res2 = client.list_consumer_group(project_name, logstore_name)
    for r in res2.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())

期待される結果

ready to create consumergroup
create consumergroup success
The consumergroup name is:ali-test-consumergroup

コンシューマーグループの変更に使用されるサンプルコード

次のサンプルコードは、ali-test-consumergroupコンシューマーグループを変更する方法の例を示しています。

from aliyun.log import LogClient
import os

# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client. 
client = LogClient(endpoint, accessKeyId, accessKey)

# The name of the project. 
project_name = "ali-test-project"

# The name of the Logstore. 
logstore_name = "ali-test-logstore"

# The name of the consumer group. 
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to update consumergroup")
    # Change the timeout period of the consumer group to 350 seconds. 
    res = client.update_consumer_group(project_name, logstore_name, consumergroup_name, 350, in_order=False)
    print("update consumergroup success ")

    res2 = client.list_consumer_group(project_name, logstore_name)
    for r in res2.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
        print("The consumergroup timeout is:%s" % r.get_timeout())

期待される結果

ready to update consumergroup
update consumergroup success
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350

すべての消費者グループのクエリに使用されるサンプルコード

次のサンプルコードは、指定されたLogstoreのすべてのコンシューマーグループをクエリする方法の例を示しています。

from aliyun.log import LogClient
import os

# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client. 
client = LogClient(endpoint, accessKeyId, accessKey)

# The name of the project. 
project_name = "ali-test-project"

# The name of the Logstore. 
logstore_name = "ali-test-logstore"

if __name__ == '__main__':
    print("ready to list consumergroup")
    # Query all consumer groups of the Logstore. 
    res = client.list_consumer_group(project_name, logstore_name)
    for r in res.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
        print("The consumergroup timeout is:%s" % r.get_timeout())
        print("The consumergroup order is:%s" % r.is_in_order())
    print("list consumergroup success ")

期待される結果

ready to list consumergroup
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
The consumergroup order is:False
list consumergroup success

コンシューマーグループの削除に使用されるサンプルコード

次のサンプルコードは、指定したプロジェクト内のコンシューマーグループを削除する方法の例を示しています。

from aliyun.log import LogClient
import os

# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client. 
client = LogClient(endpoint, accessKeyId, accessKey)

# The name of the project. 
project_name = "ali-test-project"

# The name of the Logstore. 
logstore_name = "ali-test-logstore"

# The name of the consumer group. 
consumergroup_name = "ali-test-consumergroup2"

if __name__ == '__main__':
    print("ready to delete consumergroup")
    # Delete the consumer group. 
    res = client.delete_consumer_group(project_name, logstore_name, consumergroup_name)
    print("delete consumergroup success ")

期待される結果

ready to delete consumergroup
delete consumergroup success

コンシューマーグループのチェックポイントを取得するために使用されるサンプルコード

次のサンプルコードは、指定したコンシューマーグループのチェックポイントを取得する方法の例を示しています。

from aliyun.log import LogClient, ListConsumerGroupResponse
import os

# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client. 
client = LogClient(endpoint, accessKeyId, accessKey)

# The name of the project. 
project_name = "ali-test-project"

# The name of the Logstore. 
logstore_name = "ali-test-logstore"

# The name of the consumer group. 
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to get CheckPoint")
    # Obtain a checkpoint of the consumer group for a shard. 
    res = client.get_check_point(project_name, logstore_name, consumergroup_name, shard=0)
    print("The consumergroup checkpoints info is:%s" % res.get_consumer_group_check_points())
    print("list CheckPoint success in shard_0")

期待される結果

ready to get CheckPoint
The consumergroup checkpoints info is:[{'shard': 0, 'checkpoint': 'MTY3MDk5OTY3NzEzMzQzODg2NQ==', 'updateTime': 1671607210514072, 'consumer': 'consumer_1'}]
list CheckPoint success in shard_0

関連ドキュメント

  • Alibaba Cloud OpenAPI Explorerは、デバッグ機能、SDK、サンプル、および関連ドキュメントを提供します。 OpenAPI Explorerを使用して、リクエストを手動でカプセル化したり署名したりすることなく、Log Service API操作をデバッグできます。 詳細については、をご覧ください。 OpenAPIポータル

  • Log Serviceは、Log Serviceの自動設定の要件を満たすコマンドラインインターフェイス (CLI) を提供します。 詳細については、「Log Service CLI」をご参照ください。

  • コンシューマーグループ関連のAPI操作の詳細については、以下のトピックを参照してください。

  • サンプルコードの詳細については、GitHubの「Alibaba Cloud Log Service SDK For Python」をご参照ください。