當您需要構建一個日誌檢索系統時,可通過Realtime ComputeFlink對日誌資料進行計算後,輸出到Elasticsearch進行搜尋。本文以阿里雲Log ServiceSLS(Log Service)為例,為您介紹具體的實現方法。
前提條件
您已完成以下操作:
開通阿里雲Realtime Compute服務並建立專案。
建立Elasticsearch執行個體。
具體操作,請參見建立Elasticsearch執行個體。
開通SLS服務、建立Project和Logstore。
具體操作,請參見開通阿里雲Log Service、建立專案Project和建立Logstore。
背景資訊
阿里雲Realtime ComputeFlink是阿里雲官方支援的Flink產品,支援包括Kafka、Elasticsearch等多種輸入輸出系統。Realtime ComputeFlink與Elasticsearch結合,能夠滿足典型的日誌檢索情境。
Kafka或LOG等系統中的日誌,經過Flink進行簡單或者複雜計算之後,輸出到Elasticsearch進行搜尋。結合Flink的強大計算能力與Elasticsearch的強大搜尋能力,可為業務提供即時資料加工及查詢,助力業務即時化轉型。
Realtime ComputeFlink為您提供了非常簡單的方式來對接Elasticsearch。例如當前業務中的日誌或者資料被寫入了LOG中,並且需要對LOG中的資料進行計算之後再寫到Elasticsearch中進行搜尋,可通過以下鏈路實現。
操作步驟
建立Realtime Compute作業。
具體操作,請參見阿里雲Blink獨享模式國際站文檔《Blink SQL開發指南》中的《作業開發》 > 《開發》章節。
編寫Flink SQL。
建立Log ServiceLOG源表。
create table sls_stream( a int, b int, c VARCHAR ) WITH ( type ='sls', endPoint ='<yourEndpoint>', accessId ='<yourAccessId>', accessKey ='<yourAccessKey>', startTime = '<yourStartTime>', project ='<yourProjectName>', logStore ='<yourLogStoreName>', consumerGroup ='<yourConsumerGroupName>' );
WITH參數說明如下表。
變數
說明
endPoint
阿里雲Log Service的公網服務入口,即訪問對應LOG專案及其內部日誌資料的URL。詳細資料,請參見服務入口。
例如杭州地區的Log Service入口為:http://cn-hangzhou.log.aliyuncs.com。需要在對應的服務入口前加http://。
accessId
您帳號的AccessKey ID。
accessKey
您帳號的AccessKey Secret。
startTime
消費日誌開始的時間點。運行Flink作業時所選時間要大於此處設定的時間。
project
LogService的專案名稱。
logStore
LogService專案下具體的LogStore名稱。
consumerGroup
Log Service的消費組名稱。
建立Elasticsearch結果表。
重要Realtime Compute3.2.2及以上版本增加了Elasticsearch結果表功能。建立Flink作業時,請注意所選的版本。
Elasticsearch結果表的實現使用了REST API,可以相容Elasticsearch的各個版本。
CREATE TABLE es_stream_sink( a int, cnt BIGINT, PRIMARY KEY(a) ) WITH( type ='elasticsearch-7', endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>', accessId = '<yourAccessId>', accessKey = '<yourAccessSecret>', index = '<yourIndex>', typeName = '<yourTypeName>' );
WITH參數說明如下。
參數
說明
endPoint
Elasticsearch執行個體的公網地址,格式為http://<instanceid>.public.elasticsearch.aliyuncs.com:9200。可在執行個體的基本資料頁面擷取,詳細資料請參見查看執行個體的基本資料。
accessId
訪問Elasticsearch執行個體的使用者名稱,預設為elastic。
accessKey
對應使用者的密碼。elastic使用者的密碼在建立執行個體時設定,如果忘記可進行重設,重設密碼的注意事項和操作步驟,請參見重設執行個體訪問密碼。
index
索引名稱。如果您還未建立過索引,需要先建立一個索引。具體操作,請參見步驟三:建立索引。您也可以開啟自動建立索引功能,自動建立對應索引。具體操作,請參見配置YML參數。
typeName
索引類型。7.0及以上版本的Elasticsearch執行個體必須為_doc。
說明Elasticsearch支援根據PRIMARY KEY更新文檔,且
PRIMARY KEY
只能為1個欄位。指定PRIMARY KEY
後,文檔的ID為PRIMARY KEY
欄位的值。未指定PRIMARY KEY
,文檔的ID由系統隨機產生。詳細資料,請參見Index API。Elasticsearch支援多種更新模式,對應WITH中的參數為updateMode:
當
updateMode=full
時,新增的文檔會完全覆蓋已存在的文檔。當
updateMode=inc
時,Elasticsearch會根據輸入的欄位值更新對應的欄位。
Elasticsearch所有的更新預設為UPSERT語義,即INSERT或UPDATE。
處理商務邏輯並同步資料。
INSERT INTO es_stream_sink SELECT a, count(*) as cnt FROM sls_stream GROUP BY a
上線並啟動作業。
上線並啟動作業後,即可將Log Service中的資料進行簡單彙總後寫入Elasticsearch中。
更多資訊
使用Realtime ComputeFlink+Elasticsearch,可協助您快速建立即時搜尋鏈路。如果您有更複雜的Elasticsearch寫入需求,可以使用Realtime ComputeFlink的自訂Sink功能來實現。