Log Service支援通過aliyun-log-flume外掛程式與Flume進行對接,實現日誌資料的寫入和消費。
背景資訊
aliyun-log-flume是一個實現Log Service與Flume對接的外掛程式,與Flume對接後,Log Service可以通過Flume與其它資料系統如HDFS、Kafka等對接。aliyun-log-flume提供Sink和Source實現Log Service與Flume的對接。
Sink:Flume讀取其他資料來源的資料然後寫入Log Service。
Source:Flume消費Log Service的日誌資料然後寫入其他系統。
更多資訊,請參見aliyun-log-flume。
操作步驟
下載並安裝Flume。
更多資訊,請參見Flume。
下載aliyun-log-flume外掛程式,並將外掛程式存放於***/flume/lib目錄下。
更多資訊,請參見aliyun-log-flume-1.3.jar。
在***/flume/conf目錄下,建立設定檔flumejob.conf。
啟動Flume。
Sink
通過Sink將其他資料來源的資料通過Flume寫入Log Service。目前支援兩種解析格式:
SIMPLE:將整個Flume Event作為一個欄位寫入Log Service。
DELIMITED:將整個Flume Event作為被分隔字元分隔的資料,根據配置的列名解析成對應的欄位寫入Log Service。
Sink的配置如下:
參數 | 是否必須 | 說明 |
type | 是 | 預設配置為com.aliyun.Loghub.flume.sink.LoghubSink。 |
endpoint | 是 | Project的服務入口,例如 |
project | 是 | Project名稱。 |
logstore | 是 | Logstore名稱。 |
accessKeyId | 是 | 阿里雲AccessKey ID,用於標識使用者。為保證帳號安全,建議您使用RAM使用者的AccessKey。如何擷取AccessKey,請參見存取金鑰。 |
accessKey | 是 | 阿里雲AccessKey Secret,用於驗證使用者的密鑰。為保證帳號安全,建議您使用RAM使用者的AccessKey。如何擷取AccessKey,請參見存取金鑰。 |
batchSize | 否 | 每批次寫入Log Service的資料條數。預設為1000條。 |
maxBufferSize | 否 | 緩衝隊列的大小。預設為1000條。 |
serializer | 否 | Event序列化格式。支援的模式如下:
|
columns | 否 | 當serializer為DELIMITED時,必須指定該欄位列表,用半形逗號(,)分隔,順序與實際資料中的欄位順序一致。 |
separatorChar | 否 | 當serializer為DELIMITED時,用於指定資料的分隔字元,必須為單個字元。預設為英文逗號(,)。 |
quoteChar | 否 | 當serializer為DELIMITED時,用於指定引用符。預設為半形雙引號(")。 |
escapeChar | 否 | 當serializer為DELIMITED時,用於指定逸出字元。預設為半形雙引號(")。 |
useRecordTime | 否 | 用於設定是否使用資料中的timestamp欄位作為日誌時間。預設為false表示使用目前時間。 |
Sink配置樣本請參見GitHub。
Source
通過Source將Log Service的日誌資料通過Flume投遞到其他的資料來源。目前支援兩種輸出格式。
DELIMITED:資料以分隔字元日誌的形式寫入Flume。
JSON:資料以JSON日誌的形式寫入Flume。
Source配置如下:
參數 | 是否必須 | 說明 |
type | 是 | 預設配置為com.aliyun.Loghub.flume.source.LoghubSource。 |
endpoint | 是 | Project的服務入口,例如 |
project | 是 | Project名稱。 |
logstore | 是 | Logstore名稱。 |
accessKeyId | 是 | 阿里雲AccessKey ID,用於標識使用者。為保證帳號安全,建議您使用RAM使用者的AccessKey。如何擷取AccessKey,請參見存取金鑰。 |
accessKey | 是 | 阿里雲AccessKey Secret,用於驗證使用者的密鑰。為保證帳號安全,建議您使用RAM使用者的AccessKey。如何擷取AccessKey,請參見存取金鑰。 |
heartbeatIntervalMs | 否 | 用戶端和Log Service的心跳間隔,預設為30000毫秒。 |
fetchIntervalMs | 否 | 資料拉取間隔,預設為100毫秒。 |
fetchInOrder | 否 | 是否按順序消費。預設為false。 |
batchSize | 否 | 每批次讀取的資料條數,預設為100條。 |
consumerGroup | 否 | 讀取的消費組名稱。 |
initialPosition | 否 | 讀取資料的起點位置,支援begin,end,timestamp。預設為begin。 說明 如果服務端已經存在Checkpoint,會優先使用服務端的Checkpoint。 |
timestamp | 否 | 當initialPosition為timestamp時,必須指定時間戳記,為Unix時間戳記格式。 |
deserializer | 是 | Event還原序列化格式,支援的模式如下:
|
columns | 否 | 當deserializer為DELIMITED時,必須指定欄位列表,用半形逗號(,)分隔,順序與實際資料中的欄位順序一致。 |
separatorChar | 否 | 當deserializer為DELIMITED時,用於指定資料的分隔字元,必須為單個字元。預設為英文逗號(,)。 |
quoteChar | 否 | 當deserializer為DELIMITED時,用於指定引用符。預設為半形雙引號(")。 |
escapeChar | 否 | 當deserializer為DELIMITED時,用於指定逸出字元。預設為半形雙引號(")。 |
appendTimestamp | 否 | 當deserializer為DELIMITED時,用於設定是否將時間戳記作為一個欄位自動添加到每行末尾。預設為false。 |
sourceAsField | 否 | 當deserializer為JSON時,用於設定是否將日誌Source作為一個欄位,欄位名稱為__source__。預設為false。 |
tagAsField | 否 | 當deserializer為JSON時,用於設定是否將日誌Tag作為欄位,欄位名稱為__tag__:{tag名稱}。預設為false。 |
timeAsField | 否 | 當deserializer為JSON時,用於設定是否將日誌時間作為一個欄位,欄位名稱為__time__。預設為false。 |
useRecordTime | 否 | 用於設定是否使用日誌的時間,如果為false則使用目前時間。預設為false。 |
Source配置樣本請參見GitHub。