全部產品
Search
文件中心

Simple Log Service:資源函數

更新時間:Dec 19, 2024

本文介紹資源函數的文法規則,包括參數解釋、函數樣本等。

函數列表

重要

使用如下資源函數時,必須配置進階預覽才能拉取到目標資料。如何配置進階預覽,請參見進階預覽

函數

說明

res_local

從當前資料加工任務中拉取進階參數配置資訊。

支援和其他函數組合使用。相關樣本,請參見通過Log Service訪問RDS MySQL進行資料富化

res_rds_mysql

從RDS MySQL資料庫中拉取資料庫表內容或SQL執行結果,支援定期更新資料。

支援和其他函數組合使用。相關樣本,請參見通過Log Service訪問RDS MySQL進行資料富化

res_log_logstore_pull

從另一個Logstore中拉取資料,支援持續拉取資料。

支援和其他函數組合使用。相關樣本,請參見從其他Logstore擷取資料進行資料富化

res_oss_file

從OSS中擷取特定Bucket下的檔案內容,支援定期更新資料。

支援和其他函數組合使用。相關樣本,請參見從OSS擷取CSV檔案進行資料富化

res_local

使用res_local函數從當前資料加工任務中拉取進階參數配置資訊。

  • 函數格式

    res_local(param, default=None, type="auto")
  • 參數說明

    名稱

    類型

    是否必填

    說明

    param

    String

    對應進階參數配置中的Key。

    default

    String

    param參數的值不存在時,返回該參數的值,預設值為None

    type

    String

    資料輸出時的資料格式。

    • auto(預設值):將原始值轉化為JSON格式。如果轉換失敗則返回原始值。

    • JSON:將原始值轉化為JSON格式。如果轉換失敗則返回default參數的值。

    • raw:返回原始值。

  • 返回結果

    根據參數配置,返回JSON格式資料或者原始值。

    • 成功樣本

      原始值

      傳回值

      傳回值類型

      1

      1

      整數

      1.2

      1.2

      浮點

      true

      True

      布爾

      false

      False

      布爾

      "123"

      123

      字串

      null

      None

      None

      ["v1", "v2", "v3"]

      ["v1", "v2", "v3"]

      列表

      ["v1", 3, 4.0]

      ["v1", 3, 4.0]

      列表

      {"v1": 100, "v2": "good"}

      {"v1": 100, "v2": "good"}

      列表

      {"v1": {"v11": 100, "v2": 200}, "v3": "good"}

      {"v1": {"v11": 100, "v2": 200}, "v3": "good"}

      列表

    • 失敗樣本

      如下形式的原始值轉換JSON失敗,則返回字串。

      原始值

      傳回值

      說明

      (1,2,3)

      "(1,2,3)"

      不支援元組,需使用列表形式。

      True

      "True"

      只支援true、false(小寫)這兩種布爾類型。

      {1: 2, 3: 4}

      "{1: 2, 3: 4}"

      字典的關鍵字只能是字串。

  • 函數樣本

    從進階參數配置中擷取資訊並賦值給local

    進階參數配置中的Key為endpoint,Value為hangzhou進階參數

    • 原始日誌

      content: 1
    • 加工規則

      e_set("local", res_local('endpoint'))
    • 加工結果

      content: 1
      local: hangzhou
  • 更多參考

    支援和其他函數組合使用。相關樣本,請參見通過Log Service訪問RDS MySQL進行資料富化

res_rds_mysql

使用res_rds_mysql函數從RDS MySQL資料庫中拉取資料庫表內容或SQL執行結果。Log Service加工功能提供如下3種拉模數式:

重要
  • 使用res_rds_mysql函數從RDS MySQL資料庫中拉取資料時,需先在RDS MySQL中配置白名單為0.0.0.0,使所有IP地址都可訪問,但是這種方式會增加資料庫的風險。如果您需要設定指定的Log ServiceIP地址,可以提工單進行配置。

  • Log Service支援使用RDS公網地址或內網地址訪問RDS MySQL資料庫。使用內網訪問時,必須配置進階參數 。具體操作,請參見通過Log Service訪問RDS MySQL進行資料富化

  • 普通拉模數式

    在首次執行任務時,Log Service拉取全量資料,後續不再拉取資料。如果您資料庫中的資料無更新,可選擇普通拉模數式。

  • 全量更新模式

    在執行任務過程中,Log Service定期全量拉取一次資料。雖然在全量更新模式下,Log Service會定期拉取資料,及時同步資料庫中的資料,但每次都是全量拉取,非常耗時。如果您資料庫中的資料量小(資料量<=2 GB)、更新不頻繁(配置refresh_interval>=300s),可選擇全量更新模式。全量重新整理

  • 累加式更新模式

    在執行任務過程中,Log Service根據資料庫中的時間戳記欄位增量拉取資料,只拉取新增的資料。在累加式更新模式下,Log Service每次只拉取新增的資料,效率高。您還可配置refresh_interval為1秒,秒層級同步資料庫中的資料,即時性高。如果您資料庫中的資料量大、更新頻繁、資料即時性要求高,可選擇累加式更新模式。累加式更新

  • 函數格式

    res_rds_mysql(address="待拉取的資料庫地址", username="資料庫的使用者名稱", password="資料庫的密碼", database="資料庫的名稱", table=None, sql=None, fields=None, fetch_include_data=None, fetch_exclude_data=None, refresh_interval=0, base_retry_back_off=1, max_retry_back_off=60, primary_keys=None, use_ssl=false, update_time_key=None, deleted_flag_key=None)

    說明

    Log Service還支援通過res_rds_mysql函數從AnalyticDB MySQL資料庫、PolarDB MySQL資料庫拉取資料,您只需將加工規則中的資料庫地址、資料庫使用者名稱、資料庫密碼、資料庫名稱替換為對應資料庫資訊即可。

  • 參數說明

    名稱

    類型

    是否必填

    說明

    address

    String

    訪問網域名稱或IP地址。當連接埠號碼不是3306時,配置為IP地址:port格式。更多資訊,請參見查看RDS MySQL執行個體的外網地址和連接埠

    username

    String

    串連資料庫的使用者名稱。

    password

    String

    串連資料庫的密碼。

    database

    String

    待串連的資料庫名。

    table

    String

    待擷取的資料庫表名。如果您已配置sql參數,則無需配置該參數。

    sql

    String

    通過以Select開頭的SQL語句查詢資料庫,把合格資料全部載入到加工的記憶體中;SQL 可以通過篩選欄位、行過濾,以減少加工記憶體維表佔用的空間。如果您已配置table參數,則無需配置該參數。

    fields

    String List

    字串列表或者字串別名列表。如果不填,則預設使用sqltable參數返回的所有列。例如需要將["user_id", "province", "city", "name", "age"]中的name改名為user_name,可以將fields參數配置為["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"]

    說明

    如果同時設定sqltablefields參數,僅執行sql參數中的SQL語句,tablefields參數配置不生效。

    fetch_include_data

    String

    配置欄位白名單,滿足fetch_include_data時保留資料,否則丟棄。

    • 不配置或配置為None時,表示關閉欄位白名單功能。

    • 配置為具體的欄位和欄位值時,表示保留該欄位和欄位值所在的日誌。

    fetch_exclude_data

    String

    配置欄位黑名單,滿足fetch_exclude_data時丟棄資料,否則保留。

    • 不配置或配置為None時,表示關閉欄位黑名單功能。

    • 配置為具體的欄位和欄位值時,表示丟棄該欄位和欄位值所在的日誌。

    說明

    如果您同時設定了fetch_include_datafetch_exclude_data參數,則優先執行fetch_include_data參數,再執行fetch_exclude_data參數。

    refresh_interval

    數字字串或數字

    從RDS MySQL拉取資料的時間間隔,單位:秒。預設值為0,表示僅全量拉取一次。

    base_retry_back_off

    Number

    拉取資料失敗後重新拉取的時間間隔,預設值為1,單位:秒。

    max_retry_back_off

    int

    拉取資料失敗後重試請求的最大時間間隔。預設值為60,單位:秒,建議使用預設值。

    primary_keys

    String/List

    SLS加工記憶體維表的KV 主鍵,配置該欄位後,將資料庫表中的資料以Key:Value字典形式儲存到記憶體中,其中Key為此處設定的primary_keys參數的值,Value為拉取到的資料庫表的整行資料。

    說明
    • 請設定primary_keys參數,否則會嚴重影響效能並可能導致任務延遲。

    • 此處設定的primary_keys參數的值必須存在於從資料庫表拉取的欄位中。

    • 此處設定的primary_key參數的值大小寫敏感。

    use_ssl

    Boolean

    是否使用SSL協議進行安全連線。預設值為false,表示不使用SSL協議。

    說明

    如果RDS MySQL開啟SSL,則使用該參數使用SSL通道進行串連,但是不校正服務端的CA認證。目前不支援使用服務端產生的CA認證進行串連。

    update_time_key

    String

    用於增量擷取資料。如果不配置此參數,則執行全量更新。例如update_time_key="update_time",其中update_time為MySQL資料庫中資料更新的時間欄位,該時間欄位支援如下資料類型:Datetime、Timestamp、Int、Float、Decimal。需保證該時間欄位的值隨時間遞增。

    說明

    資料加工基於該時間欄位做資料篩選,實現累加式更新。請確保已在資料表中對該欄位做索引,否則將進行全表掃描。如果不配置該時間欄位的索引,將會報錯無法進行累加式更新。

    deleted_flag_key

    String

    在增量擷取資料時,用於丟棄不需要加工的資料。例如update_time_key="key"key欄位的值出現如下任意情況則被解析為該資料已被刪除:

    • Boolean類型:true

    • Datetime、Timestamp類型:非空

    • Char、Varchar類型:1、true、t、yes、y

    • Int類型:非零

    說明
    • deleted_flag_key參數必須和update_time_key參數一起使用。

    • 如果配置了update_time_key參數,但未配置deleted_flag_key,則累加式更新時不會丟棄已拉取的資料。

    connector

    String

    遠端資料連接器。可選值為mysql、pgsql。預設值為mysql。

  • 返回結果

    返回多列表格,其欄位由fields參數定義。

  • 錯誤處理

    如果拉取過程出錯則會拋出異常,此時資料加工任務不會停止,而是按照base_retry_back_off參數對應的時間間隔進行重試。假設首次重試的時間間隔為1s,如果重試失敗,下次重試的時間間隔為上一次的2倍,以此類推,直到達到max_retry_back_off參數值。如果依然失敗,則後續將一直按照max_retry_back_off參數對應的時間間隔進行重試。如果重試成功,則稍候再試會恢複到初始值。

  • 函數樣本

    • 全量更新

      • 樣本1:拉取test_db資料庫中的test_table表資料,每隔300秒重新拉取一次表資料。

        res_rds_mysql(
            address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com",
            username="test_username",
            password="****",
            database="test_db",
            table="test_table",
            refresh_interval=300,
        )
      • 樣本2:不拉取test_table表中status欄位值為delete的資料。

        res_rds_mysql(
            address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com",
            username="test_username",
            password="****",
            database="test_db",
            table="test_table",
            refresh_interval=300,
            fetch_exclude_data="'status':'delete'",
        )
      • 樣本3:僅拉取test_table表中status欄位值為exit的資料。

        res_rds_mysql(
            address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com",
            username="test_username",
            password="****",
            database="test_db",
            table="test_table",
            refresh_interval=300,
            fetch_include_data="'status':'exit'",
        )
      • 樣本4:拉取test_table表中status欄位值為exit的資料,並且該資料不包含name欄位值為aliyun的資料。

        res_rds_mysql(
            address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com",
            username="test_username",
            password="****",
            database="test_db",
            table="test_table",
            refresh_interval=300,
            fetch_include_data="'status':'exit'",
            fetch_exclude_data="'name':'aliyun'",
        )
      • 樣本5:使用pgsql連接器串連阿里雲Hologres資料庫,並拉取test_table表中的資料。

        res_rds_mysql(
            address="hgpostcn-cn-****-cn-hangzhou.hologres.aliyuncs.com:80",
            username="test_username",
            password="****",
            database="aliyun",
            table="test_table",
            connector="pgsql",
        )
      • 樣本6:使用主鍵模式拉取資料。

        使用主鍵模式時,將primary_keys參數的值作為Key拉取出來,拉取的表資料在記憶體中的存放形式為{"10001":{"userid":"10001","city_name":"beijing","city_number":"12345"}},效率高,適用於巨量資料量。不使用主鍵模式時,以遍曆方式逐行匹配表資料,拉取的表資料在記憶體中的存放形式為[{"userid":"10001","city_name":"beijing","city_number":"12345"}],效率較低,但佔用記憶體小,適合於小資料量。

        • 資料庫表

          userid

          city_name

          city_number

          10001

          beijing

          12345

        • 原始日誌

          #資料1
          userid:10001
          gdp:1000
          
          #資料2
          userid:10002
          gdp:800
        • 加工規則

          e_table_map(
              res_rds_mysql(
                  address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com",
                  username="test_username",
                  password="****",
                  database="test_db",
                  table="test_table",
                  primary_keys="userid",
              ),
              "userid",
              ["city_name", "city_number"],
          )
        • 加工結果

          #資料1
          userid:10001
          gdp:1000
          city_name: beijing
          city_number:12345
          #資料2
          userid:10002
          gdp:800
    • 累加式更新

      • 樣本1:使用累加式更新模式拉取資料

        說明

        使用累加式更新模式拉取資料,必須符合如下條件:

        • 資料庫表中必須有唯一主鍵並且有一個時間欄位,例如樣本中的item_idupdate_time

        • 使用累加式更新模式拉取資料時,必須在函數中設定primary_keysrefresh_intervalupdate_time_key這三個參數。

        • 資料庫表

          item_id

          item_name

          price

          1001

          橘子

          10

          1002

          蘋果

          12

          1003

          芒果

          16

        • 原始日誌

          #資料1
          item_id: 1001
          total: 100
          
          #資料2
          item_id: 1002
          total: 200
          
          #資料3
          item_id: 1003
          total: 300
        • 加工規則

          e_table_map(
              res_rds_mysql(
                  address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com",
                  username="test_username",
                  password="****",
                  database="test_db",
                  table="test_table",
                  primary_key="item_id",
                  refresh_interval=1,
                  update_time_key="update_time",
              ),
              "item_id",
              ["item_name", "price"],
          )
        • 加工結果

          # 資料1
          item_id: 1001
          total: 100
          item_name: 橘子
          price:10
          
          # 資料2
          item_id: 1002
          total: 200
          item_name: 蘋果
          price:12
          
          # 資料3
          item_id: 1003
          total: 300
          item_name: 芒果
          price:16
      • 樣本2:在增量模式中,使用deleted_flag_key參數實現歷史資料丟棄功能。

        • 資料庫表

          item_id

          item_name

          price

          update_time

          Is_deleted

          1001

          橘子

          10

          1603856138

          False

          1002

          蘋果

          12

          1603856140

          False

          1003

          芒果

          16

          1603856150

          False

        • 原始日誌

          # 資料1
          item_id: 1001
          total: 100
          
          # 資料2
          item_id: 1002
          total: 200
          
          # 資料3
          item_id: 1003
          total: 300
        • 加工規則

          e_table_map(
              res_rds_mysql(
                  address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com",
                  username="test_username",
                  password="****",
                  database="test_db",
                  table="test_table",
                  primary_key="item_id",
                  refresh_interval=1,
                  update_time_key="update_time",
                  deleted_flag_key="is_deleted",
              ),
              "item_id",
              ["item_name", "price"],
          )
        • 加工結果

          使用res_rds_mysq函數已拉取資料表中的3條資料到Log Service記憶體中,與源logstore中的資料進行富化匹配。此時您希望丟棄1001這條資料,不再加工該資料,則您可以在資料表1001這條資料中,將Is_deleted欄位值更新為true。更新後,在下一次加工記憶體維表更新時,1001這條資料將在維表中消失。

          # 資料2
          item_id: 1002
          total: 200
          item_name: 蘋果
          price:12
          
          # 資料3
          item_id: 1003
          total: 300
          item_name: 芒果
          price:1
  • 更多參考

    支援和其他函數組合使用。相關樣本,請參見通過Log Service訪問RDS MySQL進行資料富化

res_log_logstore_pull

使用res_log_logstore_pull函數從另一個Logstore中拉取資料。

  • 函數格式

    res_log_logstore_pull(endpoint, ak_id, ak_secret, project, logstore, fields, from_time="begin", to_time=None, fetch_include_data=None, fetch_exclude_data=None, primary_keys=None, fetch_interval=2, delete_data=None,  base_retry_back_off=1, max_retry_back_off=60, ttl=None, role_arn=None)
  • 參數說明

    名稱

    類型

    是否必填

    說明

    endpoint

    String

    訪問網域名稱。更多資訊,請參見服務入口。預設為HTTPS格式,也支援HTTP格式。特殊情況下,需使用非80、非443連接埠。

    ak_id

    String

    阿里雲帳號的AccessKey ID。為了資料安全,建議在進階參數配置中配置。關於如何配置進階參數,請參見建立資料加工任務

    ak_secret

    String

    阿里雲帳號的AccessKey Secret。為了資料安全,建議在進階參數配置中配置。關於如何配置進階參數,請參見建立資料加工任務

    project

    String

    待拉取資料的Project名稱。

    logstore

    String

    待拉取資料的Logstore名稱。

    fields

    String List

    字串列表或者字串別名列表。日誌中不包含某個欄位時,該欄位的值為空白。例如需要將["user_id", "province", "city", "name", "age"]name改名為user_name時,可以配置為["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"]

    from_time

    String

    首次開始拉取日誌的伺服器時間,預設值為begin,表示會從第一條資料開始拉取。支援如下時間格式:

    • Unix時間戳記。

    • 時間字串。

    • 特定字串,例如beginend

    • 運算式:dt_類函數返回的時間。例如dt_totimestamp(dt_truncate(dt_today(tz="Asia/Shanghai"), day=op_neg(-1))),表示昨天拉取日誌的開始時間,如果目前時間是2019-5-5 10:10:10 8:00,則上述運算式表示時間2019-5-4 10:10:10 8:00。

    to_time

    String

    首次結束讀取日誌的伺服器時間。預設值為None,表示持續到當前的最後一條日誌。支援如下時間格式:

    • Unix時間戳記。

    • 時間字串。

    • 特定字串。例如beginend

    • 運算式:dt_類函數返回的時間。

    不配置或者配置為None表示持續拉取最新的日誌。

    說明

    如果填入的是一個未來時間,只會將該Logstore所有資料拉取完畢,並不會開啟持續拉取任務。

    fetch_include_data

    String

    配置欄位白名單,滿足fetch_include_data時保留資料,否則丟棄。

    • 不配置或配置為None時,表示關閉欄位白名單功能。

    • 配置為具體的欄位和欄位值時,表示保留該欄位和欄位值所在的日誌。

    fetch_exclude_data

    String

    配置欄位黑名單,滿足fetch_exclude_data時丟棄資料,否則保留。

    • 不配置或配置為None時,表示關閉欄位黑名單功能。

    • 配置為具體的欄位和欄位值時,表示丟棄該欄位和欄位值所在的日誌。

    說明

    如果您同時設定了黑名單和白名單參數,則先丟棄黑名單的欄位,然後在剩餘資料中保留白名單的欄位。

    primary_keys

    字串列表

    維護表格時的主鍵欄位列表。如果fields參數中對主鍵欄位進行修改,這裡應使用修改後的欄位名,將修改後的欄位作為主鍵欄位。

    說明
    • primary_keys參數只支援單個字串,且必須存在於fields參數配置的欄位中。

    • 待拉取資料的目標Logstore中只能有一個Shard。

    • 請設定primary_keys參數,否則會嚴重影響效能並可能導致任務延遲。

    • 此處設定的primary_key參數的值大小寫敏感。

    fetch_interval

    Int

    開啟持續拉取任務時,每次提取要求的時間間隔,預設值為2,單位:秒。該值必須大於或者等於1。

    delete_data

    String

    對滿足條件且配置了primary_keys的資料,在表格中進行刪除操作。更多資訊,請參見查詢字串文法

    base_retry_back_off

    Number

    拉取資料失敗後重新拉取的時間間隔,預設值為1,單位:秒。

    max_retry_back_off

    Int

    拉取資料失敗後,重試請求的最大時間間隔,預設值為60,單位:秒。建議使用預設值。

    ttl

    Int

    開啟持續拉取任務時,拉取日誌產生時間開始ttl時間內的日誌,單位為秒。預設值為None,表示拉取全部時間的日誌。

    role_arn

    String

    RAM角色的ARN,該角色必須擁有讀Logstore資料的許可權。在RAM控制台,您可以在該角色的基本資料中查看,例如acs:ram::1379******44:role/role-a。 如何擷取ARN,請參見查看RAM角色

  • 返回結果

    返回多列表格。

  • 錯誤處理

    如果拉取過程出錯則會拋出異常,此時資料加工任務不會停止,而是按照base_retry_back_off參數對應的時間間隔進行重試。假設首次重試的時間間隔為1秒,如果重試失敗,下次重試的時間間隔為上一次的2倍,以此類推,直到達到max_retry_back_off參數值。如果依然失敗,則後續將一直按照max_retry_back_off參數對應的時間間隔進行重試。如果重試成功,則稍候再試會恢複到初始值。

  • 函數樣本

    • 拉取test_project下test_logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,指定時間範圍為從日誌開始寫入到結束,僅拉取一次。

      res_log_logstore_pull(
          "cn-hangzhou.log.aliyuncs.com",
          "LT****Gw",
          "ab****uu",
          "test_project",
          "test_logstore",
          ["key1", "key2"],
          from_time="begin",
          to_time="end",
      )
    • 拉取test_project下test_logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,指定時間範圍為從日誌開始寫入到結束,設定為持續拉取,時間間隔為30秒。

      res_log_logstore_pull(
          "cn-hangzhou.log.aliyuncs.com",
          "LT****Gw",
          "ab****uu",
          "test_project",
          "test_logstore",
          ["key1", "key2"],
          from_time="begin",
          to_time=None,
          fetch_interval=30,
      )
    • 設定黑名單,不拉取包含key1:value1的資料。

      res_log_logstore_pull(
          "cn-hangzhou.log.aliyuncs.com",
          "LT****Gw",
          "ab****uu",
          "test_project",
          "test_logstore",
          ["key1", "key2"],
          from_time="begin",
          to_time=None,
          fetch_interval=30,
          fetch_exclude_data="key1:value1",
      )
    • 設定白名單,僅拉取包含key1:value1的資料。

      res_log_logstore_pull(
          "cn-hangzhou.log.aliyuncs.com",
          "LT****Gw",
          "ab****uu",
          "test_project",
          "test_logstore",
          ["key1", "key2"],
          from_time="begin",
          to_time=None,
          fetch_interval=30,
          fetch_include_data="key1:value1",
      )
    • 拉取test_project下test_logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,指定時間範圍為日誌產生時間開始的40000000秒。

      res_log_logstore_pull(
          "cn-hangzhou.log.aliyuncs.com",
          "LTAI*****Cajvr",
          "qO0Tp*****jJ9",
          "test_project",
          "test_logstore",
          fields=["key1","key2"],
          ttl="40000000"
      )
    • 拉取project-test1下test-logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,使用SLR服務角色授權,指定時間範圍為從日誌開始寫入到結束,僅拉取一次。

      res_log_logstore_pull(
          "pub-cn-hangzhou-staging-intranet.log.aliyuncs.com",
          "",
          "",
          "project-test1",
          "test-logstore",
          ["key1", "key2"],
          from_time="2022-7-27 10:10:10 8:00",
          to_time="2022-7-27 14:30:10 8:00",
          role_arn="acs:ram::***:role/aliyunserviceroleforslsaudit",
      )
                                  
    • 拉取project-test1下test-logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,使用預設角色授權,指定時間範圍為從日誌開始寫入到結束,僅拉取一次。

      res_log_logstore_pull(
          "cn-chengdu.log.aliyuncs.com",
          "",
          "",
          "project-test1",
          "test-logstore",
           ["key1", "key2"],
          from_time="2022-7-21 10:10:10 8:00",
          to_time="2022-7-21 10:30:10 8:00",
          role_arn="acs:ram::***:role/aliyunlogetlrole",
      )
  • 更多參考

    支援和其他函數組合使用。相關樣本,請參見從其他Logstore擷取資料進行資料富化

res_oss_file

使用res_oss_file函數從OSS Bucket中擷取檔案內容,並支援定期重新整理。

說明

建議Log ServiceProject和OSS Bucket處於同一地區,使用阿里雲內網擷取資料。阿里雲內網網路穩定且速度快。

  • 函數格式

    res_oss_file(endpoint, ak_id, ak_key, bucket, file, format='text', change_detect_interval=0, base_retry_back_off=1, max_retry_back_off=60, encoding='utf8', error='ignore')
  • 參數說明

    名稱

    類型

    是否必填

    說明

    endpoint

    String

    OSS服務入口。更多資訊,請參見OSS地區和訪問網域名稱。預設為HTTPS格式,也支援HTTP格式。特殊情況下,需使用非80/443連接埠。

    ak_id

    String

    阿里雲帳號的AccessKey ID。為了資料安全,建議在進階參數配置中配置。關於如何配置進階參數,請參見建立資料加工任務

    ak_key

    String

    阿里雲帳號的AccessKey Secret。為了資料安全,建議在進階參數配置中配置。關於如何配置進階參數,請參見建立資料加工任務

    bucket

    String

    目標OSS Bucket名稱。

    file

    String

    目標OSS檔案的路徑。例如test/data.txt,不能以正斜線(/)開頭。

    format

    String

    檔案輸出形式。

    • Text:以文本形式輸出。

    • Binary:以位元組流形式輸出。

    change_detect_interval

    String

    從OSS拉取檔案的時間間隔,單位:秒。拉取時會檢查檔案是否有更新,如果有更新則重新整理。預設為0,表示不重新整理,僅在程式啟動時拉取一次資料。

    base_retry_back_off

    Number

    拉取失敗後重新拉取檔案的時間間隔,預設值為1,單位:秒。

    max_retry_back_off

    int

    拉取資料失敗後,重試請求的最大時間間隔,預設值為60,單位:秒。建議使用預設值。

    encoding

    String

    編碼方式。當formattext時,該參數預設為utf8

    error

    String

    設定不同錯誤的處理方案,該欄位僅在編碼上報UnicodeError錯誤訊息時有效。取值範圍:

    • ignore表示忽略格式錯誤的資料,繼續進行編碼。

    • xmlcharrefreplace表示將無法編碼的字元替換為適當的XML字元。

    更多資訊,請參見Error Handlers

    decompress

    String

    是否解壓擷取到的OSS檔案。取值範圍:

    • None(預設值):不解壓。

    • gzip:使用Gzip方式解壓。

  • 返回結果

    返回位元組流形式或文本形式的檔案資料。

  • 錯誤處理

    如果拉取過程出錯則會拋出異常,此時資料加工任務不會停止,而是按照base_retry_back_off參數對應的時間間隔進行重試。假設首次重試的時間間隔為1秒,如果重試失敗,下次重試的時間間隔為上一次的2倍,以此類推,直到達到max_retry_back_off參數值。如果依然失敗,則後續將一直按照max_retry_back_off參數對應的時間間隔進行重試。如果重試成功,則稍候再試會恢複到初始值。

  • 函數樣本

    • 樣本1:從OSS中拉取JSON格式的資料。

      • JSON內容

        {
          "users": [
            {
                "name": "user1",
                "login_historys": [
                  {
                    "date": "2019-10-10 0:0:0",
                    "login_ip": "203.0.113.10"
                  },
                  {
                    "date": "2019-10-10 1:0:0",
                    "login_ip": "203.0.113.10"
                  }
                ]
            },
            {
                "name": "user2",
                "login_historys": [
                  {
                    "date": "2019-10-11 0:0:0",
                    "login_ip": "203.0.113.20"
                  },
                  {
                    "date": "2019-10-11 1:0:0",
                    "login_ip": "203.0.113.30"
                  },
                  {
                    "date": "2019-10-11 1:1:0",
                    "login_ip": "203.0.113.50"
                  }
                ]
            }
          ]
        }
      • 原始日誌

        content: 123
      • 加工規則

        e_set(
            "json_parse",
            json_parse(
                res_oss_file(
                    endpoint="http://oss-cn-hangzhou.aliyuncs.com",
                    ak_id="LT****Gw",
                    ak_key="ab****uu",
                    bucket="log-etl-staging",
                    file="testjson.json",
                )
            ),
        )
      • 加工結果

        content: 123
            prjson_parse: 
        '{
          "users": [
            {
                "name": "user1",
                "login_historys": [
                  {
                    "date": "2019-10-10 0:0:0",
                    "login_ip": "203.0.113.10"
                  },
                  {
                    "date": "2019-10-10 1:0:0",
                    "login_ip": "203.0.113.10"
                  }
                ]
            },
            {
                "name": "user2",
                "login_historys": [
                  {
                    "date": "2019-10-11 0:0:0",
                    "login_ip": "203.0.113.20"
                  },
                  {
                    "date": "2019-10-11 1:0:0",
                    "login_ip": "203.0.113.30"
                  },
                  {
                    "date": "2019-10-11 1:1:0",
                    "login_ip": "203.0.113.50"
                  }
                ]
            }
          ]
        }'
    • 樣本2:從OSS中拉取常值內容。

      • 常值內容

        Test bytes
      • 原始日誌

        content: 123
      • 加工規則

        e_set(
            "test_txt",
            res_oss_file(
                endpoint="http://oss-cn-hangzhou.aliyuncs.com",
                ak_id="LT****Gw",
                ak_key="ab****uu",
                bucket="log-etl-staging",
                file="test.txt",
            ),
        )
      • 加工結果

        content: 123
        test_txt: Test bytes
    • 樣本3:從OSS拉取壓縮檔並解壓。

      • OSS壓縮檔內容

        Test bytes\nupdate\n123
      • 原始日誌

        content:123
      • 加工規則

        e_set(
            "text",
            res_oss_file(
                endpoint="http://oss-cn-hangzhou.aliyuncs.com",
                ak_id="LT****Gw",
                ak_key="ab****uu",
                bucket="log-etl-staging",
                file="test.txt.gz",
                format="binary",
                change_detect_interval=30,
                decompress="gzip",
            ),
        )
      • 加工結果

        content:123
        text: Test bytes\nupdate\n123
    • 樣本4:不使用存取金鑰訪問OSS公用讀寫檔案。

      • OSS壓縮檔內容

        Test bytes
      • 原始日誌

        content:123
      • 加工規則

        e_set(
            "test_txt",
            res_oss_file(
                endpoint="http://oss-cn-hangzhou.aliyuncs.com",
                bucket="log-etl-staging",
                file="test.txt",
            ),
        )
      • 加工結果

        content: 123
        test_txt: Test bytes
  • 更多參考

    支援和其他函數組合使用。相關樣本,請參見從OSS擷取CSV檔案進行資料富化