全部產品
Search
文件中心

Function Compute:Kafka觸發器

更新時間:Jul 06, 2024

雲訊息佇列 Kafka 版觸發器(以下簡稱Kafka觸發器)是通過事件匯流排EventBridge雲訊息佇列 Kafka 版作為事件來源與Function Compute進行整合。建立完成後,您可以在Function Compute控制台和事件匯流排EventBridge控制台查看建立的資訊。當有訊息入隊時,事件匯流排EventBridge會根據您的攢批配置將一個或多個訊息事件以批的形式推送到函數中進行處理。

注意事項

  • 作為觸發源的雲訊息佇列 Kafka 版執行個體必須和Function Compute的函數在相同的地區。

  • 建立的事件流數量超過上限後,將無法再建立Kafka觸發器。關於事件流數量的限制,請參見使用限制

前提條件

步驟一:建立Kafka觸發器

  1. 登入Function Compute控制台,在左側導覽列,單擊服務及函數
  2. 在頂部功能表列,選擇地區,然後在服務列表頁面,單擊目標服務。
  3. 函數管理頁面,單擊目標函數名稱。

  4. 在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉式清單選擇要建立觸發器的版本或別名,然後單擊建立觸發器

  5. 在建立觸發程序面板,填寫相關資訊。然後單擊確定

    基礎配置項說明如下所示。

    配置項

    操作

    本文樣本

    觸發器類型

    選擇訊息佇列 Kafka 版

    訊息佇列 Kafka 版

    名稱

    填寫自訂的觸發器名稱。

    kafka-trigger

    版本或別名

    預設值為LATEST,如果您需要建立其他版本或別名的觸發器,首先需要在函數詳情頁的右上方切換到該版本或別名。關於版本和別名的簡介,請參見管理版本管理別名

    LATEST

    Kafka 執行個體

    選擇已建立的雲訊息佇列 Kafka 版執行個體。

    alikafka_pre-cn-i7m2t7t1****

    Topic

    選擇已建立的雲訊息佇列 Kafka 版執行個體的Topic。

    topic1

    Group ID

    選擇已建立的雲訊息佇列 Kafka 版執行個體的Group ID。

    說明

    請使用獨立的Group ID來建立觸發器,不要與已有的業務混用Group ID,否則會影響已有的訊息收發。

    GID_group1

    消費任務並發數

    消費者的並發數量,取值範圍為[1,Topic的分區數]。

    2

    消費位點

    選擇訊息的消費位點,即雲訊息佇列 Kafka 版從事件匯流排開始拉取訊息的位置。

    取值說明如下。

    • 最早位點:從最早位點開始消費。

    • 最新位點:從最新位點開始消費。

    最新位點

    調用方式

    選擇函數調用方式。

    取值說明如下。

    • 同步調用:適用於順序調用情境。單個(批)事件觸發函數調用,等待函數執行完成返回結果後,再由下一個(批)事件繼續觸發函數調用。同步調用請求本文承載最大為32 MB。更多資訊,請參見同步調用

    • 非同步呼叫:可以快速消費事件。單個(批)事件觸發函數調用,Function Compute會立刻返迴響應,再由下一個(批)事件繼續觸發函數調用。該過程中函數會非同步執行。非同步呼叫請求本文承載最大為128 KB。更多資訊,請參見非同步呼叫

    同步調用

    投遞並發最大值

    Kafka訊息投遞到Function Compute的並發最大值,取值範圍為1~300。該參數僅對同步調用生效。如果需要更高的並發,請進入EventBridge配額中心申請配額名稱為EventStreaming FC Sink 同步投遞最大並發數的配額。

    1

    觸發器啟用狀態

    建立觸發器後是否立即啟用。預設勾選啟用觸發器,即建立觸發器後立即啟用觸發器。

    不涉及

    關於推送配置、重試和死信等進階配置項說明,請參見觸發器進階功能

    建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理

步驟二:配置函數入口參數

雲訊息佇列 Kafka 版事件來源會以event的形式作為輸入參數傳遞給函數,您可以手動將event傳給函數類比觸發事件。

  1. 在函數詳情頁面,單擊函數代碼頁簽,然後單擊測試函數右側xialatubiao表徵圖,從下拉式清單中,選擇配置測試參數

  2. 配置測試參數面板,選擇建立新測試事件編輯已有測試事件頁簽,填寫事件名稱和事件內容。然後單擊確定

    event格式如下所示:

    [
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        },
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        }
    ]

    CloudEvents規範中定義的參數解釋,請參見事件概述

    data欄位包含的參數解釋如下表所示。

    參數

    類型

    樣本值

    描述

    topic

    String

    TopicName

    Topic的名稱。

    partition

    Int

    1

    雲訊息佇列 Kafka 版的消費分區資訊。

    offset

    Int

    0

    雲訊息佇列 Kafka 版的訊息位點。

    timestamp

    String

    1655952591589

    開始消費時間戳記。

步驟三:編寫函數代碼並測試

觸發器建立完成後,您可以開始編寫並測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當雲訊息佇列 Kafka 版事件發生時,觸發器會自動觸發函數的執行。

  1. 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼

    本文以Node.js函數代碼為例。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event參數,對event進行處理。
      callback(null, 'return result');
    }
  2. 單擊函數代碼頁簽的測試函數

    執行完成後,您可以在函數字碼頁簽的上方查看執行結果。

常見問題

為什麼Kafka觸發器觸發函數的次數與訊息的條數不一致?

事件匯流排EventBridge會根據您的攢批配置將訊息推送給函數處理,不能通過函數調用次數判斷訊息是否全部被消費。您可以在函數中列印日誌來確認消費的訊息情況。更多關於攢批配置的資訊請參見推送配置

Function Compute支援處理自建的Kafka訊息佇列產生的訊息嗎?

暫時不支援。Function Compute不會主動監測自建的Kafka訊息佇列是否有新的訊息,您可以使用HTTP函數,將訊息附加到HTTP請求中主動觸發函數。