すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Function Computeを使用してログデータを消費する

最終更新日:Aug 30, 2024

Simple Log Serviceでは、Function Computeを使用してストリーミングデータを変換できます。 Function Computeトリガーを設定して、データの更新を検出し、関数を呼び出すことができます。 これにより、Logstoreの増分データを使用してデータを変換できます。 テンプレート関数またはユーザー定義関数を使用して、データを変換できます。

制限事項

1つのログプロジェクトに関連付けられたSimple Log Serviceトリガーの数は、そのプロジェクトの既存のLogstoreの数の5倍を超えてはなりません。

説明

各Logstoreに設定されたSimple Log Serviceトリガーの数は5を超えないことをお勧めします。この制限を超えると、Function Computeへのデータ配信の効率に影響を与える可能性があるためです。

前提条件

シナリオ

  • データのクレンジングと変換

    Simple Log Serviceを使用すると、効率的にログを収集、変換、クエリ、および分析できます。

  • データ配信

    Simple Log Serviceを使用して、指定した宛先にデータを送信できます。 このシナリオでは、Simple Log Serviceはクラウド内のビッグデータサービスのデータチャネルとして機能します。

データ変換関数

  • 関数タイプ

    • テンプレート関数

      詳細については、「aliyun-log-fc-functions」をご参照ください。

    • ユーザー定義関数

      関数形式は、関数の実装に関連しています。 詳細については、「カスタム関数の作成」をご参照ください。

  • 関数呼び出しメカニズム

    Function Computeトリガーは、関数を呼び出すために使用されます。 Simple Log ServiceでLogstoreのFunction Computeトリガーを作成すると、トリガー設定に基づいてLogstoreのシャードを監視するタイマーが起動されます。 新しいデータがLogstoreに書き込まれると、<shard_id,begin_cursor,end_cursor > 形式の3組のデータレコードが関数イベントとして生成されます。 次に、関数が呼び出される。

    説明

    新しいデータがLogstoreに書き込まれず、ストレージシステムが更新された場合、カーソル情報が変更される可能性があります。 その結果、各シャードに対して関数が呼び出されますが、データは変換されません。 この場合、カーソル情報を使用してシャードからデータを取得できます。 データが取得されない場合、関数が呼び出されますが、データは変換されません。 関数呼び出しは無視できます。 詳細については、「カスタム関数の作成」をご参照ください。

    Function Computeトリガーは、時間に基づいて関数を呼び出します。 たとえば、LogstoreのFunction Computeトリガーの呼び出し間隔は60秒に設定されています。 Shard 0にデータが連続して書き込まれている場合、60秒間隔で関数が呼び出され、前の60秒のカーソル範囲に基づいてデータが変換されます。

手順1: シンプルなLog Serviceトリガーの作成

  1. Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[関数] をクリックします。

  2. 上部のナビゲーションバーで、リージョンを選択します。 [関数] ページで、管理する関数をクリックします。

  3. 機能の詳細ページで、[設定] タブをクリックします。 左側のナビゲーションウィンドウで、[トリガー] をクリックします。 次に、[トリガーの作成] をクリックします。

  4. トリガーの作成パネルで、パラメーターを設定し、OK.

    パラメーター

    説明

    トリガータイプ

    トリガーのタイプ。 [Log Service] を選択します。

    Log Service

    名前

    トリガーの名前。

    log_trigger

    バージョンまたはエイリアス

    トリガーのバージョンまたはエイリアス。 デフォルト値: LATEST。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅でバージョンまたはエイリアスを選択します。 バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。

    LATEST

    Log Serviceプロジェクト

    既存のSimple Log Serviceプロジェクトの名前。

    aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457 ****

    ログストア

    既存のLogstoreの名前。 この例で作成されたトリガーは、Logstore内のデータをサブスクライブし、カスタム処理のためにデータを定期的にFunction Computeに送信します。

    関数ログ

    トリガー間隔

    Simple Log Serviceが関数を呼び出す間隔。

    有効値: 3 ~ 600 単位は秒です。 デフォルト値: 0。

    60

    再試行

    各呼び出しで許可される再試行の最大数。

    有効値: 0~100。 デフォルト値: 3。

    説明
    • 関数がトリガーされると、status=200が返され、レスポンスヘッダーのX-Fc-Error-Typeパラメーターの値はUnhandledInvocationErrorまたはHandledInvocationErrorではありません。 それ以外の場合は、関数のトリガーに失敗します。 X-Fc-Error-Typeの詳細については、「レスポンスパラメーター」をご参照ください。

    • 関数のトリガーに失敗した場合、システムは関数が呼び出されるまで関数の呼び出しを再試行します。 再試行の回数は、このパラメーターの値に従います。 このパラメーターの値に達しても関数が失敗した場合、システムは間隔を広げて指数バックオフモードでリクエストを再試行します。

    3

    トリガーログ

    Simple Log Serviceが関数を呼び出したときに生成されるログを保存するログストア。

    function-log2

    呼び出しパラメーター

    呼び出しパラメーター。 このエディターでカスタムパラメーターを設定できます。 カスタムパラメーターは、関数を呼び出すために使用されるイベントのparameterパラメーターの値として関数に渡されます。 Invocation Parametersパラメーターの値は、JSON形式の文字列である必要があります。

    デフォルトでは、このパラメーターは空です。

    非該当

    ロール名

    [AliyunLogETLRole] を選択します。

    説明

    上記のパラメーターを設定したら、[OK] をクリックします。 このタイプのトリガーを初めて作成する場合は、表示されるダイアログボックスで [今すぐ許可] をクリックします。

    AliyunLogETLRole

    トリガーが作成されると、[トリガー] タブに表示されます。 トリガーを変更または削除するには、「トリガー管理」をご参照ください。

ステップ2: 関数の入力パラメータを設定する

  1. 関数の詳細ページの [コード] タブで、[テスト関数] の隣のimage.pngアイコンをクリックし、ドロップダウンリストから [テストパラメーターの設定] を選択します。

  2. [テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、イベント名とイベント内容を入力し、[OK] をクリックします。

    イベントは、function Computeの関数を呼び出すために使用されます。 イベントのパラメーターは、関数の入力パラメーターとして使用されます。 次のサンプルコードは、イベントコンテンツの形式の例を示しています。

    {
        "parameter": {},
        "source": {
            "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com",
            "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****",
            "logstoreName": "function-log",
            "shardId": 0,
            "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==",
            "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA=="
        },
        "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****",
        "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****",
        "cursorTime": 1529486425
    }                       

    パラメーター

    説明

    パラメーター

    トリガーの作成時に設定する [呼び出しパラメーター] パラメーターの値。

    非該当

    ソース

    関数がSimple log Serviceから読み取るログブロック情報。

    • endpoint: Simple Log Serviceプロジェクトが存在するAlibaba Cloudリージョンのエンドポイント。

    • projectName: Simple Log Serviceプロジェクトの名前。

    • logstoreName: Logstoreの名前。

    • shardId: Logstore内の特定のシャードのID。

    • begin Cursor: データ消費が開始されるオフセット。

    • endCursor: データ消費が終了するオフセット。

    {
        "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com",
        "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****",
        "logstoreName": "function-log",
        "shardId": 0,
        "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==",
        "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA=="
    }

    jobName

    Simple Log ServiceのETLジョブの名前。 Simple Log Serviceトリガーは、Simple Log ServiceのETLジョブに対応する必要があります。

    1f7043ced683de1a4e3d8d70b5a412843d81 ****

    taskId

    ETLジョブの場合、taskIdパラメーターは関数呼び出しの識別子を指定します。

    c2691505-38da-4d1b-998a-f1d4bb8c ****

    cursorTime

    Simple log Serviceに最後のログが到着した時刻のUNIXタイムスタンプ。

    1529486425

ステップ3: 関数コードの書き込みとテスト

Simple Log Serviceトリガーを作成した後、関数コードを記述し、関数コードをテストして、コードが有効かどうかを確認できます。 この関数は、Simple Log Serviceが増分ログを収集するときに呼び出されます。 Function Computeは対応するログを取得し、収集したログを表示します。

  1. 関数の詳細ページで、コードタブで、コードエディターに関数コードを入力し、デプロイ.

    この例では、関数コードはPythonで記述されています。 次のサンプルコードは、ほとんどの論理ログを抽出する方法の例を示しています。 コード内のcontextcredsから、AccessKey IDAccessKey secretを取得できます。

    """
    The sample code is used to implement the following features:
    * Parse Simple Log Service events from the event parameter.
    * Initialize the Simple Log Service client based on the preceding information.
    * Obtain real-time log data from the source Logstore.
    
    
    This sample code is mainly doing the following things:
    * Get SLS processing related information from event
    * Initiate SLS client
    * Pull logs from source log store
    
    """
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import logging
    import json
    import os
    from aliyun.log import LogClient
    
    
    logger = logging.getLogger()
    
    
    def handler(event, context):
    
        # Query the key information from context.credentials.
        # Access keys can be fetched through context.credentials
        print("The content in context entity is: \n")
        print(context)
        creds = context.credentials
        access_key_id = creds.access_key_id
        access_key_secret = creds.access_key_secret
        security_token = creds.security_token
    
        # Parse the event parameter to the OBJECT data type.
        # parse event in object
        event_obj = json.loads(event.decode())
        print("The content in event entity is: \n")
        print(event_obj)
    
        # Query the following information from event.source: log project name, Logstore name, the endpoint to access the Simple Log Service project, start cursor, end cursor, and shard ID.
        # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source
        source = event_obj['source']
        log_project = source['projectName']
        log_store = source['logstoreName']
        endpoint = source['endpoint']
        begin_cursor = source['beginCursor']
        end_cursor = source['endCursor']
        shard_id = source['shardId']
    
        # Initialize the Simple Log Service client.
        # Initialize client of sls
        client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token)
    
        # Read logs based on the start and end cursors in the source Logstore. In this example, the specified cursors include all logs of the function invocation.
        # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation
        while True:
          response = client.pull_logs(project_name=log_project, logstore_name=log_store,
                                    shard_id=shard_id, cursor=begin_cursor, count=100,
                                    end_cursor=end_cursor, compress=False)
          log_group_cnt = response.get_loggroup_count()
          if log_group_cnt == 0:
            break
          logger.info("get %d log group from %s" % (log_group_cnt, log_store))
          logger.info(response.get_loggroup_list())
    
          begin_cursor = response.get_next_cursor()
    
        return 'success'
  2. テスト機能.

    関数の実行後、[コード] タブで結果を表示できます。

次のステップ

  • クエリトリガログ

    トリガーログが保存されているログストアのインデックスを作成し、トリガーの実行結果を表示できます。 詳細については、「インデックスの作成」をご参照ください。

  • 関数の操作ログを表示する

    CLIを使用して、関数呼び出しに関する詳細情報を表示できます。 詳細については、「関数呼び出しログの表示」をご参照ください。

よくある質問

  • トリガーを作成しても関数が呼び出されない場合はどうすればよいですか?

    次の方法を使用して問題をトラブルシューティングできます。

    • Function Computeトリガーが設定されているLogstoreに新しいデータが書き込まれているかどうかを確認します。 新しいデータがLogstoreに書き込まれると、関数が呼び出されます。

    • 関数のトリガーログと操作ログに例外があるかどうかを確認します。

  • 関数の呼び出し間隔が予想よりも大きいのはなぜですか?

    関数はシャードごとに個別に呼び出されます。 Logstore内のシャードに対して関数が呼び出される回数が多い場合でも、各シャードに対して関数が呼び出される間隔は、指定された呼び出し間隔と一致する可能性があります。

    関数がシャードに対して呼び出される呼び出し間隔は、データ変換に指定された時間間隔と同じです。 関数が呼び出されると、レイテンシが存在する可能性があります。 これにより、コール間隔が予想よりも大きくなる可能性があります。 次のリストでは、指定された呼び出し間隔が60秒の2つのシナリオについて説明します。

    • シナリオ1: 関数が呼び出され、レイテンシが存在しません。 この関数は60秒間隔で呼び出され、[now -60s, now) の時間範囲で生成されたデータを変換します。

      説明

      関数はシャードごとに個別に呼び出されます。 Logstoreに10個のシャードが含まれ、関数の呼び出し時にレイテンシが存在しない場合、関数は60秒間隔で10回呼び出され、データをリアルタイムで変換します。

    • シナリオ2: 関数が呼び出され、レイテンシが存在します。 Simple Log Serviceシャード内のデータが変換された時点と、最新のデータがSimple Log Serviceに書き込まれた時点との時間差が10秒を超えています。 この場合、トリガーは通話間隔を短くします。 たとえば、関数を2秒間隔で呼び出して、60秒以内に生成されたデータを変換できます。