全部產品
Search
文件中心

Function Compute:SLS觸發器

更新時間:Jul 06, 2024

通過配置Log ServiceSLS觸發器,您可以實現Log ServiceSLSFunction Compute的整合。SLS觸發器能夠在新日誌產生時自動觸發函數執行,從而增量消費Log ServiceLogstore的資料,並完成自訂加工任務。

使用情境

  • 資料清洗、加工情境

    通過Log Service,快速完成日誌採集、加工、查詢、分析。

  • 資料投遞情境

    為資料的目的端落地提供支撐,構建雲上巨量資料產品間的資料管道。

資料加工函數

函數類型

Function Compute觸發機制

Log ServiceETL Job對應於Function Compute的一個觸發器,當建立Log ServiceETL Job後,Log Service會根據該ETL Job的配置啟動定時器,定時器輪詢Logstore中的Shard資訊,當發現有新的資料寫入時,即產生<shard_id,begin_cursor,end_cursor >三元組資訊作為函數Event,並觸發函數執行。函數Event是Log ServiceSLS推送到Function Compute

說明

當儲存系統升級時,即使沒有新資料寫入,也可能發生Cursor變化,在這種情況下,每個Shard會額外空觸發一次。針對這種情況,您可以在函數內通過Cursor嘗試擷取Shard的資料,如果擷取不到資料說明是一次空觸發,可以在函數內做忽略處理。更多資訊,請參見自訂函數開發指南

Log Service的ETL任務觸發機制是時間觸發。例如:您設定的ETL Job觸發間隔為60秒,Logstore的Shard0一直有資料寫入,那麼Shard每60秒就會觸發一次函數執行(如果Shard沒有新的資料寫入則不會觸發函數執行),函數執行的輸入為最近60秒的Cursor區間。在函數內,可以根據Cursor讀取Shard0資料進行下一步處理。

使用限制

單個記錄項目(Project)關聯的SLS觸發器數量最大不得超過該Project下已有的Logstore數量的5倍。

說明

建議每個Logstore配置的SLS觸發器數量不超過5個,否則可能會影響資料投遞到Function Compute的效率。

樣本情境

您可以配置一個SLS觸發器,該觸發器將定時擷取更新的資料並觸發函數執行,增量消費Log ServiceLogstore中的資料,在函數裡完成自訂加工任務(例如資料清洗和加工)以及將資料投遞給第三方服務。本樣本中只示範如何擷取日誌資料並列印。

說明

用於資料加工的函數可以是Log Service提供的模板,也可以是您的自訂函數。

前提條件

  • Function Compute

  • Log ServiceSLS

    • 建立記錄項目和日誌庫

      您需要建立一個記錄項目和兩個日誌庫。一個日誌庫用於處理日誌及資料來源,另一個日誌庫用於儲存Function Compute產生的日誌。

      說明

      記錄項目(Project)所在地區和Function Compute服務所在地區必須一致。

步驟一:建立SLS觸發器

  1. 登入Function Compute控制台,在左側導覽列,單擊函數

  2. 在頂部功能表列,選擇地區,然後在函數頁面,單擊目標函數。

  3. 在函數詳情頁面,選擇配置頁簽,在左側導覽列,單擊觸發器,然後單擊建立觸發器

  4. 在建立觸發程序面板,填寫相關資訊,然後單擊確定

    配置項

    操作

    本文樣本

    觸發器類型

    選擇Log Service SLS

    Log ServiceSLS

    名稱

    填寫自訂的觸發器名稱。

    log_trigger

    版本或別名

    預設值為LATEST,如果您需要建立其他版本或別名的觸發器,需先在函數詳情頁的右上方切換到該版本或別名。關於版本和別名的簡介,請參見版本管理別名管理

    LATEST

    記錄項目

    選擇已建立的記錄項目。

    aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****

    日誌庫

    選擇已建立的日誌庫,當前觸發器會定時從該日誌庫中訂閱資料到函數服務進行自訂加工。

    function-log

    觸發間隔

    填寫Log Service觸發函數啟動並執行時間間隔。

    取值範圍:[3,600],單位:秒。預設值:60。

    60

    重試次數

    填寫單次觸發允許的最大重試次數。

    取值範圍:[0,100]。預設值:3。

    說明
    • 執行成功的情況為status=200並且header中參數X-Fc-Error-Type的值不是UnhandledInvocationErrorHandledInvocationError的錯誤。其他情況表示執行失敗,會觸發重試。關於參數X-Fc-Error-Type請參見返回資料

    • 如果函數執行失敗,會一直重試當前請求,直到函數執行成功。首先會按照配置的重試次數進行重試,超過最大重試次數仍然無法成功的,會增加時間間隔進入退避重試。

    3

    觸發器日誌

    選擇已建立的日誌庫,Log Service觸發函數執行過程的日誌會記錄到該日誌庫中。

    function-log2

    調用參數

    如果您想傳入自訂參數,可以在此處配置。該參數將作為event的parameter參數傳入函數。該參數取值必須是JSON格式的字串。

    預設值為空白。

    角色名稱

    選擇AliyunLogETLRole

    說明

    如果您第一次建立該類型的觸發器,則需要在單擊確定後,在彈出的對話方塊中選擇立即授權

    AliyunLogETLRole

    建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理

步驟二:配置函數的入口參數

  1. 在函數詳情頁面的代碼頁簽,單擊測試函數右側的image.png表徵圖,從下拉式清單中,選擇配置測試參數

  2. 配置測試參數面板,選擇建立新測試事件編輯已有測試事件,填寫事件名稱和事件內容,然後單擊確定。

    event是Function Compute的入口參數。具體格式如下:

    {
        "parameter": {},
        "source": {
            "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com",
            "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****",
            "logstoreName": "function-log",
            "shardId": 0,
            "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==",
            "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA=="
        },
        "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****",
        "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****",
        "cursorTime": 1529486425
    }                       

    參數

    描述

    本文樣本

    parameter

    您配置觸發器時填寫的調用參數的值。

    無。

    source

    設定函數讀取的日誌塊資訊。

    • endpoint:Log ServiceProject所屬的阿里雲地區。

    • projectName:Log ServiceProject名稱。

    • logstoreName:Logstore名稱。

    • shardId:Logstore中一個確定的Shard。

    • beginCursor:開始消費資料的位置。

    • endCursor:停止消費資料的位置。

    {
        "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com",
        "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****",
        "logstoreName": "function-log",
        "shardId": 0,
        "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==",
        "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA=="
    }

    jobName

    Log ServiceETL Job名字,函數配置的SLS觸發器對應一個Log Service的ETL Job。

    1f7043ced683de1a4e3d8d70b5a412843d81****

    taskId

    對於ETL Job而言,taskId是一個確定性函數調用標識。

    c2691505-38da-4d1b-998a-f1d4bb8c****

    cursorTime

    最後一條日誌到達Log Service端的Unix時間戳記,單位:秒。

    1529486425

步驟三:編寫函數並測試

完成建立日誌觸發器後,您可以編寫函數代碼並測試以驗證代碼的正確性。在實際操作過程中,當Log Service收集增量日誌時觸發該函數,Function Compute擷取對應日誌,然後列印收集的日誌。

  1. 在函數詳情頁面的代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼

    本文以Python函數代碼為例。以下範例程式碼可以作為提取大部分邏輯日誌的模板。其中accessKeyIdaccessKey可以從contextcreds中擷取。

    """
    本代碼範例主要實現以下功能:
    * 從 event 中解析出 SLS 事件觸發相關資訊
    * 根據以上擷取的資訊,初始化 SLS 用戶端
    * 從源 log store 擷取即時日誌資料
    
    
    This sample code is mainly doing the following things:
    * Get SLS processing related information from event
    * Initiate SLS client
    * Pull logs from source log store
    
    """
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import logging
    import json
    import os
    from aliyun.log import LogClient
    
    
    logger = logging.getLogger()
    
    
    def handler(event, context):
    
        # 可以通過 context.credentials 擷取密鑰資訊
        # Access keys can be fetched through context.credentials
        print("The content in context entity is: \n")
        print(context)
        creds = context.credentials
        access_key_id = creds.access_key_id
        access_key_secret = creds.access_key_secret
        security_token = creds.security_token
    
        # 解析 event 參數至 object 格式
        # parse event in object
        event_obj = json.loads(event.decode())
        print("The content in event entity is: \n")
        print(event_obj)
    
        # 從 event.source 中擷取記錄項目名稱、日誌倉庫名稱、Log Service訪問 endpoint、日誌起始遊標、日誌終點遊標以及分區 id
        # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source
        source = event_obj['source']
        log_project = source['projectName']
        log_store = source['logstoreName']
        endpoint = source['endpoint']
        begin_cursor = source['beginCursor']
        end_cursor = source['endCursor']
        shard_id = source['shardId']
    
        # 初始化 sls 用戶端
        # Initialize client of sls
        client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token)
    
        # 基於日誌的遊標從源日誌庫中讀取日誌,本樣本中的遊標範圍包含了觸發本次執行的所有日誌內容
        # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation
        while True:
          response = client.pull_logs(project_name=log_project, logstore_name=log_store,
                                    shard_id=shard_id, cursor=begin_cursor, count=100,
                                    end_cursor=end_cursor, compress=False)
          log_group_cnt = response.get_loggroup_count()
          if log_group_cnt == 0:
            break
          logger.info("get %d log group from %s" % (log_group_cnt, log_store))
          logger.info(response.get_loggroup_list())
    
          begin_cursor = response.get_next_cursor()
    
        return 'success'
  2. 單擊測試函數

    執行完成後,您可以在函數代碼頁簽的上方查看執行結果。

常見問題