Simple Log Serviceトリガーを作成して、Simple Log ServiceをFunction Computeに接続できます。 Simple Log Serviceトリガーは、ビジネス要件に基づいてLogstore内の増分ログを処理する関数を自動的にトリガーします。
シナリオ
データのクレンジングと処理
Simple Log Serviceを使用すると、ログをすばやく収集、処理、クエリ、分析できます。
データ配信
Simple Log Serviceを使用すると、特定の宛先にデータを送信できます。 このシナリオでは、Simple Log Serviceはクラウド内のビッグデータサービスのデータチャネルとして機能します。
Functions for data processing
関数タイプ
テンプレート関数
詳細については、GitHubのaliyun-log-fc-functionsページをご覧ください。
カスタム関数
関数形式は、関数の実装に関連しています。 詳細については、「カスタム関数の作成」をご参照ください。
トリガーメカニズム
抽出、変換、および読み込み (ETL) ジョブは、Simple Log Serviceトリガーに対応し、関数を呼び出すために使用されます。 Simple Log ServiceでLogstoreのETLジョブを作成すると、ジョブ設定に基づいてLogstoreのシャードからデータをポーリングするためのタイマーが開始されます。 データがLogstoreに書き込まれると、<shard_id,begin_cursor,end_cursor >
形式のトリプルデータレコードが関数イベントとして生成されます。 次に、関連するETL関数が呼び出される。 Simple Log Serviceは、関数イベントをfunction Computeにプッシュします。
Logstoreにデータが書き込まれず、ストレージシステムが更新された場合、カーソル情報が変更される可能性があります。 その結果、各シャードに対してETL関数が呼び出されますが、データは変換されません。 この場合、カーソル情報を使用してシャードからデータを取得できます。 データが得られない場合、ETL関数が呼び出されるが、データは変換されない。 関数の呼び出しは無視できます。 詳細については、「カスタム関数の作成」をご参照ください。
ETLジョブは、時間に基づいて関数を呼び出す。 たとえば、ETLジョブは60秒ごとに関数をトリガーし、データは常にLogstoreのShard0に書き込まれます。 この場合、シャードは60秒ごとに関数をトリガーします。 データがシャードに書き込まれなくなった場合、関数はトリガーできません。 関数を実行するための入力は、最新の60秒間のカーソルの開始オフセットと終了オフセットです。 過去60秒のカーソル範囲に基づいてデータを処理できます。
制限事項
Simple Log ServiceプロジェクトのLogstoreの数の最大5倍をプロジェクトに関連付けることができます。
Logstoreごとに5つ以下のSimple Log Serviceトリガーを設定することを推奨します。 そうしないと、データがFunction Computeに効率的に出荷されない可能性があります。
サンプルシナリオ
Simple Log Serviceトリガーを設定して、更新されたデータを定期的に取得し、関数を呼び出すことができます。 Simple Log Serviceトリガーは、Logstoreからデータを段階的に消費するシナリオに適しています。 関数を呼び出して、データクレンジングタスクやデータ処理タスクなどのカスタム処理タスクを実行し、サードパーティのサービスにデータを送信できます。 この例では、ログデータの取得と表示方法のみを示します。
データの処理に使用される関数は、Simple Log Serviceによって提供されるテンプレート関数またはカスタム関数です。
前提条件
Function Compute
サービスが作成されていること。 詳しくは、「サービスの作成」をご参照ください。
説明サービスを作成するときに、サービスロールを構成して、サービス内の関数がロールの権限を取得できるようにします。 サービスロールを構成しない場合、関数コードをテストするときにエラーが報告されます。 この例では、サービスロールは
AliyunFCDefaultRole
で、AliyunLogReadOnlyAccess
ポリシーがロールにアタッチされています。 サービスロールの詳細については、「他のAlibaba Cloudサービスへのアクセス権限の付与」をご参照ください。
関数が作成されます。 詳細については、「関数の作成」をご参照ください。
シンプルなLog Service
Simple Log Serviceプロジェクトと2つのログストアが作成されます。 詳細については、「リソース管理の概要」をご参照ください。
一方のLogstoreはログとデータソースの処理に使用され、もう一方のLogstoreはFunction Computeによって生成されたログの保存に使用されます。
説明ログプロジェクトは、Function Computeサービスと同じリージョンに存在する必要があります。
手順1: シンプルなLog Serviceトリガーの作成
Function Computeコンソールにログインします。
左側のナビゲーションウィンドウで、サービス&機能をクリックします。
上部のナビゲーションバーで、リージョンを選択します。
[サービス] ページで目的のサービスを見つけ、[操作] 列の [関数] をクリックします。
[関数] ページで、管理する関数をクリックします。
関数の詳細ページで、[トリガー] タブをクリックし、[バージョンまたはエイリアス] ドロップダウンリストからバージョンまたはエイリアスを選択し、[トリガーの作成] をクリックします。
[トリガーの作成] パネルでパラメーターを設定し、[OK] をクリックします。
パラメーター
説明
例
トリガータイプ
[Log Service] を選択します。
Log Service
名前
トリガー名を入力します。
log_trigger
バージョンまたはエイリアス
トリガーのバージョンまたはエイリアス。 デフォルト値: LATEST。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅でバージョンまたはエイリアスを選択します。 バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
Log Serviceプロジェクト
既存のSimple Log Serviceプロジェクトの名前。
aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457 ****
ログストア
既存のLogstoreの名前。 この例で作成されたトリガーは、Logstore内のデータをサブスクライブし、カスタム処理のためにデータを定期的にFunction Computeに送信します。
関数ログ
トリガー間隔
Simple Log Serviceが関数を呼び出す間隔。
有効値: 3 ~ 600 単位は秒です。 デフォルト値: 0。
60
再試行
Simple Log Serviceが関数呼び出しをトリガーするときにエラーが発生した場合に、1回のトリガーで許可される再試行の最大数。
有効値: 0~100。 デフォルト値: 3。
説明関数がトリガーされると、status=200が返され、レスポンスヘッダーの
X-Fc-Error-Type
パラメーターの値はUnhandledInvocationError
またはHandledInvocationError
ではありません。 それ以外の場合は、関数のトリガーに失敗します。X-Fc-Error-Type
の詳細については、「レスポンスパラメーター」をご参照ください。関数のトリガーに失敗した場合、システムは関数が呼び出されるまで関数の呼び出しを再試行します。 再試行の回数は、このパラメーターの値に従います。 このパラメーターの値に達しても関数が失敗した場合、システムは間隔を広げて指数バックオフモードでリクエストを再試行します。
3
トリガーログ
Simple Log Serviceが関数を呼び出したときに生成されるログを保存するログストア。
function-log2
呼び出しパラメーター
このエディターでカスタムパラメーターを設定します。 このパラメーターは、イベントパラメーターのparameterフィールドとして関数に渡されます。 このパラメーターの値は、JSON形式の文字列である必要があります。
デフォルトでは、このパラメーターは空です。
なし
ロール名
[AliyunLogETLRole] を選択します。
説明上記のパラメーターを設定したら、[OK] をクリックします。 このタイプのトリガーを初めて作成するときは、表示されるダイアログボックスで [今すぐ許可] をクリックします。
AliyunLogETLRole
トリガーが作成されると、[トリガー] タブに表示されます。 既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。
ステップ2: 関数の入力パラメータを設定する
関数の詳細ページで、コードタブをクリックし、の右にアイコンテスト機能を選択し、ドロップダウンリストからテストパラメーターの設定 を選択します。
テストパラメーターの設定パネルで、新しいテストイベントの作成または既存のテストイベントの変更タブで、イベント名パラメーターを指定し、OKをクリックします。
Function Computeでは、eventは入力パラメーターです。 次のコードは、eventパラメーターの形式を示しています。
{ "parameter": {}, "source": { "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }, "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****", "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****", "cursorTime": 1529486425 }
パラメーター
説明
例
パラメーター
トリガーの作成時に設定する [呼び出しパラメーター] パラメーターの値。
なし
ソース
関数がSimple log Serviceから読み取るログブロック情報。
endpoint: Simple Log Serviceプロジェクトが存在するAlibaba Cloudリージョンのエンドポイント。
projectName: Simple Log Serviceプロジェクトの名前。
logstoreName: Logstoreの名前。
shardId: Logstore内の特定のシャードのID。
begin Cursor: データ消費が開始されるオフセット。
endCursor: データ消費が終了するオフセット。
{ "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }
jobName
Simple Log ServiceのETLジョブの名前。 Simple Log Serviceトリガーは、Simple Log ServiceのETLジョブに対応する必要があります。
1f7043ced683de1a4e3d8d70b5a412843d81 ****
taskId
ETLジョブの場合、taskIdは決定論的関数呼び出しの識別子です。
c2691505-38da-4d1b-998a-f1d4bb8c ****
cursorTime
Simple log Serviceに最後のログが到着した時刻のUNIXタイムスタンプ。
1529486425
ステップ3: 関数コードの書き込みとテスト
Simple Log Serviceトリガーを作成した後、関数コードを記述し、関数コードをテストして、コードが有効かどうかを確認できます。 この関数は、Simple Log Serviceが増分ログを収集するときに呼び出されます。 Function Computeは対応するログを取得し、収集したログを表示します。
関数の詳細ページで、コードタブで、コードエディターに関数コードを入力し、デプロイをクリックします。
このセクションでは、Pythonの関数コードを例として使用します。 サンプルコードでは、
access_key_id
、access_key_secret
、およびsecurity_token
パラメーターの値がcontext.credentials
から照会されます。""" The sample code is used to implement the following features: * Parse Simple Log Service events from the event parameter. * Initialize the Simple Log Service client based on the preceding information. * Obtain real-time log data from the source Logstore. This sample code is mainly doing the following things: * Get SLS processing related information from event * Initiate SLS client * Pull logs from source log store """ #!/usr/bin/env python # -*- coding: utf-8 -*- import logging import json import os from aliyun.log import LogClient logger = logging.getLogger() def handler(event, context): # Query the key information from context.credentials. # Access keys can be fetched through context.credentials print("The content in context entity is: \n") print(context) creds = context.credentials access_key_id = creds.access_key_id access_key_secret = creds.access_key_secret security_token = creds.security_token # Parse the event parameter to the OBJECT data type. # parse event in object event_obj = json.loads(event.decode()) print("The content in event entity is: \n") print(event_obj) # Query the following information from event.source: log project name, Logstore name, the endpoint to access the Simple Log Service project, start cursor, end cursor, and shard ID. # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source source = event_obj['source'] log_project = source['projectName'] log_store = source['logstoreName'] endpoint = source['endpoint'] begin_cursor = source['beginCursor'] end_cursor = source['endCursor'] shard_id = source['shardId'] # Initialize the Simple Log Service client. # Initialize client of sls client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token) # Read logs based on the start and end cursors in the source Logstore. In this example, the specified cursors include all logs of the function invocation. # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation while True: response = client.pull_logs(project_name=log_project, logstore_name=log_store, shard_id=shard_id, cursor=begin_cursor, count=100, end_cursor=end_cursor, compress=False) log_group_cnt = response.get_loggroup_count() if log_group_cnt == 0: break logger.info("get %d log group from %s" % (log_group_cnt, log_store)) logger.info(response.get_loggroup_list()) begin_cursor = response.get_next_cursor() return 'success'
コードタブでテスト機能をクリックします。
関数の実行後、[コード] タブで結果を表示できます。
トラブルシューティング
新しいログが生成されても、Simple Log Serviceトリガーが関数の実行をトリガーしない場合は、「トリガーが関数の実行をトリガーできない場合はどうすればよいですか? 」をご参照ください。
Simple Log Serviceの各シャードは、新しいデータが書き込まれると機能をトリガーします。 したがって、トリガー頻度には、Logstore内のすべてのシャードが関数をトリガーする回数が含まれます。 また、トリガが遅延すると、データのキャッチアップが発生し、トリガのトリガ間隔が短くなる可能性がある。 詳細については、「Simple Log Serviceトリガーの実行頻度が予想よりも高いのはなぜですか。 」をご参照ください。