コンシューマーグループを使用してログを消費する場合、Simple Log Serviceの操作や、ログデータの消費中のコンシューマー間の負荷分散やフェイルオーバーなど、特定の実装の詳細について心配する必要なく、ビジネスロジックに集中できます。 このトピックでは、Simple Log Service SDK for Pythonを使用してコンシューマーグループを作成し、そのコンシューマーグループを使用してログを消費する方法と、サンプルコードについて説明します。
前提条件
RAM (Resource Access Management) ユーザーが作成され、必要な権限がRAMユーザーに付与されます。 詳細については、「RAMユーザーの作成とRAMユーザーへの権限付与」をご参照ください。
ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数が設定されています。 詳細については、「環境変数の設定」をご参照ください。
重要Alibaba CloudアカウントのAccessKeyペアには、すべてのAPI操作に対する権限があります。 RAMユーザーのAccessKeyペアを使用して、API操作を呼び出したり、ルーチンのO&Mを実行したりすることを推奨します。
プロジェクトコードにAccessKey IDまたはAccessKey secretを保存しないことを推奨します。 そうしないと、AccessKeyペアが漏洩し、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。
Python用のSimple Log Service SDKがインストールされています。 詳細については、「Simple Log Service SDK For Pythonのインストール」をご参照ください。
プロジェクトが作成されます。 詳細については、「プロジェクトの作成に使用されるサンプルコード」をご参照ください。
使用上の注意
この例では、中国 (杭州) リージョンのパブリックSimple Log Serviceエンドポイントが使用されています。これは https://cn-hangzhou.log.aliyuncs.com
です。 プロジェクトと同じリージョンにある他のAlibaba Cloudサービスを使用してSimple Log Serviceにアクセスする場合は、内部のSimple Log Serviceエンドポイント ( https://cn-hangzhou-intranet.log.aliyuncs.com
) を使用できます。 Simple Log Serviceのサポートされているリージョンとエンドポイントの詳細については、「エンドポイント」をご参照ください。
サンプルコード
次のサンプルコードは、Logstoreの作成、Logstoreへのログの書き込み、コンシューマグループの作成、およびコンシューマグループを使用してログを消費する方法の例を示しています。
import os
import time
from aliyun.log.consumer import *
from aliyun.log import *
from threading import RLock
class SampleConsumer(ConsumerProcessorBase):
shard_id = -1
last_check_time = 0
log_results = []
lock = RLock()
def initialize(self, shard):
self.shard_id = shard
def process(self, log_groups, check_point_tracker):
for log_group in log_groups.LogGroups:
items = []
for log in log_group.Logs:
item = dict()
item['time'] = log.Time
for content in log.Contents:
item[content.Key] = content.Value
items.append(item)
log_items = dict()
log_items['topic'] = log_group.Topic
log_items['source'] = log_group.Source
log_items['logs'] = items
with SampleConsumer.lock:
SampleConsumer.log_results.append(log_items)
print(log_items)
current_time = time.time()
if current_time - self.last_check_time > 3:
try:
self.last_check_time = current_time
check_point_tracker.save_check_point(True)
except Exception:
import traceback
traceback.print_exc()
else:
try:
check_point_tracker.save_check_point(False)
except Exception:
import traceback
traceback.print_exc()
# None means succesful process
# If you need to roll back to the previous checkpoint, return check_point_tracker.get_check_point().
return None
def shutdown(self, check_point_tracker):
try:
check_point_tracker.save_check_point(True)
except Exception:
import traceback
traceback.print_exc()
test_item_count = 20
# Write logs to the Logstore.
def _prepare_data(client, project, logstore):
topic = 'python-ide-test'
source = ''
for i in range(0, test_item_count):
logitemList = [] # LogItem list
contents = [
('user', 'magic_user_' + str(i)),
('avg', 'magic_age_' + str(i))
]
logItem = LogItem()
logItem.set_time(int(time.time()))
logItem.set_contents(contents)
logitemList.append(logItem)
# Write logs to the Logstore.
request = PutLogsRequest(project, logstore, topic, source, logitemList)
response = client.put_logs(request)
print("successfully put logs in logstore")
def sleep_until(seconds, exit_condition=None, expect_error=False):
if not exit_condition:
time.sleep(seconds)
return
s = time.time()
while time.time() - s < seconds:
try:
if exit_condition():
break
except Exception:
if expect_error:
continue
time.sleep(1)
def sample_consumer_group():
# The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint.
endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
# Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The name of the project. In this example, the project is not created by using the SDK. Before you can run the code, you must create a project.
project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', 'ali-test-project-python')
# The name of the Logstore. In this example, the Logstore is automatically created by using the SDK. You do not need to create a Logstore in advance.
logstore = 'ali-test-logstore'
# The name of the consumer group. When the SDK runs, a consumer group is automatically created. You do not need to create a consumer group in advance.
consumer_group = 'consumer-group-1'
consumer_name1 = "consumer-group-1-A"
consumer_name2 = "consumer-group-1-B"
token = ""
if not logstore:
logstore = 'consumer_group_test_' + str(time.time()).replace('.', '_')
assert endpoint and accessKeyId and accessKey and project, ValueError("endpoint/access_id/key and "
"project cannot be empty")
# Create the Logstore.
client = LogClient(endpoint, accessKeyId, accessKey, token)
ret = client.create_logstore(project, logstore, 2, 4)
print("successfully create logstore")
time.sleep(60)
SampleConsumer.log_results = []
try:
# Write logs to the Logstore.
_prepare_data(client, project, logstore)
# Create two consumers in the consumer group to consume data.
option1 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
consumer_name1, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
data_fetch_interval=1)
option2 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
consumer_name2, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
data_fetch_interval=1)
print("*** start to consume data...")
client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
client_worker1.start()
client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
client_worker2.start()
sleep_until(300, lambda: len(SampleConsumer.log_results) >= test_item_count)
print("*** consumer group status ***")
ret = client.list_consumer_group(project, logstore)
print("successfully list consumergroup")
for c in ret.get_consumer_groups():
ret = client.get_check_point_fixed(project, logstore, c.get_consumer_group_name())
print("successfully get checkpoint fixed")
print("*** stopping workers")
client_worker1.shutdown()
client_worker2.shutdown()
finally:
# clean-up
# ret = client.delete_logstore(project, logstore)
ret = client.list_logstore(project, logstore)
print("successfully list logstore")
# validate
ret = str(SampleConsumer.log_results)
print("*** get content:")
print(ret)
assert 'magic_user_0' in ret and 'magic_age_0' in ret \
and 'magic_user_' + str(test_item_count-1) in ret \
and 'magic_age_' + str(test_item_count-1) in ret
if __name__ == '__main__':
sample_consumer_group()
期待される結果
successfully create logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
......
*** start to consume data...
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}
......
*** consumer group status ***
successfully list consumergroup
successfully get checkpoint fixed
*** stopping workers
successfully list logstore
*** get content:
[{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_8', 'avg': 'magic_age_8'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_9', 'avg': 'magic_age_9'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_2', 'avg': 'magic_age_2'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_11', 'avg': 'magic_age_11'}]}, ......}]
、...
関連ドキュメント
Alibaba Cloud OpenAPI Explorerは、デバッグ機能、SDK、サンプル、および関連ドキュメントを提供します。 OpenAPI Explorerを使用して、リクエストを手動でカプセル化したり署名したりすることなく、Log Service API操作をデバッグできます。 詳細については、をご覧ください。 OpenAPIポータル。
Log Serviceは、Log Serviceの自動設定の要件を満たすコマンドラインインターフェイス (CLI) を提供します。 詳細については、「Log Service CLI」をご参照ください。
サンプルコードの詳細については、GitHubの「Alibaba Cloud Log Service SDK For Python」をご参照ください。