通過Function Compute訪問Table Store,對錶格儲存增量資料進行Realtime Compute。
背景資訊
Function Compute(Function Compute,簡稱FC)是事件驅動的全託管計算服務。使用Function Compute,您無需採購與管理伺服器等基礎設施,只需編寫並上傳代碼或鏡像。Function Compute為您準備好計算資源,彈性地、可靠地運行任務,並提供日誌查詢、效能監控和警示等功能。更多資訊,請參見什麼是Function Compute。
Tablestore Stream是用於擷取Tablestore表中增量資料的一個資料通道,通過建立Tablestore觸發器,能夠實現Stream和Function Compute的自動對接,讓計算函數中自訂的程式邏輯自動處理Tablestore表中發生的資料修改。
使用情境
使用Function Compute可以實現如下任務,如下圖所示。
資料同步:同步Table Store中的即時資料到資料緩衝、搜尋引擎或者其他資料庫執行個體中。
資料歸檔:增量歸檔Table Store中的資料到OSS等做冷備份。
事件驅動:利用觸發器觸發函數調用IoT套件、雲應用API或者做訊息通知等。
準備工作
已開通Function Compute服務。具體操作,請參見開通Function Compute服務。
注意事項
目前支援使用Tablestore觸發器的地區有:華北2(北京)、華東1(杭州)、華東2(上海)、華南1(深圳)、日本(東京)、新加坡、德國(法蘭克福)和中國香港。
Table Store資料表與Function Compute服務要處於同一地區。
如果您需要Tablestore觸發器對應的函數通過內網訪問Table Store,您必須使用Table Store的VPC Endpoint,不可以使用Table Store的內網 Endpoint。
VPC Endpoint的格式為{instanceName}.{regionId}.vpc.tablestore.aliyuncs.com。詳細資料,請參見擷取服務地址。
編寫函數時,請注意不要出現以下邏輯:Table StoreTable A觸發函數B,函數B又更新Table A的資料。這種邏輯會造成函數無限調用問題。
觸發的函數執行時間不能超過一分鐘。
如果函數執行出現異常,函數將無限重試直到Tablestore中的日誌資料到期。
說明函數執行異常有以下情況:
函數代碼運行異常:函數執行個體已拉起,因此函數執行個體啟動並執行時間段內會產生費用。
函數啟動異常:函數執行個體由於啟動指令錯誤等原因未成功拉起,此時不會產生費用。
如果函數執行異常,為避免函數無限重試,您可以關閉資料表的Stream功能。在關閉資料表的Stream功能前請確認沒有其他觸發器在使用該資料表,防止導致其他觸發器異常。
步驟一:為資料表開啟Stream功能
使用觸發器功能需要先在Table Store控制台開啟資料表的Stream功能,才能在Function Compute中處理寫入Table Store中的增量資料。
在頁面上方,選擇地區。
在概覽頁面,單擊執行個體名稱或在執行個體操作列單擊執行個體管理。
在執行個體詳情頁簽的資料表列表頁簽,單擊資料表名稱後選擇即時消費通道頁簽或單擊後選擇即時消費通道。
在即時消費通道頁簽,單擊Stream資訊對應的開啟。
在開啟Stream功能對話方塊,設定日誌到期時間長度,單擊開啟。
日誌到期時間長度取值為非零整數,單位為小時,最長時間長度為168小時。
重要日誌到期時間長度設定後不能修改,請謹慎設定。
步驟二:建立函數和Tablestore觸發器
建立函數。
可選:在頁面右上方,單擊體驗Function Compute3.0。
說明Function Compute3.0進行了多項功能改進,本文採用Function Compute3.0進行Function Compute的使用介紹。
若您已進入新版控制台頁面(頁面右上方的按鈕為返回Function Compute2.0),則無需執行此操作。
在左側導覽列,單擊函數。
在頂部功能表列,選擇地區,然後在函數頁面,單擊建立函數。
在建立函數頁面,按需選擇建立函數的方式,配置以下配置項,然後單擊建立。
此處以建立事件函數為例,介紹對錶格儲存中資料修改進行Realtime Compute的操作。
基本設定:設定函數名稱。
函數代碼:配置函數的運行環境和代碼相關資訊。
配置項
說明
樣本
運行環境
選擇您熟悉的語言,例如Python、Java、PHP、Node.js或自訂容器等。
自訂容器鏡像。
此處選擇Python 3.9。
代碼上傳方式
選擇代碼上傳到Function Compute的方式。
使用範例程式碼:預設,您可以根據業務需要選擇Function Compute為您提供的建立函數的範例程式碼。
通過 ZIP 包上傳代碼:選擇函數代碼ZIP包並上傳。
通過檔案夾上傳代碼:選擇包含函數代碼的檔案夾並上傳。
通過 OSS 上傳代碼:選擇上傳函數代碼的Bucket 名稱和檔案名稱。
此處請選擇使用範例程式碼後,在範例程式碼列表中選擇Hello, world! 樣本。
進階配置:配置函數的執行個體相關資訊和函數執行逾時時間等。
配置項
說明
樣本
規格方案
根據您的業務情況,選擇或手動輸入合理的vCPU規格和記憶體規格組合。關於各資源使用的計費詳情,請參見計費概述。
說明vCPU大小(單位為核)與記憶體大小(單位為GB)的比例必須設定在1:1到1:4之間。
0.35核,512 MB
臨時硬碟大小
根據您的業務情況,選擇臨時隱藏檔的硬碟大小。
取值說明如下。
512 MB:預設值。不計費,Function Compute為您提供512 MB以內的硬碟免費使用額度。
10 GB:按9.5 GB進行計費。
說明臨時硬碟中所有目錄可寫,共用臨時硬碟的空間。
臨時硬碟大小與底層執行函數的執行個體生命週期一致,執行個體被系統回收後,硬碟上的資料也會消失。如您需要對檔案進行持久化儲存,可以選擇掛載NAS或OSS。具體操作,請參見配置NAS檔案系統和配置OSSObject Storage Service。
512 MB
執行逾時時間
設定逾時時間。執行逾時時間預設為180秒,最長為86400秒。
180
請求處理常式
佈建要求處理常式,Function Compute的運行時會載入並調用您的請求處理常式處理請求。建立函數的方式選擇Web函數時,無需設定此配置項。
說明代碼上傳方式選擇使用範例程式碼時,不需要修改請求處理常式。當選擇其他代碼上傳方式時,則需要根據實際情況修改請求處理常式,否則函數執行時會報錯。
index.handler
時區
選擇函數的時區。此處設定函數的時區後,將自動為函數添加一條環境變數TZ,其值為您設定的目標時區。
UTC
函數角色
Function Compute平台會使用這個RAM角色來產生訪問您的阿里雲資源的臨時密鑰,並傳遞給您的代碼。
重要需授予函數角色訪問Table Store服務的許可權。更多資訊,請參見附錄:授予Function Compute訪問Table Store的許可權。
AliyunFCDefaultRole
允許訪問 VPC
是否允許函數訪問VPC內資源。更多資訊,請參見配置網路。
是
專用網路
允許訪問 VPC選擇是時必填。建立新的VPC或在下拉式清單中選擇要訪問的VPC ID。
fc.auto.create.vpc.1632317****
交換器
允許訪問 VPC選擇是時必填。建立新的交換器或在下拉式清單中選擇交換器ID。
fc.auto.create.vswitch.vpc-bp1p8248****
安全性群組
允許訪問 VPC選擇是時必填。建立新的安全性群組或在下拉式清單中選擇安全性群組。
fc.auto.create.SecurityGroup.vsw-bp15ftbbbbd****
允許函數預設網卡訪問公網
是否允許函數可以通過預設網卡訪問公網。關閉後,當前服務中的函數將無法通過Function Compute的預設網卡訪問公網。
重要使用固定公網IP地址功能時,您必須關閉允許函數預設網卡訪問公網,否則配置的固定公網IP地址不生效。更多資訊,請參見配置固定公網IP地址。
是
日誌功能
是否啟用阿里雲Log Service。取值說明如下:
啟用:函數的執行日誌被持久化儲存到Log Service,方便您代碼調試、故障分析和資料分析等。
說明啟用日誌功能後,函數中列印到 stdout 的內容就會被阿里雲Log Service採集到。然後您可以查看函數的執行日誌,從而方便您的代碼調試、故障分析、資料分析等操作。
點擊配置日誌查看更多詳情。
Function Compute在後台為您建立的Log Service資源會產生費用,詳情請參見按使用功能計費模式計費項目。
禁用:函數的執行日誌將無法通過Log Service儲存和查詢。
啟用
(可選)環境變數:設定函數運行環境中的環境變數。更多資訊,請參見配置環境變數。
建立Tablestore觸發器。
在函數詳情頁簽,選擇配置頁簽,在左側導覽列,單擊觸發器,然後單擊建立觸發器。
在建立觸發程序面板,填寫相關資訊,然後單擊確定。
配置項
操作
本文樣本
觸發器類型
選擇Table Store Tablestore。
Table StoreTablestore
名稱
自訂填寫觸發器名稱。
Tablestore-trigger
版本或別名
預設值為LATEST,如果您需要建立其他版本或別名的觸發器,需先在函數詳情頁的版本或別名下拉式清單選擇該版本或別名。關於版本和別名的簡介,請參見版本管理和別名管理。
LATEST
執行個體
在列表中選擇已建立的Tablestore執行個體。
d00dd8xm****
表格
在列表中選擇已建立的表格。
mytable
角色名稱
選擇AliyunTableStoreStreamNotificationRole。
說明如果您第一次建立該類型的觸發器,則需要在單擊確定後,在彈出的對話方塊中選擇立即授權。
AliyunTableStoreStreamNotificationRole
建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟三:配置函數的測試參數
在函數詳情頁簽的代碼頁簽,單擊測試函數右側的表徵圖,從下拉式清單中,選擇配置測試參數。
在配置測試參數對話方塊,選擇建立新測試事件頁簽,選擇事件模板為Tablestore,並填寫事件名稱和事件內容。單擊確定。
說明如果已建立Tablestore測試事件,您可以在編輯已有測試事件頁簽,選擇已有事件名稱。
Table Store觸發器使用CBOR格式對增量資料進行編碼,構成Function Compute的event。具體格式如下所示。
{ "Version": "Sync-v1", "Records": [ { "Type": "PutRow", "Info": { "Timestamp": 1506416585740836 }, "PrimaryKey": [ { "ColumnName": "pk_0", "Value": 1506416585881590900 }, { "ColumnName": "pk_1", "Value": "2017-09-26 17:03:05.8815909 +0800 CST" }, { "ColumnName": "pk_2", "Value": 1506416585741000 } ], "Columns": [ { "Type": "Put", "ColumnName": "attr_0", "Value": "hello_table_store", "Timestamp": 1506416585741 }, { "Type": "Put", "ColumnName": "attr_1", "Value": 1506416585881590900, "Timestamp": 1506416585741 } ] } ] }
event參數中不同屬性欄位的解釋請參見下表。
參數
描述
Version
Payload版本號碼。樣本如Sync-v1。類型為String。
Records
資料表中的增量資料行數組。包含如下內部成員:
Type:資料行類型,包含PutRow、UpdateRow和DeleteRow。類型為String。
Info:包含Timestamp內部成員。Timestamp表示該行的最後修改UTC時間。類型為Int64。
PrimaryKey
主鍵列數組。包含如下內部成員:
ColumnName:主鍵列名稱。類型為String。
Value:主鍵列內容。類型為formated_value,支援Integer、String和Blob。
Columns
屬性列數組。包括如下內部成員:
Type:屬性列類型,包含Put、DeleteOneVersion和DeleteAllVersions。類型為String。
ColumnName:屬性列名稱。類型為String。
Value:屬性列內容。類型為formated_value,支援Integer、Boolean、Double、String和Blob。
Timestamp:屬性列最後修改UTC時間。類型為Int64。
步驟四:編寫函數代碼並測試
完成Tablestore觸發器建立後,您可以開始編寫函數代碼並測試,以驗證代碼的正確性。在實際操作過程中,Tablestore中有資料更新時會自動觸發函數執行。
在函數詳情頁簽的代碼頁簽,使用代碼編輯器編寫代碼,然後單擊部署代碼。
本文以Python函數代碼為例。如果您想使用其他運行環境,更多程式碼範例,請參見Table Store觸發Function Compute樣本。
import logging import cbor import json def get_attribute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] def handler(event, context): logger = logging.getLogger() logger.info("Begin to handle event") # records = cbor.loads(event) records = json.loads(event) for record in records['Records']: logger.info("Handle record: %s", record) pk_0 = get_pk_value(record, "pk_0") attr_0 = get_attribute_value(record, "attr_0") return 'OK'
單擊測試函數。
執行完成後,您可以在函數代碼頁簽的上方查看執行結果。
修改代碼並部署代碼。
當使用
records=json.loads(event)
測試OK後,修改records
代碼為records = cbor.loads(event)
。單擊部署代碼。
當Tablestore中有資料寫入時,相關的函數邏輯就會被觸發。
常見問題
如果您無法在某一地區建立Tablestore觸發器,請確認支援建立Tablestore觸發器的地區,具體請參見注意事項。
如果您在建立Tablestore觸發器時無法找到已經建立好的Table Store資料表,請確認Table Store資料表與Function Compute服務是否處於同一地區。
使用Tablestore觸發器時,總是會報用戶端取消的報錯,一般是由於用戶端調用函數時設定的逾時時間小於函數執行時間。建議您將用戶端逾時時間調大,具體請參見用戶端中斷連線,報錯Invocation canceled by client怎麼辦?。
如果Tablestore資料表中有新增的資料,但是Tablestore觸發器沒有被觸發,您可以從以下方面進行排查。關於觸發器不能正常觸發的詳細排查方案可參見觸發器不能正常觸發函數執行怎麼辦?。
確認資料表是否開啟了Stream功能,具體請參見為資料表開啟Stream功能。
確認在建立觸發器時配置的角色是否正確,您可以使用預設的觸發器角色
AliyunTableStoreStreamNotificationRole
,具體請參見建立Tablestore觸發器。查看是否有函數作業記錄,可以根據日誌確認是否是函數執行失敗。函數執行失敗後,會一直重試直到Tablestore中的日誌資料到期。
附錄:授予Function Compute訪問Table Store的許可權
在使用Function Compute提供的功能時,Function Compute需要訪問Table Store。此時,需要為函數授予相應許可權。如果是較粗粒度的授權,可以選擇Function Compute系統提供的預設服務角色AliyunFCDefaultRole,如果需要更細粒度的授權,則需要為函數授予其他角色及相應的權限原則。
採用系統預設角色方式授權。
為AliyunFCDefaultRole角色授予AliyunOTSFullAccess許可權(管理Table Store服務的許可權)。具體操作,請參見為RAM角色授權。
說明AliyunFCDefaultRole是Function Compute的預設服務角色,但不包含Table Store的許可權。
如果您是初次使用該角色,您需要為該角色添加Table Store許可權。
如果您已為該角色添加Table Store許可權,則無需執行此操作。
採用自訂角色方式授權。
具體操作,請參見樣本:授予Function Compute訪問OSS的許可權。