全部產品
Search
文件中心

Function Compute:RocketMQ觸發器

更新時間:Jul 19, 2024

訊息佇列 RocketMQ 版作為事件來源通過事件匯流排EventBridgeFunction Compute整合後,通過訊息佇列 RocketMQ 版觸發器(以下簡稱RocketMQ觸發器)能夠觸發關聯函數執行,通過函數可以對發布到訊息佇列 RocketMQ 版中的訊息進行自訂處理。本文介紹如何在Function Compute控制台建立RocketMQ觸發器、配置函數入口參數和編寫代碼並測試。

功能簡介

您在Function Compute的控制台提交觸發器建立請求之後,Function Compute會根據觸發器的配置資訊,自動在事件匯流排EventBridge側建立事件流資源。

建立完成後,您可以在Function Compute控制台查看觸發器資訊,同時也可以在事件匯流排EventBridge控制台查看自動建立的資源資訊。當源訊息佇列 RocketMQ 版執行個體中有訊息入隊時,將會觸發Function Compute執行。執行時會根據您的攢批配置,將一個或多個訊息事件以批的形式推送到函數中進行處理,適合端到端的流式資料處理情境。

注意事項

  • 作為觸發源的訊息佇列 RocketMQ 版的執行個體必須和Function Compute的函數在相同的地區。

  • 建立的事件流數量超過上限後,將無法再建立RocketMQ觸發器。

在單個阿里雲帳號單個地區維度下,關於建立觸發器涉及的資源數量的限制,請參見使用限制

前提條件

步驟一:建立觸發器

當您已經建立好RocketMQ的執行個體,您需要登入Function Compute控制台,進入目標函數,選擇配置頁簽,建立觸發器,建立完成點擊確定如下圖所示。

image

上圖中的配置項如下所示。

配置項

操作

本文樣本

消費位點

選擇訊息的消費位點,即訊息佇列 RocketMQ 版從事件匯流排開始拉取訊息的位置。取值說明如下。

  • 最新位點:從最新位點開始消費。

  • 最早位點:從最早位點開始消費。

  • 指定時間戳記:從指定時間戳記開始消費。

最新位點

調用方式

選擇函數調用方式。

取值說明如下。

  • 同步調用:適用於順序調用情境。單個(批)事件觸發函數調用,等待函數執行完成返回結果後,再由下一個(批)事件繼續觸發函數調用。同步調用請求本文承載最大為32 MB。更多資訊,請參見同步調用

  • 非同步呼叫:可以快速消費事件。單個(批)事件觸發函數調用,Function Compute會立刻返迴響應,再由下一個(批)事件繼續觸發函數調用。該過程中函數會非同步執行。非同步呼叫請求本文承載最大為128 KB。更多資訊,請參見非同步呼叫

同步調用

關於推送配置、重試和死信等進階配置項說明,請參見觸發器進階功能

步驟二:(可選)配置函數入口參數

訊息佇列 RocketMQ 版事件來源會以event的形式作為輸入參數傳遞給函數,您可以使用代碼解析event參數,並對event進行處理。您可以手動將event傳給函數類比觸發事件,測試函數代碼是否正確。

  1. 在函數詳情頁面的代碼頁簽,單擊測試函數右側的image.png表徵圖,從下拉式清單中,選擇配置測試參數

  2. 配置測試參數面板,選擇建立新測試事件編輯已有測試事件,填寫事件名稱和事件內容,然後單擊確定。

    event格式如下所示。

    [
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        },
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        }
    ]

    data欄位包含的參數解釋如下表所示。關於CloudEvents規範中定義的參數解釋,請參見事件概述

    參數

    類型

    樣本值

    描述

    topic

    String

    TopicName

    Topic名稱。

    systemProperties

    Map

    系統屬性。

    MIN_OFFSET

    Int

    0

    最低位點。

    TRACE_ON

    Boolean

    true

    是否有訊息軌跡。取值說明如下:

    • true:有訊息軌跡。

    • false:無訊息軌跡。

    MAX_OFFSET

    Int

    8

    最高位點。

    MSG_REGION

    String

    cn-hangzhou

    發送訊息的地區。

    KEYS

    String

    systemProperties.KEYS

    過濾屬性。

    CONSUME_START_TIME

    Long

    1628577790396

    開始消費時間。單位:毫秒。

    UNIQ_KEY

    String

    AC14C305069E1B28CDFA3181CDA2****

    訊息唯一鍵。

    TAGS

    String

    systemProperties.TAGS

    過濾屬性。

    INSTANCE_ID

    String

    MQ_INST_123456789098****_BXhFHryi

    執行個體ID。

    userProperties

    Map

    使用者屬性。

    body

    String

    TEST

    訊息內容。

步驟三:編寫函數代碼並測試

完成觸發器建立後,您可以開始編寫並測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當訊息佇列 RocketMQ 版事件通過事件匯流排EventBridge投遞到Function Compute時,觸發器會自動觸發函數的執行。

  1. 在函數詳情頁面的代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼

    本文以Node.js函數代碼為例。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // 解析event參數,對event進行處理。
      callback(null, 'return result');
    }
  2. 測試函數。

    方式一:如果您是配置函數入口參數event類比事件來源,單擊測試函數

    方式二:登入訊息佇列 RocketMQ 版控制台選擇您建立的目標Topic,點擊發送訊息,如下圖。

    image

  3. 執行完成後,在即時日誌查看結果。

    image

更多資訊

如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理