Simple Log Serviceトリガーを作成して、Simple Log ServiceをFunction Computeに接続できます。 Simple Log Serviceトリガーは、ビジネス要件に基づいてLogstore内の増分ログを処理する関数を自動的にトリガーします。
シナリオ
データのクレンジングと変換
Simple Log Serviceを使用すると、ログをすばやく収集、処理、クエリ、分析できます。
データ配信
Simple Log Serviceを使用すると、データを宛先に送信し、クラウド上のビッグデータサービス間にデータパイプラインを構築できます。
Functions for data processing
カテゴリ
テンプレート関数
詳細については、GitHubのaliyun-log-fc-functionsページをご覧ください。
カスタム関数
関数形式は、関数の実装に関連しています。 詳細については、「カスタム関数の作成」をご参照ください。
トリガーメカニズム
抽出、変換、および読み込み (ETL) ジョブは、Simple Log Serviceトリガーに対応し、関数を呼び出すために使用されます。 Simple Log ServiceでLogstoreのETLジョブを作成すると、ジョブ設定に基づいてLogstoreのシャードからデータをポーリングするためのタイマーが開始されます。 データがLogstoreに書き込まれると、<shard_id,begin_cursor,end_cursor >
形式のトリプルデータレコードが関数イベントとして生成されます。 次に、関連するETL関数が呼び出される。 Simple Log Serviceは、関数イベントをFunction Computeにプッシュします。
Logstoreにデータが書き込まれず、ストレージシステムが更新された場合、カーソル情報が変更されます。 ETL関数はシャードごとに呼び出されますが、データは変換されません。 この場合、カーソル情報を使用してシャードからデータを取得できます。 データが得られない場合、ETL関数が呼び出されるが、データは変換されない。 関数の呼び出しは無視できます。 詳細については、「カスタム関数の作成」をご参照ください。
ETLジョブは、時間に基づいて関数を呼び出す。 たとえば、ETLジョブは60秒ごとに関数をトリガーし、データは常にLogstoreのShard0に書き込まれます。 この場合、シャードは60秒ごとに関数をトリガーします。 データがシャードに書き込まれなくなった場合、関数はトリガーできません。 関数を実行するための入力は、最新の60秒間のカーソルの開始オフセットと終了オフセットです。 過去60秒のカーソル範囲に基づいてデータを処理できます。
制限事項
Simple Log ServiceプロジェクトのLogstoreの数の最大5倍をプロジェクトに関連付けることができます。
Logstoreごとに5つ以下のSimple Log Serviceトリガーを設定することを推奨します。 そうしないと、データがFunction Computeに効率的に出荷されない可能性があります。
サンプルシナリオ
Simple Log Serviceトリガーを設定して、更新されたデータを定期的に取得し、関数を呼び出すことができます。 Simple Log Serviceトリガーは、Logstoreからデータを段階的に消費するシナリオに適しています。 関数を呼び出して、データクレンジングタスクやデータ処理タスクなどのカスタム処理タスクを実行し、サードパーティのサービスにデータを送信できます。 この例では、ログデータの取得と表示方法のみを示します。
データの処理に使用される関数は、Simple Log Serviceによって提供されるテンプレート関数またはカスタム関数です。
前提条件
Function Compute
関数が作成されます。 詳細については、「関数の管理」トピックの関数の作成セクションをご参照ください。
シンプルなLog Service
Simple Log Serviceプロジェクトと2つのログストアが作成されます。 詳細については、「リソース管理の概要」をご参照ください。
一方のLogstoreはデータソースからのログを処理するために使用され、もう一方のLogstoreはFunction Computeによって生成されたログを保存するために使用されます。
説明ログプロジェクトは、Function Computeサービスと同じリージョンに存在する必要があります。
手順1: シンプルなLog Serviceトリガーの作成
Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[関数] をクリックします。
上部のナビゲーションバーで、リージョンを選択します。 [関数] ページで、管理する関数をクリックします。
機能の詳細ページで、[設定] タブをクリックします。 左側のナビゲーションウィンドウで、[トリガー] をクリックします。 次に、[トリガーの作成] をクリックします。
トリガーの作成パネルで、パラメーターを設定し、OK.
パラメーター
説明
例
トリガータイプ
トリガーのタイプ。 [Log Service] を選択します。
Log Service
名前
トリガーの名前。
log_trigger
バージョンまたはエイリアス
トリガーのバージョンまたはエイリアス。 デフォルト値: LATEST。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅でバージョンまたはエイリアスを選択します。 バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
Log Serviceプロジェクト
既存のSimple Log Serviceプロジェクトの名前。
aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457 ****
ログストア
既存のLogstoreの名前。 この例で作成されたトリガーは、Logstore内のデータをサブスクライブし、カスタム処理のためにデータを定期的にFunction Computeに送信します。
関数ログ
トリガー間隔
Simple Log Serviceが関数を呼び出す間隔。
有効値: 3 ~ 600 単位は秒です。 デフォルト値: 0。
60
再試行
各呼び出しで許可される再試行の最大数。
有効値: 0~100。 デフォルト値: 3。
説明関数がトリガーされると、status=200が返され、レスポンスヘッダーの
X-Fc-Error-Type
パラメーターの値はUnhandledInvocationError
またはHandledInvocationError
ではありません。 それ以外の場合は、関数のトリガーに失敗します。X-Fc-Error-Type
の詳細については、「レスポンスパラメーター」をご参照ください。関数のトリガーに失敗した場合、システムは関数が呼び出されるまで関数の呼び出しを再試行します。 再試行の回数は、このパラメーターの値に従います。 このパラメーターの値に達しても関数が失敗した場合、システムは間隔を広げて指数バックオフモードでリクエストを再試行します。
3
トリガーログ
Simple Log Serviceが関数を呼び出したときに生成されるログを保存するログストア。
function-log2
呼び出しパラメーター
呼び出しパラメーター。 このエディターでカスタムパラメーターを設定できます。 カスタムパラメーターは、関数を呼び出すために使用されるイベントのparameterパラメーターの値として関数に渡されます。 Invocation Parametersパラメーターの値は、JSON形式の文字列である必要があります。
デフォルトでは、このパラメーターは空です。
非該当
ロール名
[AliyunLogETLRole] を選択します。
説明上記のパラメーターを設定したら、[OK] をクリックします。 このタイプのトリガーを初めて作成する場合は、表示されるダイアログボックスで [今すぐ許可] をクリックします。
AliyunLogETLRole
ステップ2: 関数の入力パラメータを設定する
関数の詳細ページの [コード] タブで、[テスト関数] の隣のアイコンをクリックし、ドロップダウンリストから [テストパラメーターの設定] を選択します。
[テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、イベント名とイベント内容を入力し、[OK] をクリックします。
イベントは、function Computeの関数を呼び出すために使用されます。 イベントのパラメーターは、関数の入力パラメーターとして使用されます。 次のサンプルコードは、イベントコンテンツの形式の例を示しています。
{ "parameter": {}, "source": { "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }, "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****", "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****", "cursorTime": 1529486425 }
パラメーター
説明
例
パラメーター
トリガーの作成時に設定する [呼び出しパラメーター] パラメーターの値。
非該当
ソース
関数がSimple log Serviceから読み取るログブロック情報。
endpoint: Simple Log Serviceプロジェクトが存在するAlibaba Cloudリージョンのエンドポイント。
projectName: Simple Log Serviceプロジェクトの名前。
logstoreName: Logstoreの名前。
shardId: Logstore内の特定のシャードのID。
begin Cursor: データ消費が開始されるオフセット。
endCursor: データ消費が終了するオフセット。
{ "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }
jobName
Simple Log ServiceのETLジョブの名前。 Simple Log Serviceトリガーは、Simple Log ServiceのETLジョブに対応する必要があります。
1f7043ced683de1a4e3d8d70b5a412843d81 ****
taskId
ETLジョブの場合、taskIdパラメーターは関数呼び出しの識別子を指定します。
c2691505-38da-4d1b-998a-f1d4bb8c ****
cursorTime
Simple log Serviceに最後のログが到着した時刻のUNIXタイムスタンプ。
1529486425
ステップ3: 関数コードの書き込みとテスト
Simple Log Serviceトリガーを作成した後、関数コードを記述し、関数コードをテストして、コードが有効かどうかを確認できます。 この関数は、Simple Log Serviceが増分ログを収集するときに呼び出されます。 Function Computeは対応するログを取得し、収集したログを表示します。
関数の詳細ページで、コードタブで、コードエディターに関数コードを入力し、デプロイ をクリックします。
この例では、関数コードはPythonで記述されています。 次のサンプルコードは、ほとんどの論理ログを抽出する方法の例を示しています。 コード内の
context
とcreds
から、AccessKey ID
とAccessKey secret
を取得できます。""" The sample code is used to implement the following features: * Parse Simple Log Service events from the event parameter. * Initialize the Simple Log Service client based on the preceding information. * Obtain real-time log data from the source Logstore. This sample code is mainly doing the following things: * Get SLS processing related information from event * Initiate SLS client * Pull logs from source log store """ #!/usr/bin/env python # -*- coding: utf-8 -*- import logging import json import os from aliyun.log import LogClient logger = logging.getLogger() def handler(event, context): # Query the key information from context.credentials. # Access keys can be fetched through context.credentials print("The content in context entity is: \n") print(context) creds = context.credentials access_key_id = creds.access_key_id access_key_secret = creds.access_key_secret security_token = creds.security_token # Parse the event parameter to the OBJECT data type. # parse event in object event_obj = json.loads(event.decode()) print("The content in event entity is: \n") print(event_obj) # Query the following information from event.source: log project name, Logstore name, the endpoint to access the Simple Log Service project, start cursor, end cursor, and shard ID. # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source source = event_obj['source'] log_project = source['projectName'] log_store = source['logstoreName'] endpoint = source['endpoint'] begin_cursor = source['beginCursor'] end_cursor = source['endCursor'] shard_id = source['shardId'] # Initialize the Simple Log Service client. # Initialize client of sls client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token) # Read logs based on the start and end cursors in the source Logstore. In this example, the specified cursors include all logs of the function invocation. # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation while True: response = client.pull_logs(project_name=log_project, logstore_name=log_store, shard_id=shard_id, cursor=begin_cursor, count=100, end_cursor=end_cursor, compress=False) log_group_cnt = response.get_loggroup_count() if log_group_cnt == 0: break logger.info("get %d log group from %s" % (log_group_cnt, log_store)) logger.info(response.get_loggroup_list()) begin_cursor = response.get_next_cursor() return 'success'
クリックテスト機能.
関数の実行後、[コード] タブで結果を表示できます。
よくある質問
Simple Log Serviceのすべてのシャードは、新しいデータが書き込まれると機能をトリガーします。 したがって、トリガー頻度には、Logstoreが全体としてトリガーされた回数が含まれます。 また、トリガが遅延すると、データのキャッチアップが発生し、トリガのトリガ間隔が短くなる可能性がある。 詳細については、「」をご参照ください。Simple Log Serviceトリガーの実行頻度が予想よりも高いのはなぜですか。