全部產品
Search
文件中心

Elasticsearch:通過Realtime Compute處理資料並同步到Elasticsearch

更新時間:Jun 30, 2024

當您需要構建一個日誌檢索系統時,可通過Realtime ComputeFlink對日誌資料進行計算後,輸出到Elasticsearch進行搜尋。本文以阿里雲Log ServiceSLS(Log Service)為例,為您介紹具體的實現方法。

前提條件

您已完成以下操作:

背景資訊

阿里雲Realtime ComputeFlink是阿里雲官方支援的Flink產品,支援包括Kafka、Elasticsearch等多種輸入輸出系統。Realtime ComputeFlink與Elasticsearch結合,能夠滿足典型的日誌檢索情境。

Kafka或LOG等系統中的日誌,經過Flink進行簡單或者複雜計算之後,輸出到Elasticsearch進行搜尋。結合Flink的強大計算能力與Elasticsearch的強大搜尋能力,可為業務提供即時資料加工及查詢,助力業務即時化轉型。

Realtime ComputeFlink為您提供了非常簡單的方式來對接Elasticsearch。例如當前業務中的日誌或者資料被寫入了LOG中,並且需要對LOG中的資料進行計算之後再寫到Elasticsearch中進行搜尋,可通過以下鏈路實現。Flink+ES資料鏈路

操作步驟

  1. 登入Realtime Compute控制台

  2. 建立Realtime Compute作業。

    具體操作,請參見阿里雲Blink獨享模式國際站文檔《Blink SQL開發指南》中的《作業開發》 > 《開發》章節。

  3. 編寫Flink SQL。

    1. 建立Log ServiceLOG源表。

      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',  
        endPoint ='<yourEndpoint>',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '<yourStartTime>',
        project ='<yourProjectName>',
        logStore ='<yourLogStoreName>',
        consumerGroup ='<yourConsumerGroupName>'
      );

      WITH參數說明如下表。

      變數

      說明

      endPoint

      阿里雲Log Service的公網服務入口,即訪問對應LOG專案及其內部日誌資料的URL。詳細資料,請參見服務入口

      例如杭州地區的Log Service入口為:http://cn-hangzhou.log.aliyuncs.com。需要在對應的服務入口前加http://

      accessId

      您帳號的AccessKey ID。

      accessKey

      您帳號的AccessKey Secret。

      startTime

      消費日誌開始的時間點。運行Flink作業時所選時間要大於此處設定的時間。

      project

      LogService的專案名稱。

      logStore

      LogService專案下具體的LogStore名稱。

      consumerGroup

      Log Service的消費組名稱。

    2. 建立Elasticsearch結果表。

      重要
      • Realtime Compute3.2.2及以上版本增加了Elasticsearch結果表功能。建立Flink作業時,請注意所選的版本。

      • Elasticsearch結果表的實現使用了REST API,可以相容Elasticsearch的各個版本。

      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch-7',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );

      WITH參數說明如下。

      參數

      說明

      endPoint

      Elasticsearch執行個體的公網地址,格式為http://<instanceid>.public.elasticsearch.aliyuncs.com:9200。可在執行個體的基本資料頁面擷取,詳細資料請參見查看執行個體的基本資料

      accessId

      訪問Elasticsearch執行個體的使用者名稱,預設為elastic。

      accessKey

      對應使用者的密碼。elastic使用者的密碼在建立執行個體時設定,如果忘記可進行重設,重設密碼的注意事項和操作步驟,請參見重設執行個體訪問密碼

      index

      索引名稱。如果您還未建立過索引,需要先建立一個索引。具體操作,請參見步驟三:建立索引。您也可以開啟自動建立索引功能,自動建立對應索引。具體操作,請參見配置YML參數

      typeName

      索引類型。7.0及以上版本的Elasticsearch執行個體必須為_doc

      說明
      • Elasticsearch支援根據PRIMARY KEY更新文檔,且PRIMARY KEY只能為1個欄位。指定PRIMARY KEY後,文檔的ID為PRIMARY KEY欄位的值。未指定PRIMARY KEY,文檔的ID由系統隨機產生。詳細資料,請參見Index API

      • Elasticsearch支援多種更新模式,對應WITH中的參數為updateMode

        • updateMode=full時,新增的文檔會完全覆蓋已存在的文檔。

        • updateMode=inc時,Elasticsearch會根據輸入的欄位值更新對應的欄位。

      • Elasticsearch所有的更新預設為UPSERT語義,即INSERT或UPDATE。

    3. 處理商務邏輯並同步資料。

      INSERT INTO es_stream_sink
      SELECT 
        a,
        count(*) as cnt
      FROM sls_stream GROUP BY a
  4. 上線並啟動作業。

    上線並啟動作業後,即可將Log Service中的資料進行簡單彙總後寫入Elasticsearch中。

更多資訊

使用Realtime ComputeFlink+Elasticsearch,可協助您快速建立即時搜尋鏈路。如果您有更複雜的Elasticsearch寫入需求,可以使用Realtime ComputeFlink的自訂Sink功能來實現。