全部產品
Search
文件中心

Function Compute:RocketMQ觸發器

更新時間:Jul 06, 2024

雲訊息佇列 RocketMQ 版作為事件來源通過事件匯流排EventBridgeFunction Compute整合後,通過雲訊息佇列 RocketMQ 版觸發器(以下簡稱RocketMQ觸發器)能夠觸發關聯函數執行,通過函數可以對發布到雲訊息佇列 RocketMQ 版中的訊息進行自訂處理。本文介紹如何在Function Compute控制台建立RocketMQ觸發器、配置函數入口參數和編寫代碼並測試。

功能簡介

您在Function Compute的控制台提交觸發器建立請求之後,Function Compute會根據觸發器的配置資訊,自動建立事件匯流排EventBridge相關的資源。目前提供事件模式事件流模式兩種訊息推送模式,每個模式建立的資源如下:
建立完成後,您可以在Function Compute控制台查看觸發器資訊,同時也可以在事件匯流排EventBridge控制台查看自動建立的資源資訊。當源雲訊息佇列 RocketMQ 版執行個體中有訊息入隊時,將會觸發Function Compute執行,不同訊息推送模式支援的參數內容也不同。
  • 事件模式:每次會將單個訊息作為事件參數傳入函數中,事件遵循CloudEvents規範。訊息內容和CloudEvents的關係,請參見參數內容
  • 事件流模式:根據您的攢批配置,將一個或多個訊息事件以批的形式推送到函數中進行處理,適合端到端的流式資料處理情境。

注意事項

  • 作為觸發源的雲訊息佇列 RocketMQ 版的執行個體必須和Function Compute的函數在相同的地區。
  • 建立的自訂匯流排以及事件規則的數量超過上限後,將無法再建立事件模式的RocketMQ觸發器。
  • 建立的事件流數量超過上限後,將無法再建立事件流模式的RocketMQ觸發器。

在單個阿里雲帳號單個地區維度下,關於建立觸發器涉及的資源數量的限制,請參見使用限制

前提條件

步驟一:建立觸發器

  1. 登入Function Compute控制台,在左側導覽列,單擊服務及函數
  2. 在頂部功能表列,選擇地區,然後在服務列表頁面,單擊目標服務。
  3. 函數管理頁面,單擊目標函數名稱。

  4. 在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉式清單選擇要建立觸發器的版本或別名,然後單擊建立觸發器

  5. 在建立觸發程序面板,填寫相關資訊。然後單擊確定
    基礎配置項說明如下所示。
    配置項操作本文樣本
    觸發器類型選擇訊息佇列 RocketMQ 版訊息佇列 RocketMQ 版
    名稱填寫自訂的觸發器名稱。rocketmq-trigger
    版本或別名預設值為LATEST,如果您需要建立其他版本或別名的觸發器,首先需要在函數詳情頁的右上方切換到該版本或別名。關於版本和別名的簡介,請參見管理版本管理別名LATEST
    RocketMQ 執行個體選擇已建立的雲訊息佇列 RocketMQ 版的執行個體。MQ_INST_164901546557****_BX7****
    Topic選擇已建立的雲訊息佇列 RocketMQ 版執行個體的Topic。topic1
    Tag填寫訊息過濾標籤。

    只有收到包含此處設定的過濾標籤字串的訊息時,才會觸發函數執行。

    tag
    Group ID選擇已建立的雲訊息佇列 RocketMQ 版執行個體的Group ID。GID_group1
    消費位點選擇訊息的消費位點,即雲訊息佇列 RocketMQ 版從事件匯流排開始拉取訊息的位置。取值說明如下。
    • 最新位點:從最新位點開始消費。
    • 最早位點:從最早位點開始消費。
    • 指定時間戳記:從指定時間戳記開始消費。
    最新位點
    調用方式選擇函數調用方式。
    取值說明如下。
    • 同步調用:適用於順序調用情境。單個(批)事件觸發函數調用,等待函數執行完成返回結果後,再由下一個(批)事件繼續觸發函數調用。同步調用請求本文承載最大為32 MB。更多資訊,請參見同步調用
    • 非同步呼叫:可以快速消費事件。單個(批)事件觸發函數調用,Function Compute會立刻返迴響應,再由下一個(批)事件繼續觸發函數調用。該過程中函數會非同步執行。非同步呼叫請求本文承載最大為128 KB。更多資訊,請參見非同步呼叫
    同步調用
    訊息推送模式訊息資料推送到Function Compute時的底層應用模式。
    取值說明如下。
    • 事件流模式:會根據您的攢批配置將一個或多個訊息事件以批的形式推送到函數中進行處理,適合端到端的流式資料處理情境。
    • 事件模式:每次會將單個訊息作為事件參數傳入函數中,事件遵循CloudEvents規範。訊息內容和CloudEvents的關係,請參見步驟二:配置函數入口參數
    事件模
    觸發器啟用狀態建立觸發器後是否立即啟用。預設勾選啟用觸發器,即建立觸發器後立即啟用觸發器。不涉及

    關於推送配置、重試和死信等進階配置項說明,請參見觸發器進階功能

    建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理


步驟二:配置函數入口參數

雲訊息佇列 RocketMQ 版事件來源會以event的形式作為輸入參數傳遞給函數,您可以手動將event傳給函數類比觸發事件,測試函數代碼是否正確。

  1. 在函數詳情頁面,單擊函數代碼頁簽,然後單擊測試函數右側xialatubiao表徵圖,從下拉式清單中,選擇配置測試參數
  2. 配置測試參數面板,選擇建立新測試事件編輯已有測試事件頁簽,填寫事件名稱和事件內容。然後單擊確定
    事件模式的event格式如下所示。
    {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
    }
    事件流模式的event格式如下所示。
    [
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        },
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        }
    ]
    data欄位包含的參數解釋如下表所示。關於CloudEvents規範中定義的參數解釋,請參見事件概述

    參數

    類型

    樣本值

    描述

    topic

    String

    TopicName

    Topic名稱。

    systemProperties

    Map

    系統屬性。

    MIN_OFFSET

    Int

    0

    最低位點。

    TRACE_ON

    Boolean

    true

    是否有訊息軌跡。取值說明如下:

    • true:有訊息軌跡。

    • false:無訊息軌跡。

    MAX_OFFSET

    Int

    8

    最高位點。

    MSG_REGION

    String

    cn-hangzhou

    發送訊息的地區。

    KEYS

    String

    systemProperties.KEYS

    過濾屬性。

    CONSUME_START_TIME

    Long

    1628577790396

    開始消費時間。單位:毫秒。

    UNIQ_KEY

    String

    AC14C305069E1B28CDFA3181CDA2****

    訊息唯一鍵。

    TAGS

    String

    systemProperties.TAGS

    過濾屬性。

    INSTANCE_ID

    String

    MQ_INST_123456789098****_BXhFHryi

    執行個體ID。

    userProperties

    Map

    使用者屬性。

    body

    String

    TEST

    訊息內容。


步驟三:編寫函數代碼並測試

完成觸發器建立後,您可以開始編寫並測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當雲訊息佇列 RocketMQ 版事件通過事件匯流排EventBridge投遞到Function Compute時,觸發器會自動觸發函數的執行。

  1. 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼
    本文以Node.js函數代碼為例。
    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event參數,對event進行處理。
      callback(null, 'return result');
    }

  2. 單擊函數代碼頁簽的測試函數

    執行完成後,您可以在函數字碼頁簽的上方查看執行結果。

更多資訊

如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理