Logstash是開源的伺服器端資料處理管道,能夠同時從多個資料來源採集資料,然後對資料進行轉換,並將資料寫入指定的儲存中。AnalyticDB for MySQL完全相容MySQL,您可以將Logstash Input外掛程式支援的任一資料來源中的資料寫入AnalyticDB for MySQL。本文介紹如何使用Logstash將Kafka資料寫入AnalyticDB for MySQL數倉版。
Logstash組件介紹
輸入-採集各種樣式、大小和來源的資料
在實際業務中,資料往往以各種各樣的形式分散或集中地儲存在多個系統中,Logstash支援多種資料輸入方式,可以在同一時間從多種資料來源採集資料。Logstash能夠以連續的串流方式輕鬆地從使用者的日誌、指標、Web應用、資料存放區以及AWS服務採集資料。
過濾-即時解析和轉換資料
資料從源傳輸到目標儲存的過程中,Logstash過濾器能夠解析各個事件,識別已命名的欄位來構建結構,並將它們轉換成通用格式,從而更輕鬆、快速地分析和實現商業價值。
使用Grok從非結構化資料中派生出結構化資料。
從IP地址破譯出地理座標。
將PII資料匿名化,完全排除敏感欄位。
簡化整體處理,不受資料來源、格式或架構的影響
輸出-匯出資料
除了AnalyticDB for MySQL以外,Logstash提供多種資料輸出方向,靈活解鎖眾多下遊用例。
操作步驟
Kafka是一個高輸送量的分布式發布、訂閱Log Service,具有高可用、高效能、分布式、高擴充、持久性等特點。目前Kafka已經被各大公司廣泛使用,同時logstash也可以快速接入業務中,免去重複建設的麻煩。
在Apache Kafka伺服器根目錄,執行以下命令安裝和更新外掛程式。
$ bin/plugin install $ bin/plugin update
Logstash從1.5版本開始整合Kafka,Logstash 1.5及以上版本中所有外掛程式的目錄和命名都發生了改變,外掛程式發布地址為Logstash-plugins。
配置外掛程式。
Input配置樣本
以下配置可以實現對Kafka讀取端(consumer)的基本使用。
input { kafka { zk_connect => "localhost:2181" group_id => "Logstash" topic_id => "test" codec => plain reset_beginning => false # boolean (optional), default: false consumer_threads => 5 # number (optional), default: 1 decorate_events => true # boolean (optional), default: false } }
參數說明:
group_id
:消費者分組,可以通過組ID來指定,不同組之間的消費互不影響,相互隔離。topic_id
:指定消費話題(Topic),也可以理解為先訂閱某個話題,然後消費。reset_beginning
:指定Logstash啟動後從哪個位置開始讀取資料,預設是結束位置,即Logstash進程會從上次讀取結束時的位移量開始繼續讀取資料;如果之前沒有消費過,則從頭讀取資料。如果您要匯入原資料,需將
reset_beginning
值改為true
, Logstash進程將從頭開始讀取資料,作用類似於cat ,但是Logstash讀到最後一行時不會終止,而是變成tail -F
,繼續監聽相應資料。decorate_events
:指定輸出訊息時會輸出自身資訊,包括消費訊息的大小、Topic來源以及consumer的group資訊。rebalance_max_retries
:當有新的consumer(Logstash)加入到同一個group時,將會Reblance ,此後將會有Partitions的消費端遷移到新的consumer上。如果一個consumer獲得了某個Partition的消費許可權,那麼它將會向Zookeeper註冊Partition Owner registry節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點,此值用於控制註冊節點的重試次數。consumer_timeout_ms
:在指定時間內沒有訊息到達將拋出異常,該參數一般無需修改。
更多Input參數配置請參見Input。
說明如果需要多個Logstash端協同消費同一個Topic,需要先把相應的Topic分多個Partitions(區),此時多個消費者消費將無法保證訊息的消費順序性,然後把兩個或多個Logstash消費端配置成相同的
group_id
和topic_id
。Output配置樣本
output { jdbc { driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD" statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ] } }
參數說明:
connection_string
:AnalyticDB for MySQL的串連地址。statement
:INSERT SQL的聲明數組。
更多Output參數配置請參見Output。
在Logstash安裝目錄中執行
bin/Logstash -f config/xxxx.conf
命令啟動任務,將Kafka資料寫入AnalyticDB for MySQL。