全部產品
Search
文件中心

IoT Platform:資料轉寄到訊息佇列Kafka

更新時間:Jun 30, 2024

您可以使用規則引擎,將物聯網平台資料轉寄到訊息佇列(Kafka)中儲存,從而實現訊息從裝置、物聯網平台、Kafka到應用伺服器之間的全鏈路高可靠傳輸能力。本文以物模型資料上報Topic為例,介紹流轉訊息資料的完整流程。

前提條件

  • 已添加待轉寄的裝置Topic資料來源。例如:建立資料來源DataSource,添加指定裝置的物模型資料上報Topic。具體步驟,請參見添加待流轉的資料來源
  • 已建立訊息佇列(Kafka)執行個體和用於接收資料的Topic。Kafka使用方法,請參見Kafka快速入門
    重要 Kafka執行個體所在地區必須與物聯網平台服務的當前執行個體所在地區一致。
  • 當前阿里雲帳號已添加白名單許可權,支援將資料轉寄到訊息佇列Kafka。您可申請開通白名單許可權。

背景資訊

轉寄的資料目的配置完成後,會自動完成以下配置,實現裝置資料通過物聯網平台的規則引擎轉寄到訊息佇列(Kafka)。

  • 物聯網平台佔用Kafka執行個體所在虛擬交換器的2個IP地址。
  • 在Kafka執行個體所在的VPC網路下建立託管安全性群組,安全性群組名稱預設以sg-nsm-開頭。

建立資料目的

  1. 登入物聯網平台控制台

  2. 執行個體概覽頁簽的全部環境下,找到對應的執行個體,單擊執行個體卡片。

  3. 在左側導覽列,選擇訊息轉寄 > 雲產品流轉

  4. 雲產品流轉頁面,單擊右上方體驗新版,進入新版功能頁面。

    說明

    如果您已執行過此操作,再次進入雲產品流轉頁面,會直接進入新版功能頁面。

  5. 單擊資料目的頁簽,然後單擊建立資料目的
  6. 建立資料目的對話方塊,輸入資料目的名稱,例如DataPurpose,按照以下參數說明,完成配置,然後單擊確定
    參數描述
    選擇操作選擇發送資料到訊息佇列(Kafka)中
    角色授權物聯網平台將資料寫入Kafka。

    如您還未建立相關角色,單擊建立RAM角色,跳轉到RAM控制台,建立角色和授權策略,請參見建立RAM角色

    地區固定為您物聯網平台執行個體所在地區。
    執行個體選擇Kafka執行個體。

    您可以單擊建立執行個體,跳轉到訊息佇列控制台,建立Kafka執行個體。具體操作,請參見建立執行個體

    Topic選擇用於接收物聯網平台資料的Kafka Topic。

    您可以單擊建立Topic,跳轉到訊息佇列控制台,建立Kafka Topic。具體操作,請參見建立Topic


配置並啟動解析器

  1. 建立解析器,例如DataParser。具體操作,請參見步驟一:建立解析器
  2. 解析器詳情頁面,關聯資料來源。
    1. 在設定精靈的資料來源下,單擊關聯資料來源
    2. 在彈出的對話方塊中,單擊資料來源下拉式清單,選擇已建立的資料來源DataSource,單擊確定
  3. 解析器詳情頁面,關聯資料目的。
    1. 單擊設定精靈的資料目的,然後單擊資料目的列表右上方的關聯資料目的
    2. 在彈出的對話方塊中,單擊資料目的下拉式清單,選擇已建立的資料目的DataPurpose,單擊確定
    3. 在資料目的列表,查看並儲存資料目的ID,例如為1000
      後續解析指令碼中,需使用此處的資料目的ID
  4. 解析器詳情頁面,單擊解析器
  5. 在指令碼輸入框,輸入解析指令碼。
    函數參數說明,請參見函數列表
    //通過payload函數,擷取裝置上報的訊息內容,並按照JSON格式轉換。
    var data = payload("json");
    //直接流轉物模型上報資料。
    writeKafka(1000, data, "調試");

  6. 單擊調試,根據頁面提示,選擇產品和裝置,輸入Topic和Payload資料,驗證指令碼可執行。
    參數樣本如下:調試樣本

    運行結果如下,表示指令碼執行成功。

    action:
        transmit to kafka[destinationId=1000], data:{"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
    variables:
        data : {"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}

  7. 單擊發布

  8. 回到雲產品流轉頁面的解析器頁簽,單擊解析器DataParser對應的啟動按鈕,啟動解析器。
  9. 雲訊息佇列 Kafka 版控制台對應執行個體的Topic詳情頁面,查詢流轉的訊息。
    訊息