全部產品
Search
文件中心

Simple Log Service:Flume消費

更新時間:Aug 30, 2024

Log Service支援通過aliyun-log-flume外掛程式與Flume進行對接,實現日誌資料的寫入和消費。

背景資訊

aliyun-log-flume是一個實現Log Service與Flume對接的外掛程式,與Flume對接後,Log Service可以通過Flume與其它資料系統如HDFS、Kafka等對接。aliyun-log-flume提供Sink和Source實現Log Service與Flume的對接。

  • Sink:Flume讀取其他資料來源的資料然後寫入Log Service。

  • Source:Flume消費Log Service的日誌資料然後寫入其他系統。

更多資訊,請參見aliyun-log-flume

操作步驟

  1. 下載並安裝Flume。

    更多資訊,請參見Flume

  2. 下載aliyun-log-flume外掛程式,並將外掛程式存放於***/flume/lib目錄下。

    更多資訊,請參見aliyun-log-flume-1.3.jar

  3. ***/flume/conf目錄下,建立設定檔flumejob.conf。

    • Sink配置及樣本請參見Sink

    • Source配置及樣本請參見Source

  4. 啟動Flume。

Sink

通過Sink將其他資料來源的資料通過Flume寫入Log Service。目前支援兩種解析格式:

  • SIMPLE:將整個Flume Event作為一個欄位寫入Log Service。

  • DELIMITED:將整個Flume Event作為被分隔字元分隔的資料,根據配置的列名解析成對應的欄位寫入Log Service。

Sink的配置如下:

參數

是否必須

說明

type

預設配置為com.aliyun.Loghub.flume.sink.LoghubSink。

endpoint

Project的服務入口,例如http://cn-qingdao.log.aliyuncs.com。請根據實際情況替換服務入口。更多資訊,請參見服務存取點

project

Project名稱。

logstore

Logstore名稱。

accessKeyId

阿里雲AccessKey ID,用於標識使用者。為保證帳號安全,建議您使用RAM使用者的AccessKey。如何擷取AccessKey,請參見存取金鑰

accessKey

阿里雲AccessKey Secret,用於驗證使用者的密鑰。為保證帳號安全,建議您使用RAM使用者的AccessKey。如何擷取AccessKey,請參見存取金鑰

batchSize

每批次寫入Log Service的資料條數。預設為1000條。

maxBufferSize

緩衝隊列的大小。預設為1000條。

serializer

Event序列化格式。支援的模式如下:

  • DELIMITED:設定解析格式為分隔字元模式。

  • SIMPLE:設定解析格式為單行模式。預設為該模式。

  • JSON:設定解析格式為JSON模式。

  • 自訂serializer:設定解析格式為自訂的序列化模式,設定為該模式時需要填寫完整列名稱。

columns

serializerDELIMITED時,必須指定該欄位列表,用半形逗號(,)分隔,順序與實際資料中的欄位順序一致。

separatorChar

serializerDELIMITED時,用於指定資料的分隔字元,必須為單個字元。預設為英文逗號(,)。

quoteChar

serializerDELIMITED時,用於指定引用符。預設為半形雙引號(")。

escapeChar

serializerDELIMITED時,用於指定逸出字元。預設為半形雙引號(")。

useRecordTime

用於設定是否使用資料中的timestamp欄位作為日誌時間。預設為false表示使用目前時間。

Sink配置樣本請參見GitHub

Source

通過Source將Log Service的日誌資料通過Flume投遞到其他的資料來源。目前支援兩種輸出格式。

  • DELIMITED:資料以分隔字元日誌的形式寫入Flume。

  • JSON:資料以JSON日誌的形式寫入Flume。

Source配置如下:

參數

是否必須

說明

type

預設配置為com.aliyun.Loghub.flume.source.LoghubSource。

endpoint

Project的服務入口,例如http://cn-qingdao.log.aliyuncs.com。請根據實際情況替換服務入口。更多資訊,請參見服務存取點

project

Project名稱。

logstore

Logstore名稱。

accessKeyId

阿里雲AccessKey ID,用於標識使用者。為保證帳號安全,建議您使用RAM使用者的AccessKey。如何擷取AccessKey,請參見存取金鑰

accessKey

阿里雲AccessKey Secret,用於驗證使用者的密鑰。為保證帳號安全,建議您使用RAM使用者的AccessKey。如何擷取AccessKey,請參見存取金鑰

heartbeatIntervalMs

用戶端和Log Service的心跳間隔,預設為30000毫秒。

fetchIntervalMs

資料拉取間隔,預設為100毫秒。

fetchInOrder

是否按順序消費。預設為false。

batchSize

每批次讀取的資料條數,預設為100條。

consumerGroup

讀取的消費組名稱。

initialPosition

讀取資料的起點位置,支援beginendtimestamp。預設為begin

說明

如果服務端已經存在Checkpoint,會優先使用服務端的Checkpoint。

timestamp

initialPositiontimestamp時,必須指定時間戳記,為Unix時間戳記格式。

deserializer

Event還原序列化格式,支援的模式如下:

  • DELIMITED:設定解析格式為分隔字元模式。預設為該模式。

  • JSON:設定解析格式為JSON模式。

  • 自訂deserializer:設定解析格式為自訂的還原序列化模式,設定為該模式時需要填寫完整列名稱。

columns

deserializerDELIMITED時,必須指定欄位列表,用半形逗號(,)分隔,順序與實際資料中的欄位順序一致。

separatorChar

deserializerDELIMITED時,用於指定資料的分隔字元,必須為單個字元。預設為英文逗號(,)。

quoteChar

deserializerDELIMITED時,用於指定引用符。預設為半形雙引號(")。

escapeChar

deserializerDELIMITED時,用於指定逸出字元。預設為半形雙引號(")。

appendTimestamp

deserializerDELIMITED時,用於設定是否將時間戳記作為一個欄位自動添加到每行末尾。預設為false。

sourceAsField

deserializerJSON時,用於設定是否將日誌Source作為一個欄位,欄位名稱為__source__。預設為false。

tagAsField

deserializerJSON時,用於設定是否將日誌Tag作為欄位,欄位名稱為__tag__:{tag名稱}。預設為false。

timeAsField

deserializerJSON時,用於設定是否將日誌時間作為一個欄位,欄位名稱為__time__。預設為false。

useRecordTime

用於設定是否使用日誌的時間,如果為false則使用目前時間。預設為false。

Source配置樣本請參見GitHub