本文介紹資源函數的文法規則,包括參數解釋、函數樣本等。
函數列表
使用如下資源函數時,必須配置進階預覽才能拉取到目標資料。如何配置進階預覽,請參見進階預覽。
函數 | 說明 |
從當前資料加工任務中拉取進階參數配置資訊。 支援和其他函數組合使用。相關樣本,請參見使用RDS內網地址訪問RDS MySQL資料庫。 | |
從RDS MySQL資料庫中拉取資料庫表內容或SQL執行結果,支援定期更新資料。 支援和其他函數組合使用。相關樣本,請參見使用RDS內網地址訪問RDS MySQL資料庫。 | |
從另一個Logstore中拉取資料,支援持續拉取資料。 支援和其他函數組合使用。相關樣本,請參見從其他Logstore擷取資料進行資料富化。 | |
從OSS中擷取特定Bucket下的檔案內容,支援定期更新資料。 支援和其他函數組合使用。相關樣本,請參見從OSS擷取CSV檔案進行資料富化。 |
res_local
使用res_local函數從當前資料加工任務中拉取進階參數配置資訊。
函數格式
res_local(param, default=None, type="auto")
參數說明
名稱
類型
是否必填
說明
param
String
是
對應進階參數配置中的Key。
default
String
否
當param參數的值不存在時,返回該參數的值,預設值為None。
type
String
否
資料輸出時的資料格式。
auto(預設值):將原始值轉化為JSON格式。如果轉換失敗則返回原始值。
JSON:將原始值轉化為JSON格式。如果轉換失敗則返回default參數的值。
raw:返回原始值。
返回結果
根據參數配置,返回JSON格式資料或者原始值。
成功樣本
原始值
傳回值
傳回值類型
1
1
整數
1.2
1.2
浮點
true
True
布爾
false
False
布爾
"123"
123
字串
null
None
None
["v1", "v2", "v3"]
["v1", "v2", "v3"]
列表
["v1", 3, 4.0]
["v1", 3, 4.0]
列表
{"v1": 100, "v2": "good"}
{"v1": 100, "v2": "good"}
列表
{"v1": {"v11": 100, "v2": 200}, "v3": "good"}
{"v1": {"v11": 100, "v2": 200}, "v3": "good"}
列表
失敗樣本
如下形式的原始值轉換JSON失敗,則返回字串。
原始值
傳回值
說明
(1,2,3)
"(1,2,3)"
不支援元組,需使用列表形式。
True
"True"
只支援true、false(小寫)這兩種布爾類型。
{1: 2, 3: 4}
"{1: 2, 3: 4}"
字典的關鍵字只能是字串。
函數樣本
從進階參數配置中擷取資訊並賦值給local。
進階參數配置中的Key為endpoint,Value為hangzhou。
原始日誌
content: 1
加工規則
e_set("local", res_local('endpoint'))
加工結果
content: 1 local: hangzhou
更多參考
支援和其他函數組合使用。相關樣本,請參見使用RDS內網地址訪問RDS MySQL資料庫。
res_rds_mysql
使用res_rds_mysql函數從RDS MySQL資料庫中拉取資料庫表內容或SQL執行結果。Log Service加工功能提供如下3種拉模數式:
使用res_rds_mysql函數從RDS MySQL資料庫中拉取資料時,需先在RDS MySQL中配置白名單為
0.0.0.0
,使所有IP地址都可訪問,但是這種方式會增加資料庫的風險。如果您需要設定指定的Log ServiceIP地址,可以提工單進行配置。Log Service支援使用RDS公網地址或內網地址訪問RDS MySQL資料庫。使用內網訪問時,必須配置進階參數 。具體操作,請參見使用RDS內網地址訪問RDS MySQL資料庫。
普通拉模數式
在首次執行任務時,Log Service拉取全量資料,後續不再拉取資料。如果您資料庫中的資料無更新,可選擇普通拉模數式。
全量更新模式
在執行任務過程中,Log Service定期全量拉取一次資料。雖然在全量更新模式下,Log Service會定期拉取資料,及時同步資料庫中的資料,但每次都是全量拉取,非常耗時。如果您資料庫中的資料量小(資料量<=2 GB)、更新不頻繁(配置refresh_interval>=300s),可選擇全量更新模式。
累加式更新模式
在執行任務過程中,Log Service根據資料庫中的時間戳記欄位增量拉取資料,只拉取新增的資料。在累加式更新模式下,Log Service每次只拉取新增的資料,效率高。您還可配置refresh_interval為1秒,秒層級同步資料庫中的資料,即時性高。如果您資料庫中的資料量大、更新頻繁、資料即時性要求高,可選擇累加式更新模式。
函數格式
res_rds_mysql(address="待拉取的資料庫地址", username="資料庫的使用者名稱", password="資料庫的密碼", database="資料庫的名稱", table=None, sql=None, fields=None, fetch_include_data=None, fetch_exclude_data=None, refresh_interval=0, base_retry_back_off=1, max_retry_back_off=60, primary_keys=None, use_ssl=false, update_time_key=None, deleted_flag_key=None)
說明Log Service還支援通過res_rds_mysql函數從AnalyticDB MySQL資料庫、PolarDB MySQL資料庫拉取資料,您只需將加工規則中的資料庫地址、資料庫使用者名稱、資料庫密碼、資料庫名稱替換為對應資料庫資訊即可。
參數說明
名稱
類型
是否必填
說明
address
String
是
訪問網域名稱或IP地址。當連接埠號碼不是3306時,配置為
IP地址:port
格式。更多資訊,請參見查看RDS MySQL執行個體的外網地址和連接埠。username
String
是
串連資料庫的使用者名稱。
password
String
是
串連資料庫的密碼。
database
String
是
待串連的資料庫名。
table
String
是
待擷取的資料庫表名。如果您已配置sql參數,則無需配置該參數。
sql
String
是
通過以Select開頭的SQL語句查詢資料。如果您已配置table參數,則無需配置該參數。
fields
String List
否
字串列表或者字串別名列表。如果不填,則預設使用sql或table參數返回的所有列。例如需要將["user_id", "province", "city", "name", "age"]中的name改名為user_name,可以將fields參數配置為["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"]。
說明如果同時設定sql、table和fields參數,僅執行sql參數中的SQL語句,table和fields參數配置不生效。
fetch_include_data
String
否
配置欄位白名單,滿足fetch_include_data時保留資料,否則丟棄。
不配置或配置為None時,表示關閉欄位白名單功能。
配置為具體的欄位和欄位值時,表示保留該欄位和欄位值所在的日誌。
fetch_exclude_data
String
否
配置欄位黑名單,滿足fetch_exclude_data時丟棄資料,否則保留。
不配置或配置為None時,表示關閉欄位黑名單功能。
配置為具體的欄位和欄位值時,表示丟棄該欄位和欄位值所在的日誌。
說明如果您同時設定了fetch_include_data和fetch_exclude_data參數,則優先執行fetch_include_data參數,再執行fetch_exclude_data參數。
refresh_interval
數字字串或數字
否
從RDS MySQL拉取資料的時間間隔,單位:秒。預設值為0,表示僅全量拉取一次。
base_retry_back_off
Number
否
拉取資料失敗後重新拉取的時間間隔,預設值為1,單位:秒。
max_retry_back_off
int
否
拉取資料失敗後重試請求的最大時間間隔。預設值為60,單位:秒,建議使用預設值。
primary_keys
String/List
否
主鍵,配置該欄位後,將資料庫表中的資料以Key:Value字典形式儲存到記憶體中,其中Key為此處設定的primary_keys參數的值,Value為拉取到的資料庫表的整行資料。
說明請設定
primary_keys
參數,否則會嚴重影響效能並可能導致任務延遲。此處設定的primary_keys參數的值必須存在於從資料庫表拉取的欄位中。
此處設定的
primary_key
參數的值大小寫敏感。
use_ssl
Boolean
否
是否使用SSL協議進行安全連線。預設值為false,表示不使用SSL協議。
說明如果RDS MySQL開啟SSL,則使用該參數使用SSL通道進行串連,但是不校正服務端的CA認證。目前不支援使用服務端產生的CA認證進行串連。
update_time_key
String
否
用於增量擷取資料。如果不配置此參數,則執行全量更新。例如update_time_key="update_time",其中update_time為MySQL資料庫中資料更新的時間欄位,該時間欄位支援如下資料類型:Datetime、Timestamp、Int、Float、Decimal。需保證該時間欄位的值隨時間遞增。
說明資料加工基於該時間欄位做資料篩選,實現累加式更新。請確保已在資料表中對該欄位做索引,否則將進行全表掃描。如果不配置該時間欄位的索引,將會報錯無法進行累加式更新。
deleted_flag_key
String
否
在增量擷取資料時,用於丟棄不需要加工的資料。例如update_time_key="key",key欄位的值出現如下任意情況則被解析為該資料已被刪除:
Boolean類型:true
Datetime、Timestamp類型:非空
Char、Varchar類型:1、true、t、yes、y
Int類型:非零
說明deleted_flag_key參數必須和update_time_key參數一起使用。
如果配置了update_time_key參數,但未配置deleted_flag_key,則累加式更新時不會丟棄已拉取的資料。
connector
String
否
遠端資料連接器。可選值為mysql、pgsql。預設值為mysql。
返回結果
返回多列表格,其欄位由fields參數定義。
錯誤處理
如果拉取過程出錯則會拋出異常,此時資料加工任務不會停止,而是按照base_retry_back_off參數對應的時間間隔進行重試。假設首次重試的時間間隔為1s,如果重試失敗,下次重試的時間間隔為上一次的2倍,以此類推,直到達到max_retry_back_off參數值。如果依然失敗,則後續將一直按照max_retry_back_off參數對應的時間間隔進行重試。如果重試成功,則稍候再試會恢複到初始值。
函數樣本
全量更新
樣本1:拉取test_db資料庫中的test_table表資料,每隔300秒重新拉取一次表資料。
res_rds_mysql( address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com", username="test_username", password="****", database="test_db", table="test_table", refresh_interval=300, )
樣本2:不拉取test_table表中status欄位值為delete的資料。
res_rds_mysql( address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com", username="test_username", password="****", database="test_db", table="test_table", refresh_interval=300, fetch_exclude_data="'status':'delete'", )
樣本3:僅拉取test_table表中status欄位值為exit的資料。
res_rds_mysql( address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com", username="test_username", password="****", database="test_db", table="test_table", refresh_interval=300, fetch_include_data="'status':'exit'", )
樣本4:拉取test_table表中status欄位值為exit的資料,並且該資料不包含name欄位值為aliyun的資料。
res_rds_mysql( address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com", username="test_username", password="****", database="test_db", table="test_table", refresh_interval=300, fetch_include_data="'status':'exit'", fetch_exclude_data="'name':'aliyun'", )
樣本5:使用pgsql連接器串連阿里雲Hologres資料庫,並拉取test_table表中的資料。
res_rds_mysql( address="hgpostcn-cn-****-cn-hangzhou.hologres.aliyuncs.com:80", username="test_username", password="****", database="aliyun", table="test_table", connector="pgsql", )
樣本6:使用主鍵模式拉取資料。
使用主鍵模式時,將primary_keys參數的值作為Key拉取出來,拉取的表資料在記憶體中的存放形式為{"10001":{"userid":"10001","city_name":"beijing","city_number":"12345"}},效率高,適用於巨量資料量。不使用主鍵模式時,以遍曆方式逐行匹配表資料,拉取的表資料在記憶體中的存放形式為[{"userid":"10001","city_name":"beijing","city_number":"12345"}],效率較低,但佔用記憶體小,適合於小資料量。
資料庫表
userid
city_name
city_number
10001
beijing
12345
原始日誌
#資料1 userid:10001 gdp:1000 #資料2 userid:10002 gdp:800
加工規則
e_table_map( res_rds_mysql( address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com", username="test_username", password="****", database="test_db", table="test_table", primary_keys="userid", ), "userid", ["city_name", "city_number"], )
加工結果
#資料1 userid:10001 gdp:1000 city_name: beijing city_number:12345 #資料2 userid:10002 gdp:800
累加式更新
樣本1:使用累加式更新模式拉取資料
說明使用累加式更新模式拉取資料,必須符合如下條件:
資料庫表中必須有唯一主鍵並且有一個時間欄位,例如樣本中的item_id和update_time。
使用累加式更新模式拉取資料時,必須在函數中設定primary_keys、refresh_interval和update_time_key這三個參數。
資料庫表
item_id
item_name
price
1001
橘子
10
1002
蘋果
12
1003
芒果
16
原始日誌
#資料1 item_id: 1001 total: 100 #資料2 item_id: 1002 total: 200 #資料3 item_id: 1003 total: 300
加工規則
e_table_map( res_rds_mysql( address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com", username="test_username", password="****", database="test_db", table="test_table", primary_key="item_id", refresh_interval=1, update_time_key="update_time", ), "item_id", ["item_name", "price"], )
加工結果
# 資料1 item_id: 1001 total: 100 item_name: 橘子 price:10 # 資料2 item_id: 1002 total: 200 item_name: 蘋果 price:12 # 資料3 item_id: 1003 total: 300 item_name: 芒果 price:16
樣本2:在增量模式中,使用deleted_flag_key參數實現歷史資料丟棄功能。
資料庫表
item_id
item_name
price
update_time
Is_deleted
1001
橘子
10
1603856138
False
1002
蘋果
12
1603856140
False
1003
芒果
16
1603856150
False
原始日誌
# 資料1 item_id: 1001 total: 100 # 資料2 item_id: 1002 total: 200 # 資料3 item_id: 1003 total: 300
加工規則
e_table_map( res_rds_mysql( address="rm-uf6wjk5****mo.mysql.rds.aliyuncs.com", username="test_username", password="****", database="test_db", table="test_table", primary_key="item_id", refresh_interval=1, update_time_key="update_time", deleted_flag_key="is_deleted", ), "item_id", ["item_name", "price"], )
加工結果
使用res_rds_mysq函數已拉取資料表中的3條資料到Log Service記憶體中,與源logstore中的資料進行富化匹配。此時您希望丟棄1001這條資料,不再加工該資料,則您可以在資料表1001這條資料中,將Is_deleted欄位值更新為true。更新後,在下一次加工記憶體維表更新時,1001這條資料將在維表中消失。
# 資料2 item_id: 1002 total: 200 item_name: 蘋果 price:12 # 資料3 item_id: 1003 total: 300 item_name: 芒果 price:1
更多參考
支援和其他函數組合使用。相關樣本,請參見使用RDS內網地址訪問RDS MySQL資料庫。
res_log_logstore_pull
使用res_log_logstore_pull函數從另一個Logstore中拉取資料。
函數格式
res_log_logstore_pull(endpoint, ak_id, ak_secret, project, logstore, fields, from_time="begin", to_time=None, fetch_include_data=None, fetch_exclude_data=None, primary_keys=None, fetch_interval=2, delete_data=None, base_retry_back_off=1, max_retry_back_off=60, ttl=None, role_arn=None)
參數說明
名稱
類型
是否必填
說明
endpoint
String
是
訪問網域名稱。更多資訊,請參見服務入口。預設為HTTPS格式,也支援HTTP格式。特殊情況下,需使用非80、非443連接埠。
ak_id
String
是
阿里雲帳號的AccessKey ID。為了資料安全,建議在進階參數配置中配置。關於如何配置進階參數,請參見建立資料加工任務。
ak_secret
String
是
阿里雲帳號的AccessKey Secret。為了資料安全,建議在進階參數配置中配置。關於如何配置進階參數,請參見建立資料加工任務。
project
String
是
待拉取資料的Project名稱。
logstore
String
是
待拉取資料的Logstore名稱。
fields
String List
是
字串列表或者字串別名列表。日誌中不包含某個欄位時,該欄位的值為空白。例如需要將["user_id", "province", "city", "name", "age"]的name改名為user_name時,可以配置為["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"]。
from_time
String
否
首次開始拉取日誌的伺服器時間,預設值為begin,表示會從第一條資料開始拉取。支援如下時間格式:
Unix時間戳記。
時間字串。
特定字串,例如begin、end。
運算式:dt_類函數返回的時間。例如dt_totimestamp(dt_truncate(dt_today(tz="Asia/Shanghai"), day=op_neg(-1))),表示昨天拉取日誌的開始時間,如果目前時間是2019-5-5 10:10:10 8:00,則上述運算式表示時間2019-5-4 10:10:10 8:00。
to_time
String
否
首次結束讀取日誌的伺服器時間。預設值為None,表示持續到當前的最後一條日誌。支援如下時間格式:
Unix時間戳記。
時間字串。
特定字串。例如begin、end。
運算式:dt_類函數返回的時間。
不配置或者配置為None表示持續拉取最新的日誌。
說明如果填入的是一個未來時間,只會將該Logstore所有資料拉取完畢,並不會開啟持續拉取任務。
fetch_include_data
String
否
配置欄位白名單,滿足fetch_include_data時保留資料,否則丟棄。
不配置或配置為None時,表示關閉欄位白名單功能。
配置為具體的欄位和欄位值時,表示保留該欄位和欄位值所在的日誌。
fetch_exclude_data
String
否
配置欄位黑名單,滿足fetch_exclude_data時丟棄資料,否則保留。
不配置或配置為None時,表示關閉欄位黑名單功能。
配置為具體的欄位和欄位值時,表示丟棄該欄位和欄位值所在的日誌。
說明如果您同時設定了黑名單和白名單參數,則先丟棄黑名單的欄位,然後在剩餘資料中保留白名單的欄位。
primary_keys
字串列表
否
維護表格時的主鍵欄位列表。如果fields參數中對主鍵欄位進行修改,這裡應使用修改後的欄位名,將修改後的欄位作為主鍵欄位。
說明primary_keys參數只支援單個字串,且必須存在於fields參數配置的欄位中。
待拉取資料的目標Logstore中只能有一個Shard。
請設定
primary_keys
參數,否則會嚴重影響效能並可能導致任務延遲。此處設定的
primary_key
參數的值大小寫敏感。
fetch_interval
Int
否
開啟持續拉取任務時,每次提取要求的時間間隔,預設值為2,單位:秒。該值必須大於或者等於1。
delete_data
String
否
對滿足條件且配置了
primary_keys
的資料,在表格中進行刪除操作。更多資訊,請參見查詢字串文法。base_retry_back_off
Number
否
拉取資料失敗後重新拉取的時間間隔,預設值為1,單位:秒。
max_retry_back_off
Int
否
拉取資料失敗後,重試請求的最大時間間隔,預設值為60,單位:秒。建議使用預設值。
ttl
Int
否
開啟持續拉取任務時,拉取日誌產生時間開始ttl時間內的日誌,單位為秒。預設值為None,表示拉取全部時間的日誌。
role_arn
String
否
RAM角色的ARN,該角色必須擁有讀Logstore資料的許可權。在RAM控制台,您可以在該角色的基本資料中查看,例如
acs:ram::1379******44:role/role-a
。 如何擷取ARN,請參見查看RAM角色。返回結果
返回多列表格。
錯誤處理
如果拉取過程出錯則會拋出異常,此時資料加工任務不會停止,而是按照base_retry_back_off參數對應的時間間隔進行重試。假設首次重試的時間間隔為1秒,如果重試失敗,下次重試的時間間隔為上一次的2倍,以此類推,直到達到max_retry_back_off參數值。如果依然失敗,則後續將一直按照max_retry_back_off參數對應的時間間隔進行重試。如果重試成功,則稍候再試會恢複到初始值。
函數樣本
拉取test_project下test_logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,指定時間範圍為從日誌開始寫入到結束,僅拉取一次。
res_log_logstore_pull( "cn-hangzhou.log.aliyuncs.com", "LT****Gw", "ab****uu", "test_project", "test_logstore", ["key1", "key2"], from_time="begin", to_time="end", )
拉取test_project下test_logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,指定時間範圍為從日誌開始寫入到結束,設定為持續拉取,時間間隔為30秒。
res_log_logstore_pull( "cn-hangzhou.log.aliyuncs.com", "LT****Gw", "ab****uu", "test_project", "test_logstore", ["key1", "key2"], from_time="begin", to_time=None, fetch_interval=30, )
設定黑名單,不拉取包含key1:value1的資料。
res_log_logstore_pull( "cn-hangzhou.log.aliyuncs.com", "LT****Gw", "ab****uu", "test_project", "test_logstore", ["key1", "key2"], from_time="begin", to_time=None, fetch_interval=30, fetch_exclude_data="key1:value1", )
設定白名單,僅拉取包含key1:value1的資料。
res_log_logstore_pull( "cn-hangzhou.log.aliyuncs.com", "LT****Gw", "ab****uu", "test_project", "test_logstore", ["key1", "key2"], from_time="begin", to_time=None, fetch_interval=30, fetch_include_data="key1:value1", )
拉取test_project下test_logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,指定時間範圍為日誌產生時間開始的40000000秒。
res_log_logstore_pull( "cn-hangzhou.log.aliyuncs.com", "LTAI*****Cajvr", "qO0Tp*****jJ9", "test_project", "test_logstore", fields=["key1","key2"], ttl="40000000" )
拉取project-test1下test-logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,使用SLR服務角色授權,指定時間範圍為從日誌開始寫入到結束,僅拉取一次。
res_log_logstore_pull( "pub-cn-hangzhou-staging-intranet.log.aliyuncs.com", "", "", "project-test1", "test-logstore", ["key1", "key2"], from_time="2022-7-27 10:10:10 8:00", to_time="2022-7-27 14:30:10 8:00", role_arn="acs:ram::***:role/aliyunserviceroleforslsaudit", )
拉取project-test1下test-logstore中的資料,並且僅拉取日誌中key1、key2兩個欄位對應的資料,使用預設角色授權,指定時間範圍為從日誌開始寫入到結束,僅拉取一次。
res_log_logstore_pull( "cn-chengdu.log.aliyuncs.com", "", "", "project-test1", "test-logstore", ["key1", "key2"], from_time="2022-7-21 10:10:10 8:00", to_time="2022-7-21 10:30:10 8:00", role_arn="acs:ram::***:role/aliyunlogetlrole", )
更多參考
支援和其他函數組合使用。相關樣本,請參見從其他Logstore擷取資料進行資料富化。
res_oss_file
使用res_oss_file函數從OSS Bucket中擷取檔案內容,並支援定期重新整理。
建議Log ServiceProject和OSS Bucket處於同一地區,使用阿里雲內網擷取資料。阿里雲內網網路穩定且速度快。
函數格式
res_oss_file(endpoint, ak_id, ak_key, bucket, file, format='text', change_detect_interval=0, base_retry_back_off=1, max_retry_back_off=60, encoding='utf8', error='ignore')
參數說明
名稱
類型
是否必填
說明
endpoint
String
是
OSS服務入口。更多資訊,請參見訪問網域名稱和資料中心。預設為HTTPS格式,也支援HTTP格式。特殊情況下,需使用非80/443連接埠。
ak_id
String
是
阿里雲帳號的AccessKey ID。為了資料安全,建議在進階參數配置中配置。關於如何配置進階參數,請參見建立資料加工任務。
ak_key
String
是
阿里雲帳號的AccessKey Secret。為了資料安全,建議在進階參數配置中配置。關於如何配置進階參數,請參見建立資料加工任務。
bucket
String
是
目標OSS Bucket名稱。
file
String
是
目標OSS檔案的路徑。例如test/data.txt,不能以正斜線(/)開頭。
format
String
是
檔案輸出形式。
Text:以文本形式輸出。
Binary:以位元組流形式輸出。
change_detect_interval
String
否
從OSS拉取檔案的時間間隔,單位:秒。拉取時會檢查檔案是否有更新,如果有更新則重新整理。預設為0,表示不重新整理,僅在程式啟動時拉取一次資料。
base_retry_back_off
Number
否
拉取失敗後重新拉取檔案的時間間隔,預設值為1,單位:秒。
max_retry_back_off
int
否
拉取資料失敗後,重試請求的最大時間間隔,預設值為60,單位:秒。建議使用預設值。
encoding
String
否
編碼方式。當format為text時,該參數預設為utf8。
error
String
否
設定不同錯誤的處理方案,該欄位僅在編碼上報UnicodeError錯誤訊息時有效。取值範圍:
ignore表示忽略格式錯誤的資料,繼續進行編碼。
xmlcharrefreplace表示將無法編碼的字元替換為適當的XML字元。
更多資訊,請參見Error Handlers。
decompress
String
否
是否解壓擷取到的OSS檔案。取值範圍:
None(預設值):不解壓。
gzip:使用Gzip方式解壓。
返回結果
返回位元組流形式或文本形式的檔案資料。
錯誤處理
如果拉取過程出錯則會拋出異常,此時資料加工任務不會停止,而是按照base_retry_back_off參數對應的時間間隔進行重試。假設首次重試的時間間隔為1秒,如果重試失敗,下次重試的時間間隔為上一次的2倍,以此類推,直到達到max_retry_back_off參數值。如果依然失敗,則後續將一直按照max_retry_back_off參數對應的時間間隔進行重試。如果重試成功,則稍候再試會恢複到初始值。
函數樣本
樣本1:從OSS中拉取JSON格式的資料。
JSON內容
{ "users": [ { "name": "user1", "login_historys": [ { "date": "2019-10-10 0:0:0", "login_ip": "203.0.113.10" }, { "date": "2019-10-10 1:0:0", "login_ip": "203.0.113.10" } ] }, { "name": "user2", "login_historys": [ { "date": "2019-10-11 0:0:0", "login_ip": "203.0.113.20" }, { "date": "2019-10-11 1:0:0", "login_ip": "203.0.113.30" }, { "date": "2019-10-11 1:1:0", "login_ip": "203.0.113.50" } ] } ] }
原始日誌
content: 123
加工規則
e_set( "json_parse", json_parse( res_oss_file( endpoint="http://oss-cn-hangzhou.aliyuncs.com", ak_id="LT****Gw", ak_key="ab****uu", bucket="log-etl-staging", file="testjson.json", ) ), )
加工結果
content: 123 prjson_parse: '{ "users": [ { "name": "user1", "login_historys": [ { "date": "2019-10-10 0:0:0", "login_ip": "203.0.113.10" }, { "date": "2019-10-10 1:0:0", "login_ip": "203.0.113.10" } ] }, { "name": "user2", "login_historys": [ { "date": "2019-10-11 0:0:0", "login_ip": "203.0.113.20" }, { "date": "2019-10-11 1:0:0", "login_ip": "203.0.113.30" }, { "date": "2019-10-11 1:1:0", "login_ip": "203.0.113.50" } ] } ] }'
樣本2:從OSS中拉取常值內容。
常值內容
Test bytes
原始日誌
content: 123
加工規則
e_set( "test_txt", res_oss_file( endpoint="http://oss-cn-hangzhou.aliyuncs.com", ak_id="LT****Gw", ak_key="ab****uu", bucket="log-etl-staging", file="test.txt", ), )
加工結果
content: 123 test_txt: Test bytes
樣本3:從OSS拉取壓縮檔並解壓。
OSS壓縮檔內容
Test bytes\nupdate\n123
原始日誌
content:123
加工規則
e_set( "text", res_oss_file( endpoint="http://oss-cn-hangzhou.aliyuncs.com", ak_id="LT****Gw", ak_key="ab****uu", bucket="log-etl-staging", file="test.txt.gz", format="binary", change_detect_interval=30, decompress="gzip", ), )
加工結果
content:123 text: Test bytes\nupdate\n123
樣本4:不使用存取金鑰訪問OSS公用讀寫檔案。
OSS壓縮檔內容
Test bytes
原始日誌
content:123
加工規則
e_set( "test_txt", res_oss_file( endpoint="http://oss-cn-hangzhou.aliyuncs.com", bucket="log-etl-staging", file="test.txt", ), )
加工結果
content: 123 test_txt: Test bytes
更多參考
支援和其他函數組合使用。相關樣本,請參見從OSS擷取CSV檔案進行資料富化。