全部產品
Search
文件中心

AnalyticDB:通過Logstash匯入數倉版

更新時間:Aug 06, 2024

Logstash是開源的伺服器端資料處理管道,能夠同時從多個資料來源採集資料,然後對資料進行轉換,並將資料寫入指定的儲存中。AnalyticDB for MySQL完全相容MySQL,您可以將Logstash Input外掛程式支援的任一資料來源中的資料寫入AnalyticDB for MySQL。本文介紹如何使用Logstash將Kafka資料寫入AnalyticDB for MySQL數倉版

Logstash組件介紹

  • 輸入-採集各種樣式、大小和來源的資料

    在實際業務中,資料往往以各種各樣的形式分散或集中地儲存在多個系統中,Logstash支援多種資料輸入方式,可以在同一時間從多種資料來源採集資料。Logstash能夠以連續的串流方式輕鬆地從使用者的日誌、指標、Web應用、資料存放區以及AWS服務採集資料。

  • 過濾-即時解析和轉換資料

    資料從源傳輸到目標儲存的過程中,Logstash過濾器能夠解析各個事件,識別已命名的欄位來構建結構,並將它們轉換成通用格式,從而更輕鬆、快速地分析和實現商業價值。

    • 使用Grok從非結構化資料中派生出結構化資料。

    • 從IP地址破譯出地理座標。

    • 將PII資料匿名化,完全排除敏感欄位。

    • 簡化整體處理,不受資料來源、格式或架構的影響

  • 輸出-匯出資料

    除了AnalyticDB for MySQL以外,Logstash提供多種資料輸出方向,靈活解鎖眾多下遊用例。

操作步驟

Kafka是一個高輸送量的分布式發布、訂閱Log Service,具有高可用、高效能、分布式、高擴充、持久性等特點。目前Kafka已經被各大公司廣泛使用,同時logstash也可以快速接入業務中,免去重複建設的麻煩。

  1. 在Apache Kafka伺服器根目錄,執行以下命令安裝和更新外掛程式。

    $ bin/plugin install 
    $ bin/plugin update

    Logstash從1.5版本開始整合Kafka,Logstash 1.5及以上版本中所有外掛程式的目錄和命名都發生了改變,外掛程式發布地址為Logstash-plugins

  2. 配置外掛程式。

    • Input配置樣本

      以下配置可以實現對Kafka讀取端(consumer)的基本使用。

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "Logstash"
              topic_id => "test"
              codec => plain
              reset_beginning => false # boolean (optional), default: false
              consumer_threads => 5  # number (optional), default: 1
              decorate_events => true # boolean (optional), default: false
          }
      }          

      參數說明:

      • group_id:消費者分組,可以通過組ID來指定,不同組之間的消費互不影響,相互隔離。

      • topic_id:指定消費話題(Topic),也可以理解為先訂閱某個話題,然後消費。

      • reset_beginning:指定Logstash啟動後從哪個位置開始讀取資料,預設是結束位置,即Logstash進程會從上次讀取結束時的位移量開始繼續讀取資料;如果之前沒有消費過,則從頭讀取資料。

        如果您要匯入原資料,需將reset_beginning值改為true, Logstash進程將從頭開始讀取資料,作用類似於cat ,但是Logstash讀到最後一行時不會終止,而是變成tail -F,繼續監聽相應資料。

      • decorate_events:指定輸出訊息時會輸出自身資訊,包括消費訊息的大小、Topic來源以及consumer的group資訊。

      • rebalance_max_retries:當有新的consumer(Logstash)加入到同一個group時,將會Reblance ,此後將會有Partitions的消費端遷移到新的consumer上。如果一個consumer獲得了某個Partition的消費許可權,那麼它將會向Zookeeper註冊Partition Owner registry節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點,此值用於控制註冊節點的重試次數。

      • consumer_timeout_ms:在指定時間內沒有訊息到達將拋出異常,該參數一般無需修改。

      更多Input參數配置請參見Input

      說明

      如果需要多個Logstash端協同消費同一個Topic,需要先把相應的Topic分多個Partitions(區),此時多個消費者消費將無法保證訊息的消費順序性,然後把兩個或多個Logstash消費端配置成相同的group_idtopic_id

    • Output配置樣本

      output {
          jdbc {
              driver_class => "com.mysql.jdbc.Driver"
              connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
              statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
          }
      }         

      參數說明:

      • connection_string:AnalyticDB MySQL的串連地址。

      • statement:INSERT SQL的聲明數組。

      更多Output參數配置請參見Output

  3. 在Logstash安裝目錄中執行bin/Logstash -f config/xxxx.conf命令啟動任務,將Kafka資料寫入AnalyticDB MySQL。